Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
frairon committed Oct 28, 2021
1 parent 9ff2ce3 commit 9c98886
Show file tree
Hide file tree
Showing 8 changed files with 342 additions and 62 deletions.
9 changes: 9 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@


test:
goka test -race ./...

test-systemtest:
GOKA_SYSTEMTEST=y go test -race github.com/lovoo/goka/systemtest

test-all: test test-system
8 changes: 3 additions & 5 deletions systemtest/emitter_offset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,18 @@ func TestEmitterOffset(t *testing.T) {

var topic goka.Stream = goka.Stream(fmt.Sprintf("%s-%d", "goka-systemtest-emitter-offset", time.Now().Unix()))

if !*systemtest {
t.Skipf("Ignoring systemtest. pass '-args -systemtest' to `go test` to include them")
}
brokers := initSystemTest(t)

tmc := goka.NewTopicManagerConfig()
tmc.Table.Replication = 1
cfg := goka.DefaultConfig()
tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)([]string{*broker})
tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)(brokers)
test.AssertNil(t, err)
tm.EnsureStreamExists(string(topic), 1)

var lastOffset int64

emitter, err := goka.NewEmitter([]string{*broker}, topic, new(codec.Int64))
emitter, err := goka.NewEmitter(brokers, topic, new(codec.Int64))
if err != nil {
t.Fatalf("error creating emitter: %v", err)
}
Expand Down
19 changes: 19 additions & 0 deletions systemtest/env.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package systemtest

import (
"os"
"strings"
"testing"
)

// Checks if the env-variable to activate system test is set and returns a broker
// If system tests are not activated, will skip the test
func initSystemTest(t *testing.T) []string {
if _, isIntegration := os.LookupEnv("GOKA_SYSTEMTEST"); !isIntegration {
t.Skip("*** skip integration test ***")
}
if brokers, ok := os.LookupEnv("GOKA_SYSTEMTEST_BROKERS"); ok {
return strings.Split(brokers, ",")
}
return []string{"localhost:9092"}
}
75 changes: 33 additions & 42 deletions systemtest/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,12 @@ func TestHotStandby(t *testing.T) {
joinTable goka.Table = goka.Table(fmt.Sprintf("%s-%d", "goka-systemtest-hotstandby-join", time.Now().Unix()))
)

if !*systemtest {
t.Skipf("Ignoring systemtest. pass '-args -systemtest' to `go test` to include them")
}
brokers := initSystemTest(t)

tmc := goka.NewTopicManagerConfig()
tmc.Table.Replication = 1
cfg := goka.DefaultConfig()
tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)([]string{*broker})
tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)(brokers)
test.AssertNil(t, err)

