Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delivery failure #1163

Merged
merged 30 commits into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
52e3b07
Add knativeerrordest to the event extension when delivery failure
thomasjohansen Jul 7, 2023
24c39af
Check if the extension is set, else set it. If not act like nothing h…
thomasjohansen Jul 10, 2023
4250b09
Cleanup and requeue variable
thomasjohansen Jul 10, 2023
0371885
Merge branch 'knative-sandbox:main' into delivery-failure
thomasjohansen Jul 10, 2023
51afdb7
Set hedders in msg and requeue
thomasjohansen Jul 10, 2023
69ef0b3
Lint
thomasjohansen Jul 10, 2023
30fdc1b
Wrong place to set headers
thomasjohansen Jul 10, 2023
0c426b7
Forgot the status code
thomasjohansen Jul 10, 2023
095834a
Nack if there's a header
thomasjohansen Jul 15, 2023
faea709
Send event directly to RabbitMQ
thomasjohansen Jul 16, 2023
d38fc3d
Handle error and send an event for it
thomasjohansen Jul 16, 2023
6be7192
Add functions to handle DLQ in the new way
thomasjohansen Jul 17, 2023
5fb51ca
Moved things around
thomasjohansen Jul 17, 2023
cf22797
Added missing arg
thomasjohansen Jul 17, 2023
8db07c6
We should always Ack in dispatch to DLQ
thomasjohansen Jul 17, 2023
4b1535f
Confirm mode. Added missing DLX argument
thomasjohansen Jul 19, 2023
031cacc
Fix for DLX
thomasjohansen Jul 19, 2023
0c14e46
Missed variable
thomasjohansen Jul 19, 2023
4a69726
Initial testing work
thomasjohansen Jul 22, 2023
29a2719
Improved tests
thomasjohansen Jul 23, 2023
7d49b80
Quick push
thomasjohansen Jul 24, 2023
e5e6abe
now the failed messages are directly routed to the DLQ exchange defin…
gabo1208 Jul 24, 2023
3589e6f
Update test. Still failing
thomasjohansen Jul 25, 2023
fcfc5ad
fixed tests and removed event nil case when binding.ToEvent fails cau…
gabo1208 Jul 27, 2023
114be75
Added two simple test, to test the dlq function
thomasjohansen Jul 27, 2023
4b38a8f
fixed unit tests
gabo1208 Jul 27, 2023
2c761bb
this is just an smoke test for consumefromqueue function to see if it…
gabo1208 Jul 27, 2023
b4852da
added defered confirmation to connection mock
gabo1208 Jul 27, 2023
8e2b28a
Extended tests
thomasjohansen Jul 31, 2023
107f3c3
improving coverage
gabo1208 Jul 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion cmd/dispatcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type envConfig struct {
BrokerIngressURL string `envconfig:"BROKER_INGRESS_URL" required:"true"`
SubscriberURL string `envconfig:"SUBSCRIBER" required:"true"`
SubscriberCACerts string `envconfig:"SUBSCRIBER_CACERTS" required:"false"`
DLX bool `envconfig:"DLX" required:"false"`
DLXName string `envconfig:"DLX_NAME" required:"false"`

// Number of concurrent messages in flight
Parallelism int `envconfig:"PARALLELISM" default:"1" required:"false"`
Expand Down Expand Up @@ -97,11 +99,26 @@ func main() {
BackoffPolicy: backoffPolicy,
WorkerCount: env.Parallelism,
Reporter: reporter,
DLX: env.DLX,
DLXName: env.DLXName,
}

var err error
rmqHelper := rabbit.NewRabbitMQConnectionHandler(1000, logger)
rmqHelper.Setup(ctx, rabbit.VHostHandler(env.RabbitURL, env.RabbitMQVhost), rabbit.ChannelQoS, rabbit.DialWrapper)
rmqHelper.Setup(ctx, rabbit.VHostHandler(
env.RabbitURL,
env.RabbitMQVhost),
func(c rabbit.RabbitMQConnectionInterface, ch rabbit.RabbitMQChannelInterface) error {
if err := rabbit.ChannelQoS(c, ch); err != nil {
return err
}

if err := rabbit.ChannelConfirm(c, ch); err != nil {
return err
}
return nil
},
rabbit.DialWrapper)
for {
if err = d.ConsumeFromQueue(ctx, rmqHelper.GetConnection(), rmqHelper.GetChannel(), env.QueueName); err != nil {
if errors.Is(err, context.Canceled) {
Expand Down
181 changes: 157 additions & 24 deletions pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package dispatcher

import (
"context"
"fmt"
"net/http"
"sync"
"time"
Expand Down Expand Up @@ -55,6 +56,8 @@ type Dispatcher struct {
BackoffPolicy eventingduckv1.BackoffPolicyType
WorkerCount int
Reporter dispatcher.StatsReporter
DLX bool
DLXName string
}

// ConsumeFromQueue consumes messages from the given message channel and queue.
Expand Down Expand Up @@ -91,7 +94,7 @@ func (d *Dispatcher) ConsumeFromQueue(ctx context.Context, conn rabbit.RabbitMQC
}

logging.FromContext(ctx).Info("rabbitmq receiver started, exit with CTRL+C")
logging.FromContext(ctx).Infow("Starting to process messages", zap.String("queue", queueName), zap.Int("workers", d.WorkerCount))
logging.FromContext(ctx).Infow("starting to process messages", zap.String("queue", queueName), zap.Int("workers", d.WorkerCount))

wg := &sync.WaitGroup{}
wg.Add(d.WorkerCount)
Expand All @@ -101,7 +104,11 @@ func (d *Dispatcher) ConsumeFromQueue(ctx context.Context, conn rabbit.RabbitMQC
go func() {
defer wg.Done()
for msg := range workerQueue {
d.dispatch(ctx, msg, ceClient)
if d.DLX {
_ = d.dispatchDLQ(ctx, msg, ceClient)
} else {
_ = d.dispatch(ctx, msg, ceClient, channel)
}
}
}()
}
Expand Down Expand Up @@ -142,25 +149,128 @@ func getStatus(ctx context.Context, result protocol.Result) (int, bool) {
retry, _ := kncloudevents.SelectiveRetry(ctx, &http.Response{StatusCode: httpResult.StatusCode}, nil)
return httpResult.StatusCode, !retry
}
logging.FromContext(ctx).Warnf("Invalid result type, not HTTP Result: %v", retriesResult.Result)
logging.FromContext(ctx).Warnf("invalid result type, not HTTP Result: %v", retriesResult.Result)
return -1, false
}

logging.FromContext(ctx).Warnf("Invalid result type, not RetriesResult")
logging.FromContext(ctx).Warnf("invalid result type, not RetriesResult")
return -1, false
}

func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client) {
func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client, channel rabbit.RabbitMQChannelInterface) error {
start := time.Now()
dlqExchange := d.DLXName

ctx, span := readSpan(ctx, msg)
defer span.End()

msgBinding := rabbit.NewMessageFromDelivery(ComponentName, "", "", &msg)
event, err := binding.ToEvent(cloudevents.WithEncodingBinary(ctx), msgBinding)
if err != nil {
logging.FromContext(ctx).Warn("failed creating event from delivery, err (NACK-ing and not re-queueing): ", err)
err = msg.Nack(false, false)
if err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)
if event == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't need this if, the event probably is going to be nill anyways, so just set the headers and to the else part

logging.FromContext(ctx).Error("failed parsing event: ", err)
} else {
logging.FromContext(ctx).Warn("failed parsing event, setting knativeerrordest header and NACK-ing and re-queueing: ", err)

// Add headers as described here: https://knative.dev/docs/eventing/event-delivery/#configuring-channel-event-delivery
event.SetExtension("knativeerrordest", d.SubscriberURL)
if err = sendToRabbitMQ(channel, dlqExchange, span, event); err != nil {
logging.FromContext(ctx).Warn("failed to send event: ", err)
}
}

gabo1208 marked this conversation as resolved.
Show resolved Hide resolved
if err = msg.Ack(false); err != nil {
logging.FromContext(ctx).Warn("failed to Ack event: ", err)
}

return fmt.Errorf("failed parsing event: %s", err)
}

if span.IsRecordingEvents() {
span.AddAttributes(client.EventTraceAttributes(event)...)
}

ctx = cloudevents.ContextWithTarget(ctx, d.SubscriberURL)
if d.BackoffPolicy == eventingduckv1.BackoffPolicyLinear {
ctx = cloudevents.ContextWithRetriesLinearBackoff(ctx, d.BackoffDelay, d.MaxRetries)
} else {
ctx = cloudevents.ContextWithRetriesExponentialBackoff(ctx, d.BackoffDelay, d.MaxRetries)
}

response, result := ceClient.Request(ctx, *event)
statusCode, isSuccess := getStatus(ctx, result)
if statusCode != -1 {
args := &dispatcher.ReportArgs{EventType: event.Type()}
if err = d.Reporter.ReportEventCount(args, statusCode); err != nil {
logging.FromContext(ctx).Errorf("something happened: %v", err)
}
return
}

if !isSuccess {
logging.FromContext(ctx).Warnf("failed to deliver to %q", d.SubscriberURL)

// We need to ack the original message.
if err = msg.Ack(false); err != nil {
logging.FromContext(ctx).Warn("failed to Ack event: ", err)
}

// Add headers as described here: https://knative.dev/docs/eventing/event-delivery/#configuring-channel-event-delivery
event.SetExtension("knativeerrordest", d.SubscriberURL)
event.SetExtension("knativeerrorcode", statusCode)

// Queue the event into DLQ with the correct headers.
if err = sendToRabbitMQ(channel, dlqExchange, span, event); err != nil {
logging.FromContext(ctx).Warn("failed to send event: ", err)
}
return fmt.Errorf("failed to deliver to %q", d.SubscriberURL)
} else if response != nil {
logging.FromContext(ctx).Infof("sending an event: %+v", response)
ctx = cloudevents.ContextWithTarget(ctx, d.BrokerIngressURL)
result = ceClient.Send(ctx, *response)
_, isSuccess = getStatus(ctx, result)
if !isSuccess {
logging.FromContext(ctx).Warnf("failed to deliver to %q", d.BrokerIngressURL)

// We need to ack the original message.
if err = msg.Ack(false); err != nil {
logging.FromContext(ctx).Warn("failed to Ack event: ", err)
}

// Add headers as described here: https://knative.dev/docs/eventing/event-delivery/#configuring-channel-event-delivery
event.SetExtension("knativeerrordest", d.SubscriberURL)
event.SetExtension("knativeerrorcode", statusCode)
event.SetExtension("knativeerrordata", result)

// Queue the event into DLQ with the correct headers.
if err = sendToRabbitMQ(channel, dlqExchange, span, event); err != nil {
logging.FromContext(ctx).Warn("failed to send event: ", err)
}
return fmt.Errorf("failed to deliver to %q", d.BrokerIngressURL)
}
}

if err = msg.Ack(false); err != nil {
logging.FromContext(ctx).Warn("failed to Ack event: ", err)
}
if statusCode != -1 {
args := &dispatcher.ReportArgs{EventType: event.Type()}
dispatchTime := time.Since(start)
_ = d.Reporter.ReportEventDispatchTime(args, statusCode, dispatchTime)
}
return nil
}

