diff --git a/Makefile b/Makefile new file mode 100644 index 00000000..e65af0c9 --- /dev/null +++ b/Makefile @@ -0,0 +1,9 @@ + + +test: + go test -race ./... + +test-systemtest: + GOKA_SYSTEMTEST=y go test -race github.com/lovoo/goka/systemtest + +test-all: test test-systemtest \ No newline at end of file diff --git a/systemtest/emitter_offset_test.go b/systemtest/emitter_offset_test.go index 0b2919f6..44e41f55 100644 --- a/systemtest/emitter_offset_test.go +++ b/systemtest/emitter_offset_test.go @@ -21,20 +21,18 @@ func TestEmitterOffset(t *testing.T) { var topic goka.Stream = goka.Stream(fmt.Sprintf("%s-%d", "goka-systemtest-emitter-offset", time.Now().Unix())) - if !*systemtest { - t.Skipf("Ignoring systemtest. pass '-args -systemtest' to `go test` to include them") - } + brokers := initSystemTest(t) tmc := goka.NewTopicManagerConfig() tmc.Table.Replication = 1 cfg := goka.DefaultConfig() - tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)([]string{*broker}) + tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)(brokers) test.AssertNil(t, err) tm.EnsureStreamExists(string(topic), 1) var lastOffset int64 - emitter, err := goka.NewEmitter([]string{*broker}, topic, new(codec.Int64)) + emitter, err := goka.NewEmitter(brokers, topic, new(codec.Int64)) if err != nil { t.Fatalf("error creating emitter: %v", err) } diff --git a/systemtest/env.go b/systemtest/env.go new file mode 100644 index 00000000..82d24e11 --- /dev/null +++ b/systemtest/env.go @@ -0,0 +1,19 @@ +package systemtest + +import ( + "os" + "strings" + "testing" +) + +// initSystemTest checks whether the env-variable activating system tests is set and returns the brokers to use. +// If system tests are not activated, the calling test is skipped. +func initSystemTest(t *testing.T) []string { + if _, isIntegration := os.LookupEnv("GOKA_SYSTEMTEST"); !isIntegration { + t.Skip("*** skip integration test ***") + } + if brokers, ok := os.LookupEnv("GOKA_SYSTEMTEST_BROKERS"); ok { + return strings.Split(brokers, ",") + } + return 
[]string{"localhost:9092"} +} diff --git a/systemtest/processor_test.go b/systemtest/processor_test.go index 5bef6d31..f7dca834 100644 --- a/systemtest/processor_test.go +++ b/systemtest/processor_test.go @@ -30,14 +30,12 @@ func TestHotStandby(t *testing.T) { joinTable goka.Table = goka.Table(fmt.Sprintf("%s-%d", "goka-systemtest-hotstandby-join", time.Now().Unix())) ) - if !*systemtest { - t.Skipf("Ignoring systemtest. pass '-args -systemtest' to `go test` to include them") - } + brokers := initSystemTest(t) tmc := goka.NewTopicManagerConfig() tmc.Table.Replication = 1 cfg := goka.DefaultConfig() - tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)([]string{*broker}) + tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)(brokers) test.AssertNil(t, err) err = tm.EnsureStreamExists(inputStream, 2) @@ -49,7 +47,7 @@ func TestHotStandby(t *testing.T) { proc1Storages := newStorageTracker() - proc1, err := goka.NewProcessor([]string{*broker}, + proc1, err := goka.NewProcessor(brokers, goka.DefineGroup( group, goka.Input(goka.Stream(inputStream), new(codec.String), func(ctx goka.Context, msg interface{}) { ctx.SetValue(msg) }), @@ -62,7 +60,7 @@ func TestHotStandby(t *testing.T) { proc2Storages := newStorageTracker() - proc2, err := goka.NewProcessor([]string{*broker}, + proc2, err := goka.NewProcessor(brokers, goka.DefineGroup( group, goka.Input(goka.Stream(inputStream), new(codec.String), func(ctx goka.Context, msg interface{}) { ctx.SetValue(msg) }), @@ -94,7 +92,7 @@ func TestHotStandby(t *testing.T) { test.AssertEqual(t, len(proc1Storages.storages), 2) test.AssertEqual(t, len(proc2Storages.storages), 4) - inputEmitter, err := goka.NewEmitter([]string{*broker}, goka.Stream(inputStream), new(codec.String)) + inputEmitter, err := goka.NewEmitter(brokers, goka.Stream(inputStream), new(codec.String)) test.AssertNil(t, err) defer inputEmitter.Finish() inputEmitter.EmitSync("key1", "message1") @@ -102,7 +100,7 @@ func TestHotStandby(t *testing.T) { // emit 
something into the join table (like simulating a processor ctx.SetValue()). // Our test processors should update their value in the join-table - joinEmitter, err := goka.NewEmitter([]string{*broker}, goka.Stream(joinTable), new(codec.String)) + joinEmitter, err := goka.NewEmitter(brokers, goka.Stream(joinTable), new(codec.String)) test.AssertNil(t, err) defer joinEmitter.Finish() joinEmitter.EmitSync("key1", "joinval1") @@ -162,6 +160,8 @@ func TestHotStandby(t *testing.T) { // Test makes sure that still both processors recover the views/tables func TestRecoverAhead(t *testing.T) { + brokers := initSystemTest(t) + var ( group goka.Group = "goka-systemtest-recoverahead" inputStream string = string(group) + "-input" @@ -169,15 +169,11 @@ func TestRecoverAhead(t *testing.T) { joinTable goka.Table = "goka-systemtest-recoverahead-join" ) - if !*systemtest { - t.Skipf("Ignoring systemtest. pass '-args -systemtest' to `go test` to include them") - } - tmc := goka.NewTopicManagerConfig() tmc.Table.Replication = 1 tmc.Stream.Replication = 1 cfg := goka.DefaultConfig() - tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)([]string{*broker}) + tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)(brokers) test.AssertNil(t, err) err = tm.EnsureStreamExists(inputStream, 1) @@ -187,7 +183,7 @@ func TestRecoverAhead(t *testing.T) { proc1Storages := newStorageTracker() - proc1, err := goka.NewProcessor([]string{*broker}, + proc1, err := goka.NewProcessor(brokers, goka.DefineGroup( group, goka.Input(goka.Stream(inputStream), new(codec.String), func(ctx goka.Context, msg interface{}) { ctx.SetValue(msg) }), @@ -201,7 +197,7 @@ func TestRecoverAhead(t *testing.T) { proc2Storages := newStorageTracker() - proc2, err := goka.NewProcessor([]string{*broker}, + proc2, err := goka.NewProcessor(brokers, goka.DefineGroup( group, goka.Input(goka.Stream(inputStream), new(codec.String), func(ctx goka.Context, msg interface{}) { ctx.SetValue(msg) }), @@ -214,14 +210,14 @@ func 
TestRecoverAhead(t *testing.T) { // emit something into the join table (like simulating a processor ctx.SetValue()). // Our test processors should update their value in the join-table - joinEmitter, err := goka.NewEmitter([]string{*broker}, goka.Stream(joinTable), new(codec.String)) + joinEmitter, err := goka.NewEmitter(brokers, goka.Stream(joinTable), new(codec.String)) test.AssertNil(t, err) defer joinEmitter.Finish() joinEmitter.EmitSync("key1", "joinval1") // emit something into the join table (like simulating a processor ctx.SetValue()). // Our test processors should update their value in the join-table - tableEmitter, err := goka.NewEmitter([]string{*broker}, goka.Stream(table), new(codec.String)) + tableEmitter, err := goka.NewEmitter(brokers, goka.Stream(table), new(codec.String)) test.AssertNil(t, err) defer tableEmitter.Finish() tableEmitter.EmitSync("key1", "tableval1") @@ -293,9 +289,8 @@ func TestRecoverAhead(t *testing.T) { // TestRebalance runs some processors to test rebalance. It's merely a // runs-without-errors test, not a real functional test. func TestRebalance(t *testing.T) { - if !*systemtest { - t.Skipf("Ignoring systemtest. 
pass '-args -systemtest' to `go test` to include them") - } + + brokers := initSystemTest(t) var ( group goka.Group = "goka-systemtest-rebalance" @@ -309,13 +304,13 @@ func TestRebalance(t *testing.T) { tmc.Table.Replication = 1 tmc.Stream.Replication = 1 cfg := goka.DefaultConfig() - tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)([]string{*broker}) + tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)(brokers) test.AssertNil(t, err) err = tm.EnsureStreamExists(inputStream, 20) test.AssertNil(t, err) - em, err := goka.NewEmitter([]string{*broker}, goka.Stream(inputStream), new(codec.String)) + em, err := goka.NewEmitter(brokers, goka.Stream(inputStream), new(codec.String)) test.AssertNil(t, err) go func() { @@ -329,7 +324,7 @@ func TestRebalance(t *testing.T) { }() createProc := func(id int) *goka.Processor { - proc, err := goka.NewProcessor([]string{*broker}, + proc, err := goka.NewProcessor(brokers, goka.DefineGroup( group, goka.Input(goka.Stream(inputStream), new(codec.String), func(ctx goka.Context, msg interface{}) { ctx.SetValue(msg) }), @@ -364,9 +359,8 @@ func TestRebalance(t *testing.T) { } func TestCallbackFail(t *testing.T) { - if !*systemtest { - t.Skipf("Ignoring systemtest. 
pass '-args -systemtest' to `go test` to include them") - } + + brokers := initSystemTest(t) var ( group goka.Group = goka.Group(fmt.Sprintf("goka-systemtest-callback-fail-%d", time.Now().Unix())) @@ -378,16 +372,16 @@ func TestCallbackFail(t *testing.T) { tmc.Table.Replication = 1 tmc.Stream.Replication = 1 cfg := goka.DefaultConfig() - tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)([]string{*broker}) + tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)(brokers) test.AssertNil(t, err) err = tm.EnsureStreamExists(inputStream, 1) test.AssertNil(t, err) - em, err := goka.NewEmitter([]string{*broker}, goka.Stream(inputStream), new(codec.Int64)) + em, err := goka.NewEmitter(brokers, goka.Stream(inputStream), new(codec.Int64)) test.AssertNil(t, err) - proc, err := goka.NewProcessor([]string{*broker}, + proc, err := goka.NewProcessor(brokers, goka.DefineGroup( group, goka.Input(goka.Stream(inputStream), new(codec.Int64), func(ctx goka.Context, msg interface{}) { @@ -430,9 +424,8 @@ func TestCallbackFail(t *testing.T) { } func TestProcessorSlowStuck(t *testing.T) { - if !*systemtest { - t.Skipf("Ignoring systemtest. 
pass '-args -systemtest' to `go test` to include them") - } + + brokers := initSystemTest(t) var ( group goka.Group = "goka-systemtest-slow-callback-fail" @@ -443,16 +436,16 @@ func TestProcessorSlowStuck(t *testing.T) { tmc.Table.Replication = 1 tmc.Stream.Replication = 1 cfg := goka.DefaultConfig() - tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)([]string{*broker}) + tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)(brokers) test.AssertNil(t, err) err = tm.EnsureStreamExists(inputStream, 2) test.AssertNil(t, err) - em, err := goka.NewEmitter([]string{*broker}, goka.Stream(inputStream), new(codec.Int64)) + em, err := goka.NewEmitter(brokers, goka.Stream(inputStream), new(codec.Int64)) test.AssertNil(t, err) - proc, err := goka.NewProcessor([]string{*broker}, + proc, err := goka.NewProcessor(brokers, goka.DefineGroup( group, goka.Input(goka.Stream(inputStream), new(codec.Int64), func(ctx goka.Context, msg interface{}) { @@ -525,18 +518,16 @@ func TestProcessorSlowStuck(t *testing.T) { // ok github.com/lovoo/goka/systemtest 41.716s func TestMessageCommit(t *testing.T) { + brokers := initSystemTest(t) + var ( group goka.Group = goka.Group(fmt.Sprintf("%s-%d", "goka-systemtest-message-commit", time.Now().Unix())) inputStream goka.Stream = goka.Stream(group) + "-input" numMessages = 10 ) - if !*systemtest { - t.Skipf("Ignoring systemtest. 
pass '-args -systemtest' to `go test` to include them") - } - // New Emitter that will in total send 10 messages - emitter, err := goka.NewEmitter([]string{*broker}, inputStream, new(codec.Int64)) + emitter, err := goka.NewEmitter(brokers, inputStream, new(codec.Int64)) test.AssertNil(t, err) // some boiler plate code to create the topics in kafka using @@ -550,7 +541,7 @@ func TestMessageCommit(t *testing.T) { goka.ReplaceGlobalConfig(cfg) tmBuilder := goka.TopicManagerBuilderWithConfig(cfg, tmc) - tm, err := tmBuilder([]string{*broker}) + tm, err := tmBuilder(brokers) test.AssertNil(t, err) tm.EnsureStreamExists(string(inputStream), 10) @@ -574,7 +565,7 @@ func TestMessageCommit(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - proc, err := goka.NewProcessor([]string{*broker}, goka.DefineGroup(group, + proc, err := goka.NewProcessor(brokers, goka.DefineGroup(group, goka.Input(inputStream, new(codec.Int64), func(ctx goka.Context, msg interface{}) { log.Printf("consuming message") if val := ctx.Value(); val == nil { diff --git a/systemtest/proxy.go b/systemtest/proxy.go new file mode 100644 index 00000000..41be69e0 --- /dev/null +++ b/systemtest/proxy.go @@ -0,0 +1,151 @@ +package systemtest + +import ( + "log" + "net" + "sync" +) + +// FIProxy is a fault injecting proxy hooked into the sarama-config +// to proxy connections to kafka and inject connection loss etc. 
+type FIProxy struct { + m sync.Mutex + + conns map[string]*Conn +} + +type Conns []*Conn + +func (cs Conns) SetReadError(err error) { + for _, conn := range cs { + conn.SetReadError(err) + } +} + +func (cs Conns) SetWriteError(err error) { + for _, conn := range cs { + conn.SetWriteError(err) + } +} + +func (cs Conns) ResetErrors() { + for _, conn := range cs { + conn.ResetErrors() + } +} + +type Conn struct { + m sync.RWMutex + net.Conn + + readErr error + writeErr error + + fip *FIProxy +} + +func (c *Conn) Close() error { + defer c.fip.removeConn(c.Conn) + return c.Conn.Close() +} + +func (c *Conn) SetReadError(err error) { + c.m.Lock() + defer c.m.Unlock() + c.readErr = err +} + +func (c *Conn) SetWriteError(err error) { + c.m.Lock() + defer c.m.Unlock() + c.writeErr = err +} + +func (c *Conn) ResetErrors() { + c.m.Lock() + defer c.m.Unlock() + c.writeErr = nil + c.readErr = nil +} + +func (c *Conn) Read(b []byte) (int, error) { + c.m.RLock() + defer c.m.RUnlock() + + if c.readErr != nil { + return 0, c.readErr + } + return c.Conn.Read(b) +} + +func (c *Conn) Write(b []byte) (int, error) { + c.m.RLock() + defer c.m.RUnlock() + + if c.writeErr != nil { + return 0, c.writeErr + } + return c.Conn.Write(b) +} + +func NewFIProxy() *FIProxy { + return &FIProxy{ + conns: make(map[string]*Conn), + } +} + +func (fip *FIProxy) Dial(network, addr string) (c net.Conn, err error) { + fip.m.Lock() + defer fip.m.Unlock() + + conn, err := net.Dial(network, addr) + log.Printf("dialing %s:%s", network, addr) + + wrappedConn := &Conn{ + Conn: conn, + fip: fip, + } + + fip.conns[conn.LocalAddr().String()] = wrappedConn + return wrappedConn, err +} + +func (fip *FIProxy) removeConn(c net.Conn) { + fip.m.Lock() + defer fip.m.Unlock() + + log.Printf("removing connection %s", c.LocalAddr().String()) + delete(fip.conns, c.LocalAddr().String()) +} + +func (fip *FIProxy) getConns() []string { + var conns []string + + fip.m.Lock() + defer fip.m.Unlock() + + for c := range fip.conns { + 
conns = append(conns, c) + } + + return conns +} + +func (fip *FIProxy) GetOne() (bool, net.Conn) { + + fip.m.Lock() + defer fip.m.Unlock() + + for _, conn := range fip.conns { + return true, conn + } + return false, nil +} + +func (fip *FIProxy) All() Conns { + var conns Conns + for _, conn := range fip.conns { + conns = append(conns, conn) + } + return conns +} diff --git a/systemtest/topicmanager_test.go b/systemtest/topicmanager_test.go index 7fda5469..c8cde42c 100644 --- a/systemtest/topicmanager_test.go +++ b/systemtest/topicmanager_test.go @@ -3,7 +3,6 @@ package systemtest import ( "crypto/rand" "encoding/hex" - "flag" "strings" "testing" "time" @@ -13,20 +12,13 @@ import ( "github.com/lovoo/goka/internal/test" ) -var ( - systemtest = flag.Bool("systemtest", false, "set to run systemtests that require a running kafka-version") - broker = flag.String("broker", "localhost:9092", "bootstrap broker to use for local testing") -) - func TestTopicManagerCreate(t *testing.T) { - if !*systemtest { - t.Skipf("Ignoring systemtest. pass '-args -systemtest' to `go test` to include them") - } + brokers := initSystemTest(t) cfg := sarama.NewConfig() cfg.Version = sarama.V0_11_0_0 - tm, err := goka.TopicManagerBuilderWithConfig(cfg, goka.NewTopicManagerConfig())([]string{*broker}) + tm, err := goka.TopicManagerBuilderWithConfig(cfg, goka.NewTopicManagerConfig())(brokers) test.AssertNil(t, err) err = tm.EnsureTopicExists("test10", 4, 2, nil) @@ -37,9 +29,7 @@ func TestTopicManagerCreate(t *testing.T) { // Tests the topic manager with sarama version v11 --> so it will test topic configuration using // the sarama.ClusterAdmin func TestTopicManager_v11(t *testing.T) { - if !*systemtest { - t.Skipf("Ignoring systemtest. 
pass '-args -systemtest' to `go test` to include them") - } + brokers := initSystemTest(t) cfg := sarama.NewConfig() cfg.Version = sarama.V0_11_0_0 @@ -47,10 +37,10 @@ func TestTopicManager_v11(t *testing.T) { tmc.Table.Replication = 1 tmc.MismatchBehavior = goka.TMConfigMismatchBehaviorFail - tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)([]string{*broker}) + tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)(brokers) test.AssertNil(t, err) - client, _ := sarama.NewClient([]string{*broker}, cfg) + client, _ := sarama.NewClient(brokers, cfg) admin, _ := sarama.NewClusterAdminFromClient(client) t.Run("ensure-new-stream", func(t *testing.T) { diff --git a/systemtest/view_test.go b/systemtest/view_test.go new file mode 100644 index 00000000..119fd162 --- /dev/null +++ b/systemtest/view_test.go @@ -0,0 +1,111 @@ +package systemtest + +import ( + "context" + "fmt" + "io" + "log" + "testing" + "time" + + "github.com/lovoo/goka" + "github.com/lovoo/goka/codec" + "github.com/lovoo/goka/internal/test" + "github.com/lovoo/goka/multierr" +) + +func TestView_Reconnect(t *testing.T) { + brokers := initSystemTest(t) + + var ( + topic = fmt.Sprintf("goka_systemtest_reconnect_test-%d", time.Now().Unix()) + ) + + em, err := goka.NewEmitter(brokers, goka.Stream(topic), new(codec.Int64)) + test.AssertNil(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + errg, ctx := multierr.NewErrGroup(ctx) + + tmgr, err := goka.DefaultTopicManagerBuilder(brokers) + test.AssertNil(t, err) + test.AssertNil(t, tmgr.EnsureStreamExists(topic, 10)) + + errg.Go(func() error { + defer em.Finish() + var i int64 + for { + select { + case <-ctx.Done(): + return nil + default: + } + + time.Sleep(10 * time.Millisecond) + test.AssertNil(t, em.EmitSync(fmt.Sprintf("key-%d", i%10), i)) + i++ + } + }) + + cfg := goka.DefaultConfig() + + fi := NewFIProxy() + + cfg.Net.Proxy.Enable = true + cfg.Net.Proxy.Dialer = fi + + // we'll use 
a view on the stream. + view, err := goka.NewView(brokers, goka.Table(topic), new(codec.Int64), + goka.WithViewAutoReconnect(), + goka.WithViewConsumerSaramaBuilder(goka.SaramaConsumerBuilderWithConfig(cfg)), + ) + test.AssertNil(t, err) + + errg.Go(func() error { + return view.Run(ctx) + }) + pollTimed(t, "view-recovered", 10, view.Recovered) + + time.Sleep(time.Second) + + val, err := view.Get("key-5") + test.AssertNil(t, err) + test.AssertTrue(t, val != nil) + firstVal := val.(int64) + test.AssertTrue(t, firstVal > 0) + + log.Printf("simulate eof") + fi.All().SetReadError(io.EOF) + + time.Sleep(2 * time.Second) + + log.Printf("state: %v", view.CurrentState()) + + val, err = view.Get("key-5") + test.AssertNil(t, err) + test.AssertTrue(t, val != nil) + secondVal := val.(int64) + test.AssertTrue(t, secondVal > firstVal) + + time.Sleep(time.Second) + val, err = view.Get("key-5") + test.AssertNil(t, err) + test.AssertTrue(t, val != nil) + test.AssertTrue(t, val.(int64) == secondVal) + + fi.All().ResetErrors() + + time.Sleep(2 * time.Second) + + log.Printf("state: %v", view.CurrentState()) + + val, err = view.Get("key-5") + test.AssertNil(t, err) + test.AssertTrue(t, val != nil) + test.AssertTrue(t, val.(int64) > secondVal) + + // cancel() + test.AssertNil(t, errg.Wait().ErrorOrNil()) +} diff --git a/view.go b/view.go index 7dc82a20..72292b6b 100644 --- a/view.go +++ b/view.go @@ -331,6 +331,11 @@ func (v *View) Topic() string { // Get can be called by multiple goroutines concurrently. // Get can only be called after Recovered returns true. func (v *View) Get(key string) (interface{}, error) { + + if v.state.IsState(State(ViewStateIdle)) || v.state.IsState(State(ViewStateInitializing)) { + return nil, fmt.Errorf("View is either not running, not correctly initialized or stopped again. 
It's not safe to retrieve values") + } + // find partition where key is located partTable, err := v.find(key) if err != nil { @@ -425,6 +430,12 @@ func (v *View) Evict(key string) error { // Recovered returns true when the view has caught up with events from kafka. func (v *View) Recovered() bool { + // no partitions --> never recover + // Otherwise we might mask errors of initializing the view + if len(v.partitions) == 0 { + return false + } + for _, p := range v.partitions { if !p.IsRecovered() { return false