作者:京东物流 梁吉超
zookeeper是一个分布式服务框架,主要解决分布式应用中常见的多种数据问题,例如集群管理,状态同步等。为解决这些问题zookeeper需要Leader选举进行保障数据的强一致性机制和稳定性。本文通过集群的配置,对leader选举源进行解析,让读者们了解如何利用BIO通信机制,多线程多层队列实现高性能架构。
01Leader选举机制
Leader选举机制采用半数选举算法。
每一个zookeeper服务端称之为一个节点,每个节点都有投票权,把其选票投向每一个有选举权的节点,当其中一个节点选举出票数过半,这个节点就会成为Leader,其它节点成为Follower。
02Leader选举集群配置
- 重命名zoo_sample.cfg文件为zoo1.cfg ,zoo2.cfg,zoo3.cfg,zoo4.cfg
- 修改zoo.cfg文件,修改值如下:
【plain】
zoo1.cfg文件内容:
dataDir=/export/data/zookeeper-1
clientPort=2181
server.1=127.0.0.1:2001:3001
server.2=127.0.0.1:2002:3002:participant
server.3=127.0.0.1:2003:3003:participant
server.4=127.0.0.1:2004:3004:observer
zoo2.cfg文件内容:
dataDir=/export/data/zookeeper-2
clientPort=2182
server.1=127.0.0.1:2001:3001
server.2=127.0.0.1:2002:3002:participant
server.3=127.0.0.1:2003:3003:participant
server.4=127.0.0.1:2004:3004:observer
zoo3.cfg文件内容:
dataDir=/export/data/zookeeper-3
clientPort=2183
server.1=127.0.0.1:2001:3001
server.2=127.0.0.1:2002:3002:participant
server.3=127.0.0.1:2003:3003:participant
server.4=127.0.0.1:2004:3004:observer
zoo4.cfg文件内容:
dataDir=/export/data/zookeeper-4
clientPort=2184
server.1=127.0.0.1:2001:3001
server.2=127.0.0.1:2002:3002:participant
server.3=127.0.0.1:2003:3003:participant
server.4=127.0.0.1:2004:3004:observer
- server.第几号服务器(对应myid文件内容)=ip:数据同步端口:选举端口:选举标识
- participant默认参与选举标识,可不写. observer不参与选举
4.在/export/data/zookeeper-1,/export/data/zookeeper-2,/export/data/zookeeper-3,/export/data/zookeeper-4目录下创建myid文件,文件内容分别写1 ,2,3,4,用于标识sid(全称:Server ID)赋值。
- 启动三个zookeeper实例:
- bin/zkServer.sh start conf/zoo1.cfg
- bin/zkServer.sh start conf/zoo2.cfg
- bin/zkServer.sh start conf/zoo3.cfg
- 每启动一个实例,都会读取启动参数配置zoo.cfg文件,这样实例就可以知道其作为服务端身份信息sid以及集群中有多少个实例参与选举。
03Leader选举流程
图1 第一轮到第二轮投票流程
前提:
设定票据数据格式vote(sid,zxid,epoch)
- sid是Server ID每台服务的唯一标识,是myid文件内容;
- zxid是数据事务id号;
- epoch为选举周期,为方便理解下面讲解内容暂定为1初次选举,不写入下面内容里。
按照顺序启动sid=1,sid=2节点
第一轮投票:
- sid=1节点:初始选票为自己,将选票vote(1,0)发送给sid=2节点;
- sid=2节点:初始选票为自己,将选票vote(2,0)发送给sid=1节点;
- sid=1节点:收到sid=2节点选票vote(2,0)和当前自己的选票vote(1,0),首先比对zxid值,zxid越大代表数据最新,优先选择zxid最大的选票,如果zxid相同,选举最大sid。当前投票选举结果为vote(2,0),sid=1节点的选票变为vote(2,0);
- sid=2节点:收到sid=1节点选票vote(1,0)和当前自己的选票vote(2,0),参照上述选举方式,选举结果为vote(2,0),sid=2节点的选票不变;
- 第一轮投票选举结束。
第二轮投票:
- sid=1节点:当前自己的选票为vote(2,0),将选票vote(2,0)发送给sid=2节点;
- sid=2节点:当前自己的选票为vote(2,0),将选票vote(2,0)发送给sid=1节点;
- sid=1节点:收到sid=2节点选票vote(2,0)和自己的选票vote(2,0), 按照半数选举算法,总共3个节点参与选举,已有2个节点选举出相同选票,推举sid=2节点为Leader,自己角色变为Follower;
- sid=2节点:收到sid=1节点选票vote(2,0)和自己的选票vote(2,0),按照半数选举算法推举sid=2节点为Leader,自己角色变为Leader。
这时启动sid=3节点后,集群里已经选举出leader,sid=1和sid=2节点会将自己的leader选票发回给sid=3节点,通过半数选举结果还是sid=2节点为leader。
3.1 Leader选举采用多层队列架构
zookeeper选举底层主要分为选举应用层和消息传输队列层,第一层应用层队列统一接收和发送选票,而第二层传输层队列,是按照服务端sid分成了多个队列,是为了避免给每台服务端发送消息互相影响。比如对某台机器发送不成功不会影响正常服务端的发送。
图2 多层队列上下关系交互流程图
04解析代码入口类
通过查看zkServer.sh文件内容找到服务启动类:
org.apache.zookeeper.server.quorum.QuorumPeerMain
05选举流程代码解析
图3 选举代码实现流程图
- 加载配置文件QuorumPeerConfig.parse(path);
针对 Leader选举关键配置信息如下:
- 读取dataDir目录找到myid文件内容,设置当前应用sid标识,做为投票人身份信息。下面遇到myid变量为当前节点自己sid标识。
-
- 设置peerType当前应用是否参与选举
- new QuorumMaj()解析server.前缀加载集群成员信息,加载allMembers所有成员,votingMembers参与选举成员,observingMembers观察者成员,设置half值votingMembers.size()/2.
【Java】
public QuorumMaj(Properties props) throws ConfigException {
for (Entry
- QuorumPeerMain.runFromConfig(config) 启动服务;
- QuorumPeer.startLeaderElection() 开启选举服务;
- 设置当前选票new Vote(sid,zxid,epoch)
【plain】
synchronized public void startLeaderElection(){
try {
if (getPeerState() == ServerState.LOOKING) {
//首轮:当前节点默认投票对象为自己
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
}
} catch(IOException e) {
RuntimeException re = new RuntimeException(e.getMessage());
re.setStackTrace(e.getStackTrace());
throw re;
}
//........
}
- 创建选举管理类:QuorumCnxnManager;
- 初始化recvQueue
接收投票队列(第二层传输队列); - 初始化queueSendMap
按sid发送投票队列(第二层传输队列); - 初始化senderWorkerMap
发送投票工作线程容器,表示着与sid投票节点已连接; - 初始化选举监听线程类QuorumCnxnManager.Listener。
【Java】
//QuorumPeer.createCnxnManager()
public QuorumCnxManager(QuorumPeer self,
final long mySid,
Map view,
QuorumAuthServer authServer,
QuorumAuthLearner authLearner,
int socketTimeout,
boolean listenOnAllIPs,
int quorumCnxnThreadsSize,
boolean quorumSaslAuthEnabled) {
//接收投票队列(第二层传输队列)
this.recvQueue = new ArrayBlockingQueue(RECV_CAPACITY);
//按sid发送投票队列(第二层传输队列)
this.queueSendMap = new ConcurrentHashMap>();
//发送投票工作线程容器,表示着与sid投票节点已连接
this.senderWorkerMap = new ConcurrentHashMap();
this.lastMessageSent = new ConcurrentHashMap();
String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
if(cnxToValue != null){
this.cnxTO = Integer.parseInt(cnxToValue);
}
this.self = self;
this.mySid = mySid;
this.socketTimeout = socketTimeout;
this.view = view;
this.listenOnAllIPs = listenOnAllIPs;
initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize,
quorumSaslAuthEnabled);
// Starts listener thread that waits for connection requests
//创建选举监听线程 接收选举投票请求
listener = new Listener();
listener.setName("QuorumPeerListener");
}
//QuorumPeer.createElectionAlgorithm
protected Election createElectionAlgorithm(int electionAlgorithm){
Election le=null;
//TODO: use a factory rather than a switch
switch (electionAlgorithm) {
case 0:
le = new LeaderElection(this);
break;
case 1:
le = new AuthFastLeaderElection(this);
break;
case 2:
le = new AuthFastLeaderElection(this, true);
break;
case 3:
qcm = createCnxnManager();// new QuorumCnxManager(... new Listener())
QuorumCnxManager.Listener listener = qcm.listener;
if(listener != null){
listener.start();//启动选举监听线程
FastLeaderElection fle = new FastLeaderElection(this, qcm);
fle.start();
le = fle;
} else {
LOG.error("Null listener when initializing cnx manager");
}
break;
default:
assert false;
}
return le;}
- 开启选举监听线程QuorumCnxnManager.Listener;
- 创建ServerSockket等待大于自己sid节点连接,连接信息存储到senderWorkerMap
; - sid>self.sid才可以连接过来。
【Java】
//上面的listener.start()执行后,选择此方法
public void run() {
int numRetries = 0;
InetSocketAddress addr;
Socket client = null;
while((!shutdown) && (numRetries handleConnection(...)
private void handleConnection(Socket sock, DataInputStream din)
throws IOException {
//...省略
if (sid
senderWorkerMap.put(sid, sw);
queueSendMap.putIfAbsent(sid,
new ArrayBlockingQueue(SEND_CAPACITY));
sw.start();
rw.start();
}
}
- 创建FastLeaderElection快速选举服务;
- 初始选票发送队列sendqueue(第一层队列)
- 初始选票接收队列recvqueue(第一层队列)
- 创建线程WorkerSender
- 创建线程WorkerReceiver
【Java】
//FastLeaderElection.starter
private void starter(QuorumPeer self, QuorumCnxManager manager) {
this.self = self;
proposedLeader = -1;
proposedZxid = -1;
//发送队列sendqueue(第一层队列)
sendqueue = new LinkedBlockingQueue();
//接收队列recvqueue(第一层队列)
recvqueue = new LinkedBlockingQueue();
this.messenger = new Messenger(manager);
}
//new Messenger(manager)
Messenger(QuorumCnxManager manager) {
//创建线程WorkerSender
this.ws = new WorkerSender(manager);
this.wsThread = new Thread(this.ws,
"WorkerSender[myid=" + self.getId() + "]");
this.wsThread.setDaemon(true);
//创建线程WorkerReceiver
this.wr = new WorkerReceiver(manager);
this.wrThread = new Thread(this.wr,
"WorkerReceiver[myid=" + self.getId() + "]");
this.wrThread.setDaemon(true);
}
- 开启WorkerSender和WorkerReceiver线程。
WorkerSender线程自旋获取sendqueue第一层队列元素
- sendqueue队列元素内容为相关选票信息详见ToSend类;
- 首先判断选票sid是否和自己sid值相同,相等直接放入到recvQueue队列中;
- 不相同将sendqueue队列元素转储到queueSendMap
第二层传输队列中。
【Java】//FastLeaderElection.Messenger.WorkerSenderclass WorkerSender extends ZooKeeperThread{
//...
public void run() {
while (!stop) {
try {
ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
if(m == null) continue;
//将投票信息发送出去
process(m);
} catch (InterruptedException e) {
break;
}
}
LOG.info("WorkerSender is down");
}
}
//QuorumCnxManager#toSend
public void toSend(Long sid, ByteBuffer b) {
/*
* If sending message to myself, then simply enqueue it (loopback).
*/
if (this.mySid == sid) {
b.position(0);
addToRecvQueue(new Message(b.duplicate(), sid));
/*
* Otherwise send to the corresponding thread to send.
*/
} else {
/*
* Start a new connection if doesn't have one already.
*/
ArrayBlockingQueue bq = new ArrayBlockingQueue(
SEND_CAPACITY);
ArrayBlockingQueue oldq = queueSendMap.putIfAbsent(sid, bq);
//转储到queueSendMap第二层传输队列中
if (oldq != null) {
addToSendQueue(oldq, b);
} else {
addToSendQueue(bq, b);
}
connectOne(sid);
}
}
WorkerReceiver线程自旋获取recvQueue第二层传输队列元素转存到recvqueue第一层队列中。
【Java】
//WorkerReceiver
public void run() {
Message response;
while (!stop) {
// Sleeps on receive
try {
//自旋获取recvQueue第二层传输队列元素
response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
if(response == null) continue;
// The current protocol and two previous generations all send at least 28 bytes
if (response.buffer.capacity()
06选举核心逻辑
- 启动线程QuorumPeer
开始Leader选举投票makeLEStrategy().lookForLeader();
sendNotifications()向其它节点发送选票信息,选票信息存储到sendqueue队列中。sendqueue队列由WorkerSender线程处理。
【plain】
//QuorunPeer.run
//...
try {
reconfigFlagClear();
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();
}
//makeLEStrategy().lookForLeader() 发送投票
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
}
//...
//FastLeaderElection.lookLeader
public Vote lookForLeader() throws InterruptedException {
//...
//向其他应用发送投票
sendNotifications();
//...
}
private void sendNotifications() {
//获取应用节点
for (long sid : self.getCurrentAndNextConfigVoters()) {
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(ToSend.mType.notification,
proposedLeader,
proposedZxid,
logicalclock.get(),
QuorumPeer.ServerState.LOOKING,
sid,
proposedEpoch, qv.toString().getBytes());
if(LOG.isDebugEnabled()){
LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" +
Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get()) +
" (n.round), " + sid + " (recipient), " + self.getId() +
" (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
}
//储存投票信息
sendqueue.offer(notmsg);
}
}
class WorkerSender extends ZooKeeperThread {
//...
public void run() {
while (!stop) {
try {
//提取已储存的投票信息
ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
if(m == null) continue;
process(m);
} catch (InterruptedException e) {
break;
}
}
LOG.info("WorkerSender is down");
}
//...
}
自旋recvqueue队列元素获取投票过来的选票信息:
【Java】
public Vote lookForLeader() throws InterruptedException {
//...
/*
* Loop in which we exchange notifications until we find a leader
*/
while ((self.getPeerState() == ServerState.LOOKING) &&
(!stop)){
/*
* Remove next notification from queue, times out after 2 times
* the termination time
*/
//提取投递过来的选票信息
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
/*
* Sends more notifications if haven't received enough.
* Otherwise processes new notification.
*/
if(n == null){
if(manager.haveDelivered()){
//已全部连接成功,并且前一轮投票都完成,需要再次发起投票
sendNotifications();
} else {
//如果未收到选票信息,manager.contentAll()自动连接其它socket节点
manager.connectAll();
}
/*
* Exponential backoff
*/
int tmpTimeOut = notTimeout*2;
notTimeout = (tmpTimeOut
【Java】
//manager.connectAll()->connectOne(sid)->initiateConnection(...)->startConnection(...)
private boolean startConnection(Socket sock, Long sid)
throws IOException {
DataOutputStream dout = null;
DataInputStream din = null;
try {
// Use BufferedOutputStream to reduce the number of IP packets. This is
// important for x-DC scenarios.
BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
dout = new DataOutputStream(buf);
// Sending id and challenge
// represents protocol version (in other words - message type)
dout.writeLong(PROTOCOL_VERSION);
dout.writeLong(self.getId());
String addr = self.getElectionAddress().getHostString() + ":" + self.getElectionAddress().getPort();
byte[] addr_bytes = addr.getBytes();
dout.writeInt(addr_bytes.length);
dout.write(addr_bytes);
dout.flush();
din = new DataInputStream(
new BufferedInputStream(sock.getInputStream()));
} catch (IOException e) {
LOG.warn("Ignoring exception reading or writing challenge: ", e);
closeSocket(sock);
return false;
}
// authenticate learner
QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
if (qps != null) {
// TODO - investigate why reconfig makes qps null.
authLearner.authenticate(sock, qps.hostname);
}
// If lost the challenge, then drop the new connection
//保证集群中所有节点之间只有一个通道连接
if (sid > self.getId()) {
LOG.info("Have smaller server identifier, so dropping the " +
"connection: (" + sid + ", " + self.getId() + ")");
closeSocket(sock);
// Otherwise proceed with the connection
} else {
SendWorker sw = new SendWorker(sock, sid);
RecvWorker rw = new RecvWorker(sock, din, sid, sw);
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
if(vsw != null)
vsw.finish();
senderWorkerMap.put(sid, sw);
queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue(
SEND_CAPACITY));
sw.start();
rw.start();
return true;
}
return false;
}
如上述代码中所示,sid>self.sid才可以创建连接Socket和SendWorker,RecvWorker线程,存储到senderWorkerMap
图4 节点之间连接方式
【Java】
public Vote lookForLeader() throws InterruptedException {
//...
if (n.electionEpoch > logicalclock.get()) {
//当前选举周期小于选票周期,重置recvset选票池
//大于当前周期更新当前选票信息,再次发送投票
logicalclock.set(n.electionEpoch);
recvset.clear();
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
sendNotifications();
} else if (n.electionEpoch
在上代码中,自旋从recvqueue队列中获取到选票信息。开始进行选举:
- 判断当前选票和接收过来的选票周期是否一致
- 大于当前周期更新当前选票信息,再次发送投票
- 周期相等:当前选票信息和接收的选票信息进行PK
【Java】
//接收的选票与当前选票PK
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
if(self.getQuorumVerifier().getWeight(newId) == 0){
return false;
}
/*
* We return true if one of the following three cases hold:
* 1- New epoch is higher
* 2- New epoch is the same as current epoch, but new zxid is higher
* 3- New epoch is the same as current epoch, new zxid is the same
* as current zxid, but server id is higher.
*/
return ((newEpoch > curEpoch) ||
((newEpoch == curEpoch) &&
((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));wId > curId)))));
}
在上述代码中的totalOrderPredicate方法逻辑如下:
- 竞选周期大于当前周期为true
- 竞选周期相等,竞选zxid大于当前zxid为true
- 竞选周期相等,竞选zxid等于当前zxid,竞选sid大于当前sid为true
- 经过上述条件判断为true将当前选票信息替换为竞选成功的选票,同时再次将新的选票投出去。
【Java】
public Vote lookForLeader() throws InterruptedException {
//...
//存储节点对应的选票信息
// key:选票来源sid value:选票推举的Leader sid
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
//半数选举开始
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) {
// Verify if there is any change in the proposed leader
while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){
recvqueue.put(n);
break;
}
}
/*WorkerSender
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
if (n == null) {
//已选举出leader 更新当前节点是否为leader
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(proposedLeader,
proposedZxid, proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
//...
}
/**
* Termination predicate. Given a set of votes, determines if have
* sufficient to declare the end of the election round.
*
* @param votes
* Set of votes
* @param vote
* Identifier of the vote received last PK后的选票
*/
private boolean termPredicate(HashMap votes, Vote vote) {
SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
voteSet.addQuorumVerifier(self.getQuorumVerifier());
if (self.getLastSeenQuorumVerifier() != null
&& self.getLastSeenQuorumVerifier().getVersion() > self
.getQuorumVerifier().getVersion()) {
voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}
/*
* First make the views consistent. Sometimes peers will have different
* zxids for a server depending on timing.
*/
//votes 来源于recvset 存储各个节点推举出来的选票信息
for (Map.Entry entry : votes.entrySet()) {
//选举出的sid和其它节点选择的sid相同存储到voteSet变量中。
if (vote.equals(entry.getValue())) {
//保存推举出来的sid
voteSet.addAck(entry.getKey());
}
}
//判断选举出来的选票数量是否过半
return voteSet.hasAllQuorums();
}
//QuorumMaj#containsQuorum
public boolean containsQuorum(Set ackSet) {
return (ackSet.size() > half);
}
在上述代码中:recvset是存储每个sid推举的选票信息。
第一轮 sid1:vote(1,0,1) ,sid2:vote(2,0,1);
第二轮 sid1:vote(2,0,1) ,sid2:vote(2,0,1)。
最终经过选举信息vote(2,0,1)为推荐leader,并用推荐leader在recvset选票池里比对持相同票数量为2个。因为总共有3个节点参与选举,sid1和sid2都选举sid2为leader,满足票数过半要求,故确认sid2为leader。
- setPeerState更新当前节点角色;
- proposedLeader选举出来的sid和自己sid相等,设置为Leader;
- 上述条件不相等,设置为Follower或Observing;
- 更新currentVote当前选票为Leader的选票vote(2,0,1)。
07总结
通过对Leader选举源码的解析,可以了解到:
- 多个应用节点之间网络通信采用BIO方式进行相互投票,同时保证每个节点之间只使用一个通道,减少网络资源的消耗,足以见得在BIO分布式中间件开发中的技术重要性。
- 基于BIO的基础上,灵活运用多线程和内存消息队列完好实现多层队列架构,每层队列由不同的线程分工协作,提高快速选举性能目的。
- 为BIO在多线程技术上的实践带来了宝贵的经验。
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net