Skip to content

Commit

Permalink
Fix flakiness in TestManager_NoDeadlock (#4104)
Browse files Browse the repository at this point in the history
  • Loading branch information
faec authored Jan 22, 2024
1 parent 9e11210 commit 91c0a94
Showing 1 changed file with 70 additions and 101 deletions.
171 changes: 70 additions & 101 deletions pkg/component/runtime/manager_fake_input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1307,35 +1307,11 @@ LOOP:
require.Equal(t, context.Canceled, err, "Run should return with context canceled, got %v", err.Error())
}

func (suite *FakeInputSuite) TestManager_NoDeadlock() {
t := suite.T()
// NOTE: This is a long-running test that spams the runtime managers `Update` function to try and
// trigger a deadlock. This test takes 2 minutes to run trying to re-produce issue:
// https://github.com/elastic/elastic-agent/issues/2691

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ai, _ := info.NewAgentInfo(ctx, true)
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
require.NoError(t, err)
errCh := make(chan error)
go func() {
err := m.Run(ctx)
if errors.Is(err, context.Canceled) {
err = nil
}
errCh <- err
}()

waitCtx, waitCancel := context.WithTimeout(ctx, 1*time.Second)
defer waitCancel()
if err := waitForReady(waitCtx, m); err != nil {
require.NoError(t, err)
}

// A component that can be fed to Manager.Update, with an index to allow
// looping with distinct configurations at each step.
func noDeadlockTestComponent(t *testing.T, index int) component.Component {
binaryPath := testBinary(t, "component")
comp := component.Component{
return component.Component{
ID: "fake-default",
InputSpec: &component.InputRuntimeSpec{
InputType: "fake",
Expand All @@ -1351,99 +1327,92 @@ func (suite *FakeInputSuite) TestManager_NoDeadlock() {
Config: component.MustExpectedConfig(map[string]interface{}{
"type": "fake",
"state": int(client.UnitStateHealthy),
"message": "Fake Healthy",
"message": fmt.Sprintf("Fake Healthy %d", index),
}),
},
},
}
}

updatedCh := make(chan time.Time)
updatedErr := make(chan error)
updatedCtx, updatedCancel := context.WithCancel(context.Background())
defer updatedCancel()
func (suite *FakeInputSuite) TestManager_NoDeadlock() {
t := suite.T()
// NOTE: This is a long-running test that spams the runtime managers `Update` function to try and
// trigger a deadlock. This test takes 2 minutes to run trying to re-produce issue:
// https://github.com/elastic/elastic-agent/issues/2691

// How long to run the test
testDuration := 2 * time.Minute

// How long without an update before we report it as a deadlock
maxUpdateInterval := 15 * time.Second

// Create the runtime manager
ai, _ := info.NewAgentInfo(context.Background(), true)
m, err := NewManager(newDebugLogger(t), newDebugLogger(t), "localhost:0", ai, apmtest.DiscardTracer, newTestMonitoringMgr(), configuration.DefaultGRPCConfig())
require.NoError(t, err)

// Start the runtime manager in a goroutine, passing its termination state
// to managerResultChan.
managerCtx, managerCancel := context.WithCancel(context.Background())
defer managerCancel()
managerResultChan := make(chan error)
go func() {
// spam update on component trying to cause a deadlock
comp := comp
i := 0
for {
if updatedCtx.Err() != nil {
return
}
updatedComp := comp
updatedComp.Units = make([]component.Unit, 1)
updatedComp.Units[0] = component.Unit{
ID: "fake-input",
Type: client.UnitTypeInput,
LogLevel: client.UnitLogLevelError, // test log will get spammed with the constant updates (error to prevent spam)
Config: component.MustExpectedConfig(map[string]interface{}{
"type": "fake",
"state": int(client.UnitStateHealthy),
"message": fmt.Sprintf("Fake Healthy %d", i),
}),
}
i += 1
comp = updatedComp
m.Update(component.Model{Components: []component.Component{updatedComp}})
err := <-m.errCh
if err != nil {
updatedErr <- err
return
}
updatedCh <- time.Now()
}
managerResultChan <- m.Run(managerCtx)
}()

deadlockErr := make(chan error)
// Wait for the manager to become active
timedWaitForReady(t, m, 1*time.Second)

// Start a goroutine to spam the manager update trying to cause
// a deadlock. When the test context finishes, the update loop
// closes updateResultChan to signal that it is done.
updateResultChan := make(chan error)
updateLoopCtx, updateLoopCancel := context.WithTimeout(context.Background(), testDuration)
defer updateLoopCancel()
go func() {
t := time.NewTimer(15 * time.Second)
defer t.Stop()
for {
select {
case <-updatedCtx.Done():
return
case <-updatedCh:
// update did occur
t.Reset(15 * time.Second)
case <-t.C:
// timeout hit waiting for another update to work
deadlockErr <- errors.New("hit deadlock")
defer close(updateResultChan)
for i := 0; updateLoopCtx.Err() == nil; i++ {
comp := noDeadlockTestComponent(t, i)
m.Update(component.Model{Components: []component.Component{comp}})
err := <-m.errCh
updateResultChan <- err
if err != nil {
// If the update gave an error, end the test
return
}
}
}()

defer drainErrChan(errCh)
defer drainErrChan(updatedErr)
defer drainErrChan(deadlockErr)

// wait 2 minutes for a deadlock to occur
endTimer := time.NewTimer(2 * time.Minute)
defer endTimer.Stop()
// The component state is being changed constantly. If updateTimeout
// triggers without any updates, report it as a deadlock.
updateTimeout := time.NewTicker(maxUpdateInterval)
defer updateTimeout.Stop()
LOOP:
for {
select {
case <-endTimer.C:
// no deadlock after timeout (all good stop the component)
updatedCancel()
m.Update(component.Model{Components: []component.Component{}})
<-m.errCh // Don't care about the result of Update, just that it runs
break LOOP
case err := <-errCh:
require.NoError(t, err)
case err := <-updatedErr:
require.NoError(t, err)
break LOOP
case err := <-deadlockErr:
require.NoError(t, err)
break LOOP
case err, ok := <-updateResultChan:
if ok {
// update did occur
require.NoError(t, err)
updateTimeout.Reset(15 * time.Second)
} else {
// Update goroutine is terminating, test is over
break LOOP
}
case <-managerResultChan:
require.Fail(t, "Runtime manager terminated before test was over")
case <-updateTimeout.C:
require.Fail(t, "Timed out waiting for Manager.Update result")
}
}

updatedCancel()
cancel()

err = <-errCh
// Finished without a deadlock. Stop the component and shut down the manager.
m.Update(component.Model{Components: []component.Component{}})
err = <-m.errCh
require.NoError(t, err)

managerCancel()
err = <-managerResultChan
require.Equal(t, err, context.Canceled)
}

func (suite *FakeInputSuite) TestManager_Configure() {
Expand Down

0 comments on commit 91c0a94

Please sign in to comment.