diff --git a/options.go b/options.go
index b289e4e5..d93711e7 100644
--- a/options.go
+++ b/options.go
@@ -50,6 +50,8 @@ const (
 	defaultBackoffStep = 10 * time.Second
 	// maximum duration to wait for the backoff
 	defaultBackoffMax = 120 * time.Second
+
+	defaultPPVisitChannelSize = 100
 )
 
 // DefaultProcessorStoragePath is the default path where processor state
diff --git a/partition_processor.go b/partition_processor.go
index a4f8f7fb..d994f846 100644
--- a/partition_processor.go
+++ b/partition_processor.go
@@ -156,7 +156,7 @@ func newPartitionProcessor(partition int32,
 		joins:          make(map[string]*PartitionTable),
 		input:          make(chan *message, opts.partitionChannelSize),
 		inputTopics:    topicList,
-		visitInput:     make(chan *visit, 100),
+		visitInput:     make(chan *visit, defaultPPVisitChannelSize),
 		visitCallbacks: visitCallbacks,
 		graph:          graph,
 		stats:          newPartitionProcStats(topicList, outputList),
@@ -693,12 +693,25 @@ func (pp *PartitionProcessor) VisitValues(ctx context.Context, name string, meta
 	}
 
 	var wg sync.WaitGroup
-	// drain the channel and set all items to done we have added.
-	// Otherwise the caller will wait forever on the waitgroup
-	drainVisitInput := func() {
+
+	// drains the channel and returns once it is closed.
+	// This is used when the processor shuts down during a visit
+	// and makes sure the waitgroup is fully counted down.
+	drainUntilClose := func() {
+		for range pp.visitInput {
+			wg.Done()
+		}
+	}
+
+	// drains the input channel until no more items are buffered.
+	// It does not wait for close, because the channel stays open for the next visit.
+	drainUntilEmpty := func() {
 		for {
 			select {
-			case <-pp.visitInput:
+			case _, ok := <-pp.visitInput:
+				if !ok {
+					return
+				}
 				wg.Done()
 			default:
 				return
@@ -717,11 +730,11 @@ func (pp *PartitionProcessor) VisitValues(ctx context.Context, name string, meta
 		wg.Add(1)
 		select {
 		case <-stopping:
-			drainVisitInput()
+			drainUntilClose()
 			wg.Done()
 			return ErrVisitAborted
 		case <-ctx.Done():
-			drainVisitInput()
+			drainUntilEmpty()
 			wg.Done()
 			return ctx.Err()
 		// enqueue the visit
@@ -747,9 +760,10 @@ func (pp *PartitionProcessor) VisitValues(ctx context.Context, name string, meta
 	}()
 	select {
 	case <-stopping:
-		drainVisitInput()
+		drainUntilClose()
 		return ErrVisitAborted
 	case <-ctx.Done():
+		drainUntilEmpty()
 		return ctx.Err()
 	case <-wgDone:
 	}
diff --git a/systemtest/processor_visit_test.go b/systemtest/processor_visit_test.go
index efd8f48d..896044ec 100644
--- a/systemtest/processor_visit_test.go
+++ b/systemtest/processor_visit_test.go
@@ -13,6 +13,12 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
+// size of the channel used for the visitor, defined as defaultPPVisitChannelSize in options.go
+var (
+	visitChannelSize = 100
+	numPartitions    = 10
+)
+
 // TestProcessorVisit tests the visiting functionality.
 func TestProcessorVisit(t *testing.T) {
 	brokers := initSystemTest(t)
@@ -34,7 +40,7 @@ func TestProcessorVisit(t *testing.T) {
 	}
 
 	createEmitter := func(topic goka.Stream) (*goka.Emitter, func()) {
-		err = tm.EnsureStreamExists(string(topic), 10)
+		err = tm.EnsureStreamExists(string(topic), numPartitions)
 		require.NoError(t, err)
 
 		em, err := goka.NewEmitter(brokers, topic, new(codec.Int64),
@@ -90,7 +96,7 @@ func TestProcessorVisit(t *testing.T) {
 
 		pollTimed(t, "recovered", proc.Recovered)
 
-		em.EmitSync("value1", int64(1))
+		_ = em.EmitSync("value1", int64(1))
 
 		pollTimed(t, "value-ok", func() bool {
 			val1, _ := proc.Get("value1")
@@ -114,7 +120,7 @@ func TestProcessorVisit(t *testing.T) {
 
 		pollTimed(t, "recovered", proc.Recovered)
 
-		em.EmitSync("value1", int64(1))
+		_ = em.EmitSync("value1", int64(1))
 
 		pollTimed(t, "value-ok", func() bool {
 			val1, _ := proc.Get("value1")
@@ -128,6 +134,90 @@ func TestProcessorVisit(t *testing.T) {
 		require.Error(t, <-done)
 	})
 
+	// Tests that a panic during visiting is handled correctly while the iterator
+	// is still pushing messages into the partition processor's visit-channel.
+	// Regression test for https://github.com/lovoo/goka/issues/433
+	t.Run("visit-panic-slow", func(t *testing.T) {
+		group, input := nextTopics()
+		em, finish := createEmitter(input)
+		defer finish()
+		proc, cancel, done := runProc(createProc(group, input, 500*time.Millisecond))
+
+		pollTimed(t, "recovered", proc.Recovered)
+
+		// create twice as many items per partition as the visit-channel's size.
+		// This way we can make sure that the visitor will have to block on
+		// pushing items into the partition processor's visit-channel.
+		numMsgs := visitChannelSize * numPartitions * 2
+		for i := 0; i < numMsgs; i++ {
+			_, _ = em.Emit(fmt.Sprintf("value-%d", i), int64(1))
+		}
+
+		// wait for all messages to have propagated
+		pollTimed(t, "value-ok", func() bool {
+			val1, _ := proc.Get(fmt.Sprintf("value-%d", numMsgs-1))
+			return val1 != nil && val1.(int64) == 1
+		})
+
+		// pass a value of the wrong type to the visitor -> it is handed to the visit callback, which panics
+		require.Error(t, proc.VisitAll(context.Background(), "visitor", "asdf"))
+
+		// no need to cancel; the failing VisitAll shuts the processor down.
+		_ = cancel
+		require.Error(t, <-done)
+	})
+
+	// Verifies that a visit is gracefully aborted when its context is canceled
+	// while the visit is still running.
+	t.Run("visit-shutdown-slow", func(t *testing.T) {
+		group, input := nextTopics()
+		em, finish := createEmitter(input)
+		defer finish()
+		proc, cancel, done := runProc(createProc(group, input, 1*time.Second))
+
+		pollTimed(t, "recovered", proc.Recovered)
+
+		// create twice as many items per partition as the visit-channel's size.
+		// This way we can make sure that the visitor will have to block on
+		// pushing items into the partition processor's visit-channel.
+		numMsgs := visitChannelSize * numPartitions * 2
+		for i := 0; i < numMsgs; i++ {
+			_, _ = em.Emit(fmt.Sprintf("value-%d", i), int64(1))
+		}
+
+		// wait for all messages to have propagated
+		pollTimed(t, "value-ok", func() bool {
+			val1, _ := proc.Get(fmt.Sprintf("value-%d", numMsgs-1))
+			return val1 != nil && val1.(int64) == 1
+		})
+
+		visitCtx, visitCancel := context.WithCancel(context.Background())
+		defer visitCancel()
+
+		var (
+			visitErr  error
+			visitDone = make(chan struct{})
+		)
+
+		// start the visitor
+		go func() {
+			defer close(visitDone)
+			visitErr = proc.VisitAll(visitCtx, "visitor", int64(25))
+		}()
+
+		// wait for half of the processor's per-message processing time, so we cancel in the middle of the visit
+		time.Sleep(500 * time.Millisecond)
+		// stop the visit
+		visitCancel()
+
+		// wait for the visit to finish
+		<-visitDone
+		require.ErrorContains(t, visitErr, "canceled")
+
+		cancel()
+		require.NoError(t, <-done)
+	})
+
 	t.Run("visit-shutdown", func(t *testing.T) {
 		group, input := nextTopics()
 		em, finish := createEmitter(input)
@@ -138,8 +228,8 @@ func TestProcessorVisit(t *testing.T) {
 
 		// emit two values where goka.DefaultHasher says they're in the same partition.
 		// We need to achieve this to test that a shutdown will visit one value but not the other
-		em.EmitSync("0", int64(1))
-		em.EmitSync("02", int64(1))
+		_ = em.EmitSync("0", int64(1))
+		_ = em.EmitSync("02", int64(1))
 
 		pollTimed(t, "value-ok", func() bool {
 			val1, _ := proc.Get("02")
@@ -196,7 +286,7 @@ func TestProcessorVisit(t *testing.T) {
 		defer emFinish()
 
 		// create the group table manually, otherwise the proc and the view are racing
-		tm.EnsureTableExists(string(goka.GroupTable(group)), 10)
+		_ = tm.EnsureTableExists(string(goka.GroupTable(group)), 10)
 		// scenario: sleep in visit, processor shuts down--> visit should cancel too
 		proc, cancel, done := runProc(createProc(group, input, 500*time.Millisecond))
 		view, viewCancel, viewDone := runView(createView(group))
@@ -207,7 +297,7 @@ func TestProcessorVisit(t *testing.T) {
 		// emit two values where goka.DefaultHasher says they're in the same partition.
 		// We need to achieve this to test that a shutdown will visit one value but not the other
 		for i := 0; i < 100; i++ {
-			em.Emit(fmt.Sprintf("value-%d", i), int64(1))
+			_, _ = em.Emit(fmt.Sprintf("value-%d", i), int64(1))
 		}
 
 		// emFinish()
@@ -251,7 +341,7 @@ func TestProcessorVisit(t *testing.T) {
 		em, finish := createEmitter(input)
 		defer finish()
 		// create the group table manually, otherwise the proc and the view are racing
-		tm.EnsureTableExists(string(goka.GroupTable(group)), 10)
+		_ = tm.EnsureTableExists(string(goka.GroupTable(group)), 10)
 		// scenario: sleep in visit, processor shuts down--> visit should cancel too
 		proc1, cancel1, done1 := runProc(createProc(group, input, 500*time.Millisecond))
 
@@ -260,7 +350,7 @@ func TestProcessorVisit(t *testing.T) {
 		// emit two values where goka.DefaultHasher says they're in the same partition.
 		// We need to achieve this to test that a shutdown will visit one value but not the other
 		for i := 0; i < 100; i++ {
-			em.Emit(fmt.Sprintf("value-%d", i), int64(1))
+			_, _ = em.Emit(fmt.Sprintf("value-%d", i), int64(1))
 		}
 
 		// poll until all values are there