Improved ingester logging (#3711)
* Make ingesters limit how many events they process at once

Currently we limit how many messages are processed at once

However, a single message may contain thousands of events, so a limit on messages alone is somewhat flawed
 - A limit of 1,000 messages could mean you're processing 1,000 or 1,000,000+ events at once

This should make the ingesters less prone to long pauses when large messages come along (see the sketch after this message)

Also adjusted the event ingester config to be more consistent with other config naming

Signed-off-by: JamesMurkin <[email protected]>
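
To illustrate the idea, here is a minimal sketch of batching on the accumulated event count rather than the message count. This is not Armada's actual implementation; `batchByEventCount` and `maxEvents` are illustrative names, and only the `EventSequencesWithIds` type comes from the code below:

```go
// Sketch only: flush a batch once the accumulated *event* count
// (not the message count) reaches a limit, so batch processing
// time stays roughly proportional to the limit.
func batchByEventCount(in <-chan *EventSequencesWithIds, maxEvents int) <-chan []*EventSequencesWithIds {
	out := make(chan []*EventSequencesWithIds)
	go func() {
		defer close(out)
		var batch []*EventSequencesWithIds
		events := 0
		for msg := range in {
			batch = append(batch, msg)
			for _, es := range msg.EventSequences {
				events += len(es.Events)
			}
			if events >= maxEvents {
				out <- batch
				batch, events = nil, 0
			}
		}
		if len(batch) > 0 {
			out <- batch // flush any trailing partial batch
		}
	}()
	return out
}
```
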

* Comment improvement

Signed-off-by: JamesMurkin <[email protected]>

* Comment improvement

Signed-off-by: JamesMurkin <[email protected]>

* gofumpt

Signed-off-by: JamesMurkin <[email protected]>

* Improve config descriptions

Signed-off-by: JamesMurkin <[email protected]>

* Limit Pulsar messages to have a configurable max number of events per message

Currently we can publish very large messages (100k+ events per message)

This can make the time to process messages quite unpredictable, as a message can contain anywhere between 1 and 100,000+ events

Now we restrict how many events we put into each message (via `maxAllowedEventsPerMessage`), which should make how many changes a given message contains somewhat more predictable (a sketch follows below)

Signed-off-by: JamesMurkin <[email protected]>
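
For illustration only, since this change is reverted in the next commit: a sketch of how a publisher might chunk an oversized sequence so no message carries more than `maxAllowedEventsPerMessage` events. `splitEventSequence` is a hypothetical helper, not the actual code:

```go
// Hypothetical sketch: split one large sequence into several, each
// holding at most maxAllowedEventsPerMessage events, before publishing.
func splitEventSequence(seq *EventSequence, maxAllowedEventsPerMessage int) []*EventSequence {
	var out []*EventSequence
	events := seq.Events
	for len(events) > 0 {
		n := maxAllowedEventsPerMessage
		if len(events) < n {
			n = len(events)
		}
		chunk := *seq // shallow copy keeps the queue/job-set metadata
		chunk.Events = events[:n]
		out = append(out, &chunk)
		events = events[n:]
	}
	return out
}
```
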

* Revert "Limit Pulsar messages to have a configurable max number of events per message"

This reverts commit 11a8a2a.

* Improve logging in ingester pipeline

This should help us understand what is happening in our ingestion pipelines

 - Will log if we have not received any Pulsar messages for 2 minutes
 - Will log a summary of how many messages and events are in each "batch"
 - Will log a summary of the types of events in each batch
 - Will log a summary of how long Convert took for each batch

This is admittedly a "quick" fix; better long-term steps would be:
 - Metrics or spans (a rough sketch follows below)
  - Some of these could be at the ingester pipeline level (generic)
  - Some would need to be done in each ingester to expose more detailed information, such as which query the time is being spent on

Signed-off-by: JamesMurkin <[email protected]>
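
As a rough sketch of the metrics direction mentioned above, a generic pipeline-level Convert-duration histogram with prometheus/client_golang could look like the following; the metric name and label are assumptions, not existing Armada metrics:

```go
import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// Sketch only: a pipeline-level histogram for Convert latency.
// With no Buckets set, prometheus.DefBuckets is used by default.
var convertDuration = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Name: "ingester_convert_duration_seconds",
		Help: "Time spent converting a batch of event sequences into instructions.",
	},
	[]string{"ingester"},
)

// observeConvert records how long a single Convert call took.
func observeConvert(ingester string, start time.Time) {
	convertDuration.WithLabelValues(ingester).Observe(time.Since(start).Seconds())
}
```
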

---------

Signed-off-by: JamesMurkin <[email protected]>
JamesMurkin authored Jun 24, 2024
1 parent b38bd6f commit 3a9207f
Showing 3 changed files with 108 additions and 3 deletions.
58 changes: 55 additions & 3 deletions internal/common/ingest/ingestion_pipeline.go
@@ -109,12 +109,38 @@ func (i *IngestionPipeline[T]) Run(ctx *armadacontext.Context) error {
i.consumer = consumer
defer closePulsar()
}
pulsarMsgs := i.consumer.Chan()
pulsarMessageChannel := i.consumer.Chan()
pulsarMessages := make(chan pulsar.ConsumerMessage)

// Consume pulsar messages
// Used to track if we are no longer receiving pulsar messages
go func() {
timeout := time.Minute * 2
timer := time.NewTimer(timeout)
loop:
for {
// Drain the timer before resetting: a non-blocking receive avoids
// blocking forever when the timer has already fired and its value
// was consumed by the select below.
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
timer.Reset(timeout)
select {
case msg, ok := <-pulsarMessageChannel:
if !ok {
// Channel closed
break loop
}
pulsarMessages <- msg
case <-timer.C:
log.Infof("No pulsar message received in %s", timeout)
}
}
close(pulsarMessages)
}()

// Convert to event sequences
eventSequences := make(chan *EventSequencesWithIds)
go func() {
for msg := range pulsarMsgs {
for msg := range pulsarMessages {
converted := unmarshalEventSequences(msg, i.metrics)
eventSequences <- converted
}
@@ -131,11 +131,24 @@ func (i *IngestionPipeline[T]) Run(ctx *armadacontext.Context) error {
close(batchedEventSequences)
}()

// Log summary of batch
preprocessedBatchEventSequences := make(chan *EventSequencesWithIds)
go func() {
for msg := range batchedEventSequences {
logSummaryOfEventSequences(msg)
preprocessedBatchEventSequences <- msg
}
close(preprocessedBatchEventSequences)
}()

// Convert to instructions
instructions := make(chan T)
go func() {
for msg := range batchedEventSequences {
for msg := range preprocessedBatchEventSequences {
start := time.Now()
converted := i.converter.Convert(ctx, msg)
taken := time.Since(start)
log.Infof("Processed %d pulsar messages in %dms", len(msg.MessageIds), taken.Milliseconds())
instructions <- converted
}
close(instructions)
@@ -244,3 +283,16 @@ func combineEventSequences(sequences []*EventSequencesWithIds) *EventSequencesWithIds {
EventSequences: combinedSequences, MessageIds: messageIds,
}
}

func logSummaryOfEventSequences(sequence *EventSequencesWithIds) {
numberOfEvents := 0
countOfEventsByType := map[string]int{}
for _, eventSequence := range sequence.EventSequences {
numberOfEvents += len(eventSequence.Events)
for _, e := range eventSequence.Events {
typeString := e.GetEventName()
countOfEventsByType[typeString] = countOfEventsByType[typeString] + 1
}
}
log.Infof("Batch being processed contains %d event messages and %d events of type %v", len(sequence.MessageIds), numberOfEvents, countOfEventsByType)
}
1 change: 1 addition & 0 deletions internal/scheduleringester/instructions.go
@@ -57,6 +57,7 @@ func (c *InstructionConverter) Convert(_ *armadacontext.Context, sequencesWithIds
operations = AppendDbOperation(operations, op)
}
}
log.Infof("Converted sequences into %d db operations", len(operations))
return &DbOperationsWithMessageIds{
Ops: operations,
MessageIds: sequencesWithIds.MessageIds,
52 changes: 52 additions & 0 deletions pkg/armadaevents/events_util.go
@@ -167,6 +167,58 @@ func (ev *EventSequence_Event) UnmarshalJSON(data []byte) error {
return nil
}

func (ev *EventSequence_Event) GetEventName() string {
switch ev.GetEvent().(type) {
case *EventSequence_Event_SubmitJob:
return "SubmitJob"
case *EventSequence_Event_JobRunLeased:
return "JobRunLeased"
case *EventSequence_Event_JobRunRunning:
return "JobRunRunning"
case *EventSequence_Event_JobRunSucceeded:
return "JobRunSucceeded"
case *EventSequence_Event_JobRunErrors:
return "JobRunErrors"
case *EventSequence_Event_JobSucceeded:
return "JobSucceeded"
case *EventSequence_Event_JobErrors:
return "JobErrors"
case *EventSequence_Event_JobPreemptionRequested:
return "JobPreemptionRequested"
case *EventSequence_Event_JobRunPreemptionRequested:
return "JobRunPreemptionRequested"
case *EventSequence_Event_ReprioritiseJob:
return "ReprioritiseJob"
case *EventSequence_Event_ReprioritiseJobSet:
return "ReprioritiseJobSet"
case *EventSequence_Event_CancelJob:
return "CancelJob"
case *EventSequence_Event_CancelJobSet:
return "CancelJobSet"
case *EventSequence_Event_CancelledJob:
return "CancelledJob"
case *EventSequence_Event_JobRunCancelled:
return "JobRunCancelled"
case *EventSequence_Event_JobRequeued:
return "JobRequeued"
case *EventSequence_Event_PartitionMarker:
return "PartitionMarker"
case *EventSequence_Event_JobRunPreempted:
return "JobRunPreemped"
case *EventSequence_Event_JobRunAssigned:
return "JobRunAssigned"
case *EventSequence_Event_JobValidated:
return "JobValidated"
case *EventSequence_Event_ReprioritisedJob:
return "ReprioritisedJob"
case *EventSequence_Event_ResourceUtilisation:
return "ResourceUtilisation"
case *EventSequence_Event_StandaloneIngressInfo:
return "StandloneIngressIngo"
}
return "Unknown"
}

func (kmo *KubernetesMainObject) UnmarshalJSON(data []byte) error {
if string(data) == "null" || string(data) == `""` {
return nil
