流程图
先看一下客户端源码的流程图
下面根据源码讲解,大家整合源码和流程图一起看最好,本篇内容比较多建议收藏起来看。
入口类
从ZkCli.sh脚本中可以看到zk源码客户端入口类为ZooKeeperMain
找到入口类ZooKeeperMain 中的main方法
public static void main(String args[]) throws KeeperException, IOException, InterruptedException
{
// 初始化
ZooKeeperMain main = new ZooKeeperMain(args);
// 读命令执行命令,也就是我们的创建节点更新节点等等命令的读取执行
main.run();
}
跟踪 ZooKeeperMain main = new ZooKeeperMain(args);
public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
//向private Map<String,String> options = new HashMap<String,String>(); 设置参数
cl.parseOptions(args);
System.out.println("Connecting to " + cl.getOption("server"));
// 连接zk
connectToZK(cl.getOption("server"));
//zk = new ZooKeeper(cl.getOption("server"),
// Integer.parseInt(cl.getOption("timeout")), new MyWatcher());
}
protected void connectToZK(String newHost) throws InterruptedException, IOException {
// zk已连接,先close
if (zk != null && zk.getState().isAlive()) {
zk.close();
}
host = newHost;
boolean readOnly = cl.getOption("readonly") != null;
//new 出 ZooKeeper
zk = new ZooKeeper(host,
Integer.parseInt(cl.getOption("timeout")),
new MyWatcher(), readOnly);
}
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
boolean canBeReadOnly)
throws IOException
{
LOG.info("Initiating client connection, connectString=" + connectString
+ " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
watchManager.defaultWatcher = watcher;
ConnectStringParser connectStringParser = new ConnectStringParser(
connectString);
HostProvider hostProvider = new StaticHostProvider(
connectStringParser.getServerAddresses());
// 创建客户端上下文
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), canBeReadOnly);
// 开启上下文设置的线程
/** public void start() {
** sendThread.start();
** eventThread.start();
** }
**/
cnxn.start();
}
SendThread(ClientCnxnSocket clientCnxnSocket) {
super(makeThreadName("-SendThread()"));
// 连接状态 CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,CLOSED, AUTH_FAILED, NOT_CONNECTED
state = States.CONNECTING;
this.clientCnxnSocket = clientCnxnSocket;
setDaemon(true);// 守护线程
}
EventThread() {
super(makeThreadName("-EventThread"));
setDaemon(true); // 守护线程
}
跟踪main.run();
void run() throws KeeperException, IOException, InterruptedException {
if (cl.getCommand() == null) {
System.out.println("Welcome to ZooKeeper!");
boolean jlinemissing = false;
// only use jline if it's in the classpath
try {
Class<?> consoleC = Class.forName("jline.ConsoleReader");
Class<?> completorC =
Class.forName("org.apache.zookeeper.JLineZNodeCompletor");
System.out.println("JLine support is enabled");
Object console =
consoleC.getConstructor().newInstance();
Object completor =
completorC.getConstructor(ZooKeeper.class).newInstance(zk);
Method addCompletor = consoleC.getMethod("addCompletor",
Class.forName("jline.Completor"));
addCompletor.invoke(console, completor);
String line;
Method readLine = consoleC.getMethod("readLine", String.class);
while ((line = (String)readLine.invoke(console, getPrompt())) != null) {
// 执行
executeLine(line);
}
} catch (ClassNotFoundException e) {
LOG.debug("Unable to start jline", e);
jlinemissing = true;
} catch (NoSuchMethodException e) {
LOG.debug("Unable to start jline", e);
jlinemissing = true;
} catch (InvocationTargetException e) {
LOG.debug("Unable to start jline", e);
jlinemissing = true;
} catch (IllegalAccessException e) {
LOG.debug("Unable to start jline", e);
jlinemissing = true;
} catch (InstantiationException e) {
LOG.debug("Unable to start jline", e);
jlinemissing = true;
}
if (jlinemissing) {
System.out.println("JLine support is disabled");
BufferedReader br =
new BufferedReader(new InputStreamReader(System.in));
String line;
while ((line = br.readLine()) != null) {
executeLine(line);
}
}
} else {
// Command line args non-null. Run what was passed.
processCmd(cl);
}
}
public void executeLine(String line) throws InterruptedException, IOException, KeeperException {
if (!line.equals("")) {
cl.parseCommand(line);
// 添加历史命令
addToHistory(commandCount,line);
// 执行命令核心方法
processCmd(cl);
// 命令计数
commandCount++;
}
}
protected boolean processCmd(MyCommandOptions co) throws KeeperException, IOException, InterruptedException
{
try {
// 执行zk命令
return processZKCmd(co);
} catch (IllegalArgumentException e) {
System.err.println("Command failed: " + e);
} catch (KeeperException.NoNodeException e) {
System.err.println("Node does not exist: " + e.getPath());
} catch (KeeperException.NoChildrenForEphemeralsException e) {
System.err.println("Ephemerals cannot have children: "
+ e.getPath());
} catch (KeeperException.NodeExistsException e) {
System.err.println("Node already exists: " + e.getPath());
} catch (KeeperException.NotEmptyException e) {
System.err.println("Node not empty: " + e.getPath());
} catch (KeeperException.NotReadOnlyException e) {
System.err.println("Not a read-only call: " + e.getPath());
}catch (KeeperException.InvalidACLException e) {
System.err.println("Acl is not valid : "+e.getPath());
}catch (KeeperException.NoAuthException e) {
System.err.println("Authentication is not valid : "+e.getPath());
}catch (KeeperException.BadArgumentsException e) {
System.err.println("Arguments are not valid : "+e.getPath());
}catch (KeeperException.BadVersionException e) {
System.err.println("version No is not valid : "+e.getPath());
}
return false;
}
/**
根据字符串判断属于什么命令然后发送请求,最终会调用到zookeeper类中的对应方法
**/
protected boolean processZKCmd(MyCommandOptions co) throws KeeperException, IOException, InterruptedException
{
Stat stat = new Stat();
String[] args = co.getArgArray();
String cmd = co.getCommand();
if (args.length < 1) {
usage();
return false;
}
if (!commandMap.containsKey(cmd)) {
usage();
return false;
}
boolean watch = args.length > 2;
String path = null;
List<ACL> acl = Ids.OPEN_ACL_UNSAFE;
LOG.debug("Processing " + cmd);
if (cmd.equals("quit")) {
System.out.println("Quitting...");
zk.close();
System.exit(0);
} else if (cmd.equals("redo") && args.length >= 2) {
Integer i = Integer.decode(args[1]);
if (commandCount <= i || i < 0){ // don't allow redoing this redo
System.out.println("Command index out of range");
return false;
}
cl.parseCommand(history.get(i));
if (cl.getCommand().equals( "redo" )){
System.out.println("No redoing redos");
return false;
}
history.put(commandCount, history.get(i));
processCmd( cl);
} else if (cmd.equals("history")) {
for (int i=commandCount - 10;i<=commandCount;++i) {
if (i < 0) continue;
System.out.println(i + " - " + history.get(i));
}
} else if (cmd.equals("printwatches")) {
if (args.length == 1) {
System.out.println("printwatches is " + (printWatches ? "on" : "off"));
} else {
printWatches = args[1].equals("on");
}
} else if (cmd.equals("connect")) {
if (args.length >=2) {
connectToZK(args[1]);
} else {
connectToZK(host);
}
}
// Below commands all need a live connection
if (zk == null || !zk.getState().isAlive()) {
System.out.println("Not connected");
return false;
}
if (cmd.equals("create") && args.length >= 3) {
int first = 0;
CreateMode flags = CreateMode.PERSISTENT;
if ((args[1].equals("-e") && args[2].equals("-s"))
|| (args[1]).equals("-s") && (args[2].equals("-e"))) {
first+=2;
flags = CreateMode.EPHEMERAL_SEQUENTIAL;
} else if (args[1].equals("-e")) {
first++;
flags = CreateMode.EPHEMERAL;
} else if (args[1].equals("-s")) {
first++;
flags = CreateMode.PERSISTENT_SEQUENTIAL;
}
if (args.length == first + 4) {
acl = parseACLs(args[first+3]);
}
path = args[first + 1];
String newPath = zk.create(path, args[first+2].getBytes(), acl,
flags);
System.err.println("Created " + newPath);
} else if (cmd.equals("delete") && args.length >= 2) {
path = args[1];
zk.delete(path, watch ? Integer.parseInt(args[2]) : -1);
} else if (cmd.equals("rmr") && args.length >= 2) {
path = args[1];
ZKUtil.deleteRecursive(zk, path);
} else if (cmd.equals("set") && args.length >= 3) {
path = args[1];
stat = zk.setData(path, args[2].getBytes(),
args.length > 3 ? Integer.parseInt(args[3]) : -1);
printStat(stat);
} else if (cmd.equals("aget") && args.length >= 2) {
path = args[1];
zk.getData(path, watch, dataCallback, path);
} else if (cmd.equals("get") && args.length >= 2) {
path = args[1];
byte data[] = zk.getData(path, watch, stat);
data = (data == null)? "null".getBytes() : data;
System.out.println(new String(data));
printStat(stat);
} else if (cmd.equals("ls") && args.length >= 2) {
path = args[1];
List<String> children = zk.getChildren(path, watch);
System.out.println(children);
} else if (cmd.equals("ls2") && args.length >= 2) {
path = args[1];
List<String> children = zk.getChildren(path, watch, stat);
System.out.println(children);
printStat(stat);
} else if (cmd.equals("getAcl") && args.length >= 2) {
path = args[1];
acl = zk.getACL(path, stat);
for (ACL a : acl) {
System.out.println(a.getId() + ": "
+ getPermString(a.getPerms()));
}
} else if (cmd.equals("setAcl") && args.length >= 3) {
path = args[1];
stat = zk.setACL(path, parseACLs(args[2]),
args.length > 4 ? Integer.parseInt(args[3]) : -1);
printStat(stat);
} else if (cmd.equals("stat") && args.length >= 2) {
path = args[1];
stat = zk.exists(path, watch);
if (stat == null) {
throw new KeeperException.NoNodeException(path);
}
printStat(stat);
} else if (cmd.equals("listquota") && args.length >= 2) {
path = args[1];
String absolutePath = Quotas.quotaZookeeper + path + "/" + Quotas.limitNode;
byte[] data = null;
try {
System.err.println("absolute path is " + absolutePath);
data = zk.getData(absolutePath, false, stat);
StatsTrack st = new StatsTrack(new String(data));
System.out.println("Output quota for " + path + " "
+ st.toString());
data = zk.getData(Quotas.quotaZookeeper + path + "/" +
Quotas.statNode, false, stat);
System.out.println("Output stat for " + path + " " +
new StatsTrack(new String(data)).toString());
} catch(KeeperException.NoNodeException ne) {
System.err.println("quota for " + path + " does not exist.");
}
} else if (cmd.equals("setquota") && args.length >= 4) {
String option = args[1];
String val = args[2];
path = args[3];
System.err.println("Comment: the parts are " +
"option " + option +
" val " + val +
" path " + path);
if ("-b".equals(option)) {
// we are setting the bytes quota
createQuota(zk, path, Long.parseLong(val), -1);
} else if ("-n".equals(option)) {
// we are setting the num quota
createQuota(zk, path, -1L, Integer.parseInt(val));
} else {
usage();
}
} else if (cmd.equals("delquota") && args.length >= 2) {
//if neither option -n or -b is specified, we delete
// the quota node for thsi node.
if (args.length == 3) {
//this time we have an option
String option = args[1];
path = args[2];
if ("-b".equals(option)) {
delQuota(zk, path, true, false);
} else if ("-n".equals(option)) {
delQuota(zk, path, false, true);
}
} else if (args.length == 2) {
path = args[1];
// we dont have an option specified.
// just delete whole quota node
delQuota(zk, path, true, true);
} else if (cmd.equals("help")) {
usage();
}
} else if (cmd.equals("close")) {
zk.close();
} else if (cmd.equals("sync") && args.length >= 2) {
path = args[1];
zk.sync(path, new AsyncCallback.VoidCallback() { public void processResult(int rc, String path, Object ctx) { System.out.println("Sync returned " + rc); } }, null );
} else if (cmd.equals("addauth") && args.length >=2 ) {
byte[] b = null;
if (args.length >= 3)
b = args[2].getBytes();
zk.addAuthInfo(args[1], b);
} else if (!commandMap.containsKey(cmd)) {
usage();
}
return watch;
}
Zookeeper类中的方法
接下来看一个Zookeeper类中的create 方法
public String create(final String path, byte data[], List<ACL> acl,
CreateMode createMode)
throws KeeperException, InterruptedException
{
final String clientPath = path;
PathUtils.validatePath(clientPath, createMode.isSequential());
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.create);
// 请求参数
CreateRequest request = new CreateRequest();
CreateResponse response = new CreateResponse();
request.setData(data);
request.setFlags(createMode.toFlag());
request.setPath(serverPath);
if (acl != null && acl.size() == 0) {
throw new KeeperException.InvalidACLException();
}
// 设置acl权限
request.setAcl(acl);
// 提交请求
ReplyHeader r = cnxn.submitRequest(h, request, response, null);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
if (cnxn.chrootPath == null) {
return response.getPath();
} else {
return response.getPath().substring(cnxn.chrootPath.length());
}
}
public ReplyHeader submitRequest(RequestHeader h, Record request,Record response, WatchRegistration watchRegistration)
throws InterruptedException {
ReplyHeader r = new ReplyHeader();
// 封装成packet,任务放到outgoingQueue队列里面
Packet packet = queuePacket(h, r, request, response, null, null, null,
null, watchRegistration);
// 同步等待结果
synchronized (packet) {
while (!packet.finished) {
packet.wait();
}
}
return r;
}
之前提到上下文会创建sendThread与evenThread两个线程,接下来再来看一下这两个线程都做了什么,既然是线程那么就需要看run方法。
sendThread线程run方法:
@Override
public void run() {
clientCnxnSocket.introduce(this,sessionId);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = Time.currentElapsedTime();
final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
InetSocketAddress serverAddress = null;
// 客户端alive时
while (state.isAlive()) {
try {
// 未连接
if (!clientCnxnSocket.isConnected()) {
if(!isFirstConnect){
try {
Thread.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
}
// don't re-establish connection if we are closing
if (closing || !state.isAlive()) {
break;
}
if (rwServerAddress != null) {
serverAddress = rwServerAddress;
rwServerAddress = null;
} else {
serverAddress = hostProvider.next(1000);
}
// 开始创建连接
startConnect(serverAddress);
clientCnxnSocket.updateLastSendAndHeard();
}
// 已连接
if (state.isConnected()) {
// determine whether we need to send an AuthFailed event.
if (zooKeeperSaslClient != null) {
boolean sendAuthEvent = false;
if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
try {
zooKeeperSaslClient.initialize(ClientCnxn.this);
} catch (SaslException e) {
LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
state = States.AUTH_FAILED;
sendAuthEvent = true;
}
}
KeeperState authState = zooKeeperSaslClient.getKeeperState();
if (authState != null) {
if (authState == KeeperState.AuthFailed) {
// An authentication error occurred during authentication with the Zookeeper Server.
state = States.AUTH_FAILED;
sendAuthEvent = true;
} else {
if (authState == KeeperState.SaslAuthenticated) {
sendAuthEvent = true;
}
}
}
if (sendAuthEvent == true) {
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
authState,null));
}
}
to = readTimeout - clientCnxnSocket.getIdleRecv();
} else {
to = connectTimeout - clientCnxnSocket.getIdleRecv();
}
if (to <= 0) {
String warnInfo;
warnInfo = "Client session timed out, have not heard from server in "
+ clientCnxnSocket.getIdleRecv()
+ "ms"
+ " for sessionid 0x"
+ Long.toHexString(sessionId);
LOG.warn(warnInfo);
throw new SessionTimeoutException(warnInfo);
}
if (state.isConnected()) {
//1000(1 second) is to prevent race condition missing to send the second ping
//also make sure not to send too many pings when readTimeout is small
int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
//send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
sendPing();
clientCnxnSocket.updateLastSend();
} else {
if (timeToNextPing < to) {
to = timeToNextPing;
}
}
}
// If we are in read-only mode, seek for read/write server
if (state == States.CONNECTEDREADONLY) {
long now = Time.currentElapsedTime();
int idlePingRwServer = (int) (now - lastPingRwServer);
if (idlePingRwServer >= pingRwTimeout) {
lastPingRwServer = now;
idlePingRwServer = 0;
pingRwTimeout =
Math.min(2*pingRwTimeout, maxPingRwTimeout);
pingRwServer();
}
to = Math.min(to, pingRwTimeout - idlePingRwServer);
}
// 传输方法 pendingQueue 等待结果的队列 outgoingQueue 需要发送的队列
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
} catch (Throwable e) {
if (closing) {
if (LOG.isDebugEnabled()) {
// closing so this is expected
LOG.debug("An exception was thrown while closing send thread for session 0x"
+ Long.toHexString(getSessionId())
+ " : " + e.getMessage());
}
break;
} else {
// this is ugly, you have a better way speak up
if (e instanceof SessionExpiredException) {
LOG.info(e.getMessage() + ", closing socket connection");
} else if (e instanceof SessionTimeoutException) {
LOG.info(e.getMessage() + RETRY_CONN_MSG);
} else if (e instanceof EndOfStreamException) {
LOG.info(e.getMessage() + RETRY_CONN_MSG);
} else if (e instanceof RWServerFoundException) {
LOG.info(e.getMessage());
} else if (e instanceof SocketException) {
LOG.info("Socket error occurred: {}: {}", serverAddress, e.getMessage());
} else {
LOG.warn("Session 0x{} for server {}, unexpected error{}",
Long.toHexString(getSessionId()),
serverAddress,
RETRY_CONN_MSG,
e);
}
cleanup();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(
Event.EventType.None,
Event.KeeperState.Disconnected,
null));
}
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
}
}
}
cleanup();
clientCnxnSocket.close();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
Event.KeeperState.Disconnected, null));
}
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
"SendThread exited loop for session: 0x"
+ Long.toHexString(getSessionId()));
}
开始连接startConnect()方法
private void startConnect(InetSocketAddress addr) throws IOException {
// initializing it for new connection
saslLoginFailed = false;
// 设置state为CONNECTING
state = States.CONNECTING;
setName(getName().replaceAll("\\(.*\\)", "(" + addr.getHostName() + ":" + addr.getPort() + ")"));
if (ZooKeeperSaslClient.isEnabled()) {
try {
String principalUserName = System.getProperty(
ZK_SASL_CLIENT_USERNAME, "zookeeper");
zooKeeperSaslClient = new ZooKeeperSaslClient(
principalUserName+"/"+addr.getHostName());
} catch (LoginException e) {
// An authentication error occurred when the SASL client tried to initialize:
// for Kerberos this means that the client failed to authenticate with the KDC.
// This is different from an authentication error that occurs during communication
// with the Zookeeper server, which is handled below.
LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without "
+ "SASL authentication, if Zookeeper server allows it.");
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null));
saslLoginFailed = true;
}
}
logStartConnect(addr);
// 连接
clientCnxnSocket.connect(addr);
}
@Override
void connect(InetSocketAddress addr) throws IOException {
SocketChannel sock = createSock();
try {
// 注册连接
registerAndConnect(sock, addr);
} catch (IOException e) {
LOG.error("Unable to open socket to " + addr);
sock.close();
throw e;
}
initialized = false;
/*
* Reset incomingBuffer
*/
lenBuffer.clear();
incomingBuffer = lenBuffer;
}
void registerAndConnect(SocketChannel sock, InetSocketAddress addr) throws IOException {
sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
// 创建socket连接
boolean immediateConnect = sock.connect(addr);
if (immediateConnect) {
sendThread.primeConnection();
}
}
// 次方法最终也会封装成package放到outgoingQueue队列中
void primeConnection() throws IOException {
LOG.info("Socket connection established to "
+ clientCnxnSocket.getRemoteSocketAddress()
+ ", initiating session");
isFirstConnect = false;
long sessId = (seenRwServerBefore) ? sessionId : 0;
ConnectRequest conReq = new ConnectRequest(0, lastZxid,
sessionTimeout, sessId, sessionPasswd);
synchronized (outgoingQueue) {
// We add backwards since we are pushing into the front
// Only send if there's a pending watch
// TODO: here we have the only remaining use of zooKeeper in
// this class. It's to be eliminated!
if (!disableAutoWatchReset) {
List<String> dataWatches = zooKeeper.getDataWatches();
List<String> existWatches = zooKeeper.getExistWatches();
List<String> childWatches = zooKeeper.getChildWatches();
if (!dataWatches.isEmpty()
|| !existWatches.isEmpty() || !childWatches.isEmpty()) {
Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator();
Iterator<String> existWatchesIter = prependChroot(existWatches).iterator();
Iterator<String> childWatchesIter = prependChroot(childWatches).iterator();
long setWatchesLastZxid = lastZxid;
while (dataWatchesIter.hasNext()
|| existWatchesIter.hasNext() || childWatchesIter.hasNext()) {
List<String> dataWatchesBatch = new ArrayList<String>();
List<String> existWatchesBatch = new ArrayList<String>();
List<String> childWatchesBatch = new ArrayList<String>();
int batchLength = 0;
// Note, we may exceed our max length by a bit when we add the last
// watch in the batch. This isn't ideal, but it makes the code simpler.
while (batchLength < SET_WATCHES_MAX_LENGTH) {
final String watch;
if (dataWatchesIter.hasNext()) {
watch = dataWatchesIter.next();
dataWatchesBatch.add(watch);
} else if (existWatchesIter.hasNext()) {
watch = existWatchesIter.next();
existWatchesBatch.add(watch);
} else if (childWatchesIter.hasNext()) {
watch = childWatchesIter.next();
childWatchesBatch.add(watch);
} else {
break;
}
batchLength += watch.length();
}
SetWatches sw = new SetWatches(setWatchesLastZxid,
dataWatchesBatch,
existWatchesBatch,
childWatchesBatch);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.setWatches);
h.setXid(-8);
// 封装为package
Packet packet = new Packet(h, new ReplyHeader(), sw, null, null);
// 添加到队列中
outgoingQueue.addFirst(packet);
}
}
}
for (AuthData id : authInfo) {
outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
OpCode.auth), null, new AuthPacket(0, id.scheme,
id.data), null, null));
}
outgoingQueue.addFirst(new Packet(null, null, conReq,
null, null, readOnly));
}
clientCnxnSocket.enableReadWriteOnly();
if (LOG.isDebugEnabled()) {
LOG.debug("Session establishment request sent on "
+ clientCnxnSocket.getRemoteSocketAddress());
}
}
返回到run方法查看调用的doTransport传输方法
@Override
void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
ClientCnxn cnxn)
throws IOException, InterruptedException {
selector.select(waitTimeOut);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
// Everything below and until we get back to the select is
// non blocking, so time is effectively a constant. That is
// Why we just have to do this once, here
updateNow();
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
if (sc.finishConnect()) {
updateLastSendAndHeard();
sendThread.primeConnection();
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
doIO(pendingQueue, outgoingQueue, cnxn);
}
}
if (sendThread.getZkState().isConnected()) {
synchronized(outgoingQueue) {
if (findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
enableWrite();
}
}
}
selected.clear();
}
void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
throw new IOException("Socket is null!");
}
// 处理读请求
if (sockKey.isReadable()) {
int rc = sock.read(incomingBuffer);
if (rc < 0) {
throw new EndOfStreamException(
"Unable to read additional data from server sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely server has closed socket");
}
if (!incomingBuffer.hasRemaining()) {
incomingBuffer.flip();
if (incomingBuffer == lenBuffer) {
recvCount++;
readLength();
} else if (!initialized) {
readConnectResult();
enableRead();
if (findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
// Since SASL authentication has completed (if client is configured to do so),
// outgoing packets waiting in the outgoingQueue can now be sent.
enableWrite();
}
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
initialized = true;
} else {
sendThread.readResponse(incomingBuffer);
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
}
}
}
// 处理写请求
if (sockKey.isWritable()) {
synchronized(outgoingQueue) {
Packet p = findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress());
if (p != null) {
updateLastSend();
// If we already started writing p, p.bb will already exist
if (p.bb == null) {
if ((p.requestHeader != null) &&
(p.requestHeader.getType() != OpCode.ping) &&
(p.requestHeader.getType() != OpCode.auth)) {
p.requestHeader.setXid(cnxn.getXid());
}
p.createBB();
}
sock.write(p.bb);
if (!p.bb.hasRemaining()) {
sentCount++;
outgoingQueue.removeFirstOccurrence(p);
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
synchronized (pendingQueue) {
pendingQueue.add(p);
}
}
}
}
if (outgoingQueue.isEmpty()) {
// No more packets to send: turn off write interest flag.
// Will be turned on later by a later call to enableWrite(),
// from within ZooKeeperSaslClient (if client is configured
// to attempt SASL authentication), or in either doIO() or
// in doTransport() if not.
disableWrite();
} else if (!initialized && p != null && !p.bb.hasRemaining()) {
// On initial connection, write the complete connect request
// packet, but then disable further writes until after
// receiving a successful connection response. If the
// session is expired, then the server sends the expiration
// response and immediately closes its end of the socket. If
// the client is simultaneously writing on its end, then the
// TCP stack may choose to abort with RST, in which case the
// client would never receive the session expired event. See
// http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html
disableWrite();
} else {
// Just in case
enableWrite();
}
}
}
}
private void finishPacket(Packet p) {
if (p.watchRegistration != null) {
p.watchRegistration.register(p.replyHeader.getErr());
}
// 根据是否有回调方法,判断是同步请求还是异步请求
if (p.cb == null) {
synchronized (p) {
p.finished = true;
p.notifyAll();
}
} else {
p.finished = true;
// 异步请求放到waitingEvents 队列中,现在就要用到eventThread
eventThread.queuePacket(p);
}
}
下面跟踪一下eventThread线程,当然也是先看run方法:
@Override
public void run() {
try {
isRunning = true;
while (true) {
// take 方法取数据
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
// event处理
processEvent(event);
}
if (wasKilled)
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
} catch (InterruptedException e) {
LOG.error("Event thread exiting due to interruption", e);
}
LOG.info("EventThread shut down for session: 0x{}",
Long.toHexString(getSessionId()));
}
private void processEvent(Object event) {
try {
if (event instanceof WatcherSetEventPair) {
// each watcher will process the event
WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {
try {
watcher.process(pair.event);
} catch (Throwable t) {
LOG.error("Error while calling watcher ", t);
}
}
} else {
Packet p = (Packet) event;
int rc = 0;
String clientPath = p.clientPath;
if (p.replyHeader.getErr() != 0) {
rc = p.replyHeader.getErr();
}
if (p.cb == null) {
LOG.warn("Somehow a null cb got to EventThread!");
} else if (p.response instanceof ExistsResponse
|| p.response instanceof SetDataResponse
|| p.response instanceof SetACLResponse) {
StatCallback cb = (StatCallback) p.cb;
if (rc == 0) {
if (p.response instanceof ExistsResponse) {
cb.processResult(rc, clientPath, p.ctx,
((ExistsResponse) p.response)
.getStat());
} else if (p.response instanceof SetDataResponse) {
cb.processResult(rc, clientPath, p.ctx,
((SetDataResponse) p.response)
.getStat());
} else if (p.response instanceof SetACLResponse) {
cb.processResult(rc, clientPath, p.ctx,
((SetACLResponse) p.response)
.getStat());
}
} else {
cb.processResult(rc, clientPath, p.ctx, null);
}
} else if (p.response instanceof GetDataResponse) {
DataCallback cb = (DataCallback) p.cb;
GetDataResponse rsp = (GetDataResponse) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx, rsp
.getData(), rsp.getStat());
} else {
cb.processResult(rc, clientPath, p.ctx, null,
null);
}
} else if (p.response instanceof GetACLResponse) {
ACLCallback cb = (ACLCallback) p.cb;
GetACLResponse rsp = (GetACLResponse) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx, rsp
.getAcl(), rsp.getStat());
} else {
cb.processResult(rc, clientPath, p.ctx, null,
null);
}
} else if (p.response instanceof GetChildrenResponse) {
ChildrenCallback cb = (ChildrenCallback) p.cb;
GetChildrenResponse rsp = (GetChildrenResponse) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx, rsp
.getChildren());
} else {
cb.processResult(rc, clientPath, p.ctx, null);
}
} else if (p.response instanceof GetChildren2Response) {
Children2Callback cb = (Children2Callback) p.cb;
GetChildren2Response rsp = (GetChildren2Response) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx, rsp
.getChildren(), rsp.getStat());
} else {
cb.processResult(rc, clientPath, p.ctx, null, null);
}
} else if (p.response instanceof CreateResponse) {
StringCallback cb = (StringCallback) p.cb;
CreateResponse rsp = (CreateResponse) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx,
(chrootPath == null
? rsp.getPath()
: rsp.getPath()
.substring(chrootPath.length())));
} else {
cb.processResult(rc, clientPath, p.ctx, null);
}
} else if (p.response instanceof MultiResponse) {
MultiCallback cb = (MultiCallback) p.cb;
MultiResponse rsp = (MultiResponse) p.response;
if (rc == 0) {
List<OpResult> results = rsp.getResultList();
int newRc = rc;
for (OpResult result : results) {
if (result instanceof ErrorResult
&& KeeperException.Code.OK.intValue()
!= (newRc = ((ErrorResult) result).getErr())) {
break;
}
}
cb.processResult(newRc, clientPath, p.ctx, results);
} else {
cb.processResult(rc, clientPath, p.ctx, null);
}
} else if (p.cb instanceof VoidCallback) {
VoidCallback cb = (VoidCallback) p.cb;
cb.processResult(rc, clientPath, p.ctx);
}
}
} catch (Throwable t) {
LOG.error("Caught unexpected throwable", t);
}
}
}
整片内容比较多建议收藏起来看,源码方面的知识也比较枯燥一些。
本文暂时没有评论,来添加一个吧(●'◡'●)