diff --git a/README.md b/README.md index 5fd130be..a3f4bb1a 100644 --- a/README.md +++ b/README.md @@ -98,11 +98,11 @@ package main import ( "context" - "fmt" "log" "os" "os/signal" "syscall" + "time" "github.com/lovoo/goka" "github.com/lovoo/goka/codec" @@ -114,18 +114,20 @@ var ( group goka.Group = "example-group" ) -// emits a single message and leave +// Emit messages forever every second func runEmitter() { emitter, err := goka.NewEmitter(brokers, topic, new(codec.String)) if err != nil { log.Fatalf("error creating emitter: %v", err) } defer emitter.Finish() - err = emitter.EmitSync("some-key", "some-value") - if err != nil { - log.Fatalf("error emitting message: %v", err) + for { + time.Sleep(1 * time.Second) + err = emitter.EmitSync("some-key", "some-value") + if err != nil { + log.Fatalf("error emitting message: %v", err) + } } - fmt.Println("message emitted") } // process messages until ctrl-c is pressed @@ -163,6 +165,8 @@ func runProcessor() { defer close(done) if err = p.Run(ctx); err != nil { log.Fatalf("error running processor: %v", err) + } else { + log.Printf("Processor shutdown cleanly") } }() @@ -174,10 +178,12 @@ func runProcessor() { } func main() { - runEmitter() // emits one message and stops - runProcessor() // press ctrl-c to stop + go runEmitter() // emits one message and stops + runProcessor() // press ctrl-c to stop } + ``` +A very similar example is also in *1-simplest*. Just run `go run examples/1-simplest/main.go`. Note that tables have to be configured in Kafka with log compaction. For details check the [Wiki](https://github.com/lovoo/goka/wiki/Tips#configuring-log-compaction-for-table-topics). diff --git a/examples/1-simplest/main.go b/examples/1-simplest/main.go index 211fa52e..c41afeac 100644 --- a/examples/1-simplest/main.go +++ b/examples/1-simplest/main.go @@ -7,6 +7,7 @@ import ( "os/signal" "syscall" + "github.com/Shopify/sarama" "github.com/lovoo/goka" "github.com/lovoo/goka/codec" ) @@ -98,6 +99,13 @@ func runProcessor() { func main() { config := goka.DefaultConfig() + // since the emitter only emits one message, we need to tell the processor + // to read from the beginning + // As the processor is slower to start than the emitter, it would not consume the first + // message otherwise. + // In production systems however, check whether you really want to read the whole topic on first start, which + // can be a lot of messages. + config.Consumer.Offsets.Initial = sarama.OffsetOldest goka.ReplaceGlobalConfig(config) tm, err := goka.NewTopicManager(brokers, goka.DefaultConfig(), tmc)