ToyDB

ToyDB


简介

  • toydbErik Grinaker为学习rust语言而开发的分布式sql数据库,支持分布式事务模型;

架构

toydb主要由3部分组成:

  • sqlengine: 负责sql语句的解析、执行计划;

  • raftengine: 负责存储层的数据副本同步;

  • storage: 负责提供kv及mvcc存储;

SQL层

  • SQL层主要负责将输入的sql语句字符串转化为执行计划,并通过raft交给各个副本的mvcc存储引擎执行;

  • SQL主要分为两个阶段:

    • 词法分析:sql语句------->token---->AST;

    • 生成执行计划:AST----->planner----->优化----->执行;

SQL语句 --> 词法分析 ----> 语法分析--->生成执行计划--->

词法分析(Lexer)

  • Lexer 也称为分词,从左向右扫描SQL,将其分割成一个个的toke(词元),在将token组装为AST(Abstract Tree);

  • Lexer的实现一般都是构造DFA(确定性有限状态自动机)来实现的。

状态转移图如下,这是一个能够识别标识符,数字和一般运算符的词法解析器。

语法分析(Parser)

Parser阶段有两种类型方法来实现:

  • 一种是自顶向下分析法,

  • 另一种是自底向上分析法,

    简单介绍一下两种类型分析法的处理思路。

执行计划

1
SQL String ---词法分析<Lexer>--> Token --<语法分析>--> AST Statement

Sql Engine

Parser(解释器)

Planner

Executor

Storage(存储)

Memory

MVCC

Raft Engine

toydb 通过raft来实现各节点间数据的一致性,其自带的raft模块由rust语言提供的一个简单的实现。

Raft内部有2个状态机:

  • 复制状态机(): raft各节点间的日志复制,保证各个副本日志的顺序及一致;

  • 指令状态机(State): 依次执行日志中的指令,驱动内部业务系统状态的改变;

由于复制状态机的Raft协议可以保证日志序列的顺序且一致性,所以由日志驱动的不同副本的指令状态机将拥有相同的输入指令序列,在初始状态相同的情况下,指令状态机将会得到相同的输出,以此就保证了各个副本外部最终状态的一致性;

  • 复制状态机是raft协议的核心;

  • 指令状态机由raft日志驱动来改变外界状态,是raft协议和外界交互的接口;

EvenLoop

Raft 的主驱动是EvenLoop。节点启动时,会开启一个evenloop后台异步任务,持续监听tick, tcp_in_tx, client_rx, node_rx这4个事件源上的消息Msg,以此来驱动整个状态机的运行:

  • tick事件由定时器产生,转入相应rolenode的tick处理;

  • tcp_in_tx事件由其他节点peer产生,交由raft 状态机step处理;

  • node_rx事件由节点内部产生,需根据事件消息的接收对象(to)分别处理;

    • 发往副本(toAddress::Peer, Address::Peers)的消息,放入tcp_tx交由TcpSender进行发送;
    • 发往ClientAddress::Client) 且事件类型为Event::ClientResponse的消息, 根据idrequests表中找到该消息响应rxresponse_tx,通过response_tx将消息响应回复给Client;
    • 其他消息为非法消息, 报错并退出;
  • client_rx事件由客户端产生,处理如下:

    • 先为事件生成uuid作为唯一id;

    • 以id为key, 将消息响应rxrequest_rx放入requests 哈希表中,该表用于后续消息响应时处理消息返回;

    • 生成一个ClientRequest类型的消息 ,交由Rolenodestep处理;

Evenloop接收到消息后,通过tick(), step()来驱动复制状态机执行日志复制操作。

各节点收到ClientRequest后处理流程Step()

  • Candidate:

    • ClientRequest消息放入queued_reqs队列中进行缓存,等待变为Leader后,再依次处理;
  • Follower:

    • 如果没有Leader, 则也将消息放入queued_reqs中缓存;// queud_reqs 后续处理?

    • 如果有Leader, 则将消息(id, from)放入proxied_reqs中记录下来,然后转发到Leader,由Leader处理;

  • Leader:

    • Request::Query消息:

      • 通过state_tx,向状态机发送Instruction::Query指令,状态机将指令插入到queries中;

      • 通过state_tx, 向状态机发送Instruction::Vote指令, 统计;

      • 若存在副本,则向副本发送Event::Heartbeat消息,和Follower

    • Request::Mutate消息:

      • 将消息记录到本地log中;

      • 复制log到各个Follower;

      • 接收到多数Follower的确认消息后,commit消息;

      • 给状态机发送Instruction::Notify指令;

      • 如果peers为空,提交;

    • Request::Status消息:

      • 根据当前节点状态,生成Instruction::Status,通过state_tx交由状态机执行;

