Skip to content

Commit

Permalink
go/store/nbs: Fixing GCGen to be more correct.
Browse files Browse the repository at this point in the history
The original purpose of gc gen was two fold. The first purpose was to avoid applying the garbage collection results if the store had changed due to multi-process concurrency for any reason. The second purpose was to fast-complete a `dolt gc` invocation if the store had not changed at all since the last GC run.

For the first purpose, it is no longer necessary. We no longer allow multi-process access to the same NomsBlockStore.

For the second purpose, it was implemented slightly incorrectly, given the introduction of `dolt gc --full`. This change fixes the implementation to be more correct.

In particular, the semantics are:

* After a `dolt gc --full`, an immediate invocation of `dolt gc` or `dolt gc --full` fast-completes as no collection being necessary.

* After a `dolt gc`, only a `dolt gc` fast-completes as no collection being necessary. A `dolt gc --full` will run a full GC to completion.
  • Loading branch information
reltuk committed Nov 27, 2024
1 parent d5534d1 commit adfa851
Show file tree
Hide file tree
Showing 15 changed files with 179 additions and 42 deletions.
8 changes: 7 additions & 1 deletion go/store/chunks/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,12 @@ type GCFinalizer interface {
SwapChunksInStore(ctx context.Context) error
}

type GCMode int
const (
GCMode_Default GCMode = iota
GCMode_Full
)

// ChunkStoreGarbageCollector is a ChunkStore that supports garbage collection.
type ChunkStoreGarbageCollector interface {
ChunkStore
Expand Down Expand Up @@ -213,7 +219,7 @@ type ChunkStoreGarbageCollector interface {
// This behavior is a little different for ValueStore.GC()'s
// interactions with generational stores. See ValueStore and
// NomsBlockStore/GenerationalNBS for details.
MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore) (GCFinalizer, error)
MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore, mode GCMode) (GCFinalizer, error)

// Count returns the number of chunks in the store.
Count() (uint32, error)
Expand Down
2 changes: 1 addition & 1 deletion go/store/chunks/memory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ func (mgcf msvGcFinalizer) SwapChunksInStore(ctx context.Context) error {
return nil
}

