ZooKeeper - Session Lifetime

以下描述中用zk代指ZooKeeper,源码解释均基于ZooKeeper 3.4.6

背景

小冷同学上周三问了我两个问题:

  • ZooKeeper Session在集群间传递吗?
  • Sessionexpire是由Leader执行的,还是每个节点自己根据时间判断?

第一个问题,必须的必啊,之前使用LogFormatter查看zk日志时:

1
2
3
4
5
java -cp zookeeper-3.4.6.jar:lib/log4j-1.2.16.jar:lib/slf4j-log4j12-1.6.1.jar:lib/slf4j-api-1.6.1.jar org.apache.zookeeper.server.LogFormatter $logfile
...
5/21/17 9:26:48 PM CST session 0x25c2a3ce5610001 cxid 0x0 zxid 0x20000000b createSession 30000
...

看到了包含createSession请求的条目,所以比较确定创建Session的数据也是持久化的,而不仅仅是一个运行时的数据,这个也是zk比较奇特的一点。

对于第二个问题,还真不太清楚,只好和小冷同学约定,周末再给他答复。

问题本身

Sessionexpire是由Leader执行的,还是每个节点自己根据时间判断?

如果仅仅是这一个问题,还是比较容易回答的:Sessionexpire是由Leader执行的。

Leader进程进行jstack,结果截取如下:

1
2
3
4
5
6
7
...
"SessionTracker" #28 prio=5 os_prio=31 tid=0x00007fe81e996800 nid=0x1133 in Object.wait() [0x000070001083e000]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at org.apache.zookeeper.server.SessionTrackerImpl.run(SessionTrackerImpl.java:146)
- locked <0x000000076cf1b738> (a org.apache.zookeeper.server.SessionTrackerImpl)
...

可以看到,Leader节点会有一个名为SessionTracker的线程执行SessionTrackerImpl.run方法。

SessionTrackerImpl.run方法具体做的事情如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Override
synchronized public void run() {
try {
while (running) {
// 获取当前时间,如果nextExpirationTime比当前时间要大,sleep掉这个gap,重新进入循环
currentTime = System.currentTimeMillis();
if (nextExpirationTime > currentTime) {
this.wait(nextExpirationTime - currentTime);
continue;
}
// sessionSets是一个Map,key是一个时间戳t1,value是一个session的集合,
// 集合里面session的过期时间都为t1
SessionSet set;
set = sessionSets.remove(nextExpirationTime);
if (set != null) {
for (SessionImpl s : set.sessions) {
// 设置session的状态为closing
setSessionClosing(s.sessionId);
// 调用expirer.expire来关闭session,这个时候会把closeSession的日志同步到其它节点
// 会有Info级别日志:“Expiring session ... , timeout of xxx ms exceeded” 产生
expirer.expire(s);
}
}
// 更新nextExpirationTime,expirationInterval就是conf/zoo.cfg里面定义的tickTime
// 所以删除过期session的精度就是tickTime
nextExpirationTime += expirationInterval;
}
} catch (InterruptedException e) {
LOG.error("Unexpected interruption", e);
}
LOG.info("SessionTrackerImpl exited loop!");
}

问题回答到这里,显然是不够的。从我的角度,能看到延伸问题如下:

  • sessionId是如何构造的,如何保证唯一性
  • 什么请求会触发更新Session的过期时间
  • 如果客户端是连接到Follower的,Leader如何更新Session的过期时间
  • 如果发生了重新选举,Leader更换之后,新的Leader是如何导入Session信息的
  • SessionEphemeral Node是如何结合的
  • 清除过期Session的时候,会删除掉对应的Ephemeral Node
  • createSession操作完整的处理流程是怎样的

延伸问题

问题A

sessionId是如何构造的,如何保证唯一性

zk日志中createSession时会打印出如下的日志:

1
Established session 0x15c30814aef0000 with negotiated timeout 30000 for client /x.x.x.x:60915