err = tm.EnsureStreamExists(inputStream, 2)
Expand All @@ -49,7 +47,7 @@ func TestHotStandby(t *testing.T) {

proc1Storages := newStorageTracker()

proc1, err := goka.NewProcessor([]string{*broker},
proc1, err := goka.NewProcessor(brokers,
goka.DefineGroup(
group,
goka.Input(goka.Stream(inputStream), new(codec.String), func(ctx goka.Context, msg interface{}) { ctx.SetValue(msg) }),
Expand All @@ -62,7 +60,7 @@ func TestHotStandby(t *testing.T) {

proc2Storages := newStorageTracker()

proc2, err := goka.NewProcessor([]string{*broker},
proc2, err := goka.NewProcessor(brokers,
goka.DefineGroup(
group,
goka.Input(goka.Stream(inputStream), new(codec.String), func(ctx goka.Context, msg interface{}) { ctx.SetValue(msg) }),
Expand Down Expand Up @@ -94,15 +92,15 @@ func TestHotStandby(t *testing.T) {
test.AssertEqual(t, len(proc1Storages.storages), 2)
test.AssertEqual(t, len(proc2Storages.storages), 4)

inputEmitter, err := goka.NewEmitter([]string{*broker}, goka.Stream(inputStream), new(codec.String))
inputEmitter, err := goka.NewEmitter(brokers, goka.Stream(inputStream), new(codec.String))
test.AssertNil(t, err)
defer inputEmitter.Finish()
inputEmitter.EmitSync("key1", "message1")
inputEmitter.EmitSync("key2", "message2")

// emit something into the join table (like simulating a processor ctx.SetValue()).
// Our test processors should update their value in the join-table
joinEmitter, err := goka.NewEmitter([]string{*broker}, goka.Stream(joinTable), new(codec.String))
joinEmitter, err := goka.NewEmitter(brokers, goka.Stream(joinTable), new(codec.String))
test.AssertNil(t, err)
defer joinEmitter.Finish()
joinEmitter.EmitSync("key1", "joinval1")
Expand Down Expand Up @@ -162,22 +160,20 @@ func TestHotStandby(t *testing.T) {
// Test makes sure that still both processors recover the views/tables
func TestRecoverAhead(t *testing.T) {

brokers := initSystemTest(t)

var (
group goka.Group = "goka-systemtest-recoverahead"
inputStream string = string(group) + "-input"
table = string(goka.GroupTable(group))
joinTable goka.Table = "goka-systemtest-recoverahead-join"
)

if !*systemtest {
t.Skipf("Ignoring systemtest. pass '-args -systemtest' to `go test` to include them")
}

tmc := goka.NewTopicManagerConfig()
tmc.Table.Replication = 1
tmc.Stream.Replication = 1
cfg := goka.DefaultConfig()
tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)([]string{*broker})
tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)(brokers)
test.AssertNil(t, err)

err = tm.EnsureStreamExists(inputStream, 1)
Expand All @@ -187,7 +183,7 @@ func TestRecoverAhead(t *testing.T) {

proc1Storages := newStorageTracker()

proc1, err := goka.NewProcessor([]string{*broker},
proc1, err := goka.NewProcessor(brokers,
goka.DefineGroup(
group,
goka.Input(goka.Stream(inputStream), new(codec.String), func(ctx goka.Context, msg interface{}) { ctx.SetValue(msg) }),
Expand All @@ -201,7 +197,7 @@ func TestRecoverAhead(t *testing.T) {

proc2Storages := newStorageTracker()

proc2, err := goka.NewProcessor([]string{*broker},
proc2, err := goka.NewProcessor(brokers,
goka.DefineGroup(
group,
goka.Input(goka.Stream(inputStream), new(codec.String), func(ctx goka.Context, msg interface{}) { ctx.SetValue(msg) }),
Expand All @@ -214,14 +210,14 @@ func TestRecoverAhead(t *testing.T) {

// emit something into the join table (like simulating a processor ctx.SetValue()).
// Our test processors should update their value in the join-table
joinEmitter, err := goka.NewEmitter([]string{*broker}, goka.Stream(joinTable), new(codec.String))
joinEmitter, err := goka.NewEmitter(brokers, goka.Stream(joinTable), new(codec.String))
test.AssertNil(t, err)
defer joinEmitter.Finish()
joinEmitter.EmitSync("key1", "joinval1")

// emit something into the join table (like simulating a processor ctx.SetValue()).
// Our test processors should update their value in the join-table
tableEmitter, err := goka.NewEmitter([]string{*broker}, goka.Stream(table), new(codec.String))
tableEmitter, err := goka.NewEmitter(brokers, goka.Stream(table), new(codec.String))
test.AssertNil(t, err)
defer tableEmitter.Finish()
tableEmitter.EmitSync("key1", "tableval1")
Expand Down Expand Up @@ -293,9 +289,8 @@ func TestRecoverAhead(t *testing.T) {
// TestRebalance runs some processors to test rebalance. It's merely a
// runs-without-errors test, not a real functional test.
func TestRebalance(t *testing.T) {
if !*systemtest {
t.Skipf("Ignoring systemtest. pass '-args -systemtest' to `go test` to include them")
}

brokers := initSystemTest(t)

var (
group goka.Group = "goka-systemtest-rebalance"
Expand All @@ -309,13 +304,13 @@ func TestRebalance(t *testing.T) {
tmc.Table.Replication = 1
tmc.Stream.Replication = 1
cfg := goka.DefaultConfig()
tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)([]string{*broker})
tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)(brokers)
test.AssertNil(t, err)

err = tm.EnsureStreamExists(inputStream, 20)
test.AssertNil(t, err)

em, err := goka.NewEmitter([]string{*broker}, goka.Stream(inputStream), new(codec.String))
em, err := goka.NewEmitter(brokers, goka.Stream(inputStream), new(codec.String))
test.AssertNil(t, err)

go func() {
Expand All @@ -329,7 +324,7 @@ func TestRebalance(t *testing.T) {
}()

createProc := func(id int) *goka.Processor {
proc, err := goka.NewProcessor([]string{*broker},
proc, err := goka.NewProcessor(brokers,
goka.DefineGroup(
group,
goka.Input(goka.Stream(inputStream), new(codec.String), func(ctx goka.Context, msg interface{}) { ctx.SetValue(msg) }),
Expand Down Expand Up @@ -364,9 +359,8 @@ func TestRebalance(t *testing.T) {
}

func TestCallbackFail(t *testing.T) {
if !*systemtest {
t.Skipf("Ignoring systemtest. pass '-args -systemtest' to `go test` to include them")
}

brokers := initSystemTest(t)

var (
group goka.Group = goka.Group(fmt.Sprintf("goka-systemtest-callback-fail-%d", time.Now().Unix()))
Expand All @@ -378,16 +372,16 @@ func TestCallbackFail(t *testing.T) {
tmc.Table.Replication = 1
tmc.Stream.Replication = 1
cfg := goka.DefaultConfig()
tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)([]string{*broker})
tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)(brokers)
test.AssertNil(t, err)

err = tm.EnsureStreamExists(inputStream, 1)
test.AssertNil(t, err)

em, err := goka.NewEmitter([]string{*broker}, goka.Stream(inputStream), new(codec.Int64))
em, err := goka.NewEmitter(brokers, goka.Stream(inputStream), new(codec.Int64))
test.AssertNil(t, err)

proc, err := goka.NewProcessor([]string{*broker},
proc, err := goka.NewProcessor(brokers,
goka.DefineGroup(
group,
goka.Input(goka.Stream(inputStream), new(codec.Int64), func(ctx goka.Context, msg interface{}) {
Expand Down Expand Up @@ -430,9 +424,8 @@ func TestCallbackFail(t *testing.T) {
}

func TestProcessorSlowStuck(t *testing.T) {
if !*systemtest {
t.Skipf("Ignoring systemtest. pass '-args -systemtest' to `go test` to include them")
}

brokers := initSystemTest(t)

var (
group goka.Group = "goka-systemtest-slow-callback-fail"
Expand All @@ -443,16 +436,16 @@ func TestProcessorSlowStuck(t *testing.T) {
tmc.Table.Replication = 1
tmc.Stream.Replication = 1
cfg := goka.DefaultConfig()
tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)([]string{*broker})
tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)(brokers)
test.AssertNil(t, err)

err = tm.EnsureStreamExists(inputStream, 2)
test.AssertNil(t, err)

em, err := goka.NewEmitter([]string{*broker}, goka.Stream(inputStream), new(codec.Int64))
em, err := goka.NewEmitter(brokers, goka.Stream(inputStream), new(codec.Int64))
test.AssertNil(t, err)

proc, err := goka.NewProcessor([]string{*broker},
proc, err := goka.NewProcessor(brokers,
goka.DefineGroup(
group,
goka.Input(goka.Stream(inputStream), new(codec.Int64), func(ctx goka.Context, msg interface{}) {
Expand Down Expand Up @@ -525,18 +518,16 @@ func TestProcessorSlowStuck(t *testing.T) {
// ok github.com/lovoo/goka/systemtest 41.716s
func TestMessageCommit(t *testing.T) {

brokers := initSystemTest(t)

var (
group goka.Group = goka.Group(fmt.Sprintf("%s-%d", "goka-systemtest-message-commit", time.Now().Unix()))
inputStream goka.Stream = goka.Stream(group) + "-input"
numMessages = 10
)

if !*systemtest {
t.Skipf("Ignoring systemtest. pass '-args -systemtest' to `go test` to include them")
}

// New Emitter that will in total send 10 messages
emitter, err := goka.NewEmitter([]string{*broker}, inputStream, new(codec.Int64))
emitter, err := goka.NewEmitter(brokers, inputStream, new(codec.Int64))
test.AssertNil(t, err)

// some boiler plate code to create the topics in kafka using
Expand All @@ -550,7 +541,7 @@ func TestMessageCommit(t *testing.T) {
goka.ReplaceGlobalConfig(cfg)

tmBuilder := goka.TopicManagerBuilderWithConfig(cfg, tmc)
tm, err := tmBuilder([]string{*broker})
tm, err := tmBuilder(brokers)
test.AssertNil(t, err)

tm.EnsureStreamExists(string(inputStream), 10)
Expand All @@ -574,7 +565,7 @@ func TestMessageCommit(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

proc, err := goka.NewProcessor([]string{*broker}, goka.DefineGroup(group,
proc, err := goka.NewProcessor(brokers, goka.DefineGroup(group,
goka.Input(inputStream, new(codec.Int64), func(ctx goka.Context, msg interface{}) {
log.Printf("consuming message")
if val := ctx.Value(); val == nil {
Expand Down
Loading

0 comments on commit 9c98886

Please sign in to comment.