Synchronize shutdown in stanza adapter (#34638)
**Description:**
This PR synchronizes shutdown across the emitter-converter-receiver
pipeline so that each lower level of the pipeline fully finishes its work
before the next one is shut down.

**Link to tracking Issue:** #31074

**Testing:** unit tests; run
```
 go test -run ^TestShutdownFlush$ . -count 10000 -failfast
```

**Documentation:** in code comments
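
For context, the staged-shutdown idea described above as a minimal standalone sketch: each stage's input channel is closed and the stage is waited on before the next stage downstream is stopped. The names below are illustrative only, not the adapter's actual types.

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	// emitter -> converter -> consumer, modeled as two channels.
	entries := make(chan string) // emitter output / converter input
	pLogs := make(chan string)   // converter output / consumer input

	var convertWg, consumeWg sync.WaitGroup

	// Converter stage: drains `entries` until it is closed.
	convertWg.Add(1)
	go func() {
		defer convertWg.Done()
		for e := range entries {
			pLogs <- "converted:" + e
		}
	}()

	// Consumer stage: drains `pLogs` until it is closed.
	consumeWg.Add(1)
	go func() {
		defer consumeWg.Done()
		for l := range pLogs {
			fmt.Println("consumed", l)
		}
	}()

	entries <- "entry-1"
	entries <- "entry-2"

	// Shut down lower levels first: close the emitter's output and wait for
	// the converter, then close the converter's output and wait for the consumer.
	close(entries)
	convertWg.Wait()
	close(pLogs)
	consumeWg.Wait()
}
```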

---------

Co-authored-by: Daniel Jaglowski <[email protected]>
Co-authored-by: Mikołaj Świątek <[email protected]>
3 people authored Sep 10, 2024
1 parent 6c9976a commit ede416f
Showing 6 changed files with 178 additions and 114 deletions.
27 changes: 27 additions & 0 deletions .chloggen/fix_shutdown.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Synchronize shutdown in stanza adapter

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31074]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: Stanza-based receivers should now flush all data before shutting down

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
128 changes: 62 additions & 66 deletions pkg/stanza/adapter/converter.go
@@ -59,7 +59,10 @@ type Converter struct {
pLogsChan chan plog.Logs

stopOnce sync.Once
stopChan chan struct{}

// converterChan is an internal communication channel signaling stop was called
// prevents sending to closed channels
converterChan chan struct{}

// workerChan is an internal communication channel that gets the log
// entries from Batch() calls and it receives the data in workerLoop().
@@ -73,6 +76,10 @@ type Converter struct {
// wg is a WaitGroup that makes sure that we wait for spun up goroutines exit
// when Stop() is called.
wg sync.WaitGroup

// flushWg is a WaitGroup that makes sure that we wait for flush loop to exit
// when Stop() is called.
flushWg sync.WaitGroup
}

type converterOption interface {
@@ -94,12 +101,12 @@ func (o workerCountOption) apply(c *Converter) {
func NewConverter(set component.TelemetrySettings, opts ...converterOption) *Converter {
set.Logger = set.Logger.With(zap.String("component", "converter"))
c := &Converter{
set: set,
workerChan: make(chan []*entry.Entry),
workerCount: int(math.Max(1, float64(runtime.NumCPU()/4))),
pLogsChan: make(chan plog.Logs),
stopChan: make(chan struct{}),
flushChan: make(chan plog.Logs),
set: set,
workerChan: make(chan []*entry.Entry),
workerCount: int(math.Max(1, float64(runtime.NumCPU()/4))),
pLogsChan: make(chan plog.Logs),
converterChan: make(chan struct{}),
flushChan: make(chan plog.Logs),
}
for _, opt := range opts {
opt.apply(c)
@@ -115,14 +122,23 @@ func (c *Converter) Start() {
go c.workerLoop()
}

c.wg.Add(1)
c.flushWg.Add(1)
go c.flushLoop()
}

func (c *Converter) Stop() {
c.stopOnce.Do(func() {
close(c.stopChan)
close(c.converterChan)

// close workerChan and wait for entries to be processed
close(c.workerChan)
c.wg.Wait()

// close flushChan and wait for flush loop to finish
close(c.flushChan)
c.flushWg.Wait()

// close pLogsChan so callers can stop processing
close(c.pLogsChan)
})
}
@@ -138,76 +154,57 @@ func (c *Converter) OutChannel() <-chan plog.Logs {
func (c *Converter) workerLoop() {
defer c.wg.Done()

for {
for entries := range c.workerChan {

select {
case <-c.stopChan:
return
resourceHashToIdx := make(map[uint64]int)
scopeIdxByResource := make(map[uint64]map[string]int)

case entries, ok := <-c.workerChan:
if !ok {
return
}
pLogs := plog.NewLogs()
var sl plog.ScopeLogs

resourceHashToIdx := make(map[uint64]int)
scopeIdxByResource := make(map[uint64]map[string]int)
for _, e := range entries {
resourceID := HashResource(e.Resource)
var rl plog.ResourceLogs

pLogs := plog.NewLogs()
var sl plog.ScopeLogs
resourceIdx, ok := resourceHashToIdx[resourceID]
if !ok {
resourceHashToIdx[resourceID] = pLogs.ResourceLogs().Len()

for _, e := range entries {
resourceID := HashResource(e.Resource)
var rl plog.ResourceLogs
rl = pLogs.ResourceLogs().AppendEmpty()
upsertToMap(e.Resource, rl.Resource().Attributes())

resourceIdx, ok := resourceHashToIdx[resourceID]
scopeIdxByResource[resourceID] = map[string]int{e.ScopeName: 0}
sl = rl.ScopeLogs().AppendEmpty()
sl.Scope().SetName(e.ScopeName)
} else {
rl = pLogs.ResourceLogs().At(resourceIdx)
scopeIdxInResource, ok := scopeIdxByResource[resourceID][e.ScopeName]
if !ok {
resourceHashToIdx[resourceID] = pLogs.ResourceLogs().Len()

rl = pLogs.ResourceLogs().AppendEmpty()
upsertToMap(e.Resource, rl.Resource().Attributes())

scopeIdxByResource[resourceID] = map[string]int{e.ScopeName: 0}
scopeIdxByResource[resourceID][e.ScopeName] = rl.ScopeLogs().Len()
sl = rl.ScopeLogs().AppendEmpty()
sl.Scope().SetName(e.ScopeName)
} else {
rl = pLogs.ResourceLogs().At(resourceIdx)
scopeIdxInResource, ok := scopeIdxByResource[resourceID][e.ScopeName]
if !ok {
scopeIdxByResource[resourceID][e.ScopeName] = rl.ScopeLogs().Len()
sl = rl.ScopeLogs().AppendEmpty()
sl.Scope().SetName(e.ScopeName)
} else {
sl = pLogs.ResourceLogs().At(resourceIdx).ScopeLogs().At(scopeIdxInResource)
}
sl = pLogs.ResourceLogs().At(resourceIdx).ScopeLogs().At(scopeIdxInResource)
}
convertInto(e, sl.LogRecords().AppendEmpty())
}

// Send plogs directly to flushChan
select {
case c.flushChan <- pLogs:
case <-c.stopChan:
}
convertInto(e, sl.LogRecords().AppendEmpty())
}

// Send plogs directly to flushChan
c.flushChan <- pLogs
}
}

func (c *Converter) flushLoop() {
defer c.wg.Done()
defer c.flushWg.Done()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

for {
select {
case <-c.stopChan:
return

case pLogs := <-c.flushChan:
if err := c.flush(ctx, pLogs); err != nil {
c.set.Logger.Debug("Problem sending log entries",
zap.Error(err),
)
}
for pLogs := range c.flushChan {
if err := c.flush(ctx, pLogs); err != nil {
c.set.Logger.Debug("Problem sending log entries",
zap.Error(err),
)
}
}
}
@@ -221,23 +218,22 @@ func (c *Converter) flush(ctx context.Context, pLogs plog.Logs) error {
return fmt.Errorf("flushing log entries interrupted, err: %w", ctx.Err())

case c.pLogsChan <- pLogs:

// The converter has been stopped so bail the flush.
case <-c.stopChan:
return errors.New("logs converter has been stopped")
}

return nil
}

// Batch takes in an entry.Entry and sends it to an available worker for processing.
func (c *Converter) Batch(e []*entry.Entry) error {
// in case Stop was called do not process batch
select {
case c.workerChan <- e:
return nil
case <-c.stopChan:
case <-c.converterChan:
return errors.New("logs converter has been stopped")
default:
}

c.workerChan <- e
return nil
}

// convert converts one entry.Entry into plog.LogRecord allocating it.
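The converter change above replaces the shared `stopChan` with staged channel closes plus a `converterChan` guard in `Batch()`. A simplified sketch of that guard pattern follows; the types are hypothetical, and like the real converter it assumes producers stop calling `Batch()` before `Stop()` runs (which the receiver guarantees by waiting for its emitter loop first).

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type converter struct {
	stopOnce sync.Once
	stopped  chan struct{} // signals Stop() was called; guards Batch()
	work     chan string   // worker input
	wg       sync.WaitGroup
}

func newConverter() *converter {
	c := &converter{stopped: make(chan struct{}), work: make(chan string)}
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		for item := range c.work {
			fmt.Println("processed", item)
		}
	}()
	return c
}

// Batch rejects new work once Stop() has been signalled, so it never sends
// to a channel that is about to be closed. Callers are expected to stop
// batching before Stop() runs.
func (c *converter) Batch(item string) error {
	select {
	case <-c.stopped:
		return errors.New("converter has been stopped")
	default:
	}
	c.work <- item
	return nil
}

func (c *converter) Stop() {
	c.stopOnce.Do(func() {
		close(c.stopped) // reject further Batch() calls
		close(c.work)    // let the worker drain remaining items and exit
		c.wg.Wait()      // wait until everything in flight is processed
	})
}

func main() {
	c := newConverter()
	_ = c.Batch("entry-1")
	c.Stop()
	fmt.Println(c.Batch("entry-2")) // converter has been stopped
}
```
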
5 changes: 4 additions & 1 deletion pkg/stanza/adapter/integration_test.go
@@ -151,7 +151,10 @@ func TestEmitterToConsumer(t *testing.T) {

err = logsReceiver.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
defer func() { require.NoError(t, logsReceiver.Shutdown(context.Background())) }()
defer func() {
require.NoError(t, logsReceiver.emitter.Stop())
require.NoError(t, logsReceiver.Shutdown(context.Background()))
}()

go func() {
ctx := context.Background()
82 changes: 36 additions & 46 deletions pkg/stanza/adapter/receiver.go
@@ -21,10 +21,11 @@ import (
)

type receiver struct {
set component.TelemetrySettings
id component.ID
wg sync.WaitGroup
cancel context.CancelFunc
set component.TelemetrySettings
id component.ID
emitWg sync.WaitGroup
consumeWg sync.WaitGroup
cancel context.CancelFunc

pipe pipeline.Pipeline
emitter *helper.LogEmitter
@@ -59,13 +60,13 @@ func (r *receiver) Start(ctx context.Context, host component.Host) error {
// * one which reads all the logs produced by the emitter and then forwards
// them to converter
// ...
r.wg.Add(1)
go r.emitterLoop(rctx)
r.emitWg.Add(1)
go r.emitterLoop()

// ...
// * second one which reads all the logs produced by the converter
// (aggregated by Resource) and then calls consumer to consumer them.
r.wg.Add(1)
r.consumeWg.Add(1)
go r.consumerLoop(rctx)

// Those 2 loops are started in separate goroutines because batching in
@@ -80,56 +81,40 @@ func (r *receiver) Start(ctx context.Context, host component.Host) error {

// emitterLoop reads the log entries produced by the emitter and batches them
// in converter.
func (r *receiver) emitterLoop(ctx context.Context) {
defer r.wg.Done()
func (r *receiver) emitterLoop() {
defer r.emitWg.Done()

// Don't create done channel on every iteration.
doneChan := ctx.Done()
for {
select {
case <-doneChan:
r.set.Logger.Debug("Receive loop stopped")
return

case e, ok := <-r.emitter.OutChannel():
if !ok {
continue
}

if err := r.converter.Batch(e); err != nil {
r.set.Logger.Error("Could not add entry to batch", zap.Error(err))
}
// emitter.OutChannel is closed on ctx.Done(), no need to handle ctx here
// instead we should drain and process the channel to let emitter cancel properly
for e := range r.emitter.OutChannel() {
if err := r.converter.Batch(e); err != nil {
r.set.Logger.Error("Could not add entry to batch", zap.Error(err))
}
}

r.set.Logger.Debug("Emitter loop stopped")
}

// consumerLoop reads converter log entries and calls the consumer to consumer them.
func (r *receiver) consumerLoop(ctx context.Context) {
defer r.wg.Done()
defer r.consumeWg.Done()

// Don't create done channel on every iteration.
doneChan := ctx.Done()
pLogsChan := r.converter.OutChannel()
for {
select {
case <-doneChan:
r.set.Logger.Debug("Consumer loop stopped")
return

case pLogs, ok := <-pLogsChan:
if !ok {
r.set.Logger.Debug("Converter channel got closed")
continue
}
obsrecvCtx := r.obsrecv.StartLogsOp(ctx)
logRecordCount := pLogs.LogRecordCount()
cErr := r.consumer.ConsumeLogs(ctx, pLogs)
if cErr != nil {
r.set.Logger.Error("ConsumeLogs() failed", zap.Error(cErr))
}
r.obsrecv.EndLogsOp(obsrecvCtx, "stanza", logRecordCount, cErr)
// converter.OutChannel is closed on Shutdown before context is cancelled.
// Drain the channel and process events before exiting
for pLogs := range r.converter.OutChannel() {
obsrecvCtx := r.obsrecv.StartLogsOp(ctx)
logRecordCount := pLogs.LogRecordCount()

cErr := r.consumer.ConsumeLogs(ctx, pLogs)
if cErr != nil {
r.set.Logger.Error("ConsumeLogs() failed", zap.Error(cErr))
}
r.obsrecv.EndLogsOp(obsrecvCtx, "stanza", logRecordCount, cErr)
}

r.set.Logger.Debug("Consumer loop stopped")
}

// Shutdown is invoked during service shutdown
@@ -140,9 +125,14 @@ func (r *receiver) Shutdown(ctx context.Context) error {

r.set.Logger.Info("Stopping stanza receiver")
pipelineErr := r.pipe.Stop()

// wait for emitter to finish batching and let consumers catch up
r.emitWg.Wait()

r.converter.Stop()
r.cancel()
r.wg.Wait()
// wait for consumers to catch up
r.consumeWg.Wait()

if r.storageClient != nil {
clientErr := r.storageClient.Close(ctx)
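The receiver loops above no longer select on `ctx.Done()`; each one simply drains its input channel until the upstream stage closes it, which is what lets `Shutdown()` guarantee that in-flight data is consumed. A minimal sketch of that drain-until-closed shape, with illustrative names only:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	out := make(chan string)
	var consumeWg sync.WaitGroup

	consumeWg.Add(1)
	go func() {
		defer consumeWg.Done()
		// Exits only after `out` is closed AND fully drained, so no
		// in-flight item can be dropped by an early context cancellation.
		for logs := range out {
			fmt.Println("consumed", logs)
		}
	}()

	out <- "pLogs-1"
	close(out)       // upstream (the converter) closes its output on Stop()
	consumeWg.Wait() // Shutdown waits for the consumer loop to finish
}
```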