Skip to content

Commit

Permalink
WIP: More use of the ToChunker interface
Browse files Browse the repository at this point in the history
  • Loading branch information
macneale4 committed Jan 7, 2025
1 parent e7343c9 commit 2a67256
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 6 deletions.
10 changes: 5 additions & 5 deletions go/libraries/doltcore/remotestorage/chunk_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type ChunkFetcher struct {
egCtx context.Context

toGetCh chan hash.HashSet
resCh chan nbs.CompressedChunk
resCh chan nbs.ToChunker

abortCh chan struct{}
stats StatsRecorder
Expand All @@ -69,7 +69,7 @@ func NewChunkFetcher(ctx context.Context, dcs *DoltChunkStore) *ChunkFetcher {
egCtx: ctx,

toGetCh: make(chan hash.HashSet),
resCh: make(chan nbs.CompressedChunk),
resCh: make(chan nbs.ToChunker),

abortCh: make(chan struct{}),
stats: StatsFactory(),
Expand Down Expand Up @@ -219,7 +219,7 @@ func fetcherHashSetToGetDlLocsReqsThread(ctx context.Context, reqCh chan hash.Ha
// delivered in |reqCh|, and they will be delivered in order.
//
// This function handles backoff and retries for the underlying streaming RPC.
func fetcherRPCDownloadLocsThread(ctx context.Context, reqCh chan *remotesapi.GetDownloadLocsRequest, resCh chan []*remotesapi.DownloadLoc, client remotesapi.ChunkStoreServiceClient, storeRepoToken func(string), missingChunkCh chan nbs.CompressedChunk, host string) error {
func fetcherRPCDownloadLocsThread(ctx context.Context, reqCh chan *remotesapi.GetDownloadLocsRequest, resCh chan []*remotesapi.DownloadLoc, client remotesapi.ChunkStoreServiceClient, storeRepoToken func(string), missingChunkCh chan nbs.ToChunker, host string) error {
stream, err := reliable.MakeCall[*remotesapi.GetDownloadLocsRequest, *remotesapi.GetDownloadLocsResponse](
ctx,
reliable.CallOptions[*remotesapi.GetDownloadLocsRequest, *remotesapi.GetDownloadLocsResponse]{
Expand Down Expand Up @@ -527,7 +527,7 @@ func (cc *ConcurrencyControl) Run(ctx context.Context, done <-chan struct{}, ss
}
}

func fetcherDownloadURLThreads(ctx context.Context, fetchReqCh chan fetchReq, doneCh chan struct{}, chunkCh chan nbs.CompressedChunk, client remotesapi.ChunkStoreServiceClient, stats StatsRecorder, fetcher HTTPFetcher, params NetworkRequestParams) error {
func fetcherDownloadURLThreads(ctx context.Context, fetchReqCh chan fetchReq, doneCh chan struct{}, chunkCh chan nbs.ToChunker, client remotesapi.ChunkStoreServiceClient, stats StatsRecorder, fetcher HTTPFetcher, params NetworkRequestParams) error {
eg, ctx := errgroup.WithContext(ctx)
cc := &ConcurrencyControl{
MaxConcurrency: params.MaximumConcurrentDownloads,
Expand Down Expand Up @@ -559,7 +559,7 @@ func fetcherDownloadURLThreads(ctx context.Context, fetchReqCh chan fetchReq, do
return nil
}

func fetcherDownloadURLThread(ctx context.Context, fetchReqCh chan fetchReq, doneCh <-chan struct{}, chunkCh chan nbs.CompressedChunk, client remotesapi.ChunkStoreServiceClient, stats StatsRecorder, health reliable.HealthRecorder, fetcher HTTPFetcher, params NetworkRequestParams) error {
func fetcherDownloadURLThread(ctx context.Context, fetchReqCh chan fetchReq, doneCh <-chan struct{}, chunkCh chan nbs.ToChunker, client remotesapi.ChunkStoreServiceClient, stats StatsRecorder, health reliable.HealthRecorder, fetcher HTTPFetcher, params NetworkRequestParams) error {
respCh := make(chan fetchResp)
cancelCh := make(chan struct{})
for {
Expand Down
2 changes: 1 addition & 1 deletion go/libraries/doltcore/remotestorage/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ func sortRangesBySize(ranges []*GetRange) {

type resourcePathToUrlFunc func(ctx context.Context, lastError error, resourcePath string) (url string, err error)

func (gr *GetRange) GetDownloadFunc(ctx context.Context, stats StatsRecorder, health reliable.HealthRecorder, fetcher HTTPFetcher, params NetworkRequestParams, chunkChan chan nbs.CompressedChunk, pathToUrl resourcePathToUrlFunc) func() error {
func (gr *GetRange) GetDownloadFunc(ctx context.Context, stats StatsRecorder, health reliable.HealthRecorder, fetcher HTTPFetcher, params NetworkRequestParams, chunkChan chan nbs.ToChunker, pathToUrl resourcePathToUrlFunc) func() error {
if len(gr.Ranges) == 0 {
return func() error { return nil }
}
Expand Down

0 comments on commit 2a67256

Please sign in to comment.