Skip to content

Commit

Permalink
WIP - shelve while customer issuing
Browse files Browse the repository at this point in the history
  • Loading branch information
macneale4 committed Jan 22, 2025
1 parent 1c379cb commit 7c30ec5
Show file tree
Hide file tree
Showing 25 changed files with 169 additions and 86 deletions.
7 changes: 6 additions & 1 deletion go/libraries/doltcore/remotesrv/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,12 @@ func (rs *RemoteChunkStore) StreamDownloadLocations(stream remotesapi.ChunkStore
var ranges []*remotesapi.RangeChunk
for h, r := range hashToRange {
hCpy := h
ranges = append(ranges, &remotesapi.RangeChunk{Hash: hCpy[:], Offset: r.Offset, Length: r.Length})
ranges = append(ranges, &remotesapi.RangeChunk{
Hash: hCpy[:],
Offset: r.Offset,
Length: r.Length,
DictionaryOffset: r.DictOffset,
DictionaryLength: r.DictLength})
}

url := rs.getDownloadUrl(md, prefix+"/"+loc)
Expand Down
11 changes: 10 additions & 1 deletion go/libraries/doltcore/remotestorage/chunk_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,14 @@ type ChunkFetcher struct {
eg *errgroup.Group
egCtx context.Context

// toGetCh is the channel used to request chunks. This will be initially given a root,
// and as refs are found, they will be added to the channel for workers to batch and request. NM4.
toGetCh chan hash.HashSet
resCh chan nbs.ToChunker

// resCh is the results channel for the fetcher. It is used both to return
// chunks themselves, and to indicate which chunks were requested but missing
// buy having a Hash, but are empty. NM4.
resCh chan nbs.ToChunker

abortCh chan struct{}
stats StatsRecorder
Expand Down Expand Up @@ -255,6 +261,7 @@ func fetcherRPCDownloadLocsThread(ctx context.Context, reqCh chan *remotesapi.Ge
})
eg.Go(func() error {
for {
// NM4 - Where there responses come back - resp is an rpc struct. NM4.
resp, err := stream.Recv()
if err == io.EOF {
close(resCh)
Expand Down Expand Up @@ -300,6 +307,7 @@ func getMissingChunks(req *remotesapi.GetDownloadLocsRequest, resp *remotesapi.G
numRequested := len(req.ChunkHashes)
numResponded := 0
for _, loc := range resp.Locs {
// NM4 - Looky here.
hgr := loc.Location.(*remotesapi.DownloadLoc_HttpGetRange).HttpGetRange
numResponded += len(hgr.Ranges)
}
Expand Down Expand Up @@ -362,6 +370,7 @@ func (d downloads) Add(resp *remotesapi.DownloadLoc) {
d.refreshes[path] = refresh
}
for _, r := range gr.Ranges {
// NM4 - this is where the offset is read!! do something here or nearby.
d.ranges.Insert(gr.Url, r.Hash, r.Offset, r.Length)
}
}
Expand Down
4 changes: 2 additions & 2 deletions go/store/cmd/noms/noms_manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func runManifest(ctx context.Context, args []string) int {
nbsFiles := make([]NbsFile, numSpecs)
for i := 0; i < numSpecs; i++ {
tableSpecInfo := manifest.GetTableSpecInfo(i)
path := filepath.Join(spec.DatabaseName, tableSpecInfo.GetName())
path := filepath.Join(spec.DatabaseName, tableSpecInfo.GetFileName())
fileInfo, err := os.Stat(path)
nbsFiles[i] = NbsFile{tableSpecInfo, fileInfo, err}
}
Expand All @@ -130,7 +130,7 @@ func runManifest(ctx context.Context, args []string) int {
fmt.Println(" referenced nbs files:")

for _, nbsFile := range nbsFiles {
name := nbsFile.manifestSpec.GetName()
name := nbsFile.manifestSpec.GetFileName()
chunkCnt := nbsFile.manifestSpec.GetChunkCount()
sizeStr := nbsFile.sizeStr()
existsStr := nbsFile.fileInfoErr == nil
Expand Down
8 changes: 4 additions & 4 deletions go/store/nbs/archive_build.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ func UnArchive(ctx context.Context, cs chunks.ChunkStore, smd StorageMetadata, p
specs, err := gs.oldGen.tables.toSpecs()
newSpecs := make([]tableSpec, 0, len(specs))
for _, spec := range specs {
if newSpec, exists := swapMap[spec.name]; exists {
newSpecs = append(newSpecs, tableSpec{newSpec, spec.chunkCount})
if newSpec, exists := swapMap[spec.hash]; exists {
newSpecs = append(newSpecs, tableSpec{typeNoms, newSpec, spec.chunkCount})
} else {
newSpecs = append(newSpecs, spec)
}
Expand Down Expand Up @@ -169,8 +169,8 @@ func BuildArchive(ctx context.Context, cs chunks.ChunkStore, dagGroups *ChunkRel
specs, err := gs.oldGen.tables.toSpecs()
newSpecs := make([]tableSpec, 0, len(specs))
for _, spec := range specs {
if newSpec, exists := swapMap[spec.name]; exists {
newSpecs = append(newSpecs, tableSpec{newSpec, spec.chunkCount})
if newSpec, exists := swapMap[spec.hash]; exists {
newSpecs = append(newSpecs, tableSpec{typeArchive, newSpec, spec.chunkCount})
} else {
newSpecs = append(newSpecs, spec)
}
Expand Down
33 changes: 31 additions & 2 deletions go/store/nbs/archive_chunk_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ func (acs archiveChunkSource) hash() hash.Hash {
return acs.aRdr.footer.hash
}

func (acs archiveChunkSource) name() string {
return acs.hash().String() + ".darc" // NM4 - second time this const is defined. Fix!
}

func (acs archiveChunkSource) currentSize() uint64 {
return acs.aRdr.footer.fileSize
}
Expand All @@ -146,12 +150,37 @@ func (acs archiveChunkSource) clone() (chunkSource, error) {
return archiveChunkSource{acs.file, rdr}, nil
}

func (acs archiveChunkSource) getRecordRanges(_ context.Context, _ []getRecord) (map[hash.Hash]Range, error) {
return nil, errors.New("Archive chunk source does not support getRecordRanges")
func (acs archiveChunkSource) getRecordRanges(_ context.Context, requests []getRecord) (map[hash.Hash]Range, error) {
result := make(map[hash.Hash]Range, len(requests))
for _, req := range requests {
hAddr := *req.a
if acs.aRdr.has(hAddr) {
idx := acs.aRdr.search(hAddr)
if idx < 0 {
// Chunk not found.
continue
}

dictId, dataId := acs.aRdr.getChunkRef(idx)
dataSpan := acs.aRdr.getByteSpanByID(dataId)
dictSpan := acs.aRdr.getByteSpanByID(dictId)

rng := Range{
Offset: dataSpan.offset,
Length: uint32(dataSpan.length),
DictOffset: dictSpan.offset,
DictLength: uint32(dictSpan.length),
}

result[hAddr] = rng
}
}
return result, nil
}

func (acs archiveChunkSource) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, ToChunker), stats *Stats) (bool, error) {
return acs.getMany(ctx, eg, reqs, func(ctx context.Context, chk *chunks.Chunk) {
// NM4 - UPDATE. this is def wrong. Not sure why I did this!
found(ctx, ChunkToCompressedChunk(*chk))
}, stats)
}
Expand Down
4 changes: 4 additions & 0 deletions go/store/nbs/archive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,10 @@ func (tcs *testChunkSource) hash() hash.Hash {
panic("never used")
}

func (tcs *testChunkSource) name() string {
panic("never used")
}

func (tcs *testChunkSource) reader(ctx context.Context) (io.ReadCloser, uint64, error) {
panic("never used")
}
Expand Down
4 changes: 4 additions & 0 deletions go/store/nbs/chunk_source_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ func (csa chunkSourceAdapter) hash() hash.Hash {
return csa.h
}

func (csa chunkSourceAdapter) name() string {
return csa.h.String()
}

func newReaderFromIndexData(ctx context.Context, q MemoryQuotaProvider, idxData []byte, name hash.Hash, tra tableReaderAt, blockSize uint64) (cs chunkSource, err error) {
index, err := parseTableIndexByCopy(ctx, idxData, q)
if err != nil {
Expand Down
16 changes: 8 additions & 8 deletions go/store/nbs/conjoiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents,
return upstream, func() {}, nil
}
for i := range upstream.appendix {
if upstream.appendix[i].name != appendixSpecs[i].name {
if upstream.appendix[i].hash != appendixSpecs[i].hash {
return upstream, func() {}, nil
}
}
Expand All @@ -176,19 +176,19 @@ func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents,
conjoineeSet := map[hash.Hash]struct{}{}
upstreamNames := map[hash.Hash]struct{}{}
for _, spec := range upstream.specs {
upstreamNames[spec.name] = struct{}{}
upstreamNames[spec.hash] = struct{}{}
}
for _, c := range conjoinees {
if _, present := upstreamNames[c.name]; !present {
if _, present := upstreamNames[c.hash]; !present {
return upstream, func() {}, nil // Bail!
}
conjoineeSet[c.name] = struct{}{}
conjoineeSet[c.hash] = struct{}{}
}

// Filter conjoinees out of upstream.specs to generate new set of keepers
keepers = make([]tableSpec, 0, len(upstream.specs)-len(conjoinees))
for _, spec := range upstream.specs {
if _, present := conjoineeSet[spec.name]; !present {
if _, present := conjoineeSet[spec.hash]; !present {
keepers = append(keepers, spec)
}
}
Expand All @@ -202,7 +202,7 @@ func conjoinTables(ctx context.Context, conjoinees []tableSpec, p tablePersister
for idx := range conjoinees {
i, spec := idx, conjoinees[idx]
eg.Go(func() (err error) {
toConjoin[i], err = p.Open(ectx, spec.name, spec.chunkCount, stats)
toConjoin[i], err = p.Open(ectx, spec.hash, spec.chunkCount, stats)
return
})
}
Expand Down Expand Up @@ -240,7 +240,7 @@ func conjoinTables(ctx context.Context, conjoinees []tableSpec, p tablePersister
if err != nil {
return tableSpec{}, nil, err
}
return tableSpec{h, cnt}, cleanup, nil
return tableSpec{typeNoms, h, cnt}, cleanup, nil
}

func toSpecs(srcs chunkSources) ([]tableSpec, error) {
Expand All @@ -258,7 +258,7 @@ func toSpecs(srcs chunkSources) ([]tableSpec, error) {
if err != nil {
return nil, err
}
specs[i] = tableSpec{h, cnt}
specs[i] = tableSpec{typeNoms, h, cnt}
}

return specs, nil
Expand Down
8 changes: 4 additions & 4 deletions go/store/nbs/conjoiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (ts tableSpecsByAscendingCount) Len() int { return len(ts) }
func (ts tableSpecsByAscendingCount) Less(i, j int) bool {
tsI, tsJ := ts[i], ts[j]
if tsI.chunkCount == tsJ.chunkCount {
return bytes.Compare(tsI.name[:], tsJ.name[:]) < 0
return bytes.Compare(tsI.hash[:], tsJ.hash[:]) < 0
}
return tsI.chunkCount < tsJ.chunkCount
}
Expand Down Expand Up @@ -76,7 +76,7 @@ func makeTestSrcs(t *testing.T, tableSizes []uint32, p tablePersister) (srcs chu
// Makes a tableSet with len(tableSizes) upstream tables containing tableSizes[N] unique chunks
func makeTestTableSpecs(t *testing.T, tableSizes []uint32, p tablePersister) (specs []tableSpec) {
for _, src := range makeTestSrcs(t, tableSizes, p) {
specs = append(specs, tableSpec{src.hash(), mustUint32(src.count())})
specs = append(specs, tableSpec{typeNoms, src.hash(), mustUint32(src.count())})
err := src.close()
require.NoError(t, err)
}
Expand Down Expand Up @@ -134,7 +134,7 @@ func testConjoin(t *testing.T, factory func(t *testing.T) tablePersister) {
assertContainAll := func(t *testing.T, p tablePersister, expect, actual []tableSpec) {
open := func(specs []tableSpec) (sources chunkSources) {
for _, sp := range specs {
cs, err := p.Open(context.Background(), sp.name, sp.chunkCount, stats)
cs, err := p.Open(context.Background(), sp.hash, sp.chunkCount, stats)
if err != nil {
require.NoError(t, err)
}
Expand Down Expand Up @@ -183,7 +183,7 @@ func testConjoin(t *testing.T, factory func(t *testing.T) tablePersister) {
src, err := p.Persist(context.Background(), mt, nil, &Stats{})
require.NoError(t, err)
defer src.close()
return tableSpec{src.hash(), mustUint32(src.count())}
return tableSpec{typeNoms, src.hash(), mustUint32(src.count())}
}

tc := []struct {
Expand Down
22 changes: 11 additions & 11 deletions go/store/nbs/dynamo_manifest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ func TestDynamoManifestParseIfExists(t *testing.T) {
assert.Equal(newLock, contents.lock)
assert.Equal(newRoot, contents.root)
if assert.Len(contents.appendix, 1) {
assert.Equal(tableName.String(), contents.specs[0].name.String())
assert.Equal(tableName.String(), contents.specs[0].hash.String())
assert.Equal(uint32(0), contents.specs[0].chunkCount)
assert.Equal(tableName.String(), contents.appendix[0].name.String())
assert.Equal(tableName.String(), contents.appendix[0].hash.String())
assert.Equal(uint32(0), contents.appendix[0].chunkCount)
}
if assert.Len(contents.specs, 2) {
assert.Equal(tableName.String(), contents.specs[1].name.String())
assert.Equal(tableName.String(), contents.specs[1].hash.String())
assert.Equal(uint32(0), contents.specs[1].chunkCount)
}
}
Expand Down Expand Up @@ -109,7 +109,7 @@ func TestDynamoManifestUpdate(t *testing.T) {
stats := &Stats{}

// First, test winning the race against another process.
contents := makeContents("locker", "nuroot", []tableSpec{{computeAddr([]byte("a")), 3}}, nil)
contents := makeContents("locker", "nuroot", []tableSpec{{typeNoms, computeAddr([]byte("a")), 3}}, nil)
upstream, err := mm.Update(context.Background(), hash.Hash{}, contents, stats, func() error {
// This should fail to get the lock, and therefore _not_ clobber the manifest. So the Update should succeed.
lock := computeAddr([]byte("nolock"))
Expand Down Expand Up @@ -145,7 +145,7 @@ func TestDynamoManifestUpdate(t *testing.T) {
require.NoError(t, err)
assert.Equal(jerkLock, upstream.lock)
assert.Equal(rejected.root, upstream.root)
assert.Equal([]tableSpec{{tableName, 1}}, upstream.specs)
assert.Equal([]tableSpec{{typeNoms, tableName, 1}}, upstream.specs)
}

func TestDynamoManifestUpdateAppendix(t *testing.T) {
Expand All @@ -155,11 +155,11 @@ func TestDynamoManifestUpdateAppendix(t *testing.T) {

// First, test winning the race against another process.
specs := []tableSpec{
{computeAddr([]byte("app-a")), 3},
{computeAddr([]byte("a")), 3},
{typeNoms, computeAddr([]byte("app-a")), 3},
{typeNoms, computeAddr([]byte("a")), 3},
}

app := []tableSpec{{computeAddr([]byte("app-a")), 3}}
app := []tableSpec{{typeNoms, computeAddr([]byte("app-a")), 3}}
contents := makeContents("locker", "nuroot", specs, app)

upstream, err := mm.Update(context.Background(), hash.Hash{}, contents, stats, func() error {
Expand Down Expand Up @@ -204,8 +204,8 @@ func TestDynamoManifestUpdateAppendix(t *testing.T) {
require.NoError(t, err)
assert.Equal(jerkLock, upstream.lock)
assert.Equal(rejected.root, upstream.root)
assert.Equal([]tableSpec{{appTableName, 1}, {tableName, 1}}, upstream.specs)
assert.Equal([]tableSpec{{appTableName, 1}}, upstream.appendix)
assert.Equal([]tableSpec{{typeNoms, appTableName, 1}, {typeNoms, tableName, 1}}, upstream.specs)
assert.Equal([]tableSpec{{typeNoms, appTableName, 1}}, upstream.appendix)
}

func TestDynamoManifestCaching(t *testing.T) {
Expand All @@ -231,7 +231,7 @@ func TestDynamoManifestCaching(t *testing.T) {

// When failing the optimistic lock, we should hit persistent storage.
reads = ddb.NumGets()
contents := makeContents("lock2", "nuroot", []tableSpec{{computeAddr([]byte("a")), 3}}, nil)
contents := makeContents("lock2", "nuroot", []tableSpec{{typeNoms, computeAddr([]byte("a")), 3}}, nil)
upstream, err := mm.Update(context.Background(), hash.Hash{}, contents, stats, nil)
require.NoError(t, err)
assert.NotEqual(contents.lock, upstream.lock)
Expand Down
4 changes: 4 additions & 0 deletions go/store/nbs/empty_chunk_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ func (ecs emptyChunkSource) hash() hash.Hash {
return hash.Hash{}
}

func (ecs emptyChunkSource) name() string {
return ecs.hash().String()
}

func (ecs emptyChunkSource) index() (tableIndex, error) {
return onHeapTableIndex{}, nil
}
Expand Down
6 changes: 3 additions & 3 deletions go/store/nbs/file_manifest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestFileManifestLoadIfExists(t *testing.T) {
assert.Equal(jerk, upstream.lock)
assert.Equal(newRoot, upstream.root)
if assert.Len(upstream.specs, 1) {
assert.Equal(tableName.String(), upstream.specs[0].name.String())
assert.Equal(tableName.String(), upstream.specs[0].hash.String())
assert.Equal(uint32(0), upstream.specs[0].chunkCount)
}
}
Expand Down Expand Up @@ -134,7 +134,7 @@ func TestFileManifestUpdate(t *testing.T) {
nbfVers: constants.FormatLD1String,
lock: computeAddr([]byte("locker")),
root: hash.Of([]byte("new root")),
specs: []tableSpec{{computeAddr([]byte("a")), 3}},
specs: []tableSpec{{typeNoms, computeAddr([]byte("a")), 3}},
}
upstream, err := fm.Update(context.Background(), hash.Hash{}, contents, stats, func() error {
// This should fail to get the lock, and therefore _not_ clobber the manifest. So the Update should succeed.
Expand Down Expand Up @@ -177,7 +177,7 @@ func TestFileManifestUpdate(t *testing.T) {
require.NoError(t, err)
assert.Equal(jerkLock, upstream.lock)
assert.Equal(contents2.root, upstream.root)
assert.Equal([]tableSpec{{tableName, 1}}, upstream.specs)
assert.Equal([]tableSpec{{typeNoms, tableName, 1}}, upstream.specs)
}

// tryClobberManifest simulates another process trying to access dir/manifestFileName concurrently. To avoid deadlock, it does a non-blocking lock of dir/lockFileName. If it can get the lock, it clobbers the manifest.
Expand Down
4 changes: 4 additions & 0 deletions go/store/nbs/file_table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ func (ftr *fileTableReader) hash() hash.Hash {
return ftr.h
}

func (ftr *fileTableReader) name() string {
return ftr.h.String()
}

func (ftr *fileTableReader) iterateAllChunks(ctx context.Context, cb func(chunk chunks.Chunk)) error {
count := ftr.idx.chunkCount()

Expand Down
Loading

0 comments on commit 7c30ec5

Please sign in to comment.