Skip to content

Commit

Permalink
Merge pull request #19 from Trinoooo/docs/annotation
Browse files Browse the repository at this point in the history
docs(ragdoll-wal): 完善代码文档注释,完成pkgsite文档格式测试
  • Loading branch information
Trinoooo authored Feb 20, 2024
2 parents 00e8c68 + a8476a5 commit e3728bc
Show file tree
Hide file tree
Showing 4 changed files with 287 additions and 228 deletions.
31 changes: 31 additions & 0 deletions storage/core/ragdoll/wal/consts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package wal

import "github.com/Trinoooo/eggie_kv/utils"

const dirLock = ".lock"

// getMaxBlockCapacityInWAL 获取wal目录下最大能容纳的block数量
func getMaxBlockCapacityInWAL() int64 {
return int64(utils.GetValueOnEnv(1e10, 1e8).(float64))
}

const (
// 字段长度,单位字节
headerLengthSize = 8
headerBlockIdSize = 8
headerSummarySize = 16
headerSize = 32

// 字段偏移量,单位字节
headerLengthOffset = 0
headerBlockIdOffset = 8
headerSummaryOffset = 16
headerDataOffset = 32
)

const suffix = ".active" // suffix 活跃segment文件的后缀标识

// getBaseFormat 获取segment文件名中blockIdx部分宽度
func getBaseFormat() string {
return utils.GetValueOnEnv("%010d", "%08d").(string)
}
25 changes: 25 additions & 0 deletions storage/core/ragdoll/wal/docs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Package wal 实现预写日志(Write-Ahead-Log)
//
// Log 致力于对外提供高性能读、写、截断日志服务,这些操作通过内部加锁保证外部
// 并发调用不会出现竞态条件,需要注意的是:Open 操作会有竞态问题,请避免并发
// 打开日志,或者外部主动加锁避免竞态条件。
//
// Log 提供两种数据持久化级别:
// 1. 同步:每次写日志都会将数据同步到磁盘,一致性好,但性能差。
// 2. 异步:日志会先写入内存缓冲中,后台协程周期同步,一致性稍差,但性能优异。
//
// Log 允许通过 Options 自定义配置选项,配置包括权限、容量、持久化级别、同步
// 周期。下面简要介绍其中涉及到的内部实现概念:
// 1. Block:描述单条日志记录的逻辑概念。用于定位日志记录边界、记录完整性校验、
// 限制 Log 下最大日志数量(通过maxBlockCapacityInWAL)。
// 2. Segment:日志数据文件,由零或多个 Block 组成。一个 Log 中通常有多个 Segment
// 这是出于变更 Segment 文件时写时复制(Copy-On-Write)的性能考虑,这意味着
// Segment 保证原子写入文件。Segment 允许自定义容量(可通过 Options 配置),
// 但这不意味只有当一个数据文件写满后才会创建下一个,Segment 会保证 Block完整地
// 存在一个数据文件中。
// 3. Log:先行日志实体,由多个 Segment 组成。Log 将 Segment 维护在指定目录
// 下(可通过 Options 配置)。内部通过 LRU 缓存最近读命中的 Segment,同样的,
// 可以通过 Options 配置 LRU 缓存大小。
//
// TODO(Trino):日志损坏后没有修复机制,如果出现内容被篡改、损坏会导致日志不可用
package wal
116 changes: 45 additions & 71 deletions storage/core/ragdoll/wal/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,65 +15,43 @@ import (
"strings"
)

const (
// 字段长度,单位字节
headerLengthSize = 8
headerBlockIdSize = 8
headerSummarySize = 16
headerSize = 32

// 字段偏移量,单位字节
headerLengthOffset = 0
headerBlockIdOffset = 8
headerSummaryOffset = 16
headerDataOffset = 32
)

const suffix = ".active" // suffix 活跃segment文件的后缀标识

// getBaseFormat 获取segment文件名中blockIdx部分宽度
func getBaseFormat() string {
return utils.GetValueOnEnv("%010d", "%08d").(string)
}

type position struct {
start int64 // start 表示起始偏移量
end int64 // end 表示结束偏移量
}

