irpas技术客

09、Hadoop框架Zookeeper Java API_liangzai2048

未知 1462

文章目录 Hadoop框架Zookeeper Java API引入zookeeper依赖测试连接1、新建连接2、创建临时节点3、运行测试 ZKJavaAPI名词解析创建永久节点创建临时节点获取节点数据修改数据删除节点事件完整代码

Hadoop框架Zookeeper Java API 引入zookeeper依赖

??去Maven官网引入Zookeeper依赖。

??选择3.4.6版本,复制到IDEA的pom文件里

<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper --> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version> </dependency>

??新建ZOOKEEPER包

??新建ZKJavaAPI

测试连接 1、新建连接

??这里需要抛出异常

// 1、新建连接 ZooKeeper zk = new ZooKeeper( "master:2181,node1:2181,node2:2181" , 100000 , null ); 2、创建临时节点

??这里需要抛出异常

zk.create("/test1" ,"abcdefg".getBytes() , ZooDefs.Ids.OPEN_ACL_UNSAFE , CreateMode.PERSISTENT ); 3、运行测试 package com.liangzai.ZOOKEEPER; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import java.io.IOException; public class ZKJavaAPI { public static void main(String[] args) throws IOException, InterruptedException, KeeperException { // 1、新建连接 ZooKeeper zk = new ZooKeeper( "master:2181,node1:2181,node2:2181" , 100000 , null ); // 2、创建临时节点 zk.create("/test1" ,"abcdefg".getBytes() , ZooDefs.Ids.OPEN_ACL_UNSAFE , CreateMode.EPHEMERAL ); } } 控制台输出结果: log4j:WARN No appenders could be found for logger (org.apache.zookeeper.ZooKeeper). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. Process finished with exit code 0 去ZK里查看运行结果 #启动ZK zkCli.sh -server node1:2181 #ZK Shell ls / get /test1

运行结果:

注意我这里并没有zk.close(); 因为创建的是临时节点,断开了就会被删除 这里是运行成功的。 下面我将详细介绍。

ZKJavaAPI 名词解析 ZooDefs.Ids :控制所创建的ZNODE的权限OPEN_ACL_UNSAFE :完全开放的ACL,任何连接的客户端都可以操作该属性znodeCREATOR_ALL_ACL :只有创建者才有ACL权限READ_ACL_UNSAFE :只能读取ACLCreateMode :创建的ZNODE的类型PERSISTENT :永久创建,连接断开不会删除EPHEMERAL :创建临时节点,连接断开即删除 创建永久节点 ZooKeeper zk; @Before // 创建连接 public void init() throws IOException { zk = new ZooKeeper( "master:2181,node1:2181,node2:2181" , 100000 , null ); } @Test // 创建永久节点 public void createPersistentZNODE() throws InterruptedException, KeeperException { /** * ZooDefs.Ids :控制所创建的ZNODE的权限 * OPEN_ACL_UNSAFE : 完全开放的ACL,任何连接的客户端都可以操作该属性znode * CREATOR_ALL_ACL : 只有创建者才有ACL权限 * READ_ACL_UNSAFE:只能读取ACL * * CreateMode : 创建的ZNODE的类型 * PERSISTENT : 永久创建,连接断开不会删除 * EPHEMERAL : 创建临时节点,连接断开即删除 */ zk.create( "/test3" , "def".getBytes() , ZooDefs.Ids.OPEN_ACL_UNSAFE , CreateMode.PERSISTENT ); } @After // 关闭连接 public void closed() throws InterruptedException { zk.close(); } ls / get /test3

运行结果:

可见@After注解关闭了ZK连接 test3被创建了 PERSISTENT :永久创建,连接断开不会删除

创建临时节点 ZooKeeper zk; @Before // 创建连接 public void init() throws IOException { zk = new ZooKeeper( "master:2181,node1:2181,node2:2181" , 100000 , null ); } @Test // 创建临时节点 public void createEPHEMERALZNODE() throws InterruptedException, KeeperException { zk.create( "/test2" , "abc".getBytes() , ZooDefs.Ids.OPEN_ACL_UNSAFE , CreateMode.EPHEMERAL ); } @After // 关闭连接 public void closed() throws InterruptedException { zk.close(); } ls /

运行结果:

可见后面@After注解里将ZK连接关闭了 EPHEMERAL : 创建临时节点,连接断开即删除 所以test2并没有被创建

获取节点数据 ZooKeeper zk; @Before // 创建连接 public void init() throws IOException { zk = new ZooKeeper( "master:2181,node1:2181,node2:2181" , 100000 , null ); } @Test // 获取节点数据 public void getZNODE() throws InterruptedException, KeeperException { byte[] data = zk.getData("/test1", null, new Stat()); // 字节数组转String,这里toString没有意义 System.out.println(new String(data)); } @After // 关闭连接 public void closed() throws InterruptedException { zk.close(); } get /test1

运行结果:

这里的watcher就是我们NameNode的故障转移机制

修改数据 ZooKeeper zk; @Before // 创建连接 public void init() throws IOException { zk = new ZooKeeper( "master:2181,node1:2181,node2:2181" , 100000 , null ); } @Test // 修改数据 public void setZNODE() throws InterruptedException, KeeperException { zk.setData("/test1","liangzai".getBytes(),1); } @After // 关闭连接 public void closed() throws InterruptedException { zk.close(); } get /test1

运行结果:

刚刚我们test1里面的数据是bacdef 运行后改成了liangzai

删除节点 ZooKeeper zk; @Before // 创建连接 public void init() throws IOException { zk = new ZooKeeper( "master:2181,node1:2181,node2:2181" , 100000 , null ); } @Test // 删除节点 public void deleteZNODE() throws InterruptedException, KeeperException { zk.delete("/test3", 0); } @After // 关闭连接 public void closed() throws InterruptedException { zk.close(); } ls /

运行结果:

这里通过ls / 查看后 test3被删除了

事件 ZooKeeper zk; @Before // 创建连接 public void init() throws IOException { zk = new ZooKeeper( "master:2181,node1:2181,node2:2181" , 100000 , null ); } @Test // 事件 public void triggerWatcher() throws InterruptedException, KeeperException { zk.exists("/test", new Watcher() { @Override public void process(WatchedEvent event) { System.out.println("/test 发生了变化"); System.out.println(event.getPath()); System.out.println(event.getState()); System.out.println(event.getType()); System.out.println(event.getWrapper()); } }); @After // 关闭连接 public void closed() throws InterruptedException { zk.close(); } set /test node1

运行结果:

/test 发生了变化 /test SyncConnected NodeDataChanged 3,3,'/test

事件方便我们去监控NameNode的故障转移机制

完整代码 package com.liangzai.ZOOKEEPER; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.io.IOException; public class ZKJavaAPI { ZooKeeper zk; @Before // 创建连接 public void init() throws IOException { zk = new ZooKeeper( "master:2181,node1:2181,node2:2181" , 100000 , null ); } @Test // 创建临时节点 public void createEPHEMERALZNODE() throws InterruptedException, KeeperException { zk.create( "/test2" , "abc".getBytes() , ZooDefs.Ids.OPEN_ACL_UNSAFE , CreateMode.EPHEMERAL ); } @Test // 创建永久节点 public void createPersistentZNODE() throws InterruptedException, KeeperException { /** * ZooDefs.Ids :控制所创建的ZNODE的权限 * OPEN_ACL_UNSAFE : 完全开放的ACL,任何连接的客户端都可以操作该属性znode * CREATOR_ALL_ACL : 只有创建者才有ACL权限 * READ_ACL_UNSAFE:只能读取ACL * * CreateMode : 创建的ZNODE的类型 * PERSISTENT : 永久创建,连接断开不会删除 * EPHEMERAL : 创建临时节点,连接断开即删除 */ zk.create( "/test3" , "def".getBytes() , ZooDefs.Ids.OPEN_ACL_UNSAFE , CreateMode.PERSISTENT ); } @Test // 获取节点数据 public void getZNODE() throws InterruptedException, KeeperException { byte[] data = zk.getData("/test1", null, new Stat()); // 字节数组转String,这里toString没有意义 System.out.println(new String(data)); } @Test // 修改数据 public void setZNODE() throws InterruptedException, KeeperException { zk.setData("/test1", "liangzai".getBytes(), 1); } @Test // 删除节点 public void deleteZNODE() throws InterruptedException, KeeperException { zk.delete("/test3", 0); } @Test // 事件 public void triggerWatcher() throws InterruptedException, KeeperException { zk.exists("/test", new Watcher() { @Override public void process(WatchedEvent event) { System.out.println("/test 发生了变化"); System.out.println(event.getPath()); System.out.println(event.getState()); System.out.println(event.getType()); System.out.println(event.getWrapper()); } }); // 加入死循环,不会被退出 方便在控制台输出 while (true) { } } @After // 关闭连接 public void closed() throws InterruptedException { zk.close(); } }

到底啦!觉得靓仔的文章对你学习Hadoop有所帮助的话,一波三连吧!q(≧▽≦q)


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #JAVA #API