CubeFS-BlobStore

CubeFS-BlobStore

简介

  • CubeFS-BlobStore是一个高可靠、高可用、低成本、支持超大规模(EB)的分布式存储系统。

  • 采用纠删码中的Reed-Solomon编码,对比三副本,以更低的存储成本提供更高的数据耐久性保障,支持多种纠删码模式和多可用区部署,

  • 同时针对小文件做了专门优化,可满足不同业务场景的存储需求。

架构

组件

blobstore包含如下组件:

模块描述功能依赖组件本地存储部署部署数量
ClusterMgr集群管理负责集群管理和卷的生成consul可跨机房部署,多机房>=3
Allocator集群管理代理提供数据写入资源id的分配ClusterMgr>=1
Access接入模块对外提供数据读、写和删除接口Redis,consul单节点部署,一个机房部署一个,>=1
BlobNodeblob存储提供存储资源ClusterMgr
Workerblob任务执行模块卷修补、迁移和回收任务执行模块BlobNode,Scheduler>=1
Scheduler异步任务调度中心卷修补、迁移和回收任务的生成和调度mongodb一个cluster至少一个>=1
MQProxy异步消息代理修补和删除操作消息保存模块,用于后续异步执行kafka>=1
Tinker后台数据整理管理模块修补和删除操作异步执行模块kafka,mongodb>=1
Clicli工具提供cli管理操作consul,redis>=
Consul注册管理提供节点、组件注册;必选
MongoDBmongodb为scheduler提供存储必须
Kafaka消息队列服务scheduler任务调度消息队列必须
Redis缓存服务为access,cli访问clustermgr提供缓存加速可选
Zookeeper为kafka提供注册服务

资源

  • Region(区域): 一个region可包含多个cluster;

  • Cluster(集群):一个Cluster是由多个相同cluster_id的clustermgr组成,可以提供完整的Blobstore服务的单位。cluster可以跨机房部署,多个跨机房clustermgr通过raft组成一个cluster;

  • Idc(数据中心/机房): 一个cluster可以跨多个idc部署;

  • Rack(机架):机房内一个机柜; 同一机架内的机器在一个交换机内,可以拥有很低的网络访问延迟和很高的网络带宽;

  • AZ(Available Zone):EC 中数据块的有效分区数,一个集群中,AZ的数量要和 IDC个数匹配;

  • Node(节点): 对应一台物理机或容器实例;

  • Disk(物理磁盘): 一块已格式化好,并被挂载到某一目录的物理磁盘或一个目录;

  • Chunk(数据簇):每个Disk的空间被划分为多个Chunk,Chunk对应一个Chunk File;

  • Volume(逻辑卷): 逻辑卷是面对用户的存储资源管理抽象视图,一个Volume包含多个Unit;

  • Unit(逻辑卷数据单元): 逻辑卷的数据组织单元。Unit主要包含了Vuid和bid两个字段;

  • Location(定位):数据在blobstore中的定位,一个location中的数据由多个blob组成;

  • Blob(数据段): Blob是location中的一段数据,每个Blob是EC编解码的一个数据块, Blob最大默认:4MB;

  • Shard(数据分片):一块Blob数据在ec encode时,根据编码策略,切分成多个分片Shard,每个分片称为一个shard;

EraserCode

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// ec参数
type Tactic struct {
    N         int   //数据块个数;
    M         int   //校验块个数;
    L         int   //LRC中,本地校验块个数;
    AZCount   int   //分区个数;
    PutQuorum int   //写操作时,最少写成功的数据块个数, 
                    // 为了保证写入时,有一个az挂掉后还能恢复数,最小写入块数必须>=(N+M)/AZCount+N
    GetQuorum int   //读操作时,最少读成功的数据块个数;
    MinShardSize int //每个数据块最小大小;
                     //当1en(data) < MinShardSize*N, 每个shard大小为MinShardSize, 后面不够的填0
                     //当len(data) > MinShardSize*N, 每个shard大小为[len(data)/N], 后面填0;
}
// 定义的ec算法
var constCodeModeTactic = map[CodeMode]Tactic{
    EC15P12:   {N: 15, M: 12, L: 0, AZCount: 3, PutQuorum: 24, GetQuorum: 0, MinShardSize: alignSize2KB},
    EC6P6:     {N: 6, M: 6, L: 0, AZCount: 3, PutQuorum: 11, GetQuorum: 0, MinShardSize: alignSize2KB},
    EC16P20L2: {N: 16, M: 20, L: 2, AZCounect: 2, PutQuorum: 34, GetQuorum: 0, MinShardSize: alignSize2KB},
    EC6P10L2:  {N: 6, M: 10, L: 2, AZCount: 2, PutQuorum: 14, GetQuorum: 0, MinShardSize: alignSize2KB},

    // single az
    EC12P4: {N: 12, M: 4, L: 0, AZCount: 1, PutQuorum: 15, GetQuorum: 0, MinShardSize: alignSize2KB},
    EC16P4: {N: 16, M: 4, L: 0, AZCount: 1, PutQuorum: 19, GetQuorum: 0, MinShardSize: alignSize2KB},
    EC3P3:  {N: 3, M: 3, L: 0, AZCount: 1, PutQuorum: 5, GetQuorum: 0, MinShardSize: alignSize2KB},
    EC10P4: {N: 10, M: 4, L: 0, AZCount: 1, PutQuorum: 13, GetQuorum: 0, MinShardSize: alignSize2KB},
    EC6P3:  {N: 6, M: 3, L: 0, AZCount: 1, PutQuorum: 8, GetQuorum: 0, MinShardSize: alignSize2KB},
    // for env test
    EC6P3L3:       {N: 6, M: 3, L: 3, AZCount: 3, PutQuorum: 9, GetQuorum: 0, MinShardSize: alignSize2KB},
    EC6P6Align0:   {N: 6, M: 6, L: 0, AZCount: 3, PutQuorum: 11, GetQuorum: 0, MinShardSize: alignSize0B},
    EC6P6Align512: {N: 6, M: 6, L: 0, AZCount: 3, PutQuorum: 11, GetQuorum: 0, MinShardSize: alignSize512B},
    EC4P4L2:       {N: 4, M: 4, L: 2, AZCount: 2, PutQuorum: 6, GetQuorum: 0, MinShardSize: alignSize2KB},
}


