Skip to content

Commit

Permalink
Export traces less frequently (kolide#1422)
Browse files Browse the repository at this point in the history
  • Loading branch information
RebeccaMahany authored Oct 27, 2023
1 parent 3b8451e commit a896d36
Show file tree
Hide file tree
Showing 8 changed files with 169 additions and 7 deletions.
11 changes: 11 additions & 0 deletions pkg/agent/flags/flag_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,17 @@ func (fc *FlagController) TraceSamplingRate() float64 {
).get(fc.getControlServerValue(keys.TraceSamplingRate))
}

// SetTraceBatchTimeout encodes the given duration with durationToBytes and hands it to
// setControlServerValue under the trace_batch_timeout key, returning whatever error that
// call reports.
func (fc *FlagController) SetTraceBatchTimeout(duration time.Duration) error {
	encoded := durationToBytes(duration)
	return fc.setControlServerValue(keys.TraceBatchTimeout, encoded)
}
// TraceBatchTimeout resolves the trace_batch_timeout flag. The duration flag value is
// configured with a default of 1 minute, a minimum of 5 seconds, and a maximum of 1 hour,
// and is then asked to interpret the raw control-server value for the key.
func (fc *FlagController) TraceBatchTimeout() time.Duration {
	// Build the flag value before reading the control-server value so the calls happen
	// in the same order as a single chained expression would evaluate them.
	batchTimeoutFlag := NewDurationFlagValue(
		fc.logger,
		keys.TraceBatchTimeout,
		WithDefault(1*time.Minute),
		WithMin(5*time.Second),
		WithMax(1*time.Hour),
	)
	controlServerValue := fc.getControlServerValue(keys.TraceBatchTimeout)

	return batchTimeoutFlag.get(controlServerValue)
}

