Skip to content

Commit

Permalink
#239 sarama consumer group update
Browse files Browse the repository at this point in the history
Co-authored-by: frairon <[email protected]>
Co-authored-by: R053NR07 <[email protected]>
  • Loading branch information
frairon and R053NR07 committed Mar 19, 2020
1 parent 2554930 commit e25b9f5
Show file tree
Hide file tree
Showing 94 changed files with 9,101 additions and 9,974 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
tmp*
*.*~
.tags*
vendor
13 changes: 13 additions & 0 deletions assignment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package goka

import (
"fmt"
)

// Assignment represents a partition:offset assignment for the current connection
type Assignment map[int32]int64

func (a *Assignment) string() string {
var am map[int32]int64 = *a
return fmt.Sprintf("Assignment %v", am)
}
11 changes: 11 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package goka

import "github.com/Shopify/sarama"

// Broker is an interface for the sarama broker
type Broker interface {
Addr() string
Connected() (bool, error)
CreateTopics(request *sarama.CreateTopicsRequest) (*sarama.CreateTopicsResponse, error)
Open(conf *sarama.Config) error
}
88 changes: 88 additions & 0 deletions builders.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package goka

import (
"hash"

"github.com/Shopify/sarama"
)

// ProducerBuilder create a Kafka producer.
type ProducerBuilder func(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error)

// DefaultProducerBuilder creates a Kafka producer using the Sarama library.
func DefaultProducerBuilder(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error) {
config := globalConfig
config.ClientID = clientID
config.Producer.Partitioner = sarama.NewCustomHashPartitioner(hasher)
return NewProducer(brokers, &config)
}

// ProducerBuilderWithConfig creates a Kafka consumer using the Sarama library.
func ProducerBuilderWithConfig(config *sarama.Config) ProducerBuilder {
return func(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error) {
config.ClientID = clientID
config.Producer.Partitioner = sarama.NewCustomHashPartitioner(hasher)
return NewProducer(brokers, config)
}
}

// TopicManagerBuilder creates a TopicManager to check partition counts and
// create tables.
type TopicManagerBuilder func(brokers []string) (TopicManager, error)

// DefaultTopicManagerBuilder creates TopicManager using the Sarama library.
func DefaultTopicManagerBuilder(brokers []string) (TopicManager, error) {
config := globalConfig
config.ClientID = "goka-topic-manager"
return NewTopicManager(brokers, &config, NewTopicManagerConfig())
}

// TopicManagerBuilderWithConfig creates TopicManager using the Sarama library.
func TopicManagerBuilderWithConfig(config *sarama.Config, tmConfig *TopicManagerConfig) TopicManagerBuilder {
return func(brokers []string) (TopicManager, error) {
return NewTopicManager(brokers, config, tmConfig)
}
}

// TopicManagerBuilderWithTopicManagerConfig creates TopicManager using the Sarama library.
func TopicManagerBuilderWithTopicManagerConfig(tmConfig *TopicManagerConfig) TopicManagerBuilder {
return func(brokers []string) (TopicManager, error) {
config := globalConfig
config.ClientID = "goka-topic-manager"
return NewTopicManager(brokers, &config, tmConfig)
}
}

type ConsumerGroupBuilder func(brokers []string, group, clientID string) (sarama.ConsumerGroup, error)

// DefaultConsumerGroupBuilder creates a Kafka consumer using the Sarama library.
func DefaultConsumerGroupBuilder(brokers []string, group, clientID string) (sarama.ConsumerGroup, error) {
config := globalConfig
config.ClientID = clientID
return sarama.NewConsumerGroup(brokers, group, &config)
}

// ConsumerGroupBuilderWithConfig creates a sarama consumergroup using passed config
func ConsumerGroupBuilderWithConfig(config *sarama.Config) ConsumerGroupBuilder {
return func(brokers []string, group, clientID string) (sarama.ConsumerGroup, error) {
config.ClientID = clientID
return sarama.NewConsumerGroup(brokers, group, config)
}
}

type SaramaConsumerBuilder func(brokers []string, clientID string) (sarama.Consumer, error)

// DefaultSaramaConsumerBuilder creates a Kafka consumer using the Sarama library.
func DefaultSaramaConsumerBuilder(brokers []string, clientID string) (sarama.Consumer, error) {
config := globalConfig
config.ClientID = clientID
return sarama.NewConsumer(brokers, &config)
}

// ConsumerBuilderWithConfig creates a sarama consumergroup using passed config
func SaramaConsumerBuilderWithConfig(config *sarama.Config) SaramaConsumerBuilder {
return func(brokers []string, clientID string) (sarama.Consumer, error) {
config.ClientID = clientID
return sarama.NewConsumer(brokers, config)
}
}
1 change: 0 additions & 1 deletion codec/codec_test.go

