ChubaoFS DataNode

ChubaoFS DataNode

简介

chubaofs datanode 是chubaofs中的数据存储节点,用于将chubaofs中的文件数据存储在磁盘中;

chubaofs 中的datanode数据以dataPartition为单位进行管理。dataPartition是datanode中进行数据管理的最高单位。

大文件/小文件

文件系统中,每个文件存在元数据。由于磁盘和内存的性能成本差别,导致同一个文件系统对于大小文件的操作管理成本存在显著的差异。

  • 对于小文件,其单个文件数据量少,平均磁盘操作成本巨大,且元数据数量膨胀快;

  • 大文件数据文件大,顺序读写可以获得较低的磁盘操作成本,取得较高的性能,元数据相对总数据量成本低;

因此同一个文件系统对于大小文件很难使用同一策略来满足高效低费存储需求。

chubaofs对于大小文件的读写使用了不同的策略,以此满足大小文件的不同需求。

chubaofs中的小文件是客户端指定,小于一定大小(默认为:1MB)的文件。可以通过客户端配置参数tinySize指定。

每个客户端文件的前1MB字节内的文件都使用TinyExtent进行存储管理,

大于1MB的文件部分使用NormalExtent方式进行存储管理。

顺序写/随机写

ChubaoFS同时支持顺序写随机写两种文件写入方式。

  • 顺序写: 指写入的数据每次只往文件末尾追加;

  • 随机写: 指覆盖之前已经写过的文件内容;

    客户端在发起写请求时,根据写入数据的偏移是否已经存在,来决定使用那种写入方式;

  • 顺序写: 使用主从方式进行副本间同步数据, 对应的存储引擎;

  • 随机写: 使用Raft协议来在数据副本间同步数据;

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// sdk/data/stream/stream_writer.go
func (s *Streamer) write(data []byte, offset, size, flags int) (total int, err error) {

    ...
    for _, req := range requests {
        var writeSize int
        if req.ExtentKey != nil {  //已存在旧写入数据extentkey
            writeSize, err = s.doOverwrite(req, direct)  //随机覆盖写
        } else {
            writeSize, err = s.doWrite(req.Data, req.FileOffset, req.Size, direct) //顺序写 
        }
        ...
    }

    ...
}

datanode目录结构

  • datanode配置文件中的disks指定了每个datanode dp的存储磁盘;

  • 每个disk中包含一系列datapartition_<id>_<dp_size>和命名的目录,用于存储对应dp;

  • 每个disk中还可能包含expired_dataparition_<id>_<dp_size>的过期dp,这些dp是在master中不存在的;

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
## datanode配置文件中的disks设置datanode管理的哪些disk; 
$ cat /opt/chubaofs/conf/datanode.json
  ...
  "disks": [
    "/data/hdfs10:159978629365",        //磁盘目录:可用大小
    "/data/hdfs12:159978629365",        
    ...
  ],
...
## 每个disk中包含一系列的dp目录
$ ls -1 /data/hdfs10
...
datapartition_1001_128849018880        //正常dp,1001:dpid,128849018880:dpsize
datapartition_1003_128849018880
datapartition_1004_128849018880
...
expired_datapartition_2004_128849018880    //过期dp,在master中 不存在的dp
...

## 每个dp包含一系列的extent file 和dp 元数据
$ tree /data/hdfs10/datapartition_1001_128849018880
├── 1
├── 10
├── 11
├── 12                       
├── 13
├── 14
├── 15
├── 16
...
├── 63
├── 64
├── 7
├── 8
├── 9                        //extent file,   1-64, tiny extent file; 1000-, normal extent file
├── APPLY                    //dp apply index, 里面存储了当前dp.appliedID(raft rsm apply index)
├── .apply                   //apply index临时文件
├── EXTENT_CRC               //extent file crc,
├── EXTENT_META              //extent meta
├── .meta                    //临时META文件,
├── META                     //dp meta
├── NORMALEXTENT_DELETE      //
├── TINYEXTENT_DELETE
└── wal_1001                 //raft wal 
    ├── 0000000000000001-0000000000000001.log
    └── META

DataPartition

