Ⅵ:zookeeper的Watcher事件监听机制

?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ?? ??

文章目录??前置:–》把握住Watcher流程《–????1、watcher的连接状态判断????2、watcher机制下的exists????Ⅰ、连接对象的监听器????Ⅱ、自定义watcher????Ⅲ、watcher的多次监听????Ⅳ、多个watcher同时监听一个节点????3、watcher机制下的getData????Ⅰ、连接对象的监听器????Ⅱ、自定义watcher监听器????Ⅲ、多次watcher监听????Ⅳ、多个watcher同时监听一个节点????4、watcher机制下的getChildren????Ⅰ、连接对象的监视器????Ⅱ、自定义watcher监听器????Ⅲ、多次watcher监听????Ⅳ、多个watcher同时监听一个节点??


xshell7连接云服务器演示结果,如果未知请看第一章

前置:–》把握住Watcher流程《–

1、连接zookeeper服务器 2、连接时必须使当前线程等待(等待其他线程创建连接zookeeper服务成功,使用计数器实现) 3、执行回调函数process 4、释放当前线程

1、watcher的连接状态判断package com.zookeeper.watcher;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooKeeper;import java.io.IOException;import java.util.concurrent.CountDownLatch;/** * @author:抱着鱼睡觉的喵喵 * @date:2021/5/7 * @description: */public class WatcherConnection implements Watcher {//计数器,使当前线程等待其他线程完成 static CountDownLatch countDownLatch = new CountDownLatch(1); static ZooKeeper zooKeeper; public static void main(String[] args) { try { //连接zookeeper服务 zooKeeper = new ZooKeeper(“8.140.37.103:2181”, 5000, new WatcherConnection()); //使当前线程等待其他线程完成(其他线程也就是连接zookeeper服务的线程) countDownLatch.await(); Thread.sleep(1000); zooKeeper.close(); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } //回调函数,进性状态的判断 @Override public void process(WatchedEvent watchedEvent) { try { if (watchedEvent.getType() == Event.EventType.None) { if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { System.out.println(“连接成功!”); countDownLatch.countDown(); } else if (watchedEvent.getState() == Event.KeeperState.Disconnected) { System.out.println(“断开连接”); } else if (watchedEvent.getState() == Event.KeeperState.Expired) { System.out.println(“超时了”); } else if (watchedEvent.getState() == Event.KeeperState.AuthFailed) { System.out.println(“认证失败!”); } } } catch (Exception e) { e.printStackTrace(); } }}2、watcher机制下的existsⅠ、连接对象的监听器public class WatcherExistsTest { private String IP = “8.140.37.103:2181”; private ZooKeeper zookeeper; @Before public void connection() throws IOException, InterruptedException { //计数器对象,使当前线程等待其他线程的完成 final CountDownLatch downLatch = new CountDownLatch(1); zookeeper = new ZooKeeper(IP, 5000, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { //判断是否连接成功 if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { //使CountDownLatch减到0(初始为1),其他线程可以继续执行(该处应该是主线程可以继续执行了) downLatch.countDown(); System.out.println(“连接成功!”); } System.out.println(watchedEvent.getPath()); System.out.println(watchedEvent.getType()); } }); //主线程进入等待态 downLatch.await(); } @Test public void watcherExists() throws KeeperException, InterruptedException { //第一个参数是节点路径 //第二个参数为Boolean类型,true代表监听path下的节点,false表示不进行监听 zookeeper.exists(“/exists”, true); Thread.sleep(10000); } @After public void close() { try { zookeeper.close(); } catch (InterruptedException e) { e.printStackTrace(); } }}

此时在zookeeper客户端创建/exists节点

IDEA控制台就会出现NodeCreated??在这里插入代码片??

当然还有删除节点的NodeDeleted等,不再演示

Ⅱ、自定义watcherpublic class WatcherExistsTest { private String IP = “8.140.37.103:2181”; private ZooKeeper zookeeper; @Before public void connection() throws IOException, InterruptedException { //计数器对象,使当前线程等待其他线程的完成 final CountDownLatch downLatch = new CountDownLatch(1); zookeeper = new ZooKeeper(IP, 6000, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { //使CountDownLatch减到0(初始为1),其他线程可以继续执行(该处应该是主线程可以继续执行了) downLatch.countDown(); } } }); //主线程进入等待态 downLatch.await(); } @Test public void watcherExists2() throws KeeperException, InterruptedException { zookeeper.exists(“/exists2”, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println(“自定义watcher!”); System.out.println(watchedEvent.getPath()); System.out.println(watchedEvent.getType()); } }); Thread.sleep(10000); System.out.println(“————–“); } @After public void close() { try { zookeeper.close(); } catch (InterruptedException e) { e.printStackTrace(); } }}

执行@Test注解方法-》客户端创建/exists2节点-》IDEA控制台查看结果

当我修改/exists2节点的数据时,控制台出现了NodeDataChanged

Ⅲ、watcher的多次监听

本质上只能进性一次注册,一次监听;当然可以利用循环调用进行生命周期内的多次监听

@Test public void watcherExists2() throws KeeperException, InterruptedException { zookeeper.exists(“/exists2”, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { try { System.out.println(“自定义watcher!”); System.out.println(watchedEvent.getPath()); System.out.println(watchedEvent.getType()); zookeeper.exists(“/exists2”, this); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }); Thread.sleep(10000); System.out.println(“————–“); }

Ⅳ、多个watcher同时监听一个节点

一般来说这种多个监听对象才比较符合发布-订阅模式,当节点中的数据发生变化时,会通知所有的监听对象。

@Test public void watcherExists3() throws KeeperException, InterruptedException { System.out.println(“============================”); zookeeper.exists(“/exists3”, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println(“监听对象1”); System.out.println(watchedEvent.getType()); System.out.println(watchedEvent.getPath()); } }); zookeeper.exists(“/exists3”, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println(“监听对象2”); System.out.println(watchedEvent.getType()); System.out.println(watchedEvent.getPath()); } }); zookeeper.exists(“/exists3”, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println(“监听对象3”); System.out.println(watchedEvent.getType()); System.out.println(watchedEvent.getPath()); } }); Thread.sleep(10000); System.out.println(“==========================”); }


