ZooKeeper - Data Sync

以下描述中用zk代指ZooKeeper,源码解释均基于ZooKeeper 3.4.6

写在前面

好吧,这又是一篇:

不加全局理论抽象,局部解读具体逻辑细节的文章

然而,我目前的能力只能解决这些非常具体的问题。其实总结出来的文章并不能够给其它同学带来什么收益,真的想懂细节,确实只有“自己研读或者谷歌”

但是,我读完了代码,我会想:

  • 我能不能帮忙别人来加快理解这个问题呢?
  • 我这个理解是不是正确的呢,能不能publish出来让大家拍一拍呢?
  • 想给自己一个交代。我始终相信,写文章才能把头脑中可能忽略的细节都收集起来,才能发现思考时会遗漏的问题

当然,我非常认可需要做全局理论抽象,这是我目前的恐慌区,需要努力去跨越,自勉。

背景

继上次的Session问题后,好学的小冷同学又认真地研究了下ZooKeeper Cluster的原理,问了我下面5个问题:

  • ZooKeeper集群在发生Leader切换的时候,所有的Follower会选择新的Leader进行全量的数据同步吗?
  • 如果一次写入,由于丢包,导致某条日志没有写入,会怎么样呢?
  • 扩容的时候加了个节点,这时候新的写入会不会同步到这个新节点呢?
  • Follower会主动向Leaderping包么?
  • 集群在选举的时候,四字命令都会返回This ZooKeeper instance is not currently serving requests,什么时候会变回正常可服务状态?

这几个问题我都不太确定,于是,踏上了新一轮的啃码之旅

问题A

ZooKeeper集群在发生Leader切换的时候,所有的Follower会选择新的Leader进行全量的数据同步吗?

这篇帖子里面的解释是我比较认同:

1、SNAP-全量同步
条件:peerLastZxid<minCommittedLog
说明:证明二者数据差异太大,follower数据过于陈旧,leader发送快照SNAP指令给follower全量同步数据,即leader将所有数据全量同步到follower

2、DIFF-增量同步
条件:minCommittedLog<=peerLastZxid<=maxCommittedLog
说明:证明二者数据差异不大,follower上有一些leader上已经提交的提议proposal未同步,此时需要增量提交这些提议即可

3、TRUNC-仅回滚同步
条件:peerLastZxid>minCommittedLog
说明:证明follower上有些提议proposal并未在leader上提交,follower需要回滚到zxid为minCommittedLog对应的事务操作

4、TRUNC+DIFF-回滚+增量同步
条件:minCommittedLog<=peerLastZxid<=maxCommittedLog且特殊场景leader a已经将事务truncA提交到本地事务日志中,但没有成功发起proposal协议进行投票就宕机了;然后集群中剔除原leader a重新选举出新leader b,又提交了若干新的提议proposal,然后原leader a重新服务又加入到集群中,不管是否被选举为新leader。
说明:此时a,b都有一些对方未提交的事务,若b是leader, a需要先回滚truncA然后增量同步新leader a上的数据

