publicFlexibleRaftServer(final String dataPath, final String groupId, final PeerId serverId, final NodeOptions nodeOptions)throws IOException { // init raft data path, it contains log,meta,snapshot FileUtils.forceMkdir(newFile(dataPath));
// here use same RPC server for raft and business. It also can be seperated generally finalRpcServerrpcServer= RaftRpcServerFactory.createRaftRpcServer(serverId.getEndpoint()); // GrpcServer need init marshaller FlexibleGrpcHelper.initGRpc(); FlexibleGrpcHelper.setRpcServer(rpcServer);
// register business processor FlexibleRaftServiceflexibleRaftService=newFlexibleRaftServiceImpl(this); rpcServer.registerProcessor(newFlexibleGetValueRequestProcessor(flexibleRaftService)); rpcServer.registerProcessor(newFlexibleIncrementAndGetRequestProcessor(flexibleRaftService)); // init state machine this.fsm = newFlexibleStateMachine(); // set fsm to nodeOptions nodeOptions.setFsm(this.fsm); // set storage path (log,meta,snapshot) // log, must nodeOptions.setLogUri(dataPath + File.separator + "log"); // meta, must nodeOptions.setRaftMetaUri(dataPath + File.separator + "raft_meta"); // snapshot, optional, generally recommended nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot"); // enable flexible raft nodeOptions.enableFlexibleRaft(true); // set flexible raft factor nodeOptions.setFactor(6, 4); // init raft group service framework this.raftGroupService = newRaftGroupService(groupId, serverId, nodeOptions, rpcServer); // start raft node this.node = this.raftGroupService.start(); }
我们知道,NWR 是一种在分布式存储系统中用于控制一致性级别的策略。N 在分布式存储系统中,代表有多少份备份数据 。W 代表一次成功的更新操作要求至少有 W 份数据写入成功。R 代表一次成功的读数据操作要求至少有 R 份数据成功读取。 满足 W + R > N 的情况下,读 Quorum 和写 Quorum 一定存在交集,这个相交的成员一定拥有最新的数据,那么这个分布式系统一定是满足强一致性的。 例如,在一个 N=3 的 Raft 集群中,W 是 2、R 是 2 的时候,W+R>N,这种情况对于集群而言就是强一致性的。
所以基于以上思路,首先我们要对 Quorum 这一核心类进行设计。Quorum 类结构很简单,只需要{int w,int r} 来表示读写节点数量即可。
/**
 * Quorum sizes for an NWR raft group: {@code w} peers must acknowledge a
 * write and {@code r} peers must answer a read. Strong consistency holds
 * when w + r > n for a group of n peers.
 */
public class Quorum {

    /** Minimum number of peers that must acknowledge a write. */
    private int w;

    /** Minimum number of peers that must answer a read. */
    private int r;

    public Quorum(int w, int r) {
        this.w = w;
        this.r = r;
    }

    /**
     * Returns the write quorum size. Added because callers such as
     * checkDeadNodes0 invoke getW()/getR(), which the original class lacked.
     */
    public int getW() {
        return w;
    }

    /** Returns the read quorum size. */
    public int getR() {
        return r;
    }
}
// use LinkedHashSet to keep insertion order. private LinkedHashSet<PeerId> learners = newLinkedHashSet<>(); }
Quorum 计算规则
我们根据使用规则可以知道,用户端要使用 Flexible Raft 功能需要打开 NodeOptions 中 Flexible Raft 的开关,并且设置好 factor 因子的大小。那么我们又是如何根据 factor 因子计算出一个 raft 集群的 w 和 r 的呢? 为了实现 w 和 r 的计算逻辑,我们提供了一个计算工厂 BallotFactory 类,专门处理 NWR 的计算规则。
checkValid:校验读写 factor 是否都不为空,并且其和为 10 calculateWriteQuorum:根据 size 与 writeFactor 计算 w 的大小 calculateReadQuorum:根据公式 r = n - w + 1,由计算出的 w 求得 r calculateMajorityQuorum:计算 Majority 模式下的 w 和 r buildFlexibleQuorum:如果打开 Flexible 模式,用于计算该模式下的 Quorum buildMajorityQuorum:如果打开 Majority 模式,用于计算该模式下的 Quorum convertConfigToLogEntry:将 configuration 转换为 LogEntry convertOldConfigToLogOuterEntry:将 oldConfiguration 转换为 LogEntry
publicstatic Quorum buildFlexibleQuorum(Integer readFactor, Integer writeFactor, int size) { // if size equals 0,config must be empty,so we just return null if (size == 0) { returnnull; } // Check if factors are valid if (!checkValid(readFactor, writeFactor)) { LOG.error("Invalid factor, factor's range must be (0,10) and the sum of factor should be 10"); returnnull; } // Partial factor is empty if (Objects.isNull(writeFactor)) { writeFactor = 10 - readFactor; } if (Objects.isNull(readFactor)) { readFactor = 10 - writeFactor; } // Calculate quorum intw= calculateWriteQuorum(writeFactor, size); intr= calculateReadQuorum(readFactor, size); returnnewQuorum(w, r); }
publicstatic Quorum buildMajorityQuorum(int size) { // if size equals 0,config must be empty,so we just return null if (size == 0) { returnnull; } intmajorityQuorum= calculateMajorityQuorum(size); returnnewQuorum(majorityQuorum, majorityQuorum); }
privatestaticintcalculateReadQuorum(int readFactor, int n) { intwriteQuorum= calculateWriteQuorum(10 - readFactor, n); return n - writeQuorum + 1; }
privatestaticintcalculateMajorityQuorum(int n) { return n / 2 + 1; }
publicstaticbooleancheckValid(Integer readFactor, Integer writeFactor) { if (Objects.isNull(readFactor) || Objects.isNull(writeFactor)) { LOG.error("When turning on flexible mode, Both of readFactor and writeFactor should not be null."); returnfalse; } if (readFactor + writeFactor == 10 && readFactor > 0 && readFactor < 10 && writeFactor > 0 && writeFactor < 10) { returntrue; } LOG.error("Fail to set quorum_nwr because the sum of read_factor and write_factor is {} , not 10", readFactor + writeFactor); returnfalse; }
// 省略非 Flexible 模式下的相关代码 ConfigurationinitialConf= options.getInitialConf(); // 判断开启 Flexible 模式后,校验设置的 factor 是否合规 if (initialConf.isEnableFlexible() && !checkFactor(initialConf.getWriteFactor(), initialConf.getReadFactor())) { returnfalse; } this.conf = newConfigurationEntry(); this.conf.setId(newLogId()); // if have log using conf in log, else using conf in options if (this.logManager.getLastLogIndex() > 0) { checkAndSetConfiguration(false); } else { this.conf.setConf(this.options.getInitialConf()); // initially set to max(priority of all nodes) this.targetPriority = getMaxPriorityOfNodes(this.conf.getConf().getPeers()); }
if (!this.conf.isEmpty()) { Requires.requireTrue(this.conf.isValid(), "Invalid conf: %s", this.conf); } else { LOG.info("Init node {} with empty conf.", this.serverId); } // 判断开启 Majority Mode 后,设置 Majority Quorum if (Objects.isNull(conf.getConf().getQuorum()) && !conf.getConf().isEnableFlexible()) { Quorumquorum= BallotFactory.buildMajorityQuorum(conf.getConf().size()); conf.getConf().setQuorum(quorum); }
// 初始化预投票选票Ctx:prevVoteCtx if (!prevVoteCtx.init(conf.getConf(), conf.getOldConf())) { LOG.error("Fail to init prevVoteCtx."); returnfalse; } // 初始化投票选票Ctx:voteCtx if (!voteCtx.init(conf.getConf(), conf.getOldConf())) { LOG.error("Fail to init voteCtx."); returnfalse; }
privatebooleancheckDeadNodes0(final List<PeerId> peers, finallong monotonicNowMs, finalboolean checkReplicator, final Configuration deadNodes) { finalintleaderLeaseTimeoutMs=this.options.getLeaderLeaseTimeoutMs(); intaliveCount=0; longstartLease= Long.MAX_VALUE; for (final PeerId peer : peers) { if (peer.equals(this.serverId)) { aliveCount++; continue; } if (checkReplicator) { checkReplicator(peer); } finallonglastRpcSendTimestamp=this.replicatorGroup.getLastRpcSendTimestamp(peer); if (monotonicNowMs - lastRpcSendTimestamp <= leaderLeaseTimeoutMs) { aliveCount++; if (startLease > lastRpcSendTimestamp) { startLease = lastRpcSendTimestamp; } continue; } if (deadNodes != null) { deadNodes.addPeer(peer); } }
// If the writeFactor in a cluster is less than readFactor and the number of nodes // is less than r and greater than or equal to w, we hope to still be in a writable state. // Therefore, read requests may fail at this time, but the cluster is still available Quorumquorum=this.conf.getConf().getQuorum(); inttargetCount=this.conf.getConf().isEnableFlexible() && quorum.getW() < quorum.getR() ? quorum.getW() : quorum.getR(); if (aliveCount >= targetCount) { updateLastLeaderTimestamp(startLease); returntrue; } returnfalse; }