diff --git a/filebeat/channel/outlet.go b/filebeat/channel/outlet.go index fd5c9b12fc1..25747046396 100644 --- a/filebeat/channel/outlet.go +++ b/filebeat/channel/outlet.go @@ -18,8 +18,9 @@ package channel import ( + "sync/atomic" + "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common/atomic" ) type outlet struct { @@ -31,15 +32,14 @@ type outlet struct { func newOutlet(client beat.Client) *outlet { o := &outlet{ client: client, - isOpen: atomic.MakeBool(true), done: make(chan struct{}), } + o.isOpen.Store(true) return o } func (o *outlet) Close() error { - isOpen := o.isOpen.Swap(false) - if isOpen { + if o.isOpen.Swap(false) { close(o.done) return o.client.Close() } diff --git a/filebeat/input/filestream/internal/input-logfile/harvester_test.go b/filebeat/input/filestream/internal/input-logfile/harvester_test.go index d8800c85996..6f5938e308a 100644 --- a/filebeat/input/filestream/internal/input-logfile/harvester_test.go +++ b/filebeat/input/filestream/internal/input-logfile/harvester_test.go @@ -23,6 +23,7 @@ import ( "fmt" "strings" "sync" + "sync/atomic" "testing" "time" @@ -32,7 +33,6 @@ import ( "github.com/elastic/beats/v7/filebeat/input/filestream/internal/task" input "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/tests/resources" "github.com/elastic/elastic-agent-libs/logp" ) @@ -128,7 +128,7 @@ func TestDefaultHarvesterGroup(t *testing.T) { t.Run("assert a harvester is only started if harvester limit haven't been reached", func(t *testing.T) { var wg sync.WaitGroup - var harvesterRunningCount atomic.Int + var harvesterRunningCount atomic.Int64 var harvester1Finished, harvester2Finished atomic.Bool done1, done2 := make(chan struct{}), make(chan struct{}) diff --git a/filebeat/input/filestream/internal/input-logfile/store.go b/filebeat/input/filestream/internal/input-logfile/store.go index 024ca5c9bfd..14c7869d087 100644 --- a/filebeat/input/filestream/internal/input-logfile/store.go +++ b/filebeat/input/filestream/internal/input-logfile/store.go @@ -21,9 +21,9 @@ import ( "fmt" "strings" "sync" + "sync/atomic" "time" - "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/common/cleanup" "github.com/elastic/beats/v7/libbeat/common/transform/typeconv" "github.com/elastic/beats/v7/libbeat/statestore" @@ -461,14 +461,14 @@ func (r *resource) isDeleted() bool { // Retain is used to indicate that 'resource' gets an additional 'owner'. // Owners of an resource can be active inputs or pending update operations // not yet written to disk. -func (r *resource) Retain() { r.pending.Inc() } +func (r *resource) Retain() { r.pending.Add(1) } // Release reduced the owner ship counter of the resource. -func (r *resource) Release() { r.pending.Dec() } +func (r *resource) Release() { r.pending.Add(^uint64(0)) } // UpdatesReleaseN is used to release ownership of N pending update operations. func (r *resource) UpdatesReleaseN(n uint) { - r.pending.Sub(uint64(n)) + r.pending.Add(^uint64(n - 1)) } // Finished returns true if the resource is not in use and if there are no pending updates diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index 6cc74514bef..3f330587f82 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -25,9 +25,9 @@ import ( "io" "strings" "sync" + "sync/atomic" "time" - "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/elastic-agent-libs/mapstr" input "github.com/elastic/beats/v7/filebeat/input/v2" @@ -391,9 +391,10 @@ func (l *listFromFieldReader) Next() (reader.Message, error) { timestamp, kafkaFields := composeEventMetadata(l.claim, l.groupHandler, msg) messages := l.parseMultipleMessages(msg.Value) - neededAcks := atomic.MakeInt(len(messages)) + neededAcks := atomic.Int64{} + neededAcks.Add(int64(len(messages))) ackHandler := func() { - if neededAcks.Dec() == 0 { + if neededAcks.Add(-1) == 0 { l.groupHandler.ack(msg) } } diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index ad93632b372..e2090f6ebb8 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -25,6 +25,7 @@ import ( "sort" "strings" "sync" + "sync/atomic" "time" "github.com/gofrs/uuid/v5" @@ -35,7 +36,6 @@ import ( "github.com/elastic/beats/v7/filebeat/input/file" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/common/cfgwarn" "github.com/elastic/beats/v7/libbeat/management/status" conf "github.com/elastic/elastic-agent-libs/config" @@ -737,8 +737,8 @@ func (p *Input) createHarvester(logger *logp.Logger, state file.State, onTermina // startHarvester starts a new harvester with the given offset // In case the HarvesterLimit is reached, an error is returned func (p *Input) startHarvester(logger *logp.Logger, state file.State, offset int64) error { - if p.numHarvesters.Inc() > p.config.HarvesterLimit && p.config.HarvesterLimit > 0 { - p.numHarvesters.Dec() + if p.numHarvesters.Add(1) > p.config.HarvesterLimit && p.config.HarvesterLimit > 0 { + p.numHarvesters.Add(^uint32(0)) harvesterSkipped.Add(1) return errHarvesterLimit } @@ -747,15 +747,15 @@ func (p *Input) startHarvester(logger *logp.Logger, state file.State, offset int state.Offset = offset // Create harvester with state - h, err := p.createHarvester(logger, state, func() { p.numHarvesters.Dec() }) + h, err := p.createHarvester(logger, state, func() { p.numHarvesters.Add(^uint32(0)) }) if err != nil { - p.numHarvesters.Dec() + p.numHarvesters.Add(^uint32(0)) return err } err = h.Setup() if err != nil { - p.numHarvesters.Dec() + p.numHarvesters.Add(^uint32(0)) return fmt.Errorf("error setting up harvester: %w", err) } @@ -765,7 +765,7 @@ func (p *Input) startHarvester(logger *logp.Logger, state file.State, offset int h.SendStateUpdate() if err = p.harvesters.Start(h); err != nil { - p.numHarvesters.Dec() + p.numHarvesters.Add(^uint32(0)) } return err } diff --git a/filebeat/input/v2/input-cursor/store.go b/filebeat/input/v2/input-cursor/store.go index cc755f046ca..a53bc77a79f 100644 --- a/filebeat/input/v2/input-cursor/store.go +++ b/filebeat/input/v2/input-cursor/store.go @@ -20,9 +20,9 @@ package cursor import ( "strings" "sync" + "sync/atomic" "time" - "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/common/cleanup" "github.com/elastic/beats/v7/libbeat/common/transform/typeconv" "github.com/elastic/beats/v7/libbeat/statestore" @@ -235,14 +235,14 @@ func (r *resource) IsNew() bool { // Retain is used to indicate that 'resource' gets an additional 'owner'. // Owners of an resource can be active inputs or pending update operations // not yet written to disk. -func (r *resource) Retain() { r.pending.Inc() } +func (r *resource) Retain() { r.pending.Add(1) } // Release reduced the owner ship counter of the resource. -func (r *resource) Release() { r.pending.Dec() } +func (r *resource) Release() { r.pending.Add(^uint64(0)) } // UpdatesReleaseN is used to release ownership of N pending update operations. func (r *resource) UpdatesReleaseN(n uint) { - r.pending.Sub(uint64(n)) + r.pending.Add(^uint64(n - 1)) } // Finished returns true if the resource is not in use and if there are no pending updates @@ -290,7 +290,7 @@ func readStates(log *logp.Logger, store *statestore.Store, prefix string) (*stat } err := store.Each(func(key string, dec statestore.ValueDecoder) (bool, error) { - if !strings.HasPrefix(string(key), keyPrefix) { + if !strings.HasPrefix(key, keyPrefix) { return true, nil } diff --git a/filebeat/input/v2/input-stateless/stateless_test.go b/filebeat/input/v2/input-stateless/stateless_test.go index 2febcb7e1b6..731577e76c3 100644 --- a/filebeat/input/v2/input-stateless/stateless_test.go +++ b/filebeat/input/v2/input-stateless/stateless_test.go @@ -22,6 +22,7 @@ import ( "errors" "runtime" "sync" + "sync/atomic" "testing" "github.com/stretchr/testify/require" @@ -29,7 +30,6 @@ import ( v2 "github.com/elastic/beats/v7/filebeat/input/v2" stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common/atomic" pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/mapstr" @@ -111,12 +111,12 @@ func TestStateless_Run(t *testing.T) { defer cancel() // connector creates a client the blocks forever until the shutdown signal is received - var publishCalls atomic.Int + var publishCalls atomic.Int64 connector := pubtest.FakeConnector{ ConnectFunc: func(config beat.ClientConfig) (beat.Client, error) { return &pubtest.FakeClient{ PublishFunc: func(event beat.Event) { - publishCalls.Inc() + publishCalls.Add(1) // Unlock Publish once the input has been cancelled <-ctx.Done() }, @@ -141,24 +141,24 @@ func TestStateless_Run(t *testing.T) { // validate require.Equal(t, context.Canceled, err) - require.Equal(t, 1, publishCalls.Load()) + require.Equal(t, int64(1), publishCalls.Load()) }) t.Run("do not start input of pipeline connection fails", func(t *testing.T) { errOpps := errors.New("oops") connector := pubtest.FailingConnector(errOpps) - var run atomic.Int + var run atomic.Int64 input := createConfiguredInput(t, constInputManager(&fakeStatelessInput{ OnRun: func(_ v2.Context, publisher stateless.Publisher) error { - run.Inc() + run.Add(1) return nil }, }), nil) err := input.Run(v2.Context{}, connector) require.True(t, errors.Is(err, errOpps)) - require.Equal(t, 0, run.Load()) + require.Equal(t, int64(0), run.Load()) }) } diff --git a/filebeat/inputsource/common/streaming/listener.go b/filebeat/inputsource/common/streaming/listener.go index ce0f69d030e..5816c1fa81c 100644 --- a/filebeat/inputsource/common/streaming/listener.go +++ b/filebeat/inputsource/common/streaming/listener.go @@ -26,9 +26,9 @@ import ( "net" "strings" "sync" + "sync/atomic" "github.com/elastic/beats/v7/filebeat/inputsource" - "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/go-concert/ctxtool" ) @@ -44,7 +44,7 @@ type Listener struct { wg sync.WaitGroup log *logp.Logger ctx ctxtool.CancelContext - clientsCount atomic.Int + clientsCount atomic.Int64 handlerFactory HandlerFactory listenerFactory ListenerFactory } @@ -190,10 +190,10 @@ func (l *Listener) handleConnection(conn net.Conn) { defer cancel() // Track number of clients. - l.clientsCount.Inc() + l.clientsCount.Add(1) log.Debugw("New client connection", "active_clients", l.clientsCount.Load()) defer func() { - l.clientsCount.Dec() + l.clientsCount.Add(-1) log.Debugw("Client disconnected", "active_clients", l.clientsCount.Load()) }() diff --git a/heartbeat/monitors/factory_test.go b/heartbeat/monitors/factory_test.go index e32ac671c8e..72db7cdf731 100644 --- a/heartbeat/monitors/factory_test.go +++ b/heartbeat/monitors/factory_test.go @@ -236,8 +236,8 @@ func TestDisabledMonitor(t *testing.T) { require.NoError(t, err) require.IsType(t, NoopRunner{}, runner) - require.Equal(t, 0, built.Load()) - require.Equal(t, 0, closed.Load()) + require.Equal(t, int64(0), built.Load()) + require.Equal(t, int64(0), closed.Load()) } } @@ -353,7 +353,7 @@ func TestDuplicateMonitorIDs(t *testing.T) { // Two are counted as built. The bad config is missing a stdfield so it // doesn't complete construction - require.Equal(t, 2, built.Load()) + require.Equal(t, int64(2), built.Load()) // Only 2 closes, because the bad config isn't closed - require.Equal(t, 2, closed.Load()) + require.Equal(t, int64(2), closed.Load()) } diff --git a/heartbeat/monitors/mocks.go b/heartbeat/monitors/mocks.go index 77dee19858a..2c4e5e9d0b5 100644 --- a/heartbeat/monitors/mocks.go +++ b/heartbeat/monitors/mocks.go @@ -21,6 +21,7 @@ import ( "fmt" "regexp" "sync" + "sync/atomic" "testing" "time" @@ -42,7 +43,6 @@ import ( "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/monitorstate" "github.com/elastic/beats/v7/heartbeat/scheduler" "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common/atomic" beatversion "github.com/elastic/beats/v7/libbeat/version" ) @@ -212,17 +212,17 @@ func createMockJob() []jobs.Job { return []jobs.Job{j} } -func mockPluginBuilder() (plugin.PluginFactory, *atomic.Int, *atomic.Int) { +func mockPluginBuilder() (plugin.PluginFactory, *atomic.Int64, *atomic.Int64) { reg := monitoring.NewRegistry() - built := atomic.NewInt(0) - closed := atomic.NewInt(0) + built := &atomic.Int64{} + closed := &atomic.Int64{} return plugin.PluginFactory{ Name: "test", Aliases: []string{"testAlias"}, Make: func(s string, config *config.C) (plugin.Plugin, error) { - built.Inc() + built.Add(1) // Declare a real config block with a required attr so we can see what happens when it doesn't work unpacked := struct { URLs []string `config:"urls" validate:"required"` @@ -230,7 +230,7 @@ func mockPluginBuilder() (plugin.PluginFactory, *atomic.Int, *atomic.Int) { // track all closes, even on error closer := func() error { - closed.Inc() + closed.Add(1) return nil } @@ -248,7 +248,7 @@ func mockPluginBuilder() (plugin.PluginFactory, *atomic.Int, *atomic.Int) { closed } -func mockPluginsReg() (p *plugin.PluginsReg, built *atomic.Int, closed *atomic.Int) { +func mockPluginsReg() (p *plugin.PluginsReg, built *atomic.Int64, closed *atomic.Int64) { reg := plugin.NewPluginsReg() builder, built, closed := mockPluginBuilder() _ = reg.Add(builder) diff --git a/heartbeat/monitors/monitor_test.go b/heartbeat/monitors/monitor_test.go index 0890a1697be..7bb3b5d3fae 100644 --- a/heartbeat/monitors/monitor_test.go +++ b/heartbeat/monitors/monitor_test.go @@ -106,10 +106,10 @@ func testMonitorConfig(t *testing.T, conf *conf.C, eventValidator validator.Vali t.Fatalf("No publishes detected!") } - assert.Equal(t, 1, built.Load()) + assert.Equal(t, int64(1), built.Load()) mon.Stop() - assert.Equal(t, 1, closed.Load()) + assert.Equal(t, int64(1), closed.Load()) assert.Equal(t, true, pcClient.closed) } @@ -129,8 +129,8 @@ func TestCheckInvalidConfig(t *testing.T) { require.Nil(t, m, "For this test to work we need a nil value for the monitor.") // These counters are both zero since this fails at config parse time - require.Equal(t, 0, built.Load()) - require.Equal(t, 0, closed.Load()) + require.Equal(t, int64(0), built.Load()) + require.Equal(t, int64(0), closed.Load()) require.Error(t, checkMonitorConfig(serverMonConf, reg)) } diff --git a/heartbeat/scheduler/schedjob.go b/heartbeat/scheduler/schedjob.go index 2a8172b9564..50f94a895d0 100644 --- a/heartbeat/scheduler/schedjob.go +++ b/heartbeat/scheduler/schedjob.go @@ -20,11 +20,11 @@ package scheduler import ( "context" "sync" + "sync/atomic" "time" "golang.org/x/sync/semaphore" - "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/elastic-agent-libs/logp" ) @@ -35,7 +35,7 @@ type schedJob struct { wg *sync.WaitGroup entrypoint TaskFunc jobLimitSem *semaphore.Weighted - activeTasks atomic.Int + activeTasks atomic.Int64 } // runRecursiveJob runs the entry point for a job, blocking until all subtasks are completed. @@ -48,7 +48,6 @@ func newSchedJob(ctx context.Context, s *Scheduler, id string, jobType string, t scheduler: s, jobLimitSem: s.jobLimitSem[jobType], entrypoint: task, - activeTasks: atomic.MakeInt(0), wg: &sync.WaitGroup{}, } } @@ -59,7 +58,7 @@ func newSchedJob(ctx context.Context, s *Scheduler, id string, jobType string, t // The wait group passed into this function expects to already have its count incremented by one. func (sj *schedJob) run() (startedAt time.Time) { sj.wg.Add(1) - sj.activeTasks.Inc() + sj.activeTasks.Add(1) if sj.jobLimitSem != nil { err := sj.jobLimitSem.Acquire(sj.ctx, 1) // Defer release only if acquired @@ -82,7 +81,7 @@ func (sj *schedJob) run() (startedAt time.Time) { // The wait group passed into this function expects to already have its count incremented by one. func (sj *schedJob) runTask(task TaskFunc) time.Time { defer sj.wg.Done() - defer sj.activeTasks.Dec() + defer sj.activeTasks.Add(-1) // The accounting for waiting/active tasks is done using atomics. // Absolute accuracy is not critical here so the gap between modifying waitingTasks and activeJobs is acceptable. @@ -113,7 +112,7 @@ func (sj *schedJob) runTask(task TaskFunc) time.Time { sj.scheduler.stats.activeTasks.Dec() sj.wg.Add(len(continuations)) - sj.activeTasks.Add(len(continuations)) + sj.activeTasks.Add(int64(len(continuations))) for _, cont := range continuations { // Run continuations in parallel, note that these each will acquire their own slots // We can discard the started at times for continuations as those are diff --git a/heartbeat/scheduler/schedjob_test.go b/heartbeat/scheduler/schedjob_test.go index 24e6178e162..f989014f467 100644 --- a/heartbeat/scheduler/schedjob_test.go +++ b/heartbeat/scheduler/schedjob_test.go @@ -20,13 +20,13 @@ package scheduler import ( "context" "sync" + "sync/atomic" "testing" "time" "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/heartbeat/config" - batomic "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/elastic-agent-libs/monitoring" ) @@ -72,7 +72,7 @@ func TestSchedJobRun(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) - executed := batomic.MakeBool(false) + executed := &atomic.Bool{} tf := func(ctx context.Context) []TaskFunc { executed.Store(true) @@ -102,14 +102,14 @@ func TestRecursiveForkingJob(t *testing.T) { s := Create(1000, monitoring.NewRegistry(), tarawaTime(), map[string]*config.JobLimit{ "atype": {Limit: 1}, }, false) - ran := batomic.NewInt(0) + var ran atomic.Int64 var terminalTf TaskFunc = func(ctx context.Context) []TaskFunc { - ran.Inc() + ran.Add(1) return nil } var forkingTf TaskFunc = func(ctx context.Context) []TaskFunc { - ran.Inc() + ran.Add(1) return []TaskFunc{ terminalTf, terminalTf, terminalTf, } @@ -118,6 +118,6 @@ func TestRecursiveForkingJob(t *testing.T) { sj := newSchedJob(context.Background(), s, "myid", "atype", forkingTf) sj.run() - require.Equal(t, 4, ran.Load()) + require.Equal(t, int64(4), ran.Load()) } diff --git a/libbeat/common/acker/acker.go b/libbeat/common/acker/acker.go index c75aec13564..47c019a576f 100644 --- a/libbeat/common/acker/acker.go +++ b/libbeat/common/acker/acker.go @@ -19,9 +19,9 @@ package acker import ( "sync" + "sync/atomic" "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common/atomic" ) // Nil creates an ACKer that does nothing. @@ -98,7 +98,7 @@ type gapInfo struct { } func (a *trackingACKer) AddEvent(_ beat.Event, published bool) { - a.events.Inc() + a.events.Add(1) if published { a.addPublishedEvent() } else { @@ -148,7 +148,7 @@ func (a *trackingACKer) addDropEvent() { a.lst.Unlock() current.Unlock() - a.events.Dec() + a.events.Add(^uint32(0)) return } @@ -202,7 +202,7 @@ func (a *trackingACKer) ACKEvents(n int) { current.Unlock() } - a.events.Sub(uint32(total)) + a.events.Add(^uint32(total - 1)) a.fn(acked, total) } diff --git a/libbeat/common/atomic/atomic.go b/libbeat/common/atomic/atomic.go deleted file mode 100644 index 09e83614184..00000000000 --- a/libbeat/common/atomic/atomic.go +++ /dev/null @@ -1,94 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -// Package atomic provides common primitive types with atomic accessors. -package atomic - -import a "sync/atomic" - -// Bool provides an atomic boolean type. -type Bool struct{ u Uint32 } - -// Int32 provides an atomic int32 type. -type Int32 struct{ value int32 } - -// Int64 provides an atomic int64 type. -type Int64 struct{ value int64 } - -// Uint32 provides an atomic uint32 type. -type Uint32 struct{ value uint32 } - -// Uint64 provides an atomic uint64 type. -type Uint64 struct{ value uint64 } - -func MakeBool(v bool) Bool { return Bool{MakeUint32(encBool(v))} } -func NewBool(v bool) *Bool { return &Bool{MakeUint32(encBool(v))} } -func (b *Bool) Load() bool { return b.u.Load() == 1 } -func (b *Bool) Store(v bool) { b.u.Store(encBool(v)) } -func (b *Bool) Swap(new bool) bool { return b.u.Swap(encBool(new)) == 1 } -func (b *Bool) CAS(old, new bool) bool { return b.u.CAS(encBool(old), encBool(new)) } - -func MakeInt32(v int32) Int32 { return Int32{v} } -func NewInt32(v int32) *Int32 { return &Int32{v} } -func (i *Int32) Load() int32 { return a.LoadInt32(&i.value) } -func (i *Int32) Store(v int32) { a.StoreInt32(&i.value, v) } -func (i *Int32) Swap(new int32) int32 { return a.SwapInt32(&i.value, new) } -func (i *Int32) Add(delta int32) int32 { return a.AddInt32(&i.value, delta) } -func (i *Int32) Sub(delta int32) int32 { return a.AddInt32(&i.value, -delta) } -func (i *Int32) Inc() int32 { return i.Add(1) } -func (i *Int32) Dec() int32 { return i.Add(-1) } -func (i *Int32) CAS(old, new int32) bool { return a.CompareAndSwapInt32(&i.value, old, new) } - -func MakeInt64(v int64) Int64 { return Int64{v} } -func NewInt64(v int64) *Int64 { return &Int64{v} } -func (i *Int64) Load() int64 { return a.LoadInt64(&i.value) } -func (i *Int64) Store(v int64) { a.StoreInt64(&i.value, v) } -func (i *Int64) Swap(new int64) int64 { return a.SwapInt64(&i.value, new) } -func (i *Int64) Add(delta int64) int64 { return a.AddInt64(&i.value, delta) } -func (i *Int64) Sub(delta int64) int64 { return a.AddInt64(&i.value, -delta) } -func (i *Int64) Inc() int64 { return i.Add(1) } -func (i *Int64) Dec() int64 { return i.Add(-1) } -func (i *Int64) CAS(old, new int64) bool { return a.CompareAndSwapInt64(&i.value, old, new) } - -func MakeUint32(v uint32) Uint32 { return Uint32{v} } -func NewUint32(v uint32) *Uint32 { return &Uint32{v} } -func (u *Uint32) Load() uint32 { return a.LoadUint32(&u.value) } -func (u *Uint32) Store(v uint32) { a.StoreUint32(&u.value, v) } -func (u *Uint32) Swap(new uint32) uint32 { return a.SwapUint32(&u.value, new) } -func (u *Uint32) Add(delta uint32) uint32 { return a.AddUint32(&u.value, delta) } -func (u *Uint32) Sub(delta uint32) uint32 { return a.AddUint32(&u.value, ^uint32(delta-1)) } -func (u *Uint32) Inc() uint32 { return u.Add(1) } -func (u *Uint32) Dec() uint32 { return u.Add(^uint32(0)) } -func (u *Uint32) CAS(old, new uint32) bool { return a.CompareAndSwapUint32(&u.value, old, new) } - -func MakeUint64(v uint64) Uint64 { return Uint64{v} } -func NewUint64(v uint64) *Uint64 { return &Uint64{v} } -func (u *Uint64) Load() uint64 { return a.LoadUint64(&u.value) } -func (u *Uint64) Store(v uint64) { a.StoreUint64(&u.value, v) } -func (u *Uint64) Swap(new uint64) uint64 { return a.SwapUint64(&u.value, new) } -func (u *Uint64) Add(delta uint64) uint64 { return a.AddUint64(&u.value, delta) } -func (u *Uint64) Sub(delta uint64) uint64 { return a.AddUint64(&u.value, ^uint64(delta-1)) } -func (u *Uint64) Inc() uint64 { return u.Add(1) } -func (u *Uint64) Dec() uint64 { return u.Add(^uint64(0)) } -func (u *Uint64) CAS(old, new uint64) bool { return a.CompareAndSwapUint64(&u.value, old, new) } - -func encBool(b bool) uint32 { - if b { - return 1 - } - return 0 -} diff --git a/libbeat/common/atomic/atomic32.go b/libbeat/common/atomic/atomic32.go deleted file mode 100644 index d811070a6cd..00000000000 --- a/libbeat/common/atomic/atomic32.go +++ /dev/null @@ -1,50 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//go:build 386 || arm || mips || mipsle - -package atomic - -// atomic Uint/Int for 32bit systems - -// Uint provides an architecture specific atomic uint. -type Uint struct{ a Uint32 } - -// Int provides an architecture specific atomic uint. -type Int struct{ a Int32 } - -func MakeUint(v uint) Uint { return Uint{MakeUint32(uint32(v))} } -func NewUint(v uint) *Uint { return &Uint{MakeUint32(uint32(v))} } -func (u *Uint) Load() uint { return uint(u.a.Load()) } -func (u *Uint) Store(v uint) { u.a.Store(uint32(v)) } -func (u *Uint) Swap(new uint) uint { return uint(u.a.Swap(uint32(new))) } -func (u *Uint) Add(delta uint) uint { return uint(u.a.Add(uint32(delta))) } -func (u *Uint) Sub(delta uint) uint { return uint(u.a.Add(uint32(-delta))) } -func (u *Uint) Inc() uint { return uint(u.a.Inc()) } -func (u *Uint) Dec() uint { return uint(u.a.Dec()) } -func (u *Uint) CAS(old, new uint) bool { return u.a.CAS(uint32(old), uint32(new)) } - -func MakeInt(v int) Int { return Int{MakeInt32(int32(v))} } -func NewInt(v int) *Int { return &Int{MakeInt32(int32(v))} } -func (i *Int) Load() int { return int(i.a.Load()) } -func (i *Int) Store(v int) { i.a.Store(int32(v)) } -func (i *Int) Swap(new int) int { return int(i.a.Swap(int32(new))) } -func (i *Int) Add(delta int) int { return int(i.a.Add(int32(delta))) } -func (i *Int) Sub(delta int) int { return int(i.a.Add(int32(-delta))) } -func (i *Int) Inc() int { return int(i.a.Inc()) } -func (i *Int) Dec() int { return int(i.a.Dec()) } -func (i *Int) CAS(old, new int) bool { return i.a.CAS(int32(old), int32(new)) } diff --git a/libbeat/common/atomic/atomic64.go b/libbeat/common/atomic/atomic64.go deleted file mode 100644 index 3f3648e91ce..00000000000 --- a/libbeat/common/atomic/atomic64.go +++ /dev/null @@ -1,50 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//go:build amd64 || arm64 || ppc64 || ppc64le || mips64 || mips64le || s390x - -package atomic - -// atomic Uint/Int for 64bit systems - -// Uint provides an architecture specific atomic uint. -type Uint struct{ a Uint64 } - -// Int provides an architecture specific atomic uint. -type Int struct{ a Int64 } - -func MakeUint(v uint) Uint { return Uint{MakeUint64(uint64(v))} } -func NewUint(v uint) *Uint { return &Uint{MakeUint64(uint64(v))} } -func (u *Uint) Load() uint { return uint(u.a.Load()) } -func (u *Uint) Store(v uint) { u.a.Store(uint64(v)) } -func (u *Uint) Swap(new uint) uint { return uint(u.a.Swap(uint64(new))) } -func (u *Uint) Add(delta uint) uint { return uint(u.a.Add(uint64(delta))) } -func (u *Uint) Sub(delta uint) uint { return uint(u.a.Add(uint64(-delta))) } -func (u *Uint) Inc() uint { return uint(u.a.Inc()) } -func (u *Uint) Dec() uint { return uint(u.a.Dec()) } -func (u *Uint) CAS(old, new uint) bool { return u.a.CAS(uint64(old), uint64(new)) } - -func MakeInt(v int) Int { return Int{MakeInt64(int64(v))} } -func NewInt(v int) *Int { return &Int{MakeInt64(int64(v))} } -func (i *Int) Load() int { return int(i.a.Load()) } -func (i *Int) Store(v int) { i.a.Store(int64(v)) } -func (i *Int) Swap(new int) int { return int(i.a.Swap(int64(new))) } -func (i *Int) Add(delta int) int { return int(i.a.Add(int64(delta))) } -func (i *Int) Sub(delta int) int { return int(i.a.Add(int64(-delta))) } -func (i *Int) Inc() int { return int(i.a.Inc()) } -func (i *Int) Dec() int { return int(i.a.Dec()) } -func (i *Int) CAS(old, new int) bool { return i.a.CAS(int64(old), int64(new)) } diff --git a/libbeat/common/atomic/atomic_test.go b/libbeat/common/atomic/atomic_test.go deleted file mode 100644 index f4df6fc4d89..00000000000 --- a/libbeat/common/atomic/atomic_test.go +++ /dev/null @@ -1,273 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package atomic - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestAtomicBool(t *testing.T) { - assert := assert.New(t) - - var b Bool - assert.False(b.Load(), "check zero value is false") - - b = MakeBool(true) - assert.True(b.Load(), "check value initializer with 'true' value") - - b.Store(false) - assert.False(b.Load(), "check store to false") - - old := b.Swap(true) - assert.False(old, "check old value of swap operation is 'false'") - assert.True(b.Load(), "check new value after swap is 'true'") - - old = b.Swap(false) - assert.True(old, "check old value of second swap operation is 'true'") - assert.False(b.Load(), "check new value after second swap is 'false'") - - ok := b.CAS(true, true) - assert.False(ok, "check CAS fails with wrong 'old' value") - assert.False(b.Load(), "check failed CAS did not change value 'false'") - - ok = b.CAS(false, true) - assert.True(ok, "check CAS succeeds with correct 'old' value") - assert.True(b.Load(), "check CAS did change value to 'true'") -} - -func TestAtomicInt32(t *testing.T) { - assert := assert.New(t) - check := func(expected, actual int32, msg string) { - assert.Equal(expected, actual, msg) - } - - var v Int32 - check(0, v.Load(), "check zero value") - - v = MakeInt32(23) - check(23, v.Load(), "check value initializer") - - v.Store(42) - check(42, v.Load(), "check store new value") - - new := v.Inc() - check(43, new, "check increment returns new value") - check(43, v.Load(), "check increment did store new value") - - new = v.Dec() - check(42, new, "check decrement returns new value") - check(42, v.Load(), "check decrement did store new value") - - new = v.Add(8) - check(50, new, "check add returns new value") - check(50, v.Load(), "check add did store new value") - - new = v.Sub(8) - check(42, new, "check sub returns new value") - check(42, v.Load(), "check sub did store new value") - - old := v.Swap(101) - check(42, old, "check swap returns old value") - check(101, v.Load(), "check swap stores new value") - - ok := v.CAS(0, 23) - assert.False(ok, "check CAS with wrong old value fails") - check(101, v.Load(), "check failed CAS did not change value") - - ok = v.CAS(101, 23) - assert.True(ok, "check CAS succeeds") - check(23, v.Load(), "check CAS did store new value") -} - -func TestAtomicInt64(t *testing.T) { - assert := assert.New(t) - check := func(expected, actual int64, msg string) { - assert.Equal(expected, actual, msg) - } - - var v Int64 - check(0, v.Load(), "check zero value") - - v = MakeInt64(23) - check(23, v.Load(), "check value initializer") - - v.Store(42) - check(42, v.Load(), "check store new value") - - new := v.Inc() - check(43, new, "check increment returns new value") - check(43, v.Load(), "check increment did store new value") - - new = v.Dec() - check(42, new, "check decrement returns new value") - check(42, v.Load(), "check decrement did store new value") - - new = v.Add(8) - check(50, new, "check add returns new value") - check(50, v.Load(), "check add did store new value") - - new = v.Sub(8) - check(42, new, "check sub returns new value") - check(42, v.Load(), "check sub did store new value") - - old := v.Swap(101) - check(42, old, "check swap returns old value") - check(101, v.Load(), "check swap stores new value") - - ok := v.CAS(0, 23) - assert.False(ok, "check CAS with wrong old value fails") - check(101, v.Load(), "check failed CAS did not change value") - - ok = v.CAS(101, 23) - assert.True(ok, "check CAS succeeds") - check(23, v.Load(), "check CAS did store new value") -} - -func TestAtomicUint32(t *testing.T) { - assert := assert.New(t) - check := func(expected, actual uint32, msg string) { - assert.Equal(expected, actual, msg) - } - - var v Uint32 - check(0, v.Load(), "check zero value") - - v = MakeUint32(23) - check(23, v.Load(), "check value initializer") - - v.Store(42) - check(42, v.Load(), "check store new value") - - new := v.Inc() - check(43, new, "check increment returns new value") - check(43, v.Load(), "check increment did store new value") - - new = v.Dec() - check(42, new, "check decrement returns new value") - check(42, v.Load(), "check decrement did store new value") - - new = v.Add(8) - check(50, new, "check add returns new value") - check(50, v.Load(), "check add did store new value") - - new = v.Sub(8) - check(42, new, "check sub returns new value") - check(42, v.Load(), "check sub did store new value") - - old := v.Swap(101) - check(42, old, "check swap returns old value") - check(101, v.Load(), "check swap stores new value") - - ok := v.CAS(0, 23) - assert.False(ok, "check CAS with wrong old value fails") - check(101, v.Load(), "check failed CAS did not change value") - - ok = v.CAS(101, 23) - assert.True(ok, "check CAS succeeds") - check(23, v.Load(), "check CAS did store new value") -} - -func TestAtomicUint64(t *testing.T) { - assert := assert.New(t) - check := func(expected, actual uint64, msg string) { - assert.Equal(expected, actual, msg) - } - - var v Uint64 - check(0, v.Load(), "check zero value") - - v = MakeUint64(23) - check(23, v.Load(), "check value initializer") - - v.Store(42) - check(42, v.Load(), "check store new value") - - new := v.Inc() - check(43, new, "check increment returns new value") - check(43, v.Load(), "check increment did store new value") - - new = v.Dec() - check(42, new, "check decrement returns new value") - check(42, v.Load(), "check decrement did store new value") - - new = v.Add(8) - check(50, new, "check add returns new value") - check(50, v.Load(), "check add did store new value") - - new = v.Sub(8) - check(42, new, "check sub returns new value") - check(42, v.Load(), "check sub did store new value") - - old := v.Swap(101) - check(42, old, "check swap returns old value") - check(101, v.Load(), "check swap stores new value") - - ok := v.CAS(0, 23) - assert.False(ok, "check CAS with wrong old value fails") - check(101, v.Load(), "check failed CAS did not change value") - - ok = v.CAS(101, 23) - assert.True(ok, "check CAS succeeds") - check(23, v.Load(), "check CAS did store new value") -} - -func TestAtomicUint(t *testing.T) { - assert := assert.New(t) - check := func(expected, actual uint, msg string) { - assert.Equal(expected, actual, msg) - } - - var v Uint - check(0, v.Load(), "check zero value") - - v = MakeUint(23) - check(23, v.Load(), "check value initializer") - - v.Store(42) - check(42, v.Load(), "check store new value") - - new := v.Inc() - check(43, new, "check increment returns new value") - check(43, v.Load(), "check increment did store new value") - - new = v.Dec() - check(42, new, "check decrement returns new value") - check(42, v.Load(), "check decrement did store new value") - - new = v.Add(8) - check(50, new, "check add returns new value") - check(50, v.Load(), "check add did store new value") - - new = v.Sub(8) - check(42, new, "check sub returns new value") - check(42, v.Load(), "check sub did store new value") - - old := v.Swap(101) - check(42, old, "check swap returns old value") - check(101, v.Load(), "check swap stores new value") - - ok := v.CAS(0, 23) - assert.False(ok, "check CAS with wrong old value fails") - check(101, v.Load(), "check failed CAS did not change value") - - ok = v.CAS(101, 23) - assert.True(ok, "check CAS succeeds") - check(23, v.Load(), "check CAS did store new value") -} diff --git a/libbeat/idxmgmt/index_support.go b/libbeat/idxmgmt/index_support.go index 4526d916c35..57816b3625c 100644 --- a/libbeat/idxmgmt/index_support.go +++ b/libbeat/idxmgmt/index_support.go @@ -21,10 +21,10 @@ import ( "errors" "fmt" "strings" + "sync/atomic" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/beat/events" - "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/idxmgmt/lifecycle" "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/outputs/outil" @@ -314,7 +314,7 @@ func (m *indexManager) setupWithILM() (bool, error) { } if withILM { // mark ILM as enabled in indexState - m.support.st.withILM.CAS(false, true) + m.support.st.withILM.CompareAndSwap(false, true) } } return withILM, nil diff --git a/libbeat/management/agent.go b/libbeat/management/agent.go index 84f5a1e9f6b..cd14f8cb969 100644 --- a/libbeat/management/agent.go +++ b/libbeat/management/agent.go @@ -17,17 +17,15 @@ package management -import ( - "github.com/elastic/beats/v7/libbeat/common/atomic" -) +import "sync/atomic" var ( // underAgent is set to true with this beat is being ran under the elastic-agent - underAgent = atomic.MakeBool(false) + underAgent atomic.Bool // underAgentTrace is set to true when the elastic-agent has placed this beat into // trace mode (which enables logging of published events) - underAgentTrace = atomic.MakeBool(false) + underAgentTrace atomic.Bool ) // SetUnderAgent sets that the processing pipeline is being ran under the elastic-agent. diff --git a/libbeat/outputs/elasticsearch/client_proxy_test.go b/libbeat/outputs/elasticsearch/client_proxy_test.go index bd6739c3bf0..b0cef282487 100644 --- a/libbeat/outputs/elasticsearch/client_proxy_test.go +++ b/libbeat/outputs/elasticsearch/client_proxy_test.go @@ -29,12 +29,12 @@ import ( "net/url" "os" "os/exec" + "sync/atomic" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" "github.com/elastic/beats/v7/libbeat/outputs/outil" "github.com/elastic/elastic-agent-libs/transport/httpcommon" @@ -224,17 +224,17 @@ type serverState struct { serverURL string proxyURL string - _serverRequestCount atomic.Int // Requests directly to the server - _proxyRequestCount atomic.Int // Requests via the proxy + _serverRequestCount atomic.Int64 // Requests directly to the server + _proxyRequestCount atomic.Int64 // Requests via the proxy } // Convenience functions to unwrap the atomic primitives -func (s serverState) serverRequestCount() int { - return s._serverRequestCount.Load() +func (s *serverState) serverRequestCount() int { + return int(s._serverRequestCount.Load()) } -func (s serverState) proxyRequestCount() int { - return s._proxyRequestCount.Load() +func (s *serverState) proxyRequestCount() int { + return int(s._proxyRequestCount.Load()) } // startServers starts endpoints representing a backend server and a proxy, @@ -246,13 +246,13 @@ func startServers(t *testing.T) (*serverState, func()) { http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { assert.Equal(t, headerTestValue, r.Header.Get(headerTestField)) fmt.Fprintln(w, "Hello, client") - state._serverRequestCount.Inc() + state._serverRequestCount.Add(1) })) proxy := httptest.NewServer( http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { assert.Equal(t, headerTestValue, r.Header.Get(headerTestField)) fmt.Fprintln(w, "Hello, client") - state._proxyRequestCount.Inc() + state._proxyRequestCount.Add(1) })) state.serverURL = server.URL state.proxyURL = proxy.URL diff --git a/libbeat/outputs/logstash/async.go b/libbeat/outputs/logstash/async.go index a980d1cef32..f0da5e45943 100644 --- a/libbeat/outputs/logstash/async.go +++ b/libbeat/outputs/logstash/async.go @@ -22,10 +22,10 @@ import ( "errors" "net" "sync" + "sync/atomic" "time" "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/elastic-agent-libs/logp" @@ -147,13 +147,13 @@ func (c *asyncClient) Publish(_ context.Context, batch publisher.Batch) error { ref := &msgRef{ client: c, - count: atomic.MakeUint32(1), batch: batch, slice: events, batchSize: len(events), win: c.win, err: nil, } + ref.count.Store(1) defer ref.dec() for len(events) > 0 { @@ -218,7 +218,7 @@ func (c *asyncClient) sendEvents(ref *msgRef, events []publisher.Event) error { for i := range events { window[i] = &events[i].Content } - ref.count.Inc() + ref.count.Add(1) return client.Send(ref.callback, window) } @@ -261,7 +261,7 @@ func (r *msgRef) fail(n uint32, err error) { } func (r *msgRef) dec() { - i := r.count.Dec() + i := r.count.Add(^uint32(0)) if i > 0 { return } diff --git a/libbeat/processors/add_process_metadata/add_process_metadata.go b/libbeat/processors/add_process_metadata/add_process_metadata.go index 2385e5f99de..78e67f8e4d7 100644 --- a/libbeat/processors/add_process_metadata/add_process_metadata.go +++ b/libbeat/processors/add_process_metadata/add_process_metadata.go @@ -22,11 +22,11 @@ import ( "fmt" "reflect" "strconv" + "sync/atomic" "time" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/processors" jsprocessor "github.com/elastic/beats/v7/libbeat/processors/script/javascript/module/processor" conf "github.com/elastic/elastic-agent-libs/config" @@ -131,7 +131,7 @@ func NewWithConfig(opts ...ConfigOption) (beat.Processor, error) { func newProcessMetadataProcessorWithProvider(config config, provider processMetadataProvider, withCache bool) (proc beat.Processor, err error) { // Logging (each processor instance has a unique ID). var ( - id = int(instanceID.Inc()) + id = int(instanceID.Add(1)) log = logp.NewLogger(processorName).With("instance_id", id) ) diff --git a/libbeat/processors/cache/cache.go b/libbeat/processors/cache/cache.go index a7ce4876f50..45c1e41872e 100644 --- a/libbeat/processors/cache/cache.go +++ b/libbeat/processors/cache/cache.go @@ -22,10 +22,10 @@ import ( "errors" "fmt" "os" + "sync/atomic" "time" "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/processors" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" @@ -67,7 +67,7 @@ func New(cfg *conf.C) (beat.Processor, error) { return nil, fmt.Errorf("failed to unpack the %s configuration: %w", name, err) } // Logging (each processor instance has a unique ID). - id := int(instanceID.Inc()) + id := int(instanceID.Add(1)) log := logp.NewLogger(name).With("instance_id", id) src, cancel, err := getStoreFor(config, log) diff --git a/libbeat/processors/dns/dns.go b/libbeat/processors/dns/dns.go index d4f3d2ba57b..48b08ab671c 100644 --- a/libbeat/processors/dns/dns.go +++ b/libbeat/processors/dns/dns.go @@ -22,9 +22,9 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/processors" jsprocessor "github.com/elastic/beats/v7/libbeat/processors/script/javascript/module/processor" conf "github.com/elastic/elastic-agent-libs/config" @@ -36,7 +36,7 @@ import ( const logName = "processor.dns" // instanceID is used to assign each instance a unique monitoring namespace. -var instanceID = atomic.MakeUint32(0) +var instanceID atomic.Uint32 func init() { processors.RegisterPlugin("dns", New) @@ -58,7 +58,7 @@ func New(cfg *conf.C) (beat.Processor, error) { // Logging and metrics (each processor instance has a unique ID). var ( - id = int(instanceID.Inc()) + id = int(instanceID.Add(1)) log = logp.NewLogger(logName).With("instance_id", id) metrics = monitoring.Default.NewRegistry(logName+"."+strconv.Itoa(id), monitoring.DoNotReport) ) diff --git a/libbeat/processors/ratelimit/rate_limit.go b/libbeat/processors/ratelimit/rate_limit.go index f558b076e2b..92d9b4c37a0 100644 --- a/libbeat/processors/ratelimit/rate_limit.go +++ b/libbeat/processors/ratelimit/rate_limit.go @@ -22,12 +22,12 @@ import ( "fmt" "sort" "strconv" + "sync/atomic" "github.com/jonboulle/clockwork" "github.com/mitchellh/hashstructure" "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/processors" c "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" @@ -36,7 +36,7 @@ import ( ) // instanceID is used to assign each instance a unique monitoring namespace. -var instanceID = atomic.MakeUint32(0) +var instanceID atomic.Uint32 const processorName = "rate_limit" const logName = "processor." + processorName @@ -79,7 +79,7 @@ func new(cfg *c.C) (beat.Processor, error) { // Logging and metrics (each processor instance has a unique ID). var ( - id = int(instanceID.Inc()) + id = int(instanceID.Add(1)) log = logp.NewLogger(logName).With("instance_id", id) reg = monitoring.Default.NewRegistry(logName+"."+strconv.Itoa(id), monitoring.DoNotReport) ) diff --git a/libbeat/processors/ratelimit/token_bucket.go b/libbeat/processors/ratelimit/token_bucket.go index 1f1381fd8df..1e84f799b98 100644 --- a/libbeat/processors/ratelimit/token_bucket.go +++ b/libbeat/processors/ratelimit/token_bucket.go @@ -20,13 +20,13 @@ package ratelimit import ( "fmt" "sync" + "sync/atomic" "time" "github.com/jonboulle/clockwork" "github.com/elastic/go-concert/unison" - "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/elastic-agent-libs/logp" ) @@ -50,7 +50,7 @@ type tokenBucket struct { gc struct { thresholds tokenBucketGCConfig metrics struct { - numCalls atomic.Uint + numCalls atomic.Uint64 } } @@ -93,7 +93,7 @@ func newTokenBucket(config algoConfig) (algorithm, error) { gc: struct { thresholds tokenBucketGCConfig metrics struct { - numCalls atomic.Uint + numCalls atomic.Uint64 } }{ thresholds: tokenBucketGCConfig{ @@ -112,7 +112,7 @@ func (t *tokenBucket) IsAllowed(key uint64) bool { b := t.getBucket(key) allowed := b.withdraw() - t.gc.metrics.numCalls.Inc() + t.gc.metrics.numCalls.Add(1) return allowed } @@ -126,6 +126,7 @@ func (t *tokenBucket) getBucket(key uint64) *bucket { tokens: t.depth, lastReplenish: t.clock.Now(), }) + //nolint:errcheck // ignore b := v.(*bucket) if exists { @@ -154,7 +155,7 @@ func (b *bucket) replenish(rate rate, clock clockwork.Clock) { func (t *tokenBucket) runGC() { // Don't run GC if thresholds haven't been crossed. - if t.gc.metrics.numCalls.Load() < t.gc.thresholds.NumCalls { + if t.gc.metrics.numCalls.Load() < uint64(t.gc.thresholds.NumCalls) { return } @@ -171,7 +172,9 @@ func (t *tokenBucket) runGC() { toDelete := make([]uint64, 0) numBucketsBefore := 0 t.buckets.Range(func(k, v interface{}) bool { + //nolint:errcheck // ignore key := k.(uint64) + //nolint:errcheck // ignore b := v.(*bucket) b.replenish(t.limit, t.clock) @@ -190,9 +193,9 @@ func (t *tokenBucket) runGC() { } // Reset GC metrics - t.gc.metrics.numCalls = atomic.MakeUint(0) + t.gc.metrics.numCalls = atomic.Uint64{} - gcDuration := time.Now().Sub(gcStartTime) + gcDuration := time.Since(gcStartTime) numBucketsDeleted := len(toDelete) numBucketsAfter := numBucketsBefore - numBucketsDeleted t.logger.Debugf("gc duration: %v, buckets: (before: %v, deleted: %v, after: %v)", diff --git a/libbeat/processors/syslog/syslog.go b/libbeat/processors/syslog/syslog.go index 96c21d3d773..9a6e71fcca3 100644 --- a/libbeat/processors/syslog/syslog.go +++ b/libbeat/processors/syslog/syslog.go @@ -22,9 +22,9 @@ import ( "errors" "fmt" "strconv" + "sync/atomic" "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/common/cfgtype" "github.com/elastic/beats/v7/libbeat/common/jsontransform" "github.com/elastic/beats/v7/libbeat/processors" @@ -43,7 +43,7 @@ const ( ) // instanceID is used to assign each instance a unique monitoring namespace. -var instanceID = atomic.MakeUint32(0) +var instanceID atomic.Uint32 // config defines the configuration for this processor. type config struct { @@ -114,7 +114,7 @@ func New(c *conf.C) (beat.Processor, error) { return nil, fmt.Errorf("fail to unpack the "+procName+" processor configuration: %w", err) } - id := int(instanceID.Inc()) + id := int(instanceID.Add(1)) log := logp.NewLogger(logName).With("instance_id", id) registryName := logName + "." + strconv.Itoa(id) diff --git a/libbeat/publisher/pipeline/client.go b/libbeat/publisher/pipeline/client.go index af756213a63..3048ec4a001 100644 --- a/libbeat/publisher/pipeline/client.go +++ b/libbeat/publisher/pipeline/client.go @@ -19,10 +19,10 @@ package pipeline import ( "sync" + "sync/atomic" "time" "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/processors" "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/beats/v7/libbeat/publisher/queue" @@ -41,8 +41,7 @@ type client struct { canDrop bool // Open state, signaling, and sync primitives for coordinating client Close. - isOpen atomic.Bool // set to false during shutdown, such that no new events will be accepted anymore. - closeOnce sync.Once // closeOnce ensure that the client shutdown sequence is only executed once + isOpen atomic.Bool // set to false during shutdown, such that no new events will be accepted anymore. observer observer eventListener beat.EventListener @@ -204,12 +203,12 @@ func newClientCloseWaiter(timeout time.Duration) *clientCloseWaiter { func (w *clientCloseWaiter) AddEvent(_ beat.Event, published bool) { if published { - w.events.Inc() + w.events.Add(1) } } func (w *clientCloseWaiter) ACKEvents(n int) { - value := w.events.Sub(uint32(n)) + value := w.events.Add(^uint32(n - 1)) if value != 0 { return } diff --git a/libbeat/publisher/pipeline/client_worker_test.go b/libbeat/publisher/pipeline/client_worker_test.go index 97692b2aada..ed97cd11ac8 100644 --- a/libbeat/publisher/pipeline/client_worker_test.go +++ b/libbeat/publisher/pipeline/client_worker_test.go @@ -22,6 +22,7 @@ import ( "math" "strings" "sync" + "sync/atomic" "testing" "testing/quick" "time" @@ -30,7 +31,6 @@ import ( "github.com/stretchr/testify/require" - "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/internal/testutil" "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/publisher" @@ -48,7 +48,7 @@ func TestMakeClientWorker(t *testing.T) { err := quick.Check(func(i uint) bool { numBatches := 300 + (i % 100) // between 300 and 399 - var numEvents uint + var numEvents uint64 logger := makeBufLogger(t) @@ -56,9 +56,9 @@ func TestMakeClientWorker(t *testing.T) { retryer := newStandaloneRetryer(workQueue) defer retryer.close() - var published atomic.Uint + var published atomic.Uint64 publishFn := func(batch publisher.Batch) error { - published.Add(uint(len(batch.Events()))) + published.Add(uint64(len(batch.Events()))) return nil } @@ -69,7 +69,7 @@ func TestMakeClientWorker(t *testing.T) { for i := uint(0); i < numBatches; i++ { batch := randomBatch(50, 150).withRetryer(retryer) - numEvents += uint(len(batch.Events())) + numEvents += uint64(len(batch.Events())) workQueue <- batch } @@ -82,7 +82,7 @@ func TestMakeClientWorker(t *testing.T) { }) if !success { logger.Flush() - t.Logf("numBatches = %v, numEvents = %v, published = %v", numBatches, numEvents, published) + t.Logf("numBatches = %v, numEvents = %v, published = %v", numBatches, numEvents, published.Load()) } return success }, nil) @@ -140,9 +140,9 @@ func TestReplaceClientWorker(t *testing.T) { }() // Publish at least 1 batch worth of events but no more than 20% events - publishLimit := uint(math.Max(minEventsInBatch, float64(numEvents)*0.2)) + publishLimit := uint64(math.Max(minEventsInBatch, float64(numEvents)*0.2)) - var publishedFirst atomic.Uint + var publishedFirst atomic.Uint64 blockCtrl := make(chan struct{}) blockingPublishFn := func(batch publisher.Batch) error { // Emulate blocking. Upon unblocking the in-flight batch that was @@ -152,7 +152,7 @@ func TestReplaceClientWorker(t *testing.T) { } count := len(batch.Events()) - publishedFirst.Add(uint(count)) + publishedFirst.Add(uint64(count)) t.Logf("#1 processed batch: %v (%v)", batch.(*mockBatch).events[0].Content.Private, count) return nil } @@ -176,10 +176,10 @@ func TestReplaceClientWorker(t *testing.T) { close(blockCtrl) // Start new worker to drain work queue - var publishedLater atomic.Uint + var publishedLater atomic.Uint64 countingPublishFn := func(batch publisher.Batch) error { count := len(batch.Events()) - publishedLater.Add(uint(count)) + publishedLater.Add(uint64(count)) t.Logf("#2 processed batch: %v (%v)", batch.(*mockBatch).events[0].Content.Private, count) return nil } @@ -212,7 +212,7 @@ func TestMakeClientTracer(t *testing.T) { testutil.SeedPRNG(t) numBatches := 10 - var numEvents uint + var numEvents uint64 logger := makeBufLogger(t) @@ -220,9 +220,9 @@ func TestMakeClientTracer(t *testing.T) { retryer := newStandaloneRetryer(workQueue) defer retryer.close() - var published atomic.Uint + var published atomic.Uint64 publishFn := func(batch publisher.Batch) error { - published.Add(uint(len(batch.Events()))) + published.Add(uint64(len(batch.Events()))) return nil } @@ -236,7 +236,7 @@ func TestMakeClientTracer(t *testing.T) { for i := 0; i < numBatches; i++ { batch := randomBatch(10, 15).withRetryer(retryer) - numEvents += uint(len(batch.Events())) + numEvents += uint64(len(batch.Events())) workQueue <- batch } @@ -248,7 +248,7 @@ func TestMakeClientTracer(t *testing.T) { return numEvents == published.Load() }) if !matches { - t.Errorf("expected %d events, got %d", numEvents, published) + t.Errorf("expected %d events, got %d", numEvents, published.Load()) } recorder.Flush(nil) diff --git a/libbeat/publisher/pipeline/controller_test.go b/libbeat/publisher/pipeline/controller_test.go index 706c159e3d4..5e48fbb79b8 100644 --- a/libbeat/publisher/pipeline/controller_test.go +++ b/libbeat/publisher/pipeline/controller_test.go @@ -20,12 +20,12 @@ package pipeline import ( "fmt" "sync" + "sync/atomic" "testing" "testing/quick" "time" "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/internal/testutil" "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/publisher" @@ -64,9 +64,9 @@ func TestOutputReload(t *testing.T) { fmt.Sprintf("mem.events: %v", numEventsToPublish)) _ = queueConfig.Unpack(conf) - var publishedCount atomic.Uint + var publishedCount atomic.Uint64 countingPublishFn := func(batch publisher.Batch) error { - publishedCount.Add(uint(len(batch.Events()))) + publishedCount.Add(uint64(len(batch.Events()))) return nil } @@ -108,7 +108,7 @@ func TestOutputReload(t *testing.T) { timeout := 20 * time.Second return waitUntilTrue(timeout, func() bool { - return numEventsToPublish == publishedCount.Load() + return uint64(numEventsToPublish) == publishedCount.Load() }) }, &quick.Config{MaxCount: 25}) @@ -222,18 +222,19 @@ func TestQueueProducerBlocksUntilOutputIsSet(t *testing.T) { // block, because there is no queue, but they should become unblocked // once we set a nonempty output. const producerCount = 10 - remaining := atomic.MakeInt(producerCount) + var remaining atomic.Int64 + remaining.Store(producerCount) for i := 0; i < producerCount; i++ { go func() { controller.queueProducer(queue.ProducerConfig{}) - remaining.Dec() + remaining.Add(-1) }() } allStarted := waitUntilTrue(time.Second, func() bool { return len(controller.pendingRequests) == producerCount }) assert.True(t, allStarted, "All queueProducer requests should be saved as pending requests by outputController") - assert.Equal(t, producerCount, remaining.Load(), "No queueProducer request should return before an output is set") + assert.Equal(t, int64(producerCount), remaining.Load(), "No queueProducer request should return before an output is set") // Set the output, then ensure that it unblocks all the waiting goroutines. controller.Set(outputs.Group{ diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index a5a13a0584e..6297f7b7ee6 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -26,7 +26,6 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common/acker" - "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/publisher" @@ -211,13 +210,13 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { client := &client{ logger: p.monitors.Logger, - isOpen: atomic.MakeBool(true), clientListener: cfg.ClientListener, processors: processors, eventFlags: eventFlags, canDrop: canDrop, observer: p.observer, } + client.isOpen.Store(true) ackHandler := cfg.EventListener diff --git a/libbeat/publisher/pipeline/pipeline_test.go b/libbeat/publisher/pipeline/pipeline_test.go index a8cf34b895a..bd374c3c87b 100644 --- a/libbeat/publisher/pipeline/pipeline_test.go +++ b/libbeat/publisher/pipeline/pipeline_test.go @@ -20,10 +20,10 @@ package pipeline import ( "runtime" "sync" + "sync/atomic" "testing" "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/publisher/queue" "github.com/elastic/beats/v7/libbeat/tests/resources" "github.com/elastic/elastic-agent-libs/mapstr" @@ -79,7 +79,7 @@ func TestPipelineAcceptsAnyNumberOfClients(t *testing.T) { // close method is called, this ID is returned func makeDiscardQueue() queue.Queue { var wg sync.WaitGroup - producerID := atomic.NewInt(0) + var producerID atomic.Int64 return &testQueue{ close: func() error { @@ -92,7 +92,7 @@ func makeDiscardQueue() queue.Queue { }, producer: func(cfg queue.ProducerConfig) queue.Producer { - producerID.Inc() + producerID.Add(1) // count is a counter that increments on every published event // it's also the returned Event ID @@ -231,11 +231,11 @@ func makeTestQueue() queue.Queue { func blockingProducer(_ queue.ProducerConfig) queue.Producer { sig := make(chan struct{}) - waiting := atomic.MakeInt(0) + var waiting atomic.Int64 return &testProducer{ publish: func(_ bool, _ queue.Entry) (queue.EntryID, bool) { - waiting.Inc() + waiting.Add(1) <-sig return 0, false }, diff --git a/libbeat/publisher/pipeline/stress/gen.go b/libbeat/publisher/pipeline/stress/gen.go index 2a4d8c72ef0..7fdf17bd27b 100644 --- a/libbeat/publisher/pipeline/stress/gen.go +++ b/libbeat/publisher/pipeline/stress/gen.go @@ -22,6 +22,7 @@ import ( "fmt" "runtime/pprof" "sync" + "sync/atomic" "time" "github.com/elastic/elastic-agent-libs/logp" @@ -29,7 +30,6 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common/acker" - "github.com/elastic/beats/v7/libbeat/common/atomic" ) type generateConfig struct { @@ -97,7 +97,7 @@ func generate( done := make(chan struct{}) defer close(done) - count := atomic.MakeUint64(0) + var count atomic.Uint64 var wg sync.WaitGroup defer wg.Wait() @@ -148,7 +148,7 @@ func generate( Fields: mapstr.M{ "id": id, "hello": "world", - "count": count, + "count": count.Load(), // TODO: more custom event generation? }, @@ -156,7 +156,7 @@ func generate( client.Publish(event) - total := count.Inc() + total := count.Add(1) if config.MaxEvents > 0 && total == config.MaxEvents { break } diff --git a/libbeat/publisher/pipeline/stress/sig.go b/libbeat/publisher/pipeline/stress/sig.go index 537fc9c633b..d6d03f0c3d1 100644 --- a/libbeat/publisher/pipeline/stress/sig.go +++ b/libbeat/publisher/pipeline/stress/sig.go @@ -17,7 +17,7 @@ package stress -import "github.com/elastic/beats/v7/libbeat/common/atomic" +import "sync/atomic" type closeSignaler struct { active atomic.Bool @@ -25,14 +25,15 @@ type closeSignaler struct { } func newCloseSignaler() *closeSignaler { - return &closeSignaler{ - active: atomic.MakeBool(true), - done: make(chan struct{}), + cs := &closeSignaler{ + done: make(chan struct{}), } + cs.active.Store(true) + return cs } func (s *closeSignaler) Close() { - if act := s.active.Swap(false); act { + if s.active.Swap(false) { close(s.done) } } diff --git a/libbeat/publisher/testing/connector.go b/libbeat/publisher/testing/connector.go index ddf48cc126f..967f86ebe28 100644 --- a/libbeat/publisher/testing/connector.go +++ b/libbeat/publisher/testing/connector.go @@ -18,15 +18,16 @@ package testing import ( + "sync/atomic" + "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common/atomic" ) // ClientCounter can be used to create a beat.PipelineConnector that count // pipeline connects and disconnects. type ClientCounter struct { - total atomic.Int - active atomic.Int + total atomic.Int64 + active atomic.Int64 } // FakeConnector implements the beat.PipelineConnector interface. @@ -111,21 +112,21 @@ func ChClient(ch chan beat.Event) beat.Client { } // Active returns the number of currently active connections. -func (c *ClientCounter) Active() int { return c.active.Load() } +func (c *ClientCounter) Active() int { return int(c.active.Load()) } // Total returns the total number of calls to Connect. -func (c *ClientCounter) Total() int { return c.total.Load() } +func (c *ClientCounter) Total() int { return int(c.total.Load()) } // BuildConnector create a pipeline that updates the active and tocal // connection counters on Connect and Close calls. func (c *ClientCounter) BuildConnector() beat.PipelineConnector { return FakeConnector{ ConnectFunc: func(_ beat.ClientConfig) (beat.Client, error) { - c.total.Inc() - c.active.Inc() + c.total.Add(1) + c.active.Add(1) return &FakeClient{ CloseFunc: func() error { - c.active.Dec() + c.active.Add(-1) return nil }, }, nil diff --git a/libbeat/tests/integration/framework.go b/libbeat/tests/integration/framework.go index 904fc1e302a..aa05d6ff0e6 100644 --- a/libbeat/tests/integration/framework.go +++ b/libbeat/tests/integration/framework.go @@ -37,13 +37,12 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "testing" "time" "github.com/gofrs/uuid/v5" "github.com/stretchr/testify/require" - - "github.com/elastic/beats/v7/libbeat/common/atomic" ) type BeatProc struct { @@ -189,7 +188,7 @@ func (b *BeatProc) Start(args ...string) { b.fullPath = fullPath b.Args = append(b.baseArgs, args...) - done := atomic.MakeBool(false) + var done atomic.Bool wg := sync.WaitGroup{} if b.RestartOnBeatOnExit { wg.Add(1) diff --git a/metricbeat/mb/module/runner_group_test.go b/metricbeat/mb/module/runner_group_test.go index 1d462359968..2cd6e6cc8b5 100644 --- a/metricbeat/mb/module/runner_group_test.go +++ b/metricbeat/mb/module/runner_group_test.go @@ -19,13 +19,13 @@ package module import ( "fmt" + "sync/atomic" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/cfgfile" - "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/common/diagnostics" ) @@ -55,19 +55,19 @@ func (fr *fakeRunnerDiag) Diagnostics() []diagnostics.DiagnosticSetup { type fakeRunner struct { id int - startCounter *atomic.Int - stopCounter *atomic.Int + startCounter *atomic.Int64 + stopCounter *atomic.Int64 } func (fr *fakeRunner) Start() { if fr.startCounter != nil { - fr.startCounter.Inc() + fr.startCounter.Add(1) } } func (fr *fakeRunner) Stop() { if fr.stopCounter != nil { - fr.stopCounter.Inc() + fr.stopCounter.Add(1) } } @@ -76,15 +76,14 @@ func (fr *fakeRunner) String() string { } func TestStartStop(t *testing.T) { - startCounter := atomic.NewInt(0) - stopCounter := atomic.NewInt(0) + var startCounter, stopCounter atomic.Int64 runners := make([]cfgfile.Runner, 0, fakeRunnersNum) for i := 0; i < fakeRunnersNum; i++ { runners = append(runners, &fakeRunner{ id: i, - startCounter: startCounter, - stopCounter: stopCounter, + startCounter: &startCounter, + stopCounter: &stopCounter, }) } @@ -93,8 +92,8 @@ func TestStartStop(t *testing.T) { runnerGroup.Stop() - assert.Equal(t, fakeRunnersNum, startCounter.Load()) - assert.Equal(t, fakeRunnersNum, stopCounter.Load()) + assert.Equal(t, int64(fakeRunnersNum), startCounter.Load()) + assert.Equal(t, int64(fakeRunnersNum), stopCounter.Load()) } func TestDiagnosticsUnsupported(t *testing.T) { @@ -102,8 +101,8 @@ func TestDiagnosticsUnsupported(t *testing.T) { for i := 0; i < fakeRunnersNum; i++ { runners = append(runners, &fakeRunner{ id: i, - startCounter: atomic.NewInt(0), - stopCounter: atomic.NewInt(0), + startCounter: &atomic.Int64{}, + stopCounter: &atomic.Int64{}, }) } diff --git a/packetbeat/sniffer/sniffer.go b/packetbeat/sniffer/sniffer.go index d8043032a64..24c77180b46 100644 --- a/packetbeat/sniffer/sniffer.go +++ b/packetbeat/sniffer/sniffer.go @@ -25,6 +25,7 @@ import ( "os" "runtime" "strings" + "sync/atomic" "time" "github.com/google/gopacket" @@ -33,7 +34,6 @@ import ( "github.com/google/gopacket/pcapgo" "golang.org/x/sync/errgroup" - "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/beats/v7/packetbeat/config" @@ -51,7 +51,7 @@ type Sniffer struct { type sniffer struct { config config.InterfaceConfig - state atomic.Int32 // store snifferState + state *atomic.Int32 // store snifferState // device is the first active device after calling New. // It is not updated by default route polling. @@ -103,13 +103,14 @@ func New(id string, testMode bool, _ string, decoders map[string]Decoders, inter return nil, fmt.Errorf("no decoder for %s", iface.Device) } child := sniffer{ - state: atomic.MakeInt32(snifferInactive), + state: &atomic.Int32{}, followDefault: iface.PollDefaultRoute > 0 && strings.HasPrefix(iface.Device, "default_route"), id: id, idx: i, decoders: dec, log: s.log, } + child.state.Store(snifferInactive) s.log.Debugf("interface: %d, BPF filter: '%s'", i, iface.BpfFilter) @@ -373,7 +374,7 @@ func (s *sniffer) sniffHandle(ctx context.Context, handle snifferHandle, dec *de // Mark inactive sniffer as active. In case of the sniffer/packetbeat closing // before/while Run is executed, the state will be snifferClosing. // => return if state is already snifferClosing. - if !s.state.CAS(snifferInactive, snifferActive) { + if !s.state.CompareAndSwap(snifferInactive, snifferActive) { return nil } defer s.state.Store(snifferInactive) diff --git a/winlogbeat/beater/acker.go b/winlogbeat/beater/acker.go index e9cc2e77c33..d35a0166484 100644 --- a/winlogbeat/beater/acker.go +++ b/winlogbeat/beater/acker.go @@ -20,20 +20,20 @@ package beater import ( "context" "sync" + "sync/atomic" - "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/winlogbeat/checkpoint" ) type eventACKer struct { - active *atomic.Int + active *atomic.Int64 wg *sync.WaitGroup checkpoint *checkpoint.Checkpoint } func newEventACKer(checkpoint *checkpoint.Checkpoint) *eventACKer { return &eventACKer{ - active: atomic.NewInt(0), + active: &atomic.Int64{}, wg: &sync.WaitGroup{}, checkpoint: checkpoint, } @@ -55,7 +55,7 @@ func (a *eventACKer) ACKEvents(data []interface{}) { } // Mark events as done (subtract). - a.active.Add(-1 * len(data)) + a.active.Add(-1 * int64(len(data))) a.wg.Add(-1 * len(data)) } @@ -71,11 +71,11 @@ func (a *eventACKer) Wait(ctx context.Context) { // Add adds to the number of active events. func (a *eventACKer) Add(delta int) { - a.active.Add(delta) + a.active.Add(int64(delta)) a.wg.Add(delta) } // Active returns the number of active events (published but not yet ACKed). func (a *eventACKer) Active() int { - return a.active.Load() + return int(a.active.Load()) } diff --git a/winlogbeat/sys/wineventlog/renderer_test.go b/winlogbeat/sys/wineventlog/renderer_test.go index 339122d8a29..01089b5a112 100644 --- a/winlogbeat/sys/wineventlog/renderer_test.go +++ b/winlogbeat/sys/wineventlog/renderer_test.go @@ -26,13 +26,13 @@ import ( "runtime" "strconv" "strings" + "sync/atomic" "testing" "text/template" "time" "github.com/stretchr/testify/assert" - "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/winlogbeat/sys/winevent" "github.com/elastic/elastic-agent-libs/logp" ) @@ -297,7 +297,7 @@ func BenchmarkRenderer(b *testing.B) { defer itr.Close() defer r.Close() - count := atomic.NewUint64(0) + count := atomic.Uint64{} start := time.Now() b.ResetTimer() @@ -314,7 +314,7 @@ func BenchmarkRenderer(b *testing.B) { b.Fatal(err) } - count.Inc() + count.Add(1) } elapsed := time.Since(start) @@ -326,7 +326,7 @@ func BenchmarkRenderer(b *testing.B) { defer itr.Close() defer r.Close() - count := atomic.NewUint64(0) + var count atomic.Uint64 start := time.Now() b.ResetTimer() @@ -343,7 +343,7 @@ func BenchmarkRenderer(b *testing.B) { if err != nil { b.Fatal(err) } - count.Inc() + count.Add(1) } }) diff --git a/x-pack/filebeat/input/awss3/metrics_test.go b/x-pack/filebeat/input/awss3/metrics_test.go index e153d321e9f..33d3b9a513b 100644 --- a/x-pack/filebeat/input/awss3/metrics_test.go +++ b/x-pack/filebeat/input/awss3/metrics_test.go @@ -5,12 +5,12 @@ package awss3 import ( + "sync/atomic" "testing" "time" "github.com/stretchr/testify/assert" - "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/elastic-agent-libs/monitoring" ) diff --git a/x-pack/filebeat/input/entityanalytics/internal/kvstore/tracker.go b/x-pack/filebeat/input/entityanalytics/internal/kvstore/tracker.go index b18229b7795..71abb10d137 100644 --- a/x-pack/filebeat/input/entityanalytics/internal/kvstore/tracker.go +++ b/x-pack/filebeat/input/entityanalytics/internal/kvstore/tracker.go @@ -6,10 +6,10 @@ package kvstore import ( "context" + "sync/atomic" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common/acker" - "github.com/elastic/beats/v7/libbeat/common/atomic" ) // TxTracker implements a transaction tracker. Individual beat events which @@ -17,20 +17,20 @@ import ( // ACK-ed, the pending count is decremented (see NewTxACKHandler). Calling Wait // will block until all events are ACK-ed or the parent context is cancelled. type TxTracker struct { - pending atomic.Int + pending atomic.Int64 ctx context.Context cancel context.CancelFunc } // Add increments the pending count. func (t *TxTracker) Add() { - t.pending.Inc() + t.pending.Add(1) } // Ack decrements the pending count. If pending goes to zero, then the // context on TxTracker is cancelled. func (t *TxTracker) Ack() { - if t.pending.Dec() == 0 { + if t.pending.Add(-1) == 0 { t.cancel() } } diff --git a/x-pack/filebeat/input/entityanalytics/internal/kvstore/tracker_test.go b/x-pack/filebeat/input/entityanalytics/internal/kvstore/tracker_test.go index 7b452e21ae8..7741fa1e595 100644 --- a/x-pack/filebeat/input/entityanalytics/internal/kvstore/tracker_test.go +++ b/x-pack/filebeat/input/entityanalytics/internal/kvstore/tracker_test.go @@ -15,7 +15,7 @@ import ( func TestTxTracker_Ack(t *testing.T) { txTracker := NewTxTracker(context.Background()) - txTracker.pending.Inc() + txTracker.pending.Add(1) txTracker.Ack() @@ -25,9 +25,9 @@ func TestTxTracker_Ack(t *testing.T) { func TestTxTracker_Add(t *testing.T) { txTracker := NewTxTracker(context.Background()) - require.Equal(t, 0, txTracker.pending.Load()) + require.Equal(t, int64(0), txTracker.pending.Load()) txTracker.Add() - require.Equal(t, 1, txTracker.pending.Load()) + require.Equal(t, int64(1), txTracker.pending.Load()) } func TestTxTracker_Wait(t *testing.T) { @@ -43,7 +43,7 @@ func TestTxACKHandler(t *testing.T) { handler := NewTxACKHandler() txTracker.Add() - require.Equal(t, 1, txTracker.pending.Load()) + require.Equal(t, int64(1), txTracker.pending.Load()) handler.AddEvent(beat.Event{ Private: txTracker, @@ -63,7 +63,7 @@ func TestTxACKHandler(t *testing.T) { handler := NewTxACKHandler() txTracker.Add() - require.Equal(t, 1, txTracker.pending.Load()) + require.Equal(t, int64(1), txTracker.pending.Load()) handler.AddEvent(beat.Event{ Private: txTracker, @@ -71,6 +71,6 @@ func TestTxACKHandler(t *testing.T) { txTracker.Wait() - require.Equal(t, 1, txTracker.pending.Load()) + require.Equal(t, int64(1), txTracker.pending.Load()) }) } diff --git a/x-pack/filebeat/input/entityanalytics/internal/kvstore/transaction.go b/x-pack/filebeat/input/entityanalytics/internal/kvstore/transaction.go index 7f32429a4e9..c62155d27fb 100644 --- a/x-pack/filebeat/input/entityanalytics/internal/kvstore/transaction.go +++ b/x-pack/filebeat/input/entityanalytics/internal/kvstore/transaction.go @@ -8,10 +8,9 @@ import ( "encoding/json" "errors" "fmt" + "sync/atomic" "go.etcd.io/bbolt" - - "github.com/elastic/beats/v7/libbeat/common/atomic" ) var ( @@ -126,7 +125,7 @@ func (t *Transaction) Delete(bucket, key []byte) error { // Commit will write any changes to disk. For read-only transactions, calling // Commit will route to Rollback. func (t *Transaction) Commit() error { - if !t.closed.CAS(false, true) { + if !t.closed.CompareAndSwap(false, true) { return nil } if !t.writeable { @@ -138,7 +137,7 @@ func (t *Transaction) Commit() error { // Rollback closes the transaction. For write transactions, all updates made // within the transaction will be discarded. func (t *Transaction) Rollback() error { - if !t.closed.CAS(false, true) { + if !t.closed.CompareAndSwap(false, true) { return nil } return t.tx.Rollback() diff --git a/x-pack/filebeat/input/gcppubsub/pubsub_test.go b/x-pack/filebeat/input/gcppubsub/pubsub_test.go index 7981a3ee772..16fdbf1ebbd 100644 --- a/x-pack/filebeat/input/gcppubsub/pubsub_test.go +++ b/x-pack/filebeat/input/gcppubsub/pubsub_test.go @@ -12,6 +12,7 @@ import ( "os" "strconv" "sync" + "sync/atomic" "testing" "time" @@ -23,7 +24,6 @@ import ( "github.com/elastic/beats/v7/filebeat/channel" "github.com/elastic/beats/v7/filebeat/input" "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/tests/compose" "github.com/elastic/beats/v7/libbeat/tests/resources" conf "github.com/elastic/elastic-agent-libs/config" @@ -247,6 +247,7 @@ func runTestWithACKer(t *testing.T, cfg *conf.C, onEvent eventHandler, run func( if err != nil { t.Fatal(err) } + //nolint:errcheck // ignore pubsubInput := in.(*pubsubInput) defer pubsubInput.Stop() @@ -421,13 +422,14 @@ func TestRunStop(t *testing.T) { func TestEndToEndACK(t *testing.T) { cfg := defaultTestConfig() - var count atomic.Int + var count atomic.Int64 seen := make(map[string]struct{}) // ACK every other message halfAcker := func(ev beat.Event, clientConfig beat.ClientConfig) bool { + //nolint:errcheck // ignore msg := ev.Private.(*pubsub.Message) seen[msg.ID] = struct{}{} - if count.Inc()&1 != 0 { + if count.Add(1)&1 != 0 { // Nack will result in the Message being redelivered more quickly than if it were allowed to expire. msg.Nack() return false @@ -453,6 +455,7 @@ func TestEndToEndACK(t *testing.T) { assert.Len(t, events, len(seen)) got := make(map[string]struct{}) for _, ev := range events { + //nolint:errcheck // ignore msg := ev.Private.(*pubsub.Message) got[msg.ID] = struct{}{} } diff --git a/x-pack/filebeat/input/netflow/decoder/atomic/bool.go b/x-pack/filebeat/input/netflow/decoder/atomic/bool.go deleted file mode 100644 index b294cc6c395..00000000000 --- a/x-pack/filebeat/input/netflow/decoder/atomic/bool.go +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package atomic - -import "sync/atomic" - -type Bool struct { - value uint32 -} - -func (b *Bool) Store(value bool) { - atomic.StoreUint32(&b.value, encodeBool(value)) -} - -func (b *Bool) CAS(old bool, new bool) (swapped bool) { - return atomic.CompareAndSwapUint32(&b.value, encodeBool(old), encodeBool(new)) -} - -func (b *Bool) Load() (value bool) { - return atomic.LoadUint32(&b.value) != 0 -} - -func encodeBool(value bool) (result uint32) { - if value { - result = 1 - } - return -} diff --git a/x-pack/filebeat/input/netflow/decoder/v9/session.go b/x-pack/filebeat/input/netflow/decoder/v9/session.go index 492576f6b96..e72fa1ab80a 100644 --- a/x-pack/filebeat/input/netflow/decoder/v9/session.go +++ b/x-pack/filebeat/input/netflow/decoder/v9/session.go @@ -8,9 +8,9 @@ import ( "log" "net" "sync" + "sync/atomic" "time" - "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/atomic" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/config" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/template" ) @@ -83,7 +83,7 @@ func (s *SessionState) ExpireTemplates() (alive int, removed int) { var toDelete []TemplateKey s.mutex.RLock() for id, template := range s.Templates { - if !template.Delete.CAS(false, true) { + if !template.Delete.CompareAndSwap(false, true) { toDelete = append(toDelete, id) } } @@ -183,7 +183,7 @@ func (m *SessionMap) cleanup() (aliveSession int, removedSession int, aliveTempl a, r := session.ExpireTemplates() aliveTemplates += a removedTemplates += r - if !session.Delete.CAS(false, true) { + if !session.Delete.CompareAndSwap(false, true) { toDelete = append(toDelete, key) } } diff --git a/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer.go b/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer.go index f3684398a51..fe384ae15b2 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer.go @@ -5,12 +5,10 @@ package synthexec -import ( - "github.com/elastic/beats/v7/libbeat/common/atomic" -) +import "sync/atomic" type ExecMultiplexer struct { - eventCounter *atomic.Int + eventCounter *atomic.Int64 synthEvents chan *SynthEvent done chan struct{} } @@ -27,7 +25,7 @@ func (e *ExecMultiplexer) writeSynthEvent(se *SynthEvent) { if se.Type == JourneyStart { e.eventCounter.Store(-1) } - se.index = e.eventCounter.Inc() + se.index = int(e.eventCounter.Add(1)) e.synthEvents <- se } @@ -48,8 +46,10 @@ func (e *ExecMultiplexer) Wait() { } func NewExecMultiplexer() *ExecMultiplexer { + c := &atomic.Int64{} + c.Store(-1) // Start from -1 so first call to Inc returns 0 return &ExecMultiplexer{ - eventCounter: atomic.NewInt(-1), // Start from -1 so first call to Inc returns 0 + eventCounter: c, synthEvents: make(chan *SynthEvent), done: make(chan struct{}), } diff --git a/x-pack/metricbeat/module/cloudfoundry/v1.go b/x-pack/metricbeat/module/cloudfoundry/v1.go index f03f6a98ebb..86610e0d6cb 100644 --- a/x-pack/metricbeat/module/cloudfoundry/v1.go +++ b/x-pack/metricbeat/module/cloudfoundry/v1.go @@ -7,7 +7,8 @@ package cloudfoundry import ( - "github.com/elastic/beats/v7/libbeat/common/atomic" + "sync/atomic" + "github.com/elastic/beats/v7/metricbeat/mb" cfcommon "github.com/elastic/beats/v7/x-pack/libbeat/common/cloudfoundry" "github.com/elastic/elastic-agent-libs/logp" @@ -29,7 +30,6 @@ func newModuleV1(base mb.BaseModule, hub CloudfoundryHub, log *logp.Logger) (*Mo m := ModuleV1{ BaseModule: base, log: log, - running: atomic.MakeBool(false), } consumer, err := hub.DopplerConsumer(cfcommon.DopplerCallbacks{ Metric: m.callback, @@ -83,7 +83,7 @@ func (m *ModuleV1) callback(event cfcommon.Event) { // run ensures that the module is running with the passed subscription func (m *ModuleV1) run(s subscription) { - if !m.running.CAS(false, true) { + if !m.running.CompareAndSwap(false, true) { // Module is already running, queue subscription for current dispatcher. m.subscriptions <- s return