//vol layout ep:EC6P10L2
//|----N------|--------M----------------|--L--|
//|0,1,2,3,4,5|6,7,8,9,10,11,12,13,14,15|16,17|
// global stripe:[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15], n=6 m=10
// two local stripes:
// local stripe1:[0,1,2,  6, 7, 8, 9,10, 16] n=8 m=1
// local stripe2:[3,4,5, 11,12,13,14,15, 17] n=8 m=1

Table

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
// normaldb
    scopeCF            = "scope"        //分配的bid范围
    diskCF             = "disk"         //
    configCF           = "config"
    diskDropCF         = "disk_drop"
    serviceCF          = "service"
    diskStatusIndexCF  = "disk-status"
    diskHostIndexCF    = "disk-host"
    diskIDCIndexCF     = "disk-idc"
    diskIDCRackIndexCF = "disk-idc-rack"

// volumedb
    volumeCF                = "volume"
    volumeUnitCF            = "volume_unit"
    volumeTokenCF           = "volume_token"
    volumeTaskCF            = "volume_task"
    transitedVolumeCF       = "transited_volume"
    transitedVolumeUnitCF   = "transited_volume_unit"
    volumeUnitDiskIDIndexCF = "volumeUnit_DiskID"

// raftdb

组件详情

Access(访问接入点)

  • access是面向client,提供访问api接口的组件;

  • access为无状态节点;

  • 一个access为一个region下的所有cluster提供访问接入点;

  • access主要提供如下rpc:

    • /put: put数据,动态分配location, 默认最大size:5GB;

    • /putAt: 将数据put到指定的location;

    • /get: 根据location读取数据;

    • /delete:删除指定location的数据;

    • /deleteblob: 删除指定blob;

    • /alloc: 分配blob;

    • /sign: 签名;

  • 启动时,会从consul_agent_addr获取配置region的cluster信息加载到当前节点内存中;

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Location: 一次put的数据位置信息;
//
type Location struct {
    _         [0]byte
    ClusterID proto.ClusterID   `json:"cluster_id"`
    CodeMode  codemode.CodeMode `json:"code_mode"`
    Size      uint64            `json:"size"`
    BlobSize  uint32            `json:"blob_size"`
    Crc       uint32            `json:"crc"`
    Blobs     []SliceInfo       `json:"blobs"`
}

type Blob struct {
    Bid  proto.BlobID
    Vid  proto.Vid
    Size uint32
}

type SliceInfo struct {
    _      [0]byte
    MinBid proto.BlobID `json:"min_bid"`
    Vid    proto.Vid    `json:"vid"`
    Count  uint32       `json:"count"`
}

    MaxLocationBlobs uint32 = 4    //location中最大blob数量,4个
    MaxDeleteLocations int = 1024  //delete请求中最多包含location数:1024
    MaxBlobSize uint32 = 1 << 25    // location中单个blob最大size:32MB
  • 配置文件
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
//access.conf
{
    "bind_addr": ":9500",
    "auditlog": {
        "logdir": "/tmp/access/"
    },
    "consul_agent_addr": "127.0.0.1:8500",    //
    "service_register": {
        "consul_addr": "127.0.0.1:8500",
        "service_ip": "x.x.x.x"
    },
    "stream": {
        "idc": "idc",
        "cluster_config": {
            "region": "region"
        }
    }
}

Access Client

  • access client 为访问access 提供api;

  • access client api通过配置文件中的PriorityAddrs或consul 来连接access;

  • 每次api发送请求时,会从发现的所有access中随机选择MaxHostRetry(默认:3)个依次给access发送api请求,如果一个请求发送成功,则成功返回;

  • access client 主要 提供3个api:

    • Put(): 写入数据, 返回数据location;

    • Get(): 根据location,获取数据内容;

    • Delete(): 删除指定location的数据;

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// API access api for s3
// To trace request id, the ctx is better WithRequestID(ctx, rid).
type API interface {
    // Put object once if size is not greater than MaxSizePutOnce, otherwise put blobs one by one.
    // return a location and map of hash summary bytes you excepted.
    Put(ctx context.Context, args *PutArgs) (location Location, hashSumMap HashSumMap, err error)
    // Get object, range is supported.
    Get(ctx context.Context, args *GetArgs) (body io.ReadCloser, err error)
    // Delete all blobs in these locations.
    // return failed locations which have yet been deleted if error is not nil.
    Delete(ctx context.Context, args *DeleteArgs) (failedLocations []Location, err error)
}

Allocator(资源分配器)

  • allocator主要为accessor提供volume/alloc, list两个接口,负责vid,bid的分配;

  • allocator启动时,将自身组成到clustermgr的服务中;

  • allocator内置了volumeMgr, bidmgr

    • volumeMgr:负责volume的分配的管理。启动时,开启后台线程,将clusterMgr上的所有volInfo加载到allocate内存中;

    • bidMgr:负责volume 的bid range分配;

  • rpc:

    • /volume/alloc:

    • /volume/list:

  • 配置文件:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
// allocator.conf
{
    "bind_addr": ":9100",
    "host": "http://127.0.0.1:9100",
    "cluster_id": 1,
    "idc": "z0",
    "clustermgr": {
        "hosts": [
            "http://127.0.0.1:9998",
            "http://127.0.0.1:9999",
            "http://127.0.0.1:10000"
        ]
    },
    "log": {
        "level": 1
    },
    "auditlog": {
        "logdir": "/tmp/allocator/"
    }
}

