Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

go/store/nbs: Fixing GCGen to be more correct. #8612

Merged
merged 3 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion go/store/chunks/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,13 @@ type GCFinalizer interface {
SwapChunksInStore(ctx context.Context) error
}

type GCMode int

const (
GCMode_Default GCMode = iota
GCMode_Full
)

// ChunkStoreGarbageCollector is a ChunkStore that supports garbage collection.
type ChunkStoreGarbageCollector interface {
ChunkStore
Expand Down Expand Up @@ -213,7 +220,7 @@ type ChunkStoreGarbageCollector interface {
// This behavior is a little different for ValueStore.GC()'s
// interactions with generational stores. See ValueStore and
// NomsBlockStore/GenerationalNBS for details.
MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore) (GCFinalizer, error)
MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore, mode GCMode) (GCFinalizer, error)

// Count returns the number of chunks in the store.
Count() (uint32, error)
Expand Down
2 changes: 1 addition & 1 deletion go/store/chunks/memory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ func (mgcf msvGcFinalizer) SwapChunksInStore(ctx context.Context) error {
return nil
}

func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore) (GCFinalizer, error) {
func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore, mode GCMode) (GCFinalizer, error) {
if dest != ms {
panic("unsupported")
}
Expand Down
4 changes: 2 additions & 2 deletions go/store/chunks/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,12 @@ func (s *TestStoreView) EndGC() {
collector.EndGC()
}

func (s *TestStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore) (GCFinalizer, error) {
func (s *TestStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore, mode GCMode) (GCFinalizer, error) {
collector, ok := s.ChunkStore.(ChunkStoreGarbageCollector)
if !ok || dest != s {
return nil, ErrUnsupportedOperation
}
return collector.MarkAndSweepChunks(ctx, hashes, collector)
return collector.MarkAndSweepChunks(ctx, hashes, collector, mode)
}

func (s *TestStoreView) Count() (uint32, error) {
Expand Down
4 changes: 2 additions & 2 deletions go/store/nbs/archive_build.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func UnArchive(ctx context.Context, cs chunks.ChunkStore, smd StorageMetadata, p
newSpecs = append(newSpecs, spec)
}
}
err = gs.oldGen.swapTables(ctx, newSpecs)
err = gs.oldGen.swapTables(ctx, newSpecs, chunks.GCMode_Default)
if err != nil {
return err
}
Expand Down Expand Up @@ -175,7 +175,7 @@ func BuildArchive(ctx context.Context, cs chunks.ChunkStore, dagGroups *ChunkRel
newSpecs = append(newSpecs, spec)
}
}
err = gs.oldGen.swapTables(ctx, newSpecs)
err = gs.oldGen.swapTables(ctx, newSpecs, chunks.GCMode_Default)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion go/store/nbs/conjoiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents,
newContents := manifestContents{
nbfVers: upstream.nbfVers,
root: upstream.root,
lock: generateLockHash(upstream.root, specs, appendixSpecs),
lock: generateLockHash(upstream.root, specs, appendixSpecs, nil),
gcGen: upstream.gcGen,
specs: specs,
appendix: appendixSpecs,
Expand Down
4 changes: 2 additions & 2 deletions go/store/nbs/generational_chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,8 +503,8 @@ func (gcs *GenerationalNBS) EndGC() {
gcs.newGen.EndGC()
}

func (gcs *GenerationalNBS) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore) (chunks.GCFinalizer, error) {
return markAndSweepChunks(ctx, hashes, gcs.newGen, gcs, dest)
func (gcs *GenerationalNBS) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.GCFinalizer, error) {
return markAndSweepChunks(ctx, hashes, gcs.newGen, gcs, dest, mode)
}

func (gcs *GenerationalNBS) IterateAllChunks(ctx context.Context, cb func(chunk chunks.Chunk)) error {
Expand Down
2 changes: 1 addition & 1 deletion go/store/nbs/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func trueUpBackingManifest(ctx context.Context, root hash.Hash, backing *journal
}

prev := mc.lock
next := generateLockHash(mc.root, mc.specs, mc.appendix)
next := generateLockHash(mc.root, mc.specs, mc.appendix, nil)
mc.lock = next

mc, err = backing.Update(ctx, prev, mc, &Stats{}, nil)
Expand Down
7 changes: 6 additions & 1 deletion go/store/nbs/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ func formatSpecs(specs []tableSpec, tableInfo []string) {
// persisted manifest against the lock hash it saw last time it loaded the
// contents of a manifest. If they do not match, the client must not update
// the persisted manifest.
func generateLockHash(root hash.Hash, specs []tableSpec, appendix []tableSpec) hash.Hash {
func generateLockHash(root hash.Hash, specs []tableSpec, appendix []tableSpec, extra []byte) hash.Hash {
reltuk marked this conversation as resolved.
Show resolved Hide resolved
blockHash := sha512.New()
blockHash.Write(root[:])
for _, spec := range appendix {
Expand All @@ -511,6 +511,11 @@ func generateLockHash(root hash.Hash, specs []tableSpec, appendix []tableSpec) h
for _, spec := range specs {
blockHash.Write(spec.name[:])
}
if len(extra) > 0 {
blockHash.Write([]byte{0})
blockHash.Write(extra)
}
blockHash.Write([]byte{0})
var h []byte
h = blockHash.Sum(h) // Appends hash to h
return hash.New(h[:hash.ByteLen])
Expand Down
4 changes: 2 additions & 2 deletions go/store/nbs/nbs_metrics_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ func (nbsMW *NBSMetricWrapper) EndGC() {
nbsMW.nbs.EndGC()
}

func (nbsMW *NBSMetricWrapper) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore) (chunks.GCFinalizer, error) {
return nbsMW.nbs.MarkAndSweepChunks(ctx, hashes, dest)
func (nbsMW *NBSMetricWrapper) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.GCFinalizer, error) {
return nbsMW.nbs.MarkAndSweepChunks(ctx, hashes, dest, mode)
}

func (nbsMW *NBSMetricWrapper) Count() (uint32, error) {
Expand Down
47 changes: 32 additions & 15 deletions go/store/nbs/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"cloud.google.com/go/storage"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/dustin/go-humanize"
"github.com/fatih/color"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/oracle/oci-go-sdk/v65/common"
"github.com/oracle/oci-go-sdk/v65/objectstorage"
Expand Down Expand Up @@ -266,7 +267,7 @@ func (nbs *NomsBlockStore) UpdateManifest(ctx context.Context, updates map[hash.
return contents, nil
}

contents.lock = generateLockHash(contents.root, contents.specs, contents.appendix)
contents.lock = generateLockHash(contents.root, contents.specs, contents.appendix, nil)

// ensure we don't drop existing appendices
if contents.appendix != nil && len(contents.appendix) > 0 {
Expand Down Expand Up @@ -423,7 +424,7 @@ func fromManifestAppendixOptionNewContents(upstream manifestContents, appendixSp
newAppendixSpecs := append([]tableSpec{}, upstreamAppendixSpecs...)
contents.appendix = append(newAppendixSpecs, appendixSpecs...)

contents.lock = generateLockHash(contents.root, contents.specs, contents.appendix)
contents.lock = generateLockHash(contents.root, contents.specs, contents.appendix, nil)
return contents, nil
case ManifestAppendixOption_Set:
if len(appendixSpecs) < 1 {
Expand All @@ -438,7 +439,7 @@ func fromManifestAppendixOptionNewContents(upstream manifestContents, appendixSp
// append new appendix specs to contents.appendix
contents.appendix = append([]tableSpec{}, appendixSpecs...)

contents.lock = generateLockHash(contents.root, contents.specs, contents.appendix)
contents.lock = generateLockHash(contents.root, contents.specs, contents.appendix, nil)
return contents, nil
default:
return manifestContents{}, ErrUnsupportedManifestAppendixOption
Expand Down Expand Up @@ -480,7 +481,7 @@ func OverwriteStoreManifest(ctx context.Context, store *NomsBlockStore, root has
s := tableSpec{name: h, chunkCount: c}
contents.specs = append(contents.specs, s)
}
contents.lock = generateLockHash(contents.root, contents.specs, contents.appendix)
contents.lock = generateLockHash(contents.root, contents.specs, contents.appendix, nil)

store.mm.LockForUpdate()
defer func() {
Expand Down Expand Up @@ -1307,7 +1308,7 @@ func (nbs *NomsBlockStore) updateManifest(ctx context.Context, current, last has
newContents := manifestContents{
nbfVers: nbs.upstream.nbfVers,
root: current,
lock: generateLockHash(current, specs, appendixSpecs),
lock: generateLockHash(current, specs, appendixSpecs, nil),
gcGen: nbs.upstream.gcGen,
specs: specs,
appendix: appendixSpecs,
Expand Down Expand Up @@ -1597,11 +1598,11 @@ func (nbs *NomsBlockStore) EndGC() {
nbs.cond.Broadcast()
}

func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore) (chunks.GCFinalizer, error) {
return markAndSweepChunks(ctx, hashes, nbs, nbs, dest)
func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.GCFinalizer, error) {
return markAndSweepChunks(ctx, hashes, nbs, nbs, dest, mode)
}

func markAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, nbs *NomsBlockStore, src NBSCompressedChunkStore, dest chunks.ChunkStore) (chunks.GCFinalizer, error) {
func markAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, nbs *NomsBlockStore, src NBSCompressedChunkStore, dest chunks.ChunkStore, mode chunks.GCMode) (chunks.GCFinalizer, error) {
ops := nbs.SupportedOperations()
if !ops.CanGC || !ops.CanPrune {
return nil, chunks.ErrUnsupportedOperation
Expand All @@ -1611,12 +1612,20 @@ func markAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, nbs *Nom
nbs.mu.RLock()
defer nbs.mu.RUnlock()

// check to see if the specs have changed since last gc. If they haven't bail early.
gcGenCheck := generateLockHash(nbs.upstream.root, nbs.upstream.specs, nbs.upstream.appendix)
// Check to see if the specs have changed since last gc. If they haven't bail early.
gcGenCheck := generateLockHash(nbs.upstream.root, nbs.upstream.specs, nbs.upstream.appendix, []byte("full"))
if nbs.upstream.gcGen == gcGenCheck {
fmt.Fprintf(color.Error, "check against full gcGen passed; nothing to collect")
return chunks.ErrNothingToCollect
}

if mode != chunks.GCMode_Full {
// Allow a non-full GC to match the no-op work check as well.
gcGenCheck := generateLockHash(nbs.upstream.root, nbs.upstream.specs, nbs.upstream.appendix, nil)
if nbs.upstream.gcGen == gcGenCheck {
fmt.Fprintf(color.Error, "check against nil gcGen passed; nothing to collect")
return chunks.ErrNothingToCollect
}
}
return nil
}
err := precheck()
Expand Down Expand Up @@ -1649,12 +1658,14 @@ func markAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, nbs *Nom
return gcFinalizer{
nbs: destNBS,
specs: specs,
mode: mode,
}, nil
}

type gcFinalizer struct {
nbs *NomsBlockStore
specs []tableSpec
mode chunks.GCMode
}

func (gcf gcFinalizer) AddChunksToStore(ctx context.Context) (chunks.HasManyFunc, error) {
Expand All @@ -1670,7 +1681,7 @@ func (gcf gcFinalizer) AddChunksToStore(ctx context.Context) (chunks.HasManyFunc
}

func (gcf gcFinalizer) SwapChunksInStore(ctx context.Context) error {
return gcf.nbs.swapTables(ctx, gcf.specs)
return gcf.nbs.swapTables(ctx, gcf.specs, gcf.mode)
}

func copyMarkedChunks(ctx context.Context, keepChunks <-chan []hash.Hash, src NBSCompressedChunkStore, dest *NomsBlockStore) ([]tableSpec, error) {
Expand Down Expand Up @@ -1744,7 +1755,7 @@ func (nbs *NomsBlockStore) IterateAllChunks(ctx context.Context, cb func(chunk c
return nil
}

func (nbs *NomsBlockStore) swapTables(ctx context.Context, specs []tableSpec) (err error) {
func (nbs *NomsBlockStore) swapTables(ctx context.Context, specs []tableSpec, mode chunks.GCMode) (err error) {
nbs.mu.Lock()
defer nbs.mu.Unlock()

Expand All @@ -1756,12 +1767,18 @@ func (nbs *NomsBlockStore) swapTables(ctx context.Context, specs []tableSpec) (e
}
}()

newLock := generateLockHash(nbs.upstream.root, specs, []tableSpec{})
newLock := generateLockHash(nbs.upstream.root, specs, []tableSpec{}, nil)
var extra []byte
if mode == chunks.GCMode_Full {
extra = []byte("full")
}
newGCGen := generateLockHash(nbs.upstream.root, specs, []tableSpec{}, extra)

newContents := manifestContents{
nbfVers: nbs.upstream.nbfVers,
root: nbs.upstream.root,
lock: newLock,
gcGen: newLock,
gcGen: newGCGen,
specs: specs,
}

Expand Down
2 changes: 1 addition & 1 deletion go/store/nbs/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func TestNBSCopyGC(t *testing.T) {
go func() {
require.NoError(t, st.BeginGC(nil))
var finalizer chunks.GCFinalizer
finalizer, msErr = st.MarkAndSweepChunks(ctx, keepChan, nil)
finalizer, msErr = st.MarkAndSweepChunks(ctx, keepChan, nil, chunks.GCMode_Full)
if msErr == nil {
msErr = finalizer.SwapChunksInStore(ctx)
}
Expand Down
13 changes: 9 additions & 4 deletions go/store/types/value_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,8 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
gcs, gcsOK := lvs.cs.(chunks.GenerationalCS)
collector, collectorOK := lvs.cs.(chunks.ChunkStoreGarbageCollector)

var chksMode chunks.GCMode

if gcsOK && collectorOK {
oldGen := gcs.OldGen()
newGen := gcs.NewGen()
Expand All @@ -586,8 +588,10 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
switch mode {
case GCModeDefault:
oldGenHasMany = oldGen.HasMany
chksMode = chunks.GCMode_Default
case GCModeFull:
oldGenHasMany = unfilteredHashFunc
chksMode = chunks.GCMode_Full
default:
return fmt.Errorf("unsupported GCMode %v", mode)
}
Expand Down Expand Up @@ -617,7 +621,7 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
newGenRefs.Insert(root)

var oldGenFinalizer, newGenFinalizer chunks.GCFinalizer
oldGenFinalizer, err = lvs.gc(ctx, oldGenRefs, oldGenHasMany, collector, oldGen, nil, func() hash.HashSet {
oldGenFinalizer, err = lvs.gc(ctx, oldGenRefs, oldGenHasMany, chksMode, collector, oldGen, nil, func() hash.HashSet {
n := lvs.transitionToNewGenGC()
newGenRefs.InsertAll(n)
return make(hash.HashSet)
Expand All @@ -638,7 +642,7 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
oldGenHasMany = newFileHasMany
}

newGenFinalizer, err = lvs.gc(ctx, newGenRefs, oldGenHasMany, collector, newGen, safepointF, lvs.transitionToFinalizingGC)
newGenFinalizer, err = lvs.gc(ctx, newGenRefs, oldGenHasMany, chksMode, collector, newGen, safepointF, lvs.transitionToFinalizingGC)
if err != nil {
return err
}
Expand Down Expand Up @@ -685,7 +689,7 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
newGenRefs.Insert(root)

var finalizer chunks.GCFinalizer
finalizer, err = lvs.gc(ctx, newGenRefs, unfilteredHashFunc, collector, collector, safepointF, lvs.transitionToFinalizingGC)
finalizer, err = lvs.gc(ctx, newGenRefs, unfilteredHashFunc, chunks.GCMode_Full, collector, collector, safepointF, lvs.transitionToFinalizingGC)
if err != nil {
return err
}
Expand Down Expand Up @@ -719,6 +723,7 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
func (lvs *ValueStore) gc(ctx context.Context,
toVisit hash.HashSet,
hashFilter chunks.HasManyFunc,
chksMode chunks.GCMode,
src, dest chunks.ChunkStoreGarbageCollector,
safepointF func() error,
finalize func() hash.HashSet) (chunks.GCFinalizer, error) {
Expand All @@ -729,7 +734,7 @@ func (lvs *ValueStore) gc(ctx context.Context,
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
var err error
gcFinalizer, err = src.MarkAndSweepChunks(ctx, keepChunks, dest)
gcFinalizer, err = src.MarkAndSweepChunks(ctx, keepChunks, dest, chksMode)
return err
})

Expand Down
Loading
Loading