上面这条日志中,sessionId0x15c30814aef0000。在zk日志中,sessionId的类型为long,那么这个64位的数据是如何组成的,如何保证唯一性的呢?

依然是SessionTrackerImpl这个类中,有initializeNextSession这个方法:

1
2
3
4
5
6
public static long initializeNextSession(long id) {
long nextSid = 0;
nextSid = (System.currentTimeMillis() << 24) >>> 8;
nextSid = nextSid | (id <<56);
return nextSid;
}

所以,sessionId的构造如下:

1
2
3
4
|63...56|55...................................16|15............0|
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| myid | timestamp | counter |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

对于sessionId这个64位的数据,高8位代表创建Session时所在的zk节点的id中间40位代表zk节点当前角色(Leader或者Learner)在创建的时候的时间戳;低16位是一个计数器,初始值为0

再看看上面日志里面的sessionId0x15c30814aef0000

  • 高8位0x1,代表这个Session是在myid=1zk节点上面创建的
  • 中间40位0x5c30814aef,代表myid=1zk节点在初始化的时候时间戳的低40位0x5c30814aef
  • 低16位0x0000,代表这是myid=1zk节点在当前状态创建的第1Session

每个角色(Leader或者Learner)在构造的时候,都会调用createSessionTracker来创建一个SessionTracker对象。
这时候就会调用SessionTrackerImpl. initializeNextSession来设置nextSessionId
初始设置完之后,每次通过createSession来获取sessionId时,所做的动作仅仅是进行nextSessionId++

问题B

什么请求会触发更新Session的过期时间

客户端的任何请求都会触发更新Session的过期时间,包括客户端维持心跳的ping请求。
Session过期时间的更新是在ZooKeeperServer.touch中进行的,通过BTrace可以拿到ZooKeeperServer.touch的调用栈如下:

1
2
3
4
5
6
7
8
org.apache.zookeeper.server.ZooKeeperServer.touch(ZooKeeperServer.java)
org.apache.zookeeper.server.ZooKeeperServer.submitRequest(ZooKeeperServer.java:667)
org.apache.zookeeper.server.ZooKeeperServer.processPacket(ZooKeeperServer.java:942)
org.apache.zookeeper.server.NIOServerCnxn.readRequest(NIOServerCnxn.java:373)
org.apache.zookeeper.server.NIOServerCnxn.readPayload(NIOServerCnxn.java:200)
org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:244)
org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
java.lang.Thread.run(Thread.java:745)

除了四字命令,客户端所有的访问操作都会触发Session更新过期时间。
再看看ZooKeeperServer.touch都会做哪些事情:

1
2
3
4
5
6
7
8
9
10
11
12
void touch(ServerCnxn cnxn) throws MissingSessionException {
if (cnxn == null) {
return;
}
long id = cnxn.getSessionId();
int to = cnxn.getSessionTimeout();
if (!sessionTracker.touchSession(id, to)) {
throw new MissingSessionException(
"No session with sessionid 0x" + Long.toHexString(id)
+ " exists, probably expired and removed");
}
}

ZooKeeperServer.touch其实只是调用了sessionTracker.touchSession来进行Session过期时间的更新。对于LeaderLearner这两个不同的角色,使用的sessionTracker的实现是不同的:

  • Leader使用的是SessionTrackerImpl
  • Learner使用的是LearnerSessionTracker

