issue #433: bugfix on message drain for panic in visits (#434)
* issue 433: bugfix on message drain for panic in visits
frairon authored Sep 27, 2023
1 parent 185d24a commit 3a79a99
Showing 3 changed files with 123 additions and 17 deletions.
2 changes: 2 additions & 0 deletions options.go
@@ -50,6 +50,8 @@ const (
defaultBackoffStep = 10 * time.Second
// maximum duration to wait for the backoff
defaultBackoffMax = 120 * time.Second
+
+ defaultPPVisitChannelSize = 100
)

// DefaultProcessorStoragePath is the default path where processor state
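The new constant replaces the magic number 100 that sized the partition processor's visit channel (see the partition_processor.go hunk below). As a minimal, stand-alone sketch of what that bound means in practice (not goka code; the tiny buffer size and names are made up), a send on a full buffered channel blocks until a consumer reads:

package main

import "fmt"

func main() {
    // stand-in for the visit channel; goka sizes its channel with defaultPPVisitChannelSize
    visits := make(chan string, 2)

    visits <- "key-1"
    visits <- "key-2" // buffer is now full

    select {
    case visits <- "key-3":
        fmt.Println("sent without blocking")
    default:
        // a plain `visits <- "key-3"` would block here until someone receives,
        // which is the situation the regression tests below provoke on purpose
        fmt.Println("channel full: a blocking send would have to wait for the consumer")
    }
}

The tests in processor_visit_test.go emit more table entries than the channel can buffer precisely so that the visitor is stuck in such a blocking send when the visit callback panics.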
30 changes: 22 additions & 8 deletions partition_processor.go
@@ -156,7 +156,7 @@ func newPartitionProcessor(partition int32,
joins: make(map[string]*PartitionTable),
input: make(chan *message, opts.partitionChannelSize),
inputTopics: topicList,
- visitInput: make(chan *visit, 100),
+ visitInput: make(chan *visit, defaultPPVisitChannelSize),
visitCallbacks: visitCallbacks,
graph: graph,
stats: newPartitionProcStats(topicList, outputList),
@@ -693,12 +693,25 @@ func (pp *PartitionProcessor) VisitValues(ctx context.Context, name string, meta
}

var wg sync.WaitGroup
- // drain the channel and set all items to done we have added.
- // Otherwise the caller will wait forever on the waitgroup
- drainVisitInput := func() {
+
+ // drains the channel and drops out when closed.
+ // This is done when the processor shuts down during visit
+ // and makes sure the waitgroup is fully counted down.
+ drainUntilClose := func() {
+ for range pp.visitInput {
+ wg.Done()
+ }
+ }
+
+ // drains the input channel until there are no more items.
+ // does not wait for close, because the channel stays open for the next visit
+ drainUntilEmpty := func() {
for {
select {
- case <-pp.visitInput:
+ case _, ok := <-pp.visitInput:
+ if !ok {
+ return
+ }
wg.Done()
default:
return
@@ -717,11 +730,11 @@ func (pp *PartitionProcessor) VisitValues(ctx context.Context, name string, meta
wg.Add(1)
select {
case <-stopping:
- drainVisitInput()
+ drainUntilClose()
wg.Done()
return ErrVisitAborted
case <-ctx.Done():
- drainVisitInput()
+ drainUntilEmpty()
wg.Done()
return ctx.Err()
// enqueue the visit
@@ -747,9 +760,10 @@ func (pp *PartitionProcessor) VisitValues(ctx context.Context, name string, meta
}()
select {
case <-stopping:
- drainVisitInput()
+ drainUntilClose()
return ErrVisitAborted
case <-ctx.Done():
+ drainUntilEmpty()
return ctx.Err()
case <-wgDone:
}
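The two helpers differ only in how they stop: drainUntilClose keeps receiving until the channel is closed (the processor is shutting down for good), while drainUntilEmpty removes whatever is currently buffered and returns, leaving the channel open for the next visit. Both call wg.Done() for every drained item so the caller's wg.Wait() cannot block forever. A self-contained sketch of the same pattern (illustrative only, not goka code; the channel, item type, and buffer size are made up):

package main

import (
    "fmt"
    "sync"
)

func main() {
    work := make(chan int, 4) // stand-in for the visit channel
    var wg sync.WaitGroup

    // drain until the channel is closed; for-range only ends on close
    drainUntilClose := func() {
        for range work {
            wg.Done()
        }
    }

    // drain only what is buffered right now; the channel stays open
    drainUntilEmpty := func() {
        for {
            select {
            case _, ok := <-work:
                if !ok {
                    return
                }
                wg.Done()
            default:
                return
            }
        }
    }

    // enqueue items that will never be consumed
    for i := 0; i < 4; i++ {
        wg.Add(1)
        work <- i
    }

    // caller aborts (e.g. context canceled): drop the buffered items
    // so that wg.Wait() below cannot hang
    drainUntilEmpty()
    wg.Wait()
    fmt.Println("drained while the channel stays open")

    // consumer goes away for good: the channel gets closed, drain until close
    wg.Add(1)
    work <- 42
    close(work)
    drainUntilClose()
    wg.Wait()
    fmt.Println("drained after close")
}

In the processor code above, the stopping path uses the close-based drain because the visit channel is closed during shutdown, while the ctx.Done() path uses the empty-based drain because the same channel must survive for later visits.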
108 changes: 99 additions & 9 deletions systemtest/processor_visit_test.go
@@ -13,6 +13,12 @@ import (
"github.com/stretchr/testify/require"
)

+ // size of the channel used for the visitor, defined in goka/config.go
+ var (
+ visitChannelSize = 100
+ numPartitions = 10
+ )
+
// TestProcessorVisit tests the visiting functionality.
func TestProcessorVisit(t *testing.T) {
brokers := initSystemTest(t)
Expand All @@ -34,7 +40,7 @@ func TestProcessorVisit(t *testing.T) {
}

createEmitter := func(topic goka.Stream) (*goka.Emitter, func()) {
- err = tm.EnsureStreamExists(string(topic), 10)
+ err = tm.EnsureStreamExists(string(topic), numPartitions)
require.NoError(t, err)

em, err := goka.NewEmitter(brokers, topic, new(codec.Int64),
@@ -90,7 +96,7 @@ func TestProcessorVisit(t *testing.T) {

pollTimed(t, "recovered", proc.Recovered)

em.EmitSync("value1", int64(1))
_ = em.EmitSync("value1", int64(1))

pollTimed(t, "value-ok", func() bool {
val1, _ := proc.Get("value1")
@@ -114,7 +120,7 @@

pollTimed(t, "recovered", proc.Recovered)

em.EmitSync("value1", int64(1))
_ = em.EmitSync("value1", int64(1))

pollTimed(t, "value-ok", func() bool {
val1, _ := proc.Get("value1")
@@ -128,6 +134,90 @@ func TestProcessorVisit(t *testing.T) {
require.Error(t, <-done)
})

// Tests if a panic occurs while visiting, while the iterator is still pushing
// messages into the partition processor's visit-channel.
// Regression test for https://github.com/lovoo/goka/issues/433
t.Run("visit-panic-slow", func(t *testing.T) {
group, input := nextTopics()
em, finish := createEmitter(input)
defer finish()
proc, cancel, done := runProc(createProc(group, input, 500*time.Millisecond))

pollTimed(t, "recovered", proc.Recovered)

// create twice as many items in the table as the visit-channel's size.
// This way we can make sure that the visitor will have to block on
// pushing it to the partition-processor visitInputChannel.
numMsgs := visitChannelSize * numPartitions * 2
for i := 0; i < numMsgs; i++ {
_, _ = em.Emit(fmt.Sprintf("value-%d", i), int64(1))
}

// wait for all messages to have propagated
pollTimed(t, "value-ok", func() bool {
val1, _ := proc.Get(fmt.Sprintf("value-%d", numMsgs-1))
return val1 != nil && val1.(int64) == 1
})

// pass wrong type to visitor -> which will be passed to the visit --> will panic
require.Error(t, proc.VisitAll(context.Background(), "visitor", "asdf"))

// no need to cancel, the visitAll will kill the processor.
_ = cancel
require.Error(t, <-done)
})

// Verifies a visit is gracefully shutdown when the processor is canceled while
// the visit is running.
t.Run("visit-shutdown-slow", func(t *testing.T) {
group, input := nextTopics()
em, finish := createEmitter(input)
defer finish()
proc, cancel, done := runProc(createProc(group, input, 1*time.Second))

pollTimed(t, "recovered", proc.Recovered)

// create twice as many items in the table as the visit-channel's size.
// This way we can make sure that the visitor will have to block on
// pushing it to the partition-processor visitInputChannel.
numMsgs := visitChannelSize * numPartitions * 2
for i := 0; i < numMsgs; i++ {
_, _ = em.Emit(fmt.Sprintf("value-%d", i), int64(1))
}

// wait for all messages to have propagated
pollTimed(t, "value-ok", func() bool {
val1, _ := proc.Get(fmt.Sprintf("value-%d", numMsgs-1))
return val1 != nil && val1.(int64) == 1
})

visitCtx, visitCancel := context.WithCancel(context.Background())
defer visitCancel()

var (
visitErr error
visitDone = make(chan struct{})
)

// start the visitor
go func() {
defer close(visitDone)
visitErr = proc.VisitAll(visitCtx, "visitor", int64(25))
}()

// wait half of what the processor takes for message to process, so we can stop it in the middle
time.Sleep(500 * time.Millisecond)
// stop the visit
visitCancel()

// wait for visiting done
<-visitDone
require.ErrorContains(t, visitErr, "canceled")

cancel()
require.NoError(t, <-done)
})

t.Run("visit-shutdown", func(t *testing.T) {
group, input := nextTopics()
em, finish := createEmitter(input)
@@ -138,8 +228,8 @@ func TestProcessorVisit(t *testing.T) {

// emit two values where goka.DefaultHasher says they're in the same partition.
// We need to achieve this to test that a shutdown will visit one value but not the other
em.EmitSync("0", int64(1))
em.EmitSync("02", int64(1))
_ = em.EmitSync("0", int64(1))
_ = em.EmitSync("02", int64(1))

pollTimed(t, "value-ok", func() bool {
val1, _ := proc.Get("02")
@@ -196,7 +286,7 @@ func TestProcessorVisit(t *testing.T) {
defer emFinish()
// create the group table manually, otherwise the proc and the view are racing

- tm.EnsureTableExists(string(goka.GroupTable(group)), 10)
+ _ = tm.EnsureTableExists(string(goka.GroupTable(group)), 10)
// scenario: sleep in visit, processor shuts down--> visit should cancel too
proc, cancel, done := runProc(createProc(group, input, 500*time.Millisecond))
view, viewCancel, viewDone := runView(createView(group))
@@ -207,7 +297,7 @@ func TestProcessorVisit(t *testing.T) {
// emit two values where goka.DefaultHasher says they're in the same partition.
// We need to achieve this to test that a shutdown will visit one value but not the other
for i := 0; i < 100; i++ {
- em.Emit(fmt.Sprintf("value-%d", i), int64(1))
+ _, _ = em.Emit(fmt.Sprintf("value-%d", i), int64(1))
}
// emFinish()

@@ -251,7 +341,7 @@ func TestProcessorVisit(t *testing.T) {
em, finish := createEmitter(input)
defer finish()
// create the group table manually, otherwise the proc and the view are racing
- tm.EnsureTableExists(string(goka.GroupTable(group)), 10)
+ _ = tm.EnsureTableExists(string(goka.GroupTable(group)), 10)
// scenario: sleep in visit, processor shuts down--> visit should cancel too
proc1, cancel1, done1 := runProc(createProc(group, input, 500*time.Millisecond))

@@ -260,7 +350,7 @@ func TestProcessorVisit(t *testing.T) {
// emit two values where goka.DefaultHasher says they're in the same partition.
// We need to achieve this to test that a shutdown will visit one value but not the other
for i := 0; i < 100; i++ {
- em.Emit(fmt.Sprintf("value-%d", i), int64(1))
+ _, _ = em.Emit(fmt.Sprintf("value-%d", i), int64(1))
}

// poll until all values are there
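From the caller's perspective, the behaviour exercised by these tests looks roughly like the sketch below. It assumes the API as used in the tests (Processor.VisitAll with a visitor name and a meta value, context cancellation, ErrVisitAborted on shutdown) and the canonical import path github.com/lovoo/goka; treat it as an illustration rather than library documentation:

package example

import (
    "context"
    "errors"
    "log"
    "time"

    "github.com/lovoo/goka"
)

// runVisit drives a table-wide visit and reports how it ended.
func runVisit(proc *goka.Processor) {
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    // blocks until every current table entry has been visited,
    // the context ends, or the processor shuts down underneath the visit
    err := proc.VisitAll(ctx, "visitor", int64(25))
    switch {
    case err == nil:
        log.Println("visit finished")
    case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
        log.Println("visit stopped by the caller:", err)
    default:
        // e.g. goka.ErrVisitAborted when the processor shut down mid-visit
        log.Println("visit aborted:", err)
    }
}

The shutdown tests above correspond to the last two branches: cancelling the visit context yields a context error, while killing the processor (or panicking inside the visit callback) aborts the visit with an error.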