ClusterMgr(集群管理器)

  • clustermgr用于集群相关信息的管理;

  • clustermgr内部由多个管理器组成,包括:

    • ServiceMgr(服务管理):管理集群中的各个Service, 包括:allocator,mqproxy, tinker;

    • ConfigMgr(配置管理): cluster_config, 集群的全局配置;

    • VolumeMgr(逻辑卷管理):负责volume的allocate,状态监测等, ;

    • DiskMgr(磁盘管理):管理集群内所有blobnode磁盘;

    • ScopeMgr(Bid管理):id分配管理;

  • clustermgr内置kvdb(目前为rocksdb)来保存各种管理器的相关数据;

  • 多个clustermgr通过raft保证kvdb数据在多个节点的同步以保证高可用;

  • clustermgr周期性(参数cluster_report_interval_s,默认:60s)的将clusterinfo汇报给consul_agent_addr上,以ebs/<region>/clusters/<clusterID>:<clusterInfo>

  • leader clustermgr周期性的(默认: 10s)检查并更新disk状态;

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
// clustermgr.conf
{
    "max_procs":0,
    "bind_addr":":9998",
    "cluster_id":1,
    "idc": ["z0"],
    "region": "test-region",
    "readonly": false,
    "chunk_size": 16777216,
    "consul_agent_addr": "127.0.0.1:8500",  //option, default: 127.0.0.1:8500
    "log": {
        "level": 1
    },
    "auditlog": {
        "logdir": "/tmp/clustermgr/",
        "backup": 9,
        "log_file_suffix": ".log",
        "rotate_new": true,
        "chunkbits": 14

    },
    "auth": {
        "enable_auth": false,
        "secret": "testsecret"
    },
    "normal_db_path":"/tmp/normaldb0",
    "normal_db_option": {"create_if_missing":true},
    "code_mode_policies": [
        {"mode_name":"EC3P3","min_size":0,"max_size":1024,"size_ratio":0.2,"enable":true},
        {"mode_name":"EC6P6","min_size":1025,"max_size":2048,"size_ratio":0.8,"enable":false}
    ],
    "volume_mgr_config": {
        "volume_db_path":"/tmp/volumedb0",
        "volume_db_option": {"create_if_missing":true},
        "min_allocable_volume_count":4,
        "each_allocator_volume_threshold":500,
        "retain_time_s":600
    },
    "cluster_config": {
        "disk_repair":"Disable",
        "balance":"Disable",
        "disk_drop":"Disable",
        "blob_delete":"Disable",
        "shard_repair":"Disable",
        "vol_inspect":"Disable",
        "init_volume_num":100,
        "volume_reserve_size":10485760,
        "data_node_heartbeat_interval_s": 60
    },
    "raft_config": {
        "raft_db_path": "/tmp/raftdb0",
        "raft_db_option": {"create_if_missing":true},
        "snapshot_patch_num": 64,
        "server_config": {
            "nodeId": 1,
            "listen_port": 10110,
            "raft_wal_dir": "/tmp/raftwal0",
            "peers": {"1":"127.0.0.1:10110","2":"127.0.0.1:10111","3":"127.0.0.1:10112"}
        },
        "raft_node_config":{
            "flush_num_interval": 10000,
            "flush_time_interval_s": 10,
            "truncate_num_interval": 10,
            "node_protocol": "http://",
            "nodes": {"1":"127.0.0.1:9998", "2":"127.0.0.1:9999", "3":"127.0.0.1:10000"}
        }
    },
    "disk_mgr_config": {
        "refresh_interval_s": 300,
        "rack_aware":false,
        "host_aware":false
    }
}

Cluster(集群)

  • 一个cluster由多个cluster_id相同的多个clustermgr节点组成;

  • 一个cluster中的多个clustermgr通过raft组成一个raft group进行数据复制同步;

  • 一个region中可包含多个cluster,同一region下的cluster注册为consul下的多个服务;

  • 数据put时,access根据配置的region,会随机在region下选择一个cluster来put数据,返回的location中包含cluster_id用以指明所在cluster;

Volume(卷)

  • Volume是数据管理逻辑单元,是面向客户端的数据视图;

  • 基本信息

    • vid(32bits)

    • CodeMode: 编码模式

    • Status: 状态

    • HealthScore: 健康分

    • Total: 总空间

    • Free: 可用空间

    • Used: 已使用空间

    • CreateByNodeID: 创建NodeID

  • Volume Status

    • idle: volume刚创建时,尚未被分配的volume状态;

    • active: volume分配给某一个allocator后的状态;

    • lock:

    • unlocking

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
//卷信息
type VolumeInfo struct {
    Units []Unit
    VolumeInfoBase
}
//卷单元
type Unit struct {
    Vuid       proto.Vuid 
    DiskID     proto.DiskID
    Host       string
}

// clustermgr/srv.go
    BidScopeName             = "bid"
    MaxBidCount              = 100000
    DefaultChunkSize         = 17179869184
    DefaultVolumeReserveSize = 10485760

    defaultClusterReportIntervalS   = 60
    defaultHeartbeatNotifyIntervalS = 10
    defaultMaxHeartbeatNotifyNum    = 2000
    defaultMetricReportIntervalM    = 2

BlobNode

  • BlobNode是blob管理节点,提供数据的blob存储管理;

  • blobnode启动时,将扫描并加载配置文件下的disk,然后后clusterMgr上已有的disk进行对比,如果clusterMgrnot found,则AddDiskclusterMgr的管理器中;

  • 启动时开启如下全局协程:

    • loopHeartbeatToClusterMgr(): 周期性(默认:30s)维持和clusterMgr间的心跳;

    • loopReportChunkInfoToClusterMgr():周期性(默认:57s)向clusterMgr汇报chunkInfo;

    • loopGcRubbishChunkFile(): 周期(默认:60min)检查chunkfile,执行垃圾回收任务;

    • loopCleanExpiredStatFile(): 周期()清理过期stat文件(/tmp/shm/目录下的iostat文件);