更新Session的过期时间其实都是在Leader中进行的,毕竟expire操作也是由Leader来执行的,所以这里只看SessionTrackerImpltouchSession的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
synchronized public boolean touchSession(long sessionId, int timeout) {
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,
ZooTrace.CLIENT_PING_TRACE_MASK,
"SessionTrackerImpl --- Touch session: 0x"
+ Long.toHexString(sessionId) + " with timeout " + timeout);
}
// 根据sessionId来获取session结构,如果不存在,或者已经是closing状态,说明对应的session已经过期
// 返回false
SessionImpl s = sessionsById.get(sessionId);
if (s == null || s.isClosing()) {
return false;
}
// 如果session存在,计算出这个session下一次的过期时间
// roundToInterval保证计算出来的过期时间会是SessionTrackerImpl.expirationInterval的整数倍
// 像前面提到的,SessionTrackerImpl.expirationInterval的值就是conf/zoo.cfg里面定义的tickTime
long expireTime = roundToInterval(System.currentTimeMillis() + timeout);
// 如果session当前的过期时间比计算出来的时间还要大,直接返回true,这个可能是因为session的timeout设置变小了
if (s.tickTime >= expireTime) {
return true;
}
// 下面的操作就是更新sessionSets,这样就不会在之前的过期时间s.tickTime的时候被过期掉了
SessionSet set = sessionSets.get(s.tickTime);
if (set != null) {
set.sessions.remove(s);
}
// 更新session的过期时间
s.tickTime = expireTime;
set = sessionSets.get(s.tickTime);
if (set == null) {
set = new SessionSet();
sessionSets.put(expireTime, set);
}
set.sessions.add(s);
return true;
}

问题C

如果客户端是连接到Follower的,Leader如何更新Session的过期时间

刚才问题B里面提到了Learner使用的是LearnerSessionTracker,所以再来看一下LearnerSessionTracker.touchSession的实现:

1
2
3
4
synchronized public boolean touchSession(long sessionId, int sessionTimeout) {
touchTable.put(sessionId, sessionTimeout);
return true;
}

似不似很简单,只有一次HashMap操作;touchTable会在LearnerSessionTracker.snapshot中使用:

1
2
3
4
5
synchronized HashMap<Long, Integer> snapshot() {
HashMap<Long, Integer> oldTouchTable = touchTable;
touchTable = new HashMap<Long, Integer>();
return oldTouchTable;
}

通过BTrace可以拿到LearnerSessionTracker.snapshot的调用栈如下:

1
2
3
4
5
6
org.apache.zookeeper.server.quorum.LearnerSessionTracker.snapshot(LearnerSessionTracker.java)
org.apache.zookeeper.server.quorum.LearnerZooKeeperServer.getTouchSnapshot(LearnerZooKeeperServer.java:58)
org.apache.zookeeper.server.quorum.Learner.ping(Learner.java:525)
org.apache.zookeeper.server.quorum.Follower.processPacket(Follower.java:112)
org.apache.zookeeper.server.quorum.Follower.followLeader(Follower.java:86)
org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:786)

可以看到Follower会在收到Leader发送过来的ping请求之后,把touchTable中的内容放在response之中,传回给Leader
那么Leader收到Followerping response之后会怎么处理呢?
对应的逻辑在LearnerHandler.run中:

1
2
3
4
5
6
7
8
9
10
11
12
13
case Leader.PING:
ByteArrayInputStream bis = new ByteArrayInputStream(qp
.getData());
DataInputStream dis = new DataInputStream(bis);
while (dis.available() > 0) {
// sess为follower传过来的sessionId
long sess = dis.readLong();
// to为follower传过来的sessionId对应的timeout
int to = dis.readInt();
// 这里调用ZooKeeperServer.touch来更新session过期时间
leader.zk.touch(sess, to);
}
break;

可以看到,Leader会把回包中的sessionIdtouch一遍。

LeaderFollower进行ping的时间间隔是多少呢?
如果间隔太大,可能导致LeaderSession的过期时间更新得不及时,导致Session信息被删除掉。

对应的逻辑在Leader.lead中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
while (true) {
// 等待 self.tickTime / 2 的时间
// 以tickTime为2s为例,这里leader对follower进行ping的时间间隔为1s
// 前面提到了session的过期时间会是expirationInterval的整数倍,expirationInterval就是tickTime
// 也就是说在leader连续两次检查session过期的间隔期间,至少会对follower进行一次ping操作
Thread.sleep(self.tickTime / 2);
...
// 这里会对所有的Learner进行ping
for (LearnerHandler f : getLearners()) {
// Synced set is used to check we have a supporting quorum, so only
// PARTICIPANT, not OBSERVER, learners should be used
if (f.synced() && f.getLearnerType() == LearnerType.PARTICIPANT) {
syncedSet.add(f.getSid());
}
f.ping();
}
...
}

