Skip to content

Commit

Permalink
feat: replace custom atomic with sync/atomic (#40132)
Browse files Browse the repository at this point in the history
* feat: replace custom atomic with sync/atomic

Go 1.19 added new atomic types. Replace custom atomic types with stdlib atomic package
to fully use the race detector, improved lsp hints and support.

Use pointers in sniffer struct to prevent copying the atomic struct and cause
a race condition.

* lint: time.Now().Sub -> time.Since

fix golangci-lint issue

* Apply suggestions from code review

Co-authored-by: Dan Kortschak <[email protected]>

* refactor: keep Swap usage as is

* lint: avoid unnecessary conversion

fix golangci-lint issue

* fix: update method reference to new atomic types

* fix: resolve more compile errors and test failures

* lint: fix linter issues

* lint: fix linter issues

* lint: fix linter issues

* lint: fix linter issues

* lint: more linting errors

* lint: fix linter issues

* lint: remove unused import

* lint: fix linter issue

* lint: fix linter issues

* lint: fix linter issues

---------

Co-authored-by: Dan Kortschak <[email protected]>
  • Loading branch information
kruskall and efd6 authored Dec 10, 2024
1 parent bde07fa commit 296b83b
Show file tree
Hide file tree
Showing 50 changed files with 223 additions and 717 deletions.
8 changes: 4 additions & 4 deletions filebeat/channel/outlet.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
package channel

import (
"sync/atomic"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/atomic"
)

type outlet struct {
Expand All @@ -31,15 +32,14 @@ type outlet struct {
func newOutlet(client beat.Client) *outlet {
o := &outlet{
client: client,
isOpen: atomic.MakeBool(true),
done: make(chan struct{}),
}
o.isOpen.Store(true)
return o
}

func (o *outlet) Close() error {
isOpen := o.isOpen.Swap(false)
if isOpen {
if o.isOpen.Swap(false) {
close(o.done)
return o.client.Close()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -32,7 +33,6 @@ import (
"github.com/elastic/beats/v7/filebeat/input/filestream/internal/task"
input "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/tests/resources"
"github.com/elastic/elastic-agent-libs/logp"
)
Expand Down Expand Up @@ -128,7 +128,7 @@ func TestDefaultHarvesterGroup(t *testing.T) {

t.Run("assert a harvester is only started if harvester limit haven't been reached", func(t *testing.T) {
var wg sync.WaitGroup
var harvesterRunningCount atomic.Int
var harvesterRunningCount atomic.Int64
var harvester1Finished, harvester2Finished atomic.Bool
done1, done2 := make(chan struct{}), make(chan struct{})

Expand Down
8 changes: 4 additions & 4 deletions filebeat/input/filestream/internal/input-logfile/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/common/cleanup"
"github.com/elastic/beats/v7/libbeat/common/transform/typeconv"
"github.com/elastic/beats/v7/libbeat/statestore"
Expand Down Expand Up @@ -461,14 +461,14 @@ func (r *resource) isDeleted() bool {
// Retain is used to indicate that 'resource' gets an additional 'owner'.
// Owners of an resource can be active inputs or pending update operations
// not yet written to disk.
func (r *resource) Retain() { r.pending.Inc() }
func (r *resource) Retain() { r.pending.Add(1) }

// Release reduced the owner ship counter of the resource.
func (r *resource) Release() { r.pending.Dec() }
func (r *resource) Release() { r.pending.Add(^uint64(0)) }

// UpdatesReleaseN is used to release ownership of N pending update operations.
func (r *resource) UpdatesReleaseN(n uint) {
r.pending.Sub(uint64(n))
r.pending.Add(^uint64(n - 1))
}

// Finished returns true if the resource is not in use and if there are no pending updates
Expand Down
7 changes: 4 additions & 3 deletions filebeat/input/kafka/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import (
"io"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/elastic-agent-libs/mapstr"

input "github.com/elastic/beats/v7/filebeat/input/v2"
Expand Down Expand Up @@ -391,9 +391,10 @@ func (l *listFromFieldReader) Next() (reader.Message, error) {
timestamp, kafkaFields := composeEventMetadata(l.claim, l.groupHandler, msg)
messages := l.parseMultipleMessages(msg.Value)

neededAcks := atomic.MakeInt(len(messages))
neededAcks := atomic.Int64{}
neededAcks.Add(int64(len(messages)))
ackHandler := func() {
if neededAcks.Dec() == 0 {
if neededAcks.Add(-1) == 0 {
l.groupHandler.ack(msg)
}
}
Expand Down
14 changes: 7 additions & 7 deletions filebeat/input/log/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/gofrs/uuid/v5"
Expand All @@ -35,7 +36,6 @@ import (
"github.com/elastic/beats/v7/filebeat/input/file"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/libbeat/management/status"
conf "github.com/elastic/elastic-agent-libs/config"
Expand Down Expand Up @@ -737,8 +737,8 @@ func (p *Input) createHarvester(logger *logp.Logger, state file.State, onTermina
// startHarvester starts a new harvester with the given offset
// In case the HarvesterLimit is reached, an error is returned
func (p *Input) startHarvester(logger *logp.Logger, state file.State, offset int64) error {
if p.numHarvesters.Inc() > p.config.HarvesterLimit && p.config.HarvesterLimit > 0 {
p.numHarvesters.Dec()
if p.numHarvesters.Add(1) > p.config.HarvesterLimit && p.config.HarvesterLimit > 0 {
p.numHarvesters.Add(^uint32(0))
harvesterSkipped.Add(1)
return errHarvesterLimit
}
Expand All @@ -747,15 +747,15 @@ func (p *Input) startHarvester(logger *logp.Logger, state file.State, offset int
state.Offset = offset

// Create harvester with state
h, err := p.createHarvester(logger, state, func() { p.numHarvesters.Dec() })
h, err := p.createHarvester(logger, state, func() { p.numHarvesters.Add(^uint32(0)) })
if err != nil {
p.numHarvesters.Dec()
p.numHarvesters.Add(^uint32(0))
return err
}

err = h.Setup()
if err != nil {
p.numHarvesters.Dec()
p.numHarvesters.Add(^uint32(0))
return fmt.Errorf("error setting up harvester: %w", err)
}

Expand All @@ -765,7 +765,7 @@ func (p *Input) startHarvester(logger *logp.Logger, state file.State, offset int
h.SendStateUpdate()

if err = p.harvesters.Start(h); err != nil {
p.numHarvesters.Dec()
p.numHarvesters.Add(^uint32(0))
}
return err
}
Expand Down
10 changes: 5 additions & 5 deletions filebeat/input/v2/input-cursor/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ package cursor
import (
"strings"
"sync"
"sync/atomic"
"time"

"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/common/cleanup"
"github.com/elastic/beats/v7/libbeat/common/transform/typeconv"
"github.com/elastic/beats/v7/libbeat/statestore"
Expand Down Expand Up @@ -235,14 +235,14 @@ func (r *resource) IsNew() bool {
// Retain is used to indicate that 'resource' gets an additional 'owner'.
// Owners of an resource can be active inputs or pending update operations
// not yet written to disk.
func (r *resource) Retain() { r.pending.Inc() }
func (r *resource) Retain() { r.pending.Add(1) }

// Release reduced the owner ship counter of the resource.
func (r *resource) Release() { r.pending.Dec() }
func (r *resource) Release() { r.pending.Add(^uint64(0)) }

// UpdatesReleaseN is used to release ownership of N pending update operations.
func (r *resource) UpdatesReleaseN(n uint) {
r.pending.Sub(uint64(n))
r.pending.Add(^uint64(n - 1))
}

// Finished returns true if the resource is not in use and if there are no pending updates
Expand Down Expand Up @@ -290,7 +290,7 @@ func readStates(log *logp.Logger, store *statestore.Store, prefix string) (*stat
}

err := store.Each(func(key string, dec statestore.ValueDecoder) (bool, error) {
if !strings.HasPrefix(string(key), keyPrefix) {
if !strings.HasPrefix(key, keyPrefix) {
return true, nil
}

Expand Down
14 changes: 7 additions & 7 deletions filebeat/input/v2/input-stateless/stateless_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ import (
"errors"
"runtime"
"sync"
"sync/atomic"
"testing"

"github.com/stretchr/testify/require"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/atomic"
pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/mapstr"
Expand Down Expand Up @@ -111,12 +111,12 @@ func TestStateless_Run(t *testing.T) {
defer cancel()

// connector creates a client the blocks forever until the shutdown signal is received
var publishCalls atomic.Int
var publishCalls atomic.Int64
connector := pubtest.FakeConnector{
ConnectFunc: func(config beat.ClientConfig) (beat.Client, error) {
return &pubtest.FakeClient{
PublishFunc: func(event beat.Event) {
publishCalls.Inc()
publishCalls.Add(1)
// Unlock Publish once the input has been cancelled
<-ctx.Done()
},
Expand All @@ -141,24 +141,24 @@ func TestStateless_Run(t *testing.T) {

// validate
require.Equal(t, context.Canceled, err)
require.Equal(t, 1, publishCalls.Load())
require.Equal(t, int64(1), publishCalls.Load())
})

t.Run("do not start input of pipeline connection fails", func(t *testing.T) {
errOpps := errors.New("oops")
connector := pubtest.FailingConnector(errOpps)

var run atomic.Int
var run atomic.Int64
input := createConfiguredInput(t, constInputManager(&fakeStatelessInput{
OnRun: func(_ v2.Context, publisher stateless.Publisher) error {
run.Inc()
run.Add(1)
return nil
},
}), nil)

err := input.Run(v2.Context{}, connector)
require.True(t, errors.Is(err, errOpps))
require.Equal(t, 0, run.Load())
require.Equal(t, int64(0), run.Load())
})
}

Expand Down
8 changes: 4 additions & 4 deletions filebeat/inputsource/common/streaming/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
"net"
"strings"
"sync"
"sync/atomic"

"github.com/elastic/beats/v7/filebeat/inputsource"
"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/go-concert/ctxtool"
)
Expand All @@ -44,7 +44,7 @@ type Listener struct {
wg sync.WaitGroup
log *logp.Logger
ctx ctxtool.CancelContext
clientsCount atomic.Int
clientsCount atomic.Int64
handlerFactory HandlerFactory
listenerFactory ListenerFactory
}
Expand Down Expand Up @@ -190,10 +190,10 @@ func (l *Listener) handleConnection(conn net.Conn) {
defer cancel()

// Track number of clients.
l.clientsCount.Inc()
l.clientsCount.Add(1)
log.Debugw("New client connection", "active_clients", l.clientsCount.Load())
defer func() {
l.clientsCount.Dec()
l.clientsCount.Add(-1)
log.Debugw("Client disconnected", "active_clients", l.clientsCount.Load())
}()

Expand Down
8 changes: 4 additions & 4 deletions heartbeat/monitors/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,8 @@ func TestDisabledMonitor(t *testing.T) {
require.NoError(t, err)
require.IsType(t, NoopRunner{}, runner)

require.Equal(t, 0, built.Load())
require.Equal(t, 0, closed.Load())
require.Equal(t, int64(0), built.Load())
require.Equal(t, int64(0), closed.Load())
}
}

Expand Down Expand Up @@ -353,7 +353,7 @@ func TestDuplicateMonitorIDs(t *testing.T) {

// Two are counted as built. The bad config is missing a stdfield so it
// doesn't complete construction
require.Equal(t, 2, built.Load())
require.Equal(t, int64(2), built.Load())
// Only 2 closes, because the bad config isn't closed
require.Equal(t, 2, closed.Load())
require.Equal(t, int64(2), closed.Load())
}
14 changes: 7 additions & 7 deletions heartbeat/monitors/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"regexp"
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -42,7 +43,6 @@ import (
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers/monitorstate"
"github.com/elastic/beats/v7/heartbeat/scheduler"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/atomic"
beatversion "github.com/elastic/beats/v7/libbeat/version"
)

Expand Down Expand Up @@ -212,25 +212,25 @@ func createMockJob() []jobs.Job {
return []jobs.Job{j}
}

func mockPluginBuilder() (plugin.PluginFactory, *atomic.Int, *atomic.Int) {
func mockPluginBuilder() (plugin.PluginFactory, *atomic.Int64, *atomic.Int64) {
reg := monitoring.NewRegistry()

built := atomic.NewInt(0)
closed := atomic.NewInt(0)
built := &atomic.Int64{}
closed := &atomic.Int64{}

return plugin.PluginFactory{
Name: "test",
Aliases: []string{"testAlias"},
Make: func(s string, config *config.C) (plugin.Plugin, error) {
built.Inc()
built.Add(1)
// Declare a real config block with a required attr so we can see what happens when it doesn't work
unpacked := struct {
URLs []string `config:"urls" validate:"required"`
}{}

// track all closes, even on error
closer := func() error {
closed.Inc()
closed.Add(1)
return nil
}

Expand All @@ -248,7 +248,7 @@ func mockPluginBuilder() (plugin.PluginFactory, *atomic.Int, *atomic.Int) {
closed
}

func mockPluginsReg() (p *plugin.PluginsReg, built *atomic.Int, closed *atomic.Int) {
func mockPluginsReg() (p *plugin.PluginsReg, built *atomic.Int64, closed *atomic.Int64) {
reg := plugin.NewPluginsReg()
builder, built, closed := mockPluginBuilder()
_ = reg.Add(builder)
Expand Down
8 changes: 4 additions & 4 deletions heartbeat/monitors/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,10 @@ func testMonitorConfig(t *testing.T, conf *conf.C, eventValidator validator.Vali
t.Fatalf("No publishes detected!")
}

assert.Equal(t, 1, built.Load())
assert.Equal(t, int64(1), built.Load())
mon.Stop()

assert.Equal(t, 1, closed.Load())
assert.Equal(t, int64(1), closed.Load())
assert.Equal(t, true, pcClient.closed)
}

Expand All @@ -129,8 +129,8 @@ func TestCheckInvalidConfig(t *testing.T) {
require.Nil(t, m, "For this test to work we need a nil value for the monitor.")

// These counters are both zero since this fails at config parse time
require.Equal(t, 0, built.Load())
require.Equal(t, 0, closed.Load())
require.Equal(t, int64(0), built.Load())
require.Equal(t, int64(0), closed.Load())

require.Error(t, checkMonitorConfig(serverMonConf, reg))
}
Expand Down
Loading

0 comments on commit 296b83b

Please sign in to comment.