Improved failure handling
fully remove multierr.Errors in favor of github.com/hashicorp/go-multierror
add system tests for connection errors
bugfix issue 350: View.Get fails on a reconnecting view
bugfix issue 334: processor freeze on error
frairon committed Nov 8, 2021

1 parent 9ff2ce3 commit 6334df4
Showing 17 changed files with 731 additions and 209 deletions.
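The common thread across these files is the switch from goka's internal multierr.Errors collector to github.com/hashicorp/go-multierror: errs.Collect(err) becomes multierror.Append(errs, err), and errs.HasErrors() / errs.NilOrError() become errs.ErrorOrNil(). A minimal, self-contained sketch of the new pattern, reduced to the two calls that show up throughout this diff (the closeAll helper is made up for illustration):

package main

import (
	"errors"
	"fmt"

	"github.com/hashicorp/go-multierror"
)

// closeAll collects every error it encounters and reduces them at the end.
// ErrorOrNil returns nil when nothing was appended, so callers can treat the
// result like any other error value.
func closeAll(closers ...func() error) error {
	var errs *multierror.Error // nil is a valid starting point for Append
	for i, closeFn := range closers {
		if err := closeFn(); err != nil {
			errs = multierror.Append(errs, fmt.Errorf("closer %d: %w", i, err))
		}
	}
	return errs.ErrorOrNil()
}

func main() {
	err := closeAll(
		func() error { return nil },
		func() error { return errors.New("boom") },
	)
	fmt.Println(err) // prints the collected error; nil if nothing failed
}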
9 changes: 9 additions & 0 deletions Makefile
@@ -0,0 +1,9 @@


test:
go test -race ./...

test-systemtest:
GOKA_SYSTEMTEST=y go test -v github.com/lovoo/goka/systemtest

test-all: test test-systemtest
10 changes: 5 additions & 5 deletions context.go
@@ -8,7 +8,7 @@ import (
"time"

"github.com/Shopify/sarama"
"github.com/lovoo/goka/multierr"
"github.com/hashicorp/go-multierror"
)

type emitter func(topic string, key string, value []byte, headers Headers) *Promise
@@ -177,7 +177,7 @@ type cbContext struct {
dones int
stores int
}
errors multierr.Errors
errors *multierror.Error
m sync.Mutex
wg *sync.WaitGroup
}
@@ -438,7 +438,7 @@ func (ctx *cbContext) start() {
// this function must be called from a locked function.
func (ctx *cbContext) tryCommit(err error) {
if err != nil {
_ = ctx.errors.Collect(err)
ctx.errors = multierror.Append(ctx.errors, err)
}

// not all calls are done yet, do not send the ack upstream.
@@ -447,8 +447,8 @@ func (ctx *cbContext) tryCommit(err error) {
}

// commit if no errors, otherwise fail context
if ctx.errors.HasErrors() {
ctx.asyncFailer(ctx.errors.NilOrError())
if ctx.errors.ErrorOrNil() != nil {
ctx.asyncFailer(ctx.errors.ErrorOrNil())
} else {
ctx.commit()
}
3 changes: 2 additions & 1 deletion context_test.go
@@ -121,7 +121,7 @@ func TestContext_DeferCommit_witherror(t *testing.T) {
doneFunc(fmt.Errorf("async error"))
// no commit, no ack, so we'll get the message again.
test.AssertEqual(t, ack, 0)
test.AssertEqual(t, ctx.errors.NilOrError().Error(), "async error")
test.AssertStringContains(t, ctx.errors.ErrorOrNil().Error(), "async error")
}

func TestContext_Timestamp(t *testing.T) {
@@ -584,6 +584,7 @@ func TestContext_Lookup(t *testing.T) {
msg: &message{key: key},
views: map[string]*View{
string(table): {
state: newViewSignal().SetState(State(ViewStateRunning)),
opts: &voptions{
tableCodec: c,
hasher: DefaultHasher(),
124 changes: 44 additions & 80 deletions partition_table.go
@@ -6,6 +6,7 @@ import (
"time"

"github.com/Shopify/sarama"
"github.com/hashicorp/go-multierror"
"github.com/lovoo/goka/multierr"
"github.com/lovoo/goka/storage"
)
@@ -17,6 +18,8 @@ const (

// internal offset we use to detect if the offset has never been stored locally
offsetNotStored int64 = -3

consumerDrainTimeout = time.Second
)

// Backoff is used for adding backoff capabilities to the restarting
@@ -191,7 +194,6 @@ func (p *PartitionTable) Close() error {
func (p *PartitionTable) createStorage(ctx context.Context) (*storageProxy, error) {
var (
err error
errs = new(multierr.Errors)
st storage.Storage
start = time.Now()
done = make(chan struct{})
@@ -217,9 +219,7 @@ func (p *PartitionTable) createStorage(ctx context.Context) (*storageProxy, erro
}
err = st.Open()
if err != nil {
errs.Collect(st.Close())
errs.Collect(fmt.Errorf("error opening storage: %v", err))
return nil, errs.NilOrError()
return nil, multierror.Append(st.Close(), fmt.Errorf("error opening storage: %v", err)).ErrorOrNil()
}

// close the db if context was cancelled before the builder returned
@@ -228,10 +228,9 @@ func (p *PartitionTable) createStorage(ctx context.Context) (*storageProxy, erro
err = st.Close()
// only collect context error if Close() errored out
if err != nil {
errs.Collect(err)
errs.Collect(ctx.Err())
return nil, multierror.Append(err, ctx.Err()).ErrorOrNil()
}
return nil, errs.NilOrError()
return nil, nil
default:
}

@@ -280,37 +279,25 @@ func (p *PartitionTable) load(ctx context.Context, stopAfterCatchup bool) (rerr
storedOffset int64
partConsumer sarama.PartitionConsumer
err error
errs = new(multierr.Errors)
)
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// deferred error handling
defer func() {
errs.Collect(rerr)

rerr = errs.NilOrError()
return
}()

p.state.SetState(State(PartitionConnecting))

// fetch local offset
storedOffset, err = p.st.GetOffset(offsetNotStored)
if err != nil {
errs.Collect(fmt.Errorf("error reading local offset: %v", err))
return
return fmt.Errorf("error reading local offset: %v", err)
}

loadOffset, hwm, err := p.findOffsetToLoad(storedOffset)
if err != nil {
errs.Collect(err)
return
return err
}

if storedOffset > 0 && hwm == 0 {
errs.Collect(fmt.Errorf("kafka tells us there's no message in the topic, but our cache has one. The table might be gone. Try to delete your local cache! Topic %s, partition %d, hwm %d, local offset %d", p.topic, p.partition, hwm, storedOffset))
return
return fmt.Errorf("kafka tells us there's no message in the topic, but our cache has one. The table might be gone. Try to delete your local cache! Topic %s, partition %d, hwm %d, local offset %d", p.topic, p.partition, hwm, storedOffset)
}

if storedOffset >= hwm {
@@ -334,8 +321,7 @@ func (p *PartitionTable) load(ctx context.Context, stopAfterCatchup bool) (rerr
// AND we're here for catchup, so let's stop here
// and do not attempt to load anything
if stopAfterCatchup && loadOffset >= hwm {
errs.Collect(p.markRecovered(ctx))
return
return p.markRecovered(ctx)
}

if stopAfterCatchup {
@@ -348,17 +334,13 @@ func (p *PartitionTable) load(ctx context.Context, stopAfterCatchup bool) (rerr

partConsumer, err = p.consumer.ConsumePartition(p.topic, p.partition, loadOffset)
if err != nil {
errs.Collect(fmt.Errorf("Error creating partition consumer for topic %s, partition %d, offset %d: %v", p.topic, p.partition, storedOffset, err))
return
return fmt.Errorf("Error creating partition consumer for topic %s, partition %d, offset %d: %v", p.topic, p.partition, storedOffset, err)
}

// consume errors asynchronously
go p.handleConsumerErrors(ctx, errs, partConsumer)

// close the consumer
defer func() {
partConsumer.AsyncClose()
p.drainConsumer(partConsumer, errs)
rerr = multierror.Append(rerr, p.drainConsumer(partConsumer)).ErrorOrNil()
}()

if stopAfterCatchup {
@@ -371,15 +353,13 @@ func (p *PartitionTable) load(ctx context.Context, stopAfterCatchup bool) (rerr
loadErr := p.loadMessages(ctx, partConsumer, hwm, stopAfterCatchup)

if loadErr != nil {
errs.Collect(loadErr)
return
return loadErr
}

if stopAfterCatchup {
errs.Collect(p.markRecovered(ctx))

now := time.Now()
p.enqueueStatsUpdate(ctx, func() { p.stats.Recovery.RecoveryTime = now })
err := p.markRecovered(ctx)
p.enqueueStatsUpdate(ctx, func() { p.stats.Recovery.RecoveryTime = time.Now() })
return err
}
return
}
@@ -424,43 +404,27 @@ func (p *PartitionTable) markRecovered(ctx context.Context) error {
}
}

func (p *PartitionTable) handleConsumerErrors(ctx context.Context, errs *multierr.Errors, cons sarama.PartitionConsumer) {
for {
select {
case consError, ok := <-cons.Errors():
if !ok {
return
}
err := fmt.Errorf("Consumer error: %v", consError)
p.log.Printf("%v", err)
errs.Collect(err)
// if there's an error, close the consumer
cons.AsyncClose()
case <-ctx.Done():
return
}
}
}
func (p *PartitionTable) drainConsumer(cons sarama.PartitionConsumer) error {

func (p *PartitionTable) drainConsumer(cons sarama.PartitionConsumer, errs *multierr.Errors) {

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
timeoutCtx, cancel := context.WithTimeout(context.Background(), consumerDrainTimeout)
defer cancel()

errg, ctx := multierr.NewErrGroup(ctx)
errg, _ := multierr.NewErrGroup(context.Background())

// drain errors channel
errg.Go(func() error {
var errs *multierror.Error

for {
select {
case <-ctx.Done():
case <-timeoutCtx.Done():
p.log.Printf("draining errors channel timed out")
return nil
return errs
case err, ok := <-cons.Errors():
if !ok {
return nil
return errs
}
errs.Collect(err)
errs = multierror.Append(errs, err)
}
}
})
@@ -469,7 +433,7 @@ func (p *PartitionTable) drainConsumer(cons sarama.PartitionConsumer, errs *mult
errg.Go(func() error {
for {
select {
case <-ctx.Done():
case <-timeoutCtx.Done():
p.log.Printf("draining messages channel timed out")
return nil
case _, ok := <-cons.Messages():
@@ -480,30 +444,31 @@ func (p *PartitionTable) drainConsumer(cons sarama.PartitionConsumer, errs *mult
}
})

errg.Wait()
return errg.Wait().ErrorOrNil()
}

func (p *PartitionTable) loadMessages(ctx context.Context, cons sarama.PartitionConsumer, partitionHwm int64, stopAfterCatchup bool) (rerr error) {
errs := new(multierr.Errors)

// deferred error handling
defer func() {
errs.Collect(rerr)

rerr = errs.NilOrError()
return
}()
func (p *PartitionTable) loadMessages(ctx context.Context, cons sarama.PartitionConsumer, partitionHwm int64, stopAfterCatchup bool) error {

stallTicker := time.NewTicker(p.stallPeriod)
defer stallTicker.Stop()

lastMessage := time.Now()

messages := cons.Messages()
errors := cons.Errors()

for {
select {
case msg, ok := <-cons.Messages():
case err, ok := <-errors:
if !ok {
return
return nil
}
if err != nil {
return err
}
case msg, ok := <-messages:
if !ok {
return nil
}

// This case is for the Tester to achieve synchronicity.
@@ -521,8 +486,7 @@ func (p *PartitionTable) loadMessages(ctx context.Context, cons sarama.Partition

lastMessage = time.Now()
if err := p.storeEvent(string(msg.Key), msg.Value, msg.Offset, msg.Headers); err != nil {
errs.Collect(fmt.Errorf("load: error updating storage: %v", err))
return
return fmt.Errorf("load: error updating storage: %v", err)
}

if stopAfterCatchup {
@@ -532,7 +496,7 @@ func (p *PartitionTable) loadMessages(ctx context.Context, cons sarama.Partition
p.enqueueStatsUpdate(ctx, func() { p.trackIncomingMessageStats(msg) })

if stopAfterCatchup && msg.Offset >= partitionHwm-1 {
return
return nil
}

case now := <-stallTicker.C:
@@ -543,7 +507,7 @@ func (p *PartitionTable) loadMessages(ctx context.Context, cons sarama.Partition
}

case <-ctx.Done():
return
return nil
}
}
}
@@ -711,7 +675,7 @@ func (p *PartitionTable) IteratorWithRange(start []byte, limit []byte) (storage.

func (p *PartitionTable) readyToRead() error {
pstate := p.CurrentState()
if pstate != PartitionRunning {
if pstate < PartitionConnecting {
return fmt.Errorf("Partition is not running (but %v) so it's not safe to read values", pstate)
}
return nil
69 changes: 47 additions & 22 deletions processor.go
@@ -321,13 +321,6 @@ func (g *Processor) rebalanceLoop(ctx context.Context) (rerr error) {
return fmt.Errorf(errBuildConsumer, err)
}

go func() {
errs := consumerGroup.Errors()
for err := range errs {
g.log.Printf("error while executing consumer group: %v", err)
}
}()

var topics []string
for _, e := range g.graph.InputStreams() {
topics = append(topics, e.Topic())
@@ -352,33 +345,65 @@ func (g *Processor) rebalanceLoop(ctx context.Context) (rerr error) {
}()

for {
sessionCtx, sessionCtxCancel := context.WithCancel(ctx)

go func() {
g.handleSessionErrors(ctx, sessionCtx, sessionCtxCancel, consumerGroup)
}()

err := consumerGroup.Consume(ctx, topics, g)
var (
errProc *errProcessing
errSetup *errSetup
)
sessionCtxCancel()

if errors.As(err, &errProc) {
g.log.Debugf("error processing message (non-transient), shutting down processor: %v", err)
return err
}
if errors.As(err, &errSetup) {
g.log.Debugf("setup error (non-transient), shutting down processor: %v", err)
// error consuming, no way to recover so we have to kill the processor
if err != nil {
return err
}

if err != nil {
g.log.Printf("Error executing group consumer (continuing execution, rebalance after short sleep): %v", err)
}
g.log.Debugf("Consumer group returned, trying rebalance")

select {
case <-time.After(5 * time.Second):
case <-ctx.Done():
case <-sessionCtx.Done():
return nil
}
}
}

func (g *Processor) handleSessionErrors(ctx, sessionCtx context.Context, sessionCtxCancel context.CancelFunc, consumerGroup sarama.ConsumerGroup) {
errs := consumerGroup.Errors()

for {
select {
case <-ctx.Done():
return
case <-sessionCtx.Done():
return
case err, ok := <-errs:
if !ok {
return
}

if err != nil {
g.log.Printf("error during execution of consumer group: %v", err)
}

var (
errProc *errProcessing
errSetup *errSetup
)

if errors.As(err, &errProc) {
g.log.Debugf("error processing message (non-transient), shutting down processor: %v", err)
sessionCtxCancel()
}
if errors.As(err, &errSetup) {
g.log.Debugf("setup error (non-transient), shutting down processor: %v", err)
sessionCtxCancel()
}
}
}
}

// waits for all tables that are supposed to start up
func (g *Processor) waitForStartupTables(ctx context.Context) error {

@@ -717,7 +742,7 @@ func (g *Processor) WaitForReady() {

// wait for all partitionprocessors to be running

// copy them first with the mutex so we don't run in to race conditions
// copy them first with the mutex so we don't run into a deadlock

g.mTables.RLock()
parts := make([]*PartitionProcessor, 0, len(g.partitions))
92 changes: 92 additions & 0 deletions systemtest/emitter_disconnect_test.go
@@ -0,0 +1,92 @@
package systemtest

import (
"errors"
"fmt"
"log"
"sync"
"sync/atomic"
"syscall"
"testing"
"time"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
"github.com/lovoo/goka/internal/test"
)

func TestEmitter_KafkaDisconnect(t *testing.T) {
brokers := initSystemTest(t)
var (
topic = goka.Stream(fmt.Sprintf("goka_systemtest_emitter_disconnect-%d", time.Now().Unix()))
)

tmgr, err := goka.DefaultTopicManagerBuilder(brokers)
test.AssertNil(t, err)
test.AssertNil(t, tmgr.EnsureStreamExists(string(topic), 10))

cfg := goka.DefaultConfig()

fi := NewFIProxy()
cfg.Net.Proxy.Enable = true
cfg.Net.Proxy.Dialer = fi

// get it over with faster
cfg.Producer.Retry.Max = 1
cfg.Producer.Retry.Backoff = 0

em, err := goka.NewEmitter(brokers, topic, new(codec.Int64),
goka.WithEmitterProducerBuilder(goka.ProducerBuilderWithConfig(cfg)),
)
test.AssertNil(t, err)
var (
i int64
success int64
)

done := make(chan struct{})
go func() {
defer close(done)
var closeOnce sync.Once
stop := make(chan struct{})
for {
select {
case <-stop:
return
default:
}

prom, err := em.Emit(fmt.Sprintf("key-%d", i%20), i)
if err != nil {
if errors.Is(err, goka.ErrEmitterAlreadyClosed) {
return
}
log.Printf("error emitting: %v", err)
}
prom.Then(func(err error) {
if err != nil {
log.Printf("error emitting (async): %v", err)
closeOnce.Do(func() {
close(stop)
})
return
}
if err == nil {
atomic.AddInt64(&success, 1)
}

})
time.Sleep(10 * time.Millisecond)
i++
}

}()

pollTimed(t, "emitter emitted something successfully", 10, func() bool {
return atomic.LoadInt64(&success) > 0
})

fi.SetWriteError(syscall.EPIPE)
<-done
test.AssertNil(t, em.Finish())
}
8 changes: 3 additions & 5 deletions systemtest/emitter_offset_test.go
@@ -21,20 +21,18 @@ func TestEmitterOffset(t *testing.T) {

var topic goka.Stream = goka.Stream(fmt.Sprintf("%s-%d", "goka-systemtest-emitter-offset", time.Now().Unix()))

if !*systemtest {
t.Skipf("Ignoring systemtest. pass '-args -systemtest' to `go test` to include them")
}
brokers := initSystemTest(t)

tmc := goka.NewTopicManagerConfig()
tmc.Table.Replication = 1
cfg := goka.DefaultConfig()
tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)([]string{*broker})
tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)(brokers)
test.AssertNil(t, err)
tm.EnsureStreamExists(string(topic), 1)

var lastOffset int64

emitter, err := goka.NewEmitter([]string{*broker}, topic, new(codec.Int64))
emitter, err := goka.NewEmitter(brokers, topic, new(codec.Int64))
if err != nil {
t.Fatalf("error creating emitter: %v", err)
}
19 changes: 19 additions & 0 deletions systemtest/env.go
@@ -0,0 +1,19 @@
package systemtest

import (
"os"
"strings"
"testing"
)

// initSystemTest checks whether the env-variable that activates system tests is set and returns the list of brokers to use.
// If system tests are not activated, the calling test is skipped.
func initSystemTest(t *testing.T) []string {
if _, isIntegration := os.LookupEnv("GOKA_SYSTEMTEST"); !isIntegration {
t.Skip("*** skip integration test ***")
}
if brokers, ok := os.LookupEnv("GOKA_SYSTEMTEST_BROKERS"); ok {
return strings.Split(brokers, ",")
}
return []string{"localhost:9092"}
}
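Every new system test in this commit opens with this helper. A short usage sketch (assumed to live in the systemtest package, so the goka and internal/test imports used by the files below are available; the topic name is made up):

func TestSomethingAgainstKafka(t *testing.T) {
	// skipped unless GOKA_SYSTEMTEST is set; brokers come from
	// GOKA_SYSTEMTEST_BROKERS or default to localhost:9092
	brokers := initSystemTest(t)

	tmgr, err := goka.DefaultTopicManagerBuilder(brokers)
	test.AssertNil(t, err)
	test.AssertNil(t, tmgr.EnsureStreamExists("goka_systemtest_example", 10))
}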
130 changes: 130 additions & 0 deletions systemtest/multitopic_test.go
@@ -0,0 +1,130 @@
package systemtest

import (
"context"
"fmt"
"log"
"math/rand"
"testing"
"time"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
"github.com/lovoo/goka/internal/test"
"github.com/lovoo/goka/multierr"
)

// Tests a processor with multiple input topics. Random values are emitted to random topics; the values are accumulated
// for a single key and checked for correctness after a couple of messages have been emitted.
// This is a regression/showcase test for https://github.com/lovoo/goka/issues/332
func TestMultiTopics(t *testing.T) {

brokers := initSystemTest(t)
var (
group goka.Group = goka.Group(fmt.Sprintf("%s-%d", "goka-systemtest-multitopic", time.Now().Unix()))
table = goka.GroupTable(group)
inputStreams []goka.Stream
)

for i := 0; i < 5; i++ {
inputStreams = append(inputStreams, goka.Stream(fmt.Sprintf("%s-input-%d", string(group), i)))
}

tmc := goka.NewTopicManagerConfig()
tmc.Table.Replication = 1
tmc.Stream.Replication = 1
cfg := goka.DefaultConfig()
tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)(brokers)
test.AssertNil(t, err)

for _, inStream := range inputStreams {
err = tm.EnsureStreamExists(string(inStream), 1)
test.AssertNil(t, err)
}
// let the cluster create it
time.Sleep(5 * time.Second)

proc, err := goka.NewProcessor(brokers,
goka.DefineGroup(
group,
goka.Inputs(inputStreams, new(codec.Int64), func(ctx goka.Context, msg interface{}) {
var oldVal int64

if val := ctx.Value(); val != nil {
oldVal = val.(int64)
}

// accumulate with old value
ctx.SetValue(msg.(int64) + oldVal)
}),
goka.Persist(new(codec.Int64)),
),
goka.WithTopicManagerBuilder(goka.TopicManagerBuilderWithTopicManagerConfig(tmc)),
)
test.AssertNil(t, err)

view, err := goka.NewView(brokers, table, new(codec.Int64))
test.AssertNil(t, err)

var emitters []*goka.Emitter

for _, input := range inputStreams {
emitter, err := goka.NewEmitter(brokers, input, new(codec.Int64))
test.AssertNil(t, err)
emitters = append(emitters, emitter)
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
errg, ctx := multierr.NewErrGroup(ctx)

errg.Go(func() error {
return proc.Run(ctx)
})
errg.Go(func() error {
return view.Run(ctx)
})

log.Printf("waiting for processor/view to be running")
pollTimed(t, "proc and view are recovered", 10.0, proc.Recovered, view.Recovered)
log.Printf("...done")

var sum int64
for i := int64(0); i < 100; i++ {
value := rand.Int63n(100)
// emit to random emitters in sync
err := emitters[rand.Intn(len(emitters))].EmitSync("key", value)
test.AssertNil(t, err)
// ... and batched
prom, err := emitters[rand.Intn(len(emitters))].Emit("key", value)
test.AssertNil(t, err)
prom.Then(func(err error) {
test.AssertNil(t, err)
})

// accumulate what we have sent so far
sum += (value * 2)
}

for _, emitter := range emitters {
test.AssertNil(t, emitter.Finish())
}

// poll the view and the processor until we're sure that all messages have been transferred
pollTimed(t, "all messages have been transferred", 10.0,
func() bool {
value, err := view.Get("key")
test.AssertNil(t, err)
return value != nil && value.(int64) == sum
},
func() bool {
value, err := proc.Get("key")
test.AssertNil(t, err)
return value != nil && value.(int64) == sum
},
)

// stop everything and wait until it's shut down
cancel()
test.AssertNil(t, errg.Wait().ErrorOrNil())
}
96 changes: 96 additions & 0 deletions systemtest/proc_disconnect_test.go
@@ -0,0 +1,96 @@
package systemtest

import (
"context"
"fmt"
"io"
"log"
"testing"
"time"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
"github.com/lovoo/goka/internal/test"
"github.com/lovoo/goka/multierr"
)

func TestProcessorShutdown_KafkaDisconnect(t *testing.T) {
brokers := initSystemTest(t)
var (
topic = goka.Stream(fmt.Sprintf("goka_systemtest_proc_shutdown_disconnect-%d", time.Now().Unix()))
group = goka.Group(topic)
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

errg, ctx := multierr.NewErrGroup(ctx)

tmgr, err := goka.DefaultTopicManagerBuilder(brokers)
test.AssertNil(t, err)
test.AssertNil(t, tmgr.EnsureStreamExists(string(topic), 10))

// emit values
errg.Go(func() error {
em, err := goka.NewEmitter(brokers, topic, new(codec.Int64))
test.AssertNil(t, err)
defer em.Finish()
var i int64
for {
select {
case <-ctx.Done():
return nil
default:
}

prom, err := em.Emit(fmt.Sprintf("key-%d", i%20), i)
test.AssertNil(t, err)
prom.Then(func(err error) {
test.AssertNil(t, err)
})
time.Sleep(100 * time.Millisecond)
i++
}
})

cfg := goka.DefaultConfig()

fi := NewFIProxy()
cfg.Net.Proxy.Enable = true
cfg.Net.Proxy.Dialer = fi

proc, err := goka.NewProcessor(brokers,
goka.DefineGroup(
group,
goka.Input(topic, new(codec.Int64), func(ctx goka.Context, msg interface{}) {
if val := ctx.Value(); val != nil {
ctx.SetValue(val.(int64) + msg.(int64))
} else {
ctx.SetValue(msg)
}
}),
goka.Persist(new(codec.Int64)),
),
goka.WithConsumerGroupBuilder(goka.ConsumerGroupBuilderWithConfig(cfg)),
goka.WithProducerBuilder(goka.ProducerBuilderWithConfig(cfg)),
goka.WithConsumerSaramaBuilder(goka.SaramaConsumerBuilderWithConfig(cfg)),
)
test.AssertNil(t, err)

errg.Go(func() error {
return proc.Run(ctx)
})
pollTimed(t, "proc running", 10, proc.Recovered, func() bool {
if val, _ := proc.Get("key-15"); val != nil && val.(int64) > 0 {
return true
}
return false
})

log.Printf("disconnecting consumer-group")
fi.SetReadError(io.EOF)
fi.SetWriteError(io.ErrClosedPipe)
err = errg.Wait().ErrorOrNil()

test.AssertNotNil(t, err)
}
108 changes: 34 additions & 74 deletions systemtest/processor_test.go
@@ -3,7 +3,6 @@ package systemtest
import (
"context"
"fmt"
"log"
"os"
"strings"
"testing"
@@ -30,14 +29,12 @@ func TestHotStandby(t *testing.T) {
joinTable goka.Table = goka.Table(fmt.Sprintf("%s-%d", "goka-systemtest-hotstandby-join", time.Now().Unix()))
)

if !*systemtest {
t.Skipf("Ignoring systemtest. pass '-args -systemtest' to `go test` to include them")
}
brokers := initSystemTest(t)

tmc := goka.NewTopicManagerConfig()
tmc.Table.Replication = 1
cfg := goka.DefaultConfig()
tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)([]string{*broker})
tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)(brokers)
test.AssertNil(t, err)

err = tm.EnsureStreamExists(inputStream, 2)
@@ -49,7 +46,7 @@ func TestHotStandby(t *testing.T) {

proc1Storages := newStorageTracker()

proc1, err := goka.NewProcessor([]string{*broker},
proc1, err := goka.NewProcessor(brokers,
goka.DefineGroup(
group,
goka.Input(goka.Stream(inputStream), new(codec.String), func(ctx goka.Context, msg interface{}) { ctx.SetValue(msg) }),
@@ -62,7 +59,7 @@ func TestHotStandby(t *testing.T) {

proc2Storages := newStorageTracker()

proc2, err := goka.NewProcessor([]string{*broker},
proc2, err := goka.NewProcessor(brokers,
goka.DefineGroup(
group,
goka.Input(goka.Stream(inputStream), new(codec.String), func(ctx goka.Context, msg interface{}) { ctx.SetValue(msg) }),
@@ -94,15 +91,15 @@ func TestHotStandby(t *testing.T) {
test.AssertEqual(t, len(proc1Storages.storages), 2)
test.AssertEqual(t, len(proc2Storages.storages), 4)

inputEmitter, err := goka.NewEmitter([]string{*broker}, goka.Stream(inputStream), new(codec.String))
inputEmitter, err := goka.NewEmitter(brokers, goka.Stream(inputStream), new(codec.String))
test.AssertNil(t, err)
defer inputEmitter.Finish()
inputEmitter.EmitSync("key1", "message1")
inputEmitter.EmitSync("key2", "message2")

// emit something into the join table (like simulating a processor ctx.SetValue()).
// Our test processors should update their value in the join-table
joinEmitter, err := goka.NewEmitter([]string{*broker}, goka.Stream(joinTable), new(codec.String))
joinEmitter, err := goka.NewEmitter(brokers, goka.Stream(joinTable), new(codec.String))
test.AssertNil(t, err)
defer joinEmitter.Finish()
joinEmitter.EmitSync("key1", "joinval1")
@@ -162,22 +159,20 @@ func TestHotStandby(t *testing.T) {
// Test makes sure that still both processors recover the views/tables
func TestRecoverAhead(t *testing.T) {

brokers := initSystemTest(t)

var (
group goka.Group = "goka-systemtest-recoverahead"
inputStream string = string(group) + "-input"
table = string(goka.GroupTable(group))
joinTable goka.Table = "goka-systemtest-recoverahead-join"
)

if !*systemtest {
t.Skipf("Ignoring systemtest. pass '-args -systemtest' to `go test` to include them")
}

tmc := goka.NewTopicManagerConfig()
tmc.Table.Replication = 1
tmc.Stream.Replication = 1
cfg := goka.DefaultConfig()
tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)([]string{*broker})
tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)(brokers)
test.AssertNil(t, err)

err = tm.EnsureStreamExists(inputStream, 1)
@@ -187,7 +182,7 @@ func TestRecoverAhead(t *testing.T) {

proc1Storages := newStorageTracker()

proc1, err := goka.NewProcessor([]string{*broker},
proc1, err := goka.NewProcessor(brokers,
goka.DefineGroup(
group,
goka.Input(goka.Stream(inputStream), new(codec.String), func(ctx goka.Context, msg interface{}) { ctx.SetValue(msg) }),
@@ -201,7 +196,7 @@ func TestRecoverAhead(t *testing.T) {

proc2Storages := newStorageTracker()

proc2, err := goka.NewProcessor([]string{*broker},
proc2, err := goka.NewProcessor(brokers,
goka.DefineGroup(
group,
goka.Input(goka.Stream(inputStream), new(codec.String), func(ctx goka.Context, msg interface{}) { ctx.SetValue(msg) }),
@@ -214,14 +209,14 @@ func TestRecoverAhead(t *testing.T) {

// emit something into the join table (like simulating a processor ctx.SetValue()).
// Our test processors should update their value in the join-table
joinEmitter, err := goka.NewEmitter([]string{*broker}, goka.Stream(joinTable), new(codec.String))
joinEmitter, err := goka.NewEmitter(brokers, goka.Stream(joinTable), new(codec.String))
test.AssertNil(t, err)
defer joinEmitter.Finish()
joinEmitter.EmitSync("key1", "joinval1")

// emit something into the join table (like simulating a processor ctx.SetValue()).
// Our test processors should update their value in the join-table
tableEmitter, err := goka.NewEmitter([]string{*broker}, goka.Stream(table), new(codec.String))
tableEmitter, err := goka.NewEmitter(brokers, goka.Stream(table), new(codec.String))
test.AssertNil(t, err)
defer tableEmitter.Finish()
tableEmitter.EmitSync("key1", "tableval1")
@@ -293,9 +288,8 @@ func TestRecoverAhead(t *testing.T) {
// TestRebalance runs some processors to test rebalance. It's merely a
// runs-without-errors test, not a real functional test.
func TestRebalance(t *testing.T) {
if !*systemtest {
t.Skipf("Ignoring systemtest. pass '-args -systemtest' to `go test` to include them")
}

brokers := initSystemTest(t)

var (
group goka.Group = "goka-systemtest-rebalance"
@@ -309,13 +303,13 @@ func TestRebalance(t *testing.T) {
tmc.Table.Replication = 1
tmc.Stream.Replication = 1
cfg := goka.DefaultConfig()
tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)([]string{*broker})
tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)(brokers)
test.AssertNil(t, err)

err = tm.EnsureStreamExists(inputStream, 20)
test.AssertNil(t, err)

em, err := goka.NewEmitter([]string{*broker}, goka.Stream(inputStream), new(codec.String))
em, err := goka.NewEmitter(brokers, goka.Stream(inputStream), new(codec.String))
test.AssertNil(t, err)

go func() {
@@ -329,7 +323,7 @@ func TestRebalance(t *testing.T) {
}()

createProc := func(id int) *goka.Processor {
proc, err := goka.NewProcessor([]string{*broker},
proc, err := goka.NewProcessor(brokers,
goka.DefineGroup(
group,
goka.Input(goka.Stream(inputStream), new(codec.String), func(ctx goka.Context, msg interface{}) { ctx.SetValue(msg) }),
@@ -353,8 +347,6 @@ func TestRebalance(t *testing.T) {
p := createProc(i)
ctx, cancel := context.WithTimeout(ctx, time.Duration(16)*time.Second)
defer cancel()
log.Printf("Starting processor %d", i)
defer log.Printf("Stopping processor %d", i)
return p.Run(ctx)
})
time.Sleep(2 * time.Second)
@@ -364,9 +356,8 @@ func TestRebalance(t *testing.T) {
}

func TestCallbackFail(t *testing.T) {
if !*systemtest {
t.Skipf("Ignoring systemtest. pass '-args -systemtest' to `go test` to include them")
}

brokers := initSystemTest(t)

var (
group goka.Group = goka.Group(fmt.Sprintf("goka-systemtest-callback-fail-%d", time.Now().Unix()))
@@ -378,16 +369,16 @@ func TestCallbackFail(t *testing.T) {
tmc.Table.Replication = 1
tmc.Stream.Replication = 1
cfg := goka.DefaultConfig()
tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)([]string{*broker})
tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)(brokers)
test.AssertNil(t, err)

err = tm.EnsureStreamExists(inputStream, 1)
test.AssertNil(t, err)

em, err := goka.NewEmitter([]string{*broker}, goka.Stream(inputStream), new(codec.Int64))
em, err := goka.NewEmitter(brokers, goka.Stream(inputStream), new(codec.Int64))
test.AssertNil(t, err)

proc, err := goka.NewProcessor([]string{*broker},
proc, err := goka.NewProcessor(brokers,
goka.DefineGroup(
group,
goka.Input(goka.Stream(inputStream), new(codec.Int64), func(ctx goka.Context, msg interface{}) {
@@ -430,9 +421,8 @@ func TestCallbackFail(t *testing.T) {
}

func TestProcessorSlowStuck(t *testing.T) {
if !*systemtest {
t.Skipf("Ignoring systemtest. pass '-args -systemtest' to `go test` to include them")
}

brokers := initSystemTest(t)

var (
group goka.Group = "goka-systemtest-slow-callback-fail"
@@ -443,23 +433,22 @@ func TestProcessorSlowStuck(t *testing.T) {
tmc.Table.Replication = 1
tmc.Stream.Replication = 1
cfg := goka.DefaultConfig()
tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)([]string{*broker})
tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)(brokers)
test.AssertNil(t, err)

err = tm.EnsureStreamExists(inputStream, 2)
test.AssertNil(t, err)

em, err := goka.NewEmitter([]string{*broker}, goka.Stream(inputStream), new(codec.Int64))
em, err := goka.NewEmitter(brokers, goka.Stream(inputStream), new(codec.Int64))
test.AssertNil(t, err)

proc, err := goka.NewProcessor([]string{*broker},
proc, err := goka.NewProcessor(brokers,
goka.DefineGroup(
group,
goka.Input(goka.Stream(inputStream), new(codec.Int64), func(ctx goka.Context, msg interface{}) {
val := msg.(int64)
time.Sleep(500 * time.Microsecond)
log.Printf("%d", val)
if ctx.Partition() == 0 && val > 50 {
if ctx.Partition() == 0 && val > 10 {
// do an invalid action
panic("asdf")
}
@@ -491,7 +480,6 @@ func TestProcessorSlowStuck(t *testing.T) {
return proc.Run(ctx)
})
err = errg.Wait().ErrorOrNil()
log.Printf("%v", err)
test.AssertTrue(t, strings.Contains(err.Error(), "panic in callback"))
}

@@ -501,42 +489,18 @@ func TestProcessorSlowStuck(t *testing.T) {
// * Create a processor that consumes+accumulates this one value into its state. The final state obviously is 10.
// * restart this processor a couple of times and check whether it stays 10.
//
// Running it with go test -v github.com/lovoo/goka/systemtest -run TestMessageCommit -args -systemtest should yield:
// === RUN TestMessageCommit
// 2021/07/26 10:49:54 emitting 10 messages
// 2021/07/26 10:49:55 starting processor
// 2021/07/26 10:49:59 consuming message
// 2021/07/26 10:49:59 consuming message
// 2021/07/26 10:49:59 consuming message
// 2021/07/26 10:49:59 consuming message
// 2021/07/26 10:49:59 consuming message
// 2021/07/26 10:49:59 consuming message
// 2021/07/26 10:49:59 consuming message
// 2021/07/26 10:49:59 consuming message
// 2021/07/26 10:49:59 consuming message
// 2021/07/26 10:49:59 consuming message
// 2021/07/26 10:50:05 received 10 messages
// 2021/07/26 10:50:06 starting processor
// 2021/07/26 10:50:16 received 10 messages
// 2021/07/26 10:50:16 starting processor
// 2021/07/26 10:50:26 received 10 messages
// --- PASS: TestMessageCommit (41.70s)
// PASS
// ok github.com/lovoo/goka/systemtest 41.716s
func TestMessageCommit(t *testing.T) {

brokers := initSystemTest(t)

var (
group goka.Group = goka.Group(fmt.Sprintf("%s-%d", "goka-systemtest-message-commit", time.Now().Unix()))
inputStream goka.Stream = goka.Stream(group) + "-input"
numMessages = 10
)

if !*systemtest {
t.Skipf("Ignoring systemtest. pass '-args -systemtest' to `go test` to include them")
}

// New Emitter that will in total send 10 messages
emitter, err := goka.NewEmitter([]string{*broker}, inputStream, new(codec.Int64))
emitter, err := goka.NewEmitter(brokers, inputStream, new(codec.Int64))
test.AssertNil(t, err)

// some boiler plate code to create the topics in kafka using
@@ -550,15 +514,14 @@ func TestMessageCommit(t *testing.T) {
goka.ReplaceGlobalConfig(cfg)

tmBuilder := goka.TopicManagerBuilderWithConfig(cfg, tmc)
tm, err := tmBuilder([]string{*broker})
tm, err := tmBuilder(brokers)
test.AssertNil(t, err)

tm.EnsureStreamExists(string(inputStream), 10)

// give it time to actually create the topic
time.Sleep(10 * time.Second)

log.Printf("emitting %d messages", numMessages)
for i := 0; i < numMessages; i++ {
emitter.EmitSync("1", int64(1))
}
@@ -569,14 +532,12 @@ func TestMessageCommit(t *testing.T) {
// It always end up with a state of "10", but only consume the messages the first time.
// The second and third time it will just start as there are no new message in the topic.
for i := 0; i < 3; i++ {
log.Printf("starting processor")
done := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

proc, err := goka.NewProcessor([]string{*broker}, goka.DefineGroup(group,
proc, err := goka.NewProcessor(brokers, goka.DefineGroup(group,
goka.Input(inputStream, new(codec.Int64), func(ctx goka.Context, msg interface{}) {
log.Printf("consuming message")
if val := ctx.Value(); val == nil {
ctx.SetValue(msg)
} else {
@@ -600,7 +561,6 @@ func TestMessageCommit(t *testing.T) {
test.AssertNil(t, err)
test.AssertTrue(t, val != nil)
test.AssertEqual(t, val.(int64), int64(numMessages))
log.Printf("received %d messages", numMessages)

cancel()
<-done
110 changes: 110 additions & 0 deletions systemtest/proxy.go
@@ -0,0 +1,110 @@
package systemtest

import (
"net"
"sync"
)

// FIProxy is a fault-injecting proxy hooked into the sarama config
// to proxy connections to kafka and inject connection loss, read/write errors etc.
type FIProxy struct {
m sync.RWMutex

readErr error
writeErr error

conns map[string]*Conn
}

type Conn struct {
net.Conn
fip *FIProxy
}

func (c *Conn) Close() error {
defer c.fip.removeConn(c.Conn)
return c.Conn.Close()
}

func (c *Conn) Read(b []byte) (int, error) {
c.fip.m.RLock()
defer c.fip.m.RUnlock()

if c.fip.readErr != nil {
return 0, c.fip.readErr
}
return c.Conn.Read(b)
}

func (c *Conn) Write(b []byte) (int, error) {
c.fip.m.RLock()
defer c.fip.m.RUnlock()
if c.fip.writeErr != nil {
return 0, c.fip.writeErr
}
return c.Conn.Write(b)
}

func NewFIProxy() *FIProxy {
return &FIProxy{
conns: make(map[string]*Conn),
}
}

func (fip *FIProxy) Dial(network, addr string) (c net.Conn, err error) {
fip.m.Lock()
defer fip.m.Unlock()

conn, err := net.Dial(network, addr)

wrappedConn := &Conn{
Conn: conn,
fip: fip,
}
key := conn.LocalAddr().String()

fip.conns[key] = wrappedConn
return wrappedConn, err
}

func (fip *FIProxy) removeConn(c net.Conn) {
fip.m.Lock()
defer fip.m.Unlock()

delete(fip.conns, c.LocalAddr().String())
}

func (fip *FIProxy) getConns() []string {
fip.m.Lock()
defer fip.m.Unlock()
var conns []string

for c := range fip.conns {
conns = append(conns, c)
}

return conns
}

func (fip *FIProxy) SetReadError(err error) {
fip.m.Lock()
defer fip.m.Unlock()
fip.readErr = err
}

func (fip *FIProxy) SetWriteError(err error) {
fip.m.Lock()
defer fip.m.Unlock()
fip.writeErr = err
}

func (fip *FIProxy) ResetErrors() {
fip.m.Lock()
defer fip.m.Unlock()
fip.readErr = nil
fip.writeErr = nil
}

func (fip *FIProxy) String() string {
return "Fault Injecting Proxy (FIP)"
}
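The disconnect and reconnect tests below plug this proxy into sarama's Net.Proxy hook. A condensed sketch of that wiring (assumed to sit in the same systemtest package with the goka and sarama imports available; the helper name is made up, the tests inline these lines):

// faultyConfig returns a goka/sarama config whose broker connections are routed
// through the fault-injecting proxy, plus the proxy handle used to trigger errors.
func faultyConfig() (*sarama.Config, *FIProxy) {
	cfg := goka.DefaultConfig()

	fi := NewFIProxy()
	cfg.Net.Proxy.Enable = true
	cfg.Net.Proxy.Dialer = fi

	return cfg, fi
}

The config is then handed to the builders (goka.ProducerBuilderWithConfig, goka.SaramaConsumerBuilderWithConfig, goka.ConsumerGroupBuilderWithConfig); a broken connection is simulated with fi.SetReadError(io.EOF) or fi.SetWriteError(io.ErrClosedPipe) and healed again with fi.ResetErrors().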
20 changes: 5 additions & 15 deletions systemtest/topicmanager_test.go
@@ -3,7 +3,6 @@ package systemtest
import (
"crypto/rand"
"encoding/hex"
"flag"
"strings"
"testing"
"time"
@@ -13,20 +12,13 @@ import (
"github.com/lovoo/goka/internal/test"
)

var (
systemtest = flag.Bool("systemtest", false, "set to run systemtests that require a running kafka-version")
broker = flag.String("broker", "localhost:9092", "bootstrap broker to use for local testing")
)

func TestTopicManagerCreate(t *testing.T) {
if !*systemtest {
t.Skipf("Ignoring systemtest. pass '-args -systemtest' to `go test` to include them")
}
brokers := initSystemTest(t)

cfg := sarama.NewConfig()
cfg.Version = sarama.V0_11_0_0

tm, err := goka.TopicManagerBuilderWithConfig(cfg, goka.NewTopicManagerConfig())([]string{*broker})
tm, err := goka.TopicManagerBuilderWithConfig(cfg, goka.NewTopicManagerConfig())(brokers)
test.AssertNil(t, err)

err = tm.EnsureTopicExists("test10", 4, 2, nil)
@@ -37,20 +29,18 @@ func TestTopicManagerCreate(t *testing.T) {
// Tests the topic manager with sarama version v11 --> so it will test topic configuration using
// the sarama.ClusterAdmin
func TestTopicManager_v11(t *testing.T) {
if !*systemtest {
t.Skipf("Ignoring systemtest. pass '-args -systemtest' to `go test` to include them")
}
brokers := initSystemTest(t)

cfg := sarama.NewConfig()
cfg.Version = sarama.V0_11_0_0
tmc := goka.NewTopicManagerConfig()
tmc.Table.Replication = 1
tmc.MismatchBehavior = goka.TMConfigMismatchBehaviorFail

tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)([]string{*broker})
tm, err := goka.TopicManagerBuilderWithConfig(cfg, tmc)(brokers)
test.AssertNil(t, err)

client, _ := sarama.NewClient([]string{*broker}, cfg)
client, _ := sarama.NewClient(brokers, cfg)
admin, _ := sarama.NewClusterAdminFromClient(client)

t.Run("ensure-new-stream", func(t *testing.T) {
115 changes: 115 additions & 0 deletions systemtest/view_reconnect_test.go
@@ -0,0 +1,115 @@
package systemtest

import (
"context"
"fmt"
"io"
"testing"
"time"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
"github.com/lovoo/goka/internal/test"
"github.com/lovoo/goka/multierr"
)

// Tests the following scenario:
// A view started with `WithViewAutoReconnect` should still return values even after losing its connection to kafka.
// Therefore we start a view on a topic fed by an emitter; the view connects through the FIProxy and loses its connection
// after recovering. The values should still be served/returned.
func TestView_Reconnect(t *testing.T) {
var topic = fmt.Sprintf("goka_systemtest_view_reconnect_test-%d", time.Now().Unix())
brokers := initSystemTest(t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

errg, ctx := multierr.NewErrGroup(ctx)

tmgr, err := goka.DefaultTopicManagerBuilder(brokers)
test.AssertNil(t, err)
test.AssertNil(t, tmgr.EnsureStreamExists(topic, 10))

errg.Go(func() error {
em, err := goka.NewEmitter(brokers, goka.Stream(topic), new(codec.Int64))
if err != nil {
return err
}
defer em.Finish()
var i int64
for {
select {
case <-ctx.Done():
return nil
default:
}

test.AssertNil(t, em.EmitSync("key", i))
time.Sleep(10 * time.Millisecond)
i++
}
})

cfg := goka.DefaultConfig()

fi := NewFIProxy()
cfg.Net.Proxy.Enable = true
cfg.Net.Proxy.Dialer = fi

// we'll use a view on the stream.
view, err := goka.NewView(brokers, goka.Table(topic), new(codec.Int64),
goka.WithViewAutoReconnect(),
goka.WithViewConsumerSaramaBuilder(goka.SaramaConsumerBuilderWithConfig(cfg)),
goka.WithViewTopicManagerBuilder(goka.TopicManagerBuilderWithConfig(cfg, goka.NewTopicManagerConfig())),
)
test.AssertNil(t, err)

// Start view and wait for it to be recovered
errg.Go(func() error {
return view.Run(ctx)
})
pollTimed(t, "view-recovered", 10, view.Recovered)

val := func() int64 {
val, err := view.Get("key")
test.AssertNil(t, err)
if val == nil {
return 0
}
return val.(int64)
}

pollTimed(t, "wait-first-value", 3, func() bool {
return val() > 0
})
firstVal := val()

time.Sleep(500 * time.Millisecond)

// kill kafka connection
fi.SetReadError(io.EOF)
pollTimed(t, "view-reconnecting", 10, func() bool {
return view.CurrentState() == goka.ViewStateConnecting
})

// the view still should have gotten the update before the EOF
secondVal := val()
test.AssertTrue(t, secondVal > firstVal)

// let some time pass -> the value should not have updated
time.Sleep(500 * time.Millisecond)
test.AssertTrue(t, val() == secondVal)

// connect kafka again, wait until it's running -> the value should have changed
fi.ResetErrors()
pollTimed(t, "view-running", 10, func() bool {
return view.CurrentState() == goka.ViewStateRunning
})
pollTimed(t, "view-running", 5, func() bool {
return val() > secondVal
})

// shut everything down
cancel()
test.AssertNil(t, errg.Wait().ErrorOrNil())
}
6 changes: 1 addition & 5 deletions topic_manager.go
@@ -7,7 +7,6 @@ import (
"time"

"github.com/Shopify/sarama"
"github.com/lovoo/goka/multierr"
)

// TopicManager provides an interface to create/check topics and their partitions
@@ -106,10 +105,7 @@ func checkBroker(broker Broker, config *sarama.Config) error {
}

func (m *topicManager) Close() error {
errs := new(multierr.Errors)
errs.Collect(m.client.Close())

return errs.NilOrError()
return m.client.Close()
}

func (m *topicManager) Partitions(topic string) ([]int32, error) {
11 changes: 11 additions & 0 deletions view.go
@@ -331,6 +331,11 @@ func (v *View) Topic() string {
// Get can be called by multiple goroutines concurrently.
// Get can only be called after Recovered returns true.
func (v *View) Get(key string) (interface{}, error) {

if v.state.IsState(State(ViewStateIdle)) || v.state.IsState(State(ViewStateInitializing)) {
return nil, fmt.Errorf("View is either not running, not correctly initialized or stopped again. It's not safe to retrieve values")
}

// find partition where key is located
partTable, err := v.find(key)
if err != nil {
@@ -425,6 +430,12 @@ func (v *View) Evict(key string) error {

// Recovered returns true when the view has caught up with events from kafka.
func (v *View) Recovered() bool {
// no partitions --> never recover
// Otherwise we might mask errors of initializing the view
if len(v.partitions) == 0 {
return false
}

for _, p := range v.partitions {
if !p.IsRecovered() {
return false
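Together with the relaxed readyToRead check in partition_table.go, this is the fix for issue 350: a view that has recovered once keeps serving Get from its local storage while it is reconnecting (ViewStateConnecting) and only refuses reads while it is idle or still initializing. A sketch of the intended usage with auto-reconnect (assuming the goka module is available; the package, table name and key are made up):

package viewexample

import (
	"context"
	"log"
	"time"

	"github.com/lovoo/goka"
	"github.com/lovoo/goka/codec"
)

// readWithAutoReconnect starts a view that keeps answering Get from its local
// storage while the kafka connection is lost and being re-established.
func readWithAutoReconnect(ctx context.Context, brokers []string) (interface{}, error) {
	view, err := goka.NewView(brokers, goka.Table("example-table"), new(codec.Int64),
		goka.WithViewAutoReconnect(),
	)
	if err != nil {
		return nil, err
	}

	go func() {
		if err := view.Run(ctx); err != nil {
			log.Printf("view terminated: %v", err)
		}
	}()

	// wait until the local table has caught up with kafka once
	for !view.Recovered() {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-time.After(100 * time.Millisecond):
		}
	}

	// served in ViewStateRunning and, with auto-reconnect, also in ViewStateConnecting
	return view.Get("key")
}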
10 changes: 8 additions & 2 deletions view_test.go
@@ -86,7 +86,11 @@ func createTestView(t *testing.T, consumer sarama.Consumer) (*View, *builderMock
}
opts.builders.backoff = DefaultBackoffBuilder

view := &View{topic: viewTestTopic, opts: opts, log: opts.log}
view := &View{topic: viewTestTopic,
opts: opts,
log: opts.log,
state: newViewSignal(),
}
return view, bm, ctrl
}

@@ -185,6 +189,7 @@ func TestView_Get(t *testing.T) {
},
}
view.opts.tableCodec = &codec.Int64{}
view.state.SetState(State(ViewStateRunning))

bm.mst.EXPECT().Get(key).Return([]byte(strconv.FormatInt(value, 10)), nil)

@@ -211,7 +216,7 @@ func TestView_Get(t *testing.T) {
},
}
view.opts.tableCodec = &codec.Int64{}

view.state.SetState(State(ViewStateRunning))
bm.mst.EXPECT().Get(key).Return(nil, nil)

ret, err := view.Get(key)
@@ -238,6 +243,7 @@ func TestView_Get(t *testing.T) {
},
}
view.opts.tableCodec = &codec.Int64{}
view.state.SetState(State(ViewStateRunning))
bm.mst.EXPECT().Get(key).Return(nil, errRet)

_, err := view.Get(key)