// SetLogIngestServerURL converts the URL string to bytes and hands it to
// setControlServerValue under the log_ingest_url key, returning whatever error that
// call reports.
func (fc *FlagController) SetLogIngestServerURL(url string) error {
	urlBytes := []byte(url)
	return fc.setControlServerValue(keys.LogIngestServerURL, urlBytes)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/agent/flags/keys/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const (
UpdateDirectory FlagKey = "update_directory"
ExportTraces FlagKey = "export_traces"
TraceSamplingRate FlagKey = "trace_sampling_rate"
TraceBatchTimeout FlagKey = "trace_batch_timeout"
LogIngestServerURL FlagKey = "log_ingest_url"
TraceIngestServerURL FlagKey = "trace_ingest_url"
DisableTraceIngestTLS FlagKey = "disable_trace_ingest_tls"
Expand Down
7 changes: 7 additions & 0 deletions pkg/agent/knapsack/knapsack.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,13 @@ func (k *knapsack) DisableTraceIngestTLS() bool {
return k.flags.DisableTraceIngestTLS()
}

// SetTraceBatchTimeout delegates to the knapsack's underlying flags, passing the duration
// through unchanged and returning the error from that call.
func (k *knapsack) SetTraceBatchTimeout(duration time.Duration) error {
return k.flags.SetTraceBatchTimeout(duration)
}
// TraceBatchTimeout delegates to the knapsack's underlying flags and returns the
// duration they report for the trace batch timeout.
func (k *knapsack) TraceBatchTimeout() time.Duration {
return k.flags.TraceBatchTimeout()
}

// SetLogIngestServerURL delegates to the knapsack's underlying flags, passing the URL
// through unchanged and returning the error from that call.
func (k *knapsack) SetLogIngestServerURL(url string) error {
return k.flags.SetLogIngestServerURL(url)
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/agent/types/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,10 @@ type Flags interface {
SetDisableTraceIngestTLS(enabled bool) error
DisableTraceIngestTLS() bool

// TraceBatchTimeout is the maximum amount of time before the trace exporter will export the next batch of spans
SetTraceBatchTimeout(duration time.Duration) error
TraceBatchTimeout() time.Duration

// InModernStandby indicates whether a Windows machine is awake or in modern standby
SetInModernStandby(enabled bool) error
InModernStandby() bool
Expand Down
28 changes: 28 additions & 0 deletions pkg/agent/types/mocks/flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions pkg/agent/types/mocks/knapsack.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 15 additions & 4 deletions pkg/traces/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type TraceExporter struct {
disableIngestTLS bool
enabled bool
traceSamplingRate float64
batchTimeout time.Duration
ctx context.Context // nolint:containedctx
cancel context.CancelFunc
interrupted bool
Expand Down Expand Up @@ -90,13 +91,14 @@ func NewTraceExporter(ctx context.Context, k types.Knapsack, client osquery.Quer
disableIngestTLS: k.DisableTraceIngestTLS(),
enabled: k.ExportTraces(),
traceSamplingRate: k.TraceSamplingRate(),
batchTimeout: k.TraceBatchTimeout(),
ctx: ctx,
cancel: cancel,
}

// Observe ExportTraces and IngestServerURL changes to know when to start/stop exporting, and where
// to export to
t.knapsack.RegisterChangeObserver(t, keys.ExportTraces, keys.TraceSamplingRate, keys.TraceIngestServerURL, keys.DisableTraceIngestTLS)
// Observe changes to trace configuration to know when to start/stop exporting, and when
// to adjust exporting behavior
t.knapsack.RegisterChangeObserver(t, keys.ExportTraces, keys.TraceSamplingRate, keys.TraceIngestServerURL, keys.DisableTraceIngestTLS, keys.TraceBatchTimeout)

if !t.enabled {
return t, nil
Expand Down Expand Up @@ -251,7 +253,7 @@ func (t *TraceExporter) setNewGlobalProvider() {
parentBasedSampler := sdktrace.ParentBased(sdktrace.TraceIDRatioBased(t.traceSamplingRate))

newProvider := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exp),
sdktrace.WithBatcher(exp, sdktrace.WithBatchTimeout(t.batchTimeout)),
sdktrace.WithResource(r),
sdktrace.WithSampler(parentBasedSampler),
)
Expand Down Expand Up @@ -356,6 +358,15 @@ func (t *TraceExporter) FlagsChanged(flagKeys ...keys.FlagKey) {
}
}

// Handle trace_batch_timeout updates
if slices.Contains(flagKeys, keys.TraceBatchTimeout) {
if t.batchTimeout != t.knapsack.TraceBatchTimeout() {
t.batchTimeout = t.knapsack.TraceBatchTimeout()
needsNewProvider = true
level.Debug(t.logger).Log("msg", "updating trace batch timeout", "new_batch_timeout", t.batchTimeout)
}
}

if !t.enabled || !needsNewProvider {
return
}
Expand Down
78 changes: 75 additions & 3 deletions pkg/traces/exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ func TestNewTraceExporter(t *testing.T) { //nolint:paralleltest
mockKnapsack.On("DisableTraceIngestTLS").Return(false)
mockKnapsack.On("ExportTraces").Return(true)
mockKnapsack.On("TraceSamplingRate").Return(1.0)
mockKnapsack.On("RegisterChangeObserver", mock.Anything, keys.ExportTraces, keys.TraceSamplingRate, keys.TraceIngestServerURL, keys.DisableTraceIngestTLS).Return(nil)
mockKnapsack.On("TraceBatchTimeout").Return(1 * time.Minute)
mockKnapsack.On("RegisterChangeObserver", mock.Anything, keys.ExportTraces, keys.TraceSamplingRate, keys.TraceIngestServerURL, keys.DisableTraceIngestTLS, keys.TraceBatchTimeout).Return(nil)

osqueryClient := mocks.NewQuerier(t)
osqueryClient.On("Query", mock.Anything).Return([]map[string]string{
Expand Down Expand Up @@ -85,7 +86,8 @@ func TestNewTraceExporter_exportNotEnabled(t *testing.T) {
mockKnapsack.On("DisableTraceIngestTLS").Return(false)
mockKnapsack.On("ExportTraces").Return(false)
mockKnapsack.On("TraceSamplingRate").Return(0.0)
mockKnapsack.On("RegisterChangeObserver", mock.Anything, keys.ExportTraces, keys.TraceSamplingRate, keys.TraceIngestServerURL, keys.DisableTraceIngestTLS).Return(nil)
mockKnapsack.On("TraceBatchTimeout").Return(1 * time.Minute)
mockKnapsack.On("RegisterChangeObserver", mock.Anything, keys.ExportTraces, keys.TraceSamplingRate, keys.TraceIngestServerURL, keys.DisableTraceIngestTLS, keys.TraceBatchTimeout).Return(nil)

traceExporter, err := NewTraceExporter(context.Background(), mockKnapsack, mocks.NewQuerier(t), log.NewNopLogger())
require.NoError(t, err)
Expand Down Expand Up @@ -122,7 +124,8 @@ func TestInterrupt_Multiple(t *testing.T) {
mockKnapsack.On("DisableTraceIngestTLS").Return(false)
mockKnapsack.On("ExportTraces").Return(false)
mockKnapsack.On("TraceSamplingRate").Return(0.0)
mockKnapsack.On("RegisterChangeObserver", mock.Anything, keys.ExportTraces, keys.TraceSamplingRate, keys.TraceIngestServerURL, keys.DisableTraceIngestTLS).Return(nil)
mockKnapsack.On("TraceBatchTimeout").Return(1 * time.Minute)
mockKnapsack.On("RegisterChangeObserver", mock.Anything, keys.ExportTraces, keys.TraceSamplingRate, keys.TraceIngestServerURL, keys.DisableTraceIngestTLS, keys.TraceBatchTimeout).Return(nil)

traceExporter, err := NewTraceExporter(context.Background(), mockKnapsack, mocks.NewQuerier(t), log.NewNopLogger())
require.NoError(t, err)
Expand Down Expand Up @@ -603,6 +606,75 @@ func TestFlagsChanged_DisableTraceIngestTLS(t *testing.T) { //nolint:paralleltes
}
}

// TestFlagsChanged_TraceBatchTimeout checks how TraceExporter.FlagsChanged handles a
// trace_batch_timeout notification: the exporter's cached batch timeout must always end up
// equal to the value reported by the knapsack, and a provider must be set only when tracing
// is enabled and the timeout actually changed.
func TestFlagsChanged_TraceBatchTimeout(t *testing.T) { //nolint:paralleltest
	tests := []struct {
		testName              string
		currentBatchTimeout   time.Duration
		newBatchTimeout       time.Duration
		tracingEnabled        bool
		shouldReplaceProvider bool
	}{
		{
			testName:              "update",
			currentBatchTimeout:   1 * time.Minute,
			newBatchTimeout:       5 * time.Second,
			tracingEnabled:        true,
			shouldReplaceProvider: true,
		},
		{
			testName:              "update but tracing not enabled",
			currentBatchTimeout:   1 * time.Minute,
			newBatchTimeout:       5 * time.Second,
			tracingEnabled:        false,
			shouldReplaceProvider: false,
		},
		{
			testName:              "no update",
			currentBatchTimeout:   1 * time.Minute,
			newBatchTimeout:       1 * time.Minute,
			tracingEnabled:        true,
			shouldReplaceProvider: false,
		},
	}

	for _, tt := range tests { //nolint:paralleltest
		tt := tt
		t.Run(tt.testName, func(t *testing.T) {
			mockKnapsack := typesmocks.NewKnapsack(t)
			mockKnapsack.On("TraceBatchTimeout").Return(tt.newBatchTimeout)
			osqueryClient := mocks.NewQuerier(t)

			ctx, cancel := context.WithCancel(context.Background())
			// Nothing in this test invokes the exporter's cancel func, so release the
			// context explicitly once the subtest finishes instead of leaking it.
			t.Cleanup(cancel)

			// Build the exporter by hand (rather than via NewTraceExporter) so the
			// starting batch timeout and enabled state can be controlled per case.
			traceExporter := &TraceExporter{
				knapsack:                  mockKnapsack,
				osqueryClient:             osqueryClient,
				logger:                    log.NewNopLogger(),
				attrs:                     make([]attribute.KeyValue, 0),
				attrLock:                  sync.RWMutex{},
				ingestClientAuthenticator: newClientAuthenticator("test token", false),
				ingestAuthToken:           "test token",
				ingestUrl:                 "localhost:4317",
				disableIngestTLS:          false,
				enabled:                   tt.tracingEnabled,
				traceSamplingRate:         1.0,
				batchTimeout:              tt.currentBatchTimeout,
				ctx:                       ctx,
				cancel:                    cancel,
			}

			traceExporter.FlagsChanged(keys.TraceBatchTimeout)

			require.Equal(t, tt.newBatchTimeout, traceExporter.batchTimeout, "batch timeout value not updated")

			// The provider field starts out nil, so a non-nil value means FlagsChanged
			// installed a new provider.
			if tt.shouldReplaceProvider {
				require.NotNil(t, traceExporter.provider)
			} else {
				require.Nil(t, traceExporter.provider)
			}
		})
	}
}

func testServerProvidedDataStore(t *testing.T) types.KVStore {
s, err := storageci.NewStore(t, log.NewNopLogger(), storage.ServerProvidedDataStore.String())
require.NoError(t, err)
Expand Down

0 comments on commit a896d36

Please sign in to comment.