
#239 integrate sarama consumer group #260

Merged: 32 commits, Jul 9, 2020

Changes from all commits (32 commits):
aa4b10f
Co-authored-by: frairon <[email protected]>
frairon Mar 19, 2020
b997692
bugfix in shutdown/rebalance: correctly closing joins
frairon Mar 19, 2020
1e92aeb
run update/request/response stats in own goroutine
frairon Mar 19, 2020
35fcae1
fix rebalancing by adding a copartitioning rebalance strategy
frairon Mar 20, 2020
3072453
updated readme for configuration, added changelog
frairon Mar 25, 2020
bf2c5b4
Merge pull request #240 from lovoo/readme-changelog
frairon Mar 30, 2020
95ad600
Open Storage in PartitionTable when performing Setup
frairon Apr 1, 2020
bf1c513
Merge pull request #242 from lovoo/open_storage_in_setup
Apr 2, 2020
e1f8fd9
return trackOutput if stats are nil
Apr 8, 2020
859992c
Merge pull request #244 from lovoo/fix-trackOutput
frairon Apr 8, 2020
38b6856
view.get fixed for uninitialized view
frairon Apr 8, 2020
ed7dcbc
Merge pull request #243 from lovoo/open_storage_in_setup
Apr 9, 2020
3ab367a
context stats tracking: use queueing mechanism to avoid race conditions
frairon Apr 8, 2020
1eaa22f
Merge pull request #245 from lovoo/bugfix-track-stats
frairon Apr 9, 2020
bc71a7b
Add simpleBackoff and add backoff options for processor and view
Apr 15, 2020
b281011
added strings to streams helper
frairon Apr 16, 2020
21558b3
Merge pull request #250 from lovoo/streams-helper
frairon Apr 16, 2020
40abaea
Merge pull request #246 from lovoo/add-retry-backoff
Apr 16, 2020
f818f57
#249 view example
frairon Apr 16, 2020
1b7c0e2
issue #249: migration guide, #241 panic documentation of context
frairon Apr 26, 2020
f7ae456
Merge pull request #252 from lovoo/249-migration-guide
frairon Apr 27, 2020
404bc24
#248 exposing partition table's connection state to view
frairon Apr 27, 2020
3a1e326
Migration: describe offsetbug
frairon May 1, 2020
40b650b
partition_table: implement autoreconnect in recovery mode
frairon May 1, 2020
bd5bf97
bugfix goroutine-leak in statsloop in partition table
frairon May 1, 2020
f47ae9c
#248: refactored state merger, bugfix race condition when shutting do…
frairon May 7, 2020
dc77c63
bugfix partition state notification
frairon Jun 14, 2020
f79a6bf
Merge pull request #256 from lovoo/248-connection-state
Jul 8, 2020
8075d3c
remove change in example, updated changelog
frairon Jul 9, 2020
e0b0b7a
fix readme example and add readme as example
frairon Jul 9, 2020
812812d
restore 1-simplest fix, remove readme-example
frairon Jul 9, 2020
e6bac0e
Merge pull request #261 from lovoo/readme-example-fix
Jul 9, 2020
1 change: 1 addition & 0 deletions .gitignore
@@ -1,3 +1,4 @@
tmp*
*.*~
.tags*
vendor
290 changes: 290 additions & 0 deletions CHANGELOG.md

Large diffs are not rendered by default.

57 changes: 57 additions & 0 deletions MIGRATION.md
@@ -0,0 +1,57 @@
This document sums up issues when migrating from 0.1.4 to 0.9.x.

## restartable/auto reconnect view.Run()

In 0.1.4, a view created with the restartable option returned from `Run` in case of errors, but could be restarted
by calling `Run` again. The view was still usable even while it was not running and not receiving updates.

In 0.9.x the behavior of that option has changed: `view.Run` now reconnects internally using a configurable backoff.
The option was also renamed to `WithViewAutoReconnect` (the old name has been kept for compatibility reasons).

```go

// create a view
view014, _ := NewView(..., WithViewRestartable())

// reconnect logic in 0.1.4
go func() {
    for running {
        err := view014.Run(context.Background())
        // handle error

        // sleep or simple backoff logic
        time.Sleep(time.Second)
    }
}()

// After migration:
// create a view
view09x, _ := NewView(..., WithViewAutoReconnect())
ctx, cancel := context.WithCancel(context.Background())

// no need for reconnect logic, it's handled by the view internally
go func() {
    err := view09x.Run(ctx)
    // handle shutdown error
}()

// stop view
cancel()

```


## Offset bug in local storage
In 0.1.4, a bug caused the table offset stored in the local cache to always be +1 compared to the actual offset stored in Kafka.
A second bug effectively evened this out, so it never became an issue.

In 0.9.x, both bugs are fixed. However, if you upgrade goka and restart a processor using the same cache files that were maintained by the old version, you'll see a warning like this:
```
Error: local offset is higher than partition offset. topic some-topic, partition 0, hwm 1312, local offset 1314. This can have several reasons:
(1) The kafka topic storing the table is gone --> delete the local cache and restart!
(2) the processor crashed last time while writing to disk.
(3) You found a bug!
```
This happens because goka sees an offset it is not expecting.
You should see this error only once per partition and processor. The offset will be fixed automatically. If it appears on every start or regularly, it might actually be a bug and should be investigated further
(or reported to goka :)).
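
For illustration only, the check behind this warning boils down to something like the following sketch. The function name and signature are hypothetical, not goka's actual implementation:

```go
package offsetcheck

import "log"

// checkLocalOffset is a hypothetical sketch of the consistency check described
// above: if the locally cached offset is ahead of Kafka's high water mark
// (e.g. because it was written by 0.1.4 with the +1 bug), the warning is
// logged once and the offset falls back so recovery can continue.
func checkLocalOffset(topic string, partition int32, hwm, local int64) int64 {
	if local > hwm {
		log.Printf("local offset is higher than partition offset. topic %s, partition %d, hwm %d, local offset %d",
			topic, partition, hwm, local)
		return hwm
	}
	return local
}
```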
53 changes: 45 additions & 8 deletions README.md
@@ -39,6 +39,37 @@ You can install Goka by running the following command:

``$ go get -u github.com/lovoo/goka``

## Configuration

Goka relies on [Sarama](https://github.com/Shopify/sarama) to perform the actual communication with Kafka. Sarama offers many configuration settings, documented [here](https://godoc.org/github.com/Shopify/sarama#Config).

In most cases, you need to modify the config, e.g. to set the Kafka Version.

```go
cfg := goka.DefaultConfig()
cfg.Version = sarama.V2_4_0_0
goka.ReplaceGlobalConfig(cfg)
```

This makes all goka components use the updated config.

If you need a specific configuration for individual components, pass customized builders to the
component's constructor, e.g.

```go
cfg := goka.DefaultConfig()
// modify the config with component-specific settings


// use the config by creating a builder that overrides the global config
goka.NewProcessor(// ...,
goka.WithConsumerGroupBuilder(
goka.ConsumerGroupBuilderWithConfig(cfg),
),
// ...
)
```

## Concepts

Goka relies on Kafka for message passing, fault-tolerant state storage and workload partitioning.
@@ -67,11 +98,11 @@ package main

import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
@@ -83,18 +114,20 @@ var (
group goka.Group = "example-group"
)

-// emits a single message and leave
+// Emit messages forever every second
func runEmitter() {
emitter, err := goka.NewEmitter(brokers, topic, new(codec.String))
if err != nil {
log.Fatalf("error creating emitter: %v", err)
}
defer emitter.Finish()
-err = emitter.EmitSync("some-key", "some-value")
-if err != nil {
-log.Fatalf("error emitting message: %v", err)
+for {
+time.Sleep(1 * time.Second)
+err = emitter.EmitSync("some-key", "some-value")
+if err != nil {
+log.Fatalf("error emitting message: %v", err)
+}
}
-fmt.Println("message emitted")
}

// process messages until ctrl-c is pressed
@@ -132,6 +165,8 @@ func runProcessor() {
defer close(done)
if err = p.Run(ctx); err != nil {
log.Fatalf("error running processor: %v", err)
+} else {
+log.Printf("Processor shutdown cleanly")
}
}()

@@ -143,10 +178,12 @@ func runProcessor() {
}

func main() {
-runEmitter() // emits one message and stops
-runProcessor() // press ctrl-c to stop
+go runEmitter() // emits one message and stops
+runProcessor() // press ctrl-c to stop
}

```
A very similar example is also in *1-simplest*. Just run `go run examples/1-simplest/main.go`.

Note that tables have to be configured in Kafka with log compaction.
For details check the [Wiki](https://github.com/lovoo/goka/wiki/Tips#configuring-log-compaction-for-table-topics).
13 changes: 13 additions & 0 deletions assignment.go
@@ -0,0 +1,13 @@
package goka

import (
"fmt"
)

// Assignment represents a partition:offset assignment for the current connection
type Assignment map[int32]int64

func (a *Assignment) string() string {
var am map[int32]int64 = *a
return fmt.Sprintf("Assignment %v", am)
}
11 changes: 11 additions & 0 deletions broker.go
@@ -0,0 +1,11 @@
package goka

import "github.com/Shopify/sarama"

// Broker is an interface for the sarama broker
type Broker interface {
Addr() string
Connected() (bool, error)
CreateTopics(request *sarama.CreateTopicsRequest) (*sarama.CreateTopicsResponse, error)
Open(conf *sarama.Config) error
}
99 changes: 99 additions & 0 deletions builders.go
@@ -0,0 +1,99 @@
package goka

import (
"hash"
"time"

"github.com/Shopify/sarama"
)

// ProducerBuilder creates a Kafka producer.
type ProducerBuilder func(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error)

// DefaultProducerBuilder creates a Kafka producer using the Sarama library.
func DefaultProducerBuilder(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error) {
config := globalConfig
config.ClientID = clientID
config.Producer.Partitioner = sarama.NewCustomHashPartitioner(hasher)
return NewProducer(brokers, &config)
}

// ProducerBuilderWithConfig creates a Kafka producer using the Sarama library and the passed config.
func ProducerBuilderWithConfig(config *sarama.Config) ProducerBuilder {
return func(brokers []string, clientID string, hasher func() hash.Hash32) (Producer, error) {
config.ClientID = clientID
config.Producer.Partitioner = sarama.NewCustomHashPartitioner(hasher)
return NewProducer(brokers, config)
}
}

// TopicManagerBuilder creates a TopicManager to check partition counts and
// create tables.
type TopicManagerBuilder func(brokers []string) (TopicManager, error)

// DefaultTopicManagerBuilder creates TopicManager using the Sarama library.
func DefaultTopicManagerBuilder(brokers []string) (TopicManager, error) {
config := globalConfig
config.ClientID = "goka-topic-manager"
return NewTopicManager(brokers, &config, NewTopicManagerConfig())
}

// TopicManagerBuilderWithConfig creates TopicManager using the Sarama library.
func TopicManagerBuilderWithConfig(config *sarama.Config, tmConfig *TopicManagerConfig) TopicManagerBuilder {
return func(brokers []string) (TopicManager, error) {
return NewTopicManager(brokers, config, tmConfig)
}
}

// TopicManagerBuilderWithTopicManagerConfig creates TopicManager using the Sarama library.
func TopicManagerBuilderWithTopicManagerConfig(tmConfig *TopicManagerConfig) TopicManagerBuilder {
return func(brokers []string) (TopicManager, error) {
config := globalConfig
config.ClientID = "goka-topic-manager"
return NewTopicManager(brokers, &config, tmConfig)
}
}

// ConsumerGroupBuilder creates a `sarama.ConsumerGroup`
type ConsumerGroupBuilder func(brokers []string, group, clientID string) (sarama.ConsumerGroup, error)

// DefaultConsumerGroupBuilder creates a Kafka consumer using the Sarama library.
func DefaultConsumerGroupBuilder(brokers []string, group, clientID string) (sarama.ConsumerGroup, error) {
config := globalConfig
config.ClientID = clientID
return sarama.NewConsumerGroup(brokers, group, &config)
}

// ConsumerGroupBuilderWithConfig creates a sarama consumergroup using passed config
func ConsumerGroupBuilderWithConfig(config *sarama.Config) ConsumerGroupBuilder {
return func(brokers []string, group, clientID string) (sarama.ConsumerGroup, error) {
config.ClientID = clientID
return sarama.NewConsumerGroup(brokers, group, config)
}
}

// SaramaConsumerBuilder creates a `sarama.Consumer`
type SaramaConsumerBuilder func(brokers []string, clientID string) (sarama.Consumer, error)

// DefaultSaramaConsumerBuilder creates a Kafka consumer using the Sarama library.
func DefaultSaramaConsumerBuilder(brokers []string, clientID string) (sarama.Consumer, error) {
config := globalConfig
config.ClientID = clientID
return sarama.NewConsumer(brokers, &config)
}

// SaramaConsumerBuilderWithConfig creates a sarama consumer using passed config
func SaramaConsumerBuilderWithConfig(config *sarama.Config) SaramaConsumerBuilder {
return func(brokers []string, clientID string) (sarama.Consumer, error) {
config.ClientID = clientID
return sarama.NewConsumer(brokers, config)
}
}

// BackoffBuilder creates a backoff
type BackoffBuilder func() (Backoff, error)

// DefaultBackoffBuilder returns a simpleBackoff with 10-second steps
func DefaultBackoffBuilder() (Backoff, error) {
return NewSimpleBackoff(time.Second * 10), nil
}
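
For reference, a minimal usage sketch of plugging one of these builders into a processor; `graph` is assumed to be a group graph defined elsewhere, and the version bump is only an example:

```go
package main

import (
	"github.com/Shopify/sarama"
	"github.com/lovoo/goka"
)

// buildProcessor wires a custom ConsumerGroupBuilder into a processor,
// starting from goka's default config and tweaking it per component.
func buildProcessor(brokers []string, graph *goka.GroupGraph) (*goka.Processor, error) {
	groupBuilder := func(brokers []string, group, clientID string) (sarama.ConsumerGroup, error) {
		cfg := goka.DefaultConfig()
		cfg.ClientID = clientID
		cfg.Version = sarama.V2_4_0_0 // component-specific setting (example only)
		return sarama.NewConsumerGroup(brokers, group, cfg)
	}
	return goka.NewProcessor(brokers, graph,
		goka.WithConsumerGroupBuilder(groupBuilder),
	)
}
```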
1 change: 0 additions & 1 deletion codec/codec_test.go

This file was deleted.

58 changes: 58 additions & 0 deletions config.go
@@ -0,0 +1,58 @@
package goka

import (
"time"

"github.com/Shopify/sarama"
)

var (
globalConfig = *DefaultConfig()
)

const (
// size of sarama buffer for consumer and producer
defaultChannelBufferSize = 256

// time sarama-cluster assumes the processing of an event may take
defaultMaxProcessingTime = 1 * time.Second

// producer flush configuration
defaultFlushFrequency = 100 * time.Millisecond
defaultFlushBytes = 64 * 1024
defaultProducerMaxRetries = 10
)

// DefaultConfig creates a new config used by goka per default.
// Modify it and pass it to `goka.ReplaceGlobalConfig(...)` to change
// goka's global config.
func DefaultConfig() *sarama.Config {
config := sarama.NewConfig()
config.Version = sarama.V2_0_0_0

// consumer configuration
config.Consumer.Return.Errors = true
config.Consumer.MaxProcessingTime = defaultMaxProcessingTime
// this configures the initial offset for streams. Tables are always
// consumed from OffsetOldest.
config.Consumer.Offsets.Initial = sarama.OffsetNewest
config.Consumer.Group.Rebalance.Strategy = CopartitioningStrategy
// producer configuration
config.Producer.RequiredAcks = sarama.WaitForLocal
config.Producer.Compression = sarama.CompressionSnappy
config.Producer.Flush.Frequency = defaultFlushFrequency
config.Producer.Flush.Bytes = defaultFlushBytes
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
config.Producer.Retry.Max = defaultProducerMaxRetries
return config
}

// ReplaceGlobalConfig registers a standard config used during building if no
// other config is specified
func ReplaceGlobalConfig(config *sarama.Config) {
if config == nil {
panic("nil config registered as global config")
}
globalConfig = *config
}
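
A note on the copy semantics above: `ReplaceGlobalConfig` stores the passed config by value (`globalConfig = *config`), so later changes to the passed pointer do not leak into goka's global config. A small sketch:

```go
package main

import (
	"github.com/Shopify/sarama"
	"github.com/lovoo/goka"
)

func main() {
	cfg := goka.DefaultConfig()
	cfg.Version = sarama.V2_4_0_0
	goka.ReplaceGlobalConfig(cfg)

	// ReplaceGlobalConfig copied cfg, so this later change does not
	// affect the global config goka already stored.
	cfg.Version = sarama.V0_8_2_0
}
```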
42 changes: 42 additions & 0 deletions config_test.go
@@ -0,0 +1,42 @@
package goka

import (
"testing"

"github.com/Shopify/sarama"
"github.com/lovoo/goka/internal/test"
)

func TestConfig_DefaultConfig(t *testing.T) {
t.Run("equal", func(t *testing.T) {
cfg := DefaultConfig()
test.AssertTrue(t, cfg.Version == sarama.V2_0_0_0)
test.AssertTrue(t, cfg.Consumer.Return.Errors == true)
test.AssertTrue(t, cfg.Consumer.MaxProcessingTime == defaultMaxProcessingTime)
test.AssertTrue(t, cfg.Consumer.Offsets.Initial == sarama.OffsetNewest)
test.AssertTrue(t, cfg.Producer.RequiredAcks == sarama.WaitForLocal)
test.AssertTrue(t, cfg.Producer.Compression == sarama.CompressionSnappy)
test.AssertTrue(t, cfg.Producer.Flush.Frequency == defaultFlushFrequency)
test.AssertTrue(t, cfg.Producer.Flush.Bytes == defaultFlushBytes)
test.AssertTrue(t, cfg.Producer.Return.Successes == true)
test.AssertTrue(t, cfg.Producer.Return.Errors == true)
test.AssertTrue(t, cfg.Producer.Retry.Max == defaultProducerMaxRetries)
})
}

func TestConfig_ReplaceGlobalConfig(t *testing.T) {
t.Run("succeed", func(t *testing.T) {
custom := DefaultConfig()
custom.Version = sarama.V0_8_2_0
ReplaceGlobalConfig(custom)
test.AssertEqual(t, globalConfig.Version, custom.Version)
})
t.Run("panic", func(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Fatal("there was no panic")
}
}()
ReplaceGlobalConfig(nil)
})
}