分析ZooKeeper分布式锁的实现

目录一、分布式锁方案比较二、ZooKeeper实现分布式锁2.1、方案一2.2、方案二

一、分布式锁方案比较

方案 实现思路 优点 缺点 利用 MySQL 的实现方案 利用数据库自身提供的锁机制实现,要求数据库支持行级锁 实现简单 性能差,无法适应高并发场景;容易出现死锁的情况;无法优雅的实现阻塞式锁 利用 Redis 的实现方案 使用 Setnx 和 lua 脚本机制实现,保证对缓存操作序列的原子性 性能好 实现相对复杂,有可能出现死锁;无法优雅的实现阻塞式锁 利用 ZooKeeper 的实现方案 基于 ZooKeeper 节点特性及 watch 机制实现 性能好,稳定可靠性高,能较好地实现阻塞式锁 实现相对复杂

二、ZooKeeper实现分布式锁

这里使用 ZooKeeper 来实现分布式锁,以50个并发请求来获取订单编号为例,描述两种方案,第一种为基础实现,第二种在第一种基础上进行了优化。

2.1、方案一

流程描述:

具体代码:

OrderNumGenerator:

/** * @Description 生成随机订单号 */public class OrderNumGenerator {    private static long count = 0;    /**     * 使用日期加数值拼接成订单号     */    public String getOrderNumber() throws Exception {        String date = DateTimeFormatter.ofPattern("yyyyMMddHHmmss").format(LocalDateTime.now());        String number = new DecimalFormat("000000").format(count++);        return date + number;    }}

Lock:

/** * @Description 自定义锁接口 */public interface Lock {    /**     * 获取锁     */    public void getLock();    /**     * 释放锁     */    public void unLock();}

AbstractLock:

