Skip to content

Commit

Permalink
Avoid polling in worker
Browse files Browse the repository at this point in the history
The polling architecture with 200 ms sleep in ProcessEventMessages()
has two drawbacks:

- it imposes a rate limit of 5 events per second for any input source
- it imposes an average latency of 100 ms per event

The polling loop can be avoided using a goroutine per worker that
posts incoming events to the `messageStream` channel, and pending on
that channel in ProcessEventMessages().

A short `messageStream` queue guarantees fairness: no worker's latest
event will be postponed for more than the length of the channel.

Signed-off-by: Jeff Learman <[email protected]>
  • Loading branch information
jlearman committed Jun 26, 2024
1 parent c06cf63 commit 6265838
Showing 1 changed file with 27 additions and 44 deletions.
71 changes: 27 additions & 44 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ func (m *MessageHandlerRegistry) Add(mh interface {
}
}

func (m *MessageHandlerRegistry) Remove(name string) {
func (m *MessageHandlerRegistry) remove(name string) {
if _, ok := m.Handlers[name]; ok {
delete(m.Handlers, name)
}
Expand All @@ -542,7 +542,7 @@ func eventHandler(incoming events.Message, workers *MessageHandlerRegistry) (str
switch incoming.(type) {
case *events.WorkerStopMessage:
msg, _ := incoming.(*events.WorkerStopMessage)
workers.Remove(msg.Name())
workers.remove(msg.Name())
workerStatusManager.SetWorkerStatus(msg.Name(), STATUS_TERMINATED)
return successMsg, nil
}
Expand All @@ -557,65 +557,48 @@ func eventHandler(incoming events.Message, workers *MessageHandlerRegistry) (str
return successMsg, nil
}

// This function combines all messages (events) from workers into a single global message queue. From this
// global queue, each message will get delivered to each worker by the event handler function.
func mux(workers *MessageHandlerRegistry, muxed chan events.Message) chan events.Message {

// Start a goroutine for each worker to forward worker messages into the given "multiplexed" channel.
// From this global queue, each message will get delivered to each worker by the event handler function.
func mux(workers *MessageHandlerRegistry, muxed chan events.Message) {
for _, w := range workers.Handlers {
select {
case ev := <-(*w).Messages():
muxed <- ev
default: // nothing
}
go func(c <-chan events.Message) {
for v := range c {
muxed <- v
}
}((*w).Messages())
}

return muxed
}

func (workers *MessageHandlerRegistry) ProcessEventMessages() {

// 200 messages should be plenty. We will never get more than 1 message from every worker each time
// we write into this stream.
messageStream := make(chan events.Message, 200)
mux(workers, messageStream)

last := int64(0)

// Process messages on the combined worker message queue as they arrive
for {
// Exit the event processing loop if all workers have deregistered.
if workers.IsEmpty() {
glog.V(3).Infof(mdLogString(fmt.Sprintf("Terminating")))
break
}

// Grab messages that are outbound from the workers.
messageStream = mux(workers, messageStream)

// Process any new messages on the combined worker message queue.
done := false
for !done {
select {
case msg := <-messageStream:
glog.V(3).Infof(mdLogString(fmt.Sprintf("Handling Message (%T): %v\n", msg, msg.ShortString())))
glog.V(5).Infof(mdLogString(fmt.Sprintf("Handling Message (%T): %v\n", msg, msg)))

// Push outbound messages into each worker.
if successMsg, err := eventHandler(msg, workers); err != nil {
// error! do some barfing and then continue
glog.Errorf(mdLogString(fmt.Sprintf("Error occurred handling message: %s, Error: %v\n", msg, err)))
} else {
glog.V(2).Infof(mdLogString(fmt.Sprintf("Success handling message: %s\n", successMsg)))
}
default:
now := time.Now().Unix()
if now-last > 30 {
glog.V(5).Infof(mdLogString(fmt.Sprintf("No incoming messages for router to handle")))
last = now
}
done = true
}
msg, ok := <-messageStream
if !ok {
// channel closed: currently won't happen
glog.V(3).Infof(mdLogString("Muxed channel closed: Terminating"))
break
}

time.Sleep(200 * time.Millisecond)
glog.V(3).Infof(mdLogString(fmt.Sprintf("Handling Message (%T): %v\n", msg, msg.ShortString())))
glog.V(5).Infof(mdLogString(fmt.Sprintf("Handling Message (%T): %v\n", msg, msg)))

// Push outbound messages into each worker.
if successMsg, err := eventHandler(msg, workers); err != nil {
// error! do some barfing and then continue
glog.Errorf(mdLogString(fmt.Sprintf("Error occurred handling message: %s, Error: %v\n", msg, err)))
} else {
glog.V(2).Infof(mdLogString(fmt.Sprintf("Success handling message: %s\n", successMsg)))
}
}

// Brief delay just in case.
Expand Down

0 comments on commit 6265838

Please sign in to comment.