From 52e3b07894eb6329b5aeb939740d965ec0012553 Mon Sep 17 00:00:00 2001 From: Thomas Johansen Date: Fri, 7 Jul 2023 12:46:36 +0200 Subject: [PATCH 01/29] Add knativeerrordest to the event extension when delivery failure --- cmd/dispatcher/main.go | 2 ++ pkg/dispatcher/dispatcher.go | 32 ++++++++++++------- .../trigger/resources/dispatcher.go | 7 ++++ 3 files changed, 29 insertions(+), 12 deletions(-) diff --git a/cmd/dispatcher/main.go b/cmd/dispatcher/main.go index aa8b46e0be..21c04ab41c 100644 --- a/cmd/dispatcher/main.go +++ b/cmd/dispatcher/main.go @@ -46,6 +46,7 @@ type envConfig struct { BrokerIngressURL string `envconfig:"BROKER_INGRESS_URL" required:"true"` SubscriberURL string `envconfig:"SUBSCRIBER" required:"true"` SubscriberCACerts string `envconfig:"SUBSCRIBER_CACERTS" required:"false"` + DeadLetterSinkURL string `envconfig:"DEAD_LETTER_SINK_URL" required:"false"` // Number of concurrent messages in flight Parallelism int `envconfig:"PARALLELISM" default:"1" required:"false"` @@ -91,6 +92,7 @@ func main() { BrokerIngressURL: env.BrokerIngressURL, SubscriberURL: env.SubscriberURL, SubscriberCACerts: env.SubscriberCACerts, + DeadLetterSinkURL: env.DeadLetterSinkURL, MaxRetries: env.Retry, BackoffDelay: backoffDelay, Timeout: env.Timeout, diff --git a/pkg/dispatcher/dispatcher.go b/pkg/dispatcher/dispatcher.go index c5b292b978..bf170f67c0 100644 --- a/pkg/dispatcher/dispatcher.go +++ b/pkg/dispatcher/dispatcher.go @@ -49,6 +49,7 @@ type Dispatcher struct { BrokerIngressURL string SubscriberURL string SubscriberCACerts string + DeadLetterSinkURL string MaxRetries int BackoffDelay time.Duration Timeout time.Duration @@ -152,6 +153,8 @@ func getStatus(ctx context.Context, result protocol.Result) (int, bool) { func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client) { start := time.Now() + subscriberURL := d.SubscriberURL + msgBinding := rabbit.NewMessageFromDelivery(ComponentName, "", "", &msg) event, err := binding.ToEvent(cloudevents.WithEncodingBinary(ctx), msgBinding) if err != nil { @@ -160,20 +163,25 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c if err != nil { logging.FromContext(ctx).Warn("failed to NACK event: ", err) } - return - } - ctx, span := readSpan(ctx, msg) - defer span.End() - if span.IsRecordingEvents() { - span.AddAttributes(client.EventTraceAttributes(event)...) - } + event.SetExtension("knativeerrordest", d.SubscriberURL) - ctx = cloudevents.ContextWithTarget(ctx, d.SubscriberURL) - if d.BackoffPolicy == eventingduckv1.BackoffPolicyLinear { - ctx = cloudevents.ContextWithRetriesLinearBackoff(ctx, d.BackoffDelay, d.MaxRetries) + subscriberURL = d.DeadLetterSinkURL + + ctx = cloudevents.ContextWithTarget(ctx, subscriberURL) } else { - ctx = cloudevents.ContextWithRetriesExponentialBackoff(ctx, d.BackoffDelay, d.MaxRetries) + ctx, span := readSpan(ctx, msg) + defer span.End() + if span.IsRecordingEvents() { + span.AddAttributes(client.EventTraceAttributes(event)...) + } + + ctx = cloudevents.ContextWithTarget(ctx, d.SubscriberURL) + if d.BackoffPolicy == eventingduckv1.BackoffPolicyLinear { + ctx = cloudevents.ContextWithRetriesLinearBackoff(ctx, d.BackoffDelay, d.MaxRetries) + } else { + ctx = cloudevents.ContextWithRetriesExponentialBackoff(ctx, d.BackoffDelay, d.MaxRetries) + } } response, result := ceClient.Request(ctx, *event) @@ -186,7 +194,7 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c } if !isSuccess { - logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.SubscriberURL) + logging.FromContext(ctx).Warnf("Failed to deliver to %q", subscriberURL) if err := msg.Nack(false, false); err != nil { logging.FromContext(ctx).Warn("failed to NACK event: ", err) } diff --git a/pkg/reconciler/trigger/resources/dispatcher.go b/pkg/reconciler/trigger/resources/dispatcher.go index 57a575100d..b3c1c34d69 100644 --- a/pkg/reconciler/trigger/resources/dispatcher.go +++ b/pkg/reconciler/trigger/resources/dispatcher.go @@ -170,6 +170,13 @@ func MakeDispatcherDeployment(args *DispatcherArgs) *appsv1.Deployment { Value: timeout.DurationApprox().String(), }) } + if args.Delivery.DeadLetterSink != nil { + dispatcher.Env = append(dispatcher.Env, + corev1.EnvVar{ + Name: "DEAD_LETTER_SINK_URL", + Value: args.Delivery.DeadLetterSink.URI.String(), + }) + } } if parallelism, ok := args.Trigger.ObjectMeta.Annotations[ParallelismAnnotation]; ok { dispatcher.Env = append(dispatcher.Env, From 24c39af67d8f426cefe0e643a68397257e9e2b4d Mon Sep 17 00:00:00 2001 From: Thomas Johansen Date: Mon, 10 Jul 2023 14:42:15 +0200 Subject: [PATCH 02/29] Check if the extension is set, else set it. If not act like nothing happend --- pkg/dispatcher/dispatcher.go | 57 ++++++++++++++++++++---------------- 1 file changed, 31 insertions(+), 26 deletions(-) diff --git a/pkg/dispatcher/dispatcher.go b/pkg/dispatcher/dispatcher.go index bf170f67c0..f68ceb2fe9 100644 --- a/pkg/dispatcher/dispatcher.go +++ b/pkg/dispatcher/dispatcher.go @@ -153,59 +153,64 @@ func getStatus(ctx context.Context, result protocol.Result) (int, bool) { func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client) { start := time.Now() - subscriberURL := d.SubscriberURL msgBinding := rabbit.NewMessageFromDelivery(ComponentName, "", "", &msg) event, err := binding.ToEvent(cloudevents.WithEncodingBinary(ctx), msgBinding) if err != nil { - logging.FromContext(ctx).Warn("failed creating event from delivery, err (NACK-ing and not re-queueing): ", err) - err = msg.Nack(false, false) - if err != nil { - logging.FromContext(ctx).Warn("failed to NACK event: ", err) - } + extensions := event.Extensions() + if _, ok := extensions["knativeerrordest"]; ok { + logging.FromContext(ctx).Warn("failed creating event from delivery, err (NACK-ing and not re-queueing): ", err) + err = msg.Nack(false, false) + if err != nil { + logging.FromContext(ctx).Warn("failed to NACK event: ", err) + } - event.SetExtension("knativeerrordest", d.SubscriberURL) + return + } else { + event.SetExtension("knativeerrordest", d.DeadLetterSinkURL) + } + } - subscriberURL = d.DeadLetterSinkURL + ctx, span := readSpan(ctx, msg) + defer span.End() + if span.IsRecordingEvents() { + span.AddAttributes(client.EventTraceAttributes(event)...) + } - ctx = cloudevents.ContextWithTarget(ctx, subscriberURL) + ctx = cloudevents.ContextWithTarget(ctx, d.SubscriberURL) + if d.BackoffPolicy == eventingduckv1.BackoffPolicyLinear { + ctx = cloudevents.ContextWithRetriesLinearBackoff(ctx, d.BackoffDelay, d.MaxRetries) } else { - ctx, span := readSpan(ctx, msg) - defer span.End() - if span.IsRecordingEvents() { - span.AddAttributes(client.EventTraceAttributes(event)...) - } - - ctx = cloudevents.ContextWithTarget(ctx, d.SubscriberURL) - if d.BackoffPolicy == eventingduckv1.BackoffPolicyLinear { - ctx = cloudevents.ContextWithRetriesLinearBackoff(ctx, d.BackoffDelay, d.MaxRetries) - } else { - ctx = cloudevents.ContextWithRetriesExponentialBackoff(ctx, d.BackoffDelay, d.MaxRetries) - } + ctx = cloudevents.ContextWithRetriesExponentialBackoff(ctx, d.BackoffDelay, d.MaxRetries) } response, result := ceClient.Request(ctx, *event) statusCode, isSuccess := getStatus(ctx, result) if statusCode != -1 { args := &dispatcher.ReportArgs{EventType: event.Type()} - if err := d.Reporter.ReportEventCount(args, statusCode); err != nil { + if err = d.Reporter.ReportEventCount(args, statusCode); err != nil { logging.FromContext(ctx).Errorf("Something happened: %v", err) + + event.SetExtension("knativeerrorcode", statusCode) } } if !isSuccess { - logging.FromContext(ctx).Warnf("Failed to deliver to %q", subscriberURL) - if err := msg.Nack(false, false); err != nil { + logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.SubscriberURL) + if err = msg.Nack(false, false); err != nil { logging.FromContext(ctx).Warn("failed to NACK event: ", err) } return } else if response != nil { logging.FromContext(ctx).Infof("Sending an event: %+v", response) ctx = cloudevents.ContextWithTarget(ctx, d.BrokerIngressURL) - result := ceClient.Send(ctx, *response) - _, isSuccess := getStatus(ctx, result) + result = ceClient.Send(ctx, *response) + _, isSuccess = getStatus(ctx, result) if !isSuccess { logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.BrokerIngressURL) + + event.SetExtension("knativeerrordata", result) + err = msg.Nack(false, false) // not multiple if err != nil { logging.FromContext(ctx).Warn("failed to NACK event: ", err) From 4250b0940914aed044bf816962e6a67eec96ccc0 Mon Sep 17 00:00:00 2001 From: Thomas Johansen Date: Mon, 10 Jul 2023 15:07:12 +0200 Subject: [PATCH 03/29] Cleanup and requeue variable --- cmd/dispatcher/main.go | 2 -- pkg/dispatcher/dispatcher.go | 12 ++++++++---- pkg/reconciler/trigger/resources/dispatcher.go | 7 ------- 3 files changed, 8 insertions(+), 13 deletions(-) diff --git a/cmd/dispatcher/main.go b/cmd/dispatcher/main.go index 21c04ab41c..aa8b46e0be 100644 --- a/cmd/dispatcher/main.go +++ b/cmd/dispatcher/main.go @@ -46,7 +46,6 @@ type envConfig struct { BrokerIngressURL string `envconfig:"BROKER_INGRESS_URL" required:"true"` SubscriberURL string `envconfig:"SUBSCRIBER" required:"true"` SubscriberCACerts string `envconfig:"SUBSCRIBER_CACERTS" required:"false"` - DeadLetterSinkURL string `envconfig:"DEAD_LETTER_SINK_URL" required:"false"` // Number of concurrent messages in flight Parallelism int `envconfig:"PARALLELISM" default:"1" required:"false"` @@ -92,7 +91,6 @@ func main() { BrokerIngressURL: env.BrokerIngressURL, SubscriberURL: env.SubscriberURL, SubscriberCACerts: env.SubscriberCACerts, - DeadLetterSinkURL: env.DeadLetterSinkURL, MaxRetries: env.Retry, BackoffDelay: backoffDelay, Timeout: env.Timeout, diff --git a/pkg/dispatcher/dispatcher.go b/pkg/dispatcher/dispatcher.go index f68ceb2fe9..60cd3abbf9 100644 --- a/pkg/dispatcher/dispatcher.go +++ b/pkg/dispatcher/dispatcher.go @@ -49,7 +49,6 @@ type Dispatcher struct { BrokerIngressURL string SubscriberURL string SubscriberCACerts string - DeadLetterSinkURL string MaxRetries int BackoffDelay time.Duration Timeout time.Duration @@ -154,6 +153,8 @@ func getStatus(ctx context.Context, result protocol.Result) (int, bool) { func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client) { start := time.Now() + requeue := false + msgBinding := rabbit.NewMessageFromDelivery(ComponentName, "", "", &msg) event, err := binding.ToEvent(cloudevents.WithEncodingBinary(ctx), msgBinding) if err != nil { @@ -167,7 +168,8 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c return } else { - event.SetExtension("knativeerrordest", d.DeadLetterSinkURL) + requeue = true + event.SetExtension("knativeerrordest", d.SubscriberURL) } } @@ -191,13 +193,14 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c if err = d.Reporter.ReportEventCount(args, statusCode); err != nil { logging.FromContext(ctx).Errorf("Something happened: %v", err) + requeue = true event.SetExtension("knativeerrorcode", statusCode) } } if !isSuccess { logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.SubscriberURL) - if err = msg.Nack(false, false); err != nil { + if err = msg.Nack(false, requeue); err != nil { logging.FromContext(ctx).Warn("failed to NACK event: ", err) } return @@ -209,9 +212,10 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c if !isSuccess { logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.BrokerIngressURL) + requeue = true event.SetExtension("knativeerrordata", result) - err = msg.Nack(false, false) // not multiple + err = msg.Nack(false, requeue) // not multiple if err != nil { logging.FromContext(ctx).Warn("failed to NACK event: ", err) } diff --git a/pkg/reconciler/trigger/resources/dispatcher.go b/pkg/reconciler/trigger/resources/dispatcher.go index b3c1c34d69..57a575100d 100644 --- a/pkg/reconciler/trigger/resources/dispatcher.go +++ b/pkg/reconciler/trigger/resources/dispatcher.go @@ -170,13 +170,6 @@ func MakeDispatcherDeployment(args *DispatcherArgs) *appsv1.Deployment { Value: timeout.DurationApprox().String(), }) } - if args.Delivery.DeadLetterSink != nil { - dispatcher.Env = append(dispatcher.Env, - corev1.EnvVar{ - Name: "DEAD_LETTER_SINK_URL", - Value: args.Delivery.DeadLetterSink.URI.String(), - }) - } } if parallelism, ok := args.Trigger.ObjectMeta.Annotations[ParallelismAnnotation]; ok { dispatcher.Env = append(dispatcher.Env, From 51afdb763b4d768930e5e880cd5f42cdcb5108a0 Mon Sep 17 00:00:00 2001 From: Thomas Johansen Date: Mon, 10 Jul 2023 15:30:27 +0200 Subject: [PATCH 04/29] Set hedders in msg and requeue --- pkg/dispatcher/dispatcher.go | 41 ++++++++++++++++++------------------ 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/pkg/dispatcher/dispatcher.go b/pkg/dispatcher/dispatcher.go index 60cd3abbf9..ae44e8efc7 100644 --- a/pkg/dispatcher/dispatcher.go +++ b/pkg/dispatcher/dispatcher.go @@ -152,25 +152,25 @@ func getStatus(ctx context.Context, result protocol.Result) (int, bool) { func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client) { start := time.Now() - - requeue := false - + if _, ok := msg.Headers["knativeerrordest"]; !ok { + err := msg.Nack(false, false) + if err != nil { + logging.FromContext(ctx).Warn("failed to NACK event: ", err) + } + return + } + msgBinding := rabbit.NewMessageFromDelivery(ComponentName, "", "", &msg) event, err := binding.ToEvent(cloudevents.WithEncodingBinary(ctx), msgBinding) if err != nil { - extensions := event.Extensions() - if _, ok := extensions["knativeerrordest"]; ok { - logging.FromContext(ctx).Warn("failed creating event from delivery, err (NACK-ing and not re-queueing): ", err) - err = msg.Nack(false, false) - if err != nil { - logging.FromContext(ctx).Warn("failed to NACK event: ", err) - } + logging.FromContext(ctx).Warn("failed parsing event, setting knativeerrordest header and NACK-ing and re-queueing: ", err) - return - } else { - requeue = true - event.SetExtension("knativeerrordest", d.SubscriberURL) + msg.Headers["knativeerrordest"] = d.SubscriberURL + err = msg.Nack(false, true) + if err != nil { + logging.FromContext(ctx).Warn("failed to NACK event: ", err) } + return } ctx, span := readSpan(ctx, msg) @@ -193,14 +193,14 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c if err = d.Reporter.ReportEventCount(args, statusCode); err != nil { logging.FromContext(ctx).Errorf("Something happened: %v", err) - requeue = true - event.SetExtension("knativeerrorcode", statusCode) + msg.Headers["knativeerrordest"] = d.SubscriberURL + msg.Headers["knativeerrorcode"] = statusCode } } if !isSuccess { logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.SubscriberURL) - if err = msg.Nack(false, requeue); err != nil { + if err = msg.Nack(false, true); err != nil { logging.FromContext(ctx).Warn("failed to NACK event: ", err) } return @@ -212,10 +212,11 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c if !isSuccess { logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.BrokerIngressURL) - requeue = true - event.SetExtension("knativeerrordata", result) + msg.Headers["knativeerrordest"] = d.SubscriberURL + msg.Headers["knativeerrorcode"] = statusCode + msg.Headers["knativeerrordata"] = result - err = msg.Nack(false, requeue) // not multiple + err = msg.Nack(false, true) // not multiple if err != nil { logging.FromContext(ctx).Warn("failed to NACK event: ", err) } From 69ef0b3237e61e44e3791e344f2f9162e4dda41b Mon Sep 17 00:00:00 2001 From: Thomas Johansen Date: Mon, 10 Jul 2023 15:58:58 +0200 Subject: [PATCH 05/29] Lint --- pkg/dispatcher/dispatcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/dispatcher/dispatcher.go b/pkg/dispatcher/dispatcher.go index ae44e8efc7..9683c5261e 100644 --- a/pkg/dispatcher/dispatcher.go +++ b/pkg/dispatcher/dispatcher.go @@ -159,7 +159,7 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c } return } - + msgBinding := rabbit.NewMessageFromDelivery(ComponentName, "", "", &msg) event, err := binding.ToEvent(cloudevents.WithEncodingBinary(ctx), msgBinding) if err != nil { From 30fdc1b38df62e70fd8d04866a65b38e6183216e Mon Sep 17 00:00:00 2001 From: Thomas Johansen Date: Mon, 10 Jul 2023 22:46:14 +0200 Subject: [PATCH 06/29] Wrong place to set headers --- pkg/dispatcher/dispatcher.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/dispatcher/dispatcher.go b/pkg/dispatcher/dispatcher.go index 9683c5261e..3f6b156ea9 100644 --- a/pkg/dispatcher/dispatcher.go +++ b/pkg/dispatcher/dispatcher.go @@ -192,14 +192,12 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c args := &dispatcher.ReportArgs{EventType: event.Type()} if err = d.Reporter.ReportEventCount(args, statusCode); err != nil { logging.FromContext(ctx).Errorf("Something happened: %v", err) - - msg.Headers["knativeerrordest"] = d.SubscriberURL - msg.Headers["knativeerrorcode"] = statusCode } } if !isSuccess { logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.SubscriberURL) + msg.Headers["knativeerrordest"] = d.SubscriberURL if err = msg.Nack(false, true); err != nil { logging.FromContext(ctx).Warn("failed to NACK event: ", err) } From 0c426b7dc14fa28ca0447288609e40eec370f1bf Mon Sep 17 00:00:00 2001 From: Thomas Johansen Date: Mon, 10 Jul 2023 22:49:32 +0200 Subject: [PATCH 07/29] Forgot the status code --- pkg/dispatcher/dispatcher.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/dispatcher/dispatcher.go b/pkg/dispatcher/dispatcher.go index 3f6b156ea9..16398b4acc 100644 --- a/pkg/dispatcher/dispatcher.go +++ b/pkg/dispatcher/dispatcher.go @@ -198,6 +198,7 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c if !isSuccess { logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.SubscriberURL) msg.Headers["knativeerrordest"] = d.SubscriberURL + msg.Headers["knativeerrorcode"] = statusCode if err = msg.Nack(false, true); err != nil { logging.FromContext(ctx).Warn("failed to NACK event: ", err) } From 095834aa1f0903c8ff310b64c0dc4a3e982ff9e1 Mon Sep 17 00:00:00 2001 From: Thomas Johansen Date: Sat, 15 Jul 2023 16:06:59 +0200 Subject: [PATCH 08/29] Nack if there's a header --- pkg/dispatcher/dispatcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/dispatcher/dispatcher.go b/pkg/dispatcher/dispatcher.go index 16398b4acc..4809454961 100644 --- a/pkg/dispatcher/dispatcher.go +++ b/pkg/dispatcher/dispatcher.go @@ -152,7 +152,7 @@ func getStatus(ctx context.Context, result protocol.Result) (int, bool) { func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client) { start := time.Now() - if _, ok := msg.Headers["knativeerrordest"]; !ok { + if _, ok := msg.Headers["knativeerrordest"]; ok { err := msg.Nack(false, false) if err != nil { logging.FromContext(ctx).Warn("failed to NACK event: ", err) From faea709a6a18330848296b2a237cdba74c3da658 Mon Sep 17 00:00:00 2001 From: Thomas Johansen Date: Sun, 16 Jul 2023 20:22:06 +0200 Subject: [PATCH 09/29] Send event directly to RabbitMQ --- pkg/dispatcher/dispatcher.go | 74 +++++++++++++++++++++++++----------- 1 file changed, 52 insertions(+), 22 deletions(-) diff --git a/pkg/dispatcher/dispatcher.go b/pkg/dispatcher/dispatcher.go index 4809454961..1e0717ebd9 100644 --- a/pkg/dispatcher/dispatcher.go +++ b/pkg/dispatcher/dispatcher.go @@ -18,6 +18,7 @@ package dispatcher import ( "context" + "fmt" "net/http" "sync" "time" @@ -101,7 +102,7 @@ func (d *Dispatcher) ConsumeFromQueue(ctx context.Context, conn rabbit.RabbitMQC go func() { defer wg.Done() for msg := range workerQueue { - d.dispatch(ctx, msg, ceClient) + d.dispatch(ctx, msg, ceClient, channel) } }() } @@ -150,12 +151,11 @@ func getStatus(ctx context.Context, result protocol.Result) (int, bool) { return -1, false } -func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client) { +func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client, channel rabbit.RabbitMQChannelInterface) { start := time.Now() if _, ok := msg.Headers["knativeerrordest"]; ok { - err := msg.Nack(false, false) - if err != nil { - logging.FromContext(ctx).Warn("failed to NACK event: ", err) + if err := msg.Ack(false); err != nil { + logging.FromContext(ctx).Warn("failed to Nack event: ", err) } return } @@ -165,10 +165,8 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c if err != nil { logging.FromContext(ctx).Warn("failed parsing event, setting knativeerrordest header and NACK-ing and re-queueing: ", err) - msg.Headers["knativeerrordest"] = d.SubscriberURL - err = msg.Nack(false, true) - if err != nil { - logging.FromContext(ctx).Warn("failed to NACK event: ", err) + if err = msg.Nack(false, false); err != nil { + logging.FromContext(ctx).Warn("failed to Nack event: ", err) } return } @@ -197,10 +195,19 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c if !isSuccess { logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.SubscriberURL) - msg.Headers["knativeerrordest"] = d.SubscriberURL - msg.Headers["knativeerrorcode"] = statusCode - if err = msg.Nack(false, true); err != nil { - logging.FromContext(ctx).Warn("failed to NACK event: ", err) + + // We need to ack the original message. + if err = msg.Ack(false); err != nil { + logging.FromContext(ctx).Warn("failed to Ack event: ", err) + } + + // Add headers as described here: https://knative.dev/docs/eventing/event-delivery/#configuring-channel-event-delivery + event.SetExtension("knativeerrordest", d.SubscriberURL) + event.SetExtension("knativeerrorcode", statusCode) + + // Queue the event into DLQ with the correct headers. + if err = sendToRabbitMQ(channel, msg.Exchange, span, event); err != nil { + logging.FromContext(ctx).Warn("failed to send event: ", err) } return } else if response != nil { @@ -210,22 +217,27 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c _, isSuccess = getStatus(ctx, result) if !isSuccess { logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.BrokerIngressURL) + + // We need to ack the original message. + if err = msg.Ack(false); err != nil { + logging.FromContext(ctx).Warn("failed to Ack event: ", err) + } - msg.Headers["knativeerrordest"] = d.SubscriberURL - msg.Headers["knativeerrorcode"] = statusCode - msg.Headers["knativeerrordata"] = result + // Add headers as described here: https://knative.dev/docs/eventing/event-delivery/#configuring-channel-event-delivery + event.SetExtension("knativeerrordest", d.SubscriberURL) + event.SetExtension("knativeerrorcode", statusCode) + event.SetExtension("knativeerrordata", result) - err = msg.Nack(false, true) // not multiple - if err != nil { - logging.FromContext(ctx).Warn("failed to NACK event: ", err) + // Queue the event into DLQ with the correct headers. + if err = sendToRabbitMQ(channel, msg.Exchange, span, event); err != nil { + logging.FromContext(ctx).Warn("failed to send event: ", err) } return } } - err = msg.Ack(false) - if err != nil { - logging.FromContext(ctx).Warn("failed to ACK event: ", err) + if err = msg.Ack(false); err != nil { + logging.FromContext(ctx).Warn("failed to Ack event: ", err) } if statusCode != -1 { args := &dispatcher.ReportArgs{EventType: event.Type()} @@ -234,6 +246,24 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c } } +func sendToRabbitMQ(channel rabbit.RabbitMQChannelInterface, exchangeName string, span *trace.Span, event *cloudevents.Event) error { + tp, ts := (&tracecontext.HTTPFormat{}).SpanContextToHeaders(span.SpanContext()) + dc, err := channel.PublishWithDeferredConfirm( + exchangeName, + "", // routing key + false, // mandatory + false, // immediate + *rabbit.CloudEventToRabbitMQMessage(event, tp, ts)) + if err != nil { + return fmt.Errorf("failed to publish message: %w", err) + } + + if ack := dc.Wait(); !ack { + return errors.New("failed to publish message: nacked") + } + return nil +} + func readSpan(ctx context.Context, msg amqp.Delivery) (context.Context, *trace.Span) { traceparent, ok := msg.Headers["traceparent"].(string) if !ok { From d38fc3d18b0c084bfbbb6dca17951f2068bd6a2e Mon Sep 17 00:00:00 2001 From: Thomas Johansen Date: Sun, 16 Jul 2023 20:56:43 +0200 Subject: [PATCH 10/29] Handle error and send an event for it --- pkg/dispatcher/dispatcher.go | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/pkg/dispatcher/dispatcher.go b/pkg/dispatcher/dispatcher.go index 1e0717ebd9..774bbb41f8 100644 --- a/pkg/dispatcher/dispatcher.go +++ b/pkg/dispatcher/dispatcher.go @@ -154,12 +154,15 @@ func getStatus(ctx context.Context, result protocol.Result) (int, bool) { func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client, channel rabbit.RabbitMQChannelInterface) { start := time.Now() if _, ok := msg.Headers["knativeerrordest"]; ok { - if err := msg.Ack(false); err != nil { + if err := msg.Nack(false, false); err != nil { logging.FromContext(ctx).Warn("failed to Nack event: ", err) } return } + ctx, span := readSpan(ctx, msg) + defer span.End() + msgBinding := rabbit.NewMessageFromDelivery(ComponentName, "", "", &msg) event, err := binding.ToEvent(cloudevents.WithEncodingBinary(ctx), msgBinding) if err != nil { @@ -168,11 +171,17 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c if err = msg.Nack(false, false); err != nil { logging.FromContext(ctx).Warn("failed to Nack event: ", err) } + + // Add headers as described here: https://knative.dev/docs/eventing/event-delivery/#configuring-channel-event-delivery + event.SetExtension("knativeerrordest", d.SubscriberURL) + + if err = sendToRabbitMQ(channel, msg.Exchange, span, event); err != nil { + logging.FromContext(ctx).Warn("failed to send event: ", err) + } + return } - - ctx, span := readSpan(ctx, msg) - defer span.End() + if span.IsRecordingEvents() { span.AddAttributes(client.EventTraceAttributes(event)...) } @@ -217,7 +226,7 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c _, isSuccess = getStatus(ctx, result) if !isSuccess { logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.BrokerIngressURL) - + // We need to ack the original message. if err = msg.Ack(false); err != nil { logging.FromContext(ctx).Warn("failed to Ack event: ", err) From 6be7192fbe2ac399deea6615a2c5c937826cde58 Mon Sep 17 00:00:00 2001 From: Thomas Johansen Date: Mon, 17 Jul 2023 13:35:37 +0200 Subject: [PATCH 11/29] Add functions to handle DLQ in the new way --- cmd/dispatcher/main.go | 2 + pkg/dispatcher/dispatcher.go | 78 ++++++++++++++++++- .../trigger/resources/dispatcher.go | 7 ++ 3 files changed, 84 insertions(+), 3 deletions(-) diff --git a/cmd/dispatcher/main.go b/cmd/dispatcher/main.go index aa8b46e0be..880dd9a53a 100644 --- a/cmd/dispatcher/main.go +++ b/cmd/dispatcher/main.go @@ -46,6 +46,7 @@ type envConfig struct { BrokerIngressURL string `envconfig:"BROKER_INGRESS_URL" required:"true"` SubscriberURL string `envconfig:"SUBSCRIBER" required:"true"` SubscriberCACerts string `envconfig:"SUBSCRIBER_CACERTS" required:"false"` + DLX bool `envconfig:"DLX" required:"false"` // Number of concurrent messages in flight Parallelism int `envconfig:"PARALLELISM" default:"1" required:"false"` @@ -97,6 +98,7 @@ func main() { BackoffPolicy: backoffPolicy, WorkerCount: env.Parallelism, Reporter: reporter, + DLX: env.DLX, } var err error diff --git a/pkg/dispatcher/dispatcher.go b/pkg/dispatcher/dispatcher.go index 774bbb41f8..f29fa8743f 100644 --- a/pkg/dispatcher/dispatcher.go +++ b/pkg/dispatcher/dispatcher.go @@ -56,6 +56,7 @@ type Dispatcher struct { BackoffPolicy eventingduckv1.BackoffPolicyType WorkerCount int Reporter dispatcher.StatsReporter + DLX bool } // ConsumeFromQueue consumes messages from the given message channel and queue. @@ -102,7 +103,11 @@ func (d *Dispatcher) ConsumeFromQueue(ctx context.Context, conn rabbit.RabbitMQC go func() { defer wg.Done() for msg := range workerQueue { - d.dispatch(ctx, msg, ceClient, channel) + if d.DLX { + d.dispatchDLQ(ctx, msg, ceClient, channel) + } else { + d.dispatch(ctx, msg, ceClient) + } } }() } @@ -151,7 +156,74 @@ func getStatus(ctx context.Context, result protocol.Result) (int, bool) { return -1, false } -func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client, channel rabbit.RabbitMQChannelInterface) { +func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client) { + start := time.Now() + msgBinding := rabbit.NewMessageFromDelivery(ComponentName, "", "", &msg) + event, err := binding.ToEvent(cloudevents.WithEncodingBinary(ctx), msgBinding) + if err != nil { + logging.FromContext(ctx).Warn("failed creating event from delivery, err (NACK-ing and not re-queueing): ", err) + err = msg.Nack(false, false) + if err != nil { + logging.FromContext(ctx).Warn("failed to NACK event: ", err) + } + return + } + + ctx, span := readSpan(ctx, msg) + defer span.End() + if span.IsRecordingEvents() { + span.AddAttributes(client.EventTraceAttributes(event)...) + } + + ctx = cloudevents.ContextWithTarget(ctx, d.SubscriberURL) + if d.BackoffPolicy == eventingduckv1.BackoffPolicyLinear { + ctx = cloudevents.ContextWithRetriesLinearBackoff(ctx, d.BackoffDelay, d.MaxRetries) + } else { + ctx = cloudevents.ContextWithRetriesExponentialBackoff(ctx, d.BackoffDelay, d.MaxRetries) + } + + response, result := ceClient.Request(ctx, *event) + statusCode, isSuccess := getStatus(ctx, result) + if statusCode != -1 { + args := &dispatcher.ReportArgs{EventType: event.Type()} + if err := d.Reporter.ReportEventCount(args, statusCode); err != nil { + logging.FromContext(ctx).Errorf("Something happened: %v", err) + } + } + + if !isSuccess { + logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.SubscriberURL) + if err := msg.Nack(false, false); err != nil { + logging.FromContext(ctx).Warn("failed to NACK event: ", err) + } + return + } else if response != nil { + logging.FromContext(ctx).Infof("Sending an event: %+v", response) + ctx = cloudevents.ContextWithTarget(ctx, d.BrokerIngressURL) + result := ceClient.Send(ctx, *response) + _, isSuccess := getStatus(ctx, result) + if !isSuccess { + logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.BrokerIngressURL) + err = msg.Nack(false, false) // not multiple + if err != nil { + logging.FromContext(ctx).Warn("failed to NACK event: ", err) + } + return + } + } + + err = msg.Ack(false) + if err != nil { + logging.FromContext(ctx).Warn("failed to ACK event: ", err) + } + if statusCode != -1 { + args := &dispatcher.ReportArgs{EventType: event.Type()} + dispatchTime := time.Since(start) + _ = d.Reporter.ReportEventDispatchTime(args, statusCode, dispatchTime) + } +} + +func (d *Dispatcher) dispatchDLQ(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client, channel rabbit.RabbitMQChannelInterface) { start := time.Now() if _, ok := msg.Headers["knativeerrordest"]; ok { if err := msg.Nack(false, false); err != nil { @@ -181,7 +253,7 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c return } - + if span.IsRecordingEvents() { span.AddAttributes(client.EventTraceAttributes(event)...) } diff --git a/pkg/reconciler/trigger/resources/dispatcher.go b/pkg/reconciler/trigger/resources/dispatcher.go index 57a575100d..9fca19a89b 100644 --- a/pkg/reconciler/trigger/resources/dispatcher.go +++ b/pkg/reconciler/trigger/resources/dispatcher.go @@ -198,6 +198,13 @@ func MakeDispatcherDeployment(args *DispatcherArgs) *appsv1.Deployment { Value: *args.Subscriber.CACerts, }) } + if args.DLX { + dispatcher.Env = append(dispatcher.Env, + corev1.EnvVar{ + Name: "DLX", + Value: "true", + }) + } deployment := &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Namespace: args.Trigger.Namespace, From 5fb51ca43119b6aa08a920cb550b1dbde9b6a0d4 Mon Sep 17 00:00:00 2001 From: Thomas Johansen Date: Mon, 17 Jul 2023 14:03:26 +0200 Subject: [PATCH 12/29] Moved things around --- pkg/dispatcher/dispatcher.go | 140 +++++++++++++++++------------------ 1 file changed, 70 insertions(+), 70 deletions(-) diff --git a/pkg/dispatcher/dispatcher.go b/pkg/dispatcher/dispatcher.go index f29fa8743f..fe1ba6343e 100644 --- a/pkg/dispatcher/dispatcher.go +++ b/pkg/dispatcher/dispatcher.go @@ -104,9 +104,9 @@ func (d *Dispatcher) ConsumeFromQueue(ctx context.Context, conn rabbit.RabbitMQC defer wg.Done() for msg := range workerQueue { if d.DLX { - d.dispatchDLQ(ctx, msg, ceClient, channel) + d.dispatchDLQ(ctx, msg, ceClient) } else { - d.dispatch(ctx, msg, ceClient) + d.dispatch(ctx, msg, ceClient, channel) } } }() @@ -156,74 +156,7 @@ func getStatus(ctx context.Context, result protocol.Result) (int, bool) { return -1, false } -func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client) { - start := time.Now() - msgBinding := rabbit.NewMessageFromDelivery(ComponentName, "", "", &msg) - event, err := binding.ToEvent(cloudevents.WithEncodingBinary(ctx), msgBinding) - if err != nil { - logging.FromContext(ctx).Warn("failed creating event from delivery, err (NACK-ing and not re-queueing): ", err) - err = msg.Nack(false, false) - if err != nil { - logging.FromContext(ctx).Warn("failed to NACK event: ", err) - } - return - } - - ctx, span := readSpan(ctx, msg) - defer span.End() - if span.IsRecordingEvents() { - span.AddAttributes(client.EventTraceAttributes(event)...) - } - - ctx = cloudevents.ContextWithTarget(ctx, d.SubscriberURL) - if d.BackoffPolicy == eventingduckv1.BackoffPolicyLinear { - ctx = cloudevents.ContextWithRetriesLinearBackoff(ctx, d.BackoffDelay, d.MaxRetries) - } else { - ctx = cloudevents.ContextWithRetriesExponentialBackoff(ctx, d.BackoffDelay, d.MaxRetries) - } - - response, result := ceClient.Request(ctx, *event) - statusCode, isSuccess := getStatus(ctx, result) - if statusCode != -1 { - args := &dispatcher.ReportArgs{EventType: event.Type()} - if err := d.Reporter.ReportEventCount(args, statusCode); err != nil { - logging.FromContext(ctx).Errorf("Something happened: %v", err) - } - } - - if !isSuccess { - logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.SubscriberURL) - if err := msg.Nack(false, false); err != nil { - logging.FromContext(ctx).Warn("failed to NACK event: ", err) - } - return - } else if response != nil { - logging.FromContext(ctx).Infof("Sending an event: %+v", response) - ctx = cloudevents.ContextWithTarget(ctx, d.BrokerIngressURL) - result := ceClient.Send(ctx, *response) - _, isSuccess := getStatus(ctx, result) - if !isSuccess { - logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.BrokerIngressURL) - err = msg.Nack(false, false) // not multiple - if err != nil { - logging.FromContext(ctx).Warn("failed to NACK event: ", err) - } - return - } - } - - err = msg.Ack(false) - if err != nil { - logging.FromContext(ctx).Warn("failed to ACK event: ", err) - } - if statusCode != -1 { - args := &dispatcher.ReportArgs{EventType: event.Type()} - dispatchTime := time.Since(start) - _ = d.Reporter.ReportEventDispatchTime(args, statusCode, dispatchTime) - } -} - -func (d *Dispatcher) dispatchDLQ(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client, channel rabbit.RabbitMQChannelInterface) { +func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client, channel rabbit.RabbitMQChannelInterface) { start := time.Now() if _, ok := msg.Headers["knativeerrordest"]; ok { if err := msg.Nack(false, false); err != nil { @@ -327,6 +260,73 @@ func (d *Dispatcher) dispatchDLQ(ctx context.Context, msg amqp.Delivery, ceClien } } +func (d *Dispatcher) dispatchDLQ(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client) { + start := time.Now() + msgBinding := rabbit.NewMessageFromDelivery(ComponentName, "", "", &msg) + event, err := binding.ToEvent(cloudevents.WithEncodingBinary(ctx), msgBinding) + if err != nil { + logging.FromContext(ctx).Warn("failed creating event from delivery, err (NACK-ing and not re-queueing): ", err) + err = msg.Nack(false, false) + if err != nil { + logging.FromContext(ctx).Warn("failed to NACK event: ", err) + } + return + } + + ctx, span := readSpan(ctx, msg) + defer span.End() + if span.IsRecordingEvents() { + span.AddAttributes(client.EventTraceAttributes(event)...) + } + + ctx = cloudevents.ContextWithTarget(ctx, d.SubscriberURL) + if d.BackoffPolicy == eventingduckv1.BackoffPolicyLinear { + ctx = cloudevents.ContextWithRetriesLinearBackoff(ctx, d.BackoffDelay, d.MaxRetries) + } else { + ctx = cloudevents.ContextWithRetriesExponentialBackoff(ctx, d.BackoffDelay, d.MaxRetries) + } + + response, result := ceClient.Request(ctx, *event) + statusCode, isSuccess := getStatus(ctx, result) + if statusCode != -1 { + args := &dispatcher.ReportArgs{EventType: event.Type()} + if err := d.Reporter.ReportEventCount(args, statusCode); err != nil { + logging.FromContext(ctx).Errorf("Something happened: %v", err) + } + } + + if !isSuccess { + logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.SubscriberURL) + if err := msg.Nack(false, false); err != nil { + logging.FromContext(ctx).Warn("failed to NACK event: ", err) + } + return + } else if response != nil { + logging.FromContext(ctx).Infof("Sending an event: %+v", response) + ctx = cloudevents.ContextWithTarget(ctx, d.BrokerIngressURL) + result := ceClient.Send(ctx, *response) + _, isSuccess := getStatus(ctx, result) + if !isSuccess { + logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.BrokerIngressURL) + err = msg.Nack(false, false) // not multiple + if err != nil { + logging.FromContext(ctx).Warn("failed to NACK event: ", err) + } + return + } + } + + err = msg.Ack(false) + if err != nil { + logging.FromContext(ctx).Warn("failed to ACK event: ", err) + } + if statusCode != -1 { + args := &dispatcher.ReportArgs{EventType: event.Type()} + dispatchTime := time.Since(start) + _ = d.Reporter.ReportEventDispatchTime(args, statusCode, dispatchTime) + } +} + func sendToRabbitMQ(channel rabbit.RabbitMQChannelInterface, exchangeName string, span *trace.Span, event *cloudevents.Event) error { tp, ts := (&tracecontext.HTTPFormat{}).SpanContextToHeaders(span.SpanContext()) dc, err := channel.PublishWithDeferredConfirm( From cf22797f3cacea1ef49e0f40c5feca6c49b464c3 Mon Sep 17 00:00:00 2001 From: Thomas Johansen Date: Mon, 17 Jul 2023 14:16:39 +0200 Subject: [PATCH 13/29] Added missing arg --- pkg/reconciler/broker/resources/dispatcher.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/pkg/reconciler/broker/resources/dispatcher.go b/pkg/reconciler/broker/resources/dispatcher.go index 84c4e128ae..00633a81ba 100644 --- a/pkg/reconciler/broker/resources/dispatcher.go +++ b/pkg/reconciler/broker/resources/dispatcher.go @@ -55,6 +55,7 @@ type DispatcherArgs struct { BrokerUrlSecretKey string BrokerIngressURL *apis.URL Subscriber *duckv1.Addressable + DLX bool Configs reconcilersource.ConfigAccessor ResourceRequirements corev1.ResourceRequirements } @@ -150,6 +151,13 @@ func MakeDispatcherDeployment(args *DispatcherArgs) *appsv1.Deployment { Value: *args.Subscriber.CACerts, }) } + if args.DLX { + envs = append(envs, + corev1.EnvVar{ + Name: "DLX", + Value: "true", + }) + } // Default requirements only if none of the requirements are set through annotations if len(args.ResourceRequirements.Limits) == 0 && len(args.ResourceRequirements.Requests) == 0 { // This resource requests and limits comes from performance testing 1500msgs/s with a parallelism of 1000 From 8db07c6041997620b6bb6889d325c015eb72c10f Mon Sep 17 00:00:00 2001 From: Thomas Johansen Date: Mon, 17 Jul 2023 14:45:51 +0200 Subject: [PATCH 14/29] We should always Ack in dispatch to DLQ --- pkg/dispatcher/dispatcher.go | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/pkg/dispatcher/dispatcher.go b/pkg/dispatcher/dispatcher.go index fe1ba6343e..0d27748047 100644 --- a/pkg/dispatcher/dispatcher.go +++ b/pkg/dispatcher/dispatcher.go @@ -260,15 +260,15 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c } } +// Defaulting to Ack as this always hits the DLQ. func (d *Dispatcher) dispatchDLQ(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client) { start := time.Now() msgBinding := rabbit.NewMessageFromDelivery(ComponentName, "", "", &msg) event, err := binding.ToEvent(cloudevents.WithEncodingBinary(ctx), msgBinding) if err != nil { - logging.FromContext(ctx).Warn("failed creating event from delivery, err (NACK-ing and not re-queueing): ", err) - err = msg.Nack(false, false) - if err != nil { - logging.FromContext(ctx).Warn("failed to NACK event: ", err) + logging.FromContext(ctx).Warn("failed creating event from delivery, err: ", err) + if err = msg.Ack(false); err != nil { + logging.FromContext(ctx).Warn("failed to Ack event: ", err) } return } @@ -290,27 +290,26 @@ func (d *Dispatcher) dispatchDLQ(ctx context.Context, msg amqp.Delivery, ceClien statusCode, isSuccess := getStatus(ctx, result) if statusCode != -1 { args := &dispatcher.ReportArgs{EventType: event.Type()} - if err := d.Reporter.ReportEventCount(args, statusCode); err != nil { + if err = d.Reporter.ReportEventCount(args, statusCode); err != nil { logging.FromContext(ctx).Errorf("Something happened: %v", err) } } if !isSuccess { logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.SubscriberURL) - if err := msg.Nack(false, false); err != nil { - logging.FromContext(ctx).Warn("failed to NACK event: ", err) + if err = msg.Ack(false); err != nil { + logging.FromContext(ctx).Warn("failed to Ack event: ", err) } return } else if response != nil { logging.FromContext(ctx).Infof("Sending an event: %+v", response) ctx = cloudevents.ContextWithTarget(ctx, d.BrokerIngressURL) - result := ceClient.Send(ctx, *response) - _, isSuccess := getStatus(ctx, result) + result = ceClient.Send(ctx, *response) + _, isSuccess = getStatus(ctx, result) if !isSuccess { logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.BrokerIngressURL) - err = msg.Nack(false, false) // not multiple - if err != nil { - logging.FromContext(ctx).Warn("failed to NACK event: ", err) + if err = msg.Ack(false); err != nil { + logging.FromContext(ctx).Warn("failed to Ack event: ", err) } return } @@ -318,7 +317,7 @@ func (d *Dispatcher) dispatchDLQ(ctx context.Context, msg amqp.Delivery, ceClien err = msg.Ack(false) if err != nil { - logging.FromContext(ctx).Warn("failed to ACK event: ", err) + logging.FromContext(ctx).Warn("failed to Ack event: ", err) } if statusCode != -1 { args := &dispatcher.ReportArgs{EventType: event.Type()} From 4b1535f3d8f3aabdf3a8b7a19bca609ca9568602 Mon Sep 17 00:00:00 2001 From: Thomas Johansen Date: Wed, 19 Jul 2023 10:23:20 +0200 Subject: [PATCH 15/29] Confirm mode. Added missing DLX argument --- cmd/dispatcher/main.go | 15 ++++++++++++++- pkg/reconciler/broker/broker.go | 1 + 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/cmd/dispatcher/main.go b/cmd/dispatcher/main.go index 880dd9a53a..948cbe3463 100644 --- a/cmd/dispatcher/main.go +++ b/cmd/dispatcher/main.go @@ -103,7 +103,20 @@ func main() { var err error rmqHelper := rabbit.NewRabbitMQConnectionHandler(1000, logger) - rmqHelper.Setup(ctx, rabbit.VHostHandler(env.RabbitURL, env.RabbitMQVhost), rabbit.ChannelQoS, rabbit.DialWrapper) + rmqHelper.Setup(ctx, rabbit.VHostHandler( + env.RabbitURL, + env.RabbitMQVhost), + func(c rabbit.RabbitMQConnectionInterface, ch rabbit.RabbitMQChannelInterface) error { + if err := rabbit.ChannelQoS(c, ch); err != nil { + return err + } + + if err := rabbit.ChannelConfirm(c, ch); err != nil { + return err + } + return nil + }, + rabbit.DialWrapper) for { if err = d.ConsumeFromQueue(ctx, rmqHelper.GetConnection(), rmqHelper.GetChannel(), env.QueueName); err != nil { if errors.Is(err, context.Canceled) { diff --git a/pkg/reconciler/broker/broker.go b/pkg/reconciler/broker/broker.go index 43db2006f4..22265dbab3 100644 --- a/pkg/reconciler/broker/broker.go +++ b/pkg/reconciler/broker/broker.go @@ -245,6 +245,7 @@ func (r *Reconciler) reconcileDLXDispatcherDeployment(ctx context.Context, b *ev BrokerIngressURL: b.Status.Address.URL, Configs: r.configs, ResourceRequirements: requirements, + DLX: true, }) return r.reconcileDeployment(ctx, expected) } From 031cacc767febb807a4be5c7e77729317ff8eba5 Mon Sep 17 00:00:00 2001 From: Thomas Johansen Date: Wed, 19 Jul 2023 11:11:22 +0200 Subject: [PATCH 16/29] Fix for DLX --- pkg/reconciler/broker/broker_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/reconciler/broker/broker_test.go b/pkg/reconciler/broker/broker_test.go index ca957dedb6..1cf054a576 100644 --- a/pkg/reconciler/broker/broker_test.go +++ b/pkg/reconciler/broker/broker_test.go @@ -1543,6 +1543,7 @@ func createDispatcherDeployment(opts ...func(*resources.DispatcherArgs)) *appsv1 BrokerUrlSecretKey: "brokerURL", BrokerIngressURL: brokerAddress, Subscriber: DLSAddressable, + DLX: true, } for _, o := range opts { o(args) From 0c14e465de000f1f8a58f23c5116fb05a6461b62 Mon Sep 17 00:00:00 2001 From: Thomas Johansen Date: Wed, 19 Jul 2023 12:53:06 +0200 Subject: [PATCH 17/29] Missed variable --- pkg/reconciler/trigger/resources/dispatcher_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/reconciler/trigger/resources/dispatcher_test.go b/pkg/reconciler/trigger/resources/dispatcher_test.go index a118b1c792..d2235be53a 100644 --- a/pkg/reconciler/trigger/resources/dispatcher_test.go +++ b/pkg/reconciler/trigger/resources/dispatcher_test.go @@ -84,6 +84,7 @@ func TestMakeDispatcherDeployment(t *testing.T) { want: deployment( deploymentNamed("testtrigger-dlx-dispatcher"), withEnv(corev1.EnvVar{Name: "PARALLELISM", Value: "1"}), + withEnv(corev1.EnvVar{Name: "DLX", Value: "true"}), withEnv(corev1.EnvVar{Name: "POD_NAME", Value: "testtrigger-dlx-dispatcher"}), withDefaultResourceRequirements(), ), From 4a697267173be7d4e9f5a5785713ca591b57f9e8 Mon Sep 17 00:00:00 2001 From: Thomas Johansen Date: Sat, 22 Jul 2023 20:25:11 +0200 Subject: [PATCH 18/29] Initial testing work --- pkg/dispatcher/dispatcher.go | 67 ++++++++------ pkg/dispatcher/dispatcher_test.go | 148 +++++++++++++++++++++++++++++- 2 files changed, 184 insertions(+), 31 deletions(-) diff --git a/pkg/dispatcher/dispatcher.go b/pkg/dispatcher/dispatcher.go index 0d27748047..88f5c3ab93 100644 --- a/pkg/dispatcher/dispatcher.go +++ b/pkg/dispatcher/dispatcher.go @@ -93,7 +93,7 @@ func (d *Dispatcher) ConsumeFromQueue(ctx context.Context, conn rabbit.RabbitMQC } logging.FromContext(ctx).Info("rabbitmq receiver started, exit with CTRL+C") - logging.FromContext(ctx).Infow("Starting to process messages", zap.String("queue", queueName), zap.Int("workers", d.WorkerCount)) + logging.FromContext(ctx).Infow("starting to process messages", zap.String("queue", queueName), zap.Int("workers", d.WorkerCount)) wg := &sync.WaitGroup{} wg.Add(d.WorkerCount) @@ -104,9 +104,9 @@ func (d *Dispatcher) ConsumeFromQueue(ctx context.Context, conn rabbit.RabbitMQC defer wg.Done() for msg := range workerQueue { if d.DLX { - d.dispatchDLQ(ctx, msg, ceClient) + _ = d.dispatchDLQ(ctx, msg, ceClient) } else { - d.dispatch(ctx, msg, ceClient, channel) + _ = d.dispatch(ctx, msg, ceClient, channel) } } }() @@ -148,21 +148,22 @@ func getStatus(ctx context.Context, result protocol.Result) (int, bool) { retry, _ := kncloudevents.SelectiveRetry(ctx, &http.Response{StatusCode: httpResult.StatusCode}, nil) return httpResult.StatusCode, !retry } - logging.FromContext(ctx).Warnf("Invalid result type, not HTTP Result: %v", retriesResult.Result) + logging.FromContext(ctx).Warnf("invalid result type, not HTTP Result: %v", retriesResult.Result) return -1, false } - logging.FromContext(ctx).Warnf("Invalid result type, not RetriesResult") + logging.FromContext(ctx).Warnf("invalid result type, not RetriesResult") return -1, false } -func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client, channel rabbit.RabbitMQChannelInterface) { +func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client, channel rabbit.RabbitMQChannelInterface) error { start := time.Now() if _, ok := msg.Headers["knativeerrordest"]; ok { - if err := msg.Nack(false, false); err != nil { + err := msg.Nack(false, false) + if err != nil { logging.FromContext(ctx).Warn("failed to Nack event: ", err) } - return + return fmt.Errorf("failed to Nack event: %s", err) } ctx, span := readSpan(ctx, msg) @@ -171,20 +172,24 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c msgBinding := rabbit.NewMessageFromDelivery(ComponentName, "", "", &msg) event, err := binding.ToEvent(cloudevents.WithEncodingBinary(ctx), msgBinding) if err != nil { - logging.FromContext(ctx).Warn("failed parsing event, setting knativeerrordest header and NACK-ing and re-queueing: ", err) + if event == nil { + logging.FromContext(ctx).Error("failed parsing event: ", err) + } else { + logging.FromContext(ctx).Warn("failed parsing event, setting knativeerrordest header and NACK-ing and re-queueing: ", err) - if err = msg.Nack(false, false); err != nil { - logging.FromContext(ctx).Warn("failed to Nack event: ", err) - } + // Add headers as described here: https://knative.dev/docs/eventing/event-delivery/#configuring-channel-event-delivery + event.SetExtension("knativeerrordest", d.SubscriberURL) - // Add headers as described here: https://knative.dev/docs/eventing/event-delivery/#configuring-channel-event-delivery - event.SetExtension("knativeerrordest", d.SubscriberURL) + if err = sendToRabbitMQ(channel, msg.Exchange, span, event); err != nil { + logging.FromContext(ctx).Warn("failed to send event: ", err) + } + } - if err = sendToRabbitMQ(channel, msg.Exchange, span, event); err != nil { - logging.FromContext(ctx).Warn("failed to send event: ", err) + if err = msg.Nack(false, false); err != nil { + logging.FromContext(ctx).Warn("failed to Nack event: ", err) } - return + return fmt.Errorf("failed parsing event: %s", err) } if span.IsRecordingEvents() { @@ -203,12 +208,12 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c if statusCode != -1 { args := &dispatcher.ReportArgs{EventType: event.Type()} if err = d.Reporter.ReportEventCount(args, statusCode); err != nil { - logging.FromContext(ctx).Errorf("Something happened: %v", err) + logging.FromContext(ctx).Errorf("something happened: %v", err) } } if !isSuccess { - logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.SubscriberURL) + logging.FromContext(ctx).Warnf("failed to deliver to %q", d.SubscriberURL) // We need to ack the original message. if err = msg.Ack(false); err != nil { @@ -223,14 +228,14 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c if err = sendToRabbitMQ(channel, msg.Exchange, span, event); err != nil { logging.FromContext(ctx).Warn("failed to send event: ", err) } - return + return fmt.Errorf("failed to deliver to %q", d.SubscriberURL) } else if response != nil { - logging.FromContext(ctx).Infof("Sending an event: %+v", response) + logging.FromContext(ctx).Infof("sending an event: %+v", response) ctx = cloudevents.ContextWithTarget(ctx, d.BrokerIngressURL) result = ceClient.Send(ctx, *response) _, isSuccess = getStatus(ctx, result) if !isSuccess { - logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.BrokerIngressURL) + logging.FromContext(ctx).Warnf("failed to deliver to %q", d.BrokerIngressURL) // We need to ack the original message. if err = msg.Ack(false); err != nil { @@ -246,7 +251,7 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c if err = sendToRabbitMQ(channel, msg.Exchange, span, event); err != nil { logging.FromContext(ctx).Warn("failed to send event: ", err) } - return + return fmt.Errorf("failed to deliver to %q", d.BrokerIngressURL) } } @@ -258,10 +263,11 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c dispatchTime := time.Since(start) _ = d.Reporter.ReportEventDispatchTime(args, statusCode, dispatchTime) } + return nil } // Defaulting to Ack as this always hits the DLQ. -func (d *Dispatcher) dispatchDLQ(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client) { +func (d *Dispatcher) dispatchDLQ(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client) error { start := time.Now() msgBinding := rabbit.NewMessageFromDelivery(ComponentName, "", "", &msg) event, err := binding.ToEvent(cloudevents.WithEncodingBinary(ctx), msgBinding) @@ -270,7 +276,7 @@ func (d *Dispatcher) dispatchDLQ(ctx context.Context, msg amqp.Delivery, ceClien if err = msg.Ack(false); err != nil { logging.FromContext(ctx).Warn("failed to Ack event: ", err) } - return + return fmt.Errorf("failed creating event from delivery, err: %s", err) } ctx, span := readSpan(ctx, msg) @@ -291,27 +297,27 @@ func (d *Dispatcher) dispatchDLQ(ctx context.Context, msg amqp.Delivery, ceClien if statusCode != -1 { args := &dispatcher.ReportArgs{EventType: event.Type()} if err = d.Reporter.ReportEventCount(args, statusCode); err != nil { - logging.FromContext(ctx).Errorf("Something happened: %v", err) + logging.FromContext(ctx).Errorf("something happened: %v", err) } } if !isSuccess { - logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.SubscriberURL) + logging.FromContext(ctx).Warnf("failed to deliver to %q", d.SubscriberURL) if err = msg.Ack(false); err != nil { logging.FromContext(ctx).Warn("failed to Ack event: ", err) } - return + return fmt.Errorf("failed to deliver to %q", d.SubscriberURL) } else if response != nil { logging.FromContext(ctx).Infof("Sending an event: %+v", response) ctx = cloudevents.ContextWithTarget(ctx, d.BrokerIngressURL) result = ceClient.Send(ctx, *response) _, isSuccess = getStatus(ctx, result) if !isSuccess { - logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.BrokerIngressURL) + logging.FromContext(ctx).Warnf("failed to deliver to %q", d.BrokerIngressURL) if err = msg.Ack(false); err != nil { logging.FromContext(ctx).Warn("failed to Ack event: ", err) } - return + return fmt.Errorf("failed to deliver to %q", d.BrokerIngressURL) } } @@ -324,6 +330,7 @@ func (d *Dispatcher) dispatchDLQ(ctx context.Context, msg amqp.Delivery, ceClien dispatchTime := time.Since(start) _ = d.Reporter.ReportEventDispatchTime(args, statusCode, dispatchTime) } + return nil } func sendToRabbitMQ(channel rabbit.RabbitMQChannelInterface, exchangeName string, span *trace.Span, event *cloudevents.Event) error { diff --git a/pkg/dispatcher/dispatcher_test.go b/pkg/dispatcher/dispatcher_test.go index d8f0fa75dd..a104177603 100644 --- a/pkg/dispatcher/dispatcher_test.go +++ b/pkg/dispatcher/dispatcher_test.go @@ -18,7 +18,9 @@ package dispatcher import ( "context" + v2 "github.com/cloudevents/sdk-go/v2" "io" + "log" "net/http" "net/http/httptest" "sync" @@ -56,7 +58,10 @@ func TestDispatcher_ConsumeFromQueue(t *testing.T) { time.Sleep(1000) cancelFunc() }() - d.ConsumeFromQueue(ctx, &rabbit.RabbitMQConnectionMock{}, &rabbit.RabbitMQChannelMock{}, "") + + if err := d.ConsumeFromQueue(ctx, &rabbit.RabbitMQConnectionMock{}, &rabbit.RabbitMQChannelMock{}, ""); err != nil { + t.Errorf("ConsumeFromQueue() error = %v", err) + } } func TestDispatcher_ReadSpan(t *testing.T) { @@ -181,3 +186,144 @@ func (h *fakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func requestAccepted(writer http.ResponseWriter, req *http.Request) { writer.WriteHeader(http.StatusOK) } + +type MockAcknowledger struct { +} + +func (m MockAcknowledger) Ack(tag uint64, multiple bool) error { + return nil +} +func (m MockAcknowledger) Nack(tag uint64, multiple bool, requeue bool) error { + return nil +} +func (m MockAcknowledger) Reject(tag uint64, requeue bool) error { + return nil +} + +func TestDispatcher_dispatch(t *testing.T) { + mockAcknowledger := MockAcknowledger{} + + p, err := v2.NewHTTP(v2.WithGetHandlerFunc(requestAccepted)) + if err != nil { + log.Fatalf("Failed to create protocol, %v", err) + } + + c, err := v2.NewClient(p) + if err != nil { + log.Fatalf("Failed to create client, %v", err) + } + + notifyCloseChannel := make(chan *amqp.Error) + consumeChannel := make(<-chan amqp.Delivery) + channel := rabbit.RabbitMQChannelMock{ + NotifyCloseChannel: notifyCloseChannel, + ConsumeChannel: consumeChannel, + } + + go func() { + for { + select { + case consumer := <-consumeChannel: + log.Fatalf("%+v", consumer) + case notify := <-notifyCloseChannel: + log.Fatalf(notify.Error()) + } + } + }() + + type fields struct { + BrokerIngressURL string + SubscriberURL string + SubscriberCACerts string + MaxRetries int + BackoffDelay time.Duration + Timeout time.Duration + BackoffPolicy v1.BackoffPolicyType + WorkerCount int + Reporter dispatcherstats.StatsReporter + DLX bool + } + type args struct { + ctx context.Context + msg amqp.Delivery + ceClient v2.Client + channel rabbit.RabbitMQChannelInterface + } + tests := []struct { + name string + fields fields + args args + wantErr bool + }{ + { + name: "knativeerrordest is in header", + fields: fields{}, + args: args{ + ctx: context.TODO(), + msg: amqp.Delivery{ + Acknowledger: mockAcknowledger, + ContentType: "application/cloudevents+json", + Headers: amqp.Table{"knativeerrordest": "some-destination"}, + }, + ceClient: c, + channel: &channel, + }, + }, + { + name: "invalid event", + fields: fields{}, + args: args{ + ctx: context.TODO(), + msg: amqp.Delivery{ + Acknowledger: mockAcknowledger, + ContentType: "application/cloudevents+json", + Headers: amqp.Table{}, + }, + ceClient: nil, + channel: nil, + }, + wantErr: true, + }, + { + name: "valid event", + fields: fields{}, + args: args{ + ctx: context.TODO(), + msg: amqp.Delivery{ + Acknowledger: mockAcknowledger, + ContentType: "application/cloudevents+json", + Headers: amqp.Table{}, + Body: []byte(`{"specversion":"1.0","source":"valid-event","id":"valid-id","type":"valid-type"}`), + }, + ceClient: c, + channel: &channel, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + d := &Dispatcher{ + BrokerIngressURL: tt.fields.BrokerIngressURL, + SubscriberURL: tt.fields.SubscriberURL, + SubscriberCACerts: tt.fields.SubscriberCACerts, + MaxRetries: tt.fields.MaxRetries, + BackoffDelay: tt.fields.BackoffDelay, + Timeout: tt.fields.Timeout, + BackoffPolicy: tt.fields.BackoffPolicy, + WorkerCount: tt.fields.WorkerCount, + Reporter: tt.fields.Reporter, + DLX: tt.fields.DLX, + } + + if d.DLX { + if err = d.dispatchDLQ(tt.args.ctx, tt.args.msg, tt.args.ceClient); (err != nil) != tt.wantErr { + t.Errorf("dispatchDLQ() error = %v, wantErr %v", err, tt.wantErr) + } + } else { + if err = d.dispatch(tt.args.ctx, tt.args.msg, tt.args.ceClient, tt.args.channel); (err != nil) != tt.wantErr { + t.Errorf("dispatch() error = %v, wantErr %v", err, tt.wantErr) + } + } + }) + } +} From 29a27198a90d666eb716773214629675d0dbe396 Mon Sep 17 00:00:00 2001 From: Thomas Johansen Date: Sun, 23 Jul 2023 20:56:47 +0200 Subject: [PATCH 19/29] Improved tests --- pkg/dispatcher/dispatcher.go | 2 +- pkg/dispatcher/dispatcher_test.go | 91 ++++++++++++++++++++++--------- 2 files changed, 65 insertions(+), 28 deletions(-) diff --git a/pkg/dispatcher/dispatcher.go b/pkg/dispatcher/dispatcher.go index 88f5c3ab93..5ed28305d4 100644 --- a/pkg/dispatcher/dispatcher.go +++ b/pkg/dispatcher/dispatcher.go @@ -163,7 +163,7 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c if err != nil { logging.FromContext(ctx).Warn("failed to Nack event: ", err) } - return fmt.Errorf("failed to Nack event: %s", err) + return nil } ctx, span := readSpan(ctx, msg) diff --git a/pkg/dispatcher/dispatcher_test.go b/pkg/dispatcher/dispatcher_test.go index a104177603..ead68edb62 100644 --- a/pkg/dispatcher/dispatcher_test.go +++ b/pkg/dispatcher/dispatcher_test.go @@ -19,6 +19,7 @@ package dispatcher import ( "context" v2 "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/binding" "io" "log" "net/http" @@ -200,19 +201,22 @@ func (m MockAcknowledger) Reject(tag uint64, requeue bool) error { return nil } -func TestDispatcher_dispatch(t *testing.T) { - mockAcknowledger := MockAcknowledger{} +type MockClient struct { + Request func(ctx context.Context, m binding.Message, transformers ...binding.Transformer) (binding.Message, error) +} - p, err := v2.NewHTTP(v2.WithGetHandlerFunc(requestAccepted)) - if err != nil { - log.Fatalf("Failed to create protocol, %v", err) - } +type MockStatsReporter struct { +} - c, err := v2.NewClient(p) - if err != nil { - log.Fatalf("Failed to create client, %v", err) - } +func (m MockStatsReporter) ReportEventCount(args *dispatcherstats.ReportArgs, responseCode int) error { + return nil +} + +func (m MockStatsReporter) ReportEventDispatchTime(args *dispatcherstats.ReportArgs, responseCode int, d time.Duration) error { + return nil +} +func TestDispatcher_dispatch(t *testing.T) { notifyCloseChannel := make(chan *amqp.Error) consumeChannel := make(<-chan amqp.Delivery) channel := rabbit.RabbitMQChannelMock{ @@ -244,10 +248,10 @@ func TestDispatcher_dispatch(t *testing.T) { DLX bool } type args struct { - ctx context.Context - msg amqp.Delivery - ceClient v2.Client - channel rabbit.RabbitMQChannelInterface + ctx context.Context + msg amqp.Delivery + client MockClient + channel rabbit.RabbitMQChannelInterface } tests := []struct { name string @@ -261,12 +265,12 @@ func TestDispatcher_dispatch(t *testing.T) { args: args{ ctx: context.TODO(), msg: amqp.Delivery{ - Acknowledger: mockAcknowledger, + Acknowledger: &MockAcknowledger{}, ContentType: "application/cloudevents+json", Headers: amqp.Table{"knativeerrordest": "some-destination"}, }, - ceClient: c, - channel: &channel, + client: MockClient{}, + channel: &channel, }, }, { @@ -275,28 +279,56 @@ func TestDispatcher_dispatch(t *testing.T) { args: args{ ctx: context.TODO(), msg: amqp.Delivery{ - Acknowledger: mockAcknowledger, + Acknowledger: &MockAcknowledger{}, ContentType: "application/cloudevents+json", Headers: amqp.Table{}, }, - ceClient: nil, - channel: nil, + client: MockClient{}, + channel: nil, }, wantErr: true, }, { - name: "valid event", - fields: fields{}, + name: "invalid request", + fields: fields{ + Reporter: &MockStatsReporter{}, + }, args: args{ ctx: context.TODO(), msg: amqp.Delivery{ - Acknowledger: mockAcknowledger, + Acknowledger: &MockAcknowledger{}, ContentType: "application/cloudevents+json", Headers: amqp.Table{}, Body: []byte(`{"specversion":"1.0","source":"valid-event","id":"valid-id","type":"valid-type"}`), }, - ceClient: c, - channel: &channel, + client: MockClient{ + Request: func(ctx context.Context, m binding.Message, transformers ...binding.Transformer) (binding.Message, error) { + return m, v2.NewHTTPRetriesResult(v2.NewHTTPResult(500, ""), 0, time.Now(), []protocol.Result{}) + }, + }, + channel: &channel, + }, + wantErr: true, + }, + { + name: "valid event", + fields: fields{ + Reporter: &MockStatsReporter{}, + }, + args: args{ + ctx: context.TODO(), + msg: amqp.Delivery{ + Acknowledger: &MockAcknowledger{}, + ContentType: "application/cloudevents+json", + Headers: amqp.Table{}, + Body: []byte(`{"specversion":"1.0","source":"valid-event","id":"valid-id","type":"valid-type"}`), + }, + client: MockClient{ + Request: func(ctx context.Context, m binding.Message, transformers ...binding.Transformer) (binding.Message, error) { + return m, v2.NewHTTPRetriesResult(v2.NewHTTPResult(200, ""), 0, time.Now(), []protocol.Result{}) + }, + }, + channel: &channel, }, }, } @@ -315,12 +347,17 @@ func TestDispatcher_dispatch(t *testing.T) { DLX: tt.fields.DLX, } + client, err := v2.NewClient(tt.args.client) + if err != nil { + log.Fatalf("Failed to create protocol, %v", err) + } + if d.DLX { - if err = d.dispatchDLQ(tt.args.ctx, tt.args.msg, tt.args.ceClient); (err != nil) != tt.wantErr { + if err = d.dispatchDLQ(tt.args.ctx, tt.args.msg, client); (err != nil) != tt.wantErr { t.Errorf("dispatchDLQ() error = %v, wantErr %v", err, tt.wantErr) } } else { - if err = d.dispatch(tt.args.ctx, tt.args.msg, tt.args.ceClient, tt.args.channel); (err != nil) != tt.wantErr { + if err = d.dispatch(tt.args.ctx, tt.args.msg, client, tt.args.channel); (err != nil) != tt.wantErr { t.Errorf("dispatch() error = %v, wantErr %v", err, tt.wantErr) } } From 7d49b8092ad2efe4fd63af63cfae77c3b29ad88d Mon Sep 17 00:00:00 2001 From: Thomas Johansen Date: Mon, 24 Jul 2023 19:47:10 +0200 Subject: [PATCH 20/29] Quick push --- pkg/dispatcher/dispatcher_test.go | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/pkg/dispatcher/dispatcher_test.go b/pkg/dispatcher/dispatcher_test.go index ead68edb62..91fa17f137 100644 --- a/pkg/dispatcher/dispatcher_test.go +++ b/pkg/dispatcher/dispatcher_test.go @@ -202,7 +202,11 @@ func (m MockAcknowledger) Reject(tag uint64, requeue bool) error { } type MockClient struct { - Request func(ctx context.Context, m binding.Message, transformers ...binding.Transformer) (binding.Message, error) + request func(ctx context.Context, m binding.Message, transformers ...binding.Transformer) (binding.Message, error) +} + +func (mock MockClient) Request(ctx context.Context, m binding.Message, transformers ...binding.Transformer) (binding.Message, error) { + return mock.request(ctx, m, transformers...) } type MockStatsReporter struct { @@ -217,14 +221,14 @@ func (m MockStatsReporter) ReportEventDispatchTime(args *dispatcherstats.ReportA } func TestDispatcher_dispatch(t *testing.T) { - notifyCloseChannel := make(chan *amqp.Error) - consumeChannel := make(<-chan amqp.Delivery) + //notifyCloseChannel := make(chan *amqp.Error, 1) + //consumeChannel := make(chan amqp.Delivery, 1) channel := rabbit.RabbitMQChannelMock{ - NotifyCloseChannel: notifyCloseChannel, - ConsumeChannel: consumeChannel, + //NotifyCloseChannel: notifyCloseChannel, + //ConsumeChannel: consumeChannel, } - go func() { + /*go func() { for { select { case consumer := <-consumeChannel: @@ -233,7 +237,7 @@ func TestDispatcher_dispatch(t *testing.T) { log.Fatalf(notify.Error()) } } - }() + }()*/ type fields struct { BrokerIngressURL string @@ -302,7 +306,7 @@ func TestDispatcher_dispatch(t *testing.T) { Body: []byte(`{"specversion":"1.0","source":"valid-event","id":"valid-id","type":"valid-type"}`), }, client: MockClient{ - Request: func(ctx context.Context, m binding.Message, transformers ...binding.Transformer) (binding.Message, error) { + request: func(ctx context.Context, m binding.Message, transformers ...binding.Transformer) (binding.Message, error) { return m, v2.NewHTTPRetriesResult(v2.NewHTTPResult(500, ""), 0, time.Now(), []protocol.Result{}) }, }, @@ -324,7 +328,7 @@ func TestDispatcher_dispatch(t *testing.T) { Body: []byte(`{"specversion":"1.0","source":"valid-event","id":"valid-id","type":"valid-type"}`), }, client: MockClient{ - Request: func(ctx context.Context, m binding.Message, transformers ...binding.Transformer) (binding.Message, error) { + request: func(ctx context.Context, m binding.Message, transformers ...binding.Transformer) (binding.Message, error) { return m, v2.NewHTTPRetriesResult(v2.NewHTTPResult(200, ""), 0, time.Now(), []protocol.Result{}) }, }, From e5e6abee33215bbf2eef5597fdee218b9124c690 Mon Sep 17 00:00:00 2001 From: Gabriel Freites Date: Mon, 24 Jul 2023 19:50:36 +0200 Subject: [PATCH 21/29] now the failed messages are directly routed to the DLQ exchange defined in the trigger, if its not defined in the trigger then in the broker, and if neither is defined then they are not rerouted --- cmd/dispatcher/main.go | 2 ++ pkg/dispatcher/dispatcher.go | 25 +++++++++---------- .../trigger/resources/dispatcher.go | 8 ++++++ pkg/reconciler/trigger/trigger.go | 9 ++++--- 4 files changed, 28 insertions(+), 16 deletions(-) diff --git a/cmd/dispatcher/main.go b/cmd/dispatcher/main.go index 948cbe3463..35e1f9c692 100644 --- a/cmd/dispatcher/main.go +++ b/cmd/dispatcher/main.go @@ -47,6 +47,7 @@ type envConfig struct { SubscriberURL string `envconfig:"SUBSCRIBER" required:"true"` SubscriberCACerts string `envconfig:"SUBSCRIBER_CACERTS" required:"false"` DLX bool `envconfig:"DLX" required:"false"` + DLXName string `envconfig:"DLX_NAME" required:"false"` // Number of concurrent messages in flight Parallelism int `envconfig:"PARALLELISM" default:"1" required:"false"` @@ -99,6 +100,7 @@ func main() { WorkerCount: env.Parallelism, Reporter: reporter, DLX: env.DLX, + DLXName: env.DLXName, } var err error diff --git a/pkg/dispatcher/dispatcher.go b/pkg/dispatcher/dispatcher.go index 5ed28305d4..4adc3b1e2e 100644 --- a/pkg/dispatcher/dispatcher.go +++ b/pkg/dispatcher/dispatcher.go @@ -57,6 +57,7 @@ type Dispatcher struct { WorkerCount int Reporter dispatcher.StatsReporter DLX bool + DLXName string } // ConsumeFromQueue consumes messages from the given message channel and queue. @@ -158,13 +159,7 @@ func getStatus(ctx context.Context, result protocol.Result) (int, bool) { func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client, channel rabbit.RabbitMQChannelInterface) error { start := time.Now() - if _, ok := msg.Headers["knativeerrordest"]; ok { - err := msg.Nack(false, false) - if err != nil { - logging.FromContext(ctx).Warn("failed to Nack event: ", err) - } - return nil - } + dlqExchange := d.DLXName ctx, span := readSpan(ctx, msg) defer span.End() @@ -179,14 +174,13 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c // Add headers as described here: https://knative.dev/docs/eventing/event-delivery/#configuring-channel-event-delivery event.SetExtension("knativeerrordest", d.SubscriberURL) - - if err = sendToRabbitMQ(channel, msg.Exchange, span, event); err != nil { + if err = sendToRabbitMQ(channel, dlqExchange, span, event); err != nil { logging.FromContext(ctx).Warn("failed to send event: ", err) } } - if err = msg.Nack(false, false); err != nil { - logging.FromContext(ctx).Warn("failed to Nack event: ", err) + if err = msg.Ack(false); err != nil { + logging.FromContext(ctx).Warn("failed to Ack event: ", err) } return fmt.Errorf("failed parsing event: %s", err) @@ -225,7 +219,7 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c event.SetExtension("knativeerrorcode", statusCode) // Queue the event into DLQ with the correct headers. - if err = sendToRabbitMQ(channel, msg.Exchange, span, event); err != nil { + if err = sendToRabbitMQ(channel, dlqExchange, span, event); err != nil { logging.FromContext(ctx).Warn("failed to send event: ", err) } return fmt.Errorf("failed to deliver to %q", d.SubscriberURL) @@ -248,7 +242,7 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c event.SetExtension("knativeerrordata", result) // Queue the event into DLQ with the correct headers. - if err = sendToRabbitMQ(channel, msg.Exchange, span, event); err != nil { + if err = sendToRabbitMQ(channel, dlqExchange, span, event); err != nil { logging.FromContext(ctx).Warn("failed to send event: ", err) } return fmt.Errorf("failed to deliver to %q", d.BrokerIngressURL) @@ -334,6 +328,11 @@ func (d *Dispatcher) dispatchDLQ(ctx context.Context, msg amqp.Delivery, ceClien } func sendToRabbitMQ(channel rabbit.RabbitMQChannelInterface, exchangeName string, span *trace.Span, event *cloudevents.Event) error { + // no dlq defined in the trigger nor the broker, return + if exchangeName == "" { + return nil + } + tp, ts := (&tracecontext.HTTPFormat{}).SpanContextToHeaders(span.SpanContext()) dc, err := channel.PublishWithDeferredConfirm( exchangeName, diff --git a/pkg/reconciler/trigger/resources/dispatcher.go b/pkg/reconciler/trigger/resources/dispatcher.go index 9fca19a89b..5ed863a3ce 100644 --- a/pkg/reconciler/trigger/resources/dispatcher.go +++ b/pkg/reconciler/trigger/resources/dispatcher.go @@ -57,6 +57,7 @@ type DispatcherArgs struct { BrokerIngressURL *apis.URL Subscriber *duckv1.Addressable DLX bool + DLXName string Configs reconcilersource.ConfigAccessor ResourceRequirements corev1.ResourceRequirements } @@ -205,6 +206,13 @@ func MakeDispatcherDeployment(args *DispatcherArgs) *appsv1.Deployment { Value: "true", }) } + if args.DLXName != "" { + dispatcher.Env = append(dispatcher.Env, + corev1.EnvVar{ + Name: "DLX_NAME", + Value: args.DLXName, + }) + } deployment := &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Namespace: args.Trigger.Namespace, diff --git a/pkg/reconciler/trigger/trigger.go b/pkg/reconciler/trigger/trigger.go index 060b5a8215..b6549ddbca 100644 --- a/pkg/reconciler/trigger/trigger.go +++ b/pkg/reconciler/trigger/trigger.go @@ -43,6 +43,7 @@ import ( triggerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/trigger" eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1" reconcilersource "knative.dev/eventing/pkg/reconciler/source" + "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/kmeta" @@ -204,7 +205,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, t *eventingv1.Trigger) p } t.Status.MarkDeadLetterSinkResolvedSucceeded() t.Status.DeadLetterSinkURI = deadLetterSinkAddressable.URL - _, err = r.reconcileDispatcherDeployment(ctx, t, deadLetterSinkAddressable, t.Spec.Delivery, true, rabbitmqVhost) + _, err = r.reconcileDispatcherDeployment(ctx, t, deadLetterSinkAddressable, t.Spec.Delivery, true, rabbitmqVhost, "") if err != nil { logging.FromContext(ctx).Error("Problem reconciling DLX dispatcher Deployment", zap.Error(err)) t.Status.MarkDependencyFailed("DeploymentFailure", "%v", err) @@ -292,9 +293,10 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, t *eventingv1.Trigger) p if delivery == nil { // If trigger didn't but Broker did, use it instead. delivery = broker.Spec.Delivery + dlxName = ptr.String(naming.BrokerExchangeName(broker, true)) } - _, err = r.reconcileDispatcherDeployment(ctx, t, subscriberAddressable, delivery, false, rabbitmqVhost) + _, err = r.reconcileDispatcherDeployment(ctx, t, subscriberAddressable, delivery, false, rabbitmqVhost, ptr.StringValue(dlxName)) if err != nil { logging.FromContext(ctx).Error("Problem reconciling dispatcher Deployment", zap.Error(err)) t.Status.MarkDependencyFailed("DeploymentFailure", "%v", err) @@ -381,7 +383,7 @@ func (r *Reconciler) reconcileDispatcherDeployment( subscriberAddressable *duckv1.Addressable, delivery *eventingduckv1.DeliverySpec, dlq bool, - rabbitmqVhost string) (*v1.Deployment, error) { + rabbitmqVhost, dlxName string) (*v1.Deployment, error) { rabbitmqSecret, err := r.getRabbitmqSecret(ctx, t) if err != nil { return nil, err @@ -421,6 +423,7 @@ func (r *Reconciler) reconcileDispatcherDeployment( BrokerIngressURL: b.Status.Address.URL, Subscriber: subscriberAddressable, DLX: dlq, + DLXName: dlxName, Delivery: delivery, Configs: r.configs, ResourceRequirements: resourceRequirements, From 3589e6f7f860040f6a1477a573a332b95a3e0f42 Mon Sep 17 00:00:00 2001 From: Thomas Johansen Date: Tue, 25 Jul 2023 19:11:56 +0200 Subject: [PATCH 22/29] Update test. Still failing --- pkg/dispatcher/dispatcher_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/dispatcher/dispatcher_test.go b/pkg/dispatcher/dispatcher_test.go index 91fa17f137..e1aabb6d10 100644 --- a/pkg/dispatcher/dispatcher_test.go +++ b/pkg/dispatcher/dispatcher_test.go @@ -250,6 +250,7 @@ func TestDispatcher_dispatch(t *testing.T) { WorkerCount int Reporter dispatcherstats.StatsReporter DLX bool + DLXName string } type args struct { ctx context.Context @@ -296,6 +297,7 @@ func TestDispatcher_dispatch(t *testing.T) { name: "invalid request", fields: fields{ Reporter: &MockStatsReporter{}, + DLXName: "test", }, args: args{ ctx: context.TODO(), @@ -318,6 +320,7 @@ func TestDispatcher_dispatch(t *testing.T) { name: "valid event", fields: fields{ Reporter: &MockStatsReporter{}, + DLXName: "test", }, args: args{ ctx: context.TODO(), @@ -349,6 +352,7 @@ func TestDispatcher_dispatch(t *testing.T) { WorkerCount: tt.fields.WorkerCount, Reporter: tt.fields.Reporter, DLX: tt.fields.DLX, + DLXName: tt.fields.DLXName, } client, err := v2.NewClient(tt.args.client) From fcfc5add49e7d2ca981881372701a05621ec3849 Mon Sep 17 00:00:00 2001 From: Gabriel Freites Date: Thu, 27 Jul 2023 12:24:36 +0200 Subject: [PATCH 23/29] fixed tests and removed event nil case when binding.ToEvent fails cause its always going to be nil --- pkg/dispatcher/dispatcher.go | 15 ++------------- pkg/dispatcher/dispatcher_test.go | 23 +++-------------------- 2 files changed, 5 insertions(+), 33 deletions(-) diff --git a/pkg/dispatcher/dispatcher.go b/pkg/dispatcher/dispatcher.go index 4adc3b1e2e..8407d44824 100644 --- a/pkg/dispatcher/dispatcher.go +++ b/pkg/dispatcher/dispatcher.go @@ -167,18 +167,7 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c msgBinding := rabbit.NewMessageFromDelivery(ComponentName, "", "", &msg) event, err := binding.ToEvent(cloudevents.WithEncodingBinary(ctx), msgBinding) if err != nil { - if event == nil { - logging.FromContext(ctx).Error("failed parsing event: ", err) - } else { - logging.FromContext(ctx).Warn("failed parsing event, setting knativeerrordest header and NACK-ing and re-queueing: ", err) - - // Add headers as described here: https://knative.dev/docs/eventing/event-delivery/#configuring-channel-event-delivery - event.SetExtension("knativeerrordest", d.SubscriberURL) - if err = sendToRabbitMQ(channel, dlqExchange, span, event); err != nil { - logging.FromContext(ctx).Warn("failed to send event: ", err) - } - } - + logging.FromContext(ctx).Warn("failed parsing event: ", err) if err = msg.Ack(false); err != nil { logging.FromContext(ctx).Warn("failed to Ack event: ", err) } @@ -296,7 +285,7 @@ func (d *Dispatcher) dispatchDLQ(ctx context.Context, msg amqp.Delivery, ceClien } if !isSuccess { - logging.FromContext(ctx).Warnf("failed to deliver to %q", d.SubscriberURL) + logging.FromContext(ctx).Warnf("failed to deliver to %q %s", d.SubscriberURL, msg) if err = msg.Ack(false); err != nil { logging.FromContext(ctx).Warn("failed to Ack event: ", err) } diff --git a/pkg/dispatcher/dispatcher_test.go b/pkg/dispatcher/dispatcher_test.go index e1aabb6d10..d53c859812 100644 --- a/pkg/dispatcher/dispatcher_test.go +++ b/pkg/dispatcher/dispatcher_test.go @@ -18,8 +18,6 @@ package dispatcher import ( "context" - v2 "github.com/cloudevents/sdk-go/v2" - "github.com/cloudevents/sdk-go/v2/binding" "io" "log" "net/http" @@ -28,6 +26,9 @@ import ( "testing" "time" + v2 "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/protocol" cehttp "github.com/cloudevents/sdk-go/v2/protocol/http" amqp "github.com/rabbitmq/amqp091-go" @@ -250,7 +251,6 @@ func TestDispatcher_dispatch(t *testing.T) { WorkerCount int Reporter dispatcherstats.StatsReporter DLX bool - DLXName string } type args struct { ctx context.Context @@ -264,20 +264,6 @@ func TestDispatcher_dispatch(t *testing.T) { args args wantErr bool }{ - { - name: "knativeerrordest is in header", - fields: fields{}, - args: args{ - ctx: context.TODO(), - msg: amqp.Delivery{ - Acknowledger: &MockAcknowledger{}, - ContentType: "application/cloudevents+json", - Headers: amqp.Table{"knativeerrordest": "some-destination"}, - }, - client: MockClient{}, - channel: &channel, - }, - }, { name: "invalid event", fields: fields{}, @@ -297,7 +283,6 @@ func TestDispatcher_dispatch(t *testing.T) { name: "invalid request", fields: fields{ Reporter: &MockStatsReporter{}, - DLXName: "test", }, args: args{ ctx: context.TODO(), @@ -320,7 +305,6 @@ func TestDispatcher_dispatch(t *testing.T) { name: "valid event", fields: fields{ Reporter: &MockStatsReporter{}, - DLXName: "test", }, args: args{ ctx: context.TODO(), @@ -352,7 +336,6 @@ func TestDispatcher_dispatch(t *testing.T) { WorkerCount: tt.fields.WorkerCount, Reporter: tt.fields.Reporter, DLX: tt.fields.DLX, - DLXName: tt.fields.DLXName, } client, err := v2.NewClient(tt.args.client) From 114be7537efbbde6ee81b950bbfa8f6cf4fd49da Mon Sep 17 00:00:00 2001 From: Thomas Johansen Date: Thu, 27 Jul 2023 13:18:16 +0200 Subject: [PATCH 24/29] Added two simple test, to test the dlq function --- pkg/dispatcher/dispatcher_test.go | 63 ++++++++++++++++++++++--------- 1 file changed, 46 insertions(+), 17 deletions(-) diff --git a/pkg/dispatcher/dispatcher_test.go b/pkg/dispatcher/dispatcher_test.go index d53c859812..efdd4554e6 100644 --- a/pkg/dispatcher/dispatcher_test.go +++ b/pkg/dispatcher/dispatcher_test.go @@ -222,23 +222,7 @@ func (m MockStatsReporter) ReportEventDispatchTime(args *dispatcherstats.ReportA } func TestDispatcher_dispatch(t *testing.T) { - //notifyCloseChannel := make(chan *amqp.Error, 1) - //consumeChannel := make(chan amqp.Delivery, 1) - channel := rabbit.RabbitMQChannelMock{ - //NotifyCloseChannel: notifyCloseChannel, - //ConsumeChannel: consumeChannel, - } - - /*go func() { - for { - select { - case consumer := <-consumeChannel: - log.Fatalf("%+v", consumer) - case notify := <-notifyCloseChannel: - log.Fatalf(notify.Error()) - } - } - }()*/ + channel := rabbit.RabbitMQChannelMock{} type fields struct { BrokerIngressURL string @@ -301,6 +285,29 @@ func TestDispatcher_dispatch(t *testing.T) { }, wantErr: true, }, + { + name: "invalid request dlq", + fields: fields{ + Reporter: &MockStatsReporter{}, + DLX: true, + }, + args: args{ + ctx: context.TODO(), + msg: amqp.Delivery{ + Acknowledger: &MockAcknowledger{}, + ContentType: "application/cloudevents+json", + Headers: amqp.Table{}, + Body: []byte(`{"specversion":"1.0","source":"valid-event","id":"valid-id","type":"valid-type"}`), + }, + client: MockClient{ + request: func(ctx context.Context, m binding.Message, transformers ...binding.Transformer) (binding.Message, error) { + return m, v2.NewHTTPRetriesResult(v2.NewHTTPResult(500, ""), 0, time.Now(), []protocol.Result{}) + }, + }, + channel: &channel, + }, + wantErr: true, + }, { name: "valid event", fields: fields{ @@ -322,6 +329,28 @@ func TestDispatcher_dispatch(t *testing.T) { channel: &channel, }, }, + { + name: "valid event dlq", + fields: fields{ + Reporter: &MockStatsReporter{}, + DLX: true, + }, + args: args{ + ctx: context.TODO(), + msg: amqp.Delivery{ + Acknowledger: &MockAcknowledger{}, + ContentType: "application/cloudevents+json", + Headers: amqp.Table{}, + Body: []byte(`{"specversion":"1.0","source":"valid-event","id":"valid-id","type":"valid-type"}`), + }, + client: MockClient{ + request: func(ctx context.Context, m binding.Message, transformers ...binding.Transformer) (binding.Message, error) { + return m, v2.NewHTTPRetriesResult(v2.NewHTTPResult(200, ""), 0, time.Now(), []protocol.Result{}) + }, + }, + channel: &channel, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { From 4b38a8fbf5c8767451d953bcf563c3e02801279d Mon Sep 17 00:00:00 2001 From: Gabriel Freites Date: Thu, 27 Jul 2023 16:55:03 +0200 Subject: [PATCH 25/29] fixed unit tests --- pkg/reconciler/trigger/trigger_test.go | 54 ++++++++++++++------------ 1 file changed, 29 insertions(+), 25 deletions(-) diff --git a/pkg/reconciler/trigger/trigger_test.go b/pkg/reconciler/trigger/trigger_test.go index 2c8636cdc0..51e09fd6c3 100644 --- a/pkg/reconciler/trigger/trigger_test.go +++ b/pkg/reconciler/trigger/trigger_test.go @@ -74,12 +74,13 @@ import ( ) const ( - systemNS = "knative-testing" - testNS = "test-namespace" - otherNS = "other-namespace" - brokerClass = "RabbitMQBroker" - brokerName = "test-broker" - brokerUID = "broker-test-uid" + systemNS = "knative-testing" + testNS = "test-namespace" + otherNS = "other-namespace" + brokerClass = "RabbitMQBroker" + brokerName = "test-broker" + brokerUID = "broker-test-uid" + brokerDLQName = "b.test-namespace.test-broker.dlx.broker-test-uid" rabbitSecretName = "test-broker-broker-rabbit" rabbitMQBrokerName = "rabbitbrokerhere" @@ -270,7 +271,7 @@ func TestReconcile(t *testing.T) { markReady(createBinding(true, false, "")), }, WantCreates: []runtime.Object{ - createDispatcherDeployment(false, ""), + createDispatcherDeployment(false, "", brokerDLQName), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: triggerWithFilterReady(), @@ -306,7 +307,7 @@ func TestReconcile(t *testing.T) { markReady(createBinding(true, false, "")), }, WantCreates: []runtime.Object{ - createDispatcherDeploymentWithRetries(), + createDispatcherDeploymentWithRetries(brokerDLQName), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: triggerWithFilterReady(), @@ -325,7 +326,7 @@ func TestReconcile(t *testing.T) { markReady(createBinding(true, false, "")), }, WantCreates: []runtime.Object{ - createDispatcherDeploymentWithRetries(), + createDispatcherDeploymentWithRetries(""), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: triggerWithDeliverySpecReady(), @@ -420,7 +421,7 @@ func TestReconcile(t *testing.T) { markReady(createBinding(true, false, "")), }, WantCreates: []runtime.Object{ - createDispatcherDeployment(false, ""), + createDispatcherDeployment(false, "", brokerDLQName), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: triggerWithFilterReady(), @@ -441,7 +442,7 @@ func TestReconcile(t *testing.T) { createRabbitMQCluster(""), }, WantCreates: []runtime.Object{ - createDispatcherDeployment(false, ""), + createDispatcherDeployment(false, "", brokerDLQName), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewTrigger(triggerName, testNS, brokerName, @@ -470,7 +471,7 @@ func TestReconcile(t *testing.T) { createRabbitMQCluster(""), }, WantCreates: []runtime.Object{ - createDispatcherDeployment(false, ""), + createDispatcherDeployment(false, "", brokerDLQName), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewTrigger(triggerName, testNS, brokerName, @@ -499,7 +500,7 @@ func TestReconcile(t *testing.T) { createRabbitMQCluster(""), }, WantCreates: []runtime.Object{ - createDispatcherDeployment(false, ""), + createDispatcherDeployment(false, "", brokerDLQName), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewTrigger(triggerName, testNS, brokerName, @@ -562,7 +563,7 @@ func TestReconcile(t *testing.T) { }, WantErr: true, WantCreates: []runtime.Object{ - createDispatcherDeployment(false, ""), + createDispatcherDeployment(false, "", brokerDLQName), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewTrigger(triggerName, testNS, brokerName, @@ -599,7 +600,7 @@ func TestReconcile(t *testing.T) { }, WantErr: true, WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: createDispatcherDeployment(false, ""), + Object: createDispatcherDeployment(false, "", brokerDLQName), }}, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewTrigger(triggerName, testNS, brokerName, @@ -626,7 +627,7 @@ func TestReconcile(t *testing.T) { createRabbitMQCluster(""), markReady(createQueue(config, false, "")), markReady(createBinding(false, false, "")), - createDispatcherDeployment(false, ""), + createDispatcherDeployment(false, "", brokerDLQName), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewTrigger(triggerName, testNS, brokerName, @@ -651,7 +652,7 @@ func TestReconcile(t *testing.T) { createRabbitMQCluster(""), markReady(createQueue(config, false, "")), markReady(createBinding(true, false, "")), - createDispatcherDeployment(false, ""), + createDispatcherDeployment(false, "", brokerDLQName), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: triggerWithFilterReady(), @@ -821,7 +822,7 @@ func TestReconcile(t *testing.T) { }, WantErr: false, WantCreates: []runtime.Object{ - createDispatcherDeployment(false, ""), + createDispatcherDeployment(false, "", brokerDLQName), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewTrigger(triggerName, testNS, brokerName, @@ -854,7 +855,7 @@ func TestReconcile(t *testing.T) { createDispatcherDeploymentWithParallelism(), }, WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: createDispatcherDeployment(false, ""), + Object: createDispatcherDeployment(false, "", brokerDLQName), }}, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewTrigger(triggerName, testNS, brokerName, @@ -883,11 +884,11 @@ func TestReconcile(t *testing.T) { markReady(createQueue(config, false, "")), markReady(createExchange()), markReady(createBinding(true, true, "")), - createDispatcherDeployment(true, ""), + createDispatcherDeployment(true, "", brokerDLQName), markReady(createPolicy()), }, WantCreates: []runtime.Object{ - createDispatcherDeployment(false, ""), + createDispatcherDeployment(false, "", brokerDLQName), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: triggerWithFilterReady(), @@ -947,7 +948,7 @@ func TestReconcile(t *testing.T) { }, WantErr: false, WantCreates: []runtime.Object{ - createDispatcherDeployment(false, ""), + createDispatcherDeployment(false, "", brokerDLQName), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewTrigger(triggerName, testNS, brokerName, @@ -984,7 +985,7 @@ func TestReconcile(t *testing.T) { }, WantErr: false, WantCreates: []runtime.Object{ - createDispatcherDeployment(false, rabbitMQVhost), + createDispatcherDeployment(false, rabbitMQVhost, brokerDLQName), }, WantUpdates: []clientgotesting.UpdateActionImpl{{ Object: markReady(createQueue(configWithRabbitMQBrokerConfig(), false, rabbitMQVhost)), @@ -1317,7 +1318,7 @@ func createRabbitMQBrokerConfig(vhost string) *v1alpha1.RabbitmqBrokerConfig { } } -func createDispatcherDeployment(dlq bool, vhost string) *appsv1.Deployment { +func createDispatcherDeployment(dlq bool, vhost, dlxName string) *appsv1.Deployment { args := &resources.DispatcherArgs{ Trigger: &eventingv1.Trigger{ ObjectMeta: metav1.ObjectMeta{ @@ -1337,6 +1338,7 @@ func createDispatcherDeployment(dlq bool, vhost string) *appsv1.Deployment { BrokerIngressURL: brokerAddress, Subscriber: subscriberAddressable, DLX: dlq, + DLXName: dlxName, } return resources.MakeDispatcherDeployment(args) } @@ -1360,6 +1362,7 @@ func createDispatcherDeploymentWithResourceRequirements(dlq bool) *appsv1.Deploy BrokerIngressURL: brokerAddress, Subscriber: subscriberAddressable, DLX: dlq, + DLXName: brokerDLQName, ResourceRequirements: corev1.ResourceRequirements{ Requests: corev1.ResourceList{ corev1.ResourceCPU: resource.MustParse("500m"), @@ -1372,7 +1375,7 @@ func createDispatcherDeploymentWithResourceRequirements(dlq bool) *appsv1.Deploy return resources.MakeDispatcherDeployment(args) } -func createDispatcherDeploymentWithRetries() *appsv1.Deployment { +func createDispatcherDeploymentWithRetries(dlxName string) *appsv1.Deployment { args := &resources.DispatcherArgs{ Trigger: &eventingv1.Trigger{ ObjectMeta: metav1.ObjectMeta{ @@ -1391,6 +1394,7 @@ func createDispatcherDeploymentWithRetries() *appsv1.Deployment { BrokerIngressURL: brokerAddress, Subscriber: subscriberAddressable, Delivery: &eventingduckv1.DeliverySpec{}, + DLXName: dlxName, } return resources.MakeDispatcherDeployment(args) } From 2c761bb9349796569f2a373a087cd58f5e52c394 Mon Sep 17 00:00:00 2001 From: Gabriel Freites Date: Thu, 27 Jul 2023 17:10:54 +0200 Subject: [PATCH 26/29] this is just an smoke test for consumefromqueue function to see if it stops when a context cancel gets there --- pkg/dispatcher/dispatcher_test.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pkg/dispatcher/dispatcher_test.go b/pkg/dispatcher/dispatcher_test.go index efdd4554e6..d215b0cdec 100644 --- a/pkg/dispatcher/dispatcher_test.go +++ b/pkg/dispatcher/dispatcher_test.go @@ -60,10 +60,7 @@ func TestDispatcher_ConsumeFromQueue(t *testing.T) { time.Sleep(1000) cancelFunc() }() - - if err := d.ConsumeFromQueue(ctx, &rabbit.RabbitMQConnectionMock{}, &rabbit.RabbitMQChannelMock{}, ""); err != nil { - t.Errorf("ConsumeFromQueue() error = %v", err) - } + d.ConsumeFromQueue(ctx, &rabbit.RabbitMQConnectionMock{}, &rabbit.RabbitMQChannelMock{}, "") } func TestDispatcher_ReadSpan(t *testing.T) { From b4852da221eda4d3d02b557b8259dbfca5502e78 Mon Sep 17 00:00:00 2001 From: Gabriel Freites Date: Thu, 27 Jul 2023 22:11:54 +0200 Subject: [PATCH 27/29] added defered confirmation to connection mock --- pkg/rabbit/rabbit_mocks.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/rabbit/rabbit_mocks.go b/pkg/rabbit/rabbit_mocks.go index 9c2cde6edb..333d059567 100644 --- a/pkg/rabbit/rabbit_mocks.go +++ b/pkg/rabbit/rabbit_mocks.go @@ -61,6 +61,7 @@ func (rm *RabbitMQBadConnectionMock) NotifyClose(c chan *amqp.Error) chan *amqp. type RabbitMQChannelMock struct { NotifyCloseChannel chan *amqp.Error ConsumeChannel <-chan amqp.Delivery + dc *amqp.DeferredConfirmation } func (rm *RabbitMQChannelMock) IsClosed() bool { @@ -83,7 +84,7 @@ func (rm *RabbitMQChannelMock) Consume(a string, b string, c bool, d bool, e boo } func (rm *RabbitMQChannelMock) PublishWithDeferredConfirm(a string, b string, c bool, d bool, p amqp.Publishing) (*amqp.DeferredConfirmation, error) { - return &amqp.DeferredConfirmation{}, nil + return rm.dc, nil } func (rm *RabbitMQChannelMock) Confirm(a bool) error { From 8e2b28aad57cc441c3cc9470d3d47b57948a6abe Mon Sep 17 00:00:00 2001 From: Thomas Johansen Date: Mon, 31 Jul 2023 18:41:43 +0200 Subject: [PATCH 28/29] Extended tests --- .../broker/resources/dispatcher_test.go | 4 ++++ .../trigger/resources/dispatcher_test.go | 16 ++++++++++++++++ 2 files changed, 20 insertions(+) diff --git a/pkg/reconciler/broker/resources/dispatcher_test.go b/pkg/reconciler/broker/resources/dispatcher_test.go index 691d165e52..a9364e6e46 100644 --- a/pkg/reconciler/broker/resources/dispatcher_test.go +++ b/pkg/reconciler/broker/resources/dispatcher_test.go @@ -73,6 +73,7 @@ func TestMakeDispatcherDeployment(t *testing.T) { BackoffDelay: ptr.String("PT20S"), BackoffPolicy: &linear, }, + DLX: true, } got := MakeDispatcherDeployment(args) @@ -190,6 +191,9 @@ func TestMakeDispatcherDeployment(t *testing.T) { }, { Name: "SUBSCRIBER_CACERTS", Value: "test.cacert", + }, { + Name: "DLX", + Value: "true", }}, Ports: []corev1.ContainerPort{{ Name: "http-metrics", diff --git a/pkg/reconciler/trigger/resources/dispatcher_test.go b/pkg/reconciler/trigger/resources/dispatcher_test.go index d2235be53a..9ba4b24465 100644 --- a/pkg/reconciler/trigger/resources/dispatcher_test.go +++ b/pkg/reconciler/trigger/resources/dispatcher_test.go @@ -88,6 +88,16 @@ func TestMakeDispatcherDeployment(t *testing.T) { withEnv(corev1.EnvVar{Name: "POD_NAME", Value: "testtrigger-dlx-dispatcher"}), withDefaultResourceRequirements(), ), + }, { + name: "with dlx name", + args: dispatcherArgs(withDLXName("dlx-name")), + want: deployment( + deploymentNamed("testtrigger-dispatcher"), + withEnv(corev1.EnvVar{Name: "PARALLELISM", Value: "1"}), + withEnv(corev1.EnvVar{Name: "DLX_NAME", Value: "dlx-name"}), + withEnv(corev1.EnvVar{Name: "POD_NAME", Value: "testtrigger-dispatcher"}), + withDefaultResourceRequirements(), + ), }, { name: "with parallelism", @@ -311,6 +321,12 @@ func withDLX(args *DispatcherArgs) { args.DLX = true } +func withDLXName(name string) func(*DispatcherArgs) { + return func(args *DispatcherArgs) { + args.DLXName = name + } +} + func withParallelism(c string) func(*DispatcherArgs) { return func(args *DispatcherArgs) { if args.Trigger.ObjectMeta.Annotations == nil { From 107f3c3f18437ff672136516dc0feb1d9680965e Mon Sep 17 00:00:00 2001 From: Gabriel Freites Date: Mon, 31 Jul 2023 19:29:09 +0200 Subject: [PATCH 29/29] improving coverage --- pkg/rabbit/connections_handler_test.go | 16 ++++++++++++++++ pkg/rabbit/rabbit_mocks.go | 7 +------ 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/pkg/rabbit/connections_handler_test.go b/pkg/rabbit/connections_handler_test.go index a7bea2edfa..b1aec3a535 100644 --- a/pkg/rabbit/connections_handler_test.go +++ b/pkg/rabbit/connections_handler_test.go @@ -62,6 +62,22 @@ func Test_InvalidSetupRabbitMQ(t *testing.T) { rabbitMQHelper.Connection, _ = rabbitMQHelper.createConnection("amqp://localhost:5672/%2f", ValidDial) rabbitMQHelper.Channel, _ = rabbitMQHelper.createChannel() + if rabbitMQHelper.Connection.IsClosed() { + t.Errorf("unexpected closed connection error") + } + + if _, err := rabbitMQHelper.Channel.QueueInspect("test"); err != nil { + t.Errorf("unexpected queue inspect error") + } + + if _, err := rabbitMQHelper.Channel.Consume("test", "test", true, true, true, true, amqp091.Table{}); err != nil { + t.Errorf("unexpected channel consume error") + } + + if _, err := rabbitMQHelper.Channel.PublishWithDeferredConfirm("test", "test", true, true, amqp091.Publishing{}); err != nil { + t.Errorf("unexpected channel publish with deferred confirm error") + } + err = rabbitMQHelper.configConnectionAndChannel(InvalidConfigTest) if err == nil || rabbitMQHelper.GetConnection() == nil || rabbitMQHelper.GetChannel() == nil { t.Errorf("unexpected error == nil when setting up invalid config %s %s %s", rabbitMQHelper.GetConnection(), rabbitMQHelper.GetChannel(), err) diff --git a/pkg/rabbit/rabbit_mocks.go b/pkg/rabbit/rabbit_mocks.go index 333d059567..0fec70d139 100644 --- a/pkg/rabbit/rabbit_mocks.go +++ b/pkg/rabbit/rabbit_mocks.go @@ -54,14 +54,9 @@ func (rm *RabbitMQBadConnectionMock) Close() error { return nil } -func (rm *RabbitMQBadConnectionMock) NotifyClose(c chan *amqp.Error) chan *amqp.Error { - return c -} - type RabbitMQChannelMock struct { NotifyCloseChannel chan *amqp.Error ConsumeChannel <-chan amqp.Delivery - dc *amqp.DeferredConfirmation } func (rm *RabbitMQChannelMock) IsClosed() bool { @@ -84,7 +79,7 @@ func (rm *RabbitMQChannelMock) Consume(a string, b string, c bool, d bool, e boo } func (rm *RabbitMQChannelMock) PublishWithDeferredConfirm(a string, b string, c bool, d bool, p amqp.Publishing) (*amqp.DeferredConfirmation, error) { - return rm.dc, nil + return &amqp.DeferredConfirmation{}, nil } func (rm *RabbitMQChannelMock) Confirm(a bool) error {