diff --git a/cmd/dispatcher/main.go b/cmd/dispatcher/main.go index aa8b46e0be..35e1f9c692 100644 --- a/cmd/dispatcher/main.go +++ b/cmd/dispatcher/main.go @@ -46,6 +46,8 @@ type envConfig struct { BrokerIngressURL string `envconfig:"BROKER_INGRESS_URL" required:"true"` SubscriberURL string `envconfig:"SUBSCRIBER" required:"true"` SubscriberCACerts string `envconfig:"SUBSCRIBER_CACERTS" required:"false"` + DLX bool `envconfig:"DLX" required:"false"` + DLXName string `envconfig:"DLX_NAME" required:"false"` // Number of concurrent messages in flight Parallelism int `envconfig:"PARALLELISM" default:"1" required:"false"` @@ -97,11 +99,26 @@ func main() { BackoffPolicy: backoffPolicy, WorkerCount: env.Parallelism, Reporter: reporter, + DLX: env.DLX, + DLXName: env.DLXName, } var err error rmqHelper := rabbit.NewRabbitMQConnectionHandler(1000, logger) - rmqHelper.Setup(ctx, rabbit.VHostHandler(env.RabbitURL, env.RabbitMQVhost), rabbit.ChannelQoS, rabbit.DialWrapper) + rmqHelper.Setup(ctx, rabbit.VHostHandler( + env.RabbitURL, + env.RabbitMQVhost), + func(c rabbit.RabbitMQConnectionInterface, ch rabbit.RabbitMQChannelInterface) error { + if err := rabbit.ChannelQoS(c, ch); err != nil { + return err + } + + if err := rabbit.ChannelConfirm(c, ch); err != nil { + return err + } + return nil + }, + rabbit.DialWrapper) for { if err = d.ConsumeFromQueue(ctx, rmqHelper.GetConnection(), rmqHelper.GetChannel(), env.QueueName); err != nil { if errors.Is(err, context.Canceled) { diff --git a/pkg/dispatcher/dispatcher.go b/pkg/dispatcher/dispatcher.go index c5b292b978..8407d44824 100644 --- a/pkg/dispatcher/dispatcher.go +++ b/pkg/dispatcher/dispatcher.go @@ -18,6 +18,7 @@ package dispatcher import ( "context" + "fmt" "net/http" "sync" "time" @@ -55,6 +56,8 @@ type Dispatcher struct { BackoffPolicy eventingduckv1.BackoffPolicyType WorkerCount int Reporter dispatcher.StatsReporter + DLX bool + DLXName string } // ConsumeFromQueue consumes messages from the given message channel and queue. @@ -91,7 +94,7 @@ func (d *Dispatcher) ConsumeFromQueue(ctx context.Context, conn rabbit.RabbitMQC } logging.FromContext(ctx).Info("rabbitmq receiver started, exit with CTRL+C") - logging.FromContext(ctx).Infow("Starting to process messages", zap.String("queue", queueName), zap.Int("workers", d.WorkerCount)) + logging.FromContext(ctx).Infow("starting to process messages", zap.String("queue", queueName), zap.Int("workers", d.WorkerCount)) wg := &sync.WaitGroup{} wg.Add(d.WorkerCount) @@ -101,7 +104,11 @@ func (d *Dispatcher) ConsumeFromQueue(ctx context.Context, conn rabbit.RabbitMQC go func() { defer wg.Done() for msg := range workerQueue { - d.dispatch(ctx, msg, ceClient) + if d.DLX { + _ = d.dispatchDLQ(ctx, msg, ceClient) + } else { + _ = d.dispatch(ctx, msg, ceClient, channel) + } } }() } @@ -142,25 +149,117 @@ func getStatus(ctx context.Context, result protocol.Result) (int, bool) { retry, _ := kncloudevents.SelectiveRetry(ctx, &http.Response{StatusCode: httpResult.StatusCode}, nil) return httpResult.StatusCode, !retry } - logging.FromContext(ctx).Warnf("Invalid result type, not HTTP Result: %v", retriesResult.Result) + logging.FromContext(ctx).Warnf("invalid result type, not HTTP Result: %v", retriesResult.Result) return -1, false } - logging.FromContext(ctx).Warnf("Invalid result type, not RetriesResult") + logging.FromContext(ctx).Warnf("invalid result type, not RetriesResult") return -1, false } -func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client) { +func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client, channel rabbit.RabbitMQChannelInterface) error { start := time.Now() + dlqExchange := d.DLXName + + ctx, span := readSpan(ctx, msg) + defer span.End() + msgBinding := rabbit.NewMessageFromDelivery(ComponentName, "", "", &msg) event, err := binding.ToEvent(cloudevents.WithEncodingBinary(ctx), msgBinding) if err != nil { - logging.FromContext(ctx).Warn("failed creating event from delivery, err (NACK-ing and not re-queueing): ", err) - err = msg.Nack(false, false) - if err != nil { - logging.FromContext(ctx).Warn("failed to NACK event: ", err) + logging.FromContext(ctx).Warn("failed parsing event: ", err) + if err = msg.Ack(false); err != nil { + logging.FromContext(ctx).Warn("failed to Ack event: ", err) + } + + return fmt.Errorf("failed parsing event: %s", err) + } + + if span.IsRecordingEvents() { + span.AddAttributes(client.EventTraceAttributes(event)...) + } + + ctx = cloudevents.ContextWithTarget(ctx, d.SubscriberURL) + if d.BackoffPolicy == eventingduckv1.BackoffPolicyLinear { + ctx = cloudevents.ContextWithRetriesLinearBackoff(ctx, d.BackoffDelay, d.MaxRetries) + } else { + ctx = cloudevents.ContextWithRetriesExponentialBackoff(ctx, d.BackoffDelay, d.MaxRetries) + } + + response, result := ceClient.Request(ctx, *event) + statusCode, isSuccess := getStatus(ctx, result) + if statusCode != -1 { + args := &dispatcher.ReportArgs{EventType: event.Type()} + if err = d.Reporter.ReportEventCount(args, statusCode); err != nil { + logging.FromContext(ctx).Errorf("something happened: %v", err) } - return + } + + if !isSuccess { + logging.FromContext(ctx).Warnf("failed to deliver to %q", d.SubscriberURL) + + // We need to ack the original message. + if err = msg.Ack(false); err != nil { + logging.FromContext(ctx).Warn("failed to Ack event: ", err) + } + + // Add headers as described here: https://knative.dev/docs/eventing/event-delivery/#configuring-channel-event-delivery + event.SetExtension("knativeerrordest", d.SubscriberURL) + event.SetExtension("knativeerrorcode", statusCode) + + // Queue the event into DLQ with the correct headers. + if err = sendToRabbitMQ(channel, dlqExchange, span, event); err != nil { + logging.FromContext(ctx).Warn("failed to send event: ", err) + } + return fmt.Errorf("failed to deliver to %q", d.SubscriberURL) + } else if response != nil { + logging.FromContext(ctx).Infof("sending an event: %+v", response) + ctx = cloudevents.ContextWithTarget(ctx, d.BrokerIngressURL) + result = ceClient.Send(ctx, *response) + _, isSuccess = getStatus(ctx, result) + if !isSuccess { + logging.FromContext(ctx).Warnf("failed to deliver to %q", d.BrokerIngressURL) + + // We need to ack the original message. + if err = msg.Ack(false); err != nil { + logging.FromContext(ctx).Warn("failed to Ack event: ", err) + } + + // Add headers as described here: https://knative.dev/docs/eventing/event-delivery/#configuring-channel-event-delivery + event.SetExtension("knativeerrordest", d.SubscriberURL) + event.SetExtension("knativeerrorcode", statusCode) + event.SetExtension("knativeerrordata", result) + + // Queue the event into DLQ with the correct headers. + if err = sendToRabbitMQ(channel, dlqExchange, span, event); err != nil { + logging.FromContext(ctx).Warn("failed to send event: ", err) + } + return fmt.Errorf("failed to deliver to %q", d.BrokerIngressURL) + } + } + + if err = msg.Ack(false); err != nil { + logging.FromContext(ctx).Warn("failed to Ack event: ", err) + } + if statusCode != -1 { + args := &dispatcher.ReportArgs{EventType: event.Type()} + dispatchTime := time.Since(start) + _ = d.Reporter.ReportEventDispatchTime(args, statusCode, dispatchTime) + } + return nil +} + +// Defaulting to Ack as this always hits the DLQ. +func (d *Dispatcher) dispatchDLQ(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client) error { + start := time.Now() + msgBinding := rabbit.NewMessageFromDelivery(ComponentName, "", "", &msg) + event, err := binding.ToEvent(cloudevents.WithEncodingBinary(ctx), msgBinding) + if err != nil { + logging.FromContext(ctx).Warn("failed creating event from delivery, err: ", err) + if err = msg.Ack(false); err != nil { + logging.FromContext(ctx).Warn("failed to Ack event: ", err) + } + return fmt.Errorf("failed creating event from delivery, err: %s", err) } ctx, span := readSpan(ctx, msg) @@ -180,41 +279,64 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c statusCode, isSuccess := getStatus(ctx, result) if statusCode != -1 { args := &dispatcher.ReportArgs{EventType: event.Type()} - if err := d.Reporter.ReportEventCount(args, statusCode); err != nil { - logging.FromContext(ctx).Errorf("Something happened: %v", err) + if err = d.Reporter.ReportEventCount(args, statusCode); err != nil { + logging.FromContext(ctx).Errorf("something happened: %v", err) } } if !isSuccess { - logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.SubscriberURL) - if err := msg.Nack(false, false); err != nil { - logging.FromContext(ctx).Warn("failed to NACK event: ", err) + logging.FromContext(ctx).Warnf("failed to deliver to %q %s", d.SubscriberURL, msg) + if err = msg.Ack(false); err != nil { + logging.FromContext(ctx).Warn("failed to Ack event: ", err) } - return + return fmt.Errorf("failed to deliver to %q", d.SubscriberURL) } else if response != nil { logging.FromContext(ctx).Infof("Sending an event: %+v", response) ctx = cloudevents.ContextWithTarget(ctx, d.BrokerIngressURL) - result := ceClient.Send(ctx, *response) - _, isSuccess := getStatus(ctx, result) + result = ceClient.Send(ctx, *response) + _, isSuccess = getStatus(ctx, result) if !isSuccess { - logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.BrokerIngressURL) - err = msg.Nack(false, false) // not multiple - if err != nil { - logging.FromContext(ctx).Warn("failed to NACK event: ", err) + logging.FromContext(ctx).Warnf("failed to deliver to %q", d.BrokerIngressURL) + if err = msg.Ack(false); err != nil { + logging.FromContext(ctx).Warn("failed to Ack event: ", err) } - return + return fmt.Errorf("failed to deliver to %q", d.BrokerIngressURL) } } err = msg.Ack(false) if err != nil { - logging.FromContext(ctx).Warn("failed to ACK event: ", err) + logging.FromContext(ctx).Warn("failed to Ack event: ", err) } if statusCode != -1 { args := &dispatcher.ReportArgs{EventType: event.Type()} dispatchTime := time.Since(start) _ = d.Reporter.ReportEventDispatchTime(args, statusCode, dispatchTime) } + return nil +} + +func sendToRabbitMQ(channel rabbit.RabbitMQChannelInterface, exchangeName string, span *trace.Span, event *cloudevents.Event) error { + // no dlq defined in the trigger nor the broker, return + if exchangeName == "" { + return nil + } + + tp, ts := (&tracecontext.HTTPFormat{}).SpanContextToHeaders(span.SpanContext()) + dc, err := channel.PublishWithDeferredConfirm( + exchangeName, + "", // routing key + false, // mandatory + false, // immediate + *rabbit.CloudEventToRabbitMQMessage(event, tp, ts)) + if err != nil { + return fmt.Errorf("failed to publish message: %w", err) + } + + if ack := dc.Wait(); !ack { + return errors.New("failed to publish message: nacked") + } + return nil } func readSpan(ctx context.Context, msg amqp.Delivery) (context.Context, *trace.Span) { diff --git a/pkg/dispatcher/dispatcher_test.go b/pkg/dispatcher/dispatcher_test.go index d8f0fa75dd..d215b0cdec 100644 --- a/pkg/dispatcher/dispatcher_test.go +++ b/pkg/dispatcher/dispatcher_test.go @@ -19,12 +19,16 @@ package dispatcher import ( "context" "io" + "log" "net/http" "net/http/httptest" "sync" "testing" "time" + v2 "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/protocol" cehttp "github.com/cloudevents/sdk-go/v2/protocol/http" amqp "github.com/rabbitmq/amqp091-go" @@ -181,3 +185,199 @@ func (h *fakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func requestAccepted(writer http.ResponseWriter, req *http.Request) { writer.WriteHeader(http.StatusOK) } + +type MockAcknowledger struct { +} + +func (m MockAcknowledger) Ack(tag uint64, multiple bool) error { + return nil +} +func (m MockAcknowledger) Nack(tag uint64, multiple bool, requeue bool) error { + return nil +} +func (m MockAcknowledger) Reject(tag uint64, requeue bool) error { + return nil +} + +type MockClient struct { + request func(ctx context.Context, m binding.Message, transformers ...binding.Transformer) (binding.Message, error) +} + +func (mock MockClient) Request(ctx context.Context, m binding.Message, transformers ...binding.Transformer) (binding.Message, error) { + return mock.request(ctx, m, transformers...) +} + +type MockStatsReporter struct { +} + +func (m MockStatsReporter) ReportEventCount(args *dispatcherstats.ReportArgs, responseCode int) error { + return nil +} + +func (m MockStatsReporter) ReportEventDispatchTime(args *dispatcherstats.ReportArgs, responseCode int, d time.Duration) error { + return nil +} + +func TestDispatcher_dispatch(t *testing.T) { + channel := rabbit.RabbitMQChannelMock{} + + type fields struct { + BrokerIngressURL string + SubscriberURL string + SubscriberCACerts string + MaxRetries int + BackoffDelay time.Duration + Timeout time.Duration + BackoffPolicy v1.BackoffPolicyType + WorkerCount int + Reporter dispatcherstats.StatsReporter + DLX bool + } + type args struct { + ctx context.Context + msg amqp.Delivery + client MockClient + channel rabbit.RabbitMQChannelInterface + } + tests := []struct { + name string + fields fields + args args + wantErr bool + }{ + { + name: "invalid event", + fields: fields{}, + args: args{ + ctx: context.TODO(), + msg: amqp.Delivery{ + Acknowledger: &MockAcknowledger{}, + ContentType: "application/cloudevents+json", + Headers: amqp.Table{}, + }, + client: MockClient{}, + channel: nil, + }, + wantErr: true, + }, + { + name: "invalid request", + fields: fields{ + Reporter: &MockStatsReporter{}, + }, + args: args{ + ctx: context.TODO(), + msg: amqp.Delivery{ + Acknowledger: &MockAcknowledger{}, + ContentType: "application/cloudevents+json", + Headers: amqp.Table{}, + Body: []byte(`{"specversion":"1.0","source":"valid-event","id":"valid-id","type":"valid-type"}`), + }, + client: MockClient{ + request: func(ctx context.Context, m binding.Message, transformers ...binding.Transformer) (binding.Message, error) { + return m, v2.NewHTTPRetriesResult(v2.NewHTTPResult(500, ""), 0, time.Now(), []protocol.Result{}) + }, + }, + channel: &channel, + }, + wantErr: true, + }, + { + name: "invalid request dlq", + fields: fields{ + Reporter: &MockStatsReporter{}, + DLX: true, + }, + args: args{ + ctx: context.TODO(), + msg: amqp.Delivery{ + Acknowledger: &MockAcknowledger{}, + ContentType: "application/cloudevents+json", + Headers: amqp.Table{}, + Body: []byte(`{"specversion":"1.0","source":"valid-event","id":"valid-id","type":"valid-type"}`), + }, + client: MockClient{ + request: func(ctx context.Context, m binding.Message, transformers ...binding.Transformer) (binding.Message, error) { + return m, v2.NewHTTPRetriesResult(v2.NewHTTPResult(500, ""), 0, time.Now(), []protocol.Result{}) + }, + }, + channel: &channel, + }, + wantErr: true, + }, + { + name: "valid event", + fields: fields{ + Reporter: &MockStatsReporter{}, + }, + args: args{ + ctx: context.TODO(), + msg: amqp.Delivery{ + Acknowledger: &MockAcknowledger{}, + ContentType: "application/cloudevents+json", + Headers: amqp.Table{}, + Body: []byte(`{"specversion":"1.0","source":"valid-event","id":"valid-id","type":"valid-type"}`), + }, + client: MockClient{ + request: func(ctx context.Context, m binding.Message, transformers ...binding.Transformer) (binding.Message, error) { + return m, v2.NewHTTPRetriesResult(v2.NewHTTPResult(200, ""), 0, time.Now(), []protocol.Result{}) + }, + }, + channel: &channel, + }, + }, + { + name: "valid event dlq", + fields: fields{ + Reporter: &MockStatsReporter{}, + DLX: true, + }, + args: args{ + ctx: context.TODO(), + msg: amqp.Delivery{ + Acknowledger: &MockAcknowledger{}, + ContentType: "application/cloudevents+json", + Headers: amqp.Table{}, + Body: []byte(`{"specversion":"1.0","source":"valid-event","id":"valid-id","type":"valid-type"}`), + }, + client: MockClient{ + request: func(ctx context.Context, m binding.Message, transformers ...binding.Transformer) (binding.Message, error) { + return m, v2.NewHTTPRetriesResult(v2.NewHTTPResult(200, ""), 0, time.Now(), []protocol.Result{}) + }, + }, + channel: &channel, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + d := &Dispatcher{ + BrokerIngressURL: tt.fields.BrokerIngressURL, + SubscriberURL: tt.fields.SubscriberURL, + SubscriberCACerts: tt.fields.SubscriberCACerts, + MaxRetries: tt.fields.MaxRetries, + BackoffDelay: tt.fields.BackoffDelay, + Timeout: tt.fields.Timeout, + BackoffPolicy: tt.fields.BackoffPolicy, + WorkerCount: tt.fields.WorkerCount, + Reporter: tt.fields.Reporter, + DLX: tt.fields.DLX, + } + + client, err := v2.NewClient(tt.args.client) + if err != nil { + log.Fatalf("Failed to create protocol, %v", err) + } + + if d.DLX { + if err = d.dispatchDLQ(tt.args.ctx, tt.args.msg, client); (err != nil) != tt.wantErr { + t.Errorf("dispatchDLQ() error = %v, wantErr %v", err, tt.wantErr) + } + } else { + if err = d.dispatch(tt.args.ctx, tt.args.msg, client, tt.args.channel); (err != nil) != tt.wantErr { + t.Errorf("dispatch() error = %v, wantErr %v", err, tt.wantErr) + } + } + }) + } +} diff --git a/pkg/rabbit/connections_handler_test.go b/pkg/rabbit/connections_handler_test.go index a7bea2edfa..b1aec3a535 100644 --- a/pkg/rabbit/connections_handler_test.go +++ b/pkg/rabbit/connections_handler_test.go @@ -62,6 +62,22 @@ func Test_InvalidSetupRabbitMQ(t *testing.T) { rabbitMQHelper.Connection, _ = rabbitMQHelper.createConnection("amqp://localhost:5672/%2f", ValidDial) rabbitMQHelper.Channel, _ = rabbitMQHelper.createChannel() + if rabbitMQHelper.Connection.IsClosed() { + t.Errorf("unexpected closed connection error") + } + + if _, err := rabbitMQHelper.Channel.QueueInspect("test"); err != nil { + t.Errorf("unexpected queue inspect error") + } + + if _, err := rabbitMQHelper.Channel.Consume("test", "test", true, true, true, true, amqp091.Table{}); err != nil { + t.Errorf("unexpected channel consume error") + } + + if _, err := rabbitMQHelper.Channel.PublishWithDeferredConfirm("test", "test", true, true, amqp091.Publishing{}); err != nil { + t.Errorf("unexpected channel publish with deferred confirm error") + } + err = rabbitMQHelper.configConnectionAndChannel(InvalidConfigTest) if err == nil || rabbitMQHelper.GetConnection() == nil || rabbitMQHelper.GetChannel() == nil { t.Errorf("unexpected error == nil when setting up invalid config %s %s %s", rabbitMQHelper.GetConnection(), rabbitMQHelper.GetChannel(), err) diff --git a/pkg/rabbit/rabbit_mocks.go b/pkg/rabbit/rabbit_mocks.go index 9c2cde6edb..0fec70d139 100644 --- a/pkg/rabbit/rabbit_mocks.go +++ b/pkg/rabbit/rabbit_mocks.go @@ -54,10 +54,6 @@ func (rm *RabbitMQBadConnectionMock) Close() error { return nil } -func (rm *RabbitMQBadConnectionMock) NotifyClose(c chan *amqp.Error) chan *amqp.Error { - return c -} - type RabbitMQChannelMock struct { NotifyCloseChannel chan *amqp.Error ConsumeChannel <-chan amqp.Delivery diff --git a/pkg/reconciler/broker/broker.go b/pkg/reconciler/broker/broker.go index 43db2006f4..22265dbab3 100644 --- a/pkg/reconciler/broker/broker.go +++ b/pkg/reconciler/broker/broker.go @@ -245,6 +245,7 @@ func (r *Reconciler) reconcileDLXDispatcherDeployment(ctx context.Context, b *ev BrokerIngressURL: b.Status.Address.URL, Configs: r.configs, ResourceRequirements: requirements, + DLX: true, }) return r.reconcileDeployment(ctx, expected) } diff --git a/pkg/reconciler/broker/broker_test.go b/pkg/reconciler/broker/broker_test.go index ca957dedb6..1cf054a576 100644 --- a/pkg/reconciler/broker/broker_test.go +++ b/pkg/reconciler/broker/broker_test.go @@ -1543,6 +1543,7 @@ func createDispatcherDeployment(opts ...func(*resources.DispatcherArgs)) *appsv1 BrokerUrlSecretKey: "brokerURL", BrokerIngressURL: brokerAddress, Subscriber: DLSAddressable, + DLX: true, } for _, o := range opts { o(args) diff --git a/pkg/reconciler/broker/resources/dispatcher.go b/pkg/reconciler/broker/resources/dispatcher.go index 84c4e128ae..00633a81ba 100644 --- a/pkg/reconciler/broker/resources/dispatcher.go +++ b/pkg/reconciler/broker/resources/dispatcher.go @@ -55,6 +55,7 @@ type DispatcherArgs struct { BrokerUrlSecretKey string BrokerIngressURL *apis.URL Subscriber *duckv1.Addressable + DLX bool Configs reconcilersource.ConfigAccessor ResourceRequirements corev1.ResourceRequirements } @@ -150,6 +151,13 @@ func MakeDispatcherDeployment(args *DispatcherArgs) *appsv1.Deployment { Value: *args.Subscriber.CACerts, }) } + if args.DLX { + envs = append(envs, + corev1.EnvVar{ + Name: "DLX", + Value: "true", + }) + } // Default requirements only if none of the requirements are set through annotations if len(args.ResourceRequirements.Limits) == 0 && len(args.ResourceRequirements.Requests) == 0 { // This resource requests and limits comes from performance testing 1500msgs/s with a parallelism of 1000 diff --git a/pkg/reconciler/broker/resources/dispatcher_test.go b/pkg/reconciler/broker/resources/dispatcher_test.go index 691d165e52..a9364e6e46 100644 --- a/pkg/reconciler/broker/resources/dispatcher_test.go +++ b/pkg/reconciler/broker/resources/dispatcher_test.go @@ -73,6 +73,7 @@ func TestMakeDispatcherDeployment(t *testing.T) { BackoffDelay: ptr.String("PT20S"), BackoffPolicy: &linear, }, + DLX: true, } got := MakeDispatcherDeployment(args) @@ -190,6 +191,9 @@ func TestMakeDispatcherDeployment(t *testing.T) { }, { Name: "SUBSCRIBER_CACERTS", Value: "test.cacert", + }, { + Name: "DLX", + Value: "true", }}, Ports: []corev1.ContainerPort{{ Name: "http-metrics", diff --git a/pkg/reconciler/trigger/resources/dispatcher.go b/pkg/reconciler/trigger/resources/dispatcher.go index 57a575100d..5ed863a3ce 100644 --- a/pkg/reconciler/trigger/resources/dispatcher.go +++ b/pkg/reconciler/trigger/resources/dispatcher.go @@ -57,6 +57,7 @@ type DispatcherArgs struct { BrokerIngressURL *apis.URL Subscriber *duckv1.Addressable DLX bool + DLXName string Configs reconcilersource.ConfigAccessor ResourceRequirements corev1.ResourceRequirements } @@ -198,6 +199,20 @@ func MakeDispatcherDeployment(args *DispatcherArgs) *appsv1.Deployment { Value: *args.Subscriber.CACerts, }) } + if args.DLX { + dispatcher.Env = append(dispatcher.Env, + corev1.EnvVar{ + Name: "DLX", + Value: "true", + }) + } + if args.DLXName != "" { + dispatcher.Env = append(dispatcher.Env, + corev1.EnvVar{ + Name: "DLX_NAME", + Value: args.DLXName, + }) + } deployment := &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Namespace: args.Trigger.Namespace, diff --git a/pkg/reconciler/trigger/resources/dispatcher_test.go b/pkg/reconciler/trigger/resources/dispatcher_test.go index a118b1c792..9ba4b24465 100644 --- a/pkg/reconciler/trigger/resources/dispatcher_test.go +++ b/pkg/reconciler/trigger/resources/dispatcher_test.go @@ -84,9 +84,20 @@ func TestMakeDispatcherDeployment(t *testing.T) { want: deployment( deploymentNamed("testtrigger-dlx-dispatcher"), withEnv(corev1.EnvVar{Name: "PARALLELISM", Value: "1"}), + withEnv(corev1.EnvVar{Name: "DLX", Value: "true"}), withEnv(corev1.EnvVar{Name: "POD_NAME", Value: "testtrigger-dlx-dispatcher"}), withDefaultResourceRequirements(), ), + }, { + name: "with dlx name", + args: dispatcherArgs(withDLXName("dlx-name")), + want: deployment( + deploymentNamed("testtrigger-dispatcher"), + withEnv(corev1.EnvVar{Name: "PARALLELISM", Value: "1"}), + withEnv(corev1.EnvVar{Name: "DLX_NAME", Value: "dlx-name"}), + withEnv(corev1.EnvVar{Name: "POD_NAME", Value: "testtrigger-dispatcher"}), + withDefaultResourceRequirements(), + ), }, { name: "with parallelism", @@ -310,6 +321,12 @@ func withDLX(args *DispatcherArgs) { args.DLX = true } +func withDLXName(name string) func(*DispatcherArgs) { + return func(args *DispatcherArgs) { + args.DLXName = name + } +} + func withParallelism(c string) func(*DispatcherArgs) { return func(args *DispatcherArgs) { if args.Trigger.ObjectMeta.Annotations == nil { diff --git a/pkg/reconciler/trigger/trigger.go b/pkg/reconciler/trigger/trigger.go index 060b5a8215..b6549ddbca 100644 --- a/pkg/reconciler/trigger/trigger.go +++ b/pkg/reconciler/trigger/trigger.go @@ -43,6 +43,7 @@ import ( triggerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/trigger" eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1" reconcilersource "knative.dev/eventing/pkg/reconciler/source" + "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/kmeta" @@ -204,7 +205,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, t *eventingv1.Trigger) p } t.Status.MarkDeadLetterSinkResolvedSucceeded() t.Status.DeadLetterSinkURI = deadLetterSinkAddressable.URL - _, err = r.reconcileDispatcherDeployment(ctx, t, deadLetterSinkAddressable, t.Spec.Delivery, true, rabbitmqVhost) + _, err = r.reconcileDispatcherDeployment(ctx, t, deadLetterSinkAddressable, t.Spec.Delivery, true, rabbitmqVhost, "") if err != nil { logging.FromContext(ctx).Error("Problem reconciling DLX dispatcher Deployment", zap.Error(err)) t.Status.MarkDependencyFailed("DeploymentFailure", "%v", err) @@ -292,9 +293,10 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, t *eventingv1.Trigger) p if delivery == nil { // If trigger didn't but Broker did, use it instead. delivery = broker.Spec.Delivery + dlxName = ptr.String(naming.BrokerExchangeName(broker, true)) } - _, err = r.reconcileDispatcherDeployment(ctx, t, subscriberAddressable, delivery, false, rabbitmqVhost) + _, err = r.reconcileDispatcherDeployment(ctx, t, subscriberAddressable, delivery, false, rabbitmqVhost, ptr.StringValue(dlxName)) if err != nil { logging.FromContext(ctx).Error("Problem reconciling dispatcher Deployment", zap.Error(err)) t.Status.MarkDependencyFailed("DeploymentFailure", "%v", err) @@ -381,7 +383,7 @@ func (r *Reconciler) reconcileDispatcherDeployment( subscriberAddressable *duckv1.Addressable, delivery *eventingduckv1.DeliverySpec, dlq bool, - rabbitmqVhost string) (*v1.Deployment, error) { + rabbitmqVhost, dlxName string) (*v1.Deployment, error) { rabbitmqSecret, err := r.getRabbitmqSecret(ctx, t) if err != nil { return nil, err @@ -421,6 +423,7 @@ func (r *Reconciler) reconcileDispatcherDeployment( BrokerIngressURL: b.Status.Address.URL, Subscriber: subscriberAddressable, DLX: dlq, + DLXName: dlxName, Delivery: delivery, Configs: r.configs, ResourceRequirements: resourceRequirements, diff --git a/pkg/reconciler/trigger/trigger_test.go b/pkg/reconciler/trigger/trigger_test.go index 2c8636cdc0..51e09fd6c3 100644 --- a/pkg/reconciler/trigger/trigger_test.go +++ b/pkg/reconciler/trigger/trigger_test.go @@ -74,12 +74,13 @@ import ( ) const ( - systemNS = "knative-testing" - testNS = "test-namespace" - otherNS = "other-namespace" - brokerClass = "RabbitMQBroker" - brokerName = "test-broker" - brokerUID = "broker-test-uid" + systemNS = "knative-testing" + testNS = "test-namespace" + otherNS = "other-namespace" + brokerClass = "RabbitMQBroker" + brokerName = "test-broker" + brokerUID = "broker-test-uid" + brokerDLQName = "b.test-namespace.test-broker.dlx.broker-test-uid" rabbitSecretName = "test-broker-broker-rabbit" rabbitMQBrokerName = "rabbitbrokerhere" @@ -270,7 +271,7 @@ func TestReconcile(t *testing.T) { markReady(createBinding(true, false, "")), }, WantCreates: []runtime.Object{ - createDispatcherDeployment(false, ""), + createDispatcherDeployment(false, "", brokerDLQName), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: triggerWithFilterReady(), @@ -306,7 +307,7 @@ func TestReconcile(t *testing.T) { markReady(createBinding(true, false, "")), }, WantCreates: []runtime.Object{ - createDispatcherDeploymentWithRetries(), + createDispatcherDeploymentWithRetries(brokerDLQName), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: triggerWithFilterReady(), @@ -325,7 +326,7 @@ func TestReconcile(t *testing.T) { markReady(createBinding(true, false, "")), }, WantCreates: []runtime.Object{ - createDispatcherDeploymentWithRetries(), + createDispatcherDeploymentWithRetries(""), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: triggerWithDeliverySpecReady(), @@ -420,7 +421,7 @@ func TestReconcile(t *testing.T) { markReady(createBinding(true, false, "")), }, WantCreates: []runtime.Object{ - createDispatcherDeployment(false, ""), + createDispatcherDeployment(false, "", brokerDLQName), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: triggerWithFilterReady(), @@ -441,7 +442,7 @@ func TestReconcile(t *testing.T) { createRabbitMQCluster(""), }, WantCreates: []runtime.Object{ - createDispatcherDeployment(false, ""), + createDispatcherDeployment(false, "", brokerDLQName), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewTrigger(triggerName, testNS, brokerName, @@ -470,7 +471,7 @@ func TestReconcile(t *testing.T) { createRabbitMQCluster(""), }, WantCreates: []runtime.Object{ - createDispatcherDeployment(false, ""), + createDispatcherDeployment(false, "", brokerDLQName), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewTrigger(triggerName, testNS, brokerName, @@ -499,7 +500,7 @@ func TestReconcile(t *testing.T) { createRabbitMQCluster(""), }, WantCreates: []runtime.Object{ - createDispatcherDeployment(false, ""), + createDispatcherDeployment(false, "", brokerDLQName), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewTrigger(triggerName, testNS, brokerName, @@ -562,7 +563,7 @@ func TestReconcile(t *testing.T) { }, WantErr: true, WantCreates: []runtime.Object{ - createDispatcherDeployment(false, ""), + createDispatcherDeployment(false, "", brokerDLQName), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewTrigger(triggerName, testNS, brokerName, @@ -599,7 +600,7 @@ func TestReconcile(t *testing.T) { }, WantErr: true, WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: createDispatcherDeployment(false, ""), + Object: createDispatcherDeployment(false, "", brokerDLQName), }}, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewTrigger(triggerName, testNS, brokerName, @@ -626,7 +627,7 @@ func TestReconcile(t *testing.T) { createRabbitMQCluster(""), markReady(createQueue(config, false, "")), markReady(createBinding(false, false, "")), - createDispatcherDeployment(false, ""), + createDispatcherDeployment(false, "", brokerDLQName), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewTrigger(triggerName, testNS, brokerName, @@ -651,7 +652,7 @@ func TestReconcile(t *testing.T) { createRabbitMQCluster(""), markReady(createQueue(config, false, "")), markReady(createBinding(true, false, "")), - createDispatcherDeployment(false, ""), + createDispatcherDeployment(false, "", brokerDLQName), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: triggerWithFilterReady(), @@ -821,7 +822,7 @@ func TestReconcile(t *testing.T) { }, WantErr: false, WantCreates: []runtime.Object{ - createDispatcherDeployment(false, ""), + createDispatcherDeployment(false, "", brokerDLQName), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewTrigger(triggerName, testNS, brokerName, @@ -854,7 +855,7 @@ func TestReconcile(t *testing.T) { createDispatcherDeploymentWithParallelism(), }, WantUpdates: []clientgotesting.UpdateActionImpl{{ - Object: createDispatcherDeployment(false, ""), + Object: createDispatcherDeployment(false, "", brokerDLQName), }}, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewTrigger(triggerName, testNS, brokerName, @@ -883,11 +884,11 @@ func TestReconcile(t *testing.T) { markReady(createQueue(config, false, "")), markReady(createExchange()), markReady(createBinding(true, true, "")), - createDispatcherDeployment(true, ""), + createDispatcherDeployment(true, "", brokerDLQName), markReady(createPolicy()), }, WantCreates: []runtime.Object{ - createDispatcherDeployment(false, ""), + createDispatcherDeployment(false, "", brokerDLQName), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: triggerWithFilterReady(), @@ -947,7 +948,7 @@ func TestReconcile(t *testing.T) { }, WantErr: false, WantCreates: []runtime.Object{ - createDispatcherDeployment(false, ""), + createDispatcherDeployment(false, "", brokerDLQName), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewTrigger(triggerName, testNS, brokerName, @@ -984,7 +985,7 @@ func TestReconcile(t *testing.T) { }, WantErr: false, WantCreates: []runtime.Object{ - createDispatcherDeployment(false, rabbitMQVhost), + createDispatcherDeployment(false, rabbitMQVhost, brokerDLQName), }, WantUpdates: []clientgotesting.UpdateActionImpl{{ Object: markReady(createQueue(configWithRabbitMQBrokerConfig(), false, rabbitMQVhost)), @@ -1317,7 +1318,7 @@ func createRabbitMQBrokerConfig(vhost string) *v1alpha1.RabbitmqBrokerConfig { } } -func createDispatcherDeployment(dlq bool, vhost string) *appsv1.Deployment { +func createDispatcherDeployment(dlq bool, vhost, dlxName string) *appsv1.Deployment { args := &resources.DispatcherArgs{ Trigger: &eventingv1.Trigger{ ObjectMeta: metav1.ObjectMeta{ @@ -1337,6 +1338,7 @@ func createDispatcherDeployment(dlq bool, vhost string) *appsv1.Deployment { BrokerIngressURL: brokerAddress, Subscriber: subscriberAddressable, DLX: dlq, + DLXName: dlxName, } return resources.MakeDispatcherDeployment(args) } @@ -1360,6 +1362,7 @@ func createDispatcherDeploymentWithResourceRequirements(dlq bool) *appsv1.Deploy BrokerIngressURL: brokerAddress, Subscriber: subscriberAddressable, DLX: dlq, + DLXName: brokerDLQName, ResourceRequirements: corev1.ResourceRequirements{ Requests: corev1.ResourceList{ corev1.ResourceCPU: resource.MustParse("500m"), @@ -1372,7 +1375,7 @@ func createDispatcherDeploymentWithResourceRequirements(dlq bool) *appsv1.Deploy return resources.MakeDispatcherDeployment(args) } -func createDispatcherDeploymentWithRetries() *appsv1.Deployment { +func createDispatcherDeploymentWithRetries(dlxName string) *appsv1.Deployment { args := &resources.DispatcherArgs{ Trigger: &eventingv1.Trigger{ ObjectMeta: metav1.ObjectMeta{ @@ -1391,6 +1394,7 @@ func createDispatcherDeploymentWithRetries() *appsv1.Deployment { BrokerIngressURL: brokerAddress, Subscriber: subscriberAddressable, Delivery: &eventingduckv1.DeliverySpec{}, + DLXName: dlxName, } return resources.MakeDispatcherDeployment(args) }