Skip to content

Commit

Permalink
refactor: add Transport.PublishMessage func
Browse files Browse the repository at this point in the history
This is mainly a dev convenience thing to make it easier to send raw msgs through the entire pipeline.
  • Loading branch information
sapslaj committed Feb 3, 2024
1 parent c751d23 commit 6630c22
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 8 deletions.
1 change: 1 addition & 0 deletions server/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ type Logger interface {

type Transport interface {
Publish([]*goflowpb.FlowMessage)
PublishMessage(msg map[string]interface{})
}
20 changes: 12 additions & 8 deletions transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,30 +109,34 @@ func (s *Transport) Publish(fmsgs []*goflowpb.FlowMessage) {
}
}

func (s *Transport) messageWorkerPublish(fmsg *goflowpb.FlowMessage) {
MetricFlowMessageCount.Inc()
ffmsg := s.FormatFlowMessage(fmsg)
func (s *Transport) PublishMessage(msg map[string]interface{}) {
for _, enricher := range s.Enrichers {
ffmsg = enricher.Process(ffmsg)
msg = enricher.Process(msg)
}

if s.ParallelizeDestinations {
var wg sync.WaitGroup
for _, d := range s.Destinations {
wg.Add(1)
go func(d destination.Destination, ffmsg map[string]interface{}) {
go func(d destination.Destination, msg map[string]interface{}) {
defer wg.Done()
d.Publish(ffmsg)
}(d, ffmsg)
d.Publish(msg)
}(d, msg)
}
wg.Wait()
} else {
for _, d := range s.Destinations {
d.Publish(ffmsg)
d.Publish(msg)
}
}
}

func (s *Transport) messageWorkerPublish(fmsg *goflowpb.FlowMessage) {
MetricFlowMessageCount.Inc()
msg := s.FormatFlowMessage(fmsg)
s.PublishMessage(msg)
}

func (s *Transport) FormatFlowMessage(fmsg *goflowpb.FlowMessage) map[string]interface{} {
msg := make(map[string]interface{})

Expand Down

0 comments on commit 6630c22

Please sign in to comment.