Chubao-Raft

Chubao-Raft


简介

Chubao Raft 是 chubaofs 项目使用的multigroup-raft 库,是在 etcd raft 库上进行了重构和优化。

特性

chubao-raft 有以下特性:

  • multi-raft:支持多组 raft;
  • read-lease:
  • batch/pipeline:

组成

  • FSM:算法状态机,负责基本的raft算法逻辑状态机封装;

  • Log: 日志模块,负责raft日志的记录及同步;

  • Storage: 存储接口,

  • Transport:传输模块,负责raft副本间的数据传输;

  • StateMachine:指令状态机接口模块,

状态机

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
graph TB
    F(Follower)
    C(Candidate)
    L(Leader)
    E(ElectionAck)

    S((START)) --> F
    F --timeout--> C
    C --timeout/vote req--> C
    C --electAck/append/hb/minor votes--> F
    C -- quorum:lease_off --> L
    C -- quorum:lease_on --> E
    L -- lease offline --> F
    E -- vote req --> E
    E --electAck/append/hb--> F
    E --quorum--> L

重要数据结构

  • Entry
  • Message
  • Peer
  • Snapshot:
  • HardState:持久化状态;
  • SoftState: 内存状态;
  • 需要持久化的状态:
    • currentTerm: 当前任期;
    • votedFor:投给票的节点 ID;
    • log:日志序列
  • 内存中的状态:
    • commitIndex:
    • lastApplied:
  • leader 内存状态:
    • nextIndex[]:
    • matchIndex[]:

Proposal(提案)

  • 外部给raft状态机执行的命令;

1
2
3
4
5
6
//raft/raft.go
type proposal struct {
    cmdType proto.EntryType    //提案类型,分为normal()
    future  *Future
    data    []byte
}

Future

  • future是raft中用于描述raft输出结果的一个数据结构;

  • future内部有2个长度为1的chan,分别代表着正常输出错误输出

  • future主要提供以下2个外部调用来获取输出:

    • Response(): 获取同步输出结果;

    • AsyncResponse: 异步输出;

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
type respErr struct {
    errCh chan error
}
type Future struct {
    respErr
    respCh chan interface{}
}
// 
func (f *Future) Response() (resp interface{}, err error) {
    select {
    case err = <-f.error():
        return
    case resp = <-f.respCh:
        return
    }
}
//
func (f *Future) AsyncResponse() (respCh <-chan interface{}, errCh <-chan error) {
    return f.respCh, f.errCh
}
  • inflight(复制环形数组)
1
2
3
4
5
6
7
// 复制环形数组(滑窗)
type inflight struct {
    start  int      //有效元素起始idx
    count  int      //有效元素个数
    size   int      //最大元素个数
    buffer []uint64 //
}

角色数据

  • 共有持久存储的数据:

    • id: raft 节点id;

    • term:当前任期;

    • log: raft日志序列,只能append和truncate, 无法insert(),update();

  • 共有内存数据:

    • peers: 其他节点;

    • queued_reqs: 以缓存的req,用来缓存选举过程中收到的req;

    • proxied_reqs: 已转发的req,用来记录follower转发到leader的req, 在一个新的选举期内被清空;

    • node_tx: raft外部消息入口,通过该管道接收外界输入的消息;

    • state_tx: 指令状态机指令入口,通过该管道向指令状态机发送指令;

Candidator

  • 私有数据:

    • votes: 已经收到的投票总数(包括自己投自己的一票),如果votes>半数,则当选为leader;

    • election_ticks: 当前轮选举已过的ticks数, 每个tick来到时+1;

    • election_timeout: 当前轮选举超时数, election_ticks>election_timeout时,发起新一轮选举(term++);

Follower

  • follower内存状态:

    • leader: 当前term的leader;

    • voted_for: 当前term所投的leader;

    • leader_seen_ticks: leader心跳计数tick, 每个tick +1, 收到leader hb msg, 重置为0;

    • leader_seen_timeout: leader心跳timeount, leader_seen_ticks > leader_seen_timeout时, follower将转入candidate;