This file was deleted.

57 changes: 57 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package goka

import (
"time"

"github.com/Shopify/sarama"
)

var (
globalConfig = *DefaultConfig()
)

const (
// size of sarama buffer for consumer and producer
defaultChannelBufferSize = 256

// time sarama-cluster assumes the processing of an event may take
defaultMaxProcessingTime = 1 * time.Second

// producer flush configuration
defaultFlushFrequency = 100 * time.Millisecond
defaultFlushBytes = 64 * 1024
defaultProducerMaxRetries = 10
)

// DefaultConfig creates a new config used by goka per default
// Use it to modify and pass to `goka.ReplaceGlobalConifg(...)` to modify
// goka's global config
func DefaultConfig() *sarama.Config {
config := sarama.NewConfig()
config.Version = sarama.V2_0_0_0

// consumer configuration
config.Consumer.Return.Errors = true
config.Consumer.MaxProcessingTime = defaultMaxProcessingTime
// this configures the initial offset for streams. Tables are always
// consumed from OffsetOldest.
config.Consumer.Offsets.Initial = sarama.OffsetNewest
// producer configuration
config.Producer.RequiredAcks = sarama.WaitForLocal
config.Producer.Compression = sarama.CompressionSnappy
config.Producer.Flush.Frequency = defaultFlushFrequency
config.Producer.Flush.Bytes = defaultFlushBytes
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
config.Producer.Retry.Max = defaultProducerMaxRetries
return config
}

// ReplaceGlobalConfig registeres a standard config used during building if no
// other config is specified
func ReplaceGlobalConfig(config *sarama.Config) {
if config == nil {
panic("nil config registered as global config")
}
globalConfig = *config
}
42 changes: 42 additions & 0 deletions config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package goka

import (
"testing"

"github.com/Shopify/sarama"
"github.com/lovoo/goka/internal/test"
)

func TestConfig_DefaultConfig(t *testing.T) {
t.Run("equal", func(t *testing.T) {
cfg := DefaultConfig()
test.AssertTrue(t, cfg.Version == sarama.V2_0_0_0)
test.AssertTrue(t, cfg.Consumer.Return.Errors == true)
test.AssertTrue(t, cfg.Consumer.MaxProcessingTime == defaultMaxProcessingTime)
test.AssertTrue(t, cfg.Consumer.Offsets.Initial == sarama.OffsetNewest)
test.AssertTrue(t, cfg.Producer.RequiredAcks == sarama.WaitForLocal)
test.AssertTrue(t, cfg.Producer.Compression == sarama.CompressionSnappy)
test.AssertTrue(t, cfg.Producer.Flush.Frequency == defaultFlushFrequency)
test.AssertTrue(t, cfg.Producer.Flush.Bytes == defaultFlushBytes)
test.AssertTrue(t, cfg.Producer.Return.Successes == true)
test.AssertTrue(t, cfg.Producer.Return.Errors == true)
test.AssertTrue(t, cfg.Producer.Retry.Max == defaultProducerMaxRetries)
})
}

func TestConfig_ReplaceGlobalConfig(t *testing.T) {
t.Run("succeed", func(t *testing.T) {
custom := DefaultConfig()
custom.Version = sarama.V0_8_2_0
ReplaceGlobalConfig(custom)
test.AssertEqual(t, globalConfig.Version, custom.Version)
})
t.Run("panic", func(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Fatal("there was no panic")
}
}()
ReplaceGlobalConfig(nil)
})
}
70 changes: 38 additions & 32 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ import (
"sync"
"time"

"github.com/lovoo/goka/kafka"
"github.com/Shopify/sarama"
"github.com/lovoo/goka/multierr"
"github.com/lovoo/goka/storage"
)

type emitter func(topic string, key string, value []byte) *Promise

// Context provides access to the processor's table and emit capabilities to
// arbitrary topics in kafka.
// Upon arrival of a message from subscribed topics, the respective
Expand Down Expand Up @@ -67,23 +68,25 @@ type Context interface {
Context() context.Context
}

type emitter func(topic string, key string, value []byte) *kafka.Promise

type cbContext struct {
ctx context.Context
graph *GroupGraph
// commit commits the message in the consumer session
commit func()

commit func()
emitter emitter
failer func(err error)
emitter emitter
asyncFailer func(err error)
syncFailer func(err error)

storage storage.Storage
pviews map[string]*partition
views map[string]*View
table *PartitionTable
// joins
pviews map[string]*PartitionTable
// lookup tables
views map[string]*View

pstats *PartitionStats
partProcStats *PartitionProcStats

msg *message
msg *sarama.ConsumerMessage
done bool
counters struct {
emits int
Expand Down Expand Up @@ -146,11 +149,7 @@ func (ctx *cbContext) emit(topic string, key string, value []byte) {
}
ctx.emitDone(err)
})