3、watcher机制下的getData

getData(String path, boolean b, Stat stat)连接对象的监听器 getData(String path, watcher watcher, Stat stat) 自定义的监听器

Ⅰ、连接对象的监听器public class WatcherGetDataTest { static CountDownLatch countDownLatch = new CountDownLatch(1); static ZooKeeper zooKeeper; final String IP = “8.140.37.103:2181”; @Before public void before() throws IOException, InterruptedException { zooKeeper = new ZooKeeper(IP, 5000, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { System.out.println(“=================”); countDownLatch.countDown(); } System.out.println(watchedEvent.getPath()); System.out.println(watchedEvent.getType()); } }); countDownLatch.await(); } @Test public void test() throws KeeperException, InterruptedException { zooKeeper.getData(“/data”,true, null); Thread.sleep(10000); System.out.println(“=======================”); } @After public void after() throws InterruptedException { zooKeeper.close(); }}

启动测试-》修改data节点的数据-》查看idea控制台结果

Ⅱ、自定义watcher监听器@Test public void test2() throws KeeperException, InterruptedException { System.out.println(“========================”); zooKeeper.getData(“/data”, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println(watchedEvent.getType()); System.out.println(watchedEvent.getPath()); } }, null); Thread.sleep(10000); System.out.println(“============================”); }

Ⅲ、多次watcher监听@Test public void test3() throws KeeperException, InterruptedException { System.out.println(“=========================”); Watcher watcher = new Watcher() { @Override public void process(WatchedEvent watchedEvent) { try { System.out.println(watchedEvent.getType()); System.out.println(watchedEvent.getPath()); zooKeeper.getData(“/data”, this, null); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }; zooKeeper.getData(“/data”, watcher, null); Thread.sleep(5000); System.out.println(“=======================”); }

Ⅳ、多个watcher同时监听一个节点@Test public void test4() throws KeeperException, InterruptedException { System.out.println(“=======================”); zooKeeper.getData(“/data”, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println(“监听对象1”); System.out.println(watchedEvent.getPath()); System.out.println(watchedEvent.getType()); } }, null); zooKeeper.getData(“/data”, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println(“监听对象2”); System.out.println(watchedEvent.getPath()); System.out.println(watchedEvent.getType()); } }, null); zooKeeper.getData(“/data”, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println(“监听对象3”); System.out.println(watchedEvent.getPath()); System.out.println(watchedEvent.getType()); } }, null); Thread.sleep(5000); System.out.println(“========================”); } @After public void after() throws InterruptedException { zooKeeper.close(); }

4、watcher机制下的getChildren

getChildren(String path, boolean b) //使用连接对象的监视器 getChildren(String path, watcher w) //自定义监视器 子节点的修改不会被监测到

Ⅰ、连接对象的监视器public class WatcherGetChildrenTest { static CountDownLatch countDownLatch = new CountDownLatch(1); static ZooKeeper zooKeeper; final String IP = “8.140.37.103:2181”; @Before public void before() throws IOException, InterruptedException { zooKeeper = new ZooKeeper(IP, 5000, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { System.out.println(“=================”); countDownLatch.countDown(); } System.out.println(watchedEvent.getPath()); System.out.println(watchedEvent.getType()); } }); countDownLatch.await(); } @Test public void test() throws KeeperException, InterruptedException { zooKeeper.getChildren(“/data”, true); Thread.sleep(5000); } @After public void after() throws InterruptedException { zooKeeper.close(); }}

Ⅱ、自定义watcher监听器@Test public void test2() throws KeeperException, InterruptedException { zooKeeper.getChildren(“/data”, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println(“==================”); System.out.println(watchedEvent.getType()); System.out.println(watchedEvent.getPath()); } }); Thread.sleep(10000); System.out.println(“====================”); }

Ⅲ、多次watcher监听@Test public void test3() throws KeeperException, InterruptedException { zooKeeper.getChildren(“/data”, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println(“================”); if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) { try { System.out.println(watchedEvent.getType()); System.out.println(watchedEvent.getPath()); zooKeeper.getChildren(“/data”, this); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } }); Thread.sleep(5000); }

Ⅳ、多个watcher同时监听一个节点@Test public void test4() throws KeeperException, InterruptedException { System.out.println(“==================================”); zooKeeper.getChildren(“/data”, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println(“监听对象1”); System.out.println(watchedEvent.getType()); System.out.println(watchedEvent.getType()); } }); zooKeeper.getChildren(“/data”, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println(“监听对象2”); System.out.println(watchedEvent.getType()); System.out.println(watchedEvent.getPath()); } }); zooKeeper.getChildren(“/data”, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println(“监听对象3”); System.out.println(watchedEvent.getType()); System.out.println(watchedEvent.getPath()); } }); Thread.sleep(5000); System.out.println(“================================”); } @After public void after() throws InterruptedException { zooKeeper.close(); }

旁观者的姓名永远爬不到比赛的计分板上。

Ⅵ:zookeeper的Watcher事件监听机制

相关文章:

你感兴趣的文章:

标签云: