Skip to content

Commit

Permalink
Small improvements in collector tests, and atomic usage (#4948)
Browse files Browse the repository at this point in the history
* Change state to be a int32 and use atomic ops accordingly.
* Change test that should expect certain state to check for that instead of using Eventually.

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Mar 3, 2022
1 parent 322d0aa commit c257aba
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 36 deletions.
13 changes: 5 additions & 8 deletions service/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type Collector struct {
zPagesSpanProcessor *zpages.SpanProcessor

service *service
state atomic.Value
state int32

// shutdownChan is used to terminate the collector.
shutdownChan chan struct{}
Expand All @@ -103,22 +103,19 @@ func New(set CollectorSettings) (*Collector, error) {
return nil, errors.New("invalid nil config provider")
}

state := atomic.Value{}
state.Store(Starting)
return &Collector{
logger: zap.NewNop(), // Set a Nop logger as a place holder until a logger is created based on configuration

set: set,
state: state,

set: set,
state: int32(Starting),
shutdownChan: make(chan struct{}),
}, nil

}

// GetState returns current state of the collector server.
func (col *Collector) GetState() State {
return col.state.Load().(State)
return State(atomic.LoadInt32(&col.state))
}

// GetLogger returns logger used by the Collector.
Expand Down Expand Up @@ -283,7 +280,7 @@ func (col *Collector) shutdown(ctx context.Context) error {

// setCollectorState provides current state of the collector
func (col *Collector) setCollectorState(state State) {
col.state.Store(state)
atomic.StoreInt32(&col.state, int32(state))
}

func getBallastSize(host component.Host) uint64 {
Expand Down
45 changes: 17 additions & 28 deletions service/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func TestStateString(t *testing.T) {
assert.Equal(t, "Running", Running.String())
assert.Equal(t, "Closing", Closing.String())
assert.Equal(t, "Closed", Closed.String())
assert.Equal(t, "UNKNOWN", State(13).String())
}

// TestCollector_StartAsGoRoutine must be the first unit test on the file,
Expand All @@ -60,32 +61,28 @@ func TestCollector_StartAsGoRoutine(t *testing.T) {
require.NoError(t, err)

set := CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
ConfigProvider: MustNewDefaultConfigProvider([]string{filepath.Join("testdata", "otelcol-config.yaml")}, nil),
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
ConfigProvider: MustNewDefaultConfigProvider([]string{filepath.Join("testdata", "otelcol-config.yaml")},
[]string{"service.telemetry.metrics.address=localhost:" + strconv.FormatUint(uint64(testutil.GetAvailablePort(t)), 10)}),
}
col, err := New(set)
require.NoError(t, err)

colDone := make(chan struct{})
go func() {
defer close(colDone)
colErr := col.Run(context.Background())
if colErr != nil {
err = colErr
}
require.NoError(t, col.Run(context.Background()))
}()

assert.Eventually(t, func() bool {
return Running == col.GetState()
}, time.Second*2, time.Millisecond*200)
}, 2*time.Second, 200*time.Millisecond)

col.Shutdown()
col.Shutdown()
<-colDone
assert.Eventually(t, func() bool {
return Closed == col.GetState()
}, time.Second*2, time.Millisecond*200)
assert.Equal(t, Closed, col.GetState())
}

func testCollectorStartHelper(t *testing.T) {
Expand Down Expand Up @@ -114,12 +111,12 @@ func testCollectorStartHelper(t *testing.T) {
colDone := make(chan struct{})
go func() {
defer close(colDone)
assert.NoError(t, col.Run(context.Background()))
require.NoError(t, col.Run(context.Background()))
}()

assert.Eventually(t, func() bool {
return Running == col.GetState()
}, time.Second*2, time.Millisecond*200)
}, 2*time.Second, 200*time.Millisecond)
assert.Equal(t, col.logger, col.GetLogger())
assert.True(t, loggingHookCalled)

Expand All @@ -135,12 +132,10 @@ func testCollectorStartHelper(t *testing.T) {
assertMetrics(t, metricsPort, mandatoryLabels)

assertZPages(t)

col.signalsChannel <- syscall.SIGTERM

<-colDone
assert.Eventually(t, func() bool {
return Closed == col.GetState()
}, time.Second*2, time.Millisecond*200)
assert.Equal(t, Closed, col.GetState())
}

// as telemetry instance is initialized only once, we need to reset it before each test so the metrics endpoint can
Expand Down Expand Up @@ -209,17 +204,12 @@ func TestCollector_ShutdownBeforeRun(t *testing.T) {
colDone := make(chan struct{})
go func() {
defer close(colDone)
colErr := col.Run(context.Background())
if colErr != nil {
err = colErr
}
require.NoError(t, col.Run(context.Background()))
}()

col.Shutdown()
<-colDone
assert.Eventually(t, func() bool {
return Closed == col.GetState()
}, time.Second*2, time.Millisecond*200)
assert.Equal(t, Closed, col.GetState())
}

type mockColTelemetry struct{}
Expand Down Expand Up @@ -256,12 +246,11 @@ func TestCollector_ReportError(t *testing.T) {

assert.Eventually(t, func() bool {
return Running == col.GetState()
}, time.Second*2, time.Millisecond*200)
}, 2*time.Second, 200*time.Millisecond)
col.service.ReportFatalError(errors.New("err2"))

<-colDone
assert.Eventually(t, func() bool {
return Closed == col.GetState()
}, time.Second*2, time.Millisecond*200)
assert.Equal(t, Closed, col.GetState())
}

func assertMetrics(t *testing.T, metricsPort uint16, mandatoryLabels []string) {
Expand Down

0 comments on commit c257aba

Please sign in to comment.