From 6630c224603fb90b1e59c11cda38ce9a7e94c36c Mon Sep 17 00:00:00 2001 From: sapslaj Date: Mon, 15 Jan 2024 16:18:58 -0500 Subject: [PATCH] refactor: add Transport.PublishMessage func This is mainly a dev convenience thing to make it easier to send raw msgs through the entire pipeline. --- server/common.go | 1 + transport/transport.go | 20 ++++++++++++-------- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/server/common.go b/server/common.go index 86935a1..4cc9e8a 100644 --- a/server/common.go +++ b/server/common.go @@ -19,4 +19,5 @@ type Logger interface { type Transport interface { Publish([]*goflowpb.FlowMessage) + PublishMessage(msg map[string]interface{}) } diff --git a/transport/transport.go b/transport/transport.go index ade2dcd..13b41bb 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -109,30 +109,34 @@ func (s *Transport) Publish(fmsgs []*goflowpb.FlowMessage) { } } -func (s *Transport) messageWorkerPublish(fmsg *goflowpb.FlowMessage) { - MetricFlowMessageCount.Inc() - ffmsg := s.FormatFlowMessage(fmsg) +func (s *Transport) PublishMessage(msg map[string]interface{}) { for _, enricher := range s.Enrichers { - ffmsg = enricher.Process(ffmsg) + msg = enricher.Process(msg) } if s.ParallelizeDestinations { var wg sync.WaitGroup for _, d := range s.Destinations { wg.Add(1) - go func(d destination.Destination, ffmsg map[string]interface{}) { + go func(d destination.Destination, msg map[string]interface{}) { defer wg.Done() - d.Publish(ffmsg) - }(d, ffmsg) + d.Publish(msg) + }(d, msg) } wg.Wait() } else { for _, d := range s.Destinations { - d.Publish(ffmsg) + d.Publish(msg) } } } +func (s *Transport) messageWorkerPublish(fmsg *goflowpb.FlowMessage) { + MetricFlowMessageCount.Inc() + msg := s.FormatFlowMessage(fmsg) + s.PublishMessage(msg) +} + func (s *Transport) FormatFlowMessage(fmsg *goflowpb.FlowMessage) map[string]interface{} { msg := make(map[string]interface{})