func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore) (GCFinalizer, error) {
func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore, mode GCMode) (GCFinalizer, error) {
if dest != ms {
panic("unsupported")
}
Expand Down
4 changes: 2 additions & 2 deletions go/store/chunks/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,12 @@ func (s *TestStoreView) EndGC() {
collector.EndGC()
}

func (s *TestStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore) (GCFinalizer, error) {
func (s *TestStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore, mode GCMode) (GCFinalizer, error) {
collector, ok := s.ChunkStore.(ChunkStoreGarbageCollector)
if !ok || dest != s {
return nil, ErrUnsupportedOperation
}
return collector.MarkAndSweepChunks(ctx, hashes, collector)
return collector.MarkAndSweepChunks(ctx, hashes, collector, mode)
}

func (s *TestStoreView) Count() (uint32, error) {
Expand Down
4 changes: 2 additions & 2 deletions go/store/nbs/archive_build.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func UnArchive(ctx context.Context, cs chunks.ChunkStore, smd StorageMetadata, p
newSpecs = append(newSpecs, spec)
}
}
err = gs.oldGen.swapTables(ctx, newSpecs)
err = gs.oldGen.swapTables(ctx, newSpecs, chunks.GCMode_Default)
if err != nil {
return err
}
Expand Down Expand Up @@ -175,7 +175,7 @@ func BuildArchive(ctx context.Context, cs chunks.ChunkStore, dagGroups *ChunkRel
newSpecs = append(newSpecs, spec)
}
}
err = gs.oldGen.swapTables(ctx, newSpecs)
err = gs.oldGen.swapTables(ctx, newSpecs, chunks.GCMode_Default)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion go/store/nbs/conjoiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents,
newContents := manifestContents{
nbfVers: upstream.nbfVers,
root: upstream.root,
lock: generateLockHash(upstream.root, specs, appendixSpecs),
lock: generateLockHash(upstream.root, specs, appendixSpecs, nil),
gcGen: upstream.gcGen,
specs: specs,
appendix: appendixSpecs,
Expand Down
4 changes: 2 additions & 2 deletions go/store/nbs/generational_chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,8 +503,8 @@ func (gcs *GenerationalNBS) EndGC() {
gcs.newGen.EndGC()
}

func (gcs *GenerationalNBS) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore) (chunks.GCFinalizer, error) {
return markAndSweepChunks(ctx, hashes, gcs.newGen, gcs, dest)
func (gcs *GenerationalNBS) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.GCFinalizer, error) {
return markAndSweepChunks(ctx, hashes, gcs.newGen, gcs, dest, mode)
}

func (gcs *GenerationalNBS) IterateAllChunks(ctx context.Context, cb func(chunk chunks.Chunk)) error {
Expand Down
2 changes: 1 addition & 1 deletion go/store/nbs/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func trueUpBackingManifest(ctx context.Context, root hash.Hash, backing *journal
}

prev := mc.lock
next := generateLockHash(mc.root, mc.specs, mc.appendix)
next := generateLockHash(mc.root, mc.specs, mc.appendix, nil)
mc.lock = next

mc, err = backing.Update(ctx, prev, mc, &Stats{}, nil)
Expand Down
7 changes: 6 additions & 1 deletion go/store/nbs/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ func formatSpecs(specs []tableSpec, tableInfo []string) {
// persisted manifest against the lock hash it saw last time it loaded the
// contents of a manifest. If they do not match, the client must not update
// the persisted manifest.
func generateLockHash(root hash.Hash, specs []tableSpec, appendix []tableSpec) hash.Hash {
func generateLockHash(root hash.Hash, specs []tableSpec, appendix []tableSpec, extra []byte) hash.Hash {
blockHash := sha512.New()
blockHash.Write(root[:])
for _, spec := range appendix {
Expand All @@ -511,6 +511,11 @@ func generateLockHash(root hash.Hash, specs []tableSpec, appendix []tableSpec) h
for _, spec := range specs {
blockHash.Write(spec.name[:])
}
if len(extra) > 0 {
blockHash.Write([]byte{0})
blockHash.Write(extra)
}
blockHash.Write([]byte{0})
var h []byte
h = blockHash.Sum(h) // Appends hash to h
return hash.New(h[:hash.ByteLen])
Expand Down
4 changes: 2 additions & 2 deletions go/store/nbs/nbs_metrics_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ func (nbsMW *NBSMetricWrapper) EndGC() {
nbsMW.nbs.EndGC()
}

func (nbsMW *NBSMetricWrapper) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore) (chunks.GCFinalizer, error) {
return nbsMW.nbs.MarkAndSweepChunks(ctx, hashes, dest)
func (nbsMW *NBSMetricWrapper) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.GCFinalizer, error) {
return nbsMW.nbs.MarkAndSweepChunks(ctx, hashes, dest, mode)
}

func (nbsMW *NBSMetricWrapper) Count() (uint32, error) {
Expand Down
47 changes: 32 additions & 15 deletions go/store/nbs/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"cloud.google.com/go/storage"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/dustin/go-humanize"
"github.com/fatih/color"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/oracle/oci-go-sdk/v65/common"
"github.com/oracle/oci-go-sdk/v65/objectstorage"
Expand Down Expand Up @@ -266,7 +267,7 @@ func (nbs *NomsBlockStore) UpdateManifest(ctx context.Context, updates map[hash.
return contents, nil
}

contents.lock = generateLockHash(contents.root, contents.specs, contents.appendix)
contents.lock = generateLockHash(contents.root, contents.specs, contents.appendix, nil)

// ensure we don't drop existing appendices
if contents.appendix != nil && len(contents.appendix) > 0 {
Expand Down Expand Up @@ -423,7 +424,7 @@ func fromManifestAppendixOptionNewContents(upstream manifestContents, appendixSp
newAppendixSpecs := append([]tableSpec{}, upstreamAppendixSpecs...)
contents.appendix = append(newAppendixSpecs, appendixSpecs...)

contents.lock = generateLockHash(contents.root, contents.specs, contents.appendix)
contents.lock = generateLockHash(contents.root, contents.specs, contents.appendix, nil)
return contents, nil
case ManifestAppendixOption_Set:
if len(appendixSpecs) < 1 {
Expand All @@ -438,7 +439,7 @@ func fromManifestAppendixOptionNewContents(upstream manifestContents, appendixSp
// append new appendix specs to contents.appendix
contents.appendix = append([]tableSpec{}, appendixSpecs...)

contents.lock = generateLockHash(contents.root, contents.specs, contents.appendix)
contents.lock = generateLockHash(contents.root, contents.specs, contents.appendix, nil)
return contents, nil
default:
return manifestContents{}, ErrUnsupportedManifestAppendixOption
Expand Down Expand Up @@ -480,7 +481,7 @@ func OverwriteStoreManifest(ctx context.Context, store *NomsBlockStore, root has
s := tableSpec{name: h, chunkCount: c}
contents.specs = append(contents.specs, s)
}
contents.lock = generateLockHash(contents.root, contents.specs, contents.appendix)
contents.lock = generateLockHash(contents.root, contents.specs, contents.appendix, nil)

store.mm.LockForUpdate()
defer func() {
Expand Down Expand Up @@ -1307,7 +1308,7 @@ func (nbs *NomsBlockStore) updateManifest(ctx context.Context, current, last has
newContents := manifestContents{
nbfVers: nbs.upstream.nbfVers,
root: current,
lock: generateLockHash(current, specs, appendixSpecs),
lock: generateLockHash(current, specs, appendixSpecs, nil),
gcGen: nbs.upstream.gcGen,
specs: specs,
appendix: appendixSpecs,
Expand Down Expand Up @@ -1597,11 +1598,11 @@ func (nbs *NomsBlockStore) EndGC() {
nbs.cond.Broadcast()
}

func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore) (chunks.GCFinalizer, error) {
return markAndSweepChunks(ctx, hashes, nbs, nbs, dest)
func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.GCFinalizer, error) {
return markAndSweepChunks(ctx, hashes, nbs, nbs, dest, mode)
}

func markAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, nbs *NomsBlockStore, src NBSCompressedChunkStore, dest chunks.ChunkStore) (chunks.GCFinalizer, error) {
func markAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, nbs *NomsBlockStore, src NBSCompressedChunkStore, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.GCFinalizer, error) {
ops := nbs.SupportedOperations()
if !ops.CanGC || !ops.CanPrune {
return nil, chunks.ErrUnsupportedOperation
Expand All @@ -1611,12 +1612,20 @@ func markAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, nbs *Nom
nbs.mu.RLock()
defer nbs.mu.RUnlock()

// check to see if the specs have changed since last gc. If they haven't bail early.
gcGenCheck := generateLockHash(nbs.upstream.root, nbs.upstream.specs, nbs.upstream.appendix)
// Check to see if the specs have changed since last gc. If they haven't bail early.
gcGenCheck := generateLockHash(nbs.upstream.root, nbs.upstream.specs, nbs.upstream.appendix, []byte("full"))
if nbs.upstream.gcGen == gcGenCheck {
fmt.Fprintf(color.Error, "check against full gcGen passed; nothing to collect")
return chunks.ErrNothingToCollect
}

if mode != chunks.GCMode_Full {
// Allow a non-full GC to match the no-op work check as well.
gcGenCheck := generateLockHash(nbs.upstream.root, nbs.upstream.specs, nbs.upstream.appendix, nil)
if nbs.upstream.gcGen == gcGenCheck {
fmt.Fprintf(color.Error, "check against nil gcGen passed; nothing to collect")
return chunks.ErrNothingToCollect
}
}
return nil
}
err := precheck()
Expand Down Expand Up @@ -1649,12 +1658,14 @@ func markAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, nbs *Nom
return gcFinalizer{
nbs: destNBS,
specs: specs,
mode: mode,
}, nil
}

type gcFinalizer struct {
nbs *NomsBlockStore
specs []tableSpec
mode chunks.GCMode
}

func (gcf gcFinalizer) AddChunksToStore(ctx context.Context) (chunks.HasManyFunc, error) {
Expand All @@ -1670,7 +1681,7 @@ func (gcf gcFinalizer) AddChunksToStore(ctx context.Context) (chunks.HasManyFunc
}

func (gcf gcFinalizer) SwapChunksInStore(ctx context.Context) error {
return gcf.nbs.swapTables(ctx, gcf.specs)
return gcf.nbs.swapTables(ctx, gcf.specs, gcf.mode)
}

func copyMarkedChunks(ctx context.Context, keepChunks <-chan []hash.Hash, src NBSCompressedChunkStore, dest *NomsBlockStore) ([]tableSpec, error) {
Expand Down Expand Up @@ -1744,7 +1755,7 @@ func (nbs *NomsBlockStore) IterateAllChunks(ctx context.Context, cb func(chunk c
return nil
}

func (nbs *NomsBlockStore) swapTables(ctx context.Context, specs []tableSpec) (err error) {
func (nbs *NomsBlockStore) swapTables(ctx context.Context, specs []tableSpec, mode chunks.GCMode) (err error) {
nbs.mu.Lock()
defer nbs.mu.Unlock()

Expand All @@ -1756,12 +1767,18 @@ func (nbs *NomsBlockStore) swapTables(ctx context.Context, specs []tableSpec) (e
}
}()

newLock := generateLockHash(nbs.upstream.root, specs, []tableSpec{})
newLock := generateLockHash(nbs.upstream.root, specs, []tableSpec{}, nil)
var extra []byte
if mode == chunks.GCMode_Full {
extra = []byte("full")
}
newGCGen := generateLockHash(nbs.upstream.root, specs, []tableSpec{}, extra)

newContents := manifestContents{
nbfVers: nbs.upstream.nbfVers,
root: nbs.upstream.root,
lock: newLock,
gcGen: newLock,
gcGen: newGCGen,
specs: specs,
}

Expand Down
2 changes: 1 addition & 1 deletion go/store/nbs/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func TestNBSCopyGC(t *testing.T) {
go func() {
require.NoError(t, st.BeginGC(nil))
var finalizer chunks.GCFinalizer
finalizer, msErr = st.MarkAndSweepChunks(ctx, keepChan, nil)
finalizer, msErr = st.MarkAndSweepChunks(ctx, keepChan, nil, chunks.GCMode_Full)
if msErr == nil {
msErr = finalizer.SwapChunksInStore(ctx)
}
Expand Down
13 changes: 9 additions & 4 deletions go/store/types/value_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,8 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
gcs, gcsOK := lvs.cs.(chunks.GenerationalCS)
collector, collectorOK := lvs.cs.(chunks.ChunkStoreGarbageCollector)

var chksMode chunks.GCMode

if gcsOK && collectorOK {
oldGen := gcs.OldGen()
newGen := gcs.NewGen()
Expand All @@ -586,8 +588,10 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
switch mode {
case GCModeDefault:
oldGenHasMany = oldGen.HasMany
chksMode = chunks.GCMode_Default
case GCModeFull:
oldGenHasMany = unfilteredHashFunc
chksMode = chunks.GCMode_Full
default:
return fmt.Errorf("unsupported GCMode %v", mode)
}
Expand Down Expand Up @@ -617,7 +621,7 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
newGenRefs.Insert(root)

var oldGenFinalizer, newGenFinalizer chunks.GCFinalizer
oldGenFinalizer, err = lvs.gc(ctx, oldGenRefs, oldGenHasMany, collector, oldGen, nil, func() hash.HashSet {
oldGenFinalizer, err = lvs.gc(ctx, oldGenRefs, oldGenHasMany, chksMode, collector, oldGen, nil, func() hash.HashSet {
n := lvs.transitionToNewGenGC()
newGenRefs.InsertAll(n)
return make(hash.HashSet)
Expand All @@ -638,7 +642,7 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
oldGenHasMany = newFileHasMany
}

newGenFinalizer, err = lvs.gc(ctx, newGenRefs, oldGenHasMany, collector, newGen, safepointF, lvs.transitionToFinalizingGC)
newGenFinalizer, err = lvs.gc(ctx, newGenRefs, oldGenHasMany, chksMode, collector, newGen, safepointF, lvs.transitionToFinalizingGC)
if err != nil {
return err
}
Expand Down Expand Up @@ -685,7 +689,7 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
newGenRefs.Insert(root)

var finalizer chunks.GCFinalizer
finalizer, err = lvs.gc(ctx, newGenRefs, unfilteredHashFunc, collector, collector, safepointF, lvs.transitionToFinalizingGC)
finalizer, err = lvs.gc(ctx, newGenRefs, unfilteredHashFunc, chunks.GCMode_Full, collector, collector, safepointF, lvs.transitionToFinalizingGC)
if err != nil {
return err
}
Expand Down Expand Up @@ -719,6 +723,7 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
func (lvs *ValueStore) gc(ctx context.Context,
toVisit hash.HashSet,
hashFilter chunks.HasManyFunc,
chksMode chunks.GCMode,
src, dest chunks.ChunkStoreGarbageCollector,
safepointF func() error,
finalize func() hash.HashSet) (chunks.GCFinalizer, error) {
Expand All @@ -729,7 +734,7 @@ func (lvs *ValueStore) gc(ctx context.Context,
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
var err error
gcFinalizer, err = src.MarkAndSweepChunks(ctx, keepChunks, dest)
gcFinalizer, err = src.MarkAndSweepChunks(ctx, keepChunks, dest, chksMode)
return err
})

Expand Down
Loading

0 comments on commit adfa851

Please sign in to comment.