Skip to content

Commit

Permalink
feat: add producer cleanup to OTEL exporter
Browse files Browse the repository at this point in the history
Signed-off-by: Frederico Araujo <[email protected]>
  • Loading branch information
araujof committed Apr 22, 2024
1 parent 1400413 commit 1381c32
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 11 deletions.
2 changes: 1 addition & 1 deletion core/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (s *Exporter) registerExportProtocols() {
(&transports.ElasticProto{}).Register(protocols)
}

// Init initializes the plugin with a configuration map and cache.
// Init initializes the plugin with a configuration map.
func (s *Exporter) Init(conf map[string]interface{}) error {
var err error

Expand Down
31 changes: 21 additions & 10 deletions core/exporter/otelexporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,39 @@ import (
)

const (
pluginLabel string = "otelexporter"
otelPluginName string = "otelexporter"
)

var Plugin OTELExporter

// OTELExporter defines an OpenTelemetry exporter plugin.
type OTELExporter struct {
producer *kafka.Producer
exportTopic string
encoding string
}

// NewOTELExporter creates a new plugin instance.
func NewOTELExporter() plugins.SFProcessor {
return new(OTELExporter)
}

// GetName returns the plugin name.
func (s *OTELExporter) GetName() string {
return pluginLabel
return otelPluginName
}

// Register registers plugin to plugin cache.
func (s *OTELExporter) Register(pc plugins.SFPluginCache) {
pc.AddProcessor(otelPluginName, NewOTELExporter)
}

// Init initializes the plugin with a configuration map.
func (s *OTELExporter) Init(conf map[string]interface{}) error {
brokerString, ok := conf["otelExportKafkaBrokerList"]
if !ok {
return fmt.Errorf("no broker list found to initialize driver")
}

topicRaw, ok := conf["otelExportTopic"]
if !ok {
return fmt.Errorf("no topic to export to")
Expand Down Expand Up @@ -66,17 +74,13 @@ func (s *OTELExporter) Init(conf map[string]interface{}) error {
return fmt.Errorf("invalid config -- (%s) encoding not supported", enc)
}
s.encoding = encStr

s.producer = producer
s.exportTopic = topicStr

return nil
}

func (s *OTELExporter) Register(pc plugins.SFPluginCache) {
pc.AddProcessor(pluginLabel, NewOTELExporter)
}

// Process implements the main interface of the plugin.
func (s *OTELExporter) Process(ch []interface{}, wg *sync.WaitGroup) {
for _, chi := range ch {
cha := chi.(*plugins.Channel[*otel.ResourceLogs])
Expand All @@ -97,7 +101,6 @@ func (s *OTELExporter) Process(ch []interface{}, wg *sync.WaitGroup) {
logger.Trace.Println("Channel closed shutting down")
break
}
// fmt.Printf("Dealing with a record--%s\n", fc)

var msgValue []byte
var err error
Expand Down Expand Up @@ -131,6 +134,14 @@ func (s *OTELExporter) Process(ch []interface{}, wg *sync.WaitGroup) {
}
}

// SetOutChan sets the output channel of the plugin.
func (s *OTELExporter) SetOutChan(ch []interface{}) {}

func (s *OTELExporter) Cleanup() {}
// Cleanup tears down plugin resources.
func (s *OTELExporter) Cleanup() {
logger.Trace.Println("Exiting ", otelPluginName)
if !s.producer.IsClosed() {
s.producer.Flush(3000)
s.producer.Close()
}
}

0 comments on commit 1381c32

Please sign in to comment.