bugfix #250 allow view.Get on reconnecting view
frairon committed Oct 28, 2021
1 parent 9c98886 commit cb777b8
Showing 6 changed files with 310 additions and 196 deletions.
2 changes: 1 addition & 1 deletion partition_table.go
@@ -711,7 +711,7 @@ func (p *PartitionTable) IteratorWithRange(start []byte, limit []byte) (storage.

func (p *PartitionTable) readyToRead() error {
pstate := p.CurrentState()
- if pstate != PartitionRunning {
+ if pstate < PartitionConnecting {
return fmt.Errorf("Partition is not running (but %v) so it's not safe to read values", pstate)
}
return nil
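What the change above enables, shown here as a minimal sketch rather than part of the diff: a view whose partitions have fallen back to PartitionConnecting (for example while it reconnects to the brokers) can now serve Get calls instead of failing with "partition is not running". The broker address, group name and key below are placeholders, and the sketch assumes goka's WithViewAutoReconnect view option for keeping a view running across broker reconnects.

package main

import (
	"context"
	"log"

	"github.com/lovoo/goka"
	"github.com/lovoo/goka/codec"
)

func main() {
	brokers := []string{"localhost:9092"}                 // placeholder broker address
	table := goka.GroupTable(goka.Group("example-group")) // placeholder group table

	// Assumption: WithViewAutoReconnect lets the view fall back to
	// PartitionConnecting and retry, instead of shutting down, when the
	// broker connection is lost.
	view, err := goka.NewView(brokers, table, new(codec.Int64), goka.WithViewAutoReconnect())
	if err != nil {
		log.Fatalf("error creating view: %v", err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go func() {
		if err := view.Run(ctx); err != nil {
			log.Printf("view terminated: %v", err)
		}
	}()

	// With readyToRead accepting every state >= PartitionConnecting, Get keeps
	// serving the locally cached value while the view reconnects or recovers;
	// it only fails while a partition is still stopped or initializing.
	val, err := view.Get("some-key")
	if err != nil {
		log.Printf("value not readable yet: %v", err)
	} else if val != nil {
		log.Printf("current value for some-key: %d", val.(int64))
	}
}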
130 changes: 130 additions & 0 deletions systemtest/multitopic_test.go
@@ -0,0 +1,130 @@
package systemtest

import (
"context"
"fmt"
"log"
"math/rand"
"testing"
"time"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
"github.com/lovoo/goka/internal/test"
"github.com/lovoo/goka/multierr"
)

// Tests a processor with multiple input topics. Random values are emitted to random topics; the values are accumulated
// for a single key and checked for correctness after emitting a couple of messages.
// This is a regression/showcase test for https://github.com/lovoo/goka/issues/332
func TestMultiTopics(t *testing.T) {

brokers := initSystemTest(t)
var (
group goka.Group = goka.Group(fmt.Sprintf("%s-%d", "goka-systemtest-multitopic", time.Now().Unix()))
table = goka.GroupTable(group)
inputStreams []goka.Stream
)

for i := 0; i < 5; i++ {
inputStreams = append(inputStreams, goka.Stream(fmt.Sprintf("%s-input-%d", string(group), i)))
}

tmc := goka.NewTopicManagerConfig()
tmc.Table.Replication = 1
tmc.Stream.Replication = 1
cfg := goka.DefaultConfig()
tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)(brokers)
test.AssertNil(t, err)

for _, inStream := range inputStreams {
err = tm.EnsureStreamExists(string(inStream), 1)
test.AssertNil(t, err)
}
// let the cluster create it
time.Sleep(5 * time.Second)

proc, err := goka.NewProcessor(brokers,
goka.DefineGroup(
group,
goka.Inputs(inputStreams, new(codec.Int64), func(ctx goka.Context, msg interface{}) {
var oldVal int64

if val := ctx.Value(); val != nil {
oldVal = val.(int64)
}

// accumulate with old value
ctx.SetValue(msg.(int64) + oldVal)
}),
goka.Persist(new(codec.Int64)),
),
goka.WithTopicManagerBuilder(goka.TopicManagerBuilderWithTopicManagerConfig(tmc)),
)
test.AssertNil(t, err)

view, err := goka.NewView(brokers, table, new(codec.Int64))
test.AssertNil(t, err)

var emitters []*goka.Emitter

for _, input := range inputStreams {
emitter, err := goka.NewEmitter(brokers, input, new(codec.Int64))
test.AssertNil(t, err)
emitters = append(emitters, emitter)
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
errg, ctx := multierr.NewErrGroup(ctx)

errg.Go(func() error {
return proc.Run(ctx)
})
errg.Go(func() error {
return view.Run(ctx)
})

log.Printf("waiting for processor/view to be running")
pollTimed(t, "proc and view are recovered", 10.0, proc.Recovered, view.Recovered)
log.Printf("...done")

var sum int64
for i := int64(0); i < 100; i++ {
value := rand.Int63n(100)
// emit to random emitters in sync
err := emitters[rand.Intn(len(emitters))].EmitSync("key", value)
test.AssertNil(t, err)
// ... and batched
prom, err := emitters[rand.Intn(len(emitters))].Emit("key", value)
test.AssertNil(t, err)
prom.Then(func(err error) {
test.AssertNil(t, err)
})

// accumulate what we have sent so far
sum += (value * 2)
}

for _, emitter := range emitters {
test.AssertNil(t, emitter.Finish())
}

// poll the view and the processor until we're sure that all messages have been transferred
pollTimed(t, "all messages have been transferred", 10.0,
func() bool {
value, err := view.Get("key")
test.AssertNil(t, err)
return value != nil && value.(int64) == sum
},
func() bool {
value, err := proc.Get("key")
test.AssertNil(t, err)
return value != nil && value.(int64) == sum
},
)

// stop everything and wait until it's shut down
cancel()
test.AssertNil(t, errg.Wait().ErrorOrNil())
}
33 changes: 1 addition & 32 deletions systemtest/processor_test.go
@@ -3,7 +3,6 @@ package systemtest
import (
"context"
"fmt"
- "log"
"os"
"strings"
"testing"
@@ -348,8 +347,6 @@ func TestRebalance(t *testing.T) {
p := createProc(i)
ctx, cancel := context.WithTimeout(ctx, time.Duration(16)*time.Second)
defer cancel()
- log.Printf("Starting processor %d", i)
- defer log.Printf("Stopping processor %d", i)
return p.Run(ctx)
})
time.Sleep(2 * time.Second)
@@ -451,8 +448,7 @@ func TestProcessorSlowStuck(t *testing.T) {
goka.Input(goka.Stream(inputStream), new(codec.Int64), func(ctx goka.Context, msg interface{}) {
val := msg.(int64)
time.Sleep(500 * time.Microsecond)
- log.Printf("%d", val)
- if ctx.Partition() == 0 && val > 50 {
+ if ctx.Partition() == 0 && val > 10 {
// do an invalid action
panic("asdf")
}
@@ -484,7 +480,6 @@ func TestProcessorSlowStuck(t *testing.T) {
return proc.Run(ctx)
})
err = errg.Wait().ErrorOrNil()
- log.Printf("%v", err)
test.AssertTrue(t, strings.Contains(err.Error(), "panic in callback"))
}

@@ -494,28 +489,6 @@ func TestProcessorSlowStuck(t *testing.T) {
// * Create a processor that consumes+accumulates this one value into its state. The final state obviously is 10.
// * restart this processor a couple of times and check whether it stays 10.
//
- // Running it with go test -v github.com/lovoo/goka/systemtest -run TestMessageCommit -args -systemtest should yield:
- // === RUN TestMessageCommit
- // 2021/07/26 10:49:54 emitting 10 messages
- // 2021/07/26 10:49:55 starting processor
- // 2021/07/26 10:49:59 consuming message
- // 2021/07/26 10:49:59 consuming message
- // 2021/07/26 10:49:59 consuming message
- // 2021/07/26 10:49:59 consuming message
- // 2021/07/26 10:49:59 consuming message
- // 2021/07/26 10:49:59 consuming message
- // 2021/07/26 10:49:59 consuming message
- // 2021/07/26 10:49:59 consuming message
- // 2021/07/26 10:49:59 consuming message
- // 2021/07/26 10:49:59 consuming message
- // 2021/07/26 10:50:05 received 10 messages
- // 2021/07/26 10:50:06 starting processor
- // 2021/07/26 10:50:16 received 10 messages
- // 2021/07/26 10:50:16 starting processor
- // 2021/07/26 10:50:26 received 10 messages
- // --- PASS: TestMessageCommit (41.70s)
- // PASS
- // ok github.com/lovoo/goka/systemtest 41.716s
func TestMessageCommit(t *testing.T) {

brokers := initSystemTest(t)
@@ -549,7 +522,6 @@ func TestMessageCommit(t *testing.T) {
// give it time to actually create the topic
time.Sleep(10 * time.Second)

- log.Printf("emitting %d messages", numMessages)
for i := 0; i < numMessages; i++ {
emitter.EmitSync("1", int64(1))
}
@@ -560,14 +532,12 @@ func TestMessageCommit(t *testing.T) {
// It always ends up with a state of "10", but only consumes the messages the first time.
// The second and third time it will just start, as there are no new messages in the topic.
for i := 0; i < 3; i++ {
- log.Printf("starting processor")
done := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

proc, err := goka.NewProcessor(brokers, goka.DefineGroup(group,
goka.Input(inputStream, new(codec.Int64), func(ctx goka.Context, msg interface{}) {
- log.Printf("consuming message")
if val := ctx.Value(); val == nil {
ctx.SetValue(msg)
} else {
@@ -591,7 +561,6 @@ func TestMessageCommit(t *testing.T) {
test.AssertNil(t, err)
test.AssertTrue(t, val != nil)
test.AssertEqual(t, val.(int64), int64(numMessages))
- log.Printf("received %d messages", numMessages)

cancel()
<-done