// TODO: 补充corrupt处理
// segment 存储日志数据的文件
// 注意:所有字段都不允许外部直接访问
// segment 日志数据的文件
// 所有字段外部都不应该外部直接访问
// 出于性能考虑所有方法均不保证并发安全
type segment struct {
fd *os.File // fd segment 文件描述符
path string // path segment 文件路径
// startBlockIdx segment 中的起始 blockIdx,和 firstBlockIdx 的区别是前者指代 segment 文件的起始边界
// 边界存在不意味着第一条记录存在,后者指代 segment 文件中第一个block的blockIdx
fd *os.File // fd 数据文件描述符
path string // path 数据文件路径
// startBlockIdx 数据文件的起始 block 索引,和 firstBlockIdx 的区别是前者指代数据文件的起始边界
// 边界存在不意味着第一条记录存在,后者指代数据文件中第一个 block 的索引
startBlockIdx *int64
firstBlockIdx int64 // firstBlockIdx segment 中的起始 blockIdx
lastBlockIdx int64 // lastBlockIdx segment 中最后的 blockIdx
// bbuf segment 中存储的数据内容
// segment 存储的最大容量取决于外部,存储结构如下:
// | block #1 | block #2 | 0000 |
// 当文件剩余容量不足以再写下一个完整 block 时
// 文件末尾剩余内容不再使用,保证文件开头是一个完整的 block
firstBlockIdx int64 // firstBlockIdx 数据文件维护的起始 block 索引
lastBlockIdx int64 // lastBlockIdx 数据文件维护的最后 block 索引
// bbuf 数据文件活跃状态下在内存中维护日志数据,存储结构如下:
// | block #1 | block #2 | block #3 | block #4 | 0000 |
// 当文件剩余容量不足以再写下一个完整 block 时文件末尾剩余内容不再使用,
// 保证文件开头是一个完整的 block
bbuf []byte
bpos []*position // bpos 指示 bbuf 中每个block的位置
bbufSyncIdx int64 // bbufSyncIdx 下一个要刷盘的 bbuf 偏移量

maxSize int64 // maxSize segment 文件最大体积
hasSuffix bool // hasSuffix segment 文件路径中是否包含.active后缀
bbufSyncIdx int64 // bbufSyncIdx 下一个要同步的 bbuf 数据下标
maxSize int64 // maxSize 数据文件最大体积
hasSuffix bool // hasSuffix 数据文件路径中是否包含.active后缀,用于标识最后活跃的数据文件
opened bool // opened 数据文件是否已经打开
}

opened bool // opened 文件是否已经打开
// position 日志位置信息
type position struct {
start int64 // start 在bbuf中的起始偏移量
end int64 // end 在bbuf中结束偏移量
}

