Skip to content

Commit

Permalink
Add knativeerrordest to the event extension when delivery failure
Browse files Browse the repository at this point in the history
  • Loading branch information
thomasjohansen committed Jul 7, 2023
1 parent 680e89f commit 52e3b07
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 12 deletions.
2 changes: 2 additions & 0 deletions cmd/dispatcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type envConfig struct {
BrokerIngressURL string `envconfig:"BROKER_INGRESS_URL" required:"true"`
SubscriberURL string `envconfig:"SUBSCRIBER" required:"true"`
SubscriberCACerts string `envconfig:"SUBSCRIBER_CACERTS" required:"false"`
DeadLetterSinkURL string `envconfig:"DEAD_LETTER_SINK_URL" required:"false"`

// Number of concurrent messages in flight
Parallelism int `envconfig:"PARALLELISM" default:"1" required:"false"`
Expand Down Expand Up @@ -91,6 +92,7 @@ func main() {
BrokerIngressURL: env.BrokerIngressURL,
SubscriberURL: env.SubscriberURL,
SubscriberCACerts: env.SubscriberCACerts,
DeadLetterSinkURL: env.DeadLetterSinkURL,
MaxRetries: env.Retry,
BackoffDelay: backoffDelay,
Timeout: env.Timeout,
Expand Down
32 changes: 20 additions & 12 deletions pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Dispatcher struct {
BrokerIngressURL string
SubscriberURL string
SubscriberCACerts string
DeadLetterSinkURL string
MaxRetries int
BackoffDelay time.Duration
Timeout time.Duration
Expand Down Expand Up @@ -152,6 +153,8 @@ func getStatus(ctx context.Context, result protocol.Result) (int, bool) {

func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client) {
start := time.Now()
subscriberURL := d.SubscriberURL

msgBinding := rabbit.NewMessageFromDelivery(ComponentName, "", "", &msg)
event, err := binding.ToEvent(cloudevents.WithEncodingBinary(ctx), msgBinding)
if err != nil {
Expand All @@ -160,20 +163,25 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c
if err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)
}
return
}

ctx, span := readSpan(ctx, msg)
defer span.End()
if span.IsRecordingEvents() {
span.AddAttributes(client.EventTraceAttributes(event)...)
}
event.SetExtension("knativeerrordest", d.SubscriberURL)

ctx = cloudevents.ContextWithTarget(ctx, d.SubscriberURL)
if d.BackoffPolicy == eventingduckv1.BackoffPolicyLinear {
ctx = cloudevents.ContextWithRetriesLinearBackoff(ctx, d.BackoffDelay, d.MaxRetries)
subscriberURL = d.DeadLetterSinkURL

ctx = cloudevents.ContextWithTarget(ctx, subscriberURL)
} else {
ctx = cloudevents.ContextWithRetriesExponentialBackoff(ctx, d.BackoffDelay, d.MaxRetries)
ctx, span := readSpan(ctx, msg)
defer span.End()
if span.IsRecordingEvents() {
span.AddAttributes(client.EventTraceAttributes(event)...)
}

ctx = cloudevents.ContextWithTarget(ctx, d.SubscriberURL)
if d.BackoffPolicy == eventingduckv1.BackoffPolicyLinear {
ctx = cloudevents.ContextWithRetriesLinearBackoff(ctx, d.BackoffDelay, d.MaxRetries)
} else {
ctx = cloudevents.ContextWithRetriesExponentialBackoff(ctx, d.BackoffDelay, d.MaxRetries)
}
}

response, result := ceClient.Request(ctx, *event)
Expand All @@ -186,7 +194,7 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c
}

if !isSuccess {
logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.SubscriberURL)
logging.FromContext(ctx).Warnf("Failed to deliver to %q", subscriberURL)
if err := msg.Nack(false, false); err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/reconciler/trigger/resources/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,13 @@ func MakeDispatcherDeployment(args *DispatcherArgs) *appsv1.Deployment {
Value: timeout.DurationApprox().String(),
})
}
if args.Delivery.DeadLetterSink != nil {
dispatcher.Env = append(dispatcher.Env,
corev1.EnvVar{
Name: "DEAD_LETTER_SINK_URL",
Value: args.Delivery.DeadLetterSink.URI.String(),
})
}
}
if parallelism, ok := args.Trigger.ObjectMeta.Annotations[ParallelismAnnotation]; ok {
dispatcher.Env = append(dispatcher.Env,
Expand Down

0 comments on commit 52e3b07

Please sign in to comment.