从代码中可以看到,ping的间隔足够小,不会导致LeaderSession的过期时间更新不及时。

问题D

如果发生了重新选举,Leader更换之后,新的Leader是如何导入session信息的

奥秘就在SessionTrackerImpl的构造方法里面:

1
2
3
4
5
6
7
8
9
10
11
12
public SessionTrackerImpl(SessionExpirer expirer,
ConcurrentHashMap<Long, Integer> sessionsWithTimeout, int tickTime,
long sid)
{
...
this.sessionsWithTimeout = sessionsWithTimeout;
nextExpirationTime = roundToInterval(System.currentTimeMillis());
this.nextSessionId = initializeNextSession(sid);
for (Entry<Long, Integer> e : sessionsWithTimeout.entrySet()) {
addSession(e.getKey(), e.getValue());
}
}

可以看到,SessionTrackerImpl会遍历参数里面的sessionsWithTimeout,调用addSession重建sessionsByIdsessionSetssessionsWithTimeout这些用来管理session的数据结构。
Leader上位会大赦天下,在addSession里面把所有的session全部touch一遍。

再回过头去看,新Leader是如何去创建SessionTrackerImpl对象的:

1
2
3
4
5
6
org.apache.zookeeper.server.SessionTrackerImpl.<init>(SessionTrackerImpl.java:97)
org.apache.zookeeper.server.quorum.LeaderZooKeeperServer.createSessionTracker(LeaderZooKeeperServer.java:81)
org.apache.zookeeper.server.ZooKeeperServer.startup(ZooKeeperServer.java:405)
org.apache.zookeeper.server.quorum.Leader.startZkServer(Leader.java:947)
org.apache.zookeeper.server.quorum.Leader.lead(Leader.java:418)
org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:799)

具体是在ZooKeeperServer.createSessionTracker中创建SessionTrackerImpl对象的。
ZooKeeperServer.createSessionTracker逻辑如下:

1
2
3
4
protected void createSessionTracker() {
sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(),
tickTime, 1);
}

那么,zkDb.getSessionWithTimeOuts()是如何通过现在的数据进行sessionsWithTimeout的重建的呢?
逻辑在ZKDatabase. loadDataBase中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public long loadDataBase() throws IOException {
PlayBackListener listener=new PlayBackListener(){
public void onTxnLoaded(TxnHeader hdr,Record txn){
Request r = new Request(null, 0, hdr.getCxid(),hdr.getType(),
null, null);
r.txn = txn;
r.hdr = hdr;
r.zxid = hdr.getZxid();
addCommittedProposal(r);
}
};
long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener);
initialized = true;
return zxid;
}

综上,新Leader在进行Leader.lead()时,会先调用zk.loadData()把数据从持久化文件(snapshot/log)中恢复出sessionsWithTimeout,然后调用startZkServer创建SessionTrackerImpl重构session相关的数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void lead() throws IOException, InterruptedException {
self.end_fle = System.currentTimeMillis();
LOG.info("LEADING - LEADER ELECTION TOOK - " +
(self.end_fle - self.start_fle));
self.start_fle = 0;
self.end_fle = 0;
zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);
try {
self.tick = 0;
zk.loadData();
...
startZkServer();
...
} finally {
zk.unregisterJMX(this);
}
}

问题E

SessionEphemeral Node是如何结合的