// newSegment 初始化数据文件
func newSegment(path string, maxSize int64) (*segment, error) {
seg := &segment{}
seg.path = path
seg.maxSize = maxSize
seg.firstBlockIdx = -1
seg.lastBlockIdx = -1
startBlockIdx, hasSuffix, err := baseToBlockId(filepath.Base(seg.path))
startBlockIdx, hasSuffix, err := baseToBlockIdx(filepath.Base(seg.path))
if err != nil {
return nil, err
}
Expand All @@ -82,7 +60,7 @@ func newSegment(path string, maxSize int64) (*segment, error) {
return seg, nil
}

// open 打开segment
// open 打开数据文件
func (seg *segment) open(perm os.FileMode) error {
fd, err := utils.CheckAndCreateFile(seg.path, os.O_RDWR|os.O_CREATE|os.O_APPEND, perm)
if err != nil {
Expand All @@ -99,7 +77,7 @@ func (seg *segment) open(perm os.FileMode) error {
return e
}

bps, bbf, err := loadBinary(all)
bps, bbf, err := loadBlocks(all)
if err != nil {
return err
}
Expand All @@ -115,23 +93,21 @@ func (seg *segment) open(perm os.FileMode) error {
return nil
}

// isOpened 判断数据文件是否处于打开状态
func (seg *segment) isOpened() bool {
return seg.opened
}

// getStartBlockIdx 获取 segment 文件的起始 blockIdx
// 注意:起始 blockIdx 不等价于 第一个 blockIdx,因为 segment 文件可能为空
// 需要外部保证线程安全
func (seg *segment) getStartBlockIdx() int64 {
if seg.startBlockIdx != nil {
return *seg.startBlockIdx
}
blockId, _, _ := baseToBlockId(filepath.Base(seg.path))
blockId, _, _ := baseToBlockIdx(filepath.Base(seg.path))
return blockId
}

// close 关闭 segment 文件
// 需要外部保证线程安全
func (seg *segment) close() error {
err := seg.checkState()
if err != nil {
Expand Down Expand Up @@ -159,7 +135,6 @@ func (seg *segment) close() error {
}

// write 写日志到数据文件中
// 需要外部保证线程安全
func (seg *segment) write(data []byte) error {
err := seg.checkState()
if err != nil {
Expand Down Expand Up @@ -192,7 +167,6 @@ func (seg *segment) write(data []byte) error {
}

// sync 持久化数据到磁盘
// 需要外部保证线程安全
func (seg *segment) sync() error {
err := seg.checkState()
if err != nil {
Expand All @@ -214,7 +188,6 @@ func (seg *segment) sync() error {

// read 查询 segment 文件中的指定范围数据
// 如果执行成功会截断 segment 文件中 [firstBlockIdx, idx] 范围数据
// 需要外部保证线程安全
func (seg *segment) read(idx int64) (map[int64][]byte, error) {
err := seg.checkState()
if err != nil {
Expand All @@ -241,7 +214,6 @@ func (seg *segment) read(idx int64) (map[int64][]byte, error) {
// truncate 截断 segment 文件中的指定范围数据
// 如果执行成功会截断 segment 文件中 [firstBlockIdx, idx] 范围数据
// 如果idx超过 segment 文件容纳的block数量,该文件会被截断成空文件
// 需要外部保证线程安全
func (seg *segment) truncate(idx int64) (err error) {
if idx < seg.firstBlockIdx || idx >= seg.lastBlockIdx {
seg.bpos = make([]*position, 0, consts.KB)
Expand All @@ -252,7 +224,7 @@ func (seg *segment) truncate(idx int64) (err error) {
seg.firstBlockIdx = idx + 1
seg.bpos = seg.bpos[seg.firstBlockIdx-seg.getStartBlockIdx():]
seg.bbuf = seg.bbuf[seg.bpos[0].start:] // 前面的判断保证这里取seg.bpos[0]不会有问题
// note: 这里rename可能导致不一致问题
// note: 这里 rename 可能导致不一致问题
// 即原文件内容没有被截断,但是文件名被修改成截断后的
oldPath := seg.path
defer func() {
Expand All @@ -267,7 +239,7 @@ func (seg *segment) truncate(idx int64) (err error) {
err = e
}
}()
seg.path = filepath.Join(filepath.Dir(seg.path), blockIdToBase(seg.firstBlockIdx, seg.hasSuffix))
seg.path = filepath.Join(filepath.Dir(seg.path), blockIdxToBase(seg.firstBlockIdx, seg.hasSuffix))
}
seg.bbufSyncIdx = 0

Expand All @@ -280,7 +252,6 @@ func (seg *segment) truncate(idx int64) (err error) {
}

// rename 重命名 segment 文件
// 需要外部保证线程安全
func (seg *segment) rename() error {
oldPath := seg.path
if seg.hasSuffix {
Expand All @@ -299,6 +270,8 @@ func (seg *segment) rename() error {
return nil
}

// remove 移除该数据文件
// 内部会在移除之前关闭文件
func (seg *segment) remove() error {
err := seg.close()
if err != nil {
Expand All @@ -314,12 +287,12 @@ func (seg *segment) remove() error {
return nil
}

// size 返回 segment 中的block数
// 需要外部保证线程安全
// size 返回数据文件中的 block 数
func (seg *segment) size() int64 {
return int64(len(seg.bpos))
}

// checkState 检查数据文件状态
func (seg *segment) checkState() error {
if !seg.opened {
e := errs.NewFileClosedErr()
Expand All @@ -329,6 +302,7 @@ func (seg *segment) checkState() error {
return nil
}

// copyOnWrite 写时复制
func (seg *segment) copyOnWrite(copy bool) error {
dir, base := filepath.Dir(seg.path), filepath.Base(seg.path)
tempFile, err := os.CreateTemp(dir, base)
Expand Down Expand Up @@ -388,19 +362,19 @@ func (seg *segment) copyOnWrite(copy bool) error {
return nil
}

// blockIdToBase 起始blockIdx转文件名
// blockIdxToBase 起始blockIdx转文件名
// setSuffix 设置为true时会在文件名末尾追加.active后缀
func blockIdToBase(blockId int64, setSuffix bool) string {
func blockIdxToBase(blockId int64, setSuffix bool) string {
base := fmt.Sprintf(getBaseFormat(), blockId)
if setSuffix {
base += suffix
}
return base
}

// baseToBlockId 文件名转起始blockIdx
// baseToBlockIdx 文件名转起始blockIdx
// 额外返回文件名中是否包含.active后缀
func baseToBlockId(base string) (int64, bool, error) {
func baseToBlockIdx(base string) (int64, bool, error) {
blockIdStr, hasSuffix := strings.CutSuffix(base, suffix)
firstBlockIdOfSegment, err := strconv.ParseInt(blockIdStr, 10, 64)
if err != nil {
Expand All @@ -411,9 +385,8 @@ func baseToBlockId(base string) (int64, bool, error) {
return firstBlockIdOfSegment, hasSuffix, nil
}

// buildBinary 日志数据 -> 格式化二进制数据
// 存储在文件中的block结构:
// | length 8字节 | blockid 8字节 | checksum 16字节 | payload x字节 |
// buildBinary 日志数据格式化成二进制数据
// 存储在数据文件中的 block 结构:| length 8字节 | blockid 8字节 | checksum 16字节 | payload x字节 |
func buildBinary(blockId int64, data []byte) []byte {
length := int64(len(data))
// prof: 避免buf重分配
Expand All @@ -431,8 +404,8 @@ func buildBinary(blockId int64, data []byte) []byte {
return buf
}

// loadBinary 从文件装载格式化二进制数据
func loadBinary(raw []byte) ([]*position, []byte, error) {
// loadBlocks 从数据文件中装载并解析二进制数据
func loadBlocks(raw []byte) ([]*position, []byte, error) {
var start int64
fileSize := int64(len(raw))
// prof: 粗拍一个cap,避免小数据段导致的频繁重分配问题
Expand All @@ -459,7 +432,8 @@ func loadBinary(raw []byte) ([]*position, []byte, error) {
return bps, bbf, nil
}

// parseBinary 解析单个格式化二进制数据 -> 日志数据
// parseBinary 二进制数据解析成日志数据
// 返回日志内容、日志大小、blockid
func parseBinary(raw []byte) ([]byte, int64, int64, error) {
rawSize := int64(len(raw))
// note: 先校验headerSize是不是比raw的长度大,校验通过后
Expand Down
Loading

0 comments on commit e3728bc

Please sign in to comment.