Skip to content

Commit

Permalink
Broker and Trigger use delivery spec for dead letter messaging
Browse files Browse the repository at this point in the history
- Plumbs deliverySpec.BackoffDelay through to the dispatcher
  • Loading branch information
gab-satchi committed Apr 19, 2022
1 parent 25f7eef commit 6c5aa04
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 30 deletions.
1 change: 1 addition & 0 deletions pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ func (r *Reconciler) reconcileDLXDispatcherDeployment(ctx context.Context, b *ev
Broker: b,
Image: r.dispatcherImage,
//ServiceAccountName string
Delivery: b.Spec.Delivery,
RabbitMQSecretName: resources.SecretName(b.Name),
QueueName: naming.CreateBrokerDeadLetterQueueName(b),
BrokerUrlSecretKey: resources.BrokerURLSecretKey,
Expand Down
1 change: 1 addition & 0 deletions pkg/reconciler/broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1224,6 +1224,7 @@ func createDispatcherDeployment() *appsv1.Deployment {
}
args := &resources.DispatcherArgs{
Broker: broker,
Delivery: delivery,
Image: dispatcherImage,
RabbitMQSecretName: rabbitBrokerSecretName,
QueueName: "b.test-namespace.test-broker.dlq.broker-test-uid",
Expand Down
36 changes: 34 additions & 2 deletions pkg/reconciler/broker/resources/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/pkg/apis/eventing"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
reconcilersource "knative.dev/eventing/pkg/reconciler/source"
Expand All @@ -39,8 +40,9 @@ const (
// DispatcherArgs are the arguments to create a Broker's Dispatcher Deployment that handles
// DeadLetterSink deliveries.
type DispatcherArgs struct {
Broker *eventingv1.Broker
Image string
Delivery *eventingduckv1.DeliverySpec
Broker *eventingv1.Broker
Image string
//ServiceAccountName string
RabbitMQHost string
RabbitMQSecretName string
Expand Down Expand Up @@ -88,6 +90,36 @@ func MakeDispatcherDeployment(args *DispatcherArgs) *appsv1.Deployment {
if args.Configs != nil {
envs = append(envs, args.Configs.ToEnvVars()...)
}
if args.Delivery != nil {
if args.Delivery.Retry != nil {
envs = append(envs,
corev1.EnvVar{
Name: "RETRY",
Value: fmt.Sprint(*args.Delivery.Retry),
})

} else {
envs = append(envs,
corev1.EnvVar{
Name: "RETRY",
Value: "5",
})
}
if args.Delivery.BackoffPolicy != nil {
envs = append(envs,
corev1.EnvVar{
Name: "BACKOFF_POLICY",
Value: string(*args.Delivery.BackoffPolicy),
})
}
if args.Delivery.BackoffDelay != nil {
envs = append(envs,
corev1.EnvVar{
Name: "BACKOFF_DELAY",
Value: *args.Delivery.BackoffDelay,
})
}
}
return &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Namespace: args.Broker.Namespace,
Expand Down
17 changes: 17 additions & 0 deletions pkg/reconciler/broker/resources/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/pkg/apis"
"knative.dev/pkg/ptr"
"knative.dev/pkg/system"

_ "knative.dev/pkg/system/testing"
Expand All @@ -49,6 +51,7 @@ func TestMakeDispatcherDeployment(t *testing.T) {
broker := &eventingv1.Broker{
ObjectMeta: metav1.ObjectMeta{Name: brokerName, Namespace: ns},
}
linear := v1.BackoffPolicyLinear
args := &DispatcherArgs{
Broker: broker,
Image: image,
Expand All @@ -58,6 +61,11 @@ func TestMakeDispatcherDeployment(t *testing.T) {
BrokerUrlSecretKey: brokerURLKey,
Subscriber: sURL,
BrokerIngressURL: bURL,
Delivery: &v1.DeliverySpec{
Retry: ptr.Int32(10),
BackoffDelay: ptr.String("20s"),
BackoffPolicy: &linear,
},
}

got := MakeDispatcherDeployment(args)
Expand Down Expand Up @@ -122,6 +130,15 @@ func TestMakeDispatcherDeployment(t *testing.T) {
}, {
Name: "BROKER_INGRESS_URL",
Value: brokerIngressURL,
}, {
Name: "RETRY",
Value: "10",
}, {
Name: "BACKOFF_POLICY",
Value: "linear",
}, {
Name: "BACKOFF_DELAY",
Value: "20s",
}},
}},
},
Expand Down
7 changes: 7 additions & 0 deletions pkg/reconciler/trigger/resources/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,13 @@ func MakeDispatcherDeployment(args *DispatcherArgs) *appsv1.Deployment {
Value: string(*args.Delivery.BackoffPolicy),
})
}
if args.Delivery.BackoffDelay != nil {
dispatcher.Env = append(dispatcher.Env,
corev1.EnvVar{
Name: "BACKOFF_DELAY",
Value: *args.Delivery.BackoffDelay,
})
}
}
if parallelism, ok := args.Trigger.ObjectMeta.Annotations[ParallelismAnnotation]; ok {
dispatcher.Env = append(dispatcher.Env,
Expand Down
5 changes: 4 additions & 1 deletion pkg/reconciler/trigger/resources/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/pkg/apis"
"knative.dev/pkg/ptr"
"knative.dev/pkg/system"
_ "knative.dev/pkg/system/testing"
)
Expand Down Expand Up @@ -57,14 +58,16 @@ func TestMakeDispatcherDeployment(t *testing.T) {
want: deployment(),
},
{
name: "with retry and backoff",
name: "with delivery spec",
args: dispatcherArgs(withDelivery(&eventingduckv1.DeliverySpec{
Retry: Int32Ptr(10),
BackoffPolicy: &exponentialBackoff,
BackoffDelay: ptr.String("20s"),
})),
want: deployment(
withEnv(corev1.EnvVar{Name: "RETRY", Value: "10"}),
withEnv(corev1.EnvVar{Name: "BACKOFF_POLICY", Value: "exponential"}),
withEnv(corev1.EnvVar{Name: "BACKOFF_DELAY", Value: "20s"}),
),
},
{
Expand Down
38 changes: 11 additions & 27 deletions pkg/reconciler/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, t *eventingv1.Trigger) p
}
t.Status.MarkDeadLetterSinkResolvedSucceeded()
t.Status.DeadLetterSinkURI = deadLetterSinkURI
_, err = r.reconcileDLXDispatcherDeployment(ctx, t, deadLetterSinkURI)
_, err = r.reconcileDispatcherDeployment(ctx, t, deadLetterSinkURI, t.Spec.Delivery, true)
if err != nil {
logging.FromContext(ctx).Error("Problem reconciling DLX dispatcher Deployment", zap.Error(err))
t.Status.MarkDependencyFailed("DeploymentFailure", "%v", err)
Expand Down Expand Up @@ -293,7 +293,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, t *eventingv1.Trigger) p
delivery = broker.Spec.Delivery
}

