diff --git a/cmd/dispatcher/main.go b/cmd/dispatcher/main.go index aa8b46e0be..21c04ab41c 100644 --- a/cmd/dispatcher/main.go +++ b/cmd/dispatcher/main.go @@ -46,6 +46,7 @@ type envConfig struct { BrokerIngressURL string `envconfig:"BROKER_INGRESS_URL" required:"true"` SubscriberURL string `envconfig:"SUBSCRIBER" required:"true"` SubscriberCACerts string `envconfig:"SUBSCRIBER_CACERTS" required:"false"` + DeadLetterSinkURL string `envconfig:"DEAD_LETTER_SINK_URL" required:"false"` // Number of concurrent messages in flight Parallelism int `envconfig:"PARALLELISM" default:"1" required:"false"` @@ -91,6 +92,7 @@ func main() { BrokerIngressURL: env.BrokerIngressURL, SubscriberURL: env.SubscriberURL, SubscriberCACerts: env.SubscriberCACerts, + DeadLetterSinkURL: env.DeadLetterSinkURL, MaxRetries: env.Retry, BackoffDelay: backoffDelay, Timeout: env.Timeout, diff --git a/pkg/dispatcher/dispatcher.go b/pkg/dispatcher/dispatcher.go index c5b292b978..bf170f67c0 100644 --- a/pkg/dispatcher/dispatcher.go +++ b/pkg/dispatcher/dispatcher.go @@ -49,6 +49,7 @@ type Dispatcher struct { BrokerIngressURL string SubscriberURL string SubscriberCACerts string + DeadLetterSinkURL string MaxRetries int BackoffDelay time.Duration Timeout time.Duration @@ -152,6 +153,8 @@ func getStatus(ctx context.Context, result protocol.Result) (int, bool) { func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client) { start := time.Now() + subscriberURL := d.SubscriberURL + msgBinding := rabbit.NewMessageFromDelivery(ComponentName, "", "", &msg) event, err := binding.ToEvent(cloudevents.WithEncodingBinary(ctx), msgBinding) if err != nil { @@ -160,20 +163,25 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c if err != nil { logging.FromContext(ctx).Warn("failed to NACK event: ", err) } - return - } - ctx, span := readSpan(ctx, msg) - defer span.End() - if span.IsRecordingEvents() { - span.AddAttributes(client.EventTraceAttributes(event)...) - } + event.SetExtension("knativeerrordest", d.SubscriberURL) - ctx = cloudevents.ContextWithTarget(ctx, d.SubscriberURL) - if d.BackoffPolicy == eventingduckv1.BackoffPolicyLinear { - ctx = cloudevents.ContextWithRetriesLinearBackoff(ctx, d.BackoffDelay, d.MaxRetries) + subscriberURL = d.DeadLetterSinkURL + + ctx = cloudevents.ContextWithTarget(ctx, subscriberURL) } else { - ctx = cloudevents.ContextWithRetriesExponentialBackoff(ctx, d.BackoffDelay, d.MaxRetries) + ctx, span := readSpan(ctx, msg) + defer span.End() + if span.IsRecordingEvents() { + span.AddAttributes(client.EventTraceAttributes(event)...) + } + + ctx = cloudevents.ContextWithTarget(ctx, d.SubscriberURL) + if d.BackoffPolicy == eventingduckv1.BackoffPolicyLinear { + ctx = cloudevents.ContextWithRetriesLinearBackoff(ctx, d.BackoffDelay, d.MaxRetries) + } else { + ctx = cloudevents.ContextWithRetriesExponentialBackoff(ctx, d.BackoffDelay, d.MaxRetries) + } } response, result := ceClient.Request(ctx, *event) @@ -186,7 +194,7 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c } if !isSuccess { - logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.SubscriberURL) + logging.FromContext(ctx).Warnf("Failed to deliver to %q", subscriberURL) if err := msg.Nack(false, false); err != nil { logging.FromContext(ctx).Warn("failed to NACK event: ", err) } diff --git a/pkg/reconciler/trigger/resources/dispatcher.go b/pkg/reconciler/trigger/resources/dispatcher.go index 57a575100d..b3c1c34d69 100644 --- a/pkg/reconciler/trigger/resources/dispatcher.go +++ b/pkg/reconciler/trigger/resources/dispatcher.go @@ -170,6 +170,13 @@ func MakeDispatcherDeployment(args *DispatcherArgs) *appsv1.Deployment { Value: timeout.DurationApprox().String(), }) } + if args.Delivery.DeadLetterSink != nil { + dispatcher.Env = append(dispatcher.Env, + corev1.EnvVar{ + Name: "DEAD_LETTER_SINK_URL", + Value: args.Delivery.DeadLetterSink.URI.String(), + }) + } } if parallelism, ok := args.Trigger.ObjectMeta.Annotations[ParallelismAnnotation]; ok { dispatcher.Env = append(dispatcher.Env,