Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix examples initialization after auto creation of topics is disabled #384

Merged
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 30 additions & 7 deletions examples/2-clicks/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"time"

Expand All @@ -16,6 +17,8 @@ var (
brokers = []string{"127.0.0.1:9092"}
topic goka.Stream = "user-click"
group goka.Group = "mini-group"

tmc *goka.TopicManagerConfig
)

// A user is the object that is stored in the processor's group table
Expand All @@ -28,6 +31,12 @@ type user struct {
// group table.
type userCodec struct{}

func init() {
tmc = goka.NewTopicManagerConfig()
tmc.Table.Replication = 1
tmc.Stream.Replication = 1
}

// Encodes a user into []byte
func (jc *userCodec) Encode(value interface{}) ([]byte, error) {
if _, isUser := value.(*user); !isUser {
Expand Down Expand Up @@ -82,14 +91,11 @@ func process(ctx goka.Context, msg interface{}) {
fmt.Printf("[proc] key: %s clicks: %d, msg: %v\n", ctx.Key(), u.Clicks, msg)
}

func runProcessor() {
func runProcessor(initialized chan struct{}) {
g := goka.DefineGroup(group,
goka.Input(topic, new(codec.String), process),
goka.Persist(new(userCodec)),
)
tmc := goka.NewTopicManagerConfig()
tmc.Table.Replication = 1
tmc.Stream.Replication = 1
p, err := goka.NewProcessor(brokers,
g,
goka.WithTopicManagerBuilder(goka.TopicManagerBuilderWithTopicManagerConfig(tmc)),
Expand All @@ -99,10 +105,14 @@ func runProcessor() {
panic(err)
}

close(initialized)

p.Run(context.Background())
}

func runView() {
func runView(initialized chan struct{}) {
<-initialized

view, err := goka.NewView(brokers,
goka.GroupTable(group),
new(userCodec),
Expand All @@ -124,7 +134,20 @@ func runView() {
}

func main() {
tm, err := goka.NewTopicManager(brokers, goka.DefaultConfig(), tmc)
if err != nil {
log.Fatalf("Error creating topic manager: %v", err)
}
err = tm.EnsureStreamExists(string(topic), 8)
if err != nil {
log.Printf("Error creating kafka topic %s: %v", topic, err)
}

// When this example is run the first time, wait for creation of all internal topics (this is done
// by goka.NewProcessor)
initialized := make(chan struct{})

go runEmitter()
go runProcessor()
runView()
go runProcessor(initialized)
runView(initialized)
}
2 changes: 1 addition & 1 deletion examples/3-messaging/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func collect(ctx goka.Context, msg interface{}) {
ml = append(ml, *m)

if len(ml) > maxMessages {
ml = ml[len(ml)-maxMessages-1:]
ml = ml[len(ml)-maxMessages:]
}
ctx.SetValue(ml)
}
Expand Down
14 changes: 13 additions & 1 deletion examples/3-messaging/blocker/blocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package blocker
import (
"context"
"encoding/json"
"github.com/lovoo/goka/examples/3-messaging/topicinit"
"sync"

"github.com/lovoo/goka"
)
Expand Down Expand Up @@ -58,16 +60,26 @@ func block(ctx goka.Context, msg interface{}) {
ctx.SetValue(s)
}

func Run(ctx context.Context, brokers []string) func() error {
func Run(ctx context.Context, brokers []string, initialized *sync.WaitGroup) func() error {
// to prevent race conditions we ensure that topics exist before the execution of the Goroutine
topicinit.EnsureStreamExists(string(Stream), brokers)

return func() error {
g := goka.DefineGroup(group,
goka.Input(Stream, new(BlockEventCodec), block),
goka.Persist(new(BlockValueCodec)),
)
p, err := goka.NewProcessor(brokers, g)
if err != nil {
// we have to signal done here so other Goroutines of the errgroup
// can continue execution
initialized.Done()
return err
}

initialized.Done()
initialized.Wait()

return p.Run(ctx)
}
}
29 changes: 24 additions & 5 deletions examples/3-messaging/cmd/processor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log"
"os"
"os/signal"
"sync"
"syscall"

"github.com/lovoo/goka/examples/3-messaging/blocker"
Expand All @@ -31,25 +32,43 @@ func main() {
ctx, cancel := context.WithCancel(context.Background())
grp, ctx := errgroup.WithContext(ctx)

// When this example is run the first time, wait for creation of all internal topics
initialized := sync.WaitGroup{}
if *runCollector {
initialized.Add(1)
}
if *runFilter {
initialized.Add(1)
}
if *runBlocker {
initialized.Add(1)
}
if *runDetector {
initialized.Add(1)
}
if *runTranslator {
initialized.Add(1)
}

if *runCollector {
log.Println("starting collector")
grp.Go(collector.Run(ctx, brokers))
grp.Go(collector.Run(ctx, brokers, &initialized))
}
if *runFilter {
log.Println("starting filter")
grp.Go(filter.Run(ctx, brokers))
grp.Go(filter.Run(ctx, brokers, &initialized))
}
if *runBlocker {
log.Println("starting blocker")
grp.Go(blocker.Run(ctx, brokers))
grp.Go(blocker.Run(ctx, brokers, &initialized))
}
if *runDetector {
log.Println("starting detector")
grp.Go(detector.Run(ctx, brokers))
grp.Go(detector.Run(ctx, brokers, &initialized))
}
if *runTranslator {
log.Println("starting translator")
grp.Go(translator.Run(ctx, brokers))
grp.Go(translator.Run(ctx, brokers, &initialized))
}

// Wait for SIGINT/SIGTERM
Expand Down
15 changes: 13 additions & 2 deletions examples/3-messaging/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package collector
import (
"context"
"encoding/json"

"github.com/lovoo/goka"
"github.com/lovoo/goka/examples/3-messaging"
"github.com/lovoo/goka/examples/3-messaging/topicinit"
"sync"
)

const maxMessages = 5
Expand Down Expand Up @@ -42,16 +43,26 @@ func collect(ctx goka.Context, msg interface{}) {
ctx.SetValue(ml)
}

func Run(ctx context.Context, brokers []string) func() error {
func Run(ctx context.Context, brokers []string, initialized *sync.WaitGroup) func() error {
// to prevent race conditions we ensure that topics exist before the execution of the Goroutine
topicinit.EnsureStreamExists(string(messaging.ReceivedStream), brokers)

return func() error {
g := goka.DefineGroup(group,
goka.Input(messaging.ReceivedStream, new(messaging.MessageCodec), collect),
goka.Persist(new(MessageListCodec)),
)
p, err := goka.NewProcessor(brokers, g)
if err != nil {
// we have to signal done here so other Goroutines of the errgroup
// can continue execution
initialized.Done()
return err
}

initialized.Done()
initialized.Wait()

return p.Run(ctx)
}
}
13 changes: 12 additions & 1 deletion examples/3-messaging/detector/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package detector
import (
"context"
"encoding/json"
"github.com/lovoo/goka/examples/3-messaging/topicinit"
"sync"

"github.com/lovoo/goka"
"github.com/lovoo/goka/examples/3-messaging"
Expand Down Expand Up @@ -49,7 +51,10 @@ func detectSpammer(ctx goka.Context, c *Counters) bool {
return total >= minMessages && rate >= maxRate
}

func Run(ctx context.Context, brokers []string) func() error {
func Run(ctx context.Context, brokers []string, initialized *sync.WaitGroup) func() error {
// to prevent race conditions we ensure that topics exist before the execution of the Goroutine
topicinit.EnsureStreamExists(string(messaging.SentStream), brokers)

return func() error {
g := goka.DefineGroup(group,
goka.Input(messaging.SentStream, new(messaging.MessageCodec), func(ctx goka.Context, msg interface{}) {
Expand Down Expand Up @@ -81,9 +86,15 @@ func Run(ctx context.Context, brokers []string) func() error {
)
p, err := goka.NewProcessor(brokers, g)
if err != nil {
// we have to signal done here so other Goroutines of the errgroup
// can continue execution
initialized.Done()
return err
}

initialized.Done()
initialized.Wait()

return p.Run(ctx)
}
}
20 changes: 19 additions & 1 deletion examples/3-messaging/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package filter

import (
"context"
"github.com/lovoo/goka/examples/3-messaging/topicinit"
"strings"
"sync"

"github.com/lovoo/goka"
messaging "github.com/lovoo/goka/examples/3-messaging"
Expand Down Expand Up @@ -41,7 +43,16 @@ func translate(ctx goka.Context, m *messaging.Message) *messaging.Message {
}
}

func Run(ctx context.Context, brokers []string) func() error {
func Run(ctx context.Context, brokers []string, initialized *sync.WaitGroup) func() error {
// to prevent race conditions we ensure that topics exist before the execution of the Goroutine
topicinit.EnsureStreamExists(string(messaging.SentStream), brokers)

// We refer to these tables, ensure that they exist initially also in the
// case if the translator or blocker processors are not started
for _, topicName := range []string{string(translator.Table), string(blocker.Table)} {
topicinit.EnsureTableExists(topicName, brokers)
}

return func() error {
g := goka.DefineGroup(group,
goka.Input(messaging.SentStream, new(messaging.MessageCodec), filter),
Expand All @@ -51,8 +62,15 @@ func Run(ctx context.Context, brokers []string) func() error {
)
p, err := goka.NewProcessor(brokers, g)
if err != nil {
// we have to signal done here so other Goroutines of the errgroup
// can continue execution
initialized.Done()
return err
}

initialized.Done()
initialized.Wait()

return p.Run(ctx)
}
}
33 changes: 33 additions & 0 deletions examples/3-messaging/topicinit/topicinit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package topicinit

import (
"github.com/lovoo/goka"
"log"
)

// EnsureStreamExists is a convenience wrapper for TopicManager.EnsureStreamExists
func EnsureStreamExists(topic string, brokers []string) {
tm := createTopicManager(brokers)
norbertklawikowski marked this conversation as resolved.
Show resolved Hide resolved
err := tm.EnsureStreamExists(topic, 8)
if err != nil {
log.Printf("Error creating kafka topic %s: %v", topic, err)
}
}

// EnsureTableExists is a convenience wrapper for TopicManager.EnsureTableExists
func EnsureTableExists(topic string, brokers []string) {
tm := createTopicManager(brokers)
err := tm.EnsureTableExists(topic, 8)
if err != nil {
log.Printf("Error creating kafka topic %s: %v", topic, err)
}
}

func createTopicManager(brokers []string) goka.TopicManager {
tmc := goka.NewTopicManagerConfig()
tm, err := goka.NewTopicManager(brokers, goka.DefaultConfig(), tmc)
if err != nil {
log.Fatalf("Error creating topic manager: %v", err)
}
return tm
}
15 changes: 14 additions & 1 deletion examples/3-messaging/translator/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package translator

import (
"context"
"github.com/lovoo/goka/examples/3-messaging/topicinit"
"sync"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
Expand All @@ -21,16 +23,27 @@ func translate(ctx goka.Context, msg interface{}) {
ctx.SetValue(msg.(string))
}

func Run(ctx context.Context, brokers []string) func() error {
func Run(ctx context.Context, brokers []string, initialized *sync.WaitGroup) func() error {
// to prevent race conditions we ensure that topics exist before the execution of the Goroutine
topicinit.EnsureStreamExists(string(group), brokers)
topicinit.EnsureStreamExists(string(Stream), brokers)

return func() error {
g := goka.DefineGroup(group,
goka.Input(Stream, new(ValueCodec), translate),
goka.Persist(new(ValueCodec)),
)
p, err := goka.NewProcessor(brokers, g)
if err != nil {
// we have to signal done here so other Goroutines of the errgroup
// can continue execution
initialized.Done()
return err
}

initialized.Done()
initialized.Wait()

return p.Run(ctx)
}
}
Loading