Skip to content

Commit

Permalink
Merge pull request #8760 from dolthub/aaron/gc-read-dependencies
Browse files Browse the repository at this point in the history
[no-release-notes] go/store/nbs: During a GC process, take dependencies on chunks that are read through the ChunkStore.
  • Loading branch information
reltuk authored Jan 28, 2025
2 parents bde0bf9 + 5c04d5f commit 78e9a8a
Show file tree
Hide file tree
Showing 30 changed files with 933 additions and 536 deletions.
7 changes: 4 additions & 3 deletions go/store/nbs/archive_build.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func gatherAllChunks(ctx context.Context, cs chunkSource, idx tableIndex, stats
return nil, nil, err
}

bytes, err := cs.get(ctx, h, stats)
bytes, _, err := cs.get(ctx, h, nil, stats)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -907,7 +907,7 @@ func (csc *simpleChunkSourceCache) get(ctx context.Context, h hash.Hash, stats *
return chk, nil
}

bytes, err := csc.cs.get(ctx, h, stats)
bytes, _, err := csc.cs.get(ctx, h, nil, stats)
if bytes == nil || err != nil {
return nil, err
}
Expand All @@ -919,7 +919,8 @@ func (csc *simpleChunkSourceCache) get(ctx context.Context, h hash.Hash, stats *

// has returns true if the chunk is in the ChunkSource. This is not related to what is cached, just a helper.
func (csc *simpleChunkSourceCache) has(h hash.Hash) (bool, error) {
return csc.cs.has(h)
res, _, err := csc.cs.has(h, nil)
return res, err
}

// addresses get all chunk addresses of the ChunkSource as a hash.HashSet.
Expand Down
53 changes: 37 additions & 16 deletions go/store/nbs/archive_chunk_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,42 +64,63 @@ func openReader(file string) (io.ReaderAt, uint64, error) {
return f, uint64(stat.Size()), nil
}

func (acs archiveChunkSource) has(h hash.Hash) (bool, error) {
return acs.aRdr.has(h), nil
// has reports whether the archive reader contains the chunk addressed by |h|.
//
// If the chunk is present and a non-nil |keeper| returns true for |h|, the
// real answer is withheld and has returns (false, gcBehavior_Block, nil).
// In every other case it returns the reader's answer together with
// gcBehavior_Continue. The error result is always nil in this implementation.
//
// NOTE(review): keeperF and gcBehavior are declared outside this view. Going
// by the commit title, |keeper| presumably lets an in-progress GC take a
// dependency on chunks that are read through the store -- confirm.
func (acs archiveChunkSource) has(h hash.Hash, keeper keeperF) (bool, gcBehavior, error) {
res := acs.aRdr.has(h)
// keeper is only consulted for chunks that are actually present.
if res && keeper != nil && keeper(h) {
return false, gcBehavior_Block, nil
}
return res, gcBehavior_Continue, nil
}

func (acs archiveChunkSource) hasMany(addrs []hasRecord) (bool, error) {
func (acs archiveChunkSource) hasMany(addrs []hasRecord, keeper keeperF) (bool, gcBehavior, error) {
// single threaded first pass.
foundAll := true
for i, addr := range addrs {
if acs.aRdr.has(*(addr.a)) {
h := *addr.a
if acs.aRdr.has(h) {
if keeper != nil && keeper(h) {
return false, gcBehavior_Block, nil
}
addrs[i].has = true
} else {
foundAll = false
}
}
return !foundAll, nil
return !foundAll, gcBehavior_Continue, nil
}

func (acs archiveChunkSource) get(ctx context.Context, h hash.Hash, stats *Stats) ([]byte, error) {
// ctx, stats ? NM4.
return acs.aRdr.get(h)
// get fetches the bytes of the chunk addressed by |h| from the archive reader.
// |ctx| and |stats| are accepted but not referenced in this body.
//
// Results:
//   - a reader error is returned as (nil, gcBehavior_Continue, err);
//   - if data was found and a non-nil |keeper| returns true for |h|, the data
//     is withheld and the result is (nil, gcBehavior_Block, nil);
//   - otherwise the reader's result is returned with gcBehavior_Continue
//     (the bytes may be nil; presumably that means "not found" -- confirm
//     against the archive reader).
func (acs archiveChunkSource) get(ctx context.Context, h hash.Hash, keeper keeperF, stats *Stats) ([]byte, gcBehavior, error) {
res, err := acs.aRdr.get(h)
if err != nil {
return nil, gcBehavior_Continue, err
}
// keeper is only consulted when the read actually produced data.
if res != nil && keeper != nil && keeper(h) {
return nil, gcBehavior_Block, nil
}
return res, gcBehavior_Continue, nil
}

func (acs archiveChunkSource) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), stats *Stats) (bool, error) {
func (acs archiveChunkSource) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), keeper keeperF, stats *Stats) (bool, gcBehavior, error) {
// single threaded first pass.
foundAll := true
for i, req := range reqs {
data, err := acs.aRdr.get(*req.a)
if err != nil || data == nil {
h := *req.a
data, err := acs.aRdr.get(h)
if err != nil {
return true, gcBehavior_Continue, err
}
if data == nil {
foundAll = false
} else {
if keeper != nil && keeper(h) {
return true, gcBehavior_Block, nil
}
chunk := chunks.NewChunk(data)
found(ctx, &chunk)
reqs[i].found = true
}
}
return !foundAll, nil
return !foundAll, gcBehavior_Continue, nil
}

// iterate iterates over the archive chunks. The callback is called for each chunk in the archive. This is not optimized
Expand Down Expand Up @@ -146,14 +167,14 @@ func (acs archiveChunkSource) clone() (chunkSource, error) {
return archiveChunkSource{acs.file, rdr}, nil
}

func (acs archiveChunkSource) getRecordRanges(_ context.Context, _ []getRecord) (map[hash.Hash]Range, error) {
return nil, errors.New("Archive chunk source does not support getRecordRanges")
// getRecordRanges is not supported for archive chunk sources. All arguments
// are ignored and the call always returns a nil map, gcBehavior_Continue and
// a non-nil error.
func (acs archiveChunkSource) getRecordRanges(_ context.Context, _ []getRecord, _ keeperF) (map[hash.Hash]Range, gcBehavior, error) {
return nil, gcBehavior_Continue, errors.New("Archive chunk source does not support getRecordRanges")
}

func (acs archiveChunkSource) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (bool, error) {
func (acs archiveChunkSource) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), keeper keeperF, stats *Stats) (bool, gcBehavior, error) {
return acs.getMany(ctx, eg, reqs, func(ctx context.Context, chk *chunks.Chunk) {
found(ctx, ChunkToCompressedChunk(*chk))
}, stats)
}, keeper, stats)
}

func (acs archiveChunkSource) iterateAllChunks(ctx context.Context, cb func(chunks.Chunk)) error {
Expand Down
16 changes: 8 additions & 8 deletions go/store/nbs/archive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,28 +655,28 @@ type testChunkSource struct {

var _ chunkSource = (*testChunkSource)(nil)

func (tcs *testChunkSource) get(_ context.Context, h hash.Hash, _ *Stats) ([]byte, error) {
func (tcs *testChunkSource) get(_ context.Context, h hash.Hash, _ keeperF, _ *Stats) ([]byte, gcBehavior, error) {
for _, chk := range tcs.chunks {
if chk.Hash() == h {
return chk.Data(), nil
return chk.Data(), gcBehavior_Continue, nil
}
}
return nil, errors.New("not found")
return nil, gcBehavior_Continue, errors.New("not found")
}

func (tcs *testChunkSource) has(h hash.Hash) (bool, error) {
func (tcs *testChunkSource) has(h hash.Hash, keeper keeperF) (bool, gcBehavior, error) {
panic("never used")
}

func (tcs *testChunkSource) hasMany(addrs []hasRecord) (bool, error) {
func (tcs *testChunkSource) hasMany(addrs []hasRecord, keeper keeperF) (bool, gcBehavior, error) {
panic("never used")
}

func (tcs *testChunkSource) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), stats *Stats) (bool, error) {
func (tcs *testChunkSource) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), keeper keeperF, stats *Stats) (bool, gcBehavior, error) {
panic("never used")
}

func (tcs *testChunkSource) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (bool, error) {
func (tcs *testChunkSource) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), keeper keeperF, stats *Stats) (bool, gcBehavior, error) {
panic("never used")
}

Expand All @@ -700,7 +700,7 @@ func (tcs *testChunkSource) reader(ctx context.Context) (io.ReadCloser, uint64,
panic("never used")
}

func (tcs *testChunkSource) getRecordRanges(ctx context.Context, requests []getRecord) (map[hash.Hash]Range, error) {
func (tcs *testChunkSource) getRecordRanges(ctx context.Context, requests []getRecord, keeper keeperF) (map[hash.Hash]Range, gcBehavior, error) {
panic("never used")
}

Expand Down
20 changes: 13 additions & 7 deletions go/store/nbs/aws_table_persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,25 +115,31 @@ func (s3p awsTablePersister) key(k string) string {
return k
}

func (s3p awsTablePersister) Persist(ctx context.Context, mt *memTable, haver chunkReader, stats *Stats) (chunkSource, error) {
name, data, chunkCount, err := mt.write(haver, stats)

func (s3p awsTablePersister) Persist(ctx context.Context, mt *memTable, haver chunkReader, keeper keeperF, stats *Stats) (chunkSource, gcBehavior, error) {
name, data, chunkCount, gcb, err := mt.write(haver, keeper, stats)
if err != nil {
return emptyChunkSource{}, err
return emptyChunkSource{}, gcBehavior_Continue, err
}
if gcb != gcBehavior_Continue {
return emptyChunkSource{}, gcb, nil
}

if chunkCount == 0 {
return emptyChunkSource{}, nil
return emptyChunkSource{}, gcBehavior_Continue, nil
}

err = s3p.multipartUpload(ctx, bytes.NewReader(data), uint64(len(data)), name.String())

if err != nil {
return emptyChunkSource{}, err
return emptyChunkSource{}, gcBehavior_Continue, err
}

tra := &s3TableReaderAt{&s3ObjectReader{s3: s3p.s3, bucket: s3p.bucket, readRl: s3p.rl, ns: s3p.ns}, name}
return newReaderFromIndexData(ctx, s3p.q, data, name, tra, s3BlockSize)
src, err := newReaderFromIndexData(ctx, s3p.q, data, name, tra, s3BlockSize)
if err != nil {
return emptyChunkSource{}, gcBehavior_Continue, err
}
return src, gcBehavior_Continue, nil
}

func (s3p awsTablePersister) multipartUpload(ctx context.Context, r io.Reader, sz uint64, key string) error {
Expand Down
22 changes: 11 additions & 11 deletions go/store/nbs/aws_table_persister_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestAWSTablePersisterPersist(t *testing.T) {
s3svc := makeFakeS3(t)
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", limits: limits5mb, ns: ns, q: &UnlimitedQuotaProvider{}}

src, err := s3p.Persist(context.Background(), mt, nil, &Stats{})
src, _, err := s3p.Persist(context.Background(), mt, nil, nil, &Stats{})
require.NoError(t, err)
defer src.close()

Expand All @@ -108,7 +108,7 @@ func TestAWSTablePersisterPersist(t *testing.T) {
s3svc := makeFakeS3(t)
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", limits: limits64mb, ns: ns, q: &UnlimitedQuotaProvider{}}

src, err := s3p.Persist(context.Background(), mt, nil, &Stats{})
src, _, err := s3p.Persist(context.Background(), mt, nil, nil, &Stats{})
require.NoError(t, err)
defer src.close()
if assert.True(mustUint32(src.count()) > 0) {
Expand All @@ -133,7 +133,7 @@ func TestAWSTablePersisterPersist(t *testing.T) {
s3svc := makeFakeS3(t)
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", limits: limits5mb, ns: ns, q: &UnlimitedQuotaProvider{}}

src, err := s3p.Persist(context.Background(), mt, existingTable, &Stats{})
src, _, err := s3p.Persist(context.Background(), mt, existingTable, nil, &Stats{})
require.NoError(t, err)
defer src.close()
assert.True(mustUint32(src.count()) == 0)
Expand All @@ -148,7 +148,7 @@ func TestAWSTablePersisterPersist(t *testing.T) {
s3svc := &failingFakeS3{makeFakeS3(t), sync.Mutex{}, 1}
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", limits: limits5mb, ns: ns, q: &UnlimitedQuotaProvider{}}

_, err := s3p.Persist(context.Background(), mt, nil, &Stats{})
_, _, err := s3p.Persist(context.Background(), mt, nil, nil, &Stats{})
assert.Error(err)
})
}
Expand Down Expand Up @@ -306,7 +306,7 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) {
for i := 0; i < len(chunks); i++ {
mt := newMemTable(uint64(2 * targetPartSize))
mt.addChunk(computeAddr(chunks[i]), chunks[i])
cs, err := s3p.Persist(context.Background(), mt, nil, &Stats{})
cs, _, err := s3p.Persist(context.Background(), mt, nil, nil, &Stats{})
require.NoError(t, err)
sources = append(sources, cs)
}
Expand Down Expand Up @@ -379,7 +379,7 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) {
}

var err error
sources[i], err = s3p.Persist(context.Background(), mt, nil, &Stats{})
sources[i], _, err = s3p.Persist(context.Background(), mt, nil, nil, &Stats{})
require.NoError(t, err)
}
src, _, err := s3p.ConjoinAll(context.Background(), sources, &Stats{})
Expand Down Expand Up @@ -417,9 +417,9 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) {
rand.Read(medChunks[i])
mt.addChunk(computeAddr(medChunks[i]), medChunks[i])
}
cs1, err := s3p.Persist(context.Background(), mt, nil, &Stats{})
cs1, _, err := s3p.Persist(context.Background(), mt, nil, nil, &Stats{})
require.NoError(t, err)
cs2, err := s3p.Persist(context.Background(), mtb, nil, &Stats{})
cs2, _, err := s3p.Persist(context.Background(), mtb, nil, nil, &Stats{})
require.NoError(t, err)
sources := chunkSources{cs1, cs2}

Expand Down Expand Up @@ -450,7 +450,7 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) {
mt := newMemTable(uint64(2 * targetPartSize))
mt.addChunk(computeAddr(smallChunks[i]), smallChunks[i])
var err error
sources[i], err = s3p.Persist(context.Background(), mt, nil, &Stats{})
sources[i], _, err = s3p.Persist(context.Background(), mt, nil, nil, &Stats{})
require.NoError(t, err)
}

Expand All @@ -461,7 +461,7 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) {
}

var err error
cs, err := s3p.Persist(context.Background(), mt, nil, &Stats{})
cs, _, err := s3p.Persist(context.Background(), mt, nil, nil, &Stats{})
require.NoError(t, err)
sources = append(sources, cs)

Expand All @@ -474,7 +474,7 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) {
mt.addChunk(computeAddr(medChunks[i]), medChunks[i])
}

cs, err = s3p.Persist(context.Background(), mt, nil, &Stats{})
cs, _, err = s3p.Persist(context.Background(), mt, nil, nil, &Stats{})
require.NoError(t, err)
sources = append(sources, cs)

Expand Down
36 changes: 22 additions & 14 deletions go/store/nbs/bs_persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,16 @@ var _ tableFilePersister = &blobstorePersister{}

// Persist makes the contents of mt durable. Chunks already present in
// |haver| may be dropped in the process.
func (bsp *blobstorePersister) Persist(ctx context.Context, mt *memTable, haver chunkReader, stats *Stats) (chunkSource, error) {
address, data, chunkCount, err := mt.write(haver, stats)
func (bsp *blobstorePersister) Persist(ctx context.Context, mt *memTable, haver chunkReader, keeper keeperF, stats *Stats) (chunkSource, gcBehavior, error) {
address, data, chunkCount, gcb, err := mt.write(haver, keeper, stats)
if err != nil {
return emptyChunkSource{}, err
} else if chunkCount == 0 {
return emptyChunkSource{}, nil
return emptyChunkSource{}, gcBehavior_Continue, err
}
if gcb != gcBehavior_Continue {
return emptyChunkSource{}, gcb, nil
}
if chunkCount == 0 {
return emptyChunkSource{}, gcBehavior_Continue, nil
}
name := address.String()

Expand All @@ -59,24 +63,28 @@ func (bsp *blobstorePersister) Persist(ctx context.Context, mt *memTable, haver

// first write table records and tail (index+footer) as separate blobs
eg, ectx := errgroup.WithContext(ctx)
eg.Go(func() (err error) {
_, err = bsp.bs.Put(ectx, name+tableRecordsExt, int64(len(records)), bytes.NewBuffer(records))
return
eg.Go(func() error {
_, err := bsp.bs.Put(ectx, name+tableRecordsExt, int64(len(records)), bytes.NewBuffer(records))
return err
})
eg.Go(func() (err error) {
_, err = bsp.bs.Put(ectx, name+tableTailExt, int64(len(tail)), bytes.NewBuffer(tail))
return
eg.Go(func() error {
_, err := bsp.bs.Put(ectx, name+tableTailExt, int64(len(tail)), bytes.NewBuffer(tail))
return err
})
if err = eg.Wait(); err != nil {
return nil, err
return nil, gcBehavior_Continue, err
}

// then concatenate into a final blob
if _, err = bsp.bs.Concatenate(ctx, name, []string{name + tableRecordsExt, name + tableTailExt}); err != nil {
return emptyChunkSource{}, err
return emptyChunkSource{}, gcBehavior_Continue, err
}
rdr := &bsTableReaderAt{name, bsp.bs}
return newReaderFromIndexData(ctx, bsp.q, data, address, rdr, bsp.blockSize)
src, err := newReaderFromIndexData(ctx, bsp.q, data, address, rdr, bsp.blockSize)
if err != nil {
return emptyChunkSource{}, gcBehavior_Continue, err
}
return src, gcBehavior_Continue, nil
}

// ConjoinAll implements tablePersister.
Expand Down
4 changes: 2 additions & 2 deletions go/store/nbs/cmp_chunk_table_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestCmpChunkTableWriter(t *testing.T) {
found := make([]CompressedChunk, 0)

eg, egCtx := errgroup.WithContext(ctx)
_, err = tr.getManyCompressed(egCtx, eg, reqs, func(ctx context.Context, c CompressedChunk) { found = append(found, c) }, &Stats{})
_, _, err = tr.getManyCompressed(egCtx, eg, reqs, func(ctx context.Context, c CompressedChunk) { found = append(found, c) }, nil, &Stats{})
require.NoError(t, err)
require.NoError(t, eg.Wait())

Expand Down Expand Up @@ -146,7 +146,7 @@ func readAllChunks(ctx context.Context, hashes hash.HashSet, reader tableReader)
reqs := toGetRecords(hashes)
found := make([]*chunks.Chunk, 0)
eg, ctx := errgroup.WithContext(ctx)
_, err := reader.getMany(ctx, eg, reqs, func(ctx context.Context, c *chunks.Chunk) { found = append(found, c) }, &Stats{})
_, _, err := reader.getMany(ctx, eg, reqs, func(ctx context.Context, c *chunks.Chunk) { found = append(found, c) }, nil, &Stats{})
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 78e9a8a

Please sign in to comment.