Skip to content

Commit

Permalink
Merge pull request #384 from norbertklawikowski/fix-examples-initiali…
Browse files Browse the repository at this point in the history
…zation

Fix examples initialization after auto creation of topics is disabled
  • Loading branch information
norbertklawikowski authored Jun 13, 2022
2 parents 2ff0aea + 9074262 commit 443f482
Show file tree
Hide file tree
Showing 15 changed files with 204 additions and 23 deletions.
1 change: 1 addition & 0 deletions examples/1-simplest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func main() {
if err != nil {
log.Fatalf("Error creating topic manager: %v", err)
}
defer tm.Close()
err = tm.EnsureStreamExists(string(topic), 8)
if err != nil {
log.Printf("Error creating kafka topic %s: %v", topic, err)
Expand Down
1 change: 1 addition & 0 deletions examples/10-visit/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func main() {
if err != nil {
log.Fatalf("Error creating topic manager: %v", err)
}
defer tm.Close()
err = tm.EnsureStreamExists(string(topic), 8)
if err != nil {
log.Fatalf("Error creating kafka topic %s: %v", topic, err)
Expand Down
38 changes: 31 additions & 7 deletions examples/2-clicks/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"time"

Expand All @@ -16,6 +17,8 @@ var (
brokers = []string{"127.0.0.1:9092"}
topic goka.Stream = "user-click"
group goka.Group = "mini-group"

tmc *goka.TopicManagerConfig
)

// A user is the object that is stored in the processor's group table
Expand All @@ -28,6 +31,12 @@ type user struct {
// group table.
type userCodec struct{}

func init() {
tmc = goka.NewTopicManagerConfig()
tmc.Table.Replication = 1
tmc.Stream.Replication = 1
}

// Encodes a user into []byte
func (jc *userCodec) Encode(value interface{}) ([]byte, error) {
if _, isUser := value.(*user); !isUser {
Expand Down Expand Up @@ -82,14 +91,11 @@ func process(ctx goka.Context, msg interface{}) {
fmt.Printf("[proc] key: %s clicks: %d, msg: %v\n", ctx.Key(), u.Clicks, msg)
}

func runProcessor() {
func runProcessor(initialized chan struct{}) {
g := goka.DefineGroup(group,
goka.Input(topic, new(codec.String), process),
goka.Persist(new(userCodec)),
)
tmc := goka.NewTopicManagerConfig()
tmc.Table.Replication = 1
tmc.Stream.Replication = 1
p, err := goka.NewProcessor(brokers,
g,
goka.WithTopicManagerBuilder(goka.TopicManagerBuilderWithTopicManagerConfig(tmc)),
Expand All @@ -99,10 +105,14 @@ func runProcessor() {
panic(err)
}

close(initialized)

p.Run(context.Background())
}

func runView() {
func runView(initialized chan struct{}) {
<-initialized

view, err := goka.NewView(brokers,
goka.GroupTable(group),
new(userCodec),
Expand All @@ -124,7 +134,21 @@ func runView() {
}

func main() {
tm, err := goka.NewTopicManager(brokers, goka.DefaultConfig(), tmc)
if err != nil {
log.Fatalf("Error creating topic manager: %v", err)
}
defer tm.Close()
err = tm.EnsureStreamExists(string(topic), 8)
if err != nil {
log.Printf("Error creating kafka topic %s: %v", topic, err)
}

// When this example is run the first time, wait for creation of all internal topics (this is done
// by goka.NewProcessor)
initialized := make(chan struct{})

go runEmitter()
go runProcessor()
runView()
go runProcessor(initialized)
runView(initialized)
}
2 changes: 1 addition & 1 deletion examples/3-messaging/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func collect(ctx goka.Context, msg interface{}) {
ml = append(ml, *m)

if len(ml) > maxMessages {
ml = ml[len(ml)-maxMessages-1:]
ml = ml[len(ml)-maxMessages:]
}
ctx.SetValue(ml)
}
Expand Down
7 changes: 6 additions & 1 deletion examples/3-messaging/blocker/blocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package blocker
import (
"context"
"encoding/json"

"github.com/lovoo/goka"
"github.com/lovoo/goka/examples/3-messaging/topicinit"
)

var (
Expand Down Expand Up @@ -58,6 +58,10 @@ func block(ctx goka.Context, msg interface{}) {
ctx.SetValue(s)
}

func PrepareTopics(brokers []string) {
topicinit.EnsureStreamExists(string(Stream), brokers)
}

func Run(ctx context.Context, brokers []string) func() error {
return func() error {
g := goka.DefineGroup(group,
Expand All @@ -68,6 +72,7 @@ func Run(ctx context.Context, brokers []string) func() error {
if err != nil {
return err
}

return p.Run(ctx)
}
}
17 changes: 17 additions & 0 deletions examples/3-messaging/cmd/processor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,23 @@ func main() {
ctx, cancel := context.WithCancel(context.Background())
grp, ctx := errgroup.WithContext(ctx)

// Create topics if they do not already exist
if *runCollector {
collector.PrepareTopics(brokers)
}
if *runFilter {
filter.PrepareTopics(brokers)
}
if *runBlocker {
blocker.PrepareTopics(brokers)
}
if *runDetector {
detector.PrepareTopics(brokers)
}
if *runTranslator {
translator.PrepareTopics(brokers)
}

if *runCollector {
log.Println("starting collector")
grp.Go(collector.Run(ctx, brokers))
Expand Down
6 changes: 5 additions & 1 deletion examples/3-messaging/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package collector
import (
"context"
"encoding/json"

"github.com/lovoo/goka"
"github.com/lovoo/goka/examples/3-messaging"
"github.com/lovoo/goka/examples/3-messaging/topicinit"
)

const maxMessages = 5
Expand Down Expand Up @@ -42,6 +42,10 @@ func collect(ctx goka.Context, msg interface{}) {
ctx.SetValue(ml)
}

func PrepareTopics(brokers []string) {
topicinit.EnsureStreamExists(string(messaging.ReceivedStream), brokers)
}

func Run(ctx context.Context, brokers []string) func() error {
return func() error {
g := goka.DefineGroup(group,
Expand Down
6 changes: 5 additions & 1 deletion examples/3-messaging/detector/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package detector
import (
"context"
"encoding/json"

"github.com/lovoo/goka"
"github.com/lovoo/goka/examples/3-messaging"
"github.com/lovoo/goka/examples/3-messaging/blocker"
"github.com/lovoo/goka/examples/3-messaging/topicinit"
)

const (
Expand Down Expand Up @@ -49,6 +49,10 @@ func detectSpammer(ctx goka.Context, c *Counters) bool {
return total >= minMessages && rate >= maxRate
}

func PrepareTopics(brokers []string) {
topicinit.EnsureStreamExists(string(messaging.SentStream), brokers)
}

func Run(ctx context.Context, brokers []string) func() error {
return func() error {
g := goka.DefineGroup(group,
Expand Down
15 changes: 13 additions & 2 deletions examples/3-messaging/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package filter

import (
"context"
"strings"

"github.com/lovoo/goka"
messaging "github.com/lovoo/goka/examples/3-messaging"
"github.com/lovoo/goka/examples/3-messaging/blocker"
"github.com/lovoo/goka/examples/3-messaging/topicinit"
"github.com/lovoo/goka/examples/3-messaging/translator"
"strings"
)

var (
Expand Down Expand Up @@ -41,6 +41,16 @@ func translate(ctx goka.Context, m *messaging.Message) *messaging.Message {
}
}

func PrepareTopics(brokers []string) {
topicinit.EnsureStreamExists(string(messaging.SentStream), brokers)

// We refer to these tables, ensure that they exist initially also in the
// case that the translator or blocker processors are not started
for _, topicName := range []string{string(translator.Table), string(blocker.Table)} {
topicinit.EnsureTableExists(topicName, brokers)
}
}

func Run(ctx context.Context, brokers []string) func() error {
return func() error {
g := goka.DefineGroup(group,
Expand All @@ -53,6 +63,7 @@ func Run(ctx context.Context, brokers []string) func() error {
if err != nil {
return err
}

return p.Run(ctx)
}
}
35 changes: 35 additions & 0 deletions examples/3-messaging/topicinit/topicinit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package topicinit

import (
"github.com/lovoo/goka"
"log"
)

// EnsureStreamExists is a convenience wrapper for TopicManager.EnsureStreamExists
func EnsureStreamExists(topic string, brokers []string) {
tm := createTopicManager(brokers)
defer tm.Close()
err := tm.EnsureStreamExists(topic, 8)
if err != nil {
log.Printf("Error creating kafka topic %s: %v", topic, err)
}
}

// EnsureTableExists is a convenience wrapper for TopicManager.EnsureTableExists
func EnsureTableExists(topic string, brokers []string) {
tm := createTopicManager(brokers)
defer tm.Close()
err := tm.EnsureTableExists(topic, 8)
if err != nil {
log.Printf("Error creating kafka topic %s: %v", topic, err)
}
}

func createTopicManager(brokers []string) goka.TopicManager {
tmc := goka.NewTopicManagerConfig()
tm, err := goka.NewTopicManager(brokers, goka.DefaultConfig(), tmc)
if err != nil {
log.Fatalf("Error creating topic manager: %v", err)
}
return tm
}
7 changes: 6 additions & 1 deletion examples/3-messaging/translator/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package translator

import (
"context"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
"github.com/lovoo/goka/examples/3-messaging/topicinit"
)

var (
Expand All @@ -21,6 +21,10 @@ func translate(ctx goka.Context, msg interface{}) {
ctx.SetValue(msg.(string))
}

func PrepareTopics(brokers []string) {
topicinit.EnsureStreamExists(string(Stream), brokers)
}

func Run(ctx context.Context, brokers []string) func() error {
return func() error {
g := goka.DefineGroup(group,
Expand All @@ -31,6 +35,7 @@ func Run(ctx context.Context, brokers []string) func() error {
if err != nil {
return err
}

return p.Run(ctx)
}
}
40 changes: 36 additions & 4 deletions examples/5-multiple/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,36 @@ func process(ctx goka.Context, msg interface{}) {
ctx.SetValue(u)
}

func runProcessor(ctx context.Context, monitor *monitor.Server, query *query.Server) error {
func runProcessor(ctx context.Context,
monitor *monitor.Server,
query *query.Server,
groupInitialized chan struct{}) error {

tmc := goka.NewTopicManagerConfig()
tm, err := goka.NewTopicManager(brokers, goka.DefaultConfig(), tmc)
if err != nil {
log.Fatalf("Error creating topic manager: %v", err)
}
defer tm.Close()
for _, topicName := range []string{string(inputA), string(inputB)} {
err = tm.EnsureStreamExists(topicName, 8)
if err != nil {
log.Printf("Error creating kafka topic %s: %v", topicName, err)
}
}

p, err := goka.NewProcessor(brokers, goka.DefineGroup(group,
goka.Input(inputA, new(codec.String), process),
goka.Input(inputB, new(codec.String), process),
goka.Persist(new(userCodec)),
),
goka.WithStorageBuilder(randomStorageBuilder("proc")),
)
if err != nil {
return err
}

close(groupInitialized)

// attach the processor to the monitor
monitor.AttachProcessor(p)
Expand All @@ -135,7 +157,14 @@ func runProcessor(ctx context.Context, monitor *monitor.Server, query *query.Ser
return err
}

func runView(ctx context.Context, errg *multierr.ErrGroup, root *mux.Router, monitor *monitor.Server) error {
func runView(ctx context.Context,
errg *multierr.ErrGroup,
root *mux.Router,
monitor *monitor.Server,
groupInitialized chan struct{}) error {

<-groupInitialized

view, err := goka.NewView(brokers,
goka.GroupTable(group),
new(userCodec),
Expand Down Expand Up @@ -220,16 +249,19 @@ func main() {
cancel()
}()

// runView uses the group table, which first has to be initialized by runProcessor
groupInitialized := make(chan struct{})

errg, ctx := multierr.NewErrGroup(ctx)
errg.Go(func() error {
defer log.Printf("emitter done")
return runEmitter(ctx)
})
errg.Go(func() error {
defer log.Printf("processor done")
return runProcessor(ctx, monitorServer, queryServer)
return runProcessor(ctx, monitorServer, queryServer, groupInitialized)
})
if err := runView(ctx, errg, root, monitorServer); err != nil {
if err := runView(ctx, errg, root, monitorServer, groupInitialized); err != nil {
log.Printf("Error running view, will shutdown: %v", err)
cancel()
}
Expand Down
Loading

0 comments on commit 443f482

Please sign in to comment.