diff --git a/internal/common/ingest/ingestion_pipeline.go b/internal/common/ingest/ingestion_pipeline.go index 46c3096bb83..404372ef859 100644 --- a/internal/common/ingest/ingestion_pipeline.go +++ b/internal/common/ingest/ingestion_pipeline.go @@ -109,12 +109,38 @@ func (i *IngestionPipeline[T]) Run(ctx *armadacontext.Context) error { i.consumer = consumer defer closePulsar() } - pulsarMsgs := i.consumer.Chan() + pulsarMessageChannel := i.consumer.Chan() + pulsarMessages := make(chan pulsar.ConsumerMessage) + + // Consume pulsar messages + // Used to track if we are no longer receiving pulsar messages + go func() { + timeout := time.Minute * 2 + timer := time.NewTimer(timeout) + loop: + for { + if !timer.Stop() { + <-timer.C + } + timer.Reset(timeout) + select { + case msg, ok := <-pulsarMessageChannel: + if !ok { + // Channel closed + break loop + } + pulsarMessages <- msg + case <-timer.C: + log.Infof("No pulsar message received in %s", timeout) + } + } + close(pulsarMessages) + }() // Convert to event sequences eventSequences := make(chan *EventSequencesWithIds) go func() { - for msg := range pulsarMsgs { + for msg := range pulsarMessages { converted := unmarshalEventSequences(msg, i.metrics) eventSequences <- converted } @@ -131,11 +157,24 @@ func (i *IngestionPipeline[T]) Run(ctx *armadacontext.Context) error { close(batchedEventSequences) }() + // Log summary of batch + preprocessedBatchEventSequences := make(chan *EventSequencesWithIds) + go func() { + for msg := range batchedEventSequences { + logSummaryOfEventSequences(msg) + preprocessedBatchEventSequences <- msg + } + close(preprocessedBatchEventSequences) + }() + // Convert to instructions instructions := make(chan T) go func() { - for msg := range batchedEventSequences { + for msg := range preprocessedBatchEventSequences { + start := time.Now() converted := i.converter.Convert(ctx, msg) + taken := time.Now().Sub(start) + log.Infof("Processed %d pulsar messages in %dms", len(msg.MessageIds), taken.Milliseconds()) instructions <- converted } close(instructions) @@ -244,3 +283,16 @@ func combineEventSequences(sequences []*EventSequencesWithIds) *EventSequencesWi EventSequences: combinedSequences, MessageIds: messageIds, } } + +func logSummaryOfEventSequences(sequence *EventSequencesWithIds) { + numberOfEvents := 0 + countOfEventsByType := map[string]int{} + for _, eventSequence := range sequence.EventSequences { + numberOfEvents += len(eventSequence.Events) + for _, e := range eventSequence.Events { + typeString := e.GetEventName() + countOfEventsByType[typeString] = countOfEventsByType[typeString] + 1 + } + } + log.Infof("Batch being processed contains %d event messages and %d events of type %v", len(sequence.MessageIds), numberOfEvents, countOfEventsByType) +} diff --git a/internal/scheduleringester/instructions.go b/internal/scheduleringester/instructions.go index 9b45a3db3d7..509a00f2d94 100644 --- a/internal/scheduleringester/instructions.go +++ b/internal/scheduleringester/instructions.go @@ -57,6 +57,7 @@ func (c *InstructionConverter) Convert(_ *armadacontext.Context, sequencesWithId operations = AppendDbOperation(operations, op) } } + log.Infof("Converted sequences into %d db operations", len(operations)) return &DbOperationsWithMessageIds{ Ops: operations, MessageIds: sequencesWithIds.MessageIds, diff --git a/pkg/armadaevents/events_util.go b/pkg/armadaevents/events_util.go index 0a142522e48..22e3cbf35ba 100644 --- a/pkg/armadaevents/events_util.go +++ b/pkg/armadaevents/events_util.go @@ -167,6 +167,58 @@ func (ev *EventSequence_Event) UnmarshalJSON(data []byte) error { return nil } +func (ev *EventSequence_Event) GetEventName() string { + switch ev.GetEvent().(type) { + case *EventSequence_Event_SubmitJob: + return "SubmitJob" + case *EventSequence_Event_JobRunLeased: + return "JobRunLeased" + case *EventSequence_Event_JobRunRunning: + return "JobRunRunning" + case *EventSequence_Event_JobRunSucceeded: + return "JobRunSucceeded" + case *EventSequence_Event_JobRunErrors: + return "JobRunErrors" + case *EventSequence_Event_JobSucceeded: + return "JobSucceeded" + case *EventSequence_Event_JobErrors: + return "JobErrors" + case *EventSequence_Event_JobPreemptionRequested: + return "JobPreemptionRequested" + case *EventSequence_Event_JobRunPreemptionRequested: + return "JobRunPreemptionRequested" + case *EventSequence_Event_ReprioritiseJob: + return "ReprioritiseJob" + case *EventSequence_Event_ReprioritiseJobSet: + return "ReprioritiseJobSet" + case *EventSequence_Event_CancelJob: + return "CancelJob" + case *EventSequence_Event_CancelJobSet: + return "CancelJobSet" + case *EventSequence_Event_CancelledJob: + return "CancelledJob" + case *EventSequence_Event_JobRunCancelled: + return "JobRunCancelled" + case *EventSequence_Event_JobRequeued: + return "JobRequeued" + case *EventSequence_Event_PartitionMarker: + return "PartitionMarker" + case *EventSequence_Event_JobRunPreempted: + return "JobRunPreemped" + case *EventSequence_Event_JobRunAssigned: + return "JobRunAssigned" + case *EventSequence_Event_JobValidated: + return "JobValidated" + case *EventSequence_Event_ReprioritisedJob: + return "ReprioritisedJob" + case *EventSequence_Event_ResourceUtilisation: + return "ResourceUtilisation" + case *EventSequence_Event_StandaloneIngressInfo: + return "StandloneIngressIngo" + } + return "Unknown" +} + func (kmo *KubernetesMainObject) UnmarshalJSON(data []byte) error { if string(data) == "null" || string(data) == `""` { return nil