Skip to content

Commit

Permalink
Handle error and send an event for it
Browse files Browse the repository at this point in the history
  • Loading branch information
thomasjohansen committed Jul 16, 2023
1 parent faea709 commit d38fc3d
Showing 1 changed file with 14 additions and 5 deletions.
19 changes: 14 additions & 5 deletions pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,15 @@ func getStatus(ctx context.Context, result protocol.Result) (int, bool) {
func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client, channel rabbit.RabbitMQChannelInterface) {
start := time.Now()
if _, ok := msg.Headers["knativeerrordest"]; ok {
if err := msg.Ack(false); err != nil {
if err := msg.Nack(false, false); err != nil {
logging.FromContext(ctx).Warn("failed to Nack event: ", err)
}
return
}

ctx, span := readSpan(ctx, msg)
defer span.End()

msgBinding := rabbit.NewMessageFromDelivery(ComponentName, "", "", &msg)
event, err := binding.ToEvent(cloudevents.WithEncodingBinary(ctx), msgBinding)
if err != nil {
Expand All @@ -168,11 +171,17 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c
if err = msg.Nack(false, false); err != nil {
logging.FromContext(ctx).Warn("failed to Nack event: ", err)
}

// Add headers as described here: https://knative.dev/docs/eventing/event-delivery/#configuring-channel-event-delivery
event.SetExtension("knativeerrordest", d.SubscriberURL)

if err = sendToRabbitMQ(channel, msg.Exchange, span, event); err != nil {
logging.FromContext(ctx).Warn("failed to send event: ", err)
}

return
}

ctx, span := readSpan(ctx, msg)
defer span.End()

if span.IsRecordingEvents() {
span.AddAttributes(client.EventTraceAttributes(event)...)
}
Expand Down Expand Up @@ -217,7 +226,7 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c
_, isSuccess = getStatus(ctx, result)
if !isSuccess {
logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.BrokerIngressURL)

// We need to ack the original message.
if err = msg.Ack(false); err != nil {
logging.FromContext(ctx).Warn("failed to Ack event: ", err)
Expand Down

0 comments on commit d38fc3d

Please sign in to comment.