zookeeper系列(三)工作原理与应用场景 有更新!

  |   0 评论   |   3,184 浏览

37套精品Java架构师高并发高性能高可用分布式集群电商缓存性能调优设计项目实战视教程 置顶! 有更新!

(一)ZK工作原理(集群)

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,它包含一个简单的原语集,分布式应用程序可以基于它实现同步服务,配置维护和命名服务等。Zookeeper是hadoop的一个子项目,其发展历程无需赘述。在分布式应用中,由于工程师不能很好地使用锁机制,以及基于消息的协调机制不适合在某些应用中使用,因此需要有一种可靠的、可扩展的、分布式的、可配置的协调机制来统一系统的状态。Zookeeper的目的就在于此。本节简单分析zookeeper的工作原理,对于如何使用zookeeper不是本节重点

1. 基本概念

1) 角色

Zookeeper中的角色主要有以下三类,如下表所示:

系统模型图如下所示:

2) 设计目的

1.最终一致性:client不论连接到哪个Server,展示给它都是同一个视图,这是zookeeper最重要的性能。

2.可靠性:具有简单、健壮、良好的性能,如果消息m被到一台服务器接受,那么它将被所有的服务器接受。

3.实时性:Zookeeper保证客户端将在一个时间间隔范围内获得服务器的更新信息,或者服务器失效的信息。但由于网络延时等原因,Zookeeper不能保证两个客户端能同时得到刚更新的数据,如果需要最新数据,应该在读数据之前调用sync()接口。

4.等待无关(wait-free):慢的或者失效的client不得干预快速的client的请求,使得每个client都能有效的等待。

5.原子性:更新只能成功或者失败,没有中间状态。

6.顺序性:包括全局有序和偏序两种:全局有序是指如果在一台服务器上消息a在消息b前发布,则在所有Server上消息a都将在消息b前被发布;偏序是指如果一个消息b在消息a后被同一个发送者发布,a必将排在b前面。

2. 工作原理

Zookeeper的核心是原子广播,这个机制保证了各个Server之间的同步。实现这个机制的协议叫做Zab协议。Zab协议有两种模式,它们分别是恢复模式(选主)和广播模式(同步)。当服务启动或者在领导者崩溃后,Zab就进入了恢复模式,当领导者被选举出来,且大多数Server完成了和leader的状态同步以后,恢复模式就结束了。状态同步保证了leader和Server具有相同的系统状态。

为了保证事务的顺序一致性,zookeeper采用了递增的事务id号(zxid)来标识事务。所有的提议(proposal)都在被提出的时候加上了zxid。实现中zxid是一个64位的数字,它高32位是epoch用来标识leader关系是否改变,每次一个leader被选出来,它都会有一个新的epoch,标识当前属于那个leader的统治时期。低32位用于递增计数。

每个Server在工作过程中有三种状态:

l LOOKING:当前Server不知道leader是谁,正在搜寻

l LEADING:当前Server即为选举出来的leader

l FOLLOWING:leader已经选举出来,当前Server与之同步

 

1) 主流程

leader崩溃或者leader失去大多数的follower,这时候zk进入恢复模式,恢复模式需要重新选举出一个新的leader,让所有的Server都恢复到一个正确的状态。Zk的选举算法有两种:一种是基于basic paxos实现的,另外一种是基于fast paxos算法实现的。系统默认的选举算法为fast paxos。先介绍basic paxos流程:

1.选举线程由当前Server发起选举的线程担任,其主要功能是对投票结果进行统计,并选出推荐的Server;

2.选举线程首先向所有Server发起一次询问(包括自己);

3.选举线程收到回复后,验证是否是自己发起的询问(验证zxid是否一致),然后获取对方的id(myid),并存储到当前询问对象列表中,最后获取对方提议的leader相关信息(id,zxid),并将这些信息存储到当次选举的投票记录表中;

4.收到所有Server回复以后,就计算出zxid最大的那个Server,并将这个Server相关信息设置成下一次要投票的Server;

5.线程将当前zxid最大的Server设置为当前Server要推荐的Leader,如果此时获胜的Server获得n/2 + 1的Server票数, 设置当前推荐的leader为获胜的Server,将根据获胜的Server相关信息设置自己的状态,否则,继续这个过程,直到leader被选举出来。