Leader

  • leader内存状态:

    • heartbeat_ticks: 心跳tick计数;

    • peers_next_index: 将要发送到peers的下一个index.每个peer的peer_next_index最开始置为leader启动时的最后一个log index+1。在复制过程中leader将发送以为起始的(term,peer_next_index-1)所有后续log给follower,follower检查该(term,peer_next_index-1)数据是否在log中存在,如果不存在, 则会发送拒绝复制的响应。leader接收到拒绝复制响应后,依次减小改值peer_next_index,直到被接受;

    • peers_last_index: 已经发送到peers的最后一个index,peers_last_index[peerId] < peers_next_index[peerId]


关键流程

新建RaftServer

  • chubao raft的multi-raft, 多个raft主要通过RaftServer->rafts进行管理;

  • 启动入口为raft/server.go中的NewRaftServer, 通过输入参数config生成一个新的RaftSever结构体对象,并启动后台服务协程run();

  • RaftServer->run()协程监听如下事件:

    • rs->stopC: RaftServer结束消息,接收该消息,立马结束run服务协程;

    • fatalStopC: 关闭raft消息,从rs->rafts map中删除对应的raft;

    • rs->heartc: 遍历所有raft, 将msg放入raft->recv中处理;

    • rs->ticker.C:

      • 递增ticks计数器;

      • 如果ticks达到配置的HeartbeatTick值,则将计数器置0,并给所有非Leader节点发送ReqMsgHeartBeat消息;

      • 如果节点不在恢复快照状态,则依次给所有raft tickc发送数据,激活各raft tick()

新建Raft

RaftSever启动后,通过RaftServer->CreateRaft()函数可新建raft, 并将raft加入到RaftSever->rafts中。newRaft()流程如下:

  • 校验config;

  • 根据配置,生成新raftFsm实例;

  • 初始化raft实例;

  • 启动Raft.runApply()协程,该协程主要用于监听raft->applycchannel上的事件;

  • 启动Raft.run()协程,

Raft主处理流程

raft.run()是各个raft主要用来处理各个事件的总协程入口,其处理如下事件:

  • s.stopc: 退出信号;

  • s.tickc: 计数器信号,调用raftFsm.tick();

  • s.propc:


选举

选举发生条件

  • FollowerCandidate节点选举计时器超时(lease off) 或收到 Leader 的 Lease TimeOut 消息(lease on);
  • ElectionAck 状态时,选举计时器超时;
  • 外界选举信号;

选举过程

  • 当选举条件达到时,raft 状态机会产生一个LocalMsgHup消息中断当前任期,发起选举;

  • 非 Leader 节点收到 LocalMsgHup 消息,如果可以提升为 Leader,且 raft 中没有待处理的配置变更日志,则举行选举(campaign);

  • 发起节点先将自己变成候选人(Candidate);

  • 判断得票数。如果达到法定票数(大于总数一半)

    • 启用 Lease,则直接当选为领导人(Leader),选举结束;
    • 未启用 Lease,则进入选举确认(ElectionAck)状态;
  • 否则,如果未达到法定票数,则给各个复制节点发送ReqMsgVote拉票消息,强制各节点投票;

  • Candidate节点收到ReqMsgVote拉票消息,投反对票,并回复 RespMsgVote 消息给拉票节点;

  • Follower节点收到ReqMsgVote拉票消息后,

    • 如果满足投票条件(条件:未设置 lease,或没有 leader 节点 并且 未投票给其他节点 且 日志为最新),投赞成票,回复 RespMsgVote 消息给拉票节点;
    • 否则,投反对票;回复 RespMsgVote 消息给拉票节点;
  • Leader节点收到ReqMsgVote消息,投反对票,回复RespMsgVote消息给拉票节点;

  • ElectionAck节点收到ReqMsgVote消息,投反对票;回复 RespMsgVote 消息给拉票节点;

  • Candidate 收到 RespMsgVote投票消息后,统计有效票数:

    • 如果赞成票达多数
      • 如果未启用 lease,则成为 Leader;
      • 否则,进入 ElectionAck 状态;
    • 如果反对票达多数,则成为Follower
  • Follower给拉票候选人投赞成票条件:

    • 无 leader(或未启用 lease);
    • 未给其他节点投过票;
    • 该候选人的 term 和 index 均为最新。

