Merge pull request #361 from lovoo/failure-handling
Improved failure handling:
  • fully remove multierr.Errors
  • system test for testing connection errors
  • bugfix issue 350: View.Get fails on reconnecting view
  • bugfix: processor freeze on error (334)
frairon authored Nov 11, 2021
2 parents 9ff2ce3 + 6334df4 commit 150be40
Showing 17 changed files with 731 additions and 209 deletions.
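The core of the change set is dropping goka's internal multierr.Errors collector in favor of github.com/hashicorp/go-multierror, as the diffs below show. For orientation, here is a minimal, self-contained sketch of that library's accumulate-and-collapse pattern; it is illustrative only (closeAll is an invented helper, not part of goka):

package main

import (
	"fmt"

	"github.com/hashicorp/go-multierror"
)

// closeAll runs every closer and collects all failures into one error.
func closeAll(closers ...func() error) error {
	var errs *multierror.Error // a nil *multierror.Error is a valid starting value
	for _, c := range closers {
		if err := c(); err != nil {
			errs = multierror.Append(errs, err)
		}
	}
	// ErrorOrNil returns nil when nothing was collected, so callers keep
	// handling a plain error value.
	return errs.ErrorOrNil()
}

func main() {
	err := closeAll(
		func() error { return nil },
		func() error { return fmt.Errorf("close failed") },
	)
	fmt.Println(err) // prints the collected error, or <nil> if none occurred
}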
9 changes: 9 additions & 0 deletions Makefile
@@ -0,0 +1,9 @@


test:
go test -race ./...

test-systemtest:
GOKA_SYSTEMTEST=y go test -v github.com/lovoo/goka/systemtest

test-all: test test-systemtest
10 changes: 5 additions & 5 deletions context.go
@@ -8,7 +8,7 @@ import (
"time"

"github.com/Shopify/sarama"
"github.com/lovoo/goka/multierr"
"github.com/hashicorp/go-multierror"
)

type emitter func(topic string, key string, value []byte, headers Headers) *Promise
@@ -177,7 +177,7 @@ type cbContext struct {
dones int
stores int
}
errors multierr.Errors
errors *multierror.Error
m sync.Mutex
wg *sync.WaitGroup
}
@@ -438,7 +438,7 @@ func (ctx *cbContext) start() {
// this function must be called from a locked function.
func (ctx *cbContext) tryCommit(err error) {
if err != nil {
_ = ctx.errors.Collect(err)
ctx.errors = multierror.Append(ctx.errors, err)
}

// not all calls are done yet, do not send the ack upstream.
@@ -447,8 +447,8 @@ func (ctx *cbContext) tryCommit(err error) {
}

// commit if no errors, otherwise fail context
if ctx.errors.HasErrors() {
ctx.asyncFailer(ctx.errors.NilOrError())
if ctx.errors.ErrorOrNil() != nil {
ctx.asyncFailer(ctx.errors.ErrorOrNil())
} else {
ctx.commit()
}
3 changes: 2 additions & 1 deletion context_test.go
@@ -121,7 +121,7 @@ func TestContext_DeferCommit_witherror(t *testing.T) {
doneFunc(fmt.Errorf("async error"))
// no commit, no ack, so we'll get the message again.
test.AssertEqual(t, ack, 0)
test.AssertEqual(t, ctx.errors.NilOrError().Error(), "async error")
test.AssertStringContains(t, ctx.errors.ErrorOrNil().Error(), "async error")
}

func TestContext_Timestamp(t *testing.T) {
@@ -584,6 +584,7 @@ func TestContext_Lookup(t *testing.T) {
msg: &message{key: key},
views: map[string]*View{
string(table): {
state: newViewSignal().SetState(State(ViewStateRunning)),
opts: &voptions{
tableCodec: c,
hasher: DefaultHasher(),
124 changes: 44 additions & 80 deletions partition_table.go
@@ -6,6 +6,7 @@ import (
"time"

"github.com/Shopify/sarama"
"github.com/hashicorp/go-multierror"
"github.com/lovoo/goka/multierr"
"github.com/lovoo/goka/storage"
)
@@ -17,6 +18,8 @@ const (

// internal offset we use to detect if the offset has never been stored locally
offsetNotStored int64 = -3

consumerDrainTimeout = time.Second
)

// Backoff is used for adding backoff capabilities to the restarting
@@ -191,7 +194,6 @@ func (p *PartitionTable) Close() error {
func (p *PartitionTable) createStorage(ctx context.Context) (*storageProxy, error) {
var (
err error
errs = new(multierr.Errors)
st storage.Storage
start = time.Now()
done = make(chan struct{})
@@ -217,9 +219,7 @@ func (p *PartitionTable) createStorage(ctx context.Context) (*storageProxy, erro
}
err = st.Open()
if err != nil {
errs.Collect(st.Close())
errs.Collect(fmt.Errorf("error opening storage: %v", err))
return nil, errs.NilOrError()
return nil, multierror.Append(st.Close(), fmt.Errorf("error opening storage: %v", err)).ErrorOrNil()
}

// close the db if context was cancelled before the builder returned
@@ -228,10 +228,9 @@ func (p *PartitionTable) createStorage(ctx context.Context) (*storageProxy, erro
err = st.Close()
// only collect context error if Close() errored out
if err != nil {
errs.Collect(err)
errs.Collect(ctx.Err())
return nil, multierror.Append(err, ctx.Err()).ErrorOrNil()
}
return nil, errs.NilOrError()
return nil, nil
default:
}

@@ -280,37 +279,25 @@ func (p *PartitionTable) load(ctx context.Context, stopAfterCatchup bool) (rerr
storedOffset int64
partConsumer sarama.PartitionConsumer
err error
errs = new(multierr.Errors)
)
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// deferred error handling
defer func() {
errs.Collect(rerr)

rerr = errs.NilOrError()
return
}()

p.state.SetState(State(PartitionConnecting))

// fetch local offset
storedOffset, err = p.st.GetOffset(offsetNotStored)
if err != nil {
errs.Collect(fmt.Errorf("error reading local offset: %v", err))
return
return fmt.Errorf("error reading local offset: %v", err)
}

loadOffset, hwm, err := p.findOffsetToLoad(storedOffset)
if err != nil {
errs.Collect(err)
return
return err
}

if storedOffset > 0 && hwm == 0 {
errs.Collect(fmt.Errorf("kafka tells us there's no message in the topic, but our cache has one. The table might be gone. Try to delete your local cache! Topic %s, partition %d, hwm %d, local offset %d", p.topic, p.partition, hwm, storedOffset))
return
return fmt.Errorf("kafka tells us there's no message in the topic, but our cache has one. The table might be gone. Try to delete your local cache! Topic %s, partition %d, hwm %d, local offset %d", p.topic, p.partition, hwm, storedOffset)
}

if storedOffset >= hwm {
@@ -334,8 +321,7 @@ func (p *PartitionTable) load(ctx context.Context, stopAfterCatchup bool) (rerr
// AND we're here for catchup, so let's stop here
// and do not attempt to load anything
if stopAfterCatchup && loadOffset >= hwm {
errs.Collect(p.markRecovered(ctx))
return
return p.markRecovered(ctx)
}

if stopAfterCatchup {
@@ -348,17 +334,13 @@ func (p *PartitionTable) load(ctx context.Context, stopAfterCatchup bool) (rerr

partConsumer, err = p.consumer.ConsumePartition(p.topic, p.partition, loadOffset)
if err != nil {
errs.Collect(fmt.Errorf("Error creating partition consumer for topic %s, partition %d, offset %d: %v", p.topic, p.partition, storedOffset, err))
return
return fmt.Errorf("Error creating partition consumer for topic %s, partition %d, offset %d: %v", p.topic, p.partition, storedOffset, err)
}

// consume errors asynchronously
go p.handleConsumerErrors(ctx, errs, partConsumer)

// close the consumer
defer func() {
partConsumer.AsyncClose()
p.drainConsumer(partConsumer, errs)
rerr = multierror.Append(rerr, p.drainConsumer(partConsumer)).ErrorOrNil()
}()

if stopAfterCatchup {
@@ -371,15 +353,13 @@ func (p *PartitionTable) load(ctx context.Context, stopAfterCatchup bool) (rerr
loadErr := p.loadMessages(ctx, partConsumer, hwm, stopAfterCatchup)

if loadErr != nil {
errs.Collect(loadErr)
return
return loadErr
}

if stopAfterCatchup {
errs.Collect(p.markRecovered(ctx))

now := time.Now()
p.enqueueStatsUpdate(ctx, func() { p.stats.Recovery.RecoveryTime = now })
err := p.markRecovered(ctx)
p.enqueueStatsUpdate(ctx, func() { p.stats.Recovery.RecoveryTime = time.Now() })
return err
}
return
}
@@ -424,43 +404,27 @@ func (p *PartitionTable) markRecovered(ctx context.Context) error {
}
}

func (p *PartitionTable) handleConsumerErrors(ctx context.Context, errs *multierr.Errors, cons sarama.PartitionConsumer) {
for {
select {
case consError, ok := <-cons.Errors():
if !ok {
return
}
err := fmt.Errorf("Consumer error: %v", consError)
p.log.Printf("%v", err)
errs.Collect(err)
// if there's an error, close the consumer
cons.AsyncClose()
case <-ctx.Done():
return
}
}
}
func (p *PartitionTable) drainConsumer(cons sarama.PartitionConsumer) error {

func (p *PartitionTable) drainConsumer(cons sarama.PartitionConsumer, errs *multierr.Errors) {

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
timeoutCtx, cancel := context.WithTimeout(context.Background(), consumerDrainTimeout)
defer cancel()

errg, ctx := multierr.NewErrGroup(ctx)
errg, _ := multierr.NewErrGroup(context.Background())

// drain errors channel
errg.Go(func() error {
var errs *multierror.Error

for {
select {
case <-ctx.Done():
case <-timeoutCtx.Done():
p.log.Printf("draining errors channel timed out")
return nil
return errs
case err, ok := <-cons.Errors():
if !ok {
return nil
return errs
}
errs.Collect(err)
errs = multierror.Append(errs, err)
}
}
})
@@ -469,7 +433,7 @@ func (p *PartitionTable) drainConsumer(cons sarama.PartitionConsumer, errs *mult
errg.Go(func() error {
for {
select {
case <-ctx.Done():
case <-timeoutCtx.Done():
p.log.Printf("draining messages channel timed out")
return nil
case _, ok := <-cons.Messages():
@@ -480,30 +444,31 @@ func (p *PartitionTable) drainConsumer(cons sarama.PartitionConsumer, errs *mult
}
})

errg.Wait()
return errg.Wait().ErrorOrNil()
}

func (p *PartitionTable) loadMessages(ctx context.Context, cons sarama.PartitionConsumer, partitionHwm int64, stopAfterCatchup bool) (rerr error) {
errs := new(multierr.Errors)

// deferred error handling
defer func() {
errs.Collect(rerr)

rerr = errs.NilOrError()
return
}()
func (p *PartitionTable) loadMessages(ctx context.Context, cons sarama.PartitionConsumer, partitionHwm int64, stopAfterCatchup bool) error {

stallTicker := time.NewTicker(p.stallPeriod)
defer stallTicker.Stop()

lastMessage := time.Now()

messages := cons.Messages()
errors := cons.Errors()

for {
select {
case msg, ok := <-cons.Messages():
case err, ok := <-errors:
if !ok {
return
return nil
}
if err != nil {
return err
}
case msg, ok := <-messages:
if !ok {
return nil
}

// This case is for the Tester to achieve synchronicity.
@@ -521,8 +486,7 @@ func (p *PartitionTable) loadMessages(ctx context.Context, cons sarama.Partition

lastMessage = time.Now()
if err := p.storeEvent(string(msg.Key), msg.Value, msg.Offset, msg.Headers); err != nil {
errs.Collect(fmt.Errorf("load: error updating storage: %v", err))
return
return fmt.Errorf("load: error updating storage: %v", err)
}

if stopAfterCatchup {
Expand All @@ -532,7 +496,7 @@ func (p *PartitionTable) loadMessages(ctx context.Context, cons sarama.Partition
p.enqueueStatsUpdate(ctx, func() { p.trackIncomingMessageStats(msg) })

if stopAfterCatchup && msg.Offset >= partitionHwm-1 {
return
return nil
}

case now := <-stallTicker.C:
Expand All @@ -543,7 +507,7 @@ func (p *PartitionTable) loadMessages(ctx context.Context, cons sarama.Partition
}

case <-ctx.Done():
return
return nil
}
}
}
@@ -711,7 +675,7 @@ func (p *PartitionTable) IteratorWithRange(start []byte, limit []byte) (storage.

func (p *PartitionTable) readyToRead() error {
pstate := p.CurrentState()
if pstate != PartitionRunning {
if pstate < PartitionConnecting {
return fmt.Errorf("Partition is not running (but %v) so it's not safe to read values", pstate)
}
return nil
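The partition_table.go rewrite above replaces the asynchronous error collector with direct returns, introduces a bounded consumerDrainTimeout, and makes loadMessages select on the consumer's Errors() channel alongside Messages(). The sketch below (assumed names, not goka code) illustrates the underlying drain-with-timeout idea: read a channel until it closes or a deadline fires, so shutdown never blocks on an unread channel.

package main

import (
	"context"
	"fmt"
	"time"
)

// drainStrings reads ch until it is closed or ctx expires, returning how many
// values were discarded. This mirrors how drainConsumer empties the Messages()
// and Errors() channels after AsyncClose, bounded by a timeout.
func drainStrings(ctx context.Context, ch <-chan string) int {
	var n int
	for {
		select {
		case <-ctx.Done():
			return n // timed out: stop draining
		case _, ok := <-ch:
			if !ok {
				return n // channel closed: fully drained
			}
			n++
		}
	}
}

func main() {
	msgs := make(chan string, 2)
	msgs <- "a"
	msgs <- "b"
	close(msgs)

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println("drained", drainStrings(ctx, msgs), "messages")
}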