blobnode中disk

  • blobnode中disk由配置文件中disks项指定,一个blobnode可以包含多个disk;

  • blobnode启动时,先通过rpc从clusterMgr获取该host所有已经注册的disk;

  • 然后根据配置文件, 逐个加载disk;

  • 每个disk包含.sys, meta等子目录,

    • .sys目录里面保存disk的格式相关信息;

    • .meta里面使用rocksdb来记录chunk相关的元数据信息;

    • .data目录包含chunkfile;

  • blobnode disk加载流程:

    • 通过配置文件获取disk根目录;

    • 加载磁盘format信息。通过读取.sys目录下的format.json内获取disk format信息(包括DiskID), 如果该文件不存在,则从clusterMgr分配一个新的DiskID, 并注册disk到clusterMgr的table中;

    • 打开meta目录下的rocksdb,初始化化superblock;

    • 从meta db中读取diskinfo来加载diskinfo;

    • 设置disk iostat 相关数据结构;

    • 启动loop相关后台协程

      • ds.loopCleanChunk: 清理已released 的chunk;

      • ds.loopCompactFile: compact chunk file;

      • ds.loopDiskUsage:

      • ds.loopCleanTrash:

      • ds.loopMetricReport:

    • 加载好的diskStorage记录在map[diskID]Disks 中;

  • blobnode rpc:

    • /chunk/create

    • /chunk/release

    • /chunk/compact

    • ...

    • /shard/get/...

    • /shard/put/....

    • /shard/delete/...

配置文件

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// blobnode.conf
{
  "bind_addr": ":8899",
  "cluster_id": 1,
  "idc": "z0",
  "rack": "testrack",
  "host": "http://127.0.0.1:8899",
  "disks": [
    {
      "path": "./run/disks/disk1",
      "auto_format": true,
      "max_chunks": 1024
    },
    {
      "path": "./run/disks/disk2",
      "auto_format": true,
      "max_chunks": 1024
    },
    {
      "path": "./run/disks/disk3",
      "auto_format": true,
      "max_chunks": 1024
    },
    {
      "path": "./run/disks/disk4",
      "auto_format": true,
      "max_chunks": 1024
    },
    {
      "path": "./run/disks/disk5",
      "auto_format": true,
      "max_chunks": 1024
    },
    {
      "path": "./run/disks/disk6",
      "auto_format": true,
      "max_chunks": 1024
    }
  ],
  "clustermgr": {
    "hosts": [
      "http://127.0.0.1:9998",
      "http://127.0.0.1:9999",
      "http://127.0.0.1:10000"
    ]
  },
  "log": {
    "level": 0,
    "filename": "./run/blobnode.log"
  },
  "auditlog": {
    "logdir": "./run/auditlog"
  }
}

存储格式

  • volume->chunk

  • chunk->shard

Disk(磁盘)

  • Disk是BlobNode节点上一个格式化后挂载到某一目录的物理磁盘,用来存储具体的数据;

  • Disk基本信息包括:

    • DiskID: 磁盘ID(uint32), 1~1^31-1;

    • ClusterID: 所属ClusterID(uint32);

    • Idc: Idc

    • Path:

    • Host:

    • Status

    • ReadOnly: 是否只读

  • Disk Status包括:

    • Normal: 正常

    • Broken:

    • Repairing

    • Repaired

    • Droped

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// disk format
// blobnode/core/format.go
/*
 * ${diskRoot}/
 *         - .sys/                 //disk
 *          - .format.json         //disk 格式信息;
 *          - .format.json.tmp
 *         - .trash/                //回收站,被清理的chunk
 *         - data/                  //disk数据信息, chunk file
            - <chunkID1>file     // chunk file 1  (最多8196个chunk)
            - <chunkID2>file     // chunk file 2 
 *         - meta/                  //disk元信息
            - superblock/        //disk超级块,disk meta kvdb
*/

// 磁盘格式信息受保护域,
type FormatInfoProtectedField struct {
    DiskID  proto.DiskID `json:"diskid"`    //
    Version uint8        `json:"version"`   //
    Ctime   int64        `json:"ctime"`     //
    Format  string       `json:"format"`    //disk格式,当前只支持fs
}
//磁盘格式信息
type FormatInfo struct {
    FormatInfoProtectedField
    CheckSum uint32 `json:"check_sum"`
}

//disk metadb(rocksdb)
//    k       ----        v
// disk/<diskid>        [diskid]  
// chunk/<chunkid>      [chunkid]
// vuid/<vuid>          [vuid]
// diskinfo             [diskmeta]

//DiskMeta
type DiskMeta struct {
    FormatInfo
    Host       string           `json:"host"`
    Path       string           `json:"path"`
    Status     proto.DiskStatus `json:"status"`
    Registered bool             `json:"registered"`
    Readonly   bool             `json:"readonly"`
    Mtime      int64            `json:"mtime"`
}

Chunk

  • Chunk是对Disk上具体数据的一种抽象,当前的chunk实现为一个file;

  • chunk file 位于disk的data目录下, 以chunkID为名的file;

  • chunkID组成:vuid+<创建chunk时的时间戳>

  • 每个chunk由多个连续的shard组成,

  • 一个disk默认最多chunk数DefaultMaxChunks:8192;

  • 一个chunk默认最大sizeDefaultMaxChunk: 1TiB;

  • 默认chunk sizeDefaultChunkSize:16GiB;

  • chunk status:

    • Default:

    • Normal:

    • ReadOnly:

    • Release:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// cubufs-blobstore/blobnode/core/shard.go

// chunk datafile format:
//  --------------
// | chunk  header | (4k)
//  --------------
// |    shard    |         ----------------------
// |    shard    |        |  header (32 Bytes) |
// |    shard    | ---->  |  data   (...)      |
// |    shard    |        |  footer (8 Bytes)  |
// |    ....    |        ----------------------
//
// Chunkdata header format:
//  --------------
// | magic number |   ---- 4 bytes
// | version      |   ---- 1 byte
// | parent chunk |   ---- 16 byte    //compact前的chunk   
// | create time  |   ---- 8 byte
// | padding      |   ---- aligned with shard padding size ( 4k-4-1-16-8)