选举确认(ElectionAck)

如果启用 Lease,节点获得多数票后,不会直接当选,需先进入 ElectionAck 状态,对选举进行确认;

在 ElectionAck 状态,ElectionAck 节点将向其他节点发送选举确认请求消息(ReqMsgElectAck),待其他节点回复确认消息(RespMsgElectAck)后,统计确认票数,如果确认票数达到法定票数,才当选为领导人(Leader)。

  • Follower 节点收到 ReqMsgElectAck 消息,将重置选举计数器,并将发送节点设为 Leader 后,回复 RespMsgElectAck 消息;
  • Candidate 节点收到 ReqMsgElectAck 消息,将自己转变成 Follower,回复 RespMsgElectAck 消息;
  • 其他 ElectAck 节点收到 ReqMsgElectAck 消息,同样将自己转变成 Follower,回复 RespMsgElectAck 消息;
  • ElectAck 节点收到 RespMsgElectAck 消息后,记录确认消息(ack)数。如果确认消息数大于等于法定当选票数,则当选为 Leader。并立即广播 append;

日志复制流程

任何节点收到消息:

  • 如果是 LocalMsgHup 消息:
    • 如果是非 Leader,且有副本节点,
      • 如果包含配置变更消息且 comitted > applied, 则忽略;
      • 否则,举行选举;
    • 如果是 Leader 节点,则忽略;
    • 返回;
  • 否则,判断消息任期(Term):
    • 如果 Term==0, 忽略;
    • 如果消息任期(m.Term) > 节点任期(r.term), 则继续判断
      • 如果为请求投票消息(ReqMsgVote),判断 lease:
        • 如果是 Follower 节点,且启用了 lease,且消息不是来自旧 Leader 节点,则给旧 Leader 回复 LeaseMsgOffline 消息,通知旧 Leader Lease 下线;
      • 变成 Follower。
    • 如果消息里任期小于节点任期(m.Term<r.term),则忽略;
  • 转入不同角色状态处理;

角色状态消息处理

  • Follower

    • LocalPropMsg:
      • 如果没有 Leader,则不处理(报警提示);
      • 如果有 Leader,转发 Leader 处理
    • ReqMsgAppend:
      • 重置选举计数器;
      • 设置 Leader 为消息发送者;
      • 执行 append 操作:
        • 如果消息 index 小于本节点 committed,则回复 RespMsgAppend 本节点 committe index;
        • 否则,尝试 append
          • 如果 append 成功,则回复 RespMsgAppend(index = lastIndex);
          • 否则,回复 RespMsgAppend,Reject=true;
    • ReqMsgHeartBeat:
      • 重置选举计数器;
      • 设置 Leader 为消息发送者;
    • ReqCheckQuorum:
      • 重置选举计数器;
      • 设置 Leader 为消息发送者;
      • 回应发送者 RespCheckQuorum 消息;
    • ReqMsgVote:
      • 如果满足投票条件,则重置选举计数器,并回复发送者 RespMsgVote(赞成票,Reject=false);
      • 否则回复发送者 RespMsgVote(反对票,Reject=true)
    • LeaseMsgTimeout
      • 如果消息来自 Leader,则重置选举计数器,并发送 LocalMsgHup 消息;
      • 否则忽略;
  • Candidate

    • LocalMsgProp:
      • 忽略
    • ReqMsgAppend:
      • 先转变为 Follower;
      • 再执行 append:
    • ReqMsgHeartBeat:
      • 转变为 Follower;
    • ReqMsgElectAck
      • 先转变为 Follower;
      • 回复发送者 RespMsgElectAck 消息(Reject=false)。
    • ReqMsgVote:
      • 回复发送者 RespMsgVote 消息, 投反对票(Reject=true)。
    • RespMsgVote:
      • 统计收到的赞成票数;
      • 如果赞成票已达多数:
        • 如果启用 LeaseCheck,则转变为 ElectionAck 状态;
        • 否则未启用 LeaseCheck,则当选为 Leader,并广播 append;
      • 如果反对票已达多数,则变成 Follower;
  • Leader

    • LocalMsgProp:

      • 空消息或没有副本节点,则忽略该消息;
      • 否则,检查是否有配置变更消息;
      • append;
      • 向副本节点广播 append。
    • ReqMsgVote:

      • 向发送者回复 RespMsgVote,投反对票(Reject=true);
    • RespMsgAppend:

      • 更新复制信道(pr)状态;
      • 如果 Reject, 则 pr 可能要回退,并转变到探测状态,在发送 append
      • 否则,更新信道状态,并尝试 commit:
        • 如果 commit 成功,则广播 append;
        • 如果信道暂停过,则发送 append;
    • RespMsgHeartBeat:

