Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delivery failure #1163

Merged
merged 30 commits into from
Jul 31, 2023
Merged
Changes from 10 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
52e3b07
Add knativeerrordest to the event extension when delivery failure
thomasjohansen Jul 7, 2023
24c39af
Check if the extension is set, else set it. If not act like nothing h…
thomasjohansen Jul 10, 2023
4250b09
Cleanup and requeue variable
thomasjohansen Jul 10, 2023
0371885
Merge branch 'knative-sandbox:main' into delivery-failure
thomasjohansen Jul 10, 2023
51afdb7
Set hedders in msg and requeue
thomasjohansen Jul 10, 2023
69ef0b3
Lint
thomasjohansen Jul 10, 2023
30fdc1b
Wrong place to set headers
thomasjohansen Jul 10, 2023
0c426b7
Forgot the status code
thomasjohansen Jul 10, 2023
095834a
Nack if there's a header
thomasjohansen Jul 15, 2023
faea709
Send event directly to RabbitMQ
thomasjohansen Jul 16, 2023
d38fc3d
Handle error and send an event for it
thomasjohansen Jul 16, 2023
6be7192
Add functions to handle DLQ in the new way
thomasjohansen Jul 17, 2023
5fb51ca
Moved things around
thomasjohansen Jul 17, 2023
cf22797
Added missing arg
thomasjohansen Jul 17, 2023
8db07c6
We should always Ack in dispatch to DLQ
thomasjohansen Jul 17, 2023
4b1535f
Confirm mode. Added missing DLX argument
thomasjohansen Jul 19, 2023
031cacc
Fix for DLX
thomasjohansen Jul 19, 2023
0c14e46
Missed variable
thomasjohansen Jul 19, 2023
4a69726
Initial testing work
thomasjohansen Jul 22, 2023
29a2719
Improved tests
thomasjohansen Jul 23, 2023
7d49b80
Quick push
thomasjohansen Jul 24, 2023
e5e6abe
now the failed messages are directly routed to the DLQ exchange defin…
gabo1208 Jul 24, 2023
3589e6f
Update test. Still failing
thomasjohansen Jul 25, 2023
fcfc5ad
fixed tests and removed event nil case when binding.ToEvent fails cau…
gabo1208 Jul 27, 2023
114be75
Added two simple test, to test the dlq function
thomasjohansen Jul 27, 2023
4b38a8f
fixed unit tests
gabo1208 Jul 27, 2023
2c761bb
this is just an smoke test for consumefromqueue function to see if it…
gabo1208 Jul 27, 2023
b4852da
added defered confirmation to connection mock
gabo1208 Jul 27, 2023
8e2b28a
Extended tests
thomasjohansen Jul 31, 2023
107f3c3
improving coverage
gabo1208 Jul 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 64 additions & 17 deletions pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package dispatcher

import (
"context"
"fmt"
"net/http"
"sync"
"time"
Expand Down Expand Up @@ -101,7 +102,7 @@ func (d *Dispatcher) ConsumeFromQueue(ctx context.Context, conn rabbit.RabbitMQC
go func() {
defer wg.Done()
for msg := range workerQueue {
d.dispatch(ctx, msg, ceClient)
d.dispatch(ctx, msg, ceClient, channel)
}
}()
}
Expand Down Expand Up @@ -150,15 +151,22 @@ func getStatus(ctx context.Context, result protocol.Result) (int, bool) {
return -1, false
}

func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client) {
func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client, channel rabbit.RabbitMQChannelInterface) {
start := time.Now()
if _, ok := msg.Headers["knativeerrordest"]; ok {
gabo1208 marked this conversation as resolved.
Show resolved Hide resolved
if err := msg.Ack(false); err != nil {
gabo1208 marked this conversation as resolved.
Show resolved Hide resolved
logging.FromContext(ctx).Warn("failed to Nack event: ", err)
}
return
}

msgBinding := rabbit.NewMessageFromDelivery(ComponentName, "", "", &msg)
event, err := binding.ToEvent(cloudevents.WithEncodingBinary(ctx), msgBinding)
if err != nil {
logging.FromContext(ctx).Warn("failed creating event from delivery, err (NACK-ing and not re-queueing): ", err)
err = msg.Nack(false, false)
if err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)
logging.FromContext(ctx).Warn("failed parsing event, setting knativeerrordest header and NACK-ing and re-queueing: ", err)

gabo1208 marked this conversation as resolved.
Show resolved Hide resolved
if err = msg.Nack(false, false); err != nil {
logging.FromContext(ctx).Warn("failed to Nack event: ", err)
}
return
}
Expand All @@ -180,35 +188,56 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c
statusCode, isSuccess := getStatus(ctx, result)
if statusCode != -1 {
args := &dispatcher.ReportArgs{EventType: event.Type()}
if err := d.Reporter.ReportEventCount(args, statusCode); err != nil {
if err = d.Reporter.ReportEventCount(args, statusCode); err != nil {
logging.FromContext(ctx).Errorf("Something happened: %v", err)
}
}

if !isSuccess {
logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.SubscriberURL)
gabo1208 marked this conversation as resolved.
Show resolved Hide resolved
if err := msg.Nack(false, false); err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)

// We need to ack the original message.
if err = msg.Ack(false); err != nil {
logging.FromContext(ctx).Warn("failed to Ack event: ", err)
}

// Add headers as described here: https://knative.dev/docs/eventing/event-delivery/#configuring-channel-event-delivery
event.SetExtension("knativeerrordest", d.SubscriberURL)
event.SetExtension("knativeerrorcode", statusCode)

// Queue the event into DLQ with the correct headers.
if err = sendToRabbitMQ(channel, msg.Exchange, span, event); err != nil {
logging.FromContext(ctx).Warn("failed to send event: ", err)
}
return
} else if response != nil {
logging.FromContext(ctx).Infof("Sending an event: %+v", response)
ctx = cloudevents.ContextWithTarget(ctx, d.BrokerIngressURL)
result := ceClient.Send(ctx, *response)
_, isSuccess := getStatus(ctx, result)
result = ceClient.Send(ctx, *response)
_, isSuccess = getStatus(ctx, result)
if !isSuccess {
logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.BrokerIngressURL)
err = msg.Nack(false, false) // not multiple
if err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)

// We need to ack the original message.
if err = msg.Ack(false); err != nil {
logging.FromContext(ctx).Warn("failed to Ack event: ", err)
}

// Add headers as described here: https://knative.dev/docs/eventing/event-delivery/#configuring-channel-event-delivery
event.SetExtension("knativeerrordest", d.SubscriberURL)
event.SetExtension("knativeerrorcode", statusCode)
event.SetExtension("knativeerrordata", result)

// Queue the event into DLQ with the correct headers.
if err = sendToRabbitMQ(channel, msg.Exchange, span, event); err != nil {
logging.FromContext(ctx).Warn("failed to send event: ", err)
}
return
}
}

err = msg.Ack(false)
if err != nil {
logging.FromContext(ctx).Warn("failed to ACK event: ", err)
if err = msg.Ack(false); err != nil {
logging.FromContext(ctx).Warn("failed to Ack event: ", err)
}
if statusCode != -1 {
args := &dispatcher.ReportArgs{EventType: event.Type()}
Expand All @@ -217,6 +246,24 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c
}
}

func sendToRabbitMQ(channel rabbit.RabbitMQChannelInterface, exchangeName string, span *trace.Span, event *cloudevents.Event) error {
tp, ts := (&tracecontext.HTTPFormat{}).SpanContextToHeaders(span.SpanContext())
dc, err := channel.PublishWithDeferredConfirm(
exchangeName,
"", // routing key
false, // mandatory
false, // immediate
*rabbit.CloudEventToRabbitMQMessage(event, tp, ts))
if err != nil {
return fmt.Errorf("failed to publish message: %w", err)
}

if ack := dc.Wait(); !ack {
return errors.New("failed to publish message: nacked")
}
return nil
}

func readSpan(ctx context.Context, msg amqp.Delivery) (context.Context, *trace.Span) {
traceparent, ok := msg.Headers["traceparent"].(string)
if !ok {
Expand Down