通过流程分析我们可以得出:要使Leader获得多数Server的支持,则Server总数必须是奇数2n+1,且存活的Server的数目不得少于n+1.

每个Server启动后都会重复以上流程。在恢复模式下,如果是刚从崩溃状态恢复的或者刚启动的server还会从磁盘快照中恢复数据和会话信息,zk会记录事务日志并定期进行快照,方便在恢复时进行状态恢复。

fast paxos流程是在选举过程中,某Server首先向所有Server提议自己要成为leader,当其它Server收到提议以后,解决epoch和zxid的冲突,并接受对方的提议,然后向对方发送接受提议完成的消息,重复这个流程,最后一定能选举出Leader。

 

2) 同步流程

选完leader以后,zk就进入状态同步过程。

1. leader等待server连接;

2. Follower连接leader,将最大的zxid发送给leader;

3. Leader根据follower的zxid确定同步点;

4. 完成同步后通知follower 已经成为uptodate状态;

5. Follower收到uptodate消息后,又可以重新接受client的请求进行服务了。

 

流程图如下所示:

 

3. 工作流程

1) Leader工作流程

Leader主要有三个功能:

1.恢复数据;

2.维持与Learner的心跳,接收Learner请求并判断Learner的请求消息类型;

3.Learner的消息类型主要有PING消息、REQUEST消息、ACK消息、REVALIDATE消息,根据不同的消息类型,进行不同的处理。

PING消息是指Learner的心跳信息;REQUEST消息是Follower发送的提议信息,包括写请求及同步请求;ACK消息是Follower的对提议的回复,超过半数的Follower通过,则commit该提议;REVALIDATE消息是用来延长SESSION有效时间。
Leader的工作流程简图如下所示,在实际实现中,流程要比下图复杂得多,启动了三个线程来实现功能。

2) Follower工作流程

Follower主要有四个功能:

1.Leader发送请求(PING消息、REQUEST消息、ACK消息、REVALIDATE消息);

2.接收Leader消息并进行处理;

3.接收Client的请求,如果为写请求,发送给Leader进行投票;

4.返回Client结果。

Follower的消息循环处理如下几种来自Leader的消息:

1 .PING消息: 心跳消息;

2 .PROPOSAL消息:Leader发起的提案,要求Follower投票;

3 .COMMIT消息:服务器端最新一次提案的信息;

4 .UPTODATE消息:表明同步完成;

5 .REVALIDATE消息:根据Leader的REVALIDATE结果,关闭待revalidate的session还是允许其接受消息;

6 .SYNC消息:返回SYNC结果到客户端,这个消息最初由客户端发起,用来强制得到最新的更新。

Follower的工作流程简图如下所示,在实际实现中,Follower是通过5个线程来实现功能的。

对于observer的流程不再叙述,observer流程和Follower的唯一不同的地方就是observer不会参加leader发起的投票。


(二)ZK应用原理

 

zookeeper提供了两个重要的功能机制,一个是数据模型另一个是通知机制。

1. 数据模型

如上图所示,ZooKeeper数据模型的结构与Unix文件系统很类似,整体上可以看作是一棵树,每个节点称做一个ZNode。每个ZNode都可以通过其路径唯一标识,比如上图中第三层的第一个ZNode, 它的路径是/app1/c1。在每个ZNode上可存储少量数据(默认是1M, 可以通过配置修改, 通常不建议在ZNode上存储大量的数据),这个特性非常有用,在后面的典型应用场景中会介绍到。另外,每个ZNode上还存储了其Acl信息,这里需要注意,虽说ZNode的树形结构跟Unix文件系统很类似,但是其Acl与Unix文件系统是完全不同的,每个ZNode的Acl的独立的,子结点不会继承父结点的。

每个子目录项如 NameService 都被称作为znode,和文件系统一样,我们能够自由的增加、删除znode,在一个znode下增加、删除子znode,唯一的不同在于znode是可以存储数据的

四种类型的znode:

l PERSISTENT-持久化目录节点 

客户端与zookeeper断开连接后,该节点依旧存在 

l PERSISTENT_SEQUENTIAL-持久化顺序编号目录节点 

客户端与zookeeper断开连接后,该节点依旧存在,只是Zookeeper给该节点名称进行顺序编号

l EPHEMERAL-临时目录节点

客户端与zookeeper断开连接后,该节点被删除 

l EPHEMERAL_SEQUENTIAL-临时顺序编号目录节点

客户端与zookeeper断开连接后,该节点被删除,只是Zookeeper给该节点名称进行顺序编号 

2. 通知机制

ZooKeeper支持一种Watch操作,Client可以在某个ZNode上设置一个Watcher,来Watch该ZNode上的变化。如果该ZNode上有相应的变化,就会触发这个Watcher,把相应的事件通知给设置Watcher的Client。需要注意的是,ZooKeeper中的Watcher是一次性的,即触发一次就会被取消,如果想继续Watch的话,需要客户端重新设置Watcher。这个跟epoll里的oneshot模式有点类似。

ZkClient客户端注册监听它关心的目录节点,当目录节点发生变化(数据改变、被删除、子目录节点增加删除)时,zookeeper会通知客户端。


(三)ZK应用场景

1. 名字服务(NameService)

分布式应用中,通常需要一套完备的命令机制,既能产生唯一的标识,又方便人识别和记忆。 我们知道,每个ZNode都可以由其路径唯一标识,路径本身也比较简洁直观,另外ZNode上还可以存储少量数据,这些都是实现统一的NameService的基础。下面以在HDFS中实现NameService为例,来说明实现NameService的基本布骤:

目标:通过简单的名字来访问指定的HDFS机群

定义命名规则:这里要做到简洁易记忆。下面是一种可选的方案: [serviceScheme://][zkCluster]-[clusterName],比如hdfs://lgprc-example/表示基于lgprc ZooKeeper集群的用来做example的HDFS集群

配置DNS映射: 将zkCluster的标识lgprc通过DNS解析到对应的ZooKeeper集群的地址

创建ZNode: 在对应的ZooKeeper上创建/NameService/hdfs/lgprc-example结点,将HDFS的配置文件存储于该结点下

用户程序要访问hdfs://lgprc-example/的HDFS集群,首先通过DNS找到lgprc的ZooKeeper机群的地址,然后在ZooKeeper的/NameService/hdfs/lgprc-example结点中读取到HDFS的配置,进而根据得到的配置,得到HDFS的实际访问入口

2. 配置管理(Configuration Management) 

在分布式系统中,常会遇到这样的场景: 某个Job的很多个实例在运行,它们在运行时大多数配置项是相同的,如果想要统一改某个配置,一个个实例去改,是比较低效,也是比较容易出错的方式。通过ZooKeeper可以很好的解决这样的问题,下面的基本的步骤:

将公共的配置内容放到ZooKeeper中某个ZNode上,比如/service/common-conf

所有的实例在启动时都会传入ZooKeeper集群的入口地址,并且在运行过程中Watch /service/common-conf这个ZNode

如果集群管理员修改了了common-conf,所有的实例都会被通知到,根据收到的通知更新自己的配置,并继续Watch /service/common-conf

3. 组员管理(Group Membership) 

在典型的Master-Slave结构的分布式系统中,Master需要作为“总管”来管理所有的Slave, 当有Slave加入,或者有Slave宕机,Master都需要感知到这个事情,然后作出对应的调整,以便不影响整个集群对外提供服务。以Hbase为例,HMaster管理了所有的RegionServer,当有新的RegionServer加入的时候,HMaster需要分配一些Region到该RegionServer上去,让其提供服务;当有RegionServer宕机时,HMaster需要将该RegionServer之前服务的Region都重新分配到当前正在提供服务的其它RegionServer上,以便不影响客户端的正常访问。下面是这种场景下使用ZooKeeper的基本步骤:

Master在ZooKeeper上创建/service/slaves结点,并设置对该结点的Watcher

每个Slave在启动成功后,创建唯一标识自己的临时性(Ephemeral)结点/service/slaves/${slave_id},并将自己地址(ip/port)等相关信息写入该结点

Master收到有新子结点加入的通知后,做相应的处理

如果有Slave宕机,由于它所对应的结点是临时性结点,在它的Session超时后,ZooKeeper会自动删除该结点

Master收到有子结点消失的通知,做相应的处理

4. 简单互斥锁(Simple Lock) 

我们知识,在传统的应用程序中,线程、进程的同步,都可以通过操作系统提供的机制来完成。但是在分布式系统中,多个进程之间的同步,操作系统层面就无能为力了。这时候就需要像ZooKeeper这样的分布式的协调(Coordination)服务来协助完成同步,下面是用ZooKeeper实现简单的互斥锁的步骤,这个可以和线程间同步的mutex做类比来理解:

多个进程尝试去在指定的目录下去创建一个临时性(Ephemeral)结点 /locks/my_lock

ZooKeeper能保证,只会有一个进程成功创建该结点,创建结点成功的进程就是抢到锁的进程,假设该进程为A

其它进程都对/locks/my_lock进行Watch

A进程不再需要锁,可以显式删除/locks/my_lock释放锁;或者是A进程宕机后Session超时,ZooKeeper系统自动删除/locks/my_lock结点释放锁。此时,其它进程就会收到ZooKeeper的通知,并尝试去创建/locks/my_lock抢锁,如此循环反复

5. 互斥锁(Simple Lock without Herd Effect) 

上一节的例子中有一个问题,每次抢锁都会有大量的进程去竞争,会造成羊群效应(Herd Effect),为了解决这个问题,我们可以通过下面的步骤来改进上述过程:

每个进程都在ZooKeeper上创建一个临时的顺序结点(Ephemeral Sequential) /locks/lock_${seq}

${seq}最小的为当前的持锁者(${seq}是ZooKeeper生成的Sequenctial Number)

其它进程都对只watch比它次小的进程对应的结点,比如2 watch 1, 3 watch 2, 以此类推当前持锁者释放锁后,比它次大的进程就会收到ZooKeeper的通知,它成为新的持锁者,如此循环反复

这里需要补充一点,通常在分布式系统中用ZooKeeper来做Leader Election(选主)就是通过上面的机制来实现的,这里的持锁者就是当前的“主”。

6. 读写锁(Read/Write Lock) 

我们知道,读写锁跟互斥锁相比不同的地方是,它分成了读和写两种模式,多个读可以并发执行,但写和读、写都互斥,不能同时执行行。利用ZooKeeper,在上面的基础上,稍做修改也可以实现传统的读写锁的语义,下面是基本的步骤:

每个进程都在ZooKeeper上创建一个临时的顺序结点(Ephemeral Sequential) /locks/lock_${seq}

${seq}最小的一个或多个结点为当前的持锁者,多个是因为多个读可以并发

需要写锁的进程,Watch比它次小的进程对应的结点

需要读锁的进程,Watch比它小的最后一个写进程对应的结点

当前结点释放锁后,所有Watch该结点的进程都会被通知到,他们成为新的持锁者,如此循环反复

7. 屏障(Barrier) 

在分布式系统中,屏障是这样一种语义: 客户端需要等待多个进程完成各自的任务,然后才能继续往前进行下一步。下用是用ZooKeeper来实现屏障的基本步骤:

Client在ZooKeeper上创建屏障结点/barrier/my_barrier,并启动执行各个任务的进程

Client通过exist()来Watch /barrier/my_barrier结点

每个任务进程在完成任务后,去检查是否达到指定的条件,如果没达到就啥也不做,如果达到了就把/barrier/my_barrier结点删除

Client收到/barrier/my_barrier被删除的通知,屏障消失,继续下一步任务

8. 双屏障(Double Barrier)

双屏障是这样一种语义: 它可以用来同步一个任务的开始和结束,当有足够多的进程进入屏障后,才开始执行任务;当所有的进程都执行完各自的任务后,屏障才撤销。下面是用ZooKeeper来实现双屏障的基本步骤:

进入屏障:

Client Watch /barrier/ready结点, 通过判断该结点是否存在来决定是否启动任务

每个任务进程进入屏障时创建一个临时结点/barrier/process/${process_id},然后检查进入屏障的结点数是否达到指定的值,如果达到了指定的值,就创建一个/barrier/ready结点,否则继续等待

Client收到/barrier/ready创建的通知,就启动任务执行过程

离开屏障:

Client Watch /barrier/process,如果其没有子结点,就可以认为任务执行结束,可以离开屏障

每个任务进程执行任务结束后,都需要删除自己对应的结点/barrier/process/${process_id}

 

9. 负载均衡(软负载)

这里说的负载均衡是指软负载均衡。在分布式环境中,为了保证高可用性,通常同一个应用或同一个服务的提供方都会部署多份,达到对等服务。而消费者就须要在这些对等的服务器中选择一个来执行相关的业务逻辑,其中比较典型的是消息中间件中的生产者,消费者负载均衡。

消息中间件中发布者和订阅者的负载均衡,linkedin开源的KafkaMQ和阿里开源的metaq都是通过zookeeper来做到生产者、消费者的负载均衡。这里以metaq为例如讲下:

生产者负载均衡:metaq发送消息的时候,生产者在发送消息的时候必须选择一台broker上的一个分区来发送消息,因此metaq在运行过程中,会把所有broker和对应的分区信息全部注册到ZK指定节点上,默认的策略是一个依次轮询的过程,生产者在通过ZK获取分区列表之后,会按照brokerId和partition的顺序排列组织成一个有序的分区列表,发送的时候按照从头到尾循环往复的方式选择一个分区来发送消息。

 

消费负载均衡:

在消费过程中,一个消费者会消费一个或多个分区中的消息,但是一个分区只会由一个消费者来消费。MetaQ的消费策略是:

每个分区针对同一个group只挂载一个消费者。

如果同一个group的消费者数目大于分区数目,则多出来的消费者将不参与消费。

如果同一个group的消费者数目小于分区数目,则有部分消费者需要额外承担消费任务。

在某个消费者故障或者重启等情况下,其他消费者会感知到这一变化(通过 zookeeper watch消费者列表),然后重新进行负载均衡,保证所有的分区都有消费者进行消费

10. 分布式队列

队列方面,简单地讲有两种,一种是常规的先进先出队列,另一种是要等到队列成员聚齐之后的才统一按序执行。对于第一种先进先出队列,和分布式锁服务中的控制时序场景基本原理一致,这里不再赘述。

第二种队列其实是在FIFO队列的基础上作了一个增强。通常可以在 /queue 这个znode下预先建立一个/queue/num 节点,并且赋值为n(或者直接给/queue赋值n),表示队列大小,之后每次有队列成员加入后,就判断下是否已经到达队列大小,决定是否可以开始执行了。这种用法的典型场景是,分布式环境中,一个大任务Task A,需要在很多子任务完成(或条件就绪)情况下才能进行。这个时候,凡是其中一个子任务完成(就绪),那么就去 /taskList 下建立自己的临时时序节点(CreateMode.EPHEMERAL_SEQUENTIAL),当 /taskList 发现自己下面的子节点满足指定个数,就可以进行下一步按序进行处理了。


(四)ZkClient

在使用ZooKeeper的Java客户端时,经常需要处理几个问题:重复注册watcher、session失效重连、异常处理。

要解决上述的几个问题,可以自己解决,也可以采用第三方的java客户端来完成。这里就介绍一种常用的客户端zkclient,目前已经运用到了很多项目中,知名的有Dubbo、Kafka、Helix。

1. ZkClient的设计

 


2. ZkClient的组件说明

从上述结构上看,IZKConnection是一个ZkClient与ZooKeeper之间的一个适配器。在代码里直接使用的是ZKClient,其实质还是委托了zookeeper来处理了。

前面有一篇文章中,已经说了,使用ZooKeeper客户端来注册watcher有几种方法:1、创建ZooKeeper对象时指定默认的Watcher,2、getData(),3、exists(),4、getchildren。其中getdata,exists注册的是某个节点的事件处理器(watcher),getchildren注册的是子节点的事件处理器(watcher)。而在ZKClient中,根据事件类型,分为了节点事件(数据事件)、子节点事件。对应的事件处理器则是IZKDataListener和IZKChildListener。另外加入了Session相关的事件和事件处理器。

ZkEventThread是专门用来处理事件的线程。

3. 重要处理流程说明

1) 启动ZKClient

在创建ZKClient对象时,就完成了到ZooKeeper服务器连接的建立。具体过程是这样的:

      

1、  启动时,指定好connection string,连接超时时间,序列化工具等。

2、  创建并启动eventThread,用于接收事件,并调度事件监听器Listener的执行。

3、  连接到zookeeper服务器,同时将ZKClient自身作为默认的Watcher。

2) 为节点注册Watcher

       ZooKeeper的三个方法:getData、getChildren、exists,ZKClient都提供了相应的代理方法。就拿exists来看:

可以看到,是否注册watcher,由hasListeners(path)来决定的。

hasListeners就是看有没有与该数据节点绑定的listener。

所以呢,默认情况下,都会自动的为指定的path注册watcher,并且是默认的watcher(ZKClient)。怎么才能让hasListeners判定值为true呢,也就是怎么才能为path绑定Listener呢?

3) ZKClient提供了订阅功能注册监听

 

一个新建的会话,只需要在取得响应的数据节点后,调用subscribteXxx就可以订阅上相应的事件了。

4. ZooKeeper的变更操作

Zookeeper中提供的变更操作有:节点的创建、删除,节点数据的修改。

创建操作,数据节点分为四种,ZKClient分别为他们提供了相应的代理:

删除节点的操作:

 

修改节点数据的操作:

 

writeDataReturnStat():写数据并返回数据的状态。

updateDataSerialized():修改已序列化的数据。执行过程是:先读取数据,然后使用DataUpdater对数据修改,最后调用writeData将修改后的数据发送给服务端。

5. 客户端处理变更

前面已经知道,ZKClient是默认的Watcher,并且在为各个数据节点注册的Watcher都是这个默认的Watcher。那么该是如何将各种事件通知给相应的Listener呢?

 

处理过程大致可以概括为下面的步骤:

1、判断变更类型:变更类型分为State变更、ChildNode变更(创建子节点、删除子节点、修改子节点数据)、NodeData变更(创建指定node,删除节点,节点数据变更)。

2、取出与path关联的Listeners,并为每一个Listener创建一个ZKEvent,将ZkEvent交给ZkEventThread处理。

3、ZkEventThread线程,拿到ZkEvent后,只需要调用ZkEvent的run方法进行处理。

从这里也可以知道,具体的怎么如何调用Listener,还要依赖于ZkEvent的run()实现了。

6. 序列化处理

ZooKeeper中,会涉及到序列化、反序列化的操作有两种:getData、setData。在ZKClient中,分别用readData、writeData来替代了。

对于readData:先调用zookeeper的getData,然后进行使用ZKSerializer进行反序列化工作。

对于writeData:先使用ZKSerializer将对象序列化后,再调用zookeeper的setData。

ZkClient如何解决使用ZooKeeper客户端遇到的问题的呢?

Watcher自动重注册:这个要是依赖于hasListeners()的判断,来决定是否再次注册。如果对此有不清晰的,可以看上面的流程处理的说明

Session失效重连:如果发现会话过期,就先关闭已有连接,再重新建立连接。

异常处理:对比ZooKeeper和ZKClient,就可以发现ZooKeeper的所有操作都是抛异常的,而ZKClient的所有操作,都不会抛异常的。在发生异常时,它或做日志,或返回空,或做相应的Listener调用。

相比于ZooKeeper官方客户端,使用ZKClient时,只需要关注实际的Listener实现即可。所以这个客户端,还是值得使用。


(五)ZK应用场景实现

1. Demo-Znode操作监控

1) pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>

  <groupId>com.boom.zk</groupId>

  <artifactId>config</artifactId>

  <version>0.0.1-SNAPSHOT</version>

  <packaging>jar</packaging>

  <name>config</name>

  <url>http://maven.apache.org</url>

 

  <properties>

   <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

  </properties>

 

  <dependencies>

<dependency>

<groupId>org.slf4j</groupId>

<artifactId>slf4j-log4j12</artifactId>

<version>1.7.5</version>

</dependency>

<dependency>

<groupId>junit</groupId>

<artifactId>junit</artifactId>

<version>3.8.1</version>

<scope>test</scope>

</dependency>

<dependency>

<groupId>org.apache.zookeeper</groupId>

<artifactId>zookeeper</artifactId>

<version>3.4.6</version>

</dependency>

<dependency>

<groupId>com.101tec</groupId>

<artifactId>zkclient</artifactId>

<version>0.8</version>

</dependency>

  </dependencies>

</project>

2) 操作类

package com.boom.zk.demo;

 

import java.util.HashMap;

import java.util.List;

import java.util.Map;

 

import org.I0Itec.zkclient.ZkClient;

 

/**

 * Znode操作简单封装

 * @author C.MuLin

 *

 */

public class ZnodeModel{

private ZkClient client;

 

public ZnodeModel(String serverstring) {

this.client = new ZkClient(serverstring);

}

 

public void add(String key, String value) {

this.client.createPersistent(key, value);

}

 

public void update(String key, String value) {

this.client.writeData(key, value);

}

 

public void delete(String key) {

this.client.delete(key);

}

 

public String get(String key) {

if(!this.client.exists(key)){

return null;

}

return this.client.readData(key);

}

 

public Map<String, String> getAll(String yardRoot) {

List<String> yardList = this.client.getChildren(yardRoot);

Map<String, String> currentYardProperties = new HashMap<String, String>();

for(String yard : yardList){

String value = this.client.readData(yard);

String key = yard.substring(yard.indexOf("/")+1);

currentYardProperties.put(key, value);

}

return currentYardProperties;

}

}

3) 监听类

package com.boom.zk.demo;

 

import java.util.List;

 

import org.I0Itec.zkclient.IZkChildListener;

import org.I0Itec.zkclient.IZkDataListener;

import org.I0Itec.zkclient.ZkClient;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

 

public class ZnodeWatcher{

private final static Logger logger = LoggerFactory.getLogger(ZnodeWatcher.class);

private ZkClient zkClient = null;

private String watchKey = null;

private ZnodeListener listener = null;

public ZnodeWatcher(String serverstring){

zkClient = new ZkClient(serverstring);

listener = new ZnodeListener();

}

public ZnodeWatcher(String serverstring,String watchKey){

zkClient = new ZkClient(serverstring);

listener = new ZnodeListener();

this.watchKey = watchKey;

this.watch(this.watchKey);

}

public void watch(String key){

// 添加监听

zkClient.subscribeChildChanges(key, listener);

zkClient.subscribeDataChanges(key, listener);

logger.info("start watched the key :"this.watchKey);

}

public class ZnodeListener implements IZkDataListener,IZkChildListener{

public void handleChildChange(String parentPath, List<String> currentChilds)

throws Exception {

StringBuffer childs = new StringBuffer();

for (String string : currentChilds) {

childs.append(string+"\n");

}

logger.info("------------- childData {} change ",parentPath+"\nchilds:\n"+childs);

}

 

public void handleDataChange(String dataPath, Object data) throws Exception {

logger.info("------------- data {} change ",dataPath);

}

 

public void handleDataDeleted(String dataPath) throws Exception {

logger.info("------------- data {} deleted ",dataPath);

}

}

}

 

4) 开启一个监听线程

package com.boom.zk.demo;

 

import junit.framework.TestCase;

 

public class ZnodeWatcherTest extends TestCase {

public static void main(String[] args) {

ZnodeWatcher watcher = new ZnodeWatcher("192.168.211.135:2181""/boom");

while (true) {

try {

Thread.sleep(100);

catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}

执行上述测试类,开启一个监听线程,信息如下:

2017-05-10 20:12:26 [ZkClient-EventThread-9-192.168.211.135:2181] INFO org.I0Itec.zkclient.ZkEventThread  - Starting ZkClient event thread.

2017-05-10 20:12:26 [main] INFO org.apache.zookeeper.ZooKeeper  - Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT

2017-05-10 20:12:26 [main] INFO org.apache.zookeeper.ZooKeeper  - Client environment:host.name=a113

2017-05-10 20:12:26 [main] INFO org.apache.zookeeper.ZooKeeper  - Client environment:java.version=1.7.0_80

2017-05-10 20:12:26 [main] INFO org.apache.zookeeper.ZooKeeper  - Client environment:java.vendor=Oracle Corporation

2017-05-10 20:12:26 [main] INFO org.apache.zookeeper.ZooKeeper  - Client environment:java.home=C:\Program Files\Java\jdk1.7.0_80\jre

2017-05-10 20:12:26 [main] INFO org.apache.zookeeper.ZooKeeper  - Client environment:java.class.path=E:\Src41-45\83-config\target\test-classes;E:\Src41-45\83-config\target\classes;D:\repository\org\slf4j\slf4j-log4j12\1.7.5\slf4j-log4j12-1.7.5.jar;D:\repository\org\slf4j\slf4j-api\1.7.5\slf4j-api-1.7.5.jar;D:\repository\log4j\log4j\1.2.17\log4j-1.2.17.jar;D:\repository\junit\junit\3.8.1\junit-3.8.1.jar;D:\repository\org\apache\zookeeper\zookeeper\3.4.6\zookeeper-3.4.6.jar;D:\repository\jline\jline\0.9.94\jline-0.9.94.jar;D:\repository\io\netty\netty\3.7.0.Final\netty-3.7.0.Final.jar;D:\repository\com\101tec\zkclient\0.8\zkclient-0.8.jar

2017-05-10 20:12:26 [main] INFO org.apache.zookeeper.ZooKeeper  - Client environment:java.library.path=C:\Program Files\Java\jdk1.7.0_80\bin;C:\Windows\Sun\Java\bin;C:\Windows\system32;C:\Windows;D:\apps\apache-maven-3.5.0\bin;C:\ProgramData\Oracle\Java\javapath;C:\Windows\system32;C:\Windows;C:\Windows\System32\Wbem;C:\Windows\System32\WindowsPowerShell\v1.0\;C:\Program Files\TortoiseSVN\bin;.

2017-05-10 20:12:26 [main] INFO org.apache.zookeeper.ZooKeeper  - Client environment:java.io.tmpdir=C:\Users\ADMINI~1\AppData\Local\Temp\

2017-05-10 20:12:26 [main] INFO org.apache.zookeeper.ZooKeeper  - Client environment:java.compiler=<NA>

2017-05-10 20:12:26 [main] INFO org.apache.zookeeper.ZooKeeper  - Client environment:os.name=Windows 7

2017-05-10 20:12:26 [main] INFO org.apache.zookeeper.ZooKeeper  - Client environment:os.arch=amd64

2017-05-10 20:12:26 [main] INFO org.apache.zookeeper.ZooKeeper  - Client environment:os.version=6.1

2017-05-10 20:12:26 [main] INFO org.apache.zookeeper.ZooKeeper  - Client environment:user.name=Administrator

2017-05-10 20:12:26 [main] INFO org.apache.zookeeper.ZooKeeper  - Client environment:user.home=C:\Users\Administrator

2017-05-10 20:12:26 [main] INFO org.apache.zookeeper.ZooKeeper  - Client environment:user.dir=E:\Src41-45\83-config

2017-05-10 20:12:26 [main] INFO org.apache.zookeeper.ZooKeeper  - Initiating client connection, connectString=192.168.211.135:2181 sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@48e9f62

2017-05-10 20:12:26 [main] INFO org.I0Itec.zkclient.ZkClient  - Waiting for keeper state SyncConnected

2017-05-10 20:12:26 [main-SendThread(192.168.211.135:2181)] INFO org.apache.zookeeper.ClientCnxn  - Opening socket connection to server 192.168.211.135/192.168.211.135:2181. Will not attempt to authenticate using SASL (unknown error)

2017-05-10 20:12:26 [main-SendThread(192.168.211.135:2181)] INFO org.apache.zookeeper.ClientCnxn  - Socket connection established to 192.168.211.135/192.168.211.135:2181, initiating session

2017-05-10 20:12:26 [main-SendThread(192.168.211.135:2181)] INFO org.apache.zookeeper.ClientCnxn  - Session establishment complete on server 192.168.211.135/192.168.211.135:2181, sessionid = 0x35bf2356d510000, negotiated timeout = 30000

2017-05-10 20:12:26 [main-EventThread] INFO org.I0Itec.zkclient.ZkClient  - zookeeper state changed (SyncConnected)

2017-05-10 20:12:26 [main] INFO com.boom.zk.demo.ZnodeWatcher  - start watched the key :/boom

查看监听线程注册监听的zk节点,135为监听线程监听的zk节点,能查看到watch的数量

[root@slave1 bin]# echo wchs | nc 192.168.211.135 2181

1 connections watching 1 paths

Total watches:1

[root@slave1 bin]# echo wchs | nc 192.168.211.134 2181

0 connections watching 0 paths

Total watches:0

[root@slave1 bin]# 

开启其它线程操作ZK节点

package com.boom.zk.demo;

 

import junit.framework.TestCase;

 

public class ZnodeOprTest extends TestCase {

public void testNodeAdd(){

ZnodeModel model = new ZnodeModel("192.168.211.134:2181");

model.add("/boom/test6","ads");

}

}

执行上述添加节点操作,监听线程监控到的消息如下:

2017-05-10 20:33:03 [ZkClient-EventThread-9-192.168.211.135:2181] INFO com.boom.zk.demo.ZnodeWatcher  - ------------- childData /boom

childs:

test5

test2

test3

test1

test6

test7

 change 

评论

发表评论

validate