PrepRequestProcessor.pRequest2Txn中,可以看到下面的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
case OpCode.create:
...
// 首先判断创建的Znode的parent是否为ephemeral,如果是,直接抛出异常
// 因为 Ephemerals cannot have children,临时节点是不能有子节点的
boolean ephemeralParent = parentRecord.stat.getEphemeralOwner() != 0;
if (ephemeralParent) {
throw new KeeperException.NoChildrenForEphemeralsException(path);
}
// 更新新的cversion
int newCversion = parentRecord.stat.getCversion()+1;
request.txn = new CreateTxn(path, createRequest.getData(),
listACL,
createMode.isEphemeral(), newCversion);
StatPersisted s = new StatPersisted();
if (createMode.isEphemeral()) {
// 这里会关连ephemeral node和相关的session信息
s.setEphemeralOwner(request.sessionId);
}
// 深拷贝出来一个parentRecord,设置一些相关的信息
parentRecord = parentRecord.duplicate(request.hdr.getZxid());
parentRecord.childCount++;
parentRecord.stat.setCversion(newCversion);
addChangeRecord(parentRecord);
addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path, s,
0, listACL));
break;

Znode创建的过程,就会设置Znode对应的Ephemeral Owner

问题F

清除过期Session的时候,会删除掉对应的Ephemeral Node

清除过期Session,最终会调用关闭Session的操作,ZooKeeperServer.close的逻辑如下:

1
2
3
private void close(long sessionId) {
submitRequest(null, sessionId, OpCode.closeSession, 0, null, null);
}

再看看zk请求收到OpCode.closeSession时,是如何处理的。
同样在PrepRequestProcessor.pRequest2Txn中(PrepRequestProcessor处理了很多与zk api语义相关的逻辑),可以看到:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
case OpCode.closeSession:
// We don't want to do this check since the session expiration thread
// queues up this operation without being the session owner.
// this request is the last of the session so it should be ok
//zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
HashSet<String> es = zks.getZKDatabase()
.getEphemerals(request.sessionId);
synchronized (zks.outstandingChanges) {
for (ChangeRecord c : zks.outstandingChanges) {
if (c.stat == null) {
// Doing a delete
es.remove(c.path);
} else if (c.stat.getEphemeralOwner() == request.sessionId) {
es.add(c.path);
}
}
for (String path2Delete : es) {
addChangeRecord(new ChangeRecord(request.hdr.getZxid(),
path2Delete, null, 0, null));
}
zks.sessionTracker.setSessionClosing(request.sessionId);
}

可以看到:
Leader收到OpCode.closeSession请求之后(PrepRequestProcessor只会存在于Leader中),会找出zKDatabase中所有和这个Session相关的Ephemeral Node的路径。
另外,还会找出zks.outstandingChanges里面,EphemeralOwner设置为当前session的所有路径。
然后把所有这些路径都调用addChangeRecord添加到zks.outstandingChanges中。

但是这些并非真正地应用到内存中的DataTree上,真正的删除节点的操作并不在这里。之前在zk日志里面发现过这样的记录:

1
Deleting ephemeral node /mypath for session 0x153501f0a4a05cb

这行日志打印由方法DataTree.killSession打印的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void killSession(long session, long zxid) {
// the list is already removed from the ephemerals
// so we do not have to worry about synchronizing on
// the list. This is only called from FinalRequestProcessor
// so there is no need for synchronization. The list is not
// changed here. Only create and delete change the list which
// are again called from FinalRequestProcessor in sequence.
HashSet<String> list = ephemerals.remove(session);
if (list != null) {
for (String path : list) {
try {
deleteNode(path, zxid);
if (LOG.isDebugEnabled()) {
LOG
.debug("Deleting ephemeral node " + path
+ " for session 0x"
+ Long.toHexString(session));
}
} catch (NoNodeException e) {
LOG.warn("Ignoring NoNodeException for path " + path
+ " while removing ephemeral for dead session 0x"
+ Long.toHexString(session));
}
}
}
}

调用栈如下:

1
2
3
4
5
6
7
org.apache.zookeeper.server.DataTree.killSession(DataTree.java)
org.apache.zookeeper.server.DataTree.processTxn(DataTree.java:818)
org.apache.zookeeper.server.ZKDatabase.processTxn(ZKDatabase.java:329)
org.apache.zookeeper.server.ZooKeeperServer.processTxn(ZooKeeperServer.java:994)
org.apache.zookeeper.server.FinalRequestProcessor.processRequest(FinalRequestProcessor.java:116)
org.apache.zookeeper.server.quorum.Leader$ToBeAppliedRequestProcessor.processRequest(Leader.java:644)
org.apache.zookeeper.server.quorum.CommitProcessor.run(CommitProcessor.java:74)

