ChubaoFS 读写分析

ChubaoFS 读写分析

简介

  • ExtentClient

  • ExtentHandler

sdk

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// 
type ExtentClient struct {
    streamers       map[uint64]*Streamer
    dataWrapper     *Wrapper
    //...
}

type Streamer struct {
    extents     *ExtentCache
    handler     *ExtentHandler
    dirtylist   *DirtyExtentList
}

type ExtentHandler struct {

}

Write

chubaofs 客户端写操作由

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
// sdk 写数据入口
func (ec *ExtentClient) Write(inode uint64, offset int, data []byte, direct bool) {
    s := client.GetStreamer(inode)                //
    s.once.Do(func(){ s.GetExtents(); })
    s.IssueWriteRequest(offset, data, direct)    //streamer处理写请求, 获取一个写请求,然后将请求放入s.request 通道
}

// 
func (s *Streamer)write(data []byte, offset, size int, direct bool) {
    // 准备写请求
    requests := s.extents.PrepareWriteRequests(offset, size, data)
    //...
    // 
    for _, req := range requests {
        s.doWrite()
    }
}

// streamer 通过extenthandler 将数据写入到datanode中
func (s *Streamer) doWrite(data []byte, offset, size int, direct bool) {
    // set storeMode
    storeMode := func(tailOff, tinySizeLimit) {
        if tailOff > tinySizeLimit {
            return proto.NormalExtentType
        } else {
            return proto.TinyExtentType
        }
    }(offset+size, s.tinySizeLimit)

    //重试写入
    for i := 0; i < MaxNewHandlerRetry; i++ {
        // 
        ek, err = s.handler.write(data, offset, size, direct)
        // 写成功
        if err == nil && ek != nil {
            break
        }
    }
}

// 写
func (eh *ExtentHandler) write() {
    for total < size {

        eh.flushPacket() // eh.request <- packet
    }

}

// 发送协程,
func (eh *ExtentHandler) sender() {
    for {
        select {
            case packet := <- eh.request:

        }
    }
}

// 接收协程
func (eh *ExtentHandler) receiver() {
    for {
        select {
            case reply := <- eh.re
        }
    }

}

streamer处理追加写请求

  • 追加写入口在(streamer)doWrite();

  • 首先根据写入请求在文件中的最大偏移(offset+size)来决定了extent类型:

    • <= 1MB, 使用TinyExtent;

    • >1MB, 使用NormalExtent;

  • 再获取extentHandler, 通过extentHandler将数据发送;

//doWrite: 处理追加写请求 // 1. 根据offset+size确定extent类型; // * <= 1MB,为tinyExtent; // * > 1MB, normalExtent; // 2. 获取extentHandler, 通过handler的write调用将数据写入, 失败的话,最多重试3次; // 3. 写入成功的话,将handler加入到dirtylist, 并将write获取的ek加入到extents缓存中; // 4. 返回写入的size; // 5. 上面步骤中出错时,中断流程,返回错误;

streamer处理覆盖写请求(streamer.doOverwrite())

// 处理覆盖写请求 // 1. flush dirtylist; // 2. 根据 file offset 从 extents 缓存中查找extentKey,如果为nil, 返回错误; // 3. 根据extentKey从缓存partitions中获取dp; // 4. 获取一个到主dp副本的conn; // 5. 根据size发送OverWrite请求到conn,并获取结果; // 6. 发送时,先试着往主副本发送, 如果失败,会依次重试往其他从副本发送,直到有一个返回成功; // 7. 接收到OverWriteReq的datanode如果是leader,则直接通过raft 将该指令执行,如果是从副本dn,则转发到leader上,由leader执行;

updatedupdated2024-12-152024-12-15