计算机系统应用教程网站

网站首页 > 技术文章 正文

Spring Cloud(三):注册中心zookeeper-站在客户端角度

btikc 2024-09-20 14:51:24 技术文章 26 ℃ 0 评论

大家好,我是杰哥,前两次文章主要讲述了关于Zookeeper的服务端机制。错过的小伙伴们,可以查看杰哥的Spring Cloud专栏的前两篇推送哦


那么今天,就跟着杰哥,转到客户端的角度,通过跟踪源码,进一步揭开zooKeeper的神秘面目吧~



一 概况

大致了解


首先,通过一个图,让大家对于zookeeper的工作流有一个大概的印象

1)图中 包含客户端ClientSendThreadEventThreadServer以及Watcher五个重要角色


2)SendThread负责将ZooKeeper的请求信息封装成一个Packet,发送给 Server ;并维持同Server的心跳


3)Server:处理不同请求,返回response给EventThread


4)EventThread:负责解析通过SendThread得到的Response,最后发送给Watcher


5)Watcher:通过调用processEvent进行具体事件处理


好了,有了一个大致印象了 ,那下面的源码环节将会很easy了~


二 源码

具体探究


细心的你一定发现了,在zookeeper的客户端脚本 zkCli.sh中,我们发现,它实际上是通过加载org.apache.zookeeper.ZooKeeperMain启动的


那么,我们就先从ZooKeeperMain启动类进入zookeeper客户端源码的探索之旅吧~

查看ZooKeeperMainmain()方法,看到该包含两个步骤:



  1. 构造ZooKeeperMain对象并建立连接
  2. 读取终端输入并解析命令


来看看这两个步骤具体是如何处理的~


01. 构造对象 建立连接

1)进入ZookeeperMain()



初始化命令参数各个可选项,调用connectToZK()方法连接到server端


2)进入方法conectToZK()


构造Zookeeper对象,建立连接

3)继续往下跟踪,进入Zookeeper(...)


创建ClientCnxn对象,并调用了它的start()方法

我们分别看看这两个步骤分别做了什么

4)先看看 这个对象初始化的时候都干了什么



我们看到,它分别初始化了很多参数,包括主机列表连接超时时间读取超时时间等。最后还初始化了客户端的2个核心线程:SendThreadEventThread


小插曲:顺便说一句,这两个线程是zookeeper的重要角色,也将是我们今天的主角

5)然后呢,再看看:cnxn.start()方法

这个方法的作用就是:分别启动这两个线程



02. 读取并解析命令


1)进入zookeeper的run()方法



我们看到,该方法通过反射调用jline.ConsoleReader类以及该类的addCompletor()方法,对终端输入进行读取。然后调用executeLine()方法,逐个执行单行命令


2)进入executeLine()方法


我们看到该方法首先通过方法cl.parseCommand(line)方法对命令行进行解析,然后调用processCms(cl)方法执行各个命令


3)进入processCms(cl)方法



该方法再调用processZKCmd()方法,对于抛出的异常分别进行分类处理


4)进入processZKCmd()方法



由于方法比较长,我们分为两部分来查看


a 第一部分,我们看到,对于quitredohistoryprintwatches以及connect方法,直接进行相应处理



到了第二部分的方法,包括对节点的各个操作,则需要在连接建立成功的情况下执行,具体如何执行,我们再来一探究竟~

ZooKeeper.create()为例

1) 上图中,如果指令为create,就会调用zk.create(..)方法

2)进入create(...)方法

我们看到create命令被封装成了一个 CreateRequest对象request,然后调用submitRequest()进行节点创建



3) 进入submitRequest(...)



我们看到zookeeper通过调用queuePacket(...)方法将Request封装成一个Packet


4)进入queuePacket(...)方法



将packet加入SendThread的outgoingQueue队列中,等待执行。并唤醒selector


接下来,在SendThread.run()的while循环中,ZooKeeper将会通过doTransport()将存放在outgoingQueue中的Packet包发送给 Server


03. SendThread

我们在前面有提到,SendThread 的主要作用是:

  • 将Packet包发送给Server
  • 维持Client和Server之间的心跳,确保 session 存活

现在让我们从源码出发,看看SendThread究竟做了哪些工作

SendThread是一个线程类,因此我们进入其run()方法,看看它的启动流程

1)先整体来看,run()方法是通过一个while循环,进行具体任务处理


若状态为关闭或者权限验证失败,则关闭socket连接,并由eventThread处理关闭连接事件


a 与server建立连接



b 判断超时



可以看到会分别判断readTimeoutconnetTimeout 两个超时时间,一旦发现链接超时,则抛出异常,终止SendThread


c 发送心跳


