Skip to content

Commit

Permalink
Add functions to handle DLQ in the new way
Browse files Browse the repository at this point in the history
  • Loading branch information
thomasjohansen committed Jul 17, 2023
1 parent d38fc3d commit 6be7192
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 3 deletions.
2 changes: 2 additions & 0 deletions cmd/dispatcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type envConfig struct {
BrokerIngressURL string `envconfig:"BROKER_INGRESS_URL" required:"true"`
SubscriberURL string `envconfig:"SUBSCRIBER" required:"true"`
SubscriberCACerts string `envconfig:"SUBSCRIBER_CACERTS" required:"false"`
DLX bool `envconfig:"DLX" required:"false"`

// Number of concurrent messages in flight
Parallelism int `envconfig:"PARALLELISM" default:"1" required:"false"`
Expand Down Expand Up @@ -97,6 +98,7 @@ func main() {
BackoffPolicy: backoffPolicy,
WorkerCount: env.Parallelism,
Reporter: reporter,
DLX: env.DLX,
}

var err error
Expand Down
78 changes: 75 additions & 3 deletions pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type Dispatcher struct {
BackoffPolicy eventingduckv1.BackoffPolicyType
WorkerCount int
Reporter dispatcher.StatsReporter
DLX bool
}

// ConsumeFromQueue consumes messages from the given message channel and queue.
Expand Down Expand Up @@ -102,7 +103,11 @@ func (d *Dispatcher) ConsumeFromQueue(ctx context.Context, conn rabbit.RabbitMQC
go func() {
defer wg.Done()
for msg := range workerQueue {
d.dispatch(ctx, msg, ceClient, channel)
if d.DLX {
d.dispatchDLQ(ctx, msg, ceClient, channel)
} else {
d.dispatch(ctx, msg, ceClient)
}
}
}()
}
Expand Down Expand Up @@ -151,7 +156,74 @@ func getStatus(ctx context.Context, result protocol.Result) (int, bool) {
return -1, false
}

func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client, channel rabbit.RabbitMQChannelInterface) {
func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client) {
start := time.Now()
msgBinding := rabbit.NewMessageFromDelivery(ComponentName, "", "", &msg)
event, err := binding.ToEvent(cloudevents.WithEncodingBinary(ctx), msgBinding)
if err != nil {
logging.FromContext(ctx).Warn("failed creating event from delivery, err (NACK-ing and not re-queueing): ", err)
err = msg.Nack(false, false)
if err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)
}
return
}

ctx, span := readSpan(ctx, msg)
defer span.End()
if span.IsRecordingEvents() {
span.AddAttributes(client.EventTraceAttributes(event)...)
}

ctx = cloudevents.ContextWithTarget(ctx, d.SubscriberURL)
if d.BackoffPolicy == eventingduckv1.BackoffPolicyLinear {
ctx = cloudevents.ContextWithRetriesLinearBackoff(ctx, d.BackoffDelay, d.MaxRetries)
} else {
ctx = cloudevents.ContextWithRetriesExponentialBackoff(ctx, d.BackoffDelay, d.MaxRetries)
}

response, result := ceClient.Request(ctx, *event)
statusCode, isSuccess := getStatus(ctx, result)
if statusCode != -1 {
args := &dispatcher.ReportArgs{EventType: event.Type()}
if err := d.Reporter.ReportEventCount(args, statusCode); err != nil {
logging.FromContext(ctx).Errorf("Something happened: %v", err)
}
}

if !isSuccess {
logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.SubscriberURL)
if err := msg.Nack(false, false); err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)
}
return
} else if response != nil {
logging.FromContext(ctx).Infof("Sending an event: %+v", response)
ctx = cloudevents.ContextWithTarget(ctx, d.BrokerIngressURL)
result := ceClient.Send(ctx, *response)
_, isSuccess := getStatus(ctx, result)
if !isSuccess {
logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.BrokerIngressURL)
err = msg.Nack(false, false) // not multiple
if err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)
}
return
}
}

err = msg.Ack(false)
if err != nil {
logging.FromContext(ctx).Warn("failed to ACK event: ", err)
}
if statusCode != -1 {
args := &dispatcher.ReportArgs{EventType: event.Type()}
dispatchTime := time.Since(start)
_ = d.Reporter.ReportEventDispatchTime(args, statusCode, dispatchTime)
}
}

func (d *Dispatcher) dispatchDLQ(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client, channel rabbit.RabbitMQChannelInterface) {
start := time.Now()
if _, ok := msg.Headers["knativeerrordest"]; ok {
if err := msg.Nack(false, false); err != nil {
Expand Down Expand Up @@ -181,7 +253,7 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c

return
}

if span.IsRecordingEvents() {
span.AddAttributes(client.EventTraceAttributes(event)...)
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/reconciler/trigger/resources/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,13 @@ func MakeDispatcherDeployment(args *DispatcherArgs) *appsv1.Deployment {
Value: *args.Subscriber.CACerts,
})
}
if args.DLX {
dispatcher.Env = append(dispatcher.Env,
corev1.EnvVar{
Name: "DLX",
Value: "true",
})
}
deployment := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Namespace: args.Trigger.Namespace,
Expand Down

0 comments on commit 6be7192

Please sign in to comment.