51工具盒子

依楼听风雨
笑看云卷云舒,淡观潮起潮落

Zookeeper实现分布式锁的原理是什么?

# (一)引言 {#一-引言}

在单体环境中,遇到临界资源的时候我们会使用Synchronized或者RetreenLock在调用临界资源前上锁。但是在分布式的环境下,锁住单体资源就不起作用了,这个时候就需要用到分布式锁。分布式锁的原理就是借用外部的一个系统来充当锁的作用,比如Mysql、Redis、Zookeeper等都可以用作分布式锁。在实际业务中,Redis和Zookeeper用到的最多。

# (二)Zookeeper锁的原理 {#二-zookeeper锁的原理}

锁分为两种:共享锁 (读锁)和排他锁 (写锁) 读锁:当有一个线程获取读锁后,其他线程也可以获取读锁,但是在读锁没有完全被释放之前,其他线程不能获取写锁。 写锁:当有一个线程获取写锁后,其他线程就无法获取读锁和写锁了

zookeeper有一种节点类型叫做临时序号节点,它会按序号自增地创建临时节点,这正好可以作为分布式锁的实现工具。

读锁获取原理: 1、根据资源的id创建临时序号节点:/lock/mylockR0000000005 Read 2、获取/lock下的所有子节点,判断比他小的节点是否全是 读锁,如果是读锁则获取锁成功 3、如果不是,则阻塞等待,监听自己的前一个节点。 4、当前面一个节点发生变更时,重新执行第二步操作。

写锁获取原理: 1、根据资源的id创建临时序号节点:/lock/mylockW0000000006 Write 2、获取 /lock 下所有子节点,判断最小的节点是否为自己 ,如果是则获锁成功 3、如果不是,则阻塞等待,监听自己的前一个节点 4、当前面一个节点发生变更时,重新执行第二步。

通过一张图更清晰地看出现象:首先是写锁,因为写锁不是最前面的节点,所以阻塞了,008读锁因为前面并不是所有都是读锁,所以阻塞了

释放锁: 删除对应的临时节点即可,如果服务器宕机了,因为临时节点的原理也不会发生死锁的情况。

# (三)代码实现 {#三-代码实现}

真实的场景中,一般来说为了效率不会上读锁,想想看如果有人在查看数据,你就不能去修改了,这样效率是不是特别低。这里用代码实现分布式写锁,首先自己定义一个锁类

@Data@AllArgsConstructor@NoArgsConstructorpublic class Lock {
    private String lockId;
    private String path;
    private boolean active;

    public Lock(String lockId, String nodePath) {         this.lockId=lockId;         this.path=nodePath;     }}


再通过Zookeeper写一个加锁工具类,代码已经给了注释,里面的实现原理和上面所讲的写锁获取原理一致:

public class ZookeeperLock {
    private String server="192.168.78.128:2181";
    private ZkClient zkClient;
    private static final String rootPath="/lock";

    //初始化ZkClient,并创建根节点     public ZookeeperLock(){         zkClient=new ZkClient(server,5000,20000);         buildRoot();     }

    //创建根节点     public void buildRoot(){         //如果根节点不存在,就创建         if (!zkClient.exists(rootPath)){             zkClient.createPersistent(rootPath);             System.out.println("创建根节点成功");         }     }

    public Lock lock(String lockId,long timeout){         //创建一个临时节点         Lock lockNode=createLockNode(lockId);         //尝试去激活锁         lockNode=tryActiveLock(lockNode);         //如果没有激活,则等待timeout的时间         if (!lockNode.isActive()){             try {                 synchronized (lockNode){                     lockNode.wait(timeout);                 }             } catch (InterruptedException e) {                 e.printStackTrace();             }         }         //timeout时间内节点还未释放,就报lock timeout错误         if (!lockNode.isActive()){             throw new RuntimeException("lock timeout");         }         return lockNode;     }

    //释放锁     public void unlock(Lock lock){         if (lock.isActive()){             zkClient.delete(lock.getPath());         }     }          //尝试激活锁     private Lock tryActiveLock(Lock lockNode){         //获取所有的子节点         List<String> childList = zkClient.getChildren(rootPath)                 .stream()                 .sorted()                 .map(p -> rootPath + "/" + p)                 .collect(Collectors.toList());         //获取第一个元素         String firstNodePath = childList.get(0);         //如果自己就是第一个节点,就激活锁         if (firstNodePath.equals(lockNode.getPath())){             lockNode.setActive(true);         }else {             //否则监听前一个锁             String upNodePath = childList.get(childList.indexOf(lockNode.getPath())-1);             zkClient.subscribeDataChanges(upNodePath, new IZkDataListener() {                 @Override                 public void handleDataChange(String s, Object o) throws Exception {

                }                 //如果前面一个节点被删除了,再次尝试获取锁                 @Override                 public void handleDataDeleted(String s) throws Exception {                     System.out.println("节点删除"+s);                     Lock lock=tryActiveLock(lockNode);                     synchronized (lockNode){                         if (lock.isActive()){                             lockNode.notify();                         }                     }                     zkClient.unsubscribeDataChanges(upNodePath,this);                 }             });         }         return lockNode;     }

    public Lock createLockNode(String lockId) {         String nodePath = zkClient.createEphemeralSequential(rootPath + "/" + lockId, "lock");         return new Lock(lockId, nodePath);     }}


# (四)测试 {#四-测试}

上面写的这个工具类,以后可以直接拿过来用,我们来测试一下,首先是不加锁开100个线程去加一个变量:

public class Test {
    private int flag=0;
    private ZookeeperLock zookeeperLock=new ZookeeperLock();
    @Test
    public void testLock() throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 100; i++) {
            executorService.submit(()->{
                flag++;
            });
        }

        executorService.shutdown();         executorService.awaitTermination(10, TimeUnit.SECONDS);         System.out.println(flag);     }}


最后的返回结果永远到不了100,因为存在更新丢失。 加上锁:

public class Test {
    private int flag=0;
    private ZookeeperLock zookeeperLock=new ZookeeperLock();
    @Test
    public void testLock() throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 100; i++) {
            executorService.submit(()->{
                Lock lock = zookeeperLock.lock("myLock", 60 * 1000);
                flag++;
                zookeeperLock.unlock(lock);
            });
        }

        executorService.shutdown();         executorService.awaitTermination(10, TimeUnit.SECONDS);         System.out.println(flag);     }}


最后的结果一直都是100。

# (五)总结 {#五-总结}

只要懂得分布式锁的原理,代码的实现就会变得十分简单。你会累是因为你在走上坡路!我们下期再见。

赞(6)
未经允许不得转载:工具盒子 » Zookeeper实现分布式锁的原理是什么?