删掉Ephemeral Node到底意味着什么,和删除Persistent Node是一样的么?
关闭Session的时候删除Ephemeral Node会应用到日志里面么?

带着上面的疑问,使用zkCli.sh连接到server,创建Ephemeral Node,然后退出。
观察到log如下:

1
2
3
5/24/17 1:57:56 PM CST session 0x15c38d9b8000001 cxid 0x0 zxid 0x100000004 createSession 30000
5/24/17 1:58:20 PM CST session 0x15c38d9b8000001 cxid 0x1 zxid 0x100000005 create '/ephemeral,#6b6b6b,v{s{31,s{'world,'anyone}}},T,2
5/24/17 1:58:53 PM CST session 0x15c38d9b8000001 cxid 0x2 zxid 0x100000006 closeSession null

可见,关闭Session的时候删除Ephemeral Node并不会应用到日志中,只会从内存的DataTree中删除对应的数据。删除DataTree中数据的逻辑在DataTree.deleteNode
关于DataTree,我觉得有一个很有意思的地方,从DataTreejavadoc里面可以看到:

1
2
3
* The tree maintains two parallel data structures: a hashtable that maps from
* full paths to DataNodes and a tree of DataNodes. All accesses to a path is
* through the hashtable. The tree is traversed only when serializing to disk.

javadoc说的是,维护了两个数据结构,一个是全路径到DataNode的映射(DataTree.nodes这个field),另外一个是所有DataNodetreeDataTree.root这个field)。然而,这个tree呢,和我们传统意义上的tree(至少我们写二叉树实现的时候)是不太一样的:DataNode里面并没有指针/对象指向所有的子节点,仅仅有所有子节点的路径。
所以节点对象的定位,都是通过DataTree.nodes来查找的。所以删除Ephemeral Node,只需要删除这个ZnodeDataTree.nodes中的条目即可。

问题G

createSession操作完整的处理流程是怎样的

这个问题再描述具体点:如果应用使用zk客户端连接到zk集群的一个Follower结点,那么会是一个什么逻辑呢?

Follower的处理

请求到达Follower后,Follower会调用ZooKeeperServer.submitRequest,然后会调用firstProcessor.processRequest,对于Follower来说,ZooKeeperServer的实现是FollowerZooKeeperServer,这个实现里面的firstProcessorFollowerReqeustProcessor

对于不同的ZooKeeperServer子类来说,比较重要的是
setupRequestProcessors这个方法,setupRequestProcessors这个方法会去生成某个角色的处理链,StandaloneFollowerLeader这三种角色的处理链都是各有不同的(Observer这个角色在我们的部署中没有,暂时偷懒忽略:))。

对于Follower来说,主线处理链是:
FollowerReqeustProcessor => CommitProcessor => FinalRequestProcessor

另外,还有一条辅线处理链:
SyncReqeustProcessor => SendAckRequestProcessor

下面逐个来讲解下这几种RequestProcessor

主线处理链

FollowerReqeustProcessorFollower专有的RequestProcessor,会做两件事情:

  • 把收到的情况一股脑儿地传给nextProcessor,也就是CommitRequestProcessor
  • 调用zks.getFollower().request(request)把写请求转发给Leader

CommitProcessor比较重要,followerleader的处理链都有它。
它的名字比较特殊,RequestProcessor接口所有的实现里面,就它名字特殊,其它实现类名的suffix都是RequestProcessor,就它不是,当然,这是我纯扯淡:)

