Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

readme example #261

Merged
merged 2 commits into from
Jul 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,11 @@ package main

import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
Expand All @@ -114,18 +114,20 @@ var (
group goka.Group = "example-group"
)

// emits a single message and leave
// Emit messages forever every second
func runEmitter() {
emitter, err := goka.NewEmitter(brokers, topic, new(codec.String))
if err != nil {
log.Fatalf("error creating emitter: %v", err)
}
defer emitter.Finish()
err = emitter.EmitSync("some-key", "some-value")
if err != nil {
log.Fatalf("error emitting message: %v", err)
for {
time.Sleep(1 * time.Second)
err = emitter.EmitSync("some-key", "some-value")
if err != nil {
log.Fatalf("error emitting message: %v", err)
}
}
fmt.Println("message emitted")
}

// process messages until ctrl-c is pressed
Expand Down Expand Up @@ -163,6 +165,8 @@ func runProcessor() {
defer close(done)
if err = p.Run(ctx); err != nil {
log.Fatalf("error running processor: %v", err)
} else {
log.Printf("Processor shutdown cleanly")
}
}()

Expand All @@ -174,10 +178,12 @@ func runProcessor() {
}

func main() {
runEmitter() // emits one message and stops
runProcessor() // press ctrl-c to stop
go runEmitter() // emits one message and stops
runProcessor() // press ctrl-c to stop
}

```
A very similar example is also in *1-simplest*. Just run `go run examples/1-simplest/main.go`.

Note that tables have to be configured in Kafka with log compaction.
For details check the [Wiki](https://github.com/lovoo/goka/wiki/Tips#configuring-log-compaction-for-table-topics).
Expand Down
8 changes: 8 additions & 0 deletions examples/1-simplest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os/signal"
"syscall"

"github.com/Shopify/sarama"
"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
)
Expand Down Expand Up @@ -98,6 +99,13 @@ func runProcessor() {

func main() {
config := goka.DefaultConfig()
// since the emitter only emits one message, we need to tell the processor
// to read from the beginning
// As the processor is slower to start than the emitter, it would not consume the first
// message otherwise.
// In production systems however, check whether you really want to read the whole topic on first start, which
// can be a lot of messages.
config.Consumer.Offsets.Initial = sarama.OffsetOldest
goka.ReplaceGlobalConfig(config)

tm, err := goka.NewTopicManager(brokers, goka.DefaultConfig(), tmc)
Expand Down