Date: 2026-01-31 23:40
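Author: admin
This document describes how Raft leader election is implemented in the LingRaft-Lite module, covering the classes involved, the implementation details, and the test procedure, so that developers can understand and reproduce it.
The implementation follows the leader election part of the Raft paper (Diego Ongaro and John Ousterhout, 2014).
A Raft node is always in one of three states:
| State | Description | Responsibility |
|---|---|---|
| FOLLOWER | Follower node | Responds to RPC requests from the Leader (AppendEntries) and from Candidates (RequestVote) |
| CANDIDATE | Candidate node | Starts an election and requests votes from the other nodes |
| LEADER | Leader node | Handles client requests, replicates the log to Followers, sends heartbeats |
Definition:
Purpose:
Implementation:
private volatile long currentTerm = 0; // current term number
Definition:
Randomization:
Implementation:
// Configure the random range
config.setElectionTimeoutRandomRange(Range.of(150, 300));
// Compute the randomized timeout
int randomTimeout = raftConfig.getElectionTimeoutMs();
Definition:
N/2 + 1
Importance:
Implementation: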
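public VoteCounter(long term, int totalNodes) {
    this.term = term;
    this.votesReceived = ConcurrentHashMap.newKeySet();
    this.majorityCount = totalNodes / 2 + 1; // strict majority: more than half of the nodes
}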
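To make the N/2 + 1 rule concrete, here is a small standalone sketch. It is not part of LingRaft-Lite; the class name `QuorumSketch` and its methods are hypothetical and exist only for illustration.

```java
final class QuorumSketch {
    // Smallest vote count that is strictly more than half of totalNodes.
    static int majority(int totalNodes) {
        return totalNodes / 2 + 1;
    }

    // Number of nodes that can fail while a majority is still reachable.
    static int tolerableFailures(int totalNodes) {
        return totalNodes - majority(totalNodes);
    }

    public static void main(String[] args) {
        for (int n : new int[] {1, 3, 4, 5}) {
            System.out.printf("nodes=%d majority=%d tolerableFailures=%d%n",
                    n, majority(n), tolerableFailures(n));
        }
    }
}
```

Because of integer division, a 4-node cluster needs 3 votes and tolerates only one failure, the same as a 3-node cluster, which is why odd cluster sizes are the usual choice.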
Conditions under which a node grants its vote to a candidate:
Log comparison rules:
- candidateLastLogTerm > lastLogTerm: grant the vote
- candidateLastLogTerm == lastLogTerm and candidateLastLogIndex >= lastLogIndex: grant the vote

| Class | Path | Responsibility |
|---|---|---|
| RaftNodeImpl | com.ling.raft.core.RaftNodeImpl | Node state management, starting elections, handling votes, sending heartbeats |
| ConsensusModuleImpl | com.ling.raft.core.ConsensusModuleImpl | Concrete logic for vote requests and responses |
| VoteCounter | com.ling.raft.core.VoteCounter | Vote counter that tallies votes and detects a majority |
| ElectionTask | com.ling.raft.core.task.ElectionTask | Election timeout detection task |
| HeartbeatTask | com.ling.raft.core.task.HeartbeatTask | Leader heartbeat task |
| ServerStatusEnum | com.ling.raft.enums.ServerStatusEnum | Node state enum |
| VoteRequest | com.ling.raft.model.dto.VoteRequest | Vote request RPC |
| VoteResponse | com.ling.raft.model.dto.VoteResponse | Vote response RPC |
| ThreeNodeElectionTest | com.ling.raft.example.leader.ThreeNodeElectionTest | Complete test program |
          ┌─────────────────────┐
          │    RaftNodeImpl     │
          │  (main node class)  │
          └──────────┬──────────┘
                     │ holds references
          ┌──────────┴──────────┐
          ▼                     ▼
┌─────────────────────┐ ┌─────────────────────┐
│ ConsensusModuleImpl │ │     VoteCounter     │
│    (vote logic)     │ │   (vote counting)   │
└─────────────────────┘ └─────────────────────┘
          │                     │
          ├─────────────────────┤
          ▼                     ▼
┌─────────────────────┐ ┌─────────────────────┐
│    ElectionTask     │ │    HeartbeatTask    │
│ (election timeout)  │ │  (heartbeat task)   │
└─────────────────────┘ └─────────────────────┘
// RaftNodeImpl fields
// Node state
private volatile ServerStatusEnum nodeStatus = ServerStatusEnum.FOLLOWER;
// Persistent state
private volatile long currentTerm = 0;   // current term
private volatile String votedFor = null; // candidate this node voted for in the current term
// Election
private ScheduledExecutorService electionExecutor;
private ScheduledFuture<?> electionFuture;
private VoteCounter currentVoteCounter;
private final Random random = new Random();
// Heartbeat
private ScheduledExecutorService heartbeatExecutor;
private ScheduledFuture<?> heartbeatFuture;
// Timestamps
private volatile long prevElectionTime = 0; // time of the last election (updated when the election timer is reset)
private volatile long preHeartBeatTime = 0; // time the last heartbeat was received

// ConsensusModuleImpl fields
public final RaftNodeImpl node; // reference to the owning RaftNodeImpl
public final ReentrantLock voteLock = new ReentrantLock(); // vote lock
public final ReentrantLock appendEntriesLock = new ReentrantLock(); // AppendEntries lock

// VoteCounter fields
private final long term;                 // term of the current election
private final Set<String> votesReceived; // IDs of the nodes that have voted
private final int majorityCount;         // number of votes needed for a majority
private volatile boolean votedForSelf;   // whether the node has voted for itself
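Class: ServerStatusEnum
Definition:
public enum ServerStatusEnum {
    LEADER("LEADER", "主节点"),       // leader node
    CANDIDATE("CANDIDATE", "候选节点"), // candidate node
    FOLLOWER("FOLLOWER", "从节点");   // follower node

    // The excerpt omits the fields and constructor; these names are assumed.
    private final String code;
    private final String desc;
    ServerStatusEnum(String code, String desc) { this.code = code; this.desc = desc; }
}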
+-------------------------+
|       Initialize        |
+-------------------------+
            |
            ▼
+-------------------------+
|        FOLLOWER         | <------------+
| (awaits heartbeat/vote) |              |
+-------------------------+              |
            |                            |
            | election timeout           | receives AppendEntries or
            |                            | RequestVote with a higher term
            ▼                            |
+-------------------------+              |
|        CANDIDATE        | -------------+
|    (starts election)    |              |
+-------------------------+              |
            |                            |
            | wins a majority            |
            |                            |
            ▼                            |
+-------------------------+              |
|         LEADER          | -------------+
| (serves client requests)|  discovers a higher term
+-------------------------+
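The transitions in the diagram can also be written down as a lookup table. The following is a minimal sketch, not code from LingRaft-Lite: `StateTransitionSketch`, its nested `State` enum, and `canTransition` are hypothetical names. It assumes, as in the Raft paper, that a Candidate whose election times out simply starts another election.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

final class StateTransitionSketch {
    enum State { FOLLOWER, CANDIDATE, LEADER }

    // Allowed target states for each source state, following the diagram above.
    private static final Map<State, Set<State>> ALLOWED = new EnumMap<>(State.class);

    static {
        // Election timeout: a Follower may only become a Candidate.
        ALLOWED.put(State.FOLLOWER, EnumSet.of(State.CANDIDATE));
        // A Candidate may win (LEADER), step down on a higher term (FOLLOWER),
        // or time out and start another election (CANDIDATE again).
        ALLOWED.put(State.CANDIDATE,
                EnumSet.of(State.LEADER, State.FOLLOWER, State.CANDIDATE));
        // A Leader only ever steps down to Follower.
        ALLOWED.put(State.LEADER, EnumSet.of(State.FOLLOWER));
    }

    static boolean canTransition(State from, State to) {
        return ALLOWED.get(from).contains(to);
    }

    public static void main(String[] args) {
        System.out.println(canTransition(State.FOLLOWER, State.LEADER));  // false
        System.out.println(canTransition(State.CANDIDATE, State.LEADER)); // true
    }
}
```

The missing FOLLOWER to LEADER edge matches the guard in `becomeLeader()`, which refuses to promote a node that is not currently a Candidate.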
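Method: becomeFollower(newTerm)
Location: RaftNodeImpl.java:175-196
public void becomeFollower(long newTerm) {
    // Refuse to move back to an older term
    if (newTerm < currentTerm) {
        log.warn("Cannot become Follower with smaller term: {} < {}",
                newTerm, currentTerm);
        return;
    }
    ServerStatusEnum oldStatus = nodeStatus;
    // Update the state
    nodeStatus = ServerStatusEnum.FOLLOWER;
    // Clear the vote only when the term actually advances; clearing it within
    // the same term would allow this node to vote twice in that term
    if (newTerm > currentTerm) {
        currentTerm = newTerm;
        votedFor = null;
    }
    currentVoteCounter = null; // discard the vote counter
    // Stop heartbeats (in case this node was the Leader)
    cancelHeartbeatTimer();
    // Restart the election timer
    resetElectionTimer();
    log.info("State changed: {} -> FOLLOWER, term: {}", oldStatus, currentTerm);
}
Called when: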
Method: becomeCandidate()
Location: RaftNodeImpl.java:201-216
public void becomeCandidate() {
    ServerStatusEnum oldStatus = nodeStatus;
    // Increment the term (important!)
    currentTerm++;
    nodeStatus = ServerStatusEnum.CANDIDATE;
    votedFor = currentNodeConfig.getServerId(); // vote for itself
    log.info("State changed: {} -> CANDIDATE, new term: {}", oldStatus, currentTerm);
    // Restart the election timer
    resetElectionTimer();
    // Send out vote requests
    startElection();
}
Called when:
Method: becomeLeader()
Location: RaftNodeImpl.java:221-243
public void becomeLeader() {
    // Only a Candidate can become Leader
    if (nodeStatus != ServerStatusEnum.CANDIDATE) {
        log.warn("Only CANDIDATE can become LEADER, current: {}", nodeStatus);
        return;
    }
    ServerStatusEnum oldStatus = nodeStatus;
    nodeStatus = ServerStatusEnum.LEADER;
    // Initialize Leader state (nextIndex, matchIndex)
    initializeLeaderState();
    // Cancel the election timer (a Leader does not run elections)
    cancelElectionTimer();
    log.info("========================================");
    log.info("State changed: {} -> LEADER, term: {}", oldStatus, currentTerm);
    log.info("========================================");
    // Send heartbeats immediately and start the heartbeat timer
    sendHeartbeats();
    startHeartbeatTimer();
}
Called when:
Class: ElectionTask
Location: com.ling.raft.core.task.ElectionTask.java
Core logic:
@Override
public void run() {
    try {
        // A Leader does not run elections
        if (node.getNodeStatus() == ServerStatusEnum.LEADER) {
            log.debug("Current node is LEADER, skip election");
            return;
        }
        // Check whether the timeout has elapsed.
        // Note: getElectionTimeoutMs() draws a fresh random value on every call,
        // so this is not necessarily the delay the task was scheduled with.
        long currentTime = System.currentTimeMillis();
        int electionTimeoutMs = node.getRaftConfig().getElectionTimeoutMs();
        long timeElapsed = currentTime - node.getPrevElectionTime();
        if (timeElapsed < electionTimeoutMs) {
            // Not timed out yet: re-arm the timer
            node.resetElectionTimer();
            return;
        }
        // Election timeout: start a new election round
        log.info("========================================");
        log.info("ELECTION TIMEOUT DETECTED!");
        log.info("Time elapsed: {}ms, Timeout: {}ms", timeElapsed, electionTimeoutMs);
        log.info("Current term: {}, Status: {}", node.getCurrentTerm(), node.getNodeStatus());
        log.info("Converting to CANDIDATE and starting new election...");
        log.info("========================================");
        node.becomeCandidate();
    } catch (Exception e) {
        log.error("Error in election task", e);
        if (node.getIsRunning().get()) {
            node.resetElectionTimer();
        }
    }
}
Characteristics:
Method: resetElectionTimer()
Location: RaftNodeImpl.java:455-475
public void resetElectionTimer() {
    if (!isRunning.get()) {
        return;
    }
    // Cancel the previously scheduled task
    cancelElectionTimer();
    // Compute the randomized timeout
    int randomTimeout = raftConfig.getElectionTimeoutMs();
    // Record when the timer was reset
    prevElectionTime = System.currentTimeMillis();
    // Schedule the new task
    electionFuture = electionExecutor.schedule(
            new ElectionTask(this),
            randomTimeout,
            TimeUnit.MILLISECONDS
    );
    log.debug("Election timer reset, timeout: {}ms", randomTimeout);
}
When it is called:
Configuration:
RaftConfig config = new RaftConfig(currentNode, allNodes);
config.setElectionTimeout(2); // base timeout in seconds; used only when no random range is set
config.setElectionTimeoutRandomRange(Range.of(150, 300)); // random range in milliseconds
How it works:
// Inside RaftConfig
public int getElectionTimeoutMs() {
    if (electionTimeoutRandomRange == null) {
        return electionTimeout * 1000;
    }
    // Pick a value uniformly from [min, max]
    int min = electionTimeoutRandomRange.getMin();
    int max = electionTimeoutRandomRange.getMax();
    Random random = new Random();
    return min + random.nextInt(max - min + 1);
}
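`getElectionTimeoutMs()` allocates a new `Random` on every call. One possible alternative, sketched here under the assumption that a per-thread generator is acceptable, uses `ThreadLocalRandom` from the JDK. `ElectionTimeoutSketch` and `randomTimeoutMs` are illustrative names and are not part of RaftConfig.

```java
import java.util.concurrent.ThreadLocalRandom;

final class ElectionTimeoutSketch {
    // Returns a timeout drawn uniformly from [minMs, maxMs], both ends inclusive.
    static int randomTimeoutMs(int minMs, int maxMs) {
        if (minMs > maxMs) {
            throw new IllegalArgumentException("minMs must be <= maxMs");
        }
        // nextInt(origin, bound) excludes bound, so add 1 to include maxMs.
        return ThreadLocalRandom.current().nextInt(minMs, maxMs + 1);
    }

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            System.out.println("timeout = " + randomTimeoutMs(150, 300) + "ms");
        }
    }
}
```

Each node draws its own value, so with a 150-300ms range it is unlikely that two nodes time out within the same millisecond.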
How this avoids split votes:
Request format: VoteRequest
Fields:
public class VoteRequest {
    private long term;          // candidate's term
    private String candidateId; // candidate's node ID
    private long lastLogIndex;  // index of the candidate's last log entry
    private long lastLogTerm;   // term of the candidate's last log entry
}
Response format: VoteResponse
Fields:
public class VoteResponse {
    private long term;           // voter's current term (lets the candidate update its own)
    private boolean voteGranted; // whether the vote was granted
}
Method: requestVote(VoteRequest voteRequest)
Location: ConsensusModuleImpl.java:45-90
@Override
public VoteResponse requestVote(VoteRequest voteRequest) {
    voteLock.lock();
    try {
        long currentTerm = node.getCurrentTerm();
        String votedFor = node.getVotedFor();
        String candidateId = voteRequest.getCandidateId();
        log.info("Received vote request from candidate: {}, Term: {}, CurrentTerm: {}, VotedFor: {}",
                candidateId, voteRequest.getTerm(), currentTerm, votedFor);
        // 1. Term check: reject requests from an older term
        if (voteRequest.getTerm() < currentTerm) {
            log.info("Rejected: candidate term {} < current term {}",
                    voteRequest.getTerm(), currentTerm);
            return new VoteResponse(currentTerm, false);
        }
        // 2. Higher term: adopt it and become a Follower
        if (voteRequest.getTerm() > currentTerm) {
            log.info("Higher term received: {} -> {}, becoming FOLLOWER",
                    currentTerm, voteRequest.getTerm());
            node.becomeFollower(voteRequest.getTerm());
            currentTerm = node.getCurrentTerm();
            votedFor = node.getVotedFor();
        }
        // 3. Reject if this node already voted for someone else in this term
        if (votedFor != null && !votedFor.equals(candidateId)) {
            log.info("Already voted for {}, rejecting {}", votedFor, candidateId);
            return new VoteResponse(currentTerm, false);
        }
        // 4. Grant only if the candidate's log is at least as up to date as ours
        if (isLogUpToDate(voteRequest.getLastLogIndex(), voteRequest.getLastLogTerm())) {
            log.info("Voting for candidate: {}", candidateId);
            node.setVotedFor(candidateId);
            node.setPrevElectionTime(System.currentTimeMillis()); // reset the timeout
            return new VoteResponse(currentTerm, true);
        } else {
            log.info("Candidate log not up to date");
            return new VoteResponse(currentTerm, false);
        }
    } finally {
        voteLock.unlock();
    }
}
Voting rules in detail:
Term check
Term update
Single vote per term
Log completeness
Method: isLogUpToDate(candidateLastLogIndex, candidateLastLogTerm)
Location: ConsensusModuleImpl.java:337-350
private boolean isLogUpToDate(long candidateLastLogIndex, long candidateLastLogTerm) {
    long lastLogTerm = getLastLogTerm();
    long lastLogIndex = getLastLogIndex();
    // Compare last-entry terms first: a higher term means the candidate's log is newer
    if (candidateLastLogTerm > lastLogTerm) {
        return true;
    }
    // Same term: the candidate's log is at least as new if its index is >= ours
    if (candidateLastLogTerm == lastLogTerm && candidateLastLogIndex >= lastLogIndex) {
        return true;
    }
    // Otherwise the candidate's log is behind
    return false;
}
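Examples (term and index refer to the last log entry):
Case 1: the candidate's last log term is higher
candidate: term=3, index=5
current: term=2, index=5
→ vote granted (higher term)
Case 2: same term, index higher or equal
candidate: term=2, index=5
current: term=2, index=4
→ vote granted (higher index)
Case 3: same term, lower index
candidate: term=2, index=4
current: term=2, index=5
→ vote denied (log is behind)
Case 4: lower term
candidate: term=1, index=10
current: term=2, index=5
→ vote denied (lower term, even though the index is higher)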
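The four cases can be checked mechanically. The sketch below restates the comparison as a pure function that takes the voter's values as parameters instead of reading them from the node; `LogUpToDateSketch` is a hypothetical name used only for this example.

```java
final class LogUpToDateSketch {
    // True when the candidate's log is at least as up to date as the voter's.
    static boolean isLogUpToDate(long candidateLastLogIndex, long candidateLastLogTerm,
                                 long voterLastLogIndex, long voterLastLogTerm) {
        if (candidateLastLogTerm != voterLastLogTerm) {
            // Different last terms: the higher term wins regardless of index.
            return candidateLastLogTerm > voterLastLogTerm;
        }
        // Same last term: the longer (or equal) log wins.
        return candidateLastLogIndex >= voterLastLogIndex;
    }

    public static void main(String[] args) {
        System.out.println(isLogUpToDate(5, 3, 5, 2));  // case 1: true
        System.out.println(isLogUpToDate(5, 2, 4, 2));  // case 2: true
        System.out.println(isLogUpToDate(4, 2, 5, 2));  // case 3: false
        System.out.println(isLogUpToDate(10, 1, 5, 2)); // case 4: false
    }
}
```

Case 4 is the one worth remembering: a longer log does not help a candidate whose last entry comes from an older term.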
Purpose: prevent concurrent vote requests from leaving the node in an inconsistent state
Implementation:
public final ReentrantLock voteLock = new ReentrantLock();
@Override
public VoteResponse requestVote(VoteRequest voteRequest) {
    voteLock.lock();
    try {
        // voting logic
        ...
    } finally {
        voteLock.unlock();
    }
}
Protected state:
- currentTerm
- votedFor
- nodeStatus
Method: startElection()
Location: RaftNodeImpl.java:266-289
private void startElection() {
    int totalNodes = raftConfig.getRaftNodeConfigList().size();
    currentVoteCounter = new VoteCounter(currentTerm, totalNodes);
    // Vote for itself
    currentVoteCounter.voteForSelf(currentNodeConfig.getServerId());
    log.info("Starting election for term: {}, voted for self, votes: {}/{}",
            currentTerm, currentVoteCounter.getVoteCount(), currentVoteCounter.getMajorityCount());
    // Single-node mode: become Leader right away
    if (totalNodes == 1) {
        log.info("Single node mode, becoming leader immediately");
        becomeLeader();
        return;
    }
    // Send vote requests to all other nodes
    List<RaftNodeConfig> otherNodes = getOtherNodes();
    for (RaftNodeConfig nodeConfig : otherNodes) {
        electionExecutor.execute(() -> sendVoteRequest(nodeConfig));
    }
    // Check whether a majority has already been reached (at this point only the self-vote may be counted)
    checkElectionResult();
}
Flow:
Method: sendVoteRequest(targetNode)
Location: RaftNodeImpl.java:294-316
private void sendVoteRequest(RaftNodeConfig targetNode) {
    try {
        // Build the VoteRequest
        VoteRequest request = VoteRequest.builder()
                .term(currentTerm)
                .candidateId(currentNodeConfig.getServerId())
                .lastLogIndex(getLastLogIndex())
                .lastLogTerm(getLastLogTerm())
                .build();
        request.setAddress(targetNode.getIp() + ":" + targetNode.getPort());
        request.setCmd(Request.REQUEST_VOTE);
        log.debug("Sending VoteRequest to {} for term {}", targetNode.getServerId(), currentTerm);
        // Send the RPC request
        VoteResponse response = rpcClient.send(request, RPC_TIMEOUT_MS);
        // Handle the response
        if (response != null) {
            handleVoteResponse(response, targetNode.getServerId());
        }
    } catch (Exception e) {
        log.debug("Failed to send vote request to {}: {}", targetNode.getServerId(), e.getMessage());
    }
}
Characteristics:
Class: VoteCounter
Location: com.ling.raft.core.VoteCounter.java
Core methods:
// Record a vote; returns false if this node's vote was already counted
public synchronized boolean recordVote(String nodeId) {
    return votesReceived.add(nodeId);
}
// Vote for itself
public synchronized void voteForSelf(String selfId) {
    if (!votedForSelf) {
        votesReceived.add(selfId);
        votedForSelf = true;
    }
}
// Check whether a majority has been reached
public boolean hasMajority() {
    return votesReceived.size() >= majorityCount;
}
// Current number of votes
public int getVoteCount() {
    return votesReceived.size();
}
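Storing voter IDs in a set is what makes duplicate or retried responses harmless. The following cut-down sketch shows that behaviour in isolation; `VoteCounterSketch` is a hypothetical stand-in written for this example, not the project's `VoteCounter`.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class VoteCounterSketch {
    // A concurrent set: adding the same node ID twice leaves the size unchanged.
    private final Set<String> votesReceived = ConcurrentHashMap.newKeySet();
    private final int majorityCount;

    VoteCounterSketch(int totalNodes) {
        this.majorityCount = totalNodes / 2 + 1;
    }

    // Returns true only the first time a given node's vote is recorded.
    boolean recordVote(String nodeId) {
        return votesReceived.add(nodeId);
    }

    boolean hasMajority() {
        return votesReceived.size() >= majorityCount;
    }

    int getVoteCount() {
        return votesReceived.size();
    }

    public static void main(String[] args) {
        VoteCounterSketch counter = new VoteCounterSketch(5);
        counter.recordVote("node1");
        counter.recordVote("node2");
        counter.recordVote("node2"); // duplicate response, ignored
        System.out.println(counter.getVoteCount() + " votes, majority=" + counter.hasMajority());
    }
}
```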
Data structure:
- ConcurrentHashMap.newKeySet() stores the IDs of the nodes that have voted
Method: handleVoteResponse(response, voterId)
Location: RaftNodeImpl.java:322-361
private void handleVoteResponse(VoteResponse response, String voterId) {
    // Synchronized block so the checks and the vote count happen atomically
    synchronized (this) {
        // Ignore the response if this node is no longer a Candidate
        if (nodeStatus != ServerStatusEnum.CANDIDATE) {
            log.debug("Not a candidate anymore (status: {}), ignoring vote from {}",
                    nodeStatus, voterId);
            return;
        }
        // A higher term in the response: step down to Follower
        if (response.getTerm() > currentTerm) {
            log.info("Received higher term {} from {}, stepping down",
                    response.getTerm(), voterId);
            becomeFollower(response.getTerm());
            return;
        }
        // Ignore responses from an older term
        if (response.getTerm() < currentTerm) {
            log.debug("Received stale vote response from {} for old term {}",
                    voterId, response.getTerm());
            return;
        }
        // Count the vote
        if (response.isVoteGranted()) {
            boolean isNewVote = currentVoteCounter.recordVote(voterId);
            if (isNewVote) {
                log.info("Received vote from {} for term {}, total votes: {}/{}",
                        voterId, currentTerm, currentVoteCounter.getVoteCount(),
                        currentVoteCounter.getMajorityCount());
                // Check the election result
                checkElectionResult();
            }
        } else {
            log.debug("Vote denied by {} for term {}", voterId, currentTerm);
        }
    }
}
Handling logic:
State check
Term check
Vote counting
Method: checkElectionResult()
Location: RaftNodeImpl.java:367-373
private void checkElectionResult() {
if (currentVoteCounter != null && currentVoteCounter.hasMajority()) {
log.info("Majority votes received ({}/{}), becoming LEADER",
currentVoteCounter.getVoteCount(), currentVoteCounter.getMajorityCount());
becomeLeader();
}
}
When it is called:
Class: HeartbeatTask
Location: com.ling.raft.core.task.HeartbeatTask.java
@Override
public void run() {
    try {
        // Only the Leader sends heartbeats
        if (node.getNodeStatus() != ServerStatusEnum.LEADER) {
            log.debug("Current node is not LEADER, skip heartbeat");
            return;
        }
        log.debug("Sending heartbeats to all nodes, term: {}", node.getCurrentTerm());
        // Send a heartbeat to every other node
        node.sendHeartbeats();
    } catch (Exception e) {
        log.error("Error in heartbeat task", e);
    }
}
Method: sendHeartbeats()
Location: RaftNodeImpl.java:407-413
public void sendHeartbeats() {
    List<RaftNodeConfig> otherNodes = getOtherNodes();
    for (RaftNodeConfig nodeConfig : otherNodes) {
        heartbeatExecutor.execute(() -> sendHeartbeat(nodeConfig));
    }
}
Method: sendHeartbeat(targetNode)
Location: RaftNodeImpl.java:418-436
private void sendHeartbeat(RaftNodeConfig targetNode) {
    try {
        // Build the heartbeat request (entries is empty)
        AppendEntriesRequest request = AppendEntriesRequest.builder()
                .term(currentTerm)
                .leaderId(currentNodeConfig.getServerId())
                .entries(new ArrayList<>()) // an empty list marks a heartbeat
                .build();
        request.setAddress(targetNode.getIp() + ":" + targetNode.getPort());
        request.setCmd(Request.APPEND_ENTRIES);
        // Send the request
        AppendEntriesResponse response = rpcClient.send(request, RPC_TIMEOUT_MS);
        // Handle the response
        if (response != null) {
            handleHeartbeatResponse(response, targetNode.getServerId());
        }
    } catch (Exception e) {
        log.debug("Failed to send heartbeat to {}: {}", targetNode.getServerId(), e.getMessage());
    }
}
Heartbeat characteristics:
- entries is an empty list
- Carries metadata such as term and leaderId
Method: startHeartbeatTimer()
Location: RaftNodeImpl.java:380-391
private void startHeartbeatTimer() {
    int heartbeatInterval = raftConfig.getHeartbeatIntervalMs();
    heartbeatFuture = heartbeatExecutor.scheduleAtFixedRate(
            new HeartbeatTask(this),
            0,                 // start immediately
            heartbeatInterval, // period
            TimeUnit.MILLISECONDS
    );
    log.debug("Heartbeat timer started, interval: {}ms", heartbeatInterval);
}
Configuration example:
config.setHeartbeatInterval(1); // send a heartbeat every 1 second
Method: handleHeartbeatResponse(response, nodeId)
Location: RaftNodeImpl.java:441-448
private void handleHeartbeatResponse(AppendEntriesResponse response, String nodeId) {
    // A higher term in the response: step down to Follower
    if (response.getTerm() > currentTerm) {
        log.info("Received higher term {} from {} in heartbeat response, stepping down",
                response.getTerm(), nodeId);
        becomeFollower(response.getTerm());
    }
}
Handling logic:
Goal: at most one Leader per term
Implementation:
Terms increase monotonically
public void becomeCandidate() {
    currentTerm++; // every election increments the term
}
Only one vote per term
// ConsensusModuleImpl.requestVote()
if (votedFor != null && !votedFor.equals(candidateId)) {
    return new VoteResponse(currentTerm, false);
}
Majority requirement
// VoteCounter
public boolean hasMajority() {
    return votesReceived.size() >= majorityCount; // N/2 + 1
}
Rule: on discovering a higher term, update the term and become a Follower
Locations:
- ConsensusModuleImpl.requestVote(), lines 63-68
- ConsensusModuleImpl.appendEntries(), lines 128-134
- RaftNodeImpl.handleVoteResponse(), lines 333-337
- RaftNodeImpl.handleHeartbeatResponse(), lines 443-447
Example:
// In requestVote
if (voteRequest.getTerm() > currentTerm) {
    node.becomeFollower(voteRequest.getTerm());
    currentTerm = node.getCurrentTerm();
}
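The same rule applies to every message that carries a term, so it can be expressed once. The sketch below is a simplified illustration and not the project's code: `TermRuleSketch`, its `Role` enum, and `observeTerm` are hypothetical names. Note that the vote is cleared only when the term actually advances.

```java
final class TermRuleSketch {
    enum Role { FOLLOWER, CANDIDATE, LEADER }

    long currentTerm;
    Role role;
    String votedFor;

    TermRuleSketch(long currentTerm, Role role, String votedFor) {
        this.currentTerm = currentTerm;
        this.role = role;
        this.votedFor = votedFor;
    }

    // Applies the "higher term wins" rule to the term carried by any request or
    // response. Returns false when the message is stale and should be rejected.
    boolean observeTerm(long messageTerm) {
        if (messageTerm < currentTerm) {
            return false; // stale message from an older term
        }
        if (messageTerm > currentTerm) {
            currentTerm = messageTerm; // adopt the newer term
            role = Role.FOLLOWER;      // a Leader or Candidate steps down
            votedFor = null;           // the vote belongs to the old term
        }
        return true;
    }

    public static void main(String[] args) {
        TermRuleSketch node = new TermRuleSketch(1, Role.LEADER, "node1");
        node.observeTerm(2);
        System.out.println(node.role + " at term " + node.currentTerm);
    }
}
```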
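Purpose: only vote for candidates whose log is at least as up to date as the voter's own
Implementation: isLogUpToDate(candidateLastLogIndex, candidateLastLogTerm)
Rules:
Importance:
Scenario: a network partition leaves two Leaders in existence at the same time
Prevention mechanisms:
Majority requirement
Heartbeat timeout
Term increments
Example:
Initial state: 5 nodes (node1-5), Leader=node1, term=1
Network partition:
- Partition A: node1, node2 (2 nodes)
- Partition B: node3, node4, node5 (3 nodes)
Partition A:
- node1 remains Leader for term 1, and node2 keeps receiving its heartbeats
- node1 can reach only 2 nodes, short of the majority (3) needed to commit new entries
- Even if node2 timed out and became a Candidate, it could collect at most 2 votes
- No new Leader can be elected in this partition
Partition B:
- node3 times out and starts an election
- It receives votes from itself, node4, and node5 (3 votes)
- It becomes the new Leader (term=2)
After the network heals:
- node1 sends a heartbeat (term=1)
- The other nodes reject it (term=2 > term=1)
- node1 sees the higher term in the response and becomes a Follower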
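The partition example comes down to one comparison per partition. Here is a minimal sketch of that check; `PartitionSketch` and `electablePartition` are hypothetical names introduced for this illustration.

```java
final class PartitionSketch {
    // Returns the index of the partition that is large enough to elect a Leader,
    // or -1 when no partition holds a majority of the full cluster.
    static int electablePartition(int clusterSize, int... partitionSizes) {
        int majority = clusterSize / 2 + 1;
        for (int i = 0; i < partitionSizes.length; i++) {
            if (partitionSizes[i] >= majority) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // 5 nodes split 2 / 3: only the second partition can elect a Leader.
        System.out.println(electablePartition(5, 2, 3));
        // 4 nodes split 2 / 2: neither side can.
        System.out.println(electablePartition(4, 2, 2));
    }
}
```

Since two disjoint partitions cannot both hold more than half of the nodes, at most one of them can ever elect a Leader for a new term.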
File location:
LingRaft-Lite-Core/LingRafte-Lite-CopyLog/src/main/java/com/ling/raft/example/leader/ThreeNodeElectionTest.java
How to run:
# Run the main method directly
java -cp <classpath> com.ling.raft.example.leader.ThreeNodeElectionTest
Run via script:
cd LingRaft-Lite-Core/LingRafte-Lite-CopyLog/src/main/java/com/ling/raft/example/leader
start-cluster.bat
Scenario 1: normal election
[STEP 1] Starting 3 nodes...
✓ node1 started on port 8081
✓ node2 started on port 8082
✓ node3 started on port 8083
✓ All nodes started!
[STEP 2] Waiting for leader election...
[Cluster] node1:C(t1) node2:F(t1) node3:F(t1)
[Cluster] node1:L(t1) node2:F(t1) node3:F(t1)
----------------------------------------
✓ Leader elected!
Scenario 2: Leader failure
raft> kill node1
✓ node1 stopped
! Leader killed, waiting for new election...
[Cluster] node1:F(t1) node2:C(t2) node3:F(t2)
[Cluster] node1:F(t2) node2:L(t2) node3:F(t2)
Scenario 3: node recovery
raft> revive node1
✓ node1 revived
✓ Status: FOLLOWER
✓ Election timer: active
[Cluster] node1:F(t2) node2:L(t2) node3:F(t2)
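| Command | Description | Example |
|---|---|---|
| status | Show the status of all nodes | status |
| leader | Show information about the current Leader | leader |
| kill <node> | Simulate a node failure | kill node1 |
| revive <node> | Bring a node back | revive node1 |
| log <level> | Set the log level | log debug |
| stop | Stop all nodes and exit | stop |
Usage: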
raft> log silent
✓ Log level set to ERROR (silent mode)
raft> log info
✓ Log level set to INFO
raft> log debug
✓ Log level set to DEBUG (verbose mode)
raft> log election
✓ Showing election logs only
raft> log heartbeat
✓ Showing heartbeat logs only
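Log levels:
- silent/error - errors only
- warn - warnings and above
- info - info and above (default)
- debug - debug output (all logs)
- election - election-related logs only
- heartbeat - heartbeat-related logs only
╔════════════════════════════════════════════════════════════╗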
║ Raft Leader Election Test - 3 Nodes ║
╚════════════════════════════════════════════════════════════╝
[STEP 1] Starting 3 nodes...
✓ node1 started on port 8081
✓ node2 started on port 8082
✓ node3 started on port 8083
✓ All nodes started!
[STEP 2] Waiting for leader election...
[Cluster] node1:F(t1) node2:F(t1) node3:F(t1)
[Cluster] node1:C(t1) node2:F(t1) node3:F(t1)
[Cluster] node1:L(t1) node2:F(t1) node3:F(t1)
----------------------------------------
✓ Leader elected!
┌────────────────────────────────────────────────────────────┐
│ Cluster Status │
├────────────┬──────────────┬─────────┬─────────┬────────────┤
│ Node │ Status │ Term │ Log │ Voted For │
├────────────┼──────────────┼─────────┼─────────┼────────────┤
│ node1 │ LEADER │ 1 │ 0 │ - │
│ node2 │ FOLLOWER │ 1 │ 0 │ node1 │
│ node3 │ FOLLOWER │ 1 │ 0 │ node1 │
└────────────┴──────────────┴─────────┴─────────┴────────────┘
raft> kill node1
Killing node1...
✓ node1 stopped
! Leader killed, waiting for new election...
[Cluster] node1:F(t1) node2:C(t2) node3:F(t2)
[Cluster] node1:F(t2) node2:L(t2) node3:F(t2)
raft> leader
┌────────────────────────────────────────────────────────────┐
│ Leader Info │
├────────────────────────────────────────────────────────────┤
│ Node ID: node2 │
│ Address: 127.0.0.1:8082 │
│ Term: 2 │
└────────────────────────────────────────────────────────────┘
raft> revive node1
Reviving node1...
✓ node1 revived
✓ Status: FOLLOWER
✓ Election timer: active
[Cluster] node1:F(t2) node2:L(t2) node3:F(t2)
# Run the test program
java com.ling.raft.example.leader.ThreeNodeElectionTest
# Watch the election
[Cluster] node1:F(t1) node2:F(t1) node3:F(t1)
[Cluster] node1:C(t1) node2:F(t1) node3:F(t1)
[Cluster] node1:L(t1) node2:F(t1) node3:F(t1)
# Check the current status
raft> status
# Wait a few seconds and watch the heartbeats
[Cluster] node1:L(t1) node2:F(t1) node3:F(t1)
[Cluster] node1:L(t1) node2:F(t1) node3:F(t1)
[Cluster] node1:L(t1) node2:F(t1) node3:F(t1)
# Turn on heartbeat logging
raft> log heartbeat
✓ Showing heartbeat logs only
# Kill the Leader
raft> kill node1
✓ node1 stopped
! Leader killed, waiting for new election...
# Watch the new election
[Cluster] node1:F(t1) node2:C(t2) node3:F(t2)
[Cluster] node1:F(t2) node2:L(t2) node3:F(t2)
# Revive the node
raft> revive node1
✓ node1 revived
# Watch the recovery
[Cluster] node1:F(t2) node2:L(t2) node3:F(t2)
# Repeated failure and recovery
raft> kill node2
raft> kill node3
raft> revive node2
raft> revive node3
// 1. Create the node configurations
RaftNodeConfig node1 = new RaftNodeConfig("node1", "127.0.0.1", 8081);
RaftNodeConfig node2 = new RaftNodeConfig("node2", "127.0.0.1", 8082);
RaftNodeConfig node3 = new RaftNodeConfig("node3", "127.0.0.1", 8083);
List<RaftNodeConfig> allNodes = Arrays.asList(node1, node2, node3);
// 2. Create the Raft configuration
RaftConfig config1 = new RaftConfig(node1, allNodes);
config1.setElectionTimeout(2); // base timeout in seconds; used only when no random range is set
config1.setElectionTimeoutRandomRange(Range.of(150, 300)); // random range in milliseconds
config1.setHeartbeatInterval(1); // heartbeat interval: 1 second
// 3. Create the RPC components
DefaultRpcServer rpcServer1 = new DefaultRpcServer(node1.getPort(), null);
DefaultRpcClient rpcClient1 = new DefaultRpcClient();
// 4. Create and initialize the Raft node
RaftNodeImpl raftNode1 = new RaftNodeImpl(config1, rpcServer1, rpcClient1);
rpcServer1.setRaftNode(raftNode1);
raftNode1.init();
// 5. Wait for the election
Thread.sleep(2000);
// 6. Check the node state
if (raftNode1.getNodeStatus() == ServerStatusEnum.LEADER) {
    System.out.println("Node1 is Leader, term: " + raftNode1.getCurrentTerm());
}
// Create a monitoring thread (raftNode2 and raftNode3 are built the same way as raftNode1)
Thread monitor = new Thread(() -> {
    try {
        while (true) {
            System.out.printf("Node1: %s(t%d) ",
                    raftNode1.getNodeStatus(),
                    raftNode1.getCurrentTerm());
            System.out.printf("Node2: %s(t%d) ",
                    raftNode2.getNodeStatus(),
                    raftNode2.getCurrentTerm());
            System.out.printf("Node3: %s(t%d)\n",
                    raftNode3.getNodeStatus(),
                    raftNode3.getCurrentTerm());
            Thread.sleep(3000);
        }
    } catch (InterruptedException e) {
        // Thread.sleep throws a checked exception; restore the flag and exit
        Thread.currentThread().interrupt();
    }
});
monitor.setDaemon(true);
monitor.start();
// Stop the Leader's heartbeat timer
raftNode1.cancelHeartbeatTimer();
// Simulate a Follower timeout
raftNode2.resetElectionTimer(); // restart the timer
// Once the timer expires, node2 starts an election on its own
// Read the current voting information
String votedFor = raftNode1.getVotedFor();
long currentTerm = raftNode1.getCurrentTerm();
ServerStatusEnum status = raftNode1.getNodeStatus();
System.out.println("Node1 - Status: " + status + ", Term: " + currentTerm + ", VotedFor: " + votedFor);
// If the node is a Candidate, inspect its vote counter
if (status == ServerStatusEnum.CANDIDATE) {
    VoteCounter counter = raftNode1.getCurrentVoteCounter();
    System.out.println("Votes: " + counter.getVoteCount() + "/" + counter.getMajorityCount());
}
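Reason:
Example:
Without randomization (all 3 nodes use 200ms):
t=0ms: all nodes start
t=200ms: all 3 nodes time out together and become Candidates
t=201ms: all 3 nodes send vote requests
t=210ms: each node holds only its own vote (1 vote), because every peer has already voted for itself
t=400ms: the election times out with no winner (split vote) and a new round starts, again in lockstep
With randomization (each of the 3 nodes draws a timeout from 150-300ms; here node1=170ms, node2=220ms, node3=280ms):
t=0ms: all nodes start
t=170ms: node1 times out first and starts an election
t=171ms: node2 and node3 receive the vote request, grant their votes, and reset their timers
t=172ms: node1 holds a majority, becomes Leader, and sends its first heartbeat
t=173ms: node2 and node3 receive the heartbeat and reset their timers again
t=220ms, t=280ms: the original deadlines of node2 and node3 pass without effect, because their timers were reset
t=1000ms: heartbeats continue and node1 remains Leader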
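The effect of randomization can be estimated with a small simulation. This is a rough sketch under simplifying assumptions, not a model of the real cluster: it ignores network delay and treats a round as a "collision" when the two earliest timers fire within `windowMs` of each other. `SplitVoteSketch` and `collisionRate` are hypothetical names.

```java
import java.util.Arrays;
import java.util.Random;

final class SplitVoteSketch {
    // Fraction of rounds in which the two earliest election timers fire within
    // windowMs of each other. Requires nodes >= 2 and minMs <= maxMs.
    static double collisionRate(int nodes, int minMs, int maxMs,
                                int windowMs, int rounds, long seed) {
        Random random = new Random(seed);
        int collisions = 0;
        for (int r = 0; r < rounds; r++) {
            int[] timeouts = new int[nodes];
            for (int i = 0; i < nodes; i++) {
                // Same formula as getElectionTimeoutMs(): uniform in [minMs, maxMs].
                timeouts[i] = minMs + random.nextInt(maxMs - minMs + 1);
            }
            Arrays.sort(timeouts);
            if (timeouts[1] - timeouts[0] <= windowMs) {
                collisions++;
            }
        }
        return (double) collisions / rounds;
    }

    public static void main(String[] args) {
        System.out.println("fixed 200ms:   " + collisionRate(3, 200, 200, 1, 10_000, 42L));
        System.out.println("random 150-300: " + collisionRate(3, 150, 300, 1, 10_000, 42L));
    }
}
```

With a fixed timeout every round collides, while a 150-300ms range makes near-simultaneous timeouts uncommon; widening the range lowers the rate further at the cost of slower failure detection.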
Code:
// RaftConfig.java
public int getElectionTimeoutMs() {
if (electionTimeoutRandomRange == null) {
return electionTimeout * 1000;
}
int min = electionTimeoutRandomRange.getMin();
int max = electionTimeoutRandomRange.getMax();
Random random = new Random();
return min + random.nextInt(max - min + 1);
}
Reason:
Code:
// RaftNodeImpl.java:498-500
@Override
public VoteResponse handleVoteRequest(VoteRequest voteRequest) {
    VoteResponse response = consensus.requestVote(voteRequest);
    // If the vote was granted, reset the election timer
    if (response.isVoteGranted()) {
        resetElectionTimer();
    }
    return response;
}
Reason:
Code:
// RaftNodeImpl.java:201-216
public void becomeCandidate() {
    ServerStatusEnum oldStatus = nodeStatus;
    // Increment the term (important!)
    currentTerm++;
    nodeStatus = ServerStatusEnum.CANDIDATE;
    votedFor = currentNodeConfig.getServerId();
    log.info("State changed: {} -> CANDIDATE, new term: {}", oldStatus, currentTerm);
    resetElectionTimer();
    startElection();
}
Guarantees provided by Raft:
In the code:
// A stale Leader's heartbeat is rejected
private void handleHeartbeatResponse(AppendEntriesResponse response, String nodeId) {
if (response.getTerm() > currentTerm) {
log.info("Received higher term {}, stepping down", response.getTerm());
becomeFollower(response.getTerm());
}
}
// A stale Leader cannot reach a majority
public boolean hasMajority() {
return votesReceived.size() >= majorityCount; // N/2 + 1
}
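Reason:
Configuration example:
config.setHeartbeatInterval(1); // 1 second (1000ms)
config.setElectionTimeoutRandomRange(Range.of(150, 300)); // 150-300ms
// Note: the heartbeat interval here is in seconds while the timeout is in milliseconds,
// so with this pairing Followers time out long before the next heartbeat arrives.
// In real use the heartbeat interval must be shorter than the election timeout.
Recommended configuration:
Heartbeat interval: 50ms - 100ms
Election timeout: 150ms - 300ms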
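Because the two settings use different units, it can help to normalize both to milliseconds and check them before starting a node. The following is an illustrative sketch only: `TimingCheckSketch`, `isSane`, and the `factor` parameter are assumptions made for this example and are not part of RaftConfig.

```java
final class TimingCheckSketch {
    // True when the heartbeat interval leaves room for at least `factor`
    // heartbeats before the shortest possible election timeout expires.
    static boolean isSane(long heartbeatMs, long electionMinMs, long electionMaxMs, int factor) {
        if (heartbeatMs <= 0 || factor < 1 || electionMinMs > electionMaxMs) {
            return false;
        }
        return heartbeatMs * factor <= electionMinMs;
    }

    public static void main(String[] args) {
        // The example above: 1 second heartbeat against a 150-300ms timeout.
        System.out.println(isSane(1000, 150, 300, 1)); // false
        // The recommended range: 50ms heartbeat against a 150-300ms timeout.
        System.out.println(isSane(50, 150, 300, 3));   // true
    }
}
```

A factor greater than 1 gives a Follower several chances to hear from the Leader, so a single delayed or dropped heartbeat does not trigger an election.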
Split-vote scenario:
3 nodes:
- node1: term=2, votes=[node1]
- node2: term=2, votes=[node2]
- node3: term=2, votes=[node3]
Each node has only 1 vote and cannot reach a majority (2 votes needed)
A new election starts after the election timeout
Ways to avoid it:
Randomized timeouts (implemented)
Pre-vote (not implemented)
Fast retry (not implemented)
Current implementation:
Reason:
Code:
// RaftNodeImpl.java:274-279
private void startElection() {
    int totalNodes = raftConfig.getRaftNodeConfigList().size();
    currentVoteCounter = new VoteCounter(currentTerm, totalNodes);
    currentVoteCounter.voteForSelf(currentNodeConfig.getServerId());
    // Single-node mode: become Leader right away
    if (totalNodes == 1) {
        log.info("Single node mode, becoming leader immediately");
        becomeLeader();
        return;
    }
    // Multi-node mode: send vote requests
    ...
}
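Recommended parameters:
| Parameter | Recommended value | Notes |
|---|---|---|
| Heartbeat interval | 50ms - 100ms | Shorter intervals detect failures faster but cost more network traffic |
| Election timeout (min) | 150ms - 200ms | Should be greater than the heartbeat interval |
| Election timeout (max) | 300ms - 400ms | Should be 3-5 times the heartbeat interval |
| RPC timeout | 2000ms - 3000ms | Should be greater than the election timeout |
Tuning examples:
// Caution: other examples in this document treat the argument of setHeartbeatInterval
// as seconds. The comments below state the intended interval; check which unit
// RaftConfig actually uses before copying these values.
// Low-latency scenario (within a data center)
config.setHeartbeatInterval(1); // intended: 1ms
config.setElectionTimeoutRandomRange(Range.of(10, 20)); // 10-20ms
// High-stability scenario (wide-area network)
config.setHeartbeatInterval(100); // intended: 100ms
config.setElectionTimeoutRandomRange(Range.of(500, 1000)); // 500-1000ms
// Development and debugging scenario
config.setHeartbeatInterval(1); // intended: 1 second
config.setElectionTimeoutRandomRange(Range.of(2000, 4000)); // 2-4 seconds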
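| Term | Description |
|---|---|
| Term | Term number, monotonically increasing; used to tell the current Leader apart from stale ones |
| Election Timeout | How long a node waits before starting an election; randomized to avoid split votes |
| Heartbeat | Sent periodically by the Leader to maintain its leadership |
| Majority | More than half of the nodes (N/2 + 1) |
| Split Vote | An election in which no node reaches a majority |
| Candidate | Candidate node, the node that starts an election |
| Leader | Leader node, handles client requests |
| Follower | Follower node, responds to requests from the Leader |
| File | Path |
|---|---|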
| RaftNodeImpl | com.ling.raft.core.RaftNodeImpl |
| ConsensusModuleImpl | com.ling.raft.core.ConsensusModuleImpl |
| VoteCounter | com.ling.raft.core.VoteCounter |
| ElectionTask | com.ling.raft.core.task.ElectionTask |
| HeartbeatTask | com.ling.raft.core.task.HeartbeatTask |
| ServerStatusEnum | com.ling.raft.enums.ServerStatusEnum |
| VoteRequest | com.ling.raft.model.dto.VoteRequest |
| VoteResponse | com.ling.raft.model.dto.VoteResponse |
| ThreeNodeElectionTest | com.ling.raft.example.leader.ThreeNodeElectionTest |