CommitProcessor里面有两个队列:queuedRequestscommittedRequestsqueuedRequests里面是所有应用过来的请求,committedRequests里面是所有已经被committed的请求。只有CommitProcessor.commit这个方法会往committedRequests这个队列添加元素,而CommitProcessor.commit的调用有以下两个地方:

  • Follower中,Follower.processPacket方法中收到Leader发过来的Leader.COMMIT会调用FollowerZooKeeperServer.commit,然后会调用CommitProcessor.commit
  • Leader中,Leader.processAck中如果收到Ack的个数达到了大多数会调用CommitProcessor.commitLeader.processAck有两个调用的地方:
    • 处理Leader自己的Ack,在AckRequestProcessor.processRequest中(AckRequestProcessorleader专有的)会调用
    • 处理Follower回复的Ack,在LearnerHandler.run中处理Follower发来的Leader.ACK请求时会调用

好,扯远了,收回来。CommitProcessor.runloop逻辑如下:

  • toProcess队列里面的所有请求调用nextProcessor.processRequest,也就是FinalRequestProcessor.processRequesttoProcess队列只有读请求和已经committed的写请求
  • 如果committedRequests中有请求,就把这个请求拉出来,和当前的nextPendingnextPending可以理解为当前正在等commit的写请求)对比,如果匹配,就把nextPending放入toProcess队列中,并清空nextPending
  • 如果nextPending不为空,说明有写请求还在等commit,不用处理queuedRequests队列里面的请求了,重新进入loop
  • 处理queuedRequests队列里面的请求,如果是写操作,设置nextPending,重新进入loop;如果是读操作,添加到toProcess队列中,有多少添加多少

通过以上逻辑可以看到写请求是会阻塞后面的读请求的,所以,如果对一致性要求不是那么强的读请求,zk的访问有必要做读写分离呢?


FinalRequestProcessor也是一个比较重要的角儿,StandaloneFollowerLeader这三种角色的处理链中都有它的存在,到达这个RequestProcessor的请求和放在CommitProcessor.toProcess一样,只有读请求和已经committed的写请求。

FinalRequestProcessor会做两件事情:

  • 操作ZooKeeperServer.zkDb
  • 返回response给客户端

辅线处理链

SyncReqeustProcessor也是比较重要的,和FinalRequestProcessor一样,也是StandaloneFollowerLeader这三种角色都会有的。这个RequestProcessor其实只做一件事情:写日志,可以认为是两阶段提交的第一阶段。

SyncReqeustProcessor.processRequest有两个调用的地方:

  • Follower中,Follower.processPacket方法中收到Leader发过来的Leader.PROPOSAL会调用FollowerZooKeeperServer.logRequest,然后会调用SyncReqeustProcessor.processRequest
  • Leader中,ProposalRequestProcessor.processRequest对于写操作,不仅会zks.getLeader().propose(request)通知所有Follower去写日志,还会调用SyncReqeustProcessor.processRequest来写入Leader自己的本地日志。

SendAckRequestProcessorFollower专有的RequestProcessor,也是唯一实现了Flushable接口的RequestProcessor。做的事情比较纯粹,给Leader回一个Leader.ACKQuorumPacket


Leader的处理

上述Follower的处理中,我们提到了FollowerRequestProcessor会:

调用zks.getFollower().request(request)把写请求转发给Leader

从这一刻开始,Leader的处理开始登上历史舞台。
LearnerHandler.run中收到Leader.REQUEST,会调用leader.zk.submitRequest(si),代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
case Leader.REQUEST:
bb = ByteBuffer.wrap(qp.getData());
sessionId = bb.getLong();
cxid = bb.getInt();
type = bb.getInt();
bb = bb.slice();
Request si;
// 对于OpCode.sync请求,会创建一个不一样的包,LearnerSyncRequest
if(type == OpCode.sync){
si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
} else {
si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
}
si.setOwner(this);
// 这里会调用submitRequest,让LeaderZooKeeperServer来处理请求
leader.zk.submitRequest(si);
break;

上面讲Follower的处理时,提到了:

请求到达Follower后,Follower会调用ZooKeeperServer.submitRequest,然后会调用firstProcessor.processRequest