对应的代码在LearnerHandler.run中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
// packetToSend默认为Leader.SNAP
try {
rl.lock();
final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();
LOG.info("Synchronizing with Follower sid: " + sid
+" maxCommittedLog=0x"+Long.toHexString(maxCommittedLog)
+" minCommittedLog=0x"+Long.toHexString(minCommittedLog)
+" peerLastZxid=0x"+Long.toHexString(peerLastZxid));
// ZKDatabase.committedLog是仅存在于内存中的结构,虽然Follower或者Leader退出时,
// 会调用ZKDatabase.clear清空内存中的数据,但是FastLeaderElection.getInitLastLoggedZxid
// 最终会调用ZKDatabase.loadDataBase重新加载数据到ZKDatabase.committedLog等内存结构中
// 当然,ZKDatabase.loadDataBase是把日志文件中的内容加载加这个结构中
// 如果加载之前,对应的zk节点刚好是新生成的snapshot,就会导致ZKDatabase.committedLog为空
LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();
if (proposals.size() != 0) {
LOG.debug("proposal size is {}", proposals.size());
if ((maxCommittedLog >= peerLastZxid)
&& (minCommittedLog <= peerLastZxid)) {
// 如果Follower的zxid在[minCommittedLog, maxCommittedLog],说明可能可以通过重新应用Follower上没有的事务日志来恢复数据
LOG.debug("Sending proposals to follower");
// as we look through proposals, this variable keeps track of previous
// proposal Id.
long prevProposalZxid = minCommittedLog;
// Keep track of whether we are about to send the first packet.
// Before sending the first packet, we have to tell the learner
// whether to expect a trunc or a diff
boolean firstPacket=true;
// If we are here, we can use committedLog to sync with
// follower. Then we only need to decide whether to
// send trunc or not
packetToSend = Leader.DIFF;
zxidToSend = maxCommittedLog;
for (Proposal propose: proposals) {
// 遍历ZKDatabase.committedLog中的事务日志,跳过那些已经在Follower上应用过的日志
// skip the proposals the peer already has
if (propose.packet.getZxid() <= peerLastZxid) {
prevProposalZxid = propose.packet.getZxid();
continue;
} else {
// If we are sending the first packet, figure out whether to trunc
// in case the follower has some proposals that the leader doesn't
if (firstPacket) {
firstPacket = false;
// Does the peer have some proposals that the leader hasn't seen yet
if (prevProposalZxid < peerLastZxid) {
// send a trunc message before sending the diff
// 如果出现这种情况,我们需要先在Follower上应用Leader.TRUNC,让其回滚到prevProposalZxid的位置
packetToSend = Leader.TRUNC;
zxidToSend = prevProposalZxid;
updates = zxidToSend;
}
}
// 下面这几行语句是针对Follower上没有的事务日志构造出PROPOSAL和COMMIT请求,放到队列中
// 可以注意到,这个时候LearnerHandler的发送线程是还没有启动的,所以对应的FOLLOWER肯定是先响应Leader.DIFF或者Leader.TRUNC请求的
queuePacket(propose.packet);
QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
null, null);
queuePacket(qcommit);
}
}
} else if (peerLastZxid > maxCommittedLog) {
// 如果Follower的zxid比当前的Leader还要大,发送Leader.TRUNC让Follower的事务日志回滚到当前Leader的maxCommittedLog
// 和Leader.SNAP一样,Leader.TRUNC会导致对应的Follower调用ZKDatabase.loadDataBase重新加载数据到内存中
LOG.debug("Sending TRUNC to follower zxidToSend=0x{} updates=0x{}",
Long.toHexString(maxCommittedLog),
Long.toHexString(updates));
packetToSend = Leader.TRUNC;
zxidToSend = maxCommittedLog;
updates = zxidToSend;
} else {
// 进入这块代码的条件是peerLastZxid<minCommittedLog,使用默认的packetToSend:Leader.SNAP
// 这个条件有一个case:在zk集群中加入一个新的节点,这时候如果Leader 的事务日志中有内容,就会进入这个情况
LOG.warn("Unhandled proposal scenario");
}
} else if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
// 如果ZKDatabase.committedLog为空,而且对应的Follower和当前Leader内存中的数据是一致的,只发送一个Leader.DIFF的请求
// 而不进行数据的同步,甚至也没有对应的PROPOSAL/COMMIT请求
// The leader may recently take a snapshot, so the committedLog
// is empty. We don't need to send snapshot if the follow
// is already sync with in-memory db.
LOG.debug("committedLog is empty but leader and follower "
+ "are in sync, zxid=0x{}",
Long.toHexString(peerLastZxid));
packetToSend = Leader.DIFF;
zxidToSend = peerLastZxid;
} else {
// 如果ZKDatabase.committedLog为空,但是对应的Follower和当前Leader内存中的数据不一致
// 这时候会使用默认的packetToSend:Leader.SNAP
// just let the state transfer happen
LOG.debug("proposals is empty");
}
LOG.info("Sending " + Leader.getPacketType(packetToSend));
leaderLastZxid = leader.startForwarding(this, updates);
} finally {
rl.unlock();
}

问题B

如果一次写入,由于丢包,导致某条日志没有写入,会怎么样呢?

先引入上次的Session问题中使用过的一张图:

Leader会向Follower发送ProposalCommit请求,其中,Follower收到Proposal请求之后,会写入日志;收到Commit请求之后,会更改内存中的DataTree