s := ctx.pstats.Output[topic]
s.Count++
s.Bytes += len(value)
ctx.pstats.Output[topic] = s
ctx.partProcStats.trackOutput(topic, len(value))
}

func (ctx *cbContext) Delete() {
Expand All @@ -161,7 +160,7 @@ func (ctx *cbContext) Delete() {

// Value returns the value of the key in the group table.
func (ctx *cbContext) Value() interface{} {
val, err := ctx.valueForKey(ctx.msg.Key)
val, err := ctx.valueForKey(ctx.Key())
if err != nil {
ctx.Fail(err)
}
Expand All @@ -170,7 +169,7 @@ func (ctx *cbContext) Value() interface{} {

// SetValue updates the value of the key in the group table.
func (ctx *cbContext) SetValue(value interface{}) {
if err := ctx.setValueForKey(ctx.msg.Key, value); err != nil {
if err := ctx.setValueForKey(ctx.Key(), value); err != nil {
ctx.Fail(err)
}
}
Expand All @@ -181,7 +180,7 @@ func (ctx *cbContext) Timestamp() time.Time {
}

func (ctx *cbContext) Key() string {
return ctx.msg.Key
return string(ctx.msg.Key)
}

func (ctx *cbContext) Topic() Stream {
Expand Down Expand Up @@ -235,11 +234,11 @@ func (ctx *cbContext) Lookup(topic Table, key string) interface{} {

// valueForKey returns the value of key in the processor state.
func (ctx *cbContext) valueForKey(key string) (interface{}, error) {
if ctx.storage == nil {
if ctx.table == nil {
return nil, fmt.Errorf("Cannot access state in stateless processor")
}

data, err := ctx.storage.Get(key)
data, err := ctx.table.Get(key)
if err != nil {
return nil, fmt.Errorf("error reading value: %v", err)
} else if data == nil {
Expand All @@ -259,7 +258,7 @@ func (ctx *cbContext) deleteKey(key string) error {
}

ctx.counters.stores++
if err := ctx.storage.Delete(key); err != nil {
if err := ctx.table.Delete(key); err != nil {
return fmt.Errorf("error deleting key (%s) from storage: %v", key, err)
}

Expand Down Expand Up @@ -287,20 +286,22 @@ func (ctx *cbContext) setValueForKey(key string, value interface{}) error {
}

ctx.counters.stores++
if err = ctx.storage.Set(key, encodedValue); err != nil {
if err = ctx.table.Set(key, encodedValue); err != nil {
return fmt.Errorf("error storing value: %v", err)
}

table := ctx.graph.GroupTable().Topic()
ctx.counters.emits++
ctx.emitter(table, key, encodedValue).Then(func(err error) {
ctx.emitter(table, key, encodedValue).ThenWithMessage(func(msg *sarama.ProducerMessage, err error) {
if err == nil && msg != nil {
err = ctx.table.storeNewestOffset(msg.Offset)
}
ctx.emitDone(err)
})

s := ctx.pstats.Output[table]
s.Count++
s.Bytes += len(encodedValue)
ctx.pstats.Output[table] = s
// for a table write we're tracking both the diskwrites and the kafka output
ctx.partProcStats.trackOutput(table, len(encodedValue))
ctx.table.trackMessageWrite(len(encodedValue))

return nil
}
Expand All @@ -327,6 +328,7 @@ func (ctx *cbContext) start() {

// calls ctx.commit once all emits have successfully finished, or fails context
// if some emit failed.
// this function must be called from a locked function.
func (ctx *cbContext) tryCommit(err error) {
if err != nil {
_ = ctx.errors.Collect(err)
Expand All @@ -339,18 +341,22 @@ func (ctx *cbContext) tryCommit(err error) {

// commit if no errors, otherwise fail context
if ctx.errors.HasErrors() {
ctx.failer(ctx.errors.NilOrError())
ctx.asyncFailer(ctx.errors.NilOrError())
} else {
ctx.commit()
}

// no further callback will be called from this context
ctx.markDone()
}

// markdone marks the context as done
func (ctx *cbContext) markDone() {
ctx.wg.Done()
}

// Fail stops execution and shuts down the processor
func (ctx *cbContext) Fail(err error) {
panic(err)
ctx.syncFailer(err)
}

func (ctx *cbContext) Context() context.Context {
Expand Down
Loading

0 comments on commit e25b9f5

Please sign in to comment.