更新信道状态

  • LeaseMsgOffline:

    • 给所有副本节点发送 LeaseMsgTimeOut 消息;
    • 转变为 Follower;
  • RespMsgSnapShot:

    • 如果信道状态不是 replicaStateSnapshot, 则忽略;
    • 如果消息被拒绝,则快照失败,信道转入探测状态;
    • 否则,更新信道并继续探测;
    • 信道暂停;
  • RespCheckQuorum:

    • 接收 Ack;
    • 如果收到确认节点数+1(本节点),则 readOnly->advance;
  • ElectionAck

    • LocalMsgProp:

      • 忽略
    • ReqMsgAppend:

      • 转变为 Follower;
      • 执行 append;
    • ReqMsgHeartBeat:

      • 变成 Follower;
    • ReqMsgElectAck:

      • 变成 Follower;
      • 回复 ResqMsgElectAck;
    • ReqMsgVote:

      • 回复 RespMsgVote,投反对票(Reject=true)
    • RespCheckQuorum:

      • recvAck
    • RespMsgElectAck:

      • 统计 ack 数;
      • 如果确认数>=法定票数,则成为 Leader,并广播 append;

计时器(Tick)

计时器和消息一起用于驱动 raft 状态机的状态变更,主要包括三种种计时器:

  • 心跳计数器(tickHeartBeat):用于 Leader 向 Follower 发送存活信号。

    用于 Leader 节点,其频率>>选举计时器,约为选举计数器频率的 10 倍,用于 Leader 和 Follower 节点间确认信道状态。流程如下:

    • 心跳计数器+1,选举计数器+1;
    • 如果选举计数器到达阈值:
      • 重置选举计数器为 0;
      • 如果启用 lease 且 lease 失效,则 Leader 状态转变为 Follower 状态;
        • 如果是非 Leader 节点,忽略;
        • 如果心跳计数器到达阈值:
    • 重置心跳计数器为 0;
    • 检查所有副本状态,如果不是快照状态,则副本继续;
    • 广播 ReadOnly;
  • 选举计数器(tickElection):

    用于 Follower 和 Candidate 节点,用来计算一个任期是超时时间,如果一个任期内选举计时器超时,则将触发重新选举操作,流程如下:

    • 如果节点无法;
    • 选举计数器+1;
    • 判断是否超时:
      • 如果超时,则重重置选举计数器为 0;状态机输入 LocalMsgHup 消息,中断当前任期;
  • 当选确认计数器(tickElectAck):

    当选确认计数器主要用作当选确认状态节点中,流程如下:

    • 选举计数器+1;
    • 如果选举计数器超时,则:
      • 重置选举计数器为 0;
      • 状态机输入 LocalMsgHup 消息,中断当前任期;

成员变更

  • 成员变更在一次只变更一个节点(增加或删除一个节点)的情况下,可以当做普通的raft write请求来处理;
  • 将日志分为普通和成员变更两种类型,普通日志走正常的fsm流程,成员变更类型日志

ReadOnly

  • 使用readOnly来处理读请求;

  • raft算法的leader完整性确保leader一定拥有最新的已提交数据,为此只读请求在leader中需要保证当前的leader有效();

  • 实现两种读请求优化 :

    • readIndex:

    • readLease

不足

  • 缺乏test;

参考

  1. 线性一致性和 Raft
  2. Raft 的 PreVote 实现机制
  3. Etcd 之 Lease read
  4. Raft 协议精解
  5. TiKV 源码解析系列 - Lease Read
  6. Raft TLA+形式化验证
updatedupdated2024-05-102024-05-10