Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Add reading comments #426

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions leveldb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ func batchesLen(batches []*Batch) int {
}

func writeBatchesWithHeader(wr io.Writer, batches []*Batch, seq uint64) error {
// NOTE(review): the concrete Write implementation here is singleWriter.Write
if _, err := wr.Write(encodeBatchHeader(nil, seq, batchesLen(batches))); err != nil {
return err
}
Expand Down
16 changes: 11 additions & 5 deletions leveldb/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func (x mNodes) Swap(i, j int) { x[i], x[j] = x[j], x[i] }

func (x mNodes) sort() { sort.Sort(x) }

// Returns the index of the first element >= <ns, key>
func (x mNodes) search(ns, key uint64) int {
return sort.Search(len(x), func(i int) bool {
a := x[i].ns
Expand Down Expand Up @@ -155,7 +156,8 @@ func (b *mBucket) get(r *Cache, h *mHead, hash uint32, ns, key uint64, getOnly b
if i == len(b.nodes) {
b.nodes = append(b.nodes, n)
} else {
b.nodes = append(b.nodes[:i+1], b.nodes[i:]...)
// Insert the new node just before index i
b.nodes = append(b.nodes[:i+1], b.nodes[i:]...) // 添加到有序数组的对应位置,插入排序,所以 b.nodes 不可以过长
b.nodes[i] = n
}
bLen := len(b.nodes)
Expand Down Expand Up @@ -188,6 +190,7 @@ func (b *mBucket) get(r *Cache, h *mHead, hash uint32, ns, key uint64, getOnly b
return true, true, n
}

// NOTE(review): the (done, deleted) return-value convention used by delete is worth noting
func (b *mBucket) delete(r *Cache, h *mHead, hash uint32, ns, key uint64) (done, deleted bool) {
b.mu.Lock()

Expand Down Expand Up @@ -260,9 +263,12 @@ func (b *mBucket) delete(r *Cache, h *mHead, hash uint32, ns, key uint64) (done,
}

type mHead struct {
buckets []mBucket
mask uint32
predecessor unsafe.Pointer // *mNode
buckets []mBucket
mask uint32 // 用当前的 mask 和 predecessor 的 mask 作对比,可以知道是在 grow 还是在 shrink

// NOTE(review): the comment below looks wrong — the pointer type is presumably *mHead, not *mNode
predecessor unsafe.Pointer // *mNode

resizeInProgress int32

overflow int32
Expand Down Expand Up @@ -301,7 +307,7 @@ func (h *mHead) initBucket(i uint32) *mBucket {
nodes = make(mNodes, 0, len(m0)+len(m1))
nodes = append(nodes, m0...)
nodes = append(nodes, m1...)
nodes.sort()
nodes.sort() // 可以用 merge sort?不过 m0, m1 的 size 比较小,也无所谓
}
b.nodes = nodes
b.state = bucketInitialized
Expand Down
5 changes: 5 additions & 0 deletions leveldb/comparer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/syndtr/goleveldb/leveldb/comparer"
)

// iComparer adapts a user-supplied comparer.Comparer to internal keys,
// layering the sequence-number ordering on top of the user key ordering;
// in effect it is the DB's "internal comparer".
type iComparer struct {
ucmp comparer.Comparer
}
Expand All @@ -34,9 +36,12 @@ func (icmp *iComparer) Name() string {
return icmp.uName()
}

//
func (icmp *iComparer) Compare(a, b []byte) int {
x := icmp.uCompare(internalKey(a).ukey(), internalKey(b).ukey())
if x == 0 {
// Keys with equal user keys are ordered by sequence number, larger seq first.
// uCompare above is the user-supplied compare function.
if m, n := internalKey(a).num(), internalKey(b).num(); m > n {
return -1
} else if m < n {
Expand Down
4 changes: 3 additions & 1 deletion leveldb/comparer/bytes_comparer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

package comparer

import "bytes"
import (
"bytes"
)

// bytesComparer provides bytewise key ordering; its methods are defined
// below (outside this view) — presumably implementing comparer.Comparer.
type bytesComparer struct{}

Expand Down
10 changes: 9 additions & 1 deletion leveldb/comparer/comparer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,21 @@ type Comparer interface {
//
// Either contents of a or b should not by any means modified. Doing so
// may cause corruption on the internal state.
//
// Example with LevelDB's built-in bytes comparer, a = {0xff, 0xff, 0x11} and
// b = {0xff, 0xff, 0x1A, ...} (this requires b > a; otherwise nil is returned):
// Separator(dst, a, b) yields dst = dst + {0xff, 0xff, 0x12}.
// The "separation" is that the trailing 0x12 places the result between a and b.
Separator(dst, a, b []byte) []byte

// Successor appends a sequence of bytes x to dst such that x >= b, where
// 'less than' is consistent with Compare. An implementation should return
// nil if x equal to b.
//
// Contents of b should not by any means modified. Doing so may cause
// corruption on the internal state.
//
// Example with LevelDB's built-in bytes comparer and b = {0xff, 0xff, 0x1A, ...}:
// Successor(dst, b) yields dst = dst + {0xff, 0xff, 0x1B} — note the final byte
// is 0x1B, one greater than 0x1A. If every byte of b is 0xff, nil is returned.
Successor(dst, b []byte) []byte
}
32 changes: 18 additions & 14 deletions leveldb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,33 +52,33 @@ type DB struct {
memMu sync.RWMutex
memPool chan *memdb.DB
mem, frozenMem *memDB
journal *journal.Writer
journal *journal.Writer // 写 WAL
journalWriter storage.Writer
journalFd storage.FileDesc
frozenJournalFd storage.FileDesc
frozenSeq uint64
frozenSeq uint64 // 哪一个 seq 的写操作导致了 memDB 的切换,或者说导致了当前的 memDB 被 freeze 成 immutable memDB

// Snapshot.
snapsMu sync.Mutex
snapsList *list.List

// Write.
batchPool sync.Pool
writeMergeC chan writeMerge
writeMergedC chan bool
writeLockC chan struct{}
writeAckC chan error
writeDelay time.Duration
writeDelayN int
writeMergeC chan writeMerge // 有 merge write 需求时 chan<-,执行 merge write 的一方会不断地 <-chan 合并可以合并过来的写请求
writeMergedC chan bool // chan<- false 用来通知某个 write 它没有被 merge
writeLockC chan struct{} // write 的写锁,成功 chan<- 说明拿到了锁,用完了之后 <-chan 就是释放锁
writeAckC chan error // chan<- 通知等待 ACK 的一方;<-chan 等待的一方读取 Write 的结果
writeDelay time.Duration // 记录 Write 被 compaction 所 delay 的时长
writeDelayN int // 记录 Write 被 compaction 所 delay 的次数
tr *Transaction

// Compaction.
compCommitLk sync.Mutex
tcompCmdC chan cCmd
tcompPauseC chan chan<- struct{}
mcompCmdC chan cCmd
compErrC chan error
compPerErrC chan error
tcompPauseC chan chan<- struct{} // 从 tcompPauseC 读取 <-chan 读到了一个 chan<- struct{} 到了之后要 pause compaction,直到可以写入 chan<- struct{} 才恢复
mcompCmdC chan cCmd // 全称 memdb compaction command channel?
compErrC chan error // compaction error
compPerErrC chan error // 全称 compaction persistent error
compErrSetC chan error
compWriteLocking bool
compStats cStats
Expand All @@ -104,9 +104,9 @@ func openDB(s *session) (*DB, error) {
snapsList: list.New(),
// Write
batchPool: sync.Pool{New: newBatch},
writeMergeC: make(chan writeMerge),
writeMergeC: make(chan writeMerge), // 用来合并 Write 操作
writeMergedC: make(chan bool),
writeLockC: make(chan struct{}, 1),
writeLockC: make(chan struct{}, 1), // 注意:channel 的 buffer 为 1,因为首个去获取 Write Lock 的 goroutine 必须得能通过 chan<- 拿到 Lock
writeAckC: make(chan error),
// Compaction
tcompCmdC: make(chan cCmd),
Expand Down Expand Up @@ -778,6 +778,8 @@ func memGet(mdb *memdb.DB, ikey internalKey, icmp *iComparer) (ok bool, mv []byt
return
}

// In a normal query both auxm and auxt are nil.
// NOTE(review): guess — auxm and auxt may exist only for testing; confirm against callers.
func (db *DB) get(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err error) {
ikey := makeInternalKey(nil, key, seq, keyTypeSeek)

Expand All @@ -794,6 +796,8 @@ func (db *DB) get(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.R
}
defer m.decref()

// Look up the memtable first, then the immutable memtable.
// A hit can be returned immediately: the entry found necessarily has the largest
// sequence number <= seq, since entries with smaller seq are removed by compaction.
if ok, mv, me := memGet(m.DB, ikey, db.s.icmp); ok {
return append([]byte(nil), mv...), me
}
Expand Down
Loading