dp存储

  • datanode配置文件中的disks指定了每个datanode dp的存储磁盘;

  • 每个disk中包含一系列datapartition_<id>_<dp_size>和命名的目录,用于存储对应dp;

  • 每个disk中还可能包含expired_dataparition_<id>_<dp_size>的过期dp,这些dp是在master中不存在的;

  • 每个dp

APPLY文件:

  • 保存当前dp的appliedID

  • dp的StartRaftLoggingSchedule()协程周期性(10s)将dp的appliedID 写入到APPLY文件中(先写.apply,后move);

  • dp

META文件

  • META保存了当前dp的配置元信息;

  • 当dp配置信息改变时,由PersistMetadata()将dp的配置元信息持久化到该文件中;

  • dp加载时(LoadDataPartition),从META中读取dp元信息;

  • 写入时机包括:

    • dp创建时;

    • 周期性truncate raft log时,lastTruncateID发生改变;

    • raft 配置变更;

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// cat /data/hdfs10/datapartition_1001_128849018880/META
{
  "VolumeID": "weqewqe",
  "PartitionID": 1001,
  "PartitionSize": 128849018880,
  "CreateTime": "2021-11-29 18:26:21",
  "Peers": [
    {
      "id": 2,
      "addr": "10.201.69.20:17310"
    },
    {
      "id": 5,
      "addr": "10.201.74.33:17310"
    },
    {
      "id": 7,
      "addr": "10.201.76.21:17310"
    }
  ],
  "Hosts": [
    "10.201.74.33:17310",
    "10.201.69.20:17310",
    "10.201.76.21:17310"
  ],
  "DataPartitionCreateType": 0,
  "LastTruncateID": 0
}

EXTENT_META

  • metadataFp

  • 保存baseExtentID + PreAllocSpaceExtentID, 2个uint64, 总共16字节;

EXTENT_CRC

  • verifyExtentFp: 保存dp所有normal extent crc;

  • extent 加载时,根据extentID, 从EXTENT_CRC中加载对应extent 的crc到extent header 中;

  • normal extent写入时, 根据offset,size计算blockNo(128K为一个Block);

  • 如果刚好是一个整block, 将crc写入EXTENT_CRC文件; 否则规整化后,写0

  • 每个datapartition存储目录中有一个EXTENT_CRC文件,用于保存该datapartition 所有normal_extent的crc校验头;

  • EXTENT_CRC文件由多个4KB大小的校验块组成,每个校验块存储一个normal_extent的crc校验;

  • 每个4KB的校验块由1000个4B的CRC检验数据组成;

  • datanode节点在加载normal_extent时,

1
2
3
4
|crc|
| 4 | 4 |...   | 4 | 4 |...   |
|--------------|--------------|---------------|
   4k                4k

extent file

  • TinyExtent file(id: 1-64):

  • Normal Extent file:

    • 写请求追加写到extent文件末尾;

    • normal extent file最大128MB, 写入前会对写入数据offset,size进行检查,超出128MB时无法写入;

NORMALEXTENT_DELETE

TINYEXTENT_DELETE

  • 记录本dp已经删除过的tiny extent 数据块, 在第一次调用fallocate puchhole后记录;

  • 每个被删除的tiny extent 数据块记录为24Byte,按<extentID><offset><size>这个依次追加到该文件中;

Status

Extent

每个dp包含多个extent, 每个extent 对应一个extent file,用于存储数据。

extent file大小限制为128MB, 每个datapartition 包含的extent 个数不超过2000个(256GB)

extent分为NormalExtentTinyExtent 两种类型。

TinyExtent

  • id范围: [1, 64]
  • 在每个dp加载时,会通过

NormalExtent

  • id: [1000, +)

ExtentStore

1

datanode 数据存储目录结构

  • EXTENT_META: metadataFp

  • EXTENT_CRC: verifyExtentFp, 存储当前datapartition 的所有normal_extentcrc校验数据。

  • TINYEXTENT_DELETE: tinyExtentDeleteFp,

TinyExtent中的删除

  • 删除的extent数据段offset必须是4K对齐的;

  • 先通过seek从文件中找到从offset开始的DATA起始位置newoffset;

  • 通过newoffset 和 offset之间的关系判断要删除的数据段是否已被删除;

  • 总共有以下4种情形:

    • 其中第2中的数据完全落在Hole中,其中数据已经删除过;

    • 其他情况都需要通过fallocate PunchHole来打洞删除从offset开始的size长的数据;

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 情形1:待删除数据区offset+size完全落在DATA区域, newoffset == offset
     data        hole        data        hole