_, err = r.reconcileDispatcherDeployment(ctx, t, subscriberURI, delivery)
_, err = r.reconcileDispatcherDeployment(ctx, t, subscriberURI, delivery, false)
if err != nil {
logging.FromContext(ctx).Error("Problem reconciling dispatcher Deployment", zap.Error(err))
t.Status.MarkDependencyFailed("DeploymentFailure", "%v", err)
Expand Down Expand Up @@ -328,7 +328,7 @@ func (r *Reconciler) reconcileDeployment(ctx context.Context, d *v1.Deployment)
}

// reconcileDispatcherDeployment reconciles Trigger's dispatcher deployment.
func (r *Reconciler) reconcileDispatcherDeployment(ctx context.Context, t *eventingv1.Trigger, sub *apis.URL, delivery *eventingduckv1.DeliverySpec) (*v1.Deployment, error) {
func (r *Reconciler) reconcileDispatcherDeployment(ctx context.Context, t *eventingv1.Trigger, sub *apis.URL, delivery *eventingduckv1.DeliverySpec, dlq bool) (*v1.Deployment, error) {
rabbitmqSecret, err := r.getRabbitmqSecret(ctx, t)
if err != nil {
return nil, err
Expand All @@ -337,39 +337,23 @@ func (r *Reconciler) reconcileDispatcherDeployment(ctx context.Context, t *event
if err != nil {
return nil, err
}
expected := resources.MakeDispatcherDeployment(&resources.DispatcherArgs{
Trigger: t,
Image: r.dispatcherImage,
RabbitMQSecretName: rabbitmqSecret.Name,
QueueName: naming.CreateTriggerQueueName(t),
BrokerUrlSecretKey: brokerresources.BrokerURLSecretKey,
BrokerIngressURL: b.Status.Address.URL,
Subscriber: sub,
Delivery: delivery,
Configs: r.configs,
})
return r.reconcileDeployment(ctx, expected)
}

// reconcileDispatcherDeployment reconciles Trigger's dispatcher deployment.
func (r *Reconciler) reconcileDLXDispatcherDeployment(ctx context.Context, t *eventingv1.Trigger, sub *apis.URL) (*v1.Deployment, error) {
rabbitmqSecret, err := r.getRabbitmqSecret(ctx, t)
if err != nil {
return nil, err
}
b, err := r.brokerLister.Brokers(t.Namespace).Get(t.Spec.Broker)
if err != nil {
return nil, err
queueName := naming.CreateTriggerQueueName(t)
if dlq {
// overwrite to a dlq queueName if it's a dlq
queueName = naming.CreateTriggerDeadLetterQueueName(t)
}

expected := resources.MakeDispatcherDeployment(&resources.DispatcherArgs{
Trigger: t,
Image: r.dispatcherImage,
RabbitMQSecretName: rabbitmqSecret.Name,
QueueName: naming.CreateTriggerDeadLetterQueueName(t),
QueueName: queueName,
BrokerUrlSecretKey: brokerresources.BrokerURLSecretKey,
BrokerIngressURL: b.Status.Address.URL,
Subscriber: sub,
DLX: true,
DLX: dlq,
Delivery: delivery,
Configs: r.configs,
})
return r.reconcileDeployment(ctx, expected)
Expand Down

0 comments on commit 6c5aa04

Please sign in to comment.