Skip to content

Commit

Permalink
Use concurrency.ForEachJob() from new dskit
Browse files Browse the repository at this point in the history
Ref: grafana/dskit#113

Updated all usages of concurrency.ForEach() to use ForEachJob() so no
type assertion is needed anymore.

Signed-off-by: Oleg Zaytsev <[email protected]>
  • Loading branch information
colega committed Jan 12, 2022
1 parent 739c7a4 commit 0c4c32d
Show file tree
Hide file tree
Showing 14 changed files with 84 additions and 123 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ require (
github.com/golang/protobuf v1.5.2
github.com/golang/snappy v0.0.4
github.com/gorilla/mux v1.8.0
github.com/grafana/dskit v0.0.0-20220104154758-acbb88132134
github.com/grafana/dskit v0.0.0-20220112093026-95274ccc858d
github.com/hashicorp/golang-lru v0.5.4
github.com/json-iterator/go v1.1.12
github.com/leanovate/gopter v0.2.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -849,8 +849,8 @@ github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grafana/dskit v0.0.0-20211021180445-3bd016e9d7f1/go.mod h1:uPG2nyK4CtgNDmWv7qyzYcdI+S90kHHRWvHnBtEMBXM=
github.com/grafana/dskit v0.0.0-20220104154758-acbb88132134 h1:WhDvHde5WYR/dHSFeTfTukgbvVMhO7o9zezACAKfwV0=
github.com/grafana/dskit v0.0.0-20220104154758-acbb88132134/go.mod h1:M0/dlftwBvH7+hdNNpjMa/CUXD7gsew67mbkCuDlFXE=
github.com/grafana/dskit v0.0.0-20220112093026-95274ccc858d h1:YwUtZIQFjlH6e2b5dFLfW1h/vTkTXNkZqv9qeU8b5h0=
github.com/grafana/dskit v0.0.0-20220112093026-95274ccc858d/go.mod h1:M0/dlftwBvH7+hdNNpjMa/CUXD7gsew67mbkCuDlFXE=
github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85 h1:xLuzPoOzdfNb/RF/IENCw+oLVdZB4G21VPhkHBgwSHY=
github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85/go.mod h1:crI9WX6p0IhrqB+DqIUHulRW853PaNFf7o4UprV//3I=
github.com/grafana/memberlist v0.2.5-0.20211201083710-c7bc8e9df94b/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
Expand Down
4 changes: 2 additions & 2 deletions pkg/alertmanager/alertstore/bucketclient/bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ func (s *BucketAlertStore) GetAlertConfigs(ctx context.Context, userIDs []string
cfgs = make(map[string]alertspb.AlertConfigDesc, len(userIDs))
)

err := concurrency.ForEach(ctx, concurrency.CreateJobsFromStrings(userIDs), fetchConcurrency, func(ctx context.Context, job interface{}) error {
userID := job.(string)
err := concurrency.ForEachJob(ctx, len(userIDs), fetchConcurrency, func(ctx context.Context, idx int) error {
userID := userIDs[idx]

cfg, err := s.getAlertConfig(ctx, userID)
if s.alertsBucket.IsObjNotFoundErr(err) {
Expand Down
7 changes: 3 additions & 4 deletions pkg/alertmanager/multitenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -1141,9 +1141,8 @@ func (am *MultitenantAlertmanager) ReadFullStateForUser(ctx context.Context, use
)

// Note that the jobs swallow the errors - this is because we want to give each replica a chance to respond.
jobs := concurrency.CreateJobsFromStrings(addrs)
err = concurrency.ForEach(ctx, jobs, len(jobs), func(ctx context.Context, job interface{}) error {
addr := job.(string)
err = concurrency.ForEachJob(ctx, len(addrs), len(addrs), func(ctx context.Context, idx int) error {
addr := addrs[idx]
level.Debug(am.logger).Log("msg", "contacting replica for full state", "user", userID, "addr", addr)

c, err := am.alertmanagerClientsPool.GetClientFor(addr)
Expand Down Expand Up @@ -1180,7 +1179,7 @@ func (am *MultitenantAlertmanager) ReadFullStateForUser(ctx context.Context, use
}

// If all replicas do not know the user, propagate that outcome for the client to decide what to do.
if notFound == len(jobs) {
if notFound == len(addrs) {
return nil, errAllReplicasUserNotFound
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/compactor/blocks_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ const deleteBlocksConcurrency = 16

// Concurrently deletes blocks marked for deletion, and removes blocks from index.
func (c *BlocksCleaner) deleteBlocksMarkedForDeletion(ctx context.Context, idx *bucketindex.Index, userBucket objstore.Bucket, userLogger log.Logger) {
blocksToDelete := make([]interface{}, 0, len(idx.BlockDeletionMarks))
blocksToDelete := make([]ulid.ULID, 0, len(idx.BlockDeletionMarks))

// Collect blocks marked for deletion into buffered channel.
for _, mark := range idx.BlockDeletionMarks {
Expand All @@ -377,8 +377,8 @@ func (c *BlocksCleaner) deleteBlocksMarkedForDeletion(ctx context.Context, idx *
var mu sync.Mutex

// We don't want to return errors from our function, as that would stop ForEach loop early.
_ = concurrency.ForEach(ctx, blocksToDelete, deleteBlocksConcurrency, func(ctx context.Context, job interface{}) error {
blockID := job.(ulid.ULID)
_ = concurrency.ForEachJob(ctx, len(blocksToDelete), deleteBlocksConcurrency, func(ctx context.Context, jobIdx int) error {
blockID := blocksToDelete[jobIdx]

if err := block.Delete(ctx, userLogger, userBucket, blockID); err != nil {
c.blocksFailedTotal.Inc()
Expand All @@ -401,7 +401,7 @@ func (c *BlocksCleaner) deleteBlocksMarkedForDeletion(ctx context.Context, idx *
// and index are updated accordingly.
func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map[ulid.ULID]error, idx *bucketindex.Index, userBucket objstore.InstrumentedBucket, userLogger log.Logger) {
// Collect all blocks with missing meta.json into buffered channel.
blocks := make([]interface{}, 0, len(partials))
blocks := make([]ulid.ULID, 0, len(partials))

for blockID, blockErr := range partials {
// We can safely delete only blocks which are partial because the meta.json is missing.
Expand All @@ -415,8 +415,8 @@ func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map
var mu sync.Mutex

// We don't want to return errors from our function, as that would stop ForEach loop early.
_ = concurrency.ForEach(ctx, blocks, deleteBlocksConcurrency, func(ctx context.Context, job interface{}) error {
blockID := job.(ulid.ULID)
_ = concurrency.ForEachJob(ctx, len(blocks), deleteBlocksConcurrency, func(ctx context.Context, jobIdx int) error {
blockID := blocks[jobIdx]

// We can safely delete only partial blocks with a deletion mark.
err := metadata.ReadMarker(ctx, userLogger, userBucket, blockID.String(), &metadata.DeletionMark{})
Expand Down
34 changes: 13 additions & 21 deletions pkg/compactor/bucket_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,8 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul
// Once we have a plan we need to download the actual data.
downloadBegin := time.Now()

err = concurrency.ForEach(ctx, convertBlocksToForEachJobs(toCompact), c.blockSyncConcurrency, func(ctx context.Context, job interface{}) error {
meta := job.(*metadata.Meta)
err = concurrency.ForEachJob(ctx, len(toCompact), c.blockSyncConcurrency, func(ctx context.Context, idx int) error {
meta := toCompact[idx]

// Must be the same as in blocksToCompactDirs.
bdir := filepath.Join(subDir, meta.ULID.String())
Expand Down Expand Up @@ -442,19 +442,19 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul
uploadBegin := time.Now()
uploadedBlocks := atomic.NewInt64(0)

err = concurrency.ForEach(ctx, convertCompactionResultToForEachJobs(compIDs, job.UseSplitting(), jobLogger), c.blockSyncConcurrency, func(ctx context.Context, j interface{}) error {
shardID := j.(ulidWithShardIndex).shardIndex
compID := j.(ulidWithShardIndex).ulid
blocksToUpload := convertCompactionResultToForEachJobs(compIDs, job.UseSplitting(), jobLogger)
err = concurrency.ForEachJob(ctx, len(blocksToUpload), c.blockSyncConcurrency, func(ctx context.Context, idx int) error {
blockToUpload := blocksToUpload[idx]

uploadedBlocks.Inc()

bdir := filepath.Join(subDir, compID.String())
bdir := filepath.Join(subDir, blockToUpload.ulid.String())
index := filepath.Join(bdir, block.IndexFilename)

// When splitting is enabled, we need to inject the shard ID as external label.
newLabels := job.Labels().Map()
if job.UseSplitting() {
newLabels[mimit_tsdb.CompactorShardIDExternalLabel] = sharding.FormatShardIDLabelValue(uint64(shardID), uint64(job.SplittingShards()))
newLabels[mimit_tsdb.CompactorShardIDExternalLabel] = sharding.FormatShardIDLabelValue(uint64(blockToUpload.shardIndex), uint64(job.SplittingShards()))
}

newMeta, err := metadata.InjectThanos(jobLogger, bdir, metadata.Thanos{
Expand All @@ -478,11 +478,11 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul

begin := time.Now()
if err := block.Upload(ctx, jobLogger, c.bkt, bdir, job.hashFunc); err != nil {
return errors.Wrapf(err, "upload of %s failed", compID)
return errors.Wrapf(err, "upload of %s failed", blockToUpload.ulid)
}

elapsed := time.Since(begin)
level.Info(jobLogger).Log("msg", "uploaded block", "result_block", compID, "duration", elapsed, "duration_ms", elapsed.Milliseconds(), "external_labels", labels.FromMap(newLabels))
level.Info(jobLogger).Log("msg", "uploaded block", "result_block", blockToUpload.ulid, "duration", elapsed, "duration_ms", elapsed.Milliseconds(), "external_labels", labels.FromMap(newLabels))
return nil
})
if err != nil {
Expand All @@ -505,18 +505,10 @@ func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shoul
return true, compIDs, nil
}

func convertBlocksToForEachJobs(input []*metadata.Meta) []interface{} {
result := make([]interface{}, len(input))
for ix := range input {
result[ix] = input[ix]
}
return result
}

// Converts ULIDs from compaction to interface{}, and also filters out empty ULIDs. When handling result of split
// compactions, shard index is index in the slice returned by compaction.
func convertCompactionResultToForEachJobs(compactedBlocks []ulid.ULID, splitJob bool, jobLogger log.Logger) []interface{} {
result := make([]interface{}, 0, len(compactedBlocks))
// convertCompactionResultToForEachJobs filters out empty ULIDs.
// When handling result of split compactions, shard index is index in the slice returned by compaction.
func convertCompactionResultToForEachJobs(compactedBlocks []ulid.ULID, splitJob bool, jobLogger log.Logger) []ulidWithShardIndex {
result := make([]ulidWithShardIndex, 0, len(compactedBlocks))

for ix, id := range compactedBlocks {
// Skip if it's an empty block.
Expand Down
11 changes: 1 addition & 10 deletions pkg/querier/queryrange/sharded_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,7 @@ func (q *shardedQuerier) handleEmbeddedQueries(queries []string, hints *storage.
streams := make([][]SampleStream, len(queries))

// Concurrently run each query. It breaks and cancels each worker context on first error.
err := concurrency.ForEach(q.ctx, createJobIndexes(len(queries)), len(queries), func(ctx context.Context, job interface{}) error {
idx := job.(int)
err := concurrency.ForEachJob(q.ctx, len(queries), len(queries), func(ctx context.Context, idx int) error {
resp, err := q.handler.Do(ctx, q.req.WithQuery(queries[idx]))
if err != nil {
return err
Expand Down Expand Up @@ -149,14 +148,6 @@ func (q *shardedQuerier) Close() error {
return nil
}

func createJobIndexes(l int) []interface{} {
jobs := make([]interface{}, l)
for j := 0; j < l; j++ {
jobs[j] = j
}
return jobs
}

type responseHeadersTracker struct {
headersMx sync.Mutex
headers map[string][]string
Expand Down
17 changes: 7 additions & 10 deletions pkg/querier/queryrange/split_and_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ func TestSplitAndCacheMiddleware_ResultsCacheFuzzy(t *testing.T) {
}

// Generate some random requests.
reqs := make([]interface{}, 0, numQueries)
reqs := make([]Request, 0, numQueries)
for q := 0; q < numQueries; q++ {
// Generate a random time range within min/max time.
startTime := minTime.Add(time.Duration(rand.Int63n(maxTime.Sub(minTime).Milliseconds())) * time.Millisecond)
Expand All @@ -585,15 +585,14 @@ func TestSplitAndCacheMiddleware_ResultsCacheFuzzy(t *testing.T) {
// Run the query without the split and cache middleware and store it as expected result.
expectedResMx := sync.Mutex{}
expectedRes := make(map[int64]Response, len(reqs))
require.NoError(t, concurrency.ForEach(ctx, reqs, len(reqs), func(ctx context.Context, job interface{}) error {
req := job.(Request)
res, err := downstream.Do(ctx, req)
require.NoError(t, concurrency.ForEachJob(ctx, len(reqs), len(reqs), func(ctx context.Context, idx int) error {
res, err := downstream.Do(ctx, reqs[idx])
if err != nil {
return err
}

expectedResMx.Lock()
expectedRes[req.GetId()] = res
expectedRes[reqs[idx].GetId()] = res
expectedResMx.Unlock()

return nil
Expand Down Expand Up @@ -628,12 +627,10 @@ func TestSplitAndCacheMiddleware_ResultsCacheFuzzy(t *testing.T) {
).Wrap(downstream)

// Run requests honoring concurrency.
require.NoError(t, concurrency.ForEach(ctx, reqs, maxConcurrency, func(ctx context.Context, job interface{}) error {
req := job.(Request)

actual, err := mw.Do(ctx, req)
require.NoError(t, concurrency.ForEachJob(ctx, len(reqs), maxConcurrency, func(ctx context.Context, idx int) error {
actual, err := mw.Do(ctx, reqs[idx])
require.NoError(t, err)
require.Equal(t, expectedRes[req.GetId()], actual)
require.Equal(t, expectedRes[reqs[idx].GetId()], actual)

return nil
}))
Expand Down
33 changes: 9 additions & 24 deletions pkg/querier/tenantfederation/merge_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package tenantfederation

import (
"context"
"fmt"
"sort"
"strings"

Expand Down Expand Up @@ -228,8 +227,7 @@ type stringSliceFuncJob struct {
// It doesn't require the output of the stringSliceFunc to be sorted, as results
// of LabelValues are not sorted.
func (m *mergeQuerier) mergeDistinctStringSliceWithTenants(f stringSliceFunc, tenants map[string]struct{}) ([]string, storage.Warnings, error) {
var jobs []interface{}

jobs := make([]*stringSliceFuncJob, 0, len(m.ids))
for pos, id := range m.ids {
if tenants != nil {
if _, matched := tenants[id]; !matched {
Expand All @@ -243,13 +241,8 @@ func (m *mergeQuerier) mergeDistinctStringSliceWithTenants(f stringSliceFunc, te
})
}

run := func(ctx context.Context, jobIntf interface{}) error {
job, ok := jobIntf.(*stringSliceFuncJob)
if !ok {
return fmt.Errorf("unexpected type %T", jobIntf)
}

var err error
run := func(ctx context.Context, idx int) (err error) {
job := jobs[idx]
job.result, job.warnings, err = f(ctx, job.querier)
if err != nil {
return errors.Wrapf(err, "error querying %s %s", rewriteLabelName(m.idLabelName), job.id)
Expand All @@ -258,20 +251,15 @@ func (m *mergeQuerier) mergeDistinctStringSliceWithTenants(f stringSliceFunc, te
return nil
}

err := concurrency.ForEach(m.ctx, jobs, maxConcurrency, run)
err := concurrency.ForEachJob(m.ctx, len(jobs), maxConcurrency, run)
if err != nil {
return nil, nil, err
}

// aggregate warnings and deduplicate string results
var warnings storage.Warnings
resultMap := make(map[string]struct{})
for _, jobIntf := range jobs {
job, ok := jobIntf.(*stringSliceFuncJob)
if !ok {
return nil, nil, fmt.Errorf("unexpected type %T", jobIntf)
}

for _, job := range jobs {
for _, e := range job.result {
resultMap[e] = struct{}{}
}
Expand Down Expand Up @@ -312,7 +300,7 @@ func (m *mergeQuerier) Select(sortSeries bool, hints *storage.SelectHints, match
log, ctx := spanlogger.NewWithLogger(m.ctx, m.logger, "mergeQuerier.Select")
defer log.Span.Finish()
matchedValues, filteredMatchers := filterValuesByMatchers(m.idLabelName, m.ids, matchers...)
var jobs = make([]interface{}, len(matchedValues))
var jobs = make([]*selectJob, len(matchedValues))
var seriesSets = make([]storage.SeriesSet, len(matchedValues))
var jobPos int
for labelPos := range m.ids {
Expand All @@ -327,11 +315,8 @@ func (m *mergeQuerier) Select(sortSeries bool, hints *storage.SelectHints, match
jobPos++
}

run := func(ctx context.Context, jobIntf interface{}) error {
job, ok := jobIntf.(*selectJob)
if !ok {
return fmt.Errorf("unexpected type %T", jobIntf)
}
run := func(ctx context.Context, idx int) error {
job := jobs[idx]
seriesSets[job.pos] = &addLabelsSeriesSet{
upstream: job.querier.Select(sortSeries, hints, filteredMatchers...),
labels: labels.Labels{
Expand All @@ -344,7 +329,7 @@ func (m *mergeQuerier) Select(sortSeries bool, hints *storage.SelectHints, match
return nil
}

err := concurrency.ForEach(ctx, jobs, maxConcurrency, run)
err := concurrency.ForEachJob(ctx, len(jobs), maxConcurrency, run)
if err != nil {
return storage.ErrSeriesSet(err)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/ruler/ruler.go
Original file line number Diff line number Diff line change
Expand Up @@ -772,9 +772,9 @@ func (r *Ruler) getShardedRules(ctx context.Context, userID string) ([]*GroupSta

// Concurrently fetch rules from all rulers. Since rules are not replicated,
// we need all requests to succeed.
jobs := concurrency.CreateJobsFromStrings(rulers.GetAddresses())
err = concurrency.ForEach(ctx, jobs, len(jobs), func(ctx context.Context, job interface{}) error {
addr := job.(string)
addrs := rulers.GetAddresses()
err = concurrency.ForEachJob(ctx, len(addrs), len(addrs), func(ctx context.Context, idx int) error {
addr := addrs[idx]

rulerClient, err := r.clientsPool.GetClientFor(addr)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions tools/listblocks/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ func fetchDeletionTimes(ctx context.Context, bkt objstore.Bucket, deletionMarker
mu := sync.Mutex{}
times := map[ulid.ULID]time.Time{}

return times, concurrency.ForEach(ctx, concurrency.CreateJobsFromStrings(deletionMarkers), concurrencyLimit, func(ctx context.Context, job interface{}) error {
r, err := bkt.Get(ctx, job.(string))
return times, concurrency.ForEachJob(ctx, len(deletionMarkers), concurrencyLimit, func(ctx context.Context, idx int) error {
r, err := bkt.Get(ctx, deletionMarkers[idx])
if err != nil {
if bkt.IsObjNotFoundErr(err) {
return nil
Expand Down Expand Up @@ -179,8 +179,8 @@ func fetchMetas(ctx context.Context, bkt objstore.Bucket, metaFiles []string) (m
mu := sync.Mutex{}
metas := map[ulid.ULID]*metadata.Meta{}

return metas, concurrency.ForEach(ctx, concurrency.CreateJobsFromStrings(metaFiles), concurrencyLimit, func(ctx context.Context, job interface{}) error {
r, err := bkt.Get(ctx, job.(string))
return metas, concurrency.ForEachJob(ctx, len(metaFiles), concurrencyLimit, func(ctx context.Context, idx int) error {
r, err := bkt.Get(ctx, metaFiles[idx])
if err != nil {
if bkt.IsObjNotFoundErr(err) {
return nil
Expand Down
Loading

0 comments on commit 0c4c32d

Please sign in to comment.