// shard
// shard header format:
// ---------------------
// |crc(header)(uint32)|
// |  magic  (uint32)  |
// |  bid    (int64)   |
// |  vuid   (uint64)  |
// |  size   (uint32)  |
// | padding (4 bytes) |
// ---------------------
//
// shard data format.
//  --------------
// | block        |   ---- 64 KiB
// | block        |   ---- 64 KiB
// | block        |   ---- 64 KiB
//  ---------------
//
// block format.
//  --------------
// | crc          |   ---- 4 Byte
// | data         |   ---- (64 KiB - 4)
//  ---------------
//
// footer format:
// ----------------------
// |  magic   (int32)   |
// | crc(shard) (int32) |
// | padding  (0 bytes) |
// ----------------------


// Chunkdata has a header (4k).
// Chunkdata header format:
//  --------------
// | magic number |   ---- 4 bytes
// | version      |   ---- 1 byte
// | parent chunk |   ---- 16 byte
// | create time  |   ---- 8 byte
// | padding      |   ---- aligned with shard padding size ( 4k-4-1-16-8)
//  --------------
// |    shard     |
// |    shard     |
// |    shard     |
// |    shard     |
// |    shard     |
// |    ....      |

type ChunkHeader struct {
    magic       [_chunkMagicSize]byte
    version     byte
    parentChunk bnapi.ChunkId
    createTime  int64
}

// Chunk ID
// vuid + timestamp
const (
    chunkVuidLen      = 8
    chunkTimestmapLen = 8
    ChunkIdLength     = chunkVuidLen + chunkTimestmapLen
)

Shard

  • shard是chunk中用于存储一个ec编码

  • shard status:

    • Default:

    • Normal:

    • MarkDelete:

MQProxy(消息代理)

  • mqproxy为kafka mq代理,作为异步消息传输通道;
  • 启动时,将自身作为 service注册到clustermsg中的service table中;
  • mqproxy接收access下发的shardRepardMsgblobDeleteMsg消息,发送到kafka中,最后被scheduler消费;
  • scheduler中的inspectMgr在巡检时,如果发现丢失的shard,将给mqproxy发送shardRepairedMsg;
  • 提供rpc接口:
    • /repairmsg: shard修复消息;
    • /deletemsg: blob删除消息;
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//mqproxy.conf
{
  "bind_addr": ":9600",
  "cluster_id": 1,
  "cm_cfg": {
    "hosts": ["http://127.0.0.1:7000", "http://127.0.0.1:7010", "http://127.0.0.1:7020"]
  },
  "clustermgr": {
    "blob_delete_topic": "blob_delete",
    "shard_repair_topic": "shard_repair",
    "shard_repair_priority_topic": "shard_repair_prior",
    "msg_sender": {
      "broker_list": ["127.0.0.1:9092"]
    }
  },
  "service_register": {
    "host": "http://127.0.0.1:9600",
    "idc": "z0"
  },
  "log": {
    "level": 0,
    "filename": "/tmp/mqproxy.log"
  },
  "auditlog": {
    "logdir": "./auditlog/mqproxy"
  }
}

Scheduler(任务调度器)

  • scheduler提供所在cluster后台任务调度服务;
  • 内置多种管理器,来对各种任务进行管理:
    • ClusterTopoMgr: 集群拓扑管理器, 周期性(默认:5min)的从clustermgr获取配置集群的disk信息,然后构建拓扑信息;
    • BalanceMgr:数据平衡管理器, 周期性(默认:5s)收集disk分布情况,构建MigrateMgr任务,使数据自动保证平衡;
    • MigrateMgr: 迁移管理器, 分为自动、手动迁移管理器两种。自动迁移管理器由BalanceMgr管理;手动迁移管理器由api调用触发:
    • DiskDropMgr:下线磁盘管理器,负责;
    • RepairMgr:shard修复任务管理器,管理下发shardreapirmsg task;
    • InspectMgr: 巡检管理器, 巡检vol,发现坏的blob时,给mqproxy发送ShardRepairMsg
  • 启动时,先根据配置文件初始化各个管理器 ,再从mongo中加载已有的task,最后使各个mgr run起来;
  • scheduler内置service table, tinker和worker组件在启动时,会将自身注册到scheduler的service table中;
  • 各管理器生成task,并插入到相应的mongdb task table中;
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
  // scheduler.conf
  {
   "bind_addr": ":9800",         # 服务端口
   "cluster_id": 1,              # 集群id
   "clustermgr": {               # clustermgr地址
     "hosts": ["http://127.0.0.1:9998", "http://127.0.0.1:9999", "http://127.0.0.1:10000"]
   },
   "database": {                 # 后台任务相关配置
     "mongo": {
       "uri": "mongodb://127.0.0.1:27017" # mongodb 地址
     },
     "db_name": "scheduler"      # 数据库名
   },
   "task_archive_store_db": {    # 后台任务备份表
     "mongo": {
       "uri": "mongodb://127.0.0.1:27017" # mongodb 地址
     },
     "db_name": "task_archive_store" # 数据库名
   },
   "log":{                         # 运行日志相关配置
     "level":0,                    # 0:debug, 1:info, 2:warn, 3:error, 4:panic, 5:fatal
     "filename": "/tmp/scheduler.log" # 运行日志文件,会自动轮转
   },
   "auditlog": {                    # 审计日志相关配置
     "logdir": "./auditlog/scheduler" # 审计日志目录
   }
  }
  • rpc:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// scheduler/startup.go
    rpc.GET("/task/acquire", service.HTTPTaskAcquire, rpc.OptArgsQuery())
    rpc.POST("/task/reclaim", service.HTTPTaskReclaim, rpc.OptArgsBody())
    rpc.POST("/task/cancel", service.HTTPTaskCancel, rpc.OptArgsBody())
    rpc.POST("/task/complete", service.HTTPTaskComplete, rpc.OptArgsBody())
    rpc.POST("/manual/migrate/task/add", service.HTTPManualMigrateTaskAdd, rpc.OptArgsBody())

    rpc.GET("/inspect/acquire", service.HTTPInspectAcquire, rpc.OptArgsQuery())
    rpc.POST("/inspect/complete", service.HTTPInspectComplete, rpc.OptArgsBody())

    rpc.POST("/task/report", service.HTTPTaskReport, rpc.OptArgsBody())
    rpc.POST("/task/renewal", service.HTTPTaskRenewal, rpc.OptArgsBody())

    rpc.POST("/balance/task/detail", service.HTTPBalanceTaskDetail, rpc.OptArgsBody())
    rpc.POST("/repair/task/detail", service.HTTPRepairTaskDetail, rpc.OptArgsBody())
    rpc.POST("/drop/task/detail", service.HTTPDropTaskDetail, rpc.OptArgsBody())
    rpc.POST("/manual/migrate/task/detail", service.HTTPManualMigrateTaskDetail, rpc.OptArgsBody())
    rpc.GET("/stats", service.HTTPStats, rpc.OptArgsQuery())

    rpc.GET("/service/list", service.HTTPServiceList, rpc.OptArgsQuery())
    rpc.POST("/service/register", service.HTTPServiceRegister, rpc.OptArgsBody())
    rpc.GET("/service/get", service.HTTPServiceGet, rpc.OptArgsQuery())
    rpc.POST("/service/delete", service.HTTPServiceDelete, rpc.OptArgsBody())

