Skip to content

Commit

Permalink
Log errors returned from reg.Unregister()
Browse files Browse the repository at this point in the history
  • Loading branch information
madaraszg-tulip committed Nov 28, 2024
1 parent c42de6c commit ae1515d
Showing 1 changed file with 17 additions and 11 deletions.
28 changes: 17 additions & 11 deletions exporter/exporterhelper/internal/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ type QueueSender struct {
traceAttribute attribute.KeyValue
consumers *queue.Consumers[internal.Request]

shutdownCallbacks []func()

obsrep *ObsReport
exporterID component.ID
obsrep *ObsReport
exporterID component.ID
logger *zap.Logger
shutdownFns []component.ShutdownFunc
}

func NewQueueSender(q exporterqueue.Queue[internal.Request], set exporter.Settings, numConsumers int,
Expand All @@ -87,6 +87,7 @@ func NewQueueSender(q exporterqueue.Queue[internal.Request], set exporter.Settin
traceAttribute: attribute.String(ExporterKey, set.ID.String()),
obsrep: obsrep,
exporterID: set.ID,
logger: set.Logger,
}
consumeFunc := func(ctx context.Context, req internal.Request) error {
err := qs.NextSender.Send(ctx, req)
Expand Down Expand Up @@ -114,19 +115,21 @@ func (qs *QueueSender) Start(ctx context.Context, host component.Host) error {
reg1, err1 := qs.obsrep.TelemetryBuilder.InitExporterQueueSize(func() int64 { return int64(qs.queue.Size()) },
metric.WithAttributeSet(attribute.NewSet(qs.traceAttribute, dataTypeAttr)))

qs.shutdownCallbacks = append(qs.shutdownCallbacks, func() {
qs.shutdownFns = append(qs.shutdownFns, func(context.Context) error {
if reg1 != nil {
_ = reg1.Unregister()
return reg1.Unregister()
}
return nil
})

reg2, err2 := qs.obsrep.TelemetryBuilder.InitExporterQueueCapacity(func() int64 { return int64(qs.queue.Capacity()) },
metric.WithAttributeSet(attribute.NewSet(qs.traceAttribute)))

qs.shutdownCallbacks = append(qs.shutdownCallbacks, func() {
qs.shutdownFns = append(qs.shutdownFns, func(context.Context) error {
if reg2 != nil {
_ = reg2.Unregister()
return reg2.Unregister()
}
return nil
})

return multierr.Append(err1, err2)
Expand All @@ -137,10 +140,13 @@ func (qs *QueueSender) Shutdown(ctx context.Context) error {
// Stop the queue and consumers, this will drain the queue and will call the retry (which is stopped) that will only
// try once every request.

for _, callback := range qs.shutdownCallbacks {
callback()
for _, fn := range qs.shutdownFns {
err := fn(ctx)
if err != nil {
qs.logger.Warn("Error while shutting down QueueSender", zap.Error(err))
}
}
qs.shutdownCallbacks = nil
qs.shutdownFns = nil

if err := qs.queue.Shutdown(ctx); err != nil {
return err
Expand Down

0 comments on commit ae1515d

Please sign in to comment.