|-------------|xxxxxxxxx|--------------|xxxxxxxx|--------
      ^-----+
  offset size


// 情形2:待删除数据区offset+size完全落在HOLE区域, newoffset - offset > size
     data        hole        data        hole
|-------------|xxxxxxxxx|--------------|xxxxxxxx|--------
                 ^---+  ^ 
              offset  newoffset


// 情形3:待删除数据区offset+size部分落在DATA区域;
     data        hole        data        hole
|-------------|xxxxxxxxx|--------------|xxxxxxxx|--------
      ^----------+
  offset size


// 情形4:待删除数据区offset+size部分落在DATA区域;
     data        hole        data        hole
|-------------|xxxxxxxxx|--------------|xxxxxxxx|--------
                 ^-------------+
              offset    ^newoffset

DataPartition加载

DataPartition修复

  • 每个dp在新建和加载后会启动一个statusUpdateScheduler()协程;

    • 该协程每过1min会先更新一下dp状态(计算usage,更新status),并交替启动repair任务(TinyExtent, NormalExtent交替分开);

    • 每5min会启动ReloadSnapshot()任务;

  • Repair任务由LaunchRepair()启动;

  • 先检查dp状态, Unavailable状态的dp不参与修复;

  • 然后updateRelicas()从master获取最新的副本ip;

  • 检查是否为leader,非leader退出,不启动 修复;

  • 最后由repair()函数执行修复任务;

复制

  • 每个datanode在接受每个客户端tcp连接时,会为该连接新建一个复制服务(ReplProtocol);

  • 复制服务会启动3个后台协程:

  • 1
    2
    3
    
    go rp.OperatorAndForwardPktGoRoutine()           //从toBeProcessCh管道读取待处理的packet,判断pkt是本地处理还是转发follower给
    go rp.ReceiveResponseFromFollowersGoRoutine()    //接收follower响应并处理
    go rp.writeResponseToClientGoRroutine()          //
    
  • 最后通过readPkgAndPrepare()持续接收client conn发送的packet,先经过Prepare()对Packet做一下检查设置,然后将packet放入toBeProcessCh,交由OperatorAndForwardPktGoRoutine()协程处理;

  • Prepare()在检查packet有效性后,最后会根据packet类型,增加extent info:

    • leader上的tinyExtent的Write请求,通过store.GetAvailableTinyExtent()选择一个有效的tinyExtent ID;

    • createExtent请求,通过store.NextExtentID()设置下一个extentID作为新extentID;

    • 最后将packet.Data复制到packet.OrgBuf上;

  • OperatorAndForwardPktGoRoutine()协程主要从toBeProcessCh接收请求,然后转发处理,具体流程如下:

    • 根据packet.RemainingFollowers判断是否需要转发给follower;

    • 如果packet.RemainingFollower==0,

      • 则调用(datanode*)OperatorPacket()在本地处理,

      • 处理完成后, 将reply放入rp.responseCh管道,通知 writeResponseToClientGoRroutine()协程处理;

    • 如果packet.RemainingFollower>0,

      • 则通过rp.sendRequestToAllFollowers()将packet复制并发送个所有follower;

      • 若发送成功,则:

  • ReceiveResponseFromFollowersGoRoutine()协程:

    • 接收
  • writeResponseToClientGoRroutine()协程:

    • 主要调用writeResponse()处理响应;

    • 先检查reply中是否有错误;

    • 在调用(*datanode)Post() 根据reply 添加一些额外的后续 处理:

      • 如果是master发送的命令,则将reply.NeedReply标记为true;

      • 如果是read命令,则将reply.NeedReply标记为false(Read请求的reply 在Operation 的相关handle中已经处理);

      • 通过cleanupPkt()清理回收;

      • 设置metric;

    • 根据reply.NeedReply判断是否需要响应,如果false,则返回, 否则将reply通过sourceConn发送回对端;

updatedupdated2024-05-152024-05-15