Improved failure handling: fully remove multierr.Errors; system test for testing connection errors; bugfix issue #350 (View.Get fails on reconnecting view); bugfix processor freeze on error (#334) #361

Merged · 1 commit · Nov 11, 2021
9 changes: 9 additions & 0 deletions Makefile
@@ -0,0 +1,9 @@


test:
go test -race ./...

test-systemtest:
GOKA_SYSTEMTEST=y go test -v github.com/lovoo/goka/systemtest

test-all: test test-systemtest
10 changes: 5 additions & 5 deletions context.go
@@ -8,7 +8,7 @@ import (
"time"

"github.com/Shopify/sarama"
"github.com/lovoo/goka/multierr"
"github.com/hashicorp/go-multierror"
)

type emitter func(topic string, key string, value []byte, headers Headers) *Promise
@@ -177,7 +177,7 @@ type cbContext struct {
dones int
stores int
}
errors multierr.Errors
errors *multierror.Error
m sync.Mutex
wg *sync.WaitGroup
}
@@ -438,7 +438,7 @@ func (ctx *cbContext) start() {
// this function must be called from a locked function.
func (ctx *cbContext) tryCommit(err error) {
if err != nil {
_ = ctx.errors.Collect(err)
ctx.errors = multierror.Append(ctx.errors, err)
}

// not all calls are done yet, do not send the ack upstream.
@@ -447,8 +447,8 @@ }
}

// commit if no errors, otherwise fail context
if ctx.errors.HasErrors() {
ctx.asyncFailer(ctx.errors.NilOrError())
if ctx.errors.ErrorOrNil() != nil {
ctx.asyncFailer(ctx.errors.ErrorOrNil())
} else {
ctx.commit()
}
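Note on the change above (not part of the diff): `cbContext` no longer accumulates errors in goka's internal `multierr.Errors` (`Collect`/`HasErrors`/`NilOrError`) but in a `*multierror.Error` from hashicorp/go-multierror. A minimal standalone sketch of that pattern; the `collect` helper and its inputs are made up for illustration:

```go
package main

import (
	"errors"
	"fmt"

	"github.com/hashicorp/go-multierror"
)

// collect runs a series of steps and gathers their failures the same way
// tryCommit now does: Append on a possibly-nil *multierror.Error, then
// ErrorOrNil to get either a combined error or nil.
func collect(steps ...func() error) error {
	var errs *multierror.Error // nil is a valid starting value for Append
	for _, step := range steps {
		if err := step(); err != nil {
			errs = multierror.Append(errs, err)
		}
	}
	return errs.ErrorOrNil() // nil when nothing was appended
}

func main() {
	err := collect(
		func() error { return nil },
		func() error { return errors.New("async error") },
	)
	fmt.Println(err != nil) // true
}
```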
3 changes: 2 additions & 1 deletion context_test.go
@@ -121,7 +121,7 @@ func TestContext_DeferCommit_witherror(t *testing.T) {
doneFunc(fmt.Errorf("async error"))
// no commit, no ack, so we'll get the message again.
test.AssertEqual(t, ack, 0)
test.AssertEqual(t, ctx.errors.NilOrError().Error(), "async error")
test.AssertStringContains(t, ctx.errors.ErrorOrNil().Error(), "async error")
}

func TestContext_Timestamp(t *testing.T) {
@@ -584,6 +584,7 @@ func TestContext_Lookup(t *testing.T) {
msg: &message{key: key},
views: map[string]*View{
string(table): {
state: newViewSignal().SetState(State(ViewStateRunning)),
opts: &voptions{
tableCodec: c,
hasher: DefaultHasher(),
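The switch from `AssertEqual` to `AssertStringContains` follows from the same migration: go-multierror wraps collected errors in a formatted summary instead of returning the raw message, so exact string equality no longer holds. A small standalone sketch (assuming go-multierror's default formatting) showing why:

```go
package main

import (
	"errors"
	"fmt"

	"github.com/hashicorp/go-multierror"
)

func main() {
	var errs *multierror.Error
	errs = multierror.Append(errs, errors.New("async error"))

	// The combined message is a formatted list along the lines of
	// "1 error occurred:\n\t* async error\n\n", so the test checks that
	// the original text is contained rather than comparing for equality.
	fmt.Println(errs.ErrorOrNil().Error())
}
```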
124 changes: 44 additions & 80 deletions partition_table.go
@@ -6,6 +6,7 @@ import (
"time"

"github.com/Shopify/sarama"
"github.com/hashicorp/go-multierror"
"github.com/lovoo/goka/multierr"
"github.com/lovoo/goka/storage"
)
@@ -17,6 +18,8 @@ const (

// internal offset we use to detect if the offset has never been stored locally
offsetNotStored int64 = -3

consumerDrainTimeout = time.Second
)

// Backoff is used for adding backoff capabilities to the restarting
@@ -191,7 +194,6 @@ func (p *PartitionTable) Close() error {
func (p *PartitionTable) createStorage(ctx context.Context) (*storageProxy, error) {
var (
err error
errs = new(multierr.Errors)
st storage.Storage
start = time.Now()
done = make(chan struct{})
@@ -217,9 +219,7 @@ }
}
err = st.Open()
if err != nil {
errs.Collect(st.Close())
errs.Collect(fmt.Errorf("error opening storage: %v", err))
return nil, errs.NilOrError()
return nil, multierror.Append(st.Close(), fmt.Errorf("error opening storage: %v", err)).ErrorOrNil()
}

// close the db if context was cancelled before the builder returned
@@ -228,10 +228,9 @@ func (p *PartitionTable) createStorage(ctx context.Context) (*storageProxy, erro
err = st.Close()
// only collect context error if Close() errored out
if err != nil {
errs.Collect(err)
errs.Collect(ctx.Err())
return nil, multierror.Append(err, ctx.Err()).ErrorOrNil()
}
return nil, errs.NilOrError()
return nil, nil
default:
}

@@ -280,37 +279,25 @@ func (p *PartitionTable) load(ctx context.Context, stopAfterCatchup bool) (rerr
storedOffset int64
partConsumer sarama.PartitionConsumer
err error
errs = new(multierr.Errors)
)
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// deferred error handling
defer func() {
errs.Collect(rerr)

rerr = errs.NilOrError()
return
}()

p.state.SetState(State(PartitionConnecting))

// fetch local offset
storedOffset, err = p.st.GetOffset(offsetNotStored)
if err != nil {
errs.Collect(fmt.Errorf("error reading local offset: %v", err))
return
return fmt.Errorf("error reading local offset: %v", err)
}

loadOffset, hwm, err := p.findOffsetToLoad(storedOffset)
if err != nil {
errs.Collect(err)
return
return err
}

if storedOffset > 0 && hwm == 0 {
errs.Collect(fmt.Errorf("kafka tells us there's no message in the topic, but our cache has one. The table might be gone. Try to delete your local cache! Topic %s, partition %d, hwm %d, local offset %d", p.topic, p.partition, hwm, storedOffset))
return
return fmt.Errorf("kafka tells us there's no message in the topic, but our cache has one. The table might be gone. Try to delete your local cache! Topic %s, partition %d, hwm %d, local offset %d", p.topic, p.partition, hwm, storedOffset)
}

if storedOffset >= hwm {
@@ -334,8 +321,7 @@
// AND we're here for catchup, so let's stop here
// and do not attempt to load anything
if stopAfterCatchup && loadOffset >= hwm {
errs.Collect(p.markRecovered(ctx))
return
return p.markRecovered(ctx)
}

if stopAfterCatchup {
@@ -348,17 +334,13 @@

partConsumer, err = p.consumer.ConsumePartition(p.topic, p.partition, loadOffset)
if err != nil {
errs.Collect(fmt.Errorf("Error creating partition consumer for topic %s, partition %d, offset %d: %v", p.topic, p.partition, storedOffset, err))
return
return fmt.Errorf("Error creating partition consumer for topic %s, partition %d, offset %d: %v", p.topic, p.partition, storedOffset, err)
}

// consume errors asynchronously
go p.handleConsumerErrors(ctx, errs, partConsumer)

// close the consumer
defer func() {
partConsumer.AsyncClose()
p.drainConsumer(partConsumer, errs)
rerr = multierror.Append(rerr, p.drainConsumer(partConsumer)).ErrorOrNil()
}()

if stopAfterCatchup {
@@ -371,15 +353,13 @@
loadErr := p.loadMessages(ctx, partConsumer, hwm, stopAfterCatchup)

if loadErr != nil {
errs.Collect(loadErr)
return
return loadErr
}

if stopAfterCatchup {
errs.Collect(p.markRecovered(ctx))

now := time.Now()
p.enqueueStatsUpdate(ctx, func() { p.stats.Recovery.RecoveryTime = now })
err := p.markRecovered(ctx)
p.enqueueStatsUpdate(ctx, func() { p.stats.Recovery.RecoveryTime = time.Now() })
return err
}
return
}
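Note (not part of the diff): `load` no longer funnels everything through a shared `multierr.Errors` collector; errors are returned directly, and the only remaining merge point is the deferred consumer drain, which appends into the named return value. A standalone sketch of that defer pattern; the `doWork`/`body`/`cleanup` names are illustrative, not goka APIs:

```go
package main

import (
	"errors"
	"fmt"

	"github.com/hashicorp/go-multierror"
)

// doWork mirrors the shape of load(): the body returns errors directly,
// while the deferred cleanup merges its own error into the named return.
func doWork(body, cleanup func() error) (rerr error) {
	defer func() {
		// Append skips nil errors, and ErrorOrNil collapses back to nil
		// when neither the body nor the cleanup failed.
		rerr = multierror.Append(rerr, cleanup()).ErrorOrNil()
	}()
	return body()
}

func main() {
	err := doWork(
		func() error { return nil },
		func() error { return errors.New("drain failed") },
	)
	fmt.Println(err) // formatted multierror containing "drain failed"
}
```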
@@ -424,43 +404,27 @@ func (p *PartitionTable) markRecovered(ctx context.Context) error {
}
}

func (p *PartitionTable) handleConsumerErrors(ctx context.Context, errs *multierr.Errors, cons sarama.PartitionConsumer) {
for {
select {
case consError, ok := <-cons.Errors():
if !ok {
return
}
err := fmt.Errorf("Consumer error: %v", consError)
p.log.Printf("%v", err)
errs.Collect(err)
// if there's an error, close the consumer
cons.AsyncClose()
case <-ctx.Done():
return
}
}
}
func (p *PartitionTable) drainConsumer(cons sarama.PartitionConsumer) error {

func (p *PartitionTable) drainConsumer(cons sarama.PartitionConsumer, errs *multierr.Errors) {

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
timeoutCtx, cancel := context.WithTimeout(context.Background(), consumerDrainTimeout)
defer cancel()

errg, ctx := multierr.NewErrGroup(ctx)
errg, _ := multierr.NewErrGroup(context.Background())

// drain errors channel
errg.Go(func() error {
var errs *multierror.Error

for {
select {
case <-ctx.Done():
case <-timeoutCtx.Done():
p.log.Printf("draining errors channel timed out")
return nil
return errs
case err, ok := <-cons.Errors():
if !ok {
return nil
return errs
}
errs.Collect(err)
errs = multierror.Append(errs, err)
}
}
})
@@ -469,7 +433,7 @@ func (p *PartitionTable) drainConsumer(cons sarama.PartitionConsumer, errs *mult
errg.Go(func() error {
for {
select {
case <-ctx.Done():
case <-timeoutCtx.Done():
p.log.Printf("draining messages channel timed out")
return nil
case _, ok := <-cons.Messages():
@@ -480,30 +444,31 @@ func (p *PartitionTable) drainConsumer(cons sarama.PartitionConsumer, errs *mult
}
})

errg.Wait()
return errg.Wait().ErrorOrNil()
}
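Note (not part of the diff): `drainConsumer` now bounds the drain with `consumerDrainTimeout` and returns the errors it swallowed instead of writing into a shared collector. A standalone sketch of the same idea, using `golang.org/x/sync/errgroup` in place of goka's internal `multierr.NewErrGroup`, and plain channels standing in for sarama's consumer channels (all names here are illustrative):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/hashicorp/go-multierror"
	"golang.org/x/sync/errgroup"
)

// drain empties an errors channel and a messages channel until both are
// closed or a timeout elapses, returning any errors seen along the way.
func drain(errCh <-chan error, msgCh <-chan string, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	var g errgroup.Group

	// Drain the errors channel, collecting everything seen.
	g.Go(func() error {
		var errs *multierror.Error
		for {
			select {
			case <-ctx.Done():
				return errs.ErrorOrNil()
			case err, ok := <-errCh:
				if !ok {
					return errs.ErrorOrNil()
				}
				errs = multierror.Append(errs, err)
			}
		}
	})

	// Drain the messages channel, discarding messages.
	g.Go(func() error {
		for {
			select {
			case <-ctx.Done():
				return nil
			case _, ok := <-msgCh:
				if !ok {
					return nil
				}
			}
		}
	})

	return g.Wait()
}

func main() {
	errCh := make(chan error)
	msgCh := make(chan string)
	close(errCh)
	close(msgCh)
	fmt.Println(drain(errCh, msgCh, time.Second)) // <nil>
}
```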

func (p *PartitionTable) loadMessages(ctx context.Context, cons sarama.PartitionConsumer, partitionHwm int64, stopAfterCatchup bool) (rerr error) {
errs := new(multierr.Errors)

// deferred error handling
defer func() {
errs.Collect(rerr)

rerr = errs.NilOrError()
return
}()
func (p *PartitionTable) loadMessages(ctx context.Context, cons sarama.PartitionConsumer, partitionHwm int64, stopAfterCatchup bool) error {

stallTicker := time.NewTicker(p.stallPeriod)
defer stallTicker.Stop()

lastMessage := time.Now()

messages := cons.Messages()
errors := cons.Errors()

for {
select {
case msg, ok := <-cons.Messages():
case err, ok := <-errors:
if !ok {
return
return nil
}
if err != nil {
return err
}
case msg, ok := <-messages:
if !ok {
return nil
}

// This case is for the Tester to achieve synchronity.
@@ -521,8 +486,7 @@

lastMessage = time.Now()
if err := p.storeEvent(string(msg.Key), msg.Value, msg.Offset, msg.Headers); err != nil {
errs.Collect(fmt.Errorf("load: error updating storage: %v", err))
return
return fmt.Errorf("load: error updating storage: %v", err)
}

if stopAfterCatchup {
Expand All @@ -532,7 +496,7 @@ func (p *PartitionTable) loadMessages(ctx context.Context, cons sarama.Partition
p.enqueueStatsUpdate(ctx, func() { p.trackIncomingMessageStats(msg) })

if stopAfterCatchup && msg.Offset >= partitionHwm-1 {
return
return nil
}

case now := <-stallTicker.C:
@@ -543,7 +507,7 @@ func (p *PartitionTable) loadMessages(ctx context.Context, cons sarama.Partition
}

case <-ctx.Done():
return
return nil
}
}
}
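Note (not part of the diff): the reworked `loadMessages` also selects on the partition consumer's `Errors()` channel, so a consumer error terminates the load instead of leaving the processor stuck, which is the freeze described in #334. A stripped-down standalone model of that loop; the channel types are simplified stand-ins for sarama's message and error channels:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// consumeLoop models the fixed loop: a value on the errors channel ends
// the load with that error rather than being silently ignored.
func consumeLoop(ctx context.Context, msgs <-chan string, errs <-chan error) error {
	for {
		select {
		case err, ok := <-errs:
			if !ok {
				return nil
			}
			return err
		case msg, ok := <-msgs:
			if !ok {
				return nil
			}
			_ = msg // store/process the message here
		case <-ctx.Done():
			return nil
		}
	}
}

func main() {
	msgs := make(chan string)
	errs := make(chan error, 1)
	errs <- errors.New("broker gone")
	fmt.Println(consumeLoop(context.Background(), msgs, errs)) // broker gone
}
```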
@@ -711,7 +675,7 @@ func (p *PartitionTable) IteratorWithRange(start []byte, limit []byte) (storage.

func (p *PartitionTable) readyToRead() error {
pstate := p.CurrentState()
if pstate != PartitionRunning {
if pstate < PartitionConnecting {
Contributor Author: This actually fixes bug #350

return fmt.Errorf("Partition is not running (but %v) so it's not safe to read values", pstate)
}
return nil
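Note (not part of the diff): the `readyToRead` change is the fix for #350. A view that is reconnecting has left `PartitionRunning`, but it still holds its recovered local state, so any state at or beyond `PartitionConnecting` is safe to read. A standalone sketch of the idea; the constant names follow goka's partition states, but their exact values and ordering here are assumed for illustration:

```go
package main

import "fmt"

// PartitionStatus mirrors goka's ordered partition states; the concrete
// values are illustrative, what matters is that they are ordered.
type PartitionStatus int

const (
	PartitionStopped PartitionStatus = iota
	PartitionInitializing
	PartitionConnecting
	PartitionRecovering
	PartitionPreparing
	PartitionRunning
)

// readyToRead models the fixed check: before the fix only PartitionRunning
// was accepted, so View.Get failed while a view was reconnecting; now any
// state from PartitionConnecting onwards allows reads from local storage.
func readyToRead(pstate PartitionStatus) error {
	if pstate < PartitionConnecting {
		return fmt.Errorf("partition is not running (but %v) so it's not safe to read values", pstate)
	}
	return nil
}

func main() {
	fmt.Println(readyToRead(PartitionConnecting)) // <nil>
	fmt.Println(readyToRead(PartitionStopped))    // error: not safe to read
}
```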