Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace usage of sync/atomic with uber-go/atomic #2449

Merged
merged 5 commits into from
Jul 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ publish: dist

lint:
GO111MODULE=on GOGC=10 golangci-lint run -v $(GOLANGCI_ARG)
faillint -paths "sync/atomic=go.uber.org/atomic" ./...

########
# Test #
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ require (
github.com/ugorji/go v1.1.7 // indirect
github.com/weaveworks/common v0.0.0-20200625145055-4b1847531bc9
go.etcd.io/bbolt v1.3.5-0.20200615073812-232d8fc87f50
go.uber.org/atomic v1.6.0
golang.org/x/net v0.0.0-20200707034311-ab3426394381
google.golang.org/grpc v1.29.1
gopkg.in/alecthomas/kingpin.v2 v2.2.6
Expand Down
9 changes: 9 additions & 0 deletions loki-build-image/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ RUN apk add --no-cache docker-cli
FROM golang:1.14.2 as drone
RUN GO111MODULE=on go get github.com/drone/drone-cli/drone@1fad337d74ca0ecf420993d9d2d7229a1c99f054

# Install faillint used to lint go imports in CI.
# This collisions with the version of go tools used in the base image, thus we install it in its own image and copy it over.
# Error:
# github.com/fatih/[email protected] requires golang.org/x/[email protected]
# (not golang.org/x/[email protected] from golang.org/x/tools/cmd/goyacc@58d531046acdc757f177387bc1725bfa79895d69)
FROM golang:1.14.2 as faillint
RUN GO111MODULE=on go get github.com/fatih/[email protected]

FROM golang:1.14.2-stretch
RUN apt-get update && \
apt-get install -qy \
Expand All @@ -33,6 +41,7 @@ COPY --from=docker /usr/bin/docker /usr/bin/docker
COPY --from=helm /usr/bin/helm /usr/bin/helm
COPY --from=golangci /bin/golangci-lint /usr/local/bin
COPY --from=drone /go/bin/drone /usr/bin/drone
COPY --from=faillint /go/bin/faillint /usr/bin/faillint

# Install some necessary dependencies.
# Forcing GO111MODULE=on is required to specify dependencies at specific versions using the go mod notation.
Expand Down
20 changes: 10 additions & 10 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"flag"
"net/http"
"sync/atomic"
"time"

cortex_distributor "github.com/cortexproject/cortex/pkg/distributor"
Expand All @@ -14,6 +13,7 @@ import (
"github.com/cortexproject/cortex/pkg/util/limiter"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/pkg/errors"
"go.uber.org/atomic"

"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -164,14 +164,14 @@ type streamTracker struct {
stream logproto.Stream
minSuccess int
maxFailures int
succeeded int32
failed int32
succeeded atomic.Int32
failed atomic.Int32
}

// TODO taken from Cortex, see if we can refactor out an usable interface.
type pushTracker struct {
samplesPending int32
samplesFailed int32
samplesPending atomic.Int32
samplesFailed atomic.Int32
done chan struct{}
err chan error
}
Expand Down Expand Up @@ -263,10 +263,10 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}

tracker := pushTracker{
samplesPending: int32(len(streams)),
done: make(chan struct{}),
err: make(chan error),
}
tracker.samplesPending.Store(int32(len(streams)))
for ingester, samples := range samplesByIngester {
go func(ingester ring.IngesterDesc, samples []*streamTracker) {
// Use a background context to make sure all ingesters get samples even if we return early
Expand Down Expand Up @@ -304,17 +304,17 @@ func (d *Distributor) sendSamples(ctx context.Context, ingester ring.IngesterDes
// goroutine will write to either channel.
for i := range streamTrackers {
if err != nil {
if atomic.AddInt32(&streamTrackers[i].failed, 1) <= int32(streamTrackers[i].maxFailures) {
if streamTrackers[i].failed.Inc() <= int32(streamTrackers[i].maxFailures) {
continue
}
if atomic.AddInt32(&pushTracker.samplesFailed, 1) == 1 {
if pushTracker.samplesFailed.Inc() == 1 {
pushTracker.err <- err
}
} else {
if atomic.AddInt32(&streamTrackers[i].succeeded, 1) != int32(streamTrackers[i].minSuccess) {
if streamTrackers[i].succeeded.Inc() != int32(streamTrackers[i].minSuccess) {
continue
}
if atomic.AddInt32(&pushTracker.samplesPending, -1) == 0 {
if pushTracker.samplesPending.Dec() == 0 {
pushTracker.done <- struct{}{}
}
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/ingester/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"

"github.com/prometheus/prometheus/pkg/labels"

Expand All @@ -14,6 +13,7 @@ import (
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/common/model"
"go.uber.org/atomic"
)

const maxMappedFP = 1 << 20 // About 1M fingerprints reserved for mapping.
Expand All @@ -24,7 +24,7 @@ var separatorString = string([]byte{model.SeparatorByte})
// collisions.
type fpMapper struct {
// highestMappedFP has to be aligned for atomic operations.
highestMappedFP model.Fingerprint
highestMappedFP atomic.Uint64

mtx sync.RWMutex // Protects mappings.
// maps original fingerprints to a map of string representations of
Expand Down Expand Up @@ -163,7 +163,7 @@ func (m *fpMapper) maybeAddMapping(fp model.Fingerprint, collidingMetric []clien
}

func (m *fpMapper) nextMappedFP() model.Fingerprint {
mappedFP := model.Fingerprint(atomic.AddUint64((*uint64)(&m.highestMappedFP), 1))
mappedFP := model.Fingerprint(m.highestMappedFP.Inc())
if mappedFP > maxMappedFP {
panic(fmt.Errorf("more than %v fingerprints mapped in collision detection", maxMappedFP))
}
Expand Down
1 change: 1 addition & 0 deletions vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,7 @@ go.opencensus.io/trace/internal
go.opencensus.io/trace/propagation
go.opencensus.io/trace/tracestate
# go.uber.org/atomic v1.6.0
## explicit
go.uber.org/atomic
# go.uber.org/goleak v1.0.0
go.uber.org/goleak
Expand Down