/** * @Description 定义一个模板,具体的方法由子类来实现 */public abstract class AbstractLock implements Lock {    /**     * 获取锁     */    @Override    public void getLock() {        if (tryLock()) {            System.out.println("--------获取到了自定义Lock锁的资源--------");        } else {            // 没拿到锁则阻塞,等待拿锁            waitLock();            getLock();        }    }    /**     * 尝试获取锁,如果拿到了锁返回true,没有拿到则返回false     */    public abstract boolean tryLock();    /**     * 阻塞,等待获取锁     */    public abstract void waitLock();}

ZooKeeperAbstractLock:

/** * @Description 定义需要的服务连接 */public abstract class ZooKeeperAbstractLock extends AbstractLock {    private static final String SERVER_ADDR = "192.168.182.130:2181,192.168.182.131:2181,192.168.182.132:2181";    protected ZkClient zkClient = new ZkClient(SERVER_ADDR);    protected static final String PATH = "/lock";}

ZooKeeperDistrbuteLock:

/** * @Description 真正实现锁的细节 */public class ZooKeeperDistrbuteLock extends ZooKeeperAbstractLock {    private CountDownLatch countDownLatch = null;    /**     * 尝试拿锁     */    @Override    public boolean tryLock() {        try {            // 创建临时节点            zkClient.createEphemeral(PATH);            return true;        } catch (Exception e) {            // 创建失败报异常            return false;        }    }    /**     * 阻塞,等待获取锁     */    @Override    public void waitLock() {        // 创建监听        IZkDataListener iZkDataListener = new IZkDataListener() {            @Override            public void handleDataChange(String s, Object o) throws Exception {            }            @Override            public void handleDataDeleted(String s) throws Exception {                // 释放锁,删除节点时唤醒等待的线程                if (countDownLatch != null) {                    countDownLatch.countDown();                }            }        };        // 注册监听        zkClient.subscribeDataChanges(PATH, iZkDataListener);        // 节点存在时,等待节点删除唤醒        if (zkClient.exists(PATH)) {            countDownLatch = new CountDownLatch(1);            try {                countDownLatch.await();            } catch (InterruptedException e) {                e.printStackTrace();            }        }        // 删除监听        zkClient.unsubscribeDataChanges(PATH, iZkDataListener);    }    /**     * 释放锁     */    @Override    public void unLock() {        if (zkClient != null) {            System.out.println("释放锁资源");            zkClient.delete(PATH);            zkClient.close();        }    }}

测试效果:使用50个线程来并发测试ZooKeeper实现的分布式锁

/** * @Description 使用50个线程来并发测试ZooKeeper实现的分布式锁 */public class OrderService {    private static class OrderNumGeneratorService implements Runnable {        private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();;        private Lock lock = new ZooKeeperDistrbuteLock();        @Override        public void run() {            lock.getLock();            try {                System.out.println(Thread.currentThread().getName() + ", 生成订单编号:"  + orderNumGenerator.getOrderNumber());            } catch (Exception e) {                e.printStackTrace();            } finally {                lock.unLock();            }        }    }    public static void main(String[] args) {        System.out.println("----------生成唯一订单号----------");        for (int i = 0; i < 50; i++) {            new Thread(new OrderNumGeneratorService()).start();        }    }}

2.2、方案二

方案二在方案一的基础上进行优化,避免产生“羊群效应”,方案一一旦临时节点删除,释放锁,那么其他在监听这个节点变化的线程,就会去竞争锁,同时访问 ZooKeeper,那么怎么更好的避免各线程的竞争现象呢,就是使用临时顺序节点,临时顺序节点排序,每个临时顺序节点只监听它本身的前一个节点变化。

流程描述:

具体代码

具体只需要将方案一中的 ZooKeeperDistrbuteLock 改变,增加一个 ZooKeeperDistrbuteLock2,测试代码中使用 ZooKeeperDistrbuteLock2 即可测试,其他代码都不需要改变。

/** * @Description 真正实现锁的细节 */public class ZooKeeperDistrbuteLock2 extends ZooKeeperAbstractLock {    private CountDownLatch countDownLatch = null;    /**     * 当前请求节点的前一个节点     */    private String beforePath;    /**     * 当前请求的节点     */    private String currentPath;    public ZooKeeperDistrbuteLock2() {        if (!zkClient.exists(PATH)) {            // 创建持久节点,保存临时顺序节点            zkClient.createPersistent(PATH);        }    }    @Override    public boolean tryLock() {        // 如果currentPath为空则为第一次尝试拿锁,第一次拿锁赋值currentPath        if (currentPath == null || currentPath.length() == 0) {            // 在指定的持久节点下创建临时顺序节点            currentPath = zkClient.createEphemeralSequential(PATH + "/", "lock");        }        // 获取所有临时节点并排序,例如:000044        List<String> childrenList = zkClient.getChildren(PATH);        Collections.sort(childrenList);        if (currentPath.equals(PATH + "/" + childrenList.get(0))) {            // 如果当前节点在所有节点中排名第一则获取锁成功            return true;        } else {            int wz = Collections.binarySearch(childrenList, currentPath.substring(6));            beforePath = PATH + "/" + childrenList.get(wz - 1);        }        return false;    }    @Override    public void waitLock() {        // 创建监听        IZkDataListener iZkDataListener = new IZkDataListener() {            @Override            public void handleDataChange(String s, Object o) throws Exception {            }            @Override            public void handleDataDeleted(String s) throws Exception {                // 释放锁,删除节点时唤醒等待的线程                if (countDownLatch != null) {                    countDownLatch.countDown();                }            }        };        // 注册监听,这里是给排在当前节点前面的节点增加(删除数据的)监听,本质是启动另外一个线程去监听前置节点        zkClient.subscribeDataChanges(beforePath, iZkDataListener);        // 前置节点存在时,等待前置节点删除唤醒        if (zkClient.exists(beforePath)) {            countDownLatch = new CountDownLatch(1);            try {                countDownLatch.await();            } catch (InterruptedException e) {                e.printStackTrace();            }        }        // 删除对前置节点的监听        zkClient.unsubscribeDataChanges(beforePath, iZkDataListener);    }    /**     * 释放锁     */    @Override    public void unLock() {        if (zkClient != null) {            System.out.println("释放锁资源");            zkClient.delete(currentPath);            zkClient.close();        }    }}

以上就是分析ZooKeeper分布式锁的实现的详细内容,更多关于ZooKeeper分布式锁的资料请关注其它相关文章!

只想到处流浪人生就像一场旅行,不必在乎目的地,

分析ZooKeeper分布式锁的实现

相关文章:

你感兴趣的文章:

标签云: