大家好,我是杰哥,前两次文章主要讲述了关于Zookeeper的服务端机制。错过的小伙伴们,可以查看杰哥的Spring Cloud专栏的前两篇推送哦
那么今天,就跟着杰哥,转到客户端的角度,通过跟踪源码,进一步揭开zooKeeper的神秘面目吧~
一 概况
大致了解
首先,通过一个图,让大家对于zookeeper的工作流有一个大概的印象
1)图中 包含客户端Client、SendThread、EventThread、Server以及Watcher五个重要角色
2)SendThread:负责将ZooKeeper的请求信息封装成一个Packet,发送给 Server ;并维持同Server的心跳
3)Server:处理不同请求,返回response给EventThread
4)EventThread:负责解析通过SendThread得到的Response,最后发送给Watcher
5)Watcher:通过调用processEvent进行具体事件处理
好了,有了一个大致印象了 ,那下面的源码环节将会很easy了~
二 源码
具体探究
细心的你一定发现了,在zookeeper的客户端脚本 zkCli.sh中,我们发现,它实际上是通过加载org.apache.zookeeper.ZooKeeperMain启动的
那么,我们就先从ZooKeeperMain启动类进入zookeeper客户端源码的探索之旅吧~
查看ZooKeeperMain的main()方法,看到该包含两个步骤:
- 构造ZooKeeperMain对象并建立连接
- 读取终端输入并解析命令
来看看这两个步骤具体是如何处理的~
01. 构造对象 建立连接
1)进入ZookeeperMain()
初始化命令参数各个可选项,调用connectToZK()方法连接到server端
2)进入方法conectToZK()
构造Zookeeper对象,建立连接
3)继续往下跟踪,进入Zookeeper(...)
创建ClientCnxn对象,并调用了它的start()方法。
我们分别看看这两个步骤分别做了什么
4)先看看 这个对象初始化的时候都干了什么
我们看到,它分别初始化了很多参数,包括主机列表、连接超时时间、读取超时时间等。最后还初始化了客户端的2个核心线程:SendThread和EventThread
小插曲:顺便说一句,这两个线程是zookeeper的重要角色,也将是我们今天的主角
5)然后呢,再看看:cnxn.start()方法
这个方法的作用就是:分别启动这两个线程
02. 读取并解析命令
1)进入zookeeper的run()方法
我们看到,该方法通过反射调用jline.ConsoleReader类以及该类的addCompletor()方法,对终端输入进行读取。然后调用executeLine()方法,逐个执行单行命令
2)进入executeLine()方法
我们看到该方法首先通过方法cl.parseCommand(line)方法对命令行进行解析,然后调用processCms(cl)方法执行各个命令
3)进入processCms(cl)方法
该方法再调用processZKCmd()方法,对于抛出的异常分别进行分类处理
4)进入processZKCmd()方法
由于方法比较长,我们分为两部分来查看
a 第一部分,我们看到,对于quit、redo、history、printwatches以及connect方法,直接进行相应处理
到了第二部分的方法,包括对节点的各个操作,则需要在连接建立成功的情况下执行,具体如何执行,我们再来一探究竟~
以ZooKeeper.create()为例
1) 上图中,如果指令为create,就会调用zk.create(..)方法
2)进入create(...)方法
我们看到create命令被封装成了一个 CreateRequest对象request,然后调用submitRequest()进行节点创建
3) 进入submitRequest(...)中
我们看到zookeeper通过调用queuePacket(...)方法将Request封装成一个Packet包
4)进入queuePacket(...)方法
将packet加入SendThread的outgoingQueue队列中,等待执行。并唤醒selector
接下来,在SendThread.run()的while循环中,ZooKeeper将会通过doTransport()将存放在outgoingQueue中的Packet包发送给 Server
03. SendThread
我们在前面有提到,SendThread 的主要作用是:
- 将Packet包发送给Server
- 维持Client和Server之间的心跳,确保 session 存活
现在让我们从源码出发,看看SendThread究竟做了哪些工作
SendThread是一个线程类,因此我们进入其run()方法,看看它的启动流程
1)先整体来看,run()方法是通过一个while循环,进行具体任务处理
若状态为关闭或者权限验证失败,则关闭socket连接,并由eventThread处理关闭连接事件
a 与server建立连接
b 判断超时
可以看到会分别判断readTimeout和connetTimeout 两个超时时间,一旦发现链接超时,则抛出异常,终止SendThread
c 发送心跳
在没有超时且为连接状态的情况下,若已经达到心跳间隔时间,或者在最大时间间隔MAX_SEND_PING_INTERVAL内还没有发送packet。会再次发送心跳数据,避免访问超时
d 发送指令
整体来看,SendThread的主要任务即为:
- 创建同 Server 之间的 socket 链接
- 判断链接是否超时
- 定时发送心跳任务
- 将ZooKeeper指令发送给Server
我们主要来看看建立连接的过程与发送指令的过程
2) 与 Server 的长链接
a 进入startConnect()
通过调用抽象类ClientCnxnSocket的connect()方法进行socket连接,该抽象类的默认实现是ClientCnxnSocketNIO类。
b 在ClientCnxnSocketNIO.connect()中我们可以看到,与Server之间创建了一个socket链接,并调用registerAndConnect()方法注册并连接到主机地址上
c 进入registerAndConnect(...)方法。
可以看到,zookeeper会首先将sock注册到selector,然后调用sock.connect()连接服务器,判断当前是否是初次连接。若是,则进入初始化连接primeConnection()方法
d 进入primeConnection()方法
首先设置首次连接为false,然后初始化sessionId,并建立连接
e 接下来,将该连接事件组合成packet对象,并添加到发送队列中
需要注意的是,连接事件的requestHeader(请求头)为null
f 设置为可读可写
调用clientCnxnSocket.enableReadWriteOnly()开启监听事件的读写功能
那么到现在为止,已成功完成连接。接下来就要执行doTransport() 了~
3) 发送 ZooKeeper 指令
a 进入doTransPort()
该方法,首先会确保连接成功建立,调用doIO()方法进行处理,然后调用findSendablePacket(...)方法将连接事件的packet放到outgoingQueue头部
b 进入doIO()方法
该方法会分别判断key值是否是可读、或者可写的,分别进行读、写事件的处理
c readable()
先来看看对读操作的处理
调用readResponse()将其加到eventTread中
d 进入readResonse()方法
小插曲-Tips:
zookeeper的消息分为三种:
- ping 消息:XID=-2
- auth认证消息:XID=-4
- 订阅的消息:XID=-1
订阅的消息,也就是节点变化的通知消息。比如子节点变化、节点内容变化
我们看到readResonse()方法获取到这类消息,通过eventThread.queueEvent() 将消息推入事件队列waitingEvents,等待后续处理
e writable()
进入第二部分,zookeeper对于写操作的处理
锁定outgoningQueue进行如下处理:将事件封装成packet对象,设置事件的xid,若!p.bb.hasRemaining()为true,表示该事件已发送成功,那么删除outgoingQueue中的事件,并将该事件添加到pendingQueue中,等待后续处理
04. EventThread
进入EventThread的run()方法
我们看到该方法对获取到的事件通过方法processEvent()方法进行处理
因此我们就主要来看看processEvent()方法的逻辑
我们看到,该方法首先会判断事件是否是WatcherSetEventPair的实例
若是,则依次调用 watcher.process(pair.event)进行处理
否则就会以异步回调方式处理。根据 p.response() 判断为哪种响应类型,执行响应的回调方法 processResult()
好了~ zooKeeper客户端的源码还是比较简单的吧,分析到这里,也基本搞清楚了它的具体处理流程
根据以上的分析,我们就可以把最开始的zookeeper的工作流再细化一点,变成下面这个样子:
图中细化到可以看到SendThread中处理过程包含的outgoingQueue和pendingQueue,并且SendThread和EventThread是通过Clientcnxn来控制处理的
处理流程为:
1)Client发起request给Zookeeper类
2)Zookeeper类将处理该request,并将其放入outgoingQueue(发送队列)
3)Zookeeper Server端处理发送队列中的该事件,并将该事件放到待处理队列PendingQueue中
4)由EventThreadt消费该pendingQueue中的该事件
5)分发给不同的watcher 进行事件的处理
三 总结
总而言之
也就是说,Client中在终端输入指令后,首先会被封装成一个Request请求。然后通过submitRequest,进一步被封装成Packet包,提交给SendThread处理
SendThread再通过doTransport()将Packet发送给Server,并通过readResponse获取结果,解析成一个Event,再将Event加入EventThread的队列中等待执行
EventThread通过processEvent消费队列中的Event事件
是不是更深入理解啦~
那么到现在为止,我们的注册中心章节之-zookeeper篇到这里就结束啦~之前还讲了Eureka篇,接下来将会继续出品console篇和nacos篇,敬请期待哦~
嗯,就这样。每天学习一点,时间会见证你的强大~
下期预告:
Spring Cloud(四):注册中心-选择zookeeper还是Eureka?
本文暂时没有评论,来添加一个吧(●'◡'●)