Tinker(修补器)

  • 启动时,将自身注册到scheduler的service服务列表中;
  • 内置shardRepairMgr, BlobDeleteMgr, 执行shard repair,blob delete任务;
  • 从kafka mq中消费相应的消息, 并通过rpc通知worker执行相应的任务;
  • tinker使用mongodb保存kafka offset和orphanShard;
  • rpc:
    • /update/vol:
    • /stats:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
//tinker.conf
{
   "bind_addr": ":9700",           # 服务端口
   "cluster_id":1,                 # 集群id
   "database_conf": {              # mongodb相关配置
       "mongo": {
         "uri": "mongodb://127.0.0.1:27017" # mongodb地址
       },
       "db_name": "tinker"         # 数据库名
   },
   "shard_repair":{                # 数据修补相关配置
        "broker_list":["127.0.0.1:9092"], # kafka 地址
        "priority_topics":[        # 修补主题配置
            {
                 "priority":1,     # 修复优先级,数值越大优先级越高
                 "topic":"shard_repair", # 主题
                 "partitions":[0]  # 消费分区
            },
            {
                "priority":2,      # 修复优先级,数值越大优先级越高
                "topic":"shard_repair_prior", # 主题
                "partitions":[0]   # 消费分区
             }
        ],
        "fail_topic":{             # 修补主题消费配置
             "topic":"shard_repair_failed", # 主题
             "partitions":[0]      # 消费分区
        }
   },   
    "blob_delete":{                 # 数据删除相关配置
         "broker_list":["127.0.0.1:9092"], # kafka地址
         "normal_topic":{          # 删除消息消费配置
             "topic":"blob_delete",# 主题
             "partitions":[0]      # 消费分区
         },
         "fail_topic":{            # 删除失败消息消费配置
             "topic":"fail_blob_delete", # 主题
             "partitions":[0]      # 分区
         },
         "safe_delay_time_h":72,   # 删除保护期
         "dellog":{                # 删除记录相关配置
             "dir": "./delete_log" # 删除日志目录
         }
    }, 
    "clustermgr": { # clustermgr地址
         "hosts": ["http://127.0.0.1:9998", "http://127.0.0.1:9999", "http://127.0.0.1:10000"]
     },
     "scheduler": { # scheduler服务地址
         "host": "http://127.0.0.1:9800"
     },
     "service_register":{ # 自身服务注册信息
         "host":"http://127.0.0.1:9700", # 服务地址
         "idc":"z0" # 服务所属机房
     },
     "log":{ # 运行日志相关配置
         "level":0, # 0:debug, 1:info, 2:warn, 3:error, 4:panic, 5:fatal
         "filename": "/tmp/tinker.log" # 运行日志文件,会自动轮转
     },
     "auditlog": { # 审计日志相关配置
         "logdir": "./auditlog/tinker" # 审计日志目录
     }
}

Worker()

  • 执行tinker下发(rpc api)的shard repair, blob delete任务;

  • 启动时,将自身注册到scheduler的service 列表中;

  • 周期性(默认: 500ms)的从scheduler获取task(调用scheduler rpc:/task/acquire), 并执行task(包括: Repair, Balance, DiskDrop, ManualMigrate, Inspect);

  • rpc api:

    • /shard/repair

    • /stats

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
  //worker.conf
  {
   "bind_addr": ":9910",                # 服务端口
   "cluster_id": 1,                     # 集群id
   "service_register": {                # 自身服务consul注册信息
     "host": "http://127.0.0.1:9910",   # 服务地址
     "idc": "z0"                        # 服务所属机房
   },
   "scheduler": {                       # scheduler服务相关配置
     "host": "http://127.0.0.1:9800"    # 服务地址
   },
   "dropped_bid_record": {              # 丢弃blob id原因记录
     "dir": "./dropped"                 # 记录目录
   },
   "log":{                              # 运行日志相关配置
     "level":0,                         # 0:debug, 1:info, 2:warn, 3:error, 4:panic, 5:fatal
     "filename": "/tmp/worker.log"      # 运行日志文件,会自动轮转
   },
   "auditlog": {                        # 审计日志相关配置
     "logdir": "./auditlog/worker"      # 审计日志目录
   }
  }