// Defaulting to Ack as this always hits the DLQ.
func (d *Dispatcher) dispatchDLQ(ctx context.Context, msg amqp.Delivery, ceClient cloudevents.Client) error {
start := time.Now()
msgBinding := rabbit.NewMessageFromDelivery(ComponentName, "", "", &msg)
event, err := binding.ToEvent(cloudevents.WithEncodingBinary(ctx), msgBinding)
if err != nil {
logging.FromContext(ctx).Warn("failed creating event from delivery, err: ", err)
if err = msg.Ack(false); err != nil {
logging.FromContext(ctx).Warn("failed to Ack event: ", err)
}
return fmt.Errorf("failed creating event from delivery, err: %s", err)
}

ctx, span := readSpan(ctx, msg)
Expand All @@ -180,41 +290,64 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c
statusCode, isSuccess := getStatus(ctx, result)
if statusCode != -1 {
args := &dispatcher.ReportArgs{EventType: event.Type()}
if err := d.Reporter.ReportEventCount(args, statusCode); err != nil {
logging.FromContext(ctx).Errorf("Something happened: %v", err)
if err = d.Reporter.ReportEventCount(args, statusCode); err != nil {
logging.FromContext(ctx).Errorf("something happened: %v", err)
}
}

if !isSuccess {
logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.SubscriberURL)
if err := msg.Nack(false, false); err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)
logging.FromContext(ctx).Warnf("failed to deliver to %q", d.SubscriberURL)
if err = msg.Ack(false); err != nil {
logging.FromContext(ctx).Warn("failed to Ack event: ", err)
}
return
return fmt.Errorf("failed to deliver to %q", d.SubscriberURL)
} else if response != nil {
logging.FromContext(ctx).Infof("Sending an event: %+v", response)
ctx = cloudevents.ContextWithTarget(ctx, d.BrokerIngressURL)
result := ceClient.Send(ctx, *response)
_, isSuccess := getStatus(ctx, result)
result = ceClient.Send(ctx, *response)
_, isSuccess = getStatus(ctx, result)
if !isSuccess {
logging.FromContext(ctx).Warnf("Failed to deliver to %q", d.BrokerIngressURL)
err = msg.Nack(false, false) // not multiple
if err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)
logging.FromContext(ctx).Warnf("failed to deliver to %q", d.BrokerIngressURL)
if err = msg.Ack(false); err != nil {
logging.FromContext(ctx).Warn("failed to Ack event: ", err)
}
return
return fmt.Errorf("failed to deliver to %q", d.BrokerIngressURL)
}
}

