Skip to content

Commit

Permalink
stream: made it easier to run a batch consumer and added methods to c…
Browse files Browse the repository at this point in the history
…onvert between normal and batch consumer callbacks;

This allows you to use `application.RunBatchConsumer` similary to how
you would use `application.RunConsumer` to run a batch consumer. It also
adds `stream.ConsumerToBatchConsumer` and `stream.ConsumerToParallelBatchConsumer`
to adapt single consumers to batch consumers.

You should set stream.consumer.<name>.batch_size to take advantage of batching as it defaults to 1.
ajscholl committed Nov 21, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent a5f7508 commit d113b65
Showing 4 changed files with 156 additions and 0 deletions.
15 changes: 15 additions & 0 deletions pkg/application/runners.go
Original file line number Diff line number Diff line change
@@ -78,6 +78,21 @@ func RunConsumers(consumers stream.ConsumerCallbackMap, options ...Option) {
Run(options...)
}

func RunBatchConsumer(callback stream.BatchConsumerCallbackFactory, options ...Option) {
RunBatchConsumers(stream.BatchConsumerCallbackMap{
"default": callback,
}, options...)
}

func RunBatchConsumers(consumers stream.BatchConsumerCallbackMap, options ...Option) {
factory := stream.NewBatchConsumerFactory(consumers)

options = append(options, WithModuleMultiFactory(factory))
options = append(options, WithExecBackoffInfinite)

Run(options...)
}

func RunMdlSubscriber(transformers mdlsub.TransformerMapTypeVersionFactories, options ...Option) {
subs := mdlsub.NewSubscriberFactory(transformers)

106 changes: 106 additions & 0 deletions pkg/stream/consumer_adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package stream

import (
"context"

"github.com/justtrackio/gosoline/pkg/coffin"
)

type consumerBatchAdapter struct {
ConsumerCallback
}

type parallelConsumerBatchAdapter struct {
ConsumerCallback
}

type consumerAdapter struct {
BatchConsumerCallback
}

var (
_ BatchConsumerCallback = consumerBatchAdapter{}
_ BatchConsumerCallback = parallelConsumerBatchAdapter{}
_ ConsumerCallback = consumerAdapter{}
)

func (c consumerBatchAdapter) Consume(ctx context.Context, models []any, attributes []map[string]string) ([]bool, error) {
acks := make([]bool, len(models))
for i, model := range models {
ack, err := c.ConsumerCallback.Consume(ctx, model, attributes[i])
if err != nil {
return acks, err
}

acks[i] = ack
}

return acks, nil
}

func (c parallelConsumerBatchAdapter) Consume(ctx context.Context, models []any, attributes []map[string]string) ([]bool, error) {
acks := make([]bool, len(models))
cfn := coffin.New()

cfn.Go(func() error {
for i, model := range models {
i := i
model := model

cfn.Go(func() error {
ack, err := c.ConsumerCallback.Consume(ctx, model, attributes[i])
if err != nil {
return err
}

acks[i] = ack

return nil
})
}

return nil
})

err := cfn.Wait()

return acks, err
}

func (c consumerAdapter) Consume(ctx context.Context, model any, attributes map[string]string) (bool, error) {
acks, err := c.BatchConsumerCallback.Consume(ctx, []any{model}, []map[string]string{attributes})
if len(acks) != 1 {
return false, err
}

return acks[0], err
}

// ConsumerToBatchConsumer turns any consumer callback to a batch consumer callback by calling the callback
// for each single item of the batch. All items are processed serially and acknowledged in one single call
// (useful if you want to save on calls to the underlying input without complicating your code).
//
// Don't forget to configure stream.consumer.<name>.batch_size to take advantage of batching.
func ConsumerToBatchConsumer(consumer ConsumerCallback) BatchConsumerCallback {
return consumerBatchAdapter{
ConsumerCallback: consumer,
}
}

// ConsumerToParallelBatchConsumer is similar to ConsumerToBatchConsumer, but runs the callback in parallel
// for each item in the batch.
//
// Don't forget to configure stream.consumer.<name>.batch_size to take advantage of batching.
func ConsumerToParallelBatchConsumer(consumer ConsumerCallback) BatchConsumerCallback {
return parallelConsumerBatchAdapter{
ConsumerCallback: consumer,
}
}

// BatchConsumerToConsumer turns a batch consumer to a normal consumer. It calls the batch consumer with
// a single item each time, allowing you to reuse the code of a batch consumer as a normal consumer.
func BatchConsumerToConsumer(consumer BatchConsumerCallback) ConsumerCallback {
return consumerAdapter{
BatchConsumerCallback: consumer,
}
}
31 changes: 31 additions & 0 deletions pkg/stream/consumer_batch_module_factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package stream

import (
"context"
"fmt"

"github.com/justtrackio/gosoline/pkg/cfg"
"github.com/justtrackio/gosoline/pkg/kernel"
"github.com/justtrackio/gosoline/pkg/log"
)

type BatchConsumerCallbackMap map[string]BatchConsumerCallbackFactory

func NewBatchConsumerFactory(callbacks BatchConsumerCallbackMap) kernel.ModuleMultiFactory {
return func(ctx context.Context, config cfg.Config, logger log.Logger) (map[string]kernel.ModuleFactory, error) {
return BatchConsumerFactory(callbacks)
}
}

func BatchConsumerFactory(callbacks BatchConsumerCallbackMap) (map[string]kernel.ModuleFactory, error) {
modules := make(map[string]kernel.ModuleFactory)

for name, callback := range callbacks {
moduleName := fmt.Sprintf("batch-consumer-%s", name)
consumer := NewBatchConsumer(name, callback)

modules[moduleName] = consumer
}

return modules, nil
}
4 changes: 4 additions & 0 deletions pkg/test/suite/options_suite.go
Original file line number Diff line number Diff line change
@@ -116,6 +116,10 @@ func WithConsumer(callback stream.ConsumerCallbackFactory) Option {
return WithModule("consumer-default", stream.NewConsumer("default", callback))
}

func WithBatchConsumer(callback stream.BatchConsumerCallbackFactory) Option {
return WithModule("batch-consumer-default", stream.NewBatchConsumer("default", callback))
}

func WithEnvSetup(setups ...func() error) Option {
return func(s *suiteOptions) {
s.envSetup = append(s.envSetup, setups...)

0 comments on commit d113b65

Please sign in to comment.