在没有超时且为连接状态的情况下,若已经达到心跳间隔时间,或者在最大时间间隔MAX_SEND_PING_INTERVAL内还没有发送packet。会再次发送心跳数据,避免访问超时



d 发送指令


整体来看,SendThread的主要任务即为:


  • 创建同 Server 之间的 socket 链接
  • 判断链接是否超时
  • 定时发送心跳任务
  • 将ZooKeeper指令发送给Server

我们主要来看看建立连接的过程与发送指令的过程

2) 与 Server 的长链接

a 进入startConnect()

通过调用抽象类ClientCnxnSocket的connect()方法进行socket连接,该抽象类的默认实现是ClientCnxnSocketNIO类。


b 在ClientCnxnSocketNIO.connect()中我们可以看到,与Server之间创建了一个socket链接,并调用registerAndConnect()方法注册并连接到主机地址上



c 进入registerAndConnect(...)方法。



可以看到,zookeeper会首先将sock注册到selector,然后调用sock.connect()连接服务器,判断当前是否是初次连接。若是,则进入初始化连接primeConnection()方法


d 进入primeConnection()方法


首先设置首次连接为false,然后初始化sessionId,并建立连接



e 接下来,将该连接事件组合成packet对象,并添加到发送队列中


需要注意的是,连接事件的requestHeader(请求头)为null



f 设置为可读可写


调用clientCnxnSocket.enableReadWriteOnly()开启监听事件的读写功能


那么到现在为止,已成功完成连接。接下来就要执行doTransport() 了~

3) 发送 ZooKeeper 指令

a 进入doTransPort()



该方法,首先会确保连接成功建立,调用doIO()方法进行处理,然后调用findSendablePacket(...)方法将连接事件的packet放到outgoingQueue头部



b 进入doIO()方法


该方法会分别判断key值是否是可读、或者可写的,分别进行读、写事件的处理



c readable()


先来看看对读操作的处理


调用readResponse()将其加到eventTread


d 进入readResonse()方法

小插曲-Tips:

zookeeper的消息分为三种:

  • ping 消息:XID=-2
  • auth认证消息:XID=-4
  • 订阅的消息:XID=-1

订阅的消息,也就是节点变化的通知消息。比如子节点变化、节点内容变化

我们看到readResonse()方法获取到这类消息,通过eventThread.queueEvent() 将消息推入事件队列waitingEvents,等待后续处理

e writable()

进入第二部分,zookeeper对于写操作的处理



锁定outgoningQueue进行如下处理:将事件封装成packet对象,设置事件的xid,若!p.bb.hasRemaining()为true,表示该事件已发送成功,那么删除outgoingQueue中的事件,并将该事件添加到pendingQueue中,等待后续处理


04. EventThread


进入EventThread的run()方法



我们看到该方法对获取到的事件通过方法processEvent()方法进行处理


因此我们就主要来看看processEvent()方法的逻辑



我们看到,该方法首先会判断事件是否是WatcherSetEventPair的实例


若是,则依次调用 watcher.process(pair.event)进行处理


否则就会以异步回调方式处理。根据 p.response() 判断为哪种响应类型,执行响应的回调方法 processResult()


好了~ zooKeeper客户端的源码还是比较简单的吧,分析到这里,也基本搞清楚了它的具体处理流程


根据以上的分析,我们就可以把最开始的zookeeper的工作流再细化一点,变成下面这个样子:

图中细化到可以看到SendThread中处理过程包含的outgoingQueuependingQueue,并且SendThread和EventThread是通过Clientcnxn来控制处理的


处理流程为:


1)Client发起request给Zookeeper类


2)Zookeeper类将处理该request,并将其放入outgoingQueue(发送队列)


3)Zookeeper Server端处理发送队列中的该事件,并将该事件放到待处理队列PendingQueue中


4)由EventThreadt消费该pendingQueue中的该事件


5)分发给不同的watcher 进行事件的处理



三 总结

总而言之


也就是说,Client中在终端输入指令后,首先会被封装成一个Request请求。然后通过submitRequest,进一步被封装成Packet包,提交给SendThread处理

SendThread再通过doTransport()将Packet发送给Server,并通过readResponse获取结果,解析成一个Event,再将Event加入EventThread的队列中等待执行

EventThread通过processEvent消费队列中的Event事件


是不是更深入理解啦~

那么到现在为止,我们的注册中心章节之-zookeeper篇到这里就结束啦~之前还讲了Eureka篇,接下来将会继续出品console篇和nacos篇,敬请期待哦~



嗯,就这样。每天学习一点,时间会见证你的强大~


下期预告:


Spring Cloud(四):注册中心-选择zookeeper还是Eureka?

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表