Add support for concurrent dispatch and configurable prefetch #418

Merged: 8 commits, Sep 29, 2021
20 changes: 14 additions & 6 deletions cmd/dispatcher/main.go
@@ -41,15 +41,15 @@ type envConfig struct {
// Should failed deliveries be requeued in the RabbitMQ?
Requeue bool `envconfig:"REQUEUE" default:"false" required:"true"`

// Number of concurrent messages in flight
PrefetchCount int `envconfig:"PREFETCH_COUNT" default:"10" required:"false"`
Retry int `envconfig:"RETRY" required:"false"`
BackoffPolicy string `envconfig:"BACKOFF_POLICY" required:"false"`
BackoffDelay time.Duration `envconfig:"BACKOFF_DELAY" required:"false"`
}

const (
defaultBackoffDelay = 50 * time.Millisecond
Review comment:
I would set this as a BackoffDelay default.

Contributor Author:
Not sure what you mean. The prefetch count in milliseconds should be the backoff delay?
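One possible reading of that suggestion, offered only as a hedged sketch and not as what the PR does: drop the separate constant and let envconfig supply the default for BACKOFF_DELAY directly, since envconfig can parse time.Duration values from the default tag:

	// Hypothetical alternative, not part of this PR:
	BackoffDelay time.Duration `envconfig:"BACKOFF_DELAY" default:"50ms" required:"false"`

Whether that is what the reviewer meant is unclear from the thread.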


defaultPrefetch = 1
defaultPrefetchSize = 0
Contributor:
I think it's safe to drop this since we force no-ack.

"The prefetch-size is ignored if the no-ack option is set" https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.qos.prefetch-size

Contributor Author:
Where do you see that we force no-ack? https://www.rabbitmq.com/amqp-0-9-1-reference.html#domain.no-ack

That would imply we don't use acknowledgements, which is not true.

Contributor:
This is how I'm reading go-amqp and the Rabbit docs:

"When autoAck (also known as noAck) is true, the server will acknowledge deliveries to this consumer prior to writing the delivery to the network. When autoAck is true, the consumer should not call Delivery.Ack."
https://github.com/knative-sandbox/eventing-rabbitmq/blob/c944a0c5a3bea768ce200c6812d44072b4bb7001/vendor/github.com/streadway/amqp/channel.go#L1017

"The prefetch-size is ignored if the no-ack option is set."
https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.qos.prefetch-size

We set autoAck=false: https://github.com/knative-sandbox/eventing-rabbitmq/blob/c944a0c5a3bea768ce200c6812d44072b4bb7001/pkg/dispatcher/dispatcher.go#L70

Contributor Author:
Oh, I think this is bad documentation on Rabbit's part. I think they mean:

"The prefetch-size is ignored if the no-ack option is set to true."

Since we set it to false, we are not using autoAck/noAck, and therefore prefetch size / prefetch count is still relevant.

Contributor Author:
@gerhard can you back me up on this interpretation?
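For reference, a minimal sketch of the behaviour under discussion, using streadway/amqp directly rather than the repo's wabbit wrapper; the connection URL and queue name are placeholders. With autoAck=false, the basic.qos prefetch count caps how many unacknowledged deliveries the broker pushes to the consumer, which is what makes PREFETCH_COUNT act as a concurrency limit:

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func main() {
	// Placeholder connection URL and queue name, for illustration only.
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		log.Fatal(err)
	}

	// With manual acks (autoAck=false below), the broker keeps at most
	// 10 unacknowledged deliveries in flight to this consumer.
	if err := ch.Qos(10, 0, false); err != nil { // prefetch count, prefetch size, global
		log.Fatal(err)
	}

	msgs, err := ch.Consume("example-queue", "", false, false, false, false, nil) // autoAck=false
	if err != nil {
		log.Fatal(err)
	}

	for msg := range msgs {
		// ... handle msg.Body ...
		if err := msg.Ack(false); err != nil { // multiple=false
			log.Print("failed to ACK: ", err)
		}
	}
}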

)

func main() {
@@ -99,15 +99,23 @@ func main() {
}()

err = channel.Qos(
defaultPrefetch, // prefetch count
defaultPrefetchSize, // prefetch size
false, // global
env.PrefetchCount, // prefetch count
0, // prefetch size
false, // global
)
if err != nil {
logging.FromContext(ctx).Fatal("Failed to create QoS: ", err)
}

d := dispatcher.NewDispatcher(env.BrokerIngressURL, env.SubscriberURL, env.Requeue, env.Retry, backoffDelay, backoffPolicy)
d := &dispatcher.Dispatcher{
BrokerIngressURL: env.BrokerIngressURL,
SubscriberURL: env.SubscriberURL,
Requeue: env.Requeue,
MaxRetries: env.Retry,
BackoffDelay: backoffDelay,
BackoffPolicy: backoffPolicy,
WorkerCount: env.PrefetchCount,
}
if err := d.ConsumeFromQueue(ctx, channel, env.QueueName); err != nil {
// ignore ctx cancelled and channel closed errors
if errors.Is(err, context.Canceled) || errors.Is(err, amqperr.ErrClosed) {
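For reference, a minimal sketch of how the new PREFETCH_COUNT knob is parsed (hypothetical standalone program; assumes the kelseyhightower/envconfig package, which the envconfig struct tags in the diff suggest). In this PR the same value both caps unacked deliveries via channel.Qos and sizes the dispatcher's worker pool:

package main

import (
	"log"
	"os"

	"github.com/kelseyhightower/envconfig"
)

type envConfig struct {
	// Number of concurrent messages in flight; the PR defaults this to 10.
	PrefetchCount int `envconfig:"PREFETCH_COUNT" default:"10" required:"false"`
}

func main() {
	os.Setenv("PREFETCH_COUNT", "32") // normally set on the dispatcher container
	var env envConfig
	if err := envconfig.Process("", &env); err != nil {
		log.Fatal("Failed to process env var: ", err)
	}
	// The dispatcher passes this value to channel.Qos and to Dispatcher.WorkerCount.
	log.Printf("prefetch count / worker count: %d", env.PrefetchCount)
}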
161 changes: 82 additions & 79 deletions pkg/dispatcher/dispatcher.go
@@ -19,6 +19,7 @@ package dispatcher
import (
"context"
"encoding/json"
"sync"
"time"

"github.com/NeowayLabs/wabbit"
@@ -37,27 +38,17 @@ const (
)

type Dispatcher struct {
brokerIngressURL string
subscriberURL string
BrokerIngressURL string
SubscriberURL string

// Upon failure to deliver to sink, should the RabbitMQ messages be requeued.
// For example, if the DeadLetterSink has been configured, then do not requeue.
requeue bool
// For example, if the DeadLetterSink has been configured, then do not Requeue.
Requeue bool

maxRetries int
backoffDelay time.Duration
backoffPolicy eventingduckv1.BackoffPolicyType
}

func NewDispatcher(brokerIngressURL, subscriberURL string, requeue bool, maxRetries int, backoffDelay time.Duration, backoffPolicy eventingduckv1.BackoffPolicyType) *Dispatcher {
return &Dispatcher{
brokerIngressURL: brokerIngressURL,
subscriberURL: subscriberURL,
requeue: requeue,
maxRetries: maxRetries,
backoffDelay: backoffDelay,
backoffPolicy: backoffPolicy,
}
MaxRetries int
BackoffDelay time.Duration
BackoffPolicy eventingduckv1.BackoffPolicyType
WorkerCount int
}

// ConsumeFromQueue consumes messages from the given message channel and queue.
@@ -83,76 +74,31 @@ func (d *Dispatcher) ConsumeFromQueue(ctx context.Context, channel wabbit.Channel
}

logging.FromContext(ctx).Info("rabbitmq receiver started, exit with CTRL+C")
logging.FromContext(ctx).Infow("Starting to process messages", zap.String("queue", queueName))
logging.FromContext(ctx).Infow("Starting to process messages", zap.String("queue", queueName), zap.Int("workers", d.WorkerCount))

wg := &sync.WaitGroup{}
wg.Add(d.WorkerCount)
workerQueue := make(chan wabbit.Delivery, d.WorkerCount)

for i := 0; i < d.WorkerCount; i++ {
go d.dispatch(ctx, wg, workerQueue, ceClient)
}
for {
select {
case <-ctx.Done():
logging.FromContext(ctx).Info("context done, stopping message consumer")
logging.FromContext(ctx).Info("context done, stopping message consumers")
close(workerQueue)
wg.Wait()
return ctx.Err()

case msg, ok := <-msgs:
if !ok {
logging.FromContext(ctx).Warn("message channel closed, stopping message consumer")
logging.FromContext(ctx).Warn("message channel closed, stopping message consumers")
close(workerQueue)
wg.Wait()
return amqperr.ErrClosed
}

event := cloudevents.NewEvent()
err := json.Unmarshal(msg.Body(), &event)
if err != nil {
logging.FromContext(ctx).Warn("failed to unmarshal event (NACK-ing and not re-queueing): ", err)
err = msg.Nack(ackMultiple, false) // do not requeue
if err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)
}
continue
}

logging.FromContext(ctx).Debugf("Got event as: %+v", event)
ctx = cloudevents.ContextWithTarget(ctx, d.subscriberURL)

// Our dispatcher uses Retries, but cloudevents is the max total tries. So we need
// to adjust to initial + retries.
// TODO: What happens if I specify 0 to cloudevents. Does it not even retry.
retryCount := d.maxRetries
if d.backoffPolicy == eventingduckv1.BackoffPolicyLinear {
ctx = cloudevents.ContextWithRetriesLinearBackoff(ctx, d.backoffDelay, retryCount)
} else {
ctx = cloudevents.ContextWithRetriesExponentialBackoff(ctx, d.backoffDelay, retryCount)
}

response, result := ceClient.Request(ctx, event)
if !isSuccess(ctx, result) {
logging.FromContext(ctx).Warnf("Failed to deliver to %q requeue: %v", d.subscriberURL, d.requeue)
err = msg.Nack(ackMultiple, d.requeue)
if err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)
}
continue
}

logging.FromContext(ctx).Debugf("Got Response: %+v", response)
if response != nil {
logging.FromContext(ctx).Infof("Sending an event: %+v", response)
ctx = cloudevents.ContextWithTarget(ctx, d.brokerIngressURL)
backoffDelay := 50 * time.Millisecond
// Use the retries so we can just parse out the results in a common way.
cloudevents.ContextWithRetriesExponentialBackoff(ctx, backoffDelay, 1)
result := ceClient.Send(ctx, *response)
if !isSuccess(ctx, result) {
logging.FromContext(ctx).Warnf("Failed to deliver to %q requeue: %v", d.brokerIngressURL, d.requeue)
err = msg.Nack(ackMultiple, d.requeue) // not multiple
if err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)
}
continue
}
}

err = msg.Ack(ackMultiple)
if err != nil {
logging.FromContext(ctx).Warn("failed to ACK event: ", err)
}
workerQueue <- msg
}
}
}
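The loop above amounts to a standard bounded worker pool: a buffered channel sized to WorkerCount feeds that many goroutines, and closing the channel plus wg.Wait() gives a clean shutdown when the context is done or the consumer channel closes. A minimal generic sketch of the same shape (hypothetical standalone program, ints standing in for wabbit.Delivery):

package main

import (
	"fmt"
	"sync"
)

func main() {
	const workerCount = 4

	jobs := make(chan int, workerCount) // buffered like workerQueue above
	wg := &sync.WaitGroup{}
	wg.Add(workerCount)

	for i := 0; i < workerCount; i++ {
		go func(id int) {
			defer wg.Done()
			for j := range jobs { // drains until the channel is closed
				fmt.Printf("worker %d handled job %d\n", id, j)
			}
		}(i)
	}

	for j := 0; j < 10; j++ {
		jobs <- j // blocks when the buffer is full and all workers are busy
	}
	close(jobs) // mirrors the ctx.Done() / channel-closed shutdown paths
	wg.Wait()
}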
@@ -172,10 +118,67 @@ func isSuccess(ctx context.Context, result protocol.Result) bool {
return false
}
}
logging.FromContext(ctx).Warnf("Invalid result type, not HTTP Result")
logging.FromContext(ctx).Warnf("Invalid result type, not HTTP Result: %v", retriesResult.Result)
return false
}

logging.FromContext(ctx).Warnf("Invalid result type, not RetriesResult")
return false
}

func (d *Dispatcher) dispatch(ctx context.Context, wg *sync.WaitGroup, queue <-chan wabbit.Delivery, ceClient cloudevents.Client) {
defer wg.Done()

for msg := range queue {
event := cloudevents.NewEvent()
err := json.Unmarshal(msg.Body(), &event)
if err != nil {
logging.FromContext(ctx).Warn("failed to unmarshal event (NACK-ing and not re-queueing): ", err)
err = msg.Nack(ackMultiple, false) // do not requeue
if err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)
}
continue
}

logging.FromContext(ctx).Debugf("Got event as: %+v", event)
ctx = cloudevents.ContextWithTarget(ctx, d.SubscriberURL)

if d.BackoffPolicy == eventingduckv1.BackoffPolicyLinear {
ctx = cloudevents.ContextWithRetriesLinearBackoff(ctx, d.BackoffDelay, d.MaxRetries)
} else {
ctx = cloudevents.ContextWithRetriesExponentialBackoff(ctx, d.BackoffDelay, d.MaxRetries)
}

response, result := ceClient.Request(ctx, event)
if !isSuccess(ctx, result) {
logging.FromContext(ctx).Warnf("Failed to deliver to %q requeue: %v", d.SubscriberURL, d.Requeue)
err = msg.Nack(ackMultiple, d.Requeue)
if err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)
}
continue
}

logging.FromContext(ctx).Debugf("Got Response: %+v", response)
if response != nil {
logging.FromContext(ctx).Infof("Sending an event: %+v", response)
ctx = cloudevents.ContextWithTarget(ctx, d.BrokerIngressURL)
cloudevents.ContextWithRetriesExponentialBackoff(ctx, d.BackoffDelay, d.MaxRetries)
result := ceClient.Send(ctx, *response)
if !isSuccess(ctx, result) {
logging.FromContext(ctx).Warnf("Failed to deliver to %q requeue: %v", d.BrokerIngressURL, d.Requeue)
err = msg.Nack(ackMultiple, d.Requeue) // not multiple
if err != nil {
logging.FromContext(ctx).Warn("failed to NACK event: ", err)
}
continue
}
}

err = msg.Ack(ackMultiple)
if err != nil {
logging.FromContext(ctx).Warn("failed to ACK event: ", err)
}
}
}
26 changes: 16 additions & 10 deletions pkg/dispatcher/dispatcher_test.go
@@ -35,6 +35,7 @@ import (

ce "github.com/cloudevents/sdk-go/v2/event"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
)

@@ -128,15 +129,11 @@ func TestFailToConsume(t *testing.T) {
var buf bytes.Buffer
log.SetOutput(&buf)

backoffDelay, err := time.ParseDuration("1s")
if err != nil {
t.Error("Failed to parse duration: ", err)
}
ch, _, err := createRabbitAndQueue()
if err != nil {
t.Error("Failed to create rabbit and queue")
}
d := NewDispatcher("", "", false, 1, backoffDelay, eventingduckv1.BackoffPolicyExponential)
d := &Dispatcher{}
err = d.ConsumeFromQueue(context.TODO(), ch, "nosuchqueue")
if err == nil {
t.Fatal("Did not fail to consume.", err)
@@ -322,7 +319,15 @@ func TestEndToEnd(t *testing.T) {
backoffPolicy = eventingduckv1.BackoffPolicyLinear

}
d := NewDispatcher(broker.URL, subscriber.URL, tc.requeue, tc.maxRetries, backoffDelay, backoffPolicy)
d := &Dispatcher{
BrokerIngressURL: broker.URL,
SubscriberURL: subscriber.URL,
Requeue: tc.requeue,
MaxRetries: tc.maxRetries,
BackoffDelay: backoffDelay,
BackoffPolicy: backoffPolicy,
WorkerCount: 1,
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -365,10 +370,8 @@ func TestEndToEnd(t *testing.T) {
if subscriberHandler.getReceivedCount() != tc.subscriberReceiveCount {
t.Errorf("subscriber got %d events, wanted %d", subscriberHandler.getReceivedCount(), tc.subscriberReceiveCount)
} else {
for i := range tc.expectedSubscriberBodies {
if diff := cmp.Diff(tc.expectedSubscriberBodies[i], subscriberHandler.getBodies()[i]); diff != "" {
t.Error("unexpected subscriber diff (-want, +got) = ", diff)
}
if diff := cmp.Diff(tc.expectedSubscriberBodies, subscriberHandler.getBodies(), cmpopts.SortSlices(stringSort)); diff != "" {
t.Error("unexpected subscriber diff (-want, +got) = ", diff)
}
}
if brokerHandler.getReceivedCount() != tc.brokerReceiveCount {
@@ -440,3 +443,6 @@ func createEvent(data string) ce.Event {
event.SetData(cloudevents.ApplicationJSON, data)
return event
}
func stringSort(x, y string) bool {
return x < y
}
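A note on the comparison change above: with WorkerCount greater than one, delivery order to the subscriber is no longer deterministic, so the test now compares the received bodies as an unordered set. A minimal sketch of the cmp/cmpopts pattern (hypothetical standalone test):

package dispatcher_test

import (
	"testing"

	"github.com/google/go-cmp/cmp"
	"github.com/google/go-cmp/cmp/cmpopts"
)

func TestUnorderedBodies(t *testing.T) {
	want := []string{"event-1", "event-2", "event-3"}
	got := []string{"event-3", "event-1", "event-2"} // order depends on worker scheduling

	less := func(x, y string) bool { return x < y }
	if diff := cmp.Diff(want, got, cmpopts.SortSlices(less)); diff != "" {
		t.Error("unexpected subscriber diff (-want, +got) = ", diff)
	}
}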