LeaderFollower一样,都会调用ZooKeeperServer.submitRequest,这里面逻辑是一样的,区别在于处理链不一样。Leader的处理链是所有角色中最复杂的,涉及到7RequestProcessor,如下图所示:

CommitProcessorSyncRequestProcessorFinalRequestProcessor都是老熟人了,在上面刚见过。下面介绍下另外的4RequestProcessor


PrepRequestProcessor并非是Leader专有的,Standalone模式也会有。这个RequestProcessor也是所有RequestProcessor实现中最复杂的,从这个类对应代码文件的行数就可以看出来:

1
2
3
4
5
6
7
8
9
10
11
12
13
find * -name '*Processor.java' |xargs wc -l |sort -k 1
44 src/java/main/org/apache/zookeeper/server/RequestProcessor.java
48 src/java/main/org/apache/zookeeper/server/UnimplementedRequestProcessor.java
54 src/java/main/org/apache/zookeeper/server/quorum/AckRequestProcessor.java
80 src/java/main/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java
93 src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java
112 src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
126 src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java
128 src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyRequestProcessor.java
192 src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
235 src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java
418 src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
766 src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java

复杂也是正常的,毕竟与zk api语义相关的逻辑基本都在这里实现的。
PrepRequestProcessor.pRequest是这个RequestProcessor最重要的实现,这个方法里面做两件事情:

  • 调用pRequest2Txn设置部分请求的request.hdrrequest.txnpRequest2Txn里面会完成一些接口语义相关的逻辑,比如上面提到的,收到OpCode.create请求时会去设置Ephemeral Owner
  • 调用nextProcessor.processRequest,也就是ProposalRequestProcessor.processRequest

ProposalRequestProcessorLeader专有的RequestProcessor,会做三件事情:

  • 调用nextProcessor.processRequest,也就是CommitProcessor.processRequest
  • 对于设置了request.hdr的请求,调用zks.getLeader().propose(request)向所有的Follower发送Leader.PROPOSAL请求。上面的描述中提到了Follower收到Leader发送过来的Leader.PROPOSAL请求后,最终会调用SyncReqeustProcessor.processRequest去写入日志。那么Leader自己的日志什么时候写呢,就在下一步了。
  • 对于设置了request.hdr的请求,调用syncProcessor.processRequest来向Leader自己的日志里写入记录。这里会有疑问,什么是”设置了request.hdr的请求”呢?除了createdeletesetDatasetACLmulticreateSessioncloseSession这些常见的写操作之外,还有一个check

ToBeAppliedRequestProcessorLeader专有的RequestProcessor,也是唯一一个内部类的RequestProcessor。做的事情非常简单,只是在CommitProcessorFinalRequestProcessor之前做个桥接。唯一的作用在于维护一个toBeApplied的队列,这个队列里面包括了已经达到quorum,但是还没有应用到FinalRequestProcessorProposal


AckRequestProcessorLeader专有的RequestProcessor,和Follower专有的SendAckRequestProcessor长得很像,做的逻辑也比较类似。调用leader.processAck,相当于写一个本地的Ack


综上所述,CreateSession操作完整的处理流程如下图:

选做题

分享这个问题的时候遇到两个问题,我不太能回答上来,想请大家帮我解答一下:

  • 为什么不能统一地从LeadersessionId?其实想想,也是可以的,只是createSessionsynchronized,在Follower上操作能稍微提高一些并发。
  • 为什么大量使用synchronized,而不是使用锁?zk的代码里面,确实是非常大量地使用synchronized。只是因为zk不是很注重性能,使用synchronized会使代码看起来更易懂一些么。

总结

解释了这几个延伸问题之后,小冷同学的疑问才算是比较完整地解决了,小冷同学表示解答比较符合期望(其实他没表示,都是我YY的)。
Trouble shooting driven source reading,带着具体问题去看代码,是一种能有短期反馈的读源码的方式,比较适合我,也推荐给大家。
当然,看代码是目的是为了之后改代码、优化代码,Writting is always more than reading

参考