diff --git a/processor.go b/processor.go
index db35e1f7..97653ad7 100644
--- a/processor.go
+++ b/processor.go
@@ -10,6 +10,7 @@ import (
 
 	"github.com/IBM/sarama"
 	"github.com/hashicorp/go-multierror"
+	"github.com/lovoo/goka/multierr"
 	"github.com/lovoo/goka/storage"
 )
 
@@ -25,6 +26,8 @@ const (
 	ProcStateRunning
 	// ProcStateStopping indicates a stopping partition processor
 	ProcStateStopping
+	// ProcStateStopped indicates a stopped partition processor
+	ProcStateStopped
 )
 
 // ProcessCallback function is called for every message received by the
@@ -128,7 +131,7 @@ func NewProcessor(brokers []string, gg *GroupGraph, options ...ProcessorOption)
 
 		graph: gg,
 
-		state: NewSignal(ProcStateIdle, ProcStateStarting, ProcStateSetup, ProcStateRunning, ProcStateStopping).SetState(ProcStateIdle),
+		state: NewSignal(ProcStateIdle, ProcStateStarting, ProcStateSetup, ProcStateRunning, ProcStateStopping, ProcStateStopped).SetState(ProcStateIdle),
 		done:  make(chan struct{}),
 	}
 
@@ -711,7 +714,7 @@ func (g *Processor) Cleanup(session sarama.ConsumerGroupSession) error {
 	defer g.log.Debugf("Cleaning up for %d ... done", session.GenerationID())
 
 	g.state.SetState(ProcStateStopping)
-	defer g.state.SetState(ProcStateIdle)
+	defer g.state.SetState(ProcStateStopped)
 	errg, _ := multierr.NewErrGroup(session.Context())
 	g.mTables.RLock()
 	for part, partition := range g.partitions {
@@ -926,6 +929,11 @@ func (g *Processor) Stop() {
 	g.cancel()
 }
 
+// Done returns a channel that is closed when the processor is stopped.
+func (g *Processor) Done() <-chan struct{} {
+	return g.done
+}
+
 // VisitAllWithStats visits all keys in parallel by passing the visit request
 // to all partitions.
 // The optional argument "meta" will be forwarded to the visit-function of each key of the table.
diff --git a/processor_test.go b/processor_test.go
index 7ec69aff..7962fd92 100644
--- a/processor_test.go
+++ b/processor_test.go
@@ -10,9 +10,10 @@ import (
 
 	"github.com/IBM/sarama"
 	"github.com/golang/mock/gomock"
+	"github.com/stretchr/testify/require"
+
 	"github.com/lovoo/goka/codec"
 	"github.com/lovoo/goka/storage"
-	"github.com/stretchr/testify/require"
 )
 
 func createMockBuilder(t *testing.T) (*gomock.Controller, *builderMock) {
@@ -324,3 +325,116 @@ func TestProcessor_StateReader(t *testing.T) {
 
 	require.Equal(t, ProcStateRunning, p.StateReader().State())
 }
+
+func TestProcessor_Stop(t *testing.T) {
+	t.Run("expected-state", func(t *testing.T) {
+		ctrl, bm := createMockBuilder(t)
+		defer ctrl.Finish()
+
+		bm.tmgr.EXPECT().Close().Times(1)
+		bm.tmgr.EXPECT().Partitions(gomock.Any()).Return([]int32{0}, nil).Times(1)
+		bm.producer.EXPECT().Close().Times(1)
+
+		groupBuilder, _ := createTestConsumerGroupBuilder(t)
+		consBuilder, _ := createTestConsumerBuilder(t)
+
+		graph := DefineGroup("test",
+			Input("input", new(codec.Int64), func(ctx Context, msg interface{}) {
+				// Do nothing
+			}),
+		)
+
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second*1000)
+		defer cancel()
+
+		newProc, err := NewProcessor([]string{"localhost:9092"}, graph,
+			bm.createProcessorOptions(consBuilder, groupBuilder)...,
+		)
+		require.NoError(t, err)
+		var (
+			procErr error
+			done    = make(chan struct{})
+		)
+
+		go func() {
+			defer close(done)
+			procErr = newProc.Run(ctx)
+		}()
+
+		newProc.WaitForReady()
+
+		// if there was an error during startup, no point in continuing
+		// and waiting for the processor to be stopped
+		select {
+		case <-done:
+			require.NoError(t, procErr)
+		default:
+		}
+
+		require.Equal(t, ProcStateRunning, newProc.StateReader().State())
+
+		// shutdown; wait inside the select below so the timeout can
+		// still fire if the processor fails to stop
+		newProc.Stop()
+
+		select {
+		case <-done:
+			require.Equal(t, ProcStateStopped, newProc.StateReader().State())
+			require.NoError(t, procErr)
+		case <-time.After(10 * time.Second):
+			t.Errorf("processor did not shut down as expected")
+		}
+	})
+}
+
+func TestProcessor_Done(t *testing.T) {
+	t.Run("done-closes", func(t *testing.T) {
+		ctrl, bm := createMockBuilder(t)
+		defer ctrl.Finish()
+
+		bm.tmgr.EXPECT().Close().Times(1)
+		bm.tmgr.EXPECT().Partitions(gomock.Any()).Return([]int32{0}, nil).Times(1)
+		bm.producer.EXPECT().Close().Times(1)
+
+		groupBuilder, _ := createTestConsumerGroupBuilder(t)
+		consBuilder, _ := createTestConsumerBuilder(t)
+
+		graph := DefineGroup("test",
+			Input("input", new(codec.Int64), func(ctx Context, msg interface{}) {
+				// Do nothing
+			}),
+		)
+
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second*1000)
+		defer cancel()
+
+		newProc, err := NewProcessor([]string{"localhost:9092"}, graph,
+			bm.createProcessorOptions(consBuilder, groupBuilder)...,
+		)
+		require.NoError(t, err)
+		var (
+			procErr error
+			done    = make(chan struct{})
+		)
+
+		go func() {
+			defer close(done)
+			procErr = newProc.Run(ctx)
+		}()
+
+		newProc.WaitForReady()
+
+		// shutdown
+		newProc.Stop()
+
+		select {
+		case <-newProc.Done():
+			// Done() closes inside Run; join the Run goroutine before
+			// reading procErr to avoid a data race on the assignment
+			<-done
+			require.NoError(t, procErr)
+		case <-time.After(10 * time.Second):
+			t.Errorf("processor did not shut down as expected")
+		}
+	})
+}