err = msg.Ack(false)
if err != nil {
logging.FromContext(ctx).Warn("failed to ACK event: ", err)
logging.FromContext(ctx).Warn("failed to Ack event: ", err)
}
if statusCode != -1 {
args := &dispatcher.ReportArgs{EventType: event.Type()}
dispatchTime := time.Since(start)
_ = d.Reporter.ReportEventDispatchTime(args, statusCode, dispatchTime)
}
return nil
}

func sendToRabbitMQ(channel rabbit.RabbitMQChannelInterface, exchangeName string, span *trace.Span, event *cloudevents.Event) error {
// no dlq defined in the trigger nor the broker, return
if exchangeName == "" {
return nil
}

tp, ts := (&tracecontext.HTTPFormat{}).SpanContextToHeaders(span.SpanContext())
dc, err := channel.PublishWithDeferredConfirm(
exchangeName,
"", // routing key
false, // mandatory
false, // immediate
*rabbit.CloudEventToRabbitMQMessage(event, tp, ts))
if err != nil {
return fmt.Errorf("failed to publish message: %w", err)
}

if ack := dc.Wait(); !ack {
return errors.New("failed to publish message: nacked")
}
return nil
}

func readSpan(ctx context.Context, msg amqp.Delivery) (context.Context, *trace.Span) {
Expand Down
Loading