Cli

  • cli是blobstore提供的一个命令行工具,用于对blobstore进行管理操作;

  • cli通过access-clinetapi 提供和access交互的能力;

  • cli可配置redis来缓存;

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//cli.conf
{
    "access": {
        "conn_mode": 4,                          //4: no timeout
        "consul_addr": "http://127.0.0.1:8500",  //
        "max_host_retry": 0,
        "max_part_retry": 0,
        "max_size_put_once": 0,
        "priority_addrs": [
            "http://localhost:9500",
            "http://127.0.0.1:9500"
        ],
        "service_interval_ms": 0
    },
    "redis_addrs": [
        "127.0.0.1:6379"
    ],
    "redis_user": "",
    "redis_pass": "",
    "cm_addrs": [
        "http://localhost:9998",
        "http://127.0.0.1:9998"
    ],
    "verbose": false,
    "vverbose": false
}

外部组件

Consul

  • 提供cluster信息注册,查询服务;
1
2
3
// ConsulRegisterPath
                   key                             value
clusterMgr: ebs/<region>/clusters/<cluster_id> :  <cluster_info>

MongoDB

  • mongodb用于存储scheduler模块调度信息;

dbdafault tableconfig key说明模块
scheduler"database:db_name"scheduler.conf
balance_tbl
disk_drop_tbl
repair_tbl
inspect_checkpoint_tbl
manual_migrate_tbl
svr_register_tbl
tasks_tbl

Kafka

  • 用于组件间异步 任务消息传递, mqproxy对其相关操作进行封装;

  • 主要用来传递shardRepairblobDelete两类消息;

Redis

  • 为access访问cm volume提供缓存;

  • 缓存expires:30-60min;

1
2
3
//redis key
GroupVolumeKey   "get-volume-<cvid>"
RedisVolumeKey   "access/volume/<clusterID>/<vid>"  Value: &VolumePly{}

参数

BlobNode

  • 一个disk默认最多chunk数DefaultMaxChunks:8192;

  • 一个chunk默认最大sizeDefaultMaxChunk: 1TiB;

  • 默认chunk sizeDefaultChunkSize:16GiB;

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//blobnode/core/config.go

    DefaultDiskReservedSpaceB           = int64(60 << 30) // 60 GiB
    DefaultChunkSize                    = int64(16 << 30) // 16 GiB
    DefaultMaxChunks                    = int32(1 << 13)  // 8192
    DefaultChunkReleaseProtectionM      = 1440            // 1 days
    DefaultChunkGcCreateTimeProtectionM = 1440            // 1 days
    DefaultChunkGcModifyTimeProtectionM = 1440            // 1 days
    DefaultChunkCompactIntervalSec      = 10 * 60         // 10 min
    DefaultChunkCleanIntervalSec        = 60              // 1 min
    DefaultDiskUsageIntervalSec         = 60              // 1 min
    DefaultDiskCleanTrashIntervalSec    = 60 * 60         // 60 min
    DefaultDiskTrashProtectionM         = 2880            // 2 days
    DefaultCompactBatchSize             = 1024            // 1024 counts
    DefaultCompactMinSizeThreshold      = 16 * (1 << 30)  // 16 GiB
    DefaultCompactTriggerThreshold      = 1 * (1 << 40)   // 1 TiB
    DefaultMetricReportIntervalS        = 30              // 30 Sec
    DefaultCompactEmptyRateThreshold    = float64(0.8)    // 80% rate

// blobnode/core/disk/disk.go
    MaxChunkSize    = int64(1024 << 30) // 1024 GiB, 一个disk chunk最大size,

// access config
defaultMaxObjectSize int64 = 5 * (1<<30)  //单个object最大put大小;

关键流程

创建卷(CreateVolume)

  • ClusterMgr启动后,会根据配置为每种ec策略预先创建一定数量的Volume用于分配;

  • 入口为clusterMgr中volumeMgr的createVolume()函数;

  • 先通过scopMgr.Alloc 分配一个新vid;

  • 根据mode获取unitCount(为tactic的N+M+L);

  • 初始化volume units信息vuInfos;

  • 初始化volInfo;

  • 通过raft执行initCreateVolume指令,将volume信息记录到transitedTable中;

  • 通过allocChunkForAllUnits为vol所有的units分配chunk;

    • allocChunkForAllUnits会根据vol的编码策略,将chunk随机分布到不同idc上;

    • 通过blobnode ChunkCreate rpc调用在blobnode上创建chunk;

  • 通过raft执行CreateVolume指令,删除transitedTbl中的volume units,将volume units相关信息记录到volumeTbl中 ;

写入(Put)

  • put提供将数据写入到blobstore中的能力;

Put流程从access-client中的Put()方法开始:

access-client

  • access-client Put()流程:

    • 根据数据判断,如果size == 0, 返回空location;

    • Size <= MaxSizePutOnec(默认:256MB) ,通过putObject()单个对象上传;

    • 否则,通过putParts分块上传:

  • putObject流程:

    • 数据size小于PutOnce缓存(默认: 8MB),把数据一次读入buffer中;

    • 调用access put rpc将数据put上去;

    • 返回location;

  • putParts流程:

    • 通过rpc 从access中alloc一个location和tokens

    • 启动一个后台goroutine,将数据按config.PartConcurrenc(默认:4)读取到buffer中;

    • 将buffer中的数据增加token,cid,vid等相关信息组装为一个parts;

    • 通过putPartsBatch将parts批量上传;

