Skip to content

Commit

Permalink
fix: errors reported by the race detector (#13174)
Browse files Browse the repository at this point in the history
Fixes #8586

Signed-off-by: Bryan Boreham <[email protected]>
  • Loading branch information
bboreham authored Jun 10, 2024
1 parent d4fcef5 commit 2b19dac
Show file tree
Hide file tree
Showing 10 changed files with 96 additions and 79 deletions.
47 changes: 25 additions & 22 deletions pkg/analytics/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,28 +95,31 @@ func (rep *Reporter) initLeader(ctx context.Context) *ClusterSeed {
MaxRetries: 0,
})
for backoff.Ongoing() {
// create a new cluster seed
seed := ClusterSeed{
UID: uuid.NewString(),
PrometheusVersion: build.GetVersion(),
CreatedAt: time.Now(),
}
if err := kvClient.CAS(ctx, seedKey, func(in interface{}) (out interface{}, retry bool, err error) {
// The key is already set, so we don't need to do anything
if in != nil {
if kvSeed, ok := in.(*ClusterSeed); ok && kvSeed != nil && kvSeed.UID != seed.UID {
seed = *kvSeed
return nil, false, nil
{
// create a new cluster seed
seed := ClusterSeed{
UID: uuid.NewString(),
PrometheusVersion: build.GetVersion(),
CreatedAt: time.Now(),
}
if err := kvClient.CAS(ctx, seedKey, func(in interface{}) (out interface{}, retry bool, err error) {
// The key is already set, so we don't need to do anything
if in != nil {
if kvSeed, ok := in.(*ClusterSeed); ok && kvSeed != nil && kvSeed.UID != seed.UID {
seed = *kvSeed
return nil, false, nil
}
}
return &seed, true, nil
}); err != nil {
level.Info(rep.logger).Log("msg", "failed to CAS cluster seed key", "err", err)
continue
}
return &seed, true, nil
}); err != nil {
level.Info(rep.logger).Log("msg", "failed to CAS cluster seed key", "err", err)
continue
}
// ensure stability of the cluster seed
stableSeed := ensureStableKey(ctx, kvClient, rep.logger)
seed = *stableSeed
// This is a new local variable so that Go knows it's not racing with the previous usage.
seed := *stableSeed
// Fetch the remote cluster seed.
remoteSeed, err := rep.fetchSeed(ctx,
func(err error) bool {
Expand Down Expand Up @@ -262,7 +265,7 @@ func (rep *Reporter) running(ctx context.Context) error {
}
return nil
}
rep.startCPUPercentCollection(ctx)
rep.startCPUPercentCollection(ctx, time.Minute)
// check every minute if we should report.
ticker := time.NewTicker(reportCheckInterval)
defer ticker.Stop()
Expand Down Expand Up @@ -317,13 +320,13 @@ func (rep *Reporter) reportUsage(ctx context.Context, interval time.Time) error
return errs.Err()
}

const cpuUsageKey = "cpu_usage"

var (
cpuUsageKey = "cpu_usage"
cpuUsage = NewFloat(cpuUsageKey)
cpuCollectionInterval = time.Minute
cpuUsage = NewFloat(cpuUsageKey)
)

func (rep *Reporter) startCPUPercentCollection(ctx context.Context) {
func (rep *Reporter) startCPUPercentCollection(ctx context.Context, cpuCollectionInterval time.Duration) {
proc, err := process.NewProcess(int32(os.Getpid()))
if err != nil {
level.Debug(rep.logger).Log("msg", "failed to get process", "err", err)
Expand Down
3 changes: 1 addition & 2 deletions pkg/analytics/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,13 @@ func TestWrongKV(t *testing.T) {
}

func TestStartCPUCollection(t *testing.T) {
cpuCollectionInterval = 1 * time.Second
r, err := NewReporter(Config{Leader: true, Enabled: true}, kv.Config{
Store: "inmemory",
}, nil, log.NewLogfmtLogger(os.Stdout), prometheus.NewPedanticRegistry())
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
r.startCPUPercentCollection(ctx)
r.startCPUPercentCollection(ctx, 1*time.Second)
require.Eventually(t, func() bool {
return cpuUsage.Value() > 0
}, 5*time.Second, 1*time.Second)
Expand Down
7 changes: 4 additions & 3 deletions pkg/bloombuild/builder/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"google.golang.org/grpc"

"github.com/grafana/loki/v3/pkg/bloombuild/protos"
Expand Down Expand Up @@ -87,7 +88,7 @@ func Test_BuilderLoop(t *testing.T) {
require.NoError(t, err)

require.Eventually(t, func() bool {
return server.completedTasks == len(tasks)
return int(server.completedTasks.Load()) == len(tasks)
}, 5*time.Second, 100*time.Millisecond)

err = services.StopAndAwaitTerminated(context.Background(), builder)
Expand All @@ -98,7 +99,7 @@ func Test_BuilderLoop(t *testing.T) {

type fakePlannerServer struct {
tasks []*protos.ProtoTask
completedTasks int
completedTasks atomic.Int64
shutdownCalled bool

addr string
Expand Down Expand Up @@ -148,7 +149,7 @@ func (f *fakePlannerServer) BuilderLoop(srv protos.PlannerForBuilder_BuilderLoop
if _, err := srv.Recv(); err != nil {
return fmt.Errorf("failed to receive task response: %w", err)
}
f.completedTasks++
f.completedTasks.Add(1)
}

// No more tasks. Wait until shutdown.
Expand Down
10 changes: 5 additions & 5 deletions pkg/bloombuild/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ func (p *Planner) totalPendingTasks() (total int) {
func (p *Planner) enqueueTask(task *QueueTask) error {
p.activeUsers.UpdateUserTimestamp(task.Tenant, time.Now())
return p.tasksQueue.Enqueue(task.Tenant, nil, task, func() {
task.timesEnqueued++
task.timesEnqueued.Add(1)
p.addPendingTask(task)
})
}
Expand Down Expand Up @@ -761,12 +761,12 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
result, err := p.forwardTaskToBuilder(builder, builderID, task)
if err != nil {
maxRetries := p.limits.BloomTaskMaxRetries(task.Tenant)
if maxRetries > 0 && task.timesEnqueued >= maxRetries {
if maxRetries > 0 && int(task.timesEnqueued.Load()) >= maxRetries {
p.metrics.tasksFailed.Inc()
p.removePendingTask(task)
level.Error(logger).Log(
"msg", "task failed after max retries",
"retries", task.timesEnqueued,
"retries", task.timesEnqueued.Load(),
"maxRetries", maxRetries,
"err", err,
)
Expand All @@ -792,7 +792,7 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
p.metrics.tasksRequeued.Inc()
level.Error(logger).Log(
"msg", "error forwarding task to builder, Task requeued",
"retries", task.timesEnqueued,
"retries", task.timesEnqueued.Load(),
"err", err,
)
continue
Expand All @@ -801,7 +801,7 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
level.Debug(logger).Log(
"msg", "task completed",
"duration", time.Since(task.queueTime).Seconds(),
"retries", task.timesEnqueued,
"retries", task.timesEnqueued.Load(),
)
p.removePendingTask(task)

Expand Down
38 changes: 23 additions & 15 deletions pkg/bloombuild/planner/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"google.golang.org/grpc"

"github.com/grafana/loki/v3/pkg/bloombuild/protos"
Expand Down Expand Up @@ -517,7 +518,7 @@ func Test_BuilderLoop(t *testing.T) {
resultsCh := make(chan *protos.TaskResult, nTasks)
tasks := createTasks(nTasks, resultsCh)
for _, task := range tasks {
err = planner.enqueueTask(task)
err := planner.enqueueTask(task)
require.NoError(t, err)
}

Expand All @@ -527,10 +528,10 @@ func Test_BuilderLoop(t *testing.T) {
builder := newMockBuilder(fmt.Sprintf("builder-%d", i))
builders = append(builders, builder)

go func() {
err = planner.BuilderLoop(builder)
require.ErrorIs(t, err, tc.expectedBuilderLoopError)
}()
go func(expectedBuilderLoopError error) {
err := planner.BuilderLoop(builder)
require.ErrorIs(t, err, expectedBuilderLoopError)
}(tc.expectedBuilderLoopError)
}

// Eventually, all tasks should be sent to builders
Expand Down Expand Up @@ -558,7 +559,7 @@ func Test_BuilderLoop(t *testing.T) {

// Enqueue tasks again
for _, task := range tasks {
err = planner.enqueueTask(task)
err := planner.enqueueTask(task)
require.NoError(t, err)
}

Expand Down Expand Up @@ -809,14 +810,15 @@ func Test_processTenantTaskResults(t *testing.T) {
}

type fakeBuilder struct {
mx sync.Mutex // Protects tasks and currTaskIdx.
id string
tasks []*protos.Task
currTaskIdx int
grpc.ServerStream

returnError bool
returnErrorMsg bool
wait bool
returnError atomic.Bool
returnErrorMsg atomic.Bool
wait atomic.Bool
ctx context.Context
ctxCancel context.CancelFunc
}
Expand All @@ -833,19 +835,21 @@ func newMockBuilder(id string) *fakeBuilder {
}

func (f *fakeBuilder) ReceivedTasks() []*protos.Task {
f.mx.Lock()
defer f.mx.Unlock()
return f.tasks
}

func (f *fakeBuilder) SetReturnError(b bool) {
f.returnError = b
f.returnError.Store(b)
}

func (f *fakeBuilder) SetReturnErrorMsg(b bool) {
f.returnErrorMsg = b
f.returnErrorMsg.Store(b)
}

func (f *fakeBuilder) SetWait(b bool) {
f.wait = b
f.wait.Store(b)
}

func (f *fakeBuilder) CancelContext(b bool) {
Expand Down Expand Up @@ -873,6 +877,8 @@ func (f *fakeBuilder) Send(req *protos.PlannerToBuilder) error {
return err
}

f.mx.Lock()
defer f.mx.Unlock()
f.tasks = append(f.tasks, task)
f.currTaskIdx++
return nil
Expand All @@ -886,12 +892,12 @@ func (f *fakeBuilder) Recv() (*protos.BuilderToPlanner, error) {
}, nil
}

if f.returnError {
if f.returnError.Load() {
return nil, fmt.Errorf("fake error from %s", f.id)
}

// Wait until `wait` is false
for f.wait {
for f.wait.Load() {
time.Sleep(time.Second)
}

Expand All @@ -901,10 +907,12 @@ func (f *fakeBuilder) Recv() (*protos.BuilderToPlanner, error) {
}

var errMsg string
if f.returnErrorMsg {
if f.returnErrorMsg.Load() {
errMsg = fmt.Sprintf("fake error from %s", f.id)
}

f.mx.Lock()
defer f.mx.Unlock()
return &protos.BuilderToPlanner{
BuilderID: f.id,
Result: protos.ProtoTaskResult{
Expand Down
4 changes: 3 additions & 1 deletion pkg/bloombuild/planner/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"time"

"go.uber.org/atomic"

"github.com/grafana/loki/v3/pkg/bloombuild/protos"
)

Expand All @@ -13,7 +15,7 @@ type QueueTask struct {
resultsChannel chan *protos.TaskResult

// Tracking
timesEnqueued int
timesEnqueued atomic.Int64
queueTime time.Time
ctx context.Context
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/bloomgateway/bloomgateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
t.Run("request fails when providing invalid block", func(t *testing.T) {
now := mktime("2023-10-03 10:00")

_, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)
mockStore := newMockBloomStore(queriers, metas)
refs, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)
mockStore := newMockBloomStore(refs, queriers, metas)

reg := prometheus.NewRegistry()
gw, err := New(cfg, mockStore, logger, reg)
Expand Down Expand Up @@ -176,7 +176,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
now := mktime("2023-10-03 10:00")

refs, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)
mockStore := newMockBloomStore(queriers, metas)
mockStore := newMockBloomStore(refs, queriers, metas)
mockStore.err = errors.New("request failed")

reg := prometheus.NewRegistry()
Expand Down Expand Up @@ -220,7 +220,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {

// replace store implementation and re-initialize workers and sub-services
refs, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)
mockStore := newMockBloomStore(queriers, metas)
mockStore := newMockBloomStore(refs, queriers, metas)
mockStore.delay = 2000 * time.Millisecond

reg := prometheus.NewRegistry()
Expand Down Expand Up @@ -263,7 +263,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
now := mktime("2023-10-03 10:00")

reg := prometheus.NewRegistry()
gw, err := New(cfg, newMockBloomStore(nil, nil), logger, reg)
gw, err := New(cfg, newMockBloomStore(nil, nil, nil), logger, reg)
require.NoError(t, err)

err = services.StartAndAwaitRunning(context.Background(), gw)
Expand Down Expand Up @@ -309,7 +309,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
now := mktime("2023-10-03 10:00")

reg := prometheus.NewRegistry()
gw, err := New(cfg, newMockBloomStore(nil, nil), logger, reg)
gw, err := New(cfg, newMockBloomStore(nil, nil, nil), logger, reg)
require.NoError(t, err)

err = services.StartAndAwaitRunning(context.Background(), gw)
Expand Down Expand Up @@ -363,7 +363,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
refs, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)

reg := prometheus.NewRegistry()
store := newMockBloomStore(queriers, metas)
store := newMockBloomStore(refs, queriers, metas)

gw, err := New(cfg, store, logger, reg)
require.NoError(t, err)
Expand Down
Loading

0 comments on commit 2b19dac

Please sign in to comment.