指令状态机

指令状态机Driver::drive()驱动。每个Raft Node新建时,会开启一个driver后台任务,该任务从state_rx接收指令,交由execute处理,各指令处理流程如下:

  • Instruction::Abort:

  • Instruction::Apply:

  • Instruction::Notify:

    • 如果指令index大于状态机已经applied_index, 将(index, (address, id))插入到状态机notify哈希表中;

    • 否则指令已被应用过,通过node_tx给Raft Node 发送ClientResponse消息,由Raft Node将错误消息返回给客户端;

  • Instruction::Query:

  • Instruction::Status:

  • Instruction::Vote:

源码

  • Log

Log(日志)是Raft状态机

1
2
3
4
5
6
7
8
/// The replicated Raft log
pub struct Log {
    pub(super) store: Box<dyn log::Store>,
    pub(super) last_index: u64,
    pub(super) last_term: u64,
    pub(super) commit_index: u64,
    pub(super) commit_term: u64,
}
  • Driver
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//状态机接口
pub trait State: Send {
    fn applied_index(&self) -> u64;  //
    fn mutate(&mut self, index: u64, command: Vec<u8>) -> Result<Vec<u8>>;  //修改状态机状态
    fn query(&self, command: Vec<u8>) -> Result<Vec<u8>>;                   //查询状态机
}
//状态机驱动
pub struct Driver {
    state_rx: mpsc::UnboundedReceiver<Instruction>,  //状态机指令输入口
    node_tx: mpsc::UnboundedSender<Message>,         //raft协议消息输出口
    applied_index: u64,                              //
    notify: HashMap<u64, (Address, Vec<u8>)>,        //通知客户端更改被采用
    queries: BTreeMap<u64, BTreeMap<Vec<u8>, Query>>,  //等待处理的客户端查询指令,
}
// 状态机指令
pub enum Instruction {
    Abort,  //取消
    Apply { entry: Entry },   //应用
    Notify { id: Vec<u8>, address: Address, index: u64 }, //通知
    Query { id: Vec<u8>, address: Address, command: Vec<u8>, term: u64, index: u64, quorum: u64 }, //查询
    Status { id: Vec<u8>, address: Address, status: Box<Status> },  //状态机状态
    Vote { term: u64, index: u64, address: Address },  //投票
}

Raft角色

Leader

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 节点共有属性
pub struct RoleNode<R> {
    id: String,                //节点id
    peers: Vec<String>,        //
    term: u64,                 //
    log: Log,                  //
    pre_vote: bool,            //
    node_tx: mpsc::UnboundedSender<Message>,       //和node之间发送Msg通道
    state_tx: mpsc::UnboundedSender<Instruction>,  //节点往状态机驱动发送状态机指令通道
    queued_reqs: Vec<(Address, Event)>,            //
    proxied_reqs: HashMap<Vec<u8>, Address>,       //
    role: R,
}
// leader专有属性字段
pub struct Leader {
    heartbeat_ticks: u64,                    //心跳计数
    peer_next_index: HashMap<String, u64>,  //复制到副本的下一个index
    peer_last_index: HashMap<String, u64>,  //已知复制到副本的最后index
}
// follower专有字段
pub struct Follower {
    leader: Option<String>,
    leader_seen_ticks: u64,
    leader_seen_timeout: u64,
    voted_for: Option<String>,
}
// candidate专有字段
pub struct Candidate {
    election_ticks: u64,
    election_timeout: u64,
    votes: u64,
}

参考

  1. GitHub - erikgrinaker/toydb: Distributed SQL database in Rust, written as a learning project
  2. toydb/architecture.md at master · erikgrinaker/toydb · GitHub
updatedupdated2024-05-152024-05-15