access

  • access put rpc的入口为service.Put, 其处理流程如下:

    • ParseArgs(args)解析参数;

    • args.IsValid()判断参数是否有效;

    • 根据args中的hashes设置hasherMap;

    • 调用streamHandler.Put()

    • 通过hasherMap计算返回的hashSumMap;

  • stream_put Put()流程:

    • 如果size>maxObjectSize(默认:256MB), 返回错误;

    • 如果hasherMap个数>0; 复制一个reader用于计算hash;

    • 调用SelectCodeMode(size),根据大小选择一个合适的纠删码策略;

    • 根据access配置MaxBlobSize,设置blobSize(默认: 4MB);

    • 调用allocFromAllocatorWithHystrix(), 根据纠删码、size, blobSize,从allocator分配clusterID,blobs, blob个数由size, blobSize计算而来;

    • 根据要读取blob的大小和纠删码策略,在编码器encoder中ec.NewBuffer()一块ec.Buffer;

    • 通过ec.Split()将ec.Buffer中数据缓存切分为对应编码的shards;

    • 依次处理各个blob:

      • 读取blob数据填充到ec.Buffer的dataBuf中;

      • 调用对应编码器的Encode()将shards进行编码;

      • 通过writeToBlobnodesWithHystrix()将shards写入到blobnode中;

      • 如果有写入出错的个数>0, 则调用RepairMsgBg(),往mqproxy发送repair消息sendRepairMsgBg

    • 上传过程如果不成功,通过clearGarbage()清理无用垃圾数据;

  • ec.NewBuffer()建立编码缓冲区流程:

    • 根据dataSize和编码策略数据分片数shardN,计算每个分片的大小shardSize=(dataSize+shardN-1)/shardN;

    • 根据分片大小shardSize和策略参数,计算编码总缓冲大小ecSize=shardSize*(N+M+L);

    • 从内存池中申请ecSize大小的缓存buf;

    • 根据参数设置buf中的不同部分;

    • 返回设置好的Buffer;

  • writeToBlobnodes()数据写入到blobnode流程:

    • 根据clusterID、vid参数获取volume;
    • 获取volume中的ec相关参数(包括:tactic,putQuorum等);
    • 根据volume中的units, 开启goroutine,通过h.blobnodeClient.PutShard()往blobnode节点同时发送写入shard请求;
    • 等待请求发送完后,如果写入成功的个数writtenNum>=putQuorum成功写入标准(),则写入成功;
    • 否则, 判断当AZCount >=3时,如果一个AZ down掉,判断剩下的AZ是否每个shard都写入成功,如果是,则也可以判断写入成功;
    • 否则写入不成功,返回错误;

blobnode

  • blobnode ShardPut_():

    • ShardPut_接收access节点发送的PutShard Rpc调用,执行shard put 写入shard数据操作;
  • datafile Write():

    • 根据Shard.Size大小, 计算(Alignphysize)写入到chunkfile中的编码后物理大小phySize(Shard 包含header,footer和block crc);

    • 根据phySize在datafile中分配待写入空间(allocSpace), 更新变量(cd.wOff)写入偏移(偏移需和4k对齐),并获取旧的(cd.wOff)作为shard.Offset本次shard写入偏移;

    • 写入shard header数据到chunk file;

    • crc32block.NewEncoder新建一个crc编码写入器,通过编码器将数据编码,并写入到chunkfile的shard header数据区后面;

    • 写入shard footer数据到 chunk file;

读取(Get)

access-client

  • access-client 中get入口为api/access/client.go (c *client)Get()

  • 先检查get参数有效性,如果size 为0,直接返回空;

  • 然后随机顺序向多个access发送get rpc请求,返回第一个成功的请求;

access

  • access 由access/stream_get.go (h *Handler)Get()负责get rpc 入口;

  • 先调用genLocationBlobs根据参数location,offset,readSize获取location中需要读取的blobs;

  • 在根据location获取clusterMgr controller sc;

  • 如果len(blobs)== 1, 如果大小较小,直接调用getDataShardOnly()读取blob数据;

  • 开启一个后台协程,依次通过readOneBlob从blobnode读取blobs中的每个blob的数据,通过chan传送个主流程

  • 主流程依次接收读取的数据,并写入到get结果的buffer中;

  • 读取一个blob数据access/readOneBlob():

    • 根据BlobSize和编码策略,分配一块新的EC bufferGetBufferSizes()

    • // 1. try to min-read shards byte

    • // 2. if failed try to read next shard to reconstruct

    •  // 3. write the the right offset bytes to writer

blobnode

  • ShardGet_:

删除(Delete)

  • 删除调用了punch hole来是否被删除的data;

  • 通过compact异步对碎片化过多的chunkfile进行整理;

合并(Compact)

  • compact用于在后台整理chunkfile,由blobnode提供了rpc api;

  • chunkfile 是否需要compact(NeedCompact):

    • chunk file size >= conf.CompactTriggerThredshold(默认:1TiB),return true;
    • chunk file size > conf.CompactMinSizeThreshold(默认: 16GB)且chunkfile 稀疏率>=80%(CompactEnptyRateThreshold参数);
  • blobnode/core/chunk/chunk.doCompact():

IOType

1
2
3
4
5
    NormalIO     IOType = iota // From: external: user io: read/write
    BackgroundIO               // From: external: repair, chunk transfer, delete
    CompactIO                  // From: internal: chunk compact
    DeleteIO                   // From: external: delete io
    InternalIO                 // From: internal: io, such rubbish clean, batch delete

安全

  • uptoken
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// token between alloc and putat
    // TokenSize max size of token array
    // clusterID + vid + bid + count + time + size
    TokenSize = 4 + 4 + 10 + 5 + 5 + 5

// UploadToken token between alloc and putat
// [0:8]   hmac (8) first 8 bytes of sha1 summary
// [8:16]  minBid (8) in the SliceInfo
// [16:20] count (4) in the SliceInfo
// [20:24] time (4) expired unix utc time, 0 means not expired
type UploadToken struct {
    Data   [TokenSize]byte
    Offset uint8
}

参考

  1. https://cubefs.readthedocs.io/zh_CN/latest/design/blobstore.html

  2. 详解Hadoop3.x新特性功能-HDFS纠删码 - 五分钟学大数据 - 博客园

  3. Erasure Code - EC纠删码原理_shelldon的专栏-CSDN博客_ec纠删码

updatedupdated2024-05-152024-05-15