问题中提到的日志没有写入,也就是发送到某个FollowerProposal请求被丢弃了, 对应的Follower会是怎样一个逻辑呢?
回答这个问题,最直观的方法就是模拟场景,在实验中将Proposal请求丢弃,观察对应的FollowerLeader的表现。

那么,问题来了,如何模拟呢?
Jepsen中,使用的是iptables来模拟网络故障(周期性地丢包、网络分区等)。在我的模拟环境中,zk集群都是部署在本地,使用iptables来操作会比较繁琐,而且我的需求是精确地丢弃掉Proposal请求,而不是FollowerLeader之间发送的所有的请求。

最终还是使用了Byteman,对应的脚本如下:

1
2
3
4
5
6
7
8
9
RULE trace zk.skip_proposal_packet
CLASS org.apache.zookeeper.server.quorum.Follower
METHOD processPacket
AT ENTRY
IF $1.getType() == 2
DO
traceln("*** drop PROPOSAL packet");
return;
ENDRULE

使用Byteman做故障场景模拟并不是我的原创,Cassandra中使用了Byteman来做故障场景注入。这种方法的优点是可以完成代码级别在精确错误注入;缺点也很明显,需要待注入服务是运行在JVM之上的。

进行了场景模拟之后,发现被测的丢弃Proposal请求的Follower进入了LOOKING状态,然后重新加入了集群。原因是Leader主动断开了和Proposal的连接。

那么,为什么Follower丢弃Proposal请求会导致Leader主动断开了和Proposal的连接呢?
这个逻辑和LearnerHandler$SyncLimitCheck有关,Leader会定时去调用LearnerHandler.pingFollower发送Leader.PING请求,逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public void ping() {
long id;
if (syncLimitCheck.check(System.nanoTime())) {
synchronized(leader) {
id = leader.lastProposed;
}
QuorumPacket ping = new QuorumPacket(Leader.PING, id, null, null);
queuePacket(ping);
} else {
LOG.warn("Closing connection to peer due to transaction timeout.");
shutdown();
}
}

如果Leader发送出去的Leader.PROPOSAL请求在一段时间内(这个时间由conf/zoo.cfg中的syncLimit决定)没有收到对应的ACK,就会导致syncLimitCheck.check失败,从而调用LearnerHandler.shutdown关闭到这个Follower的连接,并停止对应的发送、接收请求的线程。

Follower这边,由于Leader连接关闭,调用Learner.readPacket时会抛出异常,退出Follower.followLeader方法,重新进入LOOKING状态。

综上,我们知道了,发送到某个FollowerProposal请求被丢弃,会导致对应的Follower重新进入LOOKING状态。

那么,如果被丢弃的请求是Commit请求呢?

同样使用Byteman进行了模拟,由于Commit请求是不需要返回ACKLeader的,所以,如果模拟时有两个写入请求ReqAReqB,如果两个请求对应的Commit请求都丢弃了,这个时候其实对系统并没有什么影响,但是连接到对应Follower上的客户端看到的数据就是stale的。

如果丢弃ReqA对应的Commit请求之后就撤销故障场景,ReqB对应的Commit请求正常执行会是什么情况呢?对应的逻辑在FollowerZooKeeperServer.commit中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 收到Leader发送的Leader.COMMIT请求之后,会调用这个方法
public void commit(long zxid) {
// FollowerZooKeeperServer.logRequest中会添加条目到pendingTxns中
// 这个field保存了所有调用了syncProcessor.processRequest,但是没有收到
// Commit请求的Request
if (pendingTxns.size() == 0) {
LOG.warn("Committing " + Long.toHexString(zxid)
+ " without seeing txn");
return;
}
// 如果收到的Commit请求中的zxid和pendingTxns中第一个请求的zxid是不一样的
// 打印Error级别的日志并退出java进程
// 在我们的模拟情况中,pendingTxns中第一个请求是ReqA
// 所以当ReqB对应的Commit请求被Follower收到时,会进入到这个逻辑
long firstElementZxid = pendingTxns.element().zxid;
if (firstElementZxid != zxid) {
LOG.error("Committing zxid 0x" + Long.toHexString(zxid)
+ " but next pending txn 0x"
+ Long.toHexString(firstElementZxid));
System.exit(12);
}
// 如果是匹配的,就从pendingTxns中移除这个请求
// 并执行commitProcessor.commit
Request request = pendingTxns.remove();
commitProcessor.commit(request);
}

是的,对应的Follower进程退出了。

问题C

扩容的时候加了个节点,这时候新的写入会不会同步到这个新节点呢?

会的。虽然这时候Leaderconf/zoo.cfg里面还没有新加入节点的信息,但是Leader会为这个节点创建相应的LearnerHandler,对应的逻辑在Leader$LearnerCnxAcceptor.run中:

1
2
3
4
5
6
7
Socket s = ss.accept();
// start with the initLimit, once the ack is processed
// in LearnerHandler switch to the syncLimit
s.setSoTimeout(self.tickTime * self.initLimit);
s.setTcpNoDelay(nodelay);
LearnerHandler fh = new LearnerHandler(s, Leader.this);
fh.start();

新加入的节点也会经历如下的阶段:

上图中的步骤是在Learner.registerWithLeaderLearner.syncWithLeader中完成的,也是新加入节点从Leader中同步数据的步骤。再看看Leader是如何把增量的数据同步到Follower的。

LeaderFollower发送请求的方法Leader.sendPacket实现如下:

1
2
3
4
5
6
7
void sendPacket(QuorumPacket qp) {
synchronized (forwardingFollowers) {
for (LearnerHandler f : forwardingFollowers) {
f.queuePacket(qp);
}
}
}

新加入节点的LearnerHandler是在LearnerHandler.run中通过调用Leader.startForwarding加入到Leader.forwardingFollowers中的,加入之后Leader就会开始同步数据到新的节点了。

那么,在计算QuorumVerifier.containsQuorum的时候,会涉及到新加入的节点么?
答案是,某种程度上,会

验证Ping或者Proposal是否达到大多数的逻辑是在QuorumPeer.quorumConfig中实现的。
QuorumPeer.quorumConfig这个field对应的类型是QuorumVerifier这个接口,这个接口有两个实现:QuorumMajQuorumHierarchical,我们的部署比较简单,没有weight相关的配置,所以使用的实现都是QuorumMaj
QuorumMaj中有一个fieldhalf,标志着这个集群里面半数的值(对于3节点的集群,half1;对于4节点的集群,half2;依此类推);对应的containsQuorum方法实现也非常简单粗暴:

1
2
3
public boolean containsQuorum(HashSet<Long> set){
return (set.size() > half);
}

由于QuorumPeer.setQuorumVerifier只有在节点启动的时候才会被调用,所以QuorumMaj.half的值在节点启动之后就不会改变。
如果在一个3节点zk0zk1zk2)的集群中,扩容一个新节点zk3。在没有启动集群原有3节点的情况下,Leader中的QuorumMaj.half会一直为1,只是这时候,containsQuorum方法的输入可能是一个大小为4的集合。
可以理解为Leader中判断Proposal是否达到大多数的标准是没有变化的,但是输入产生了变化。

问题D

Follower会主动向Leaderping包么?

不会,FollowerLeader发的Leader.PING包只是response,并没有线程会定期向Leader来发。那么,这时候会有个问题,如果Follower长时间没有收到Leader发的Leader.PING请求会怎么样呢?

依然是使用BytemanLeader发送给FollowerLeader.PING请求给丢弃掉,对应的脚本如下:

1
2
3
4
5
6
7
8
9
RULE trace zk.skip_ping
CLASS org.apache.zookeeper.server.quorum.LearnerHandler
METHOD ping
AT ENTRY
IF $0.sid == 2
DO
traceln("*** drop ping packet to sid: 2");
return;
ENDRULE

可以看到对应的Follower会不断进入LOOKING状态,连上Leader之后相隔10s就会有如下日志,错误为Read timed out

1
2
3
4
5
6
7
8
2017-06-12 15:19:34,963 [myid:2] - INFO - Snapshotting: 0x100000000 to data/version-2/snapshot.100000000
2017-06-12 15:19:44,973 [myid:2] - WARN - Exception when following the leader
Exception when following the leader
java.net.SocketTimeoutException: Read timed out
...
at org.apache.zookeeper.server.quorum.Learner.readPacket(Learner.java:153)
at org.apache.zookeeper.server.quorum.Follower.followLeader(Follower.java:85)
at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:786)

为什么是10s这个时间呢?
原因在于Leader.syncWithLeader这个方法中,在收到Leader.UPTODATE后,会调用:

1
sock.setSoTimeout(self.tickTime * self.syncLimit);

conf/zoo.cfg中,我们目前的配置为:

1
2
3
4
5
6
7
8
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5

因此,和Leader建立的socket的读写超时时间为2000ms * 5 = 10s

不止Follower会超时断连,Leader的日志显示Leader也会出现读超时:

1
2
3
4
5
6
2017-06-12 15:31:15,988 [myid:1] - INFO - Received NEWLEADER-ACK message from 2
2017-06-12 15:31:25,994 [myid:1] - ERROR - Unexpected exception causing shutdown while sock still open
java.net.SocketTimeoutException: Read timed out
...
at org.apache.jute.BinaryInputArchive.readRecord(BinaryInputArchive.java:103)
at org.apache.zookeeper.server.quorum.LearnerHandler.run(LearnerHandler.java:546)

然后,关闭对应的LearnerHandler

问题E

集群在选举的时候,四字命令都会返回This ZooKeeper instance is not currently serving requests,什么时候会变回正常可服务状态?

先看一下四字命令返回ZK_NOT_SERVING的逻辑,以mntr这个四字命令为例,对应的代码在NIOServerCnxn$MonitorCommand.commandRun中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private class MonitorCommand extends CommandThread {
MonitorCommand(PrintWriter pw) {
super(pw);
}
@Override
public void commandRun() {
if(zkServer == null) {
pw.println(ZK_NOT_SERVING);
return;
}
...
}
...
}

NIOServerCnxn这个对象是每个连接都会创建一个的,创建NIOServerCnxn对象的逻辑在NIOServerCnxnFactory.createConnection中:

1
2
3
4
protected NIOServerCnxn createConnection(SocketChannel sock,
SelectionKey sk) throws IOException {
return new NIOServerCnxn(zkServer, sock, sk, this);
}

NIOServerCnxn构造方法参数里面使用的是ServerCnxnFactory.zkServer,那么,ServerCnxnFactory.zkServer是在什么时候设置的呢?分别看下FollowerLeader中对应的逻辑。

Follower

Follower会在收到Leader发送的Leader.UPTODATE之后去设置,对应的逻辑在Learner.syncWithLeader中,调用ServerCnxnFactory.setZooKeeperServer来设置ServerCnxnFactory.zkServer

1
2
3
4
5
6
7
case Leader.UPTODATE:
if (!snapshotTaken) { // true for the pre v1.0 case
zk.takeSnapshot();
self.setCurrentEpoch(newEpoch);
}
self.cnxnFactory.setZooKeeperServer(zk);
break outerLoop;

Leader

Leader会在Leader.leaderwaitForNewLeaderAck之后去设置,对应的逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
try {
waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT);
} catch (InterruptedException e) {
shutdown("Waiting for a quorum of followers, only synced with sids: [ "
+ getSidSetString(newLeaderProposal.ackSet) + " ]");
HashSet<Long> followerSet = new HashSet<Long>();
for (LearnerHandler f : learners)
followerSet.add(f.getSid());
if (self.getQuorumVerifier().containsQuorum(followerSet)) {
LOG.warn("Enough followers present. "
+ "Perhaps the initTicks need to be increased.");
}
Thread.sleep(self.tickTime);
self.tick++;
return;
}
startZkServer();
...
if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
self.cnxnFactory.setZooKeeperServer(zk);
}

代码里面可以看到,调用ServerCnxnFactory.setZooKeeperServer的前提是zookeeper.leaderServes这个属性是设置为true的(默认为true)。这个属性的含义是,Leader节点是否接受客户端的请求。

总结

其实这些细节在实际维护中应用到的比较少,维护中会遇到的问题可能是“我的snap太多了,应该怎么清理”、“我想给某个包加个自定义的日志级别怎么办”、“我的Curator报这个错是什么意思”。虽然如此,我比较认可的观点仍然是:

维护开源产品不了解源码,或者没有找到看的有效入口,是很被动的,缺少定位解决问题的根本手段

有了源码定位以及相关工具的经验,遇到问题才不会轻易、不会轻易炸毛、不会轻易甩锅。当这些代码不再是坨翔而是My precious时,解决问题就变成一种愉悦的体验了。

参考