diff --git a/MIGRATION.md b/MIGRATION.md
index 177af202..8bc24ae0 100644
--- a/MIGRATION.md
+++ b/MIGRATION.md
@@ -39,3 +39,19 @@
 go func(){
   cancel()
 ```
+
+
+## Offset bug in local storage
+In 0.1.4 there was a bug that caused the table offset stored in the local cache to always be +1 compared to the actual offset stored in Kafka.
+A second bug effectively cancelled the first one out, so it never became an issue.
+
+From 0.9.x, both bugs are fixed. However, if you upgrade goka and restart a processor using the same cache files that were maintained by the old version, you'll see a warning like this:
+```
+Error: local offset is higher than partition offset. topic some-topic, partition 0, hwm 1312, local offset 1314. This can have several reasons:
+(1) The kafka topic storing the table is gone --> delete the local cache and restart!
+(2) the processor crashed last time while writing to disk.
+(3) You found a bug!
+```
+This is because goka sees an offset that it does not expect.
+You should see this error only once per partition and processor. The offset will be fixed automatically. If it appears on every start or regularly, it might actually be a bug or some other error and should be investigated further
+(or reported to goka :)).
\ No newline at end of file
diff --git a/examples/6-reconnecting-view/README.md b/examples/6-reconnecting-view/README.md
new file mode 100644
index 00000000..8f1df471
--- /dev/null
+++ b/examples/6-reconnecting-view/README.md
@@ -0,0 +1,10 @@
+## Reconnecting View
+
+This example shows a reconnecting view by observing its state changes.
+Run a local Kafka cluster by calling `make start` in the `examples/` folder.
+
+Then run this example (`go run 6-reconnecting-view/main.go`).
+You should see the view's state changes logged on startup.
+
+Now kill the Kafka cluster with `make stop`; you should see some error messages and the view
+trying to reconnect using a default backoff.
\ No newline at end of file
diff --git a/examples/6-reconnecting-view/main.go b/examples/6-reconnecting-view/main.go
new file mode 100644
index 00000000..65e62db8
--- /dev/null
+++ b/examples/6-reconnecting-view/main.go
@@ -0,0 +1,67 @@
+package main
+
+import (
+    "context"
+    "log"
+    "os"
+    "os/signal"
+    "syscall"
+
+    "github.com/lovoo/goka"
+    "github.com/lovoo/goka/codec"
+)
+
+func main() {
+
+    view, err := goka.NewView(
+        // connect to example kafka cluster
+        []string{"localhost:9092"},
+        // name does not matter, table will be empty
+        "restartable-view-test-table",
+        // codec doesn't matter, the table will be empty
+        new(codec.String),
+        // start the view with auto-reconnect enabled
+        goka.WithViewAutoReconnect(),
+    )
+    if err != nil {
+        log.Fatalf("Cannot create view: %v", err)
+    }
+    // context we'll use to run the view and the state change observer
+    ctx, cancel := context.WithCancel(context.Background())
+
+    // channel used to wait for the view to finish
+    done := make(chan struct{})
+    go func() {
+        defer close(done)
+        err := view.Run(ctx)
+        if err != nil {
+            log.Printf("View finished with error: %v", err)
+        }
+    }()
+
+    // Get a state change observer and log all state changes of the view
+    go func() {
+        obs := view.ObserveStateChanges()
+        defer obs.Stop()
+        for {
+            select {
+            case state, ok := <-obs.C():
+                if !ok {
+                    return
+                }
+                log.Printf("View is in state: %v", goka.ViewState(state))
+            case <-ctx.Done():
+                return
+            }
+        }
+    }()
+
+    go func() {
+        waiter := make(chan os.Signal, 1)
+        signal.Notify(waiter, syscall.SIGINT, syscall.SIGTERM)
+        <-waiter
+        cancel()
+    }()
+
+    <-done
+}
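As a companion to the example above: instead of observing the state-change channel, the view state can also be polled via the `View.CurrentState()` accessor introduced in this change. The following is a minimal sketch (not part of this change set; broker address and table name are only illustrative) of how such polling could look:

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/lovoo/goka"
	"github.com/lovoo/goka/codec"
)

func main() {
	// create an auto-reconnecting view, as in the example above
	view, err := goka.NewView(
		[]string{"localhost:9092"},
		"restartable-view-test-table",
		new(codec.String),
		goka.WithViewAutoReconnect(),
	)
	if err != nil {
		log.Fatalf("Cannot create view: %v", err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go func() {
		if err := view.Run(ctx); err != nil {
			log.Printf("View finished with error: %v", err)
		}
	}()

	// poll the view state once per second, e.g. for a simple health check
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for range ticker.C {
		if view.CurrentState() == goka.ViewStateRunning {
			log.Printf("view is running")
		} else {
			log.Printf("view is not running (state %v)", view.CurrentState())
		}
	}
}
```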
diff --git a/partition_processor.go b/partition_processor.go
index 8992ae90..aa17aaae 100644
--- a/partition_processor.go
+++ b/partition_processor.go
@@ -182,10 +182,11 @@ func (pp *PartitionProcessor) Setup(ctx context.Context) error {
 	defer pp.state.SetState(PPStateRunning)
 
 	if pp.table != nil {
+		go pp.table.RunStatsLoop(runnerCtx)
 		setupErrg.Go(func() error {
 			pp.log.Debugf("catching up table")
 			defer pp.log.Debugf("catching up table done")
-			return pp.table.SetupAndRecover(setupCtx)
+			return pp.table.SetupAndRecover(setupCtx, false)
 		})
 	}
 
@@ -202,8 +203,9 @@
 		)
 		pp.joins[join.Topic()] = table
 
+		go table.RunStatsLoop(runnerCtx)
 		setupErrg.Go(func() error {
-			return table.SetupAndRecover(setupCtx)
+			return table.SetupAndRecover(setupCtx, false)
 		})
 	}
 
@@ -358,6 +360,10 @@ func (pp *PartitionProcessor) enqueueStatsUpdate(ctx context.Context, updater fu
 	select {
 	case pp.updateStats <- updater:
 	case <-ctx.Done():
+	default:
+		// going to default indicates the updateStats channel is not read, so the stats
+		// loop is not actually running.
+		// We must not block here, so we'll skip the update
 	}
 }
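The new `default` branch above turns the stats enqueue into a non-blocking send: if the stats loop is not consuming the channel, the update is dropped instead of stalling the processor. The snippet below is a stand-alone illustration of that `select`/`default` pattern (the channel and function names are made up for the illustration, they are not goka APIs):

```go
package main

import "fmt"

// tryEnqueue sends update on updates without ever blocking the caller:
// if nobody reads the channel and its buffer is full, the select falls
// through to default and the update is simply skipped.
func tryEnqueue(updates chan func(), update func()) bool {
	select {
	case updates <- update:
		return true
	default:
		return false
	}
}

func main() {
	updates := make(chan func(), 1)
	fmt.Println(tryEnqueue(updates, func() {})) // true: buffer has room
	fmt.Println(tryEnqueue(updates, func() {})) // false: buffer full, nobody reading
}
```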
diff --git a/partition_table.go b/partition_table.go
index 2b04043c..770452fb 100644
--- a/partition_table.go
+++ b/partition_table.go
@@ -42,11 +42,10 @@ type PartitionTable struct {
 	tmgr           TopicManager
 	updateCallback UpdateCallback
 
-	stats           *TableStats
-	cancelStatsLoop context.CancelFunc
-	requestStats    chan bool
-	responseStats   chan *TableStats
-	updateStats     chan func()
+	stats         *TableStats
+	requestStats  chan bool
+	responseStats chan *TableStats
+	updateStats   chan func()
 
 	offsetM sync.Mutex
 	// current offset
@@ -65,6 +64,7 @@ func newPartitionTableState() *Signal {
 	return NewSignal(
 		State(PartitionStopped),
 		State(PartitionInitializing),
+		State(PartitionConnecting),
 		State(PartitionRecovering),
 		State(PartitionPreparing),
 		State(PartitionRunning),
@@ -81,8 +81,6 @@ func newPartitionTable(topic string,
 	backoff Backoff,
 	backoffResetTimeout time.Duration) *PartitionTable {
 
-	statsLoopCtx, cancel := context.WithCancel(context.Background())
-
 	pt := &PartitionTable{
 		partition: partition,
 		state:     newPartitionTableState(),
@@ -95,23 +93,21 @@ func newPartitionTable(topic string,
 		stallPeriod:    defaultStallPeriod,
 		stalledTimeout: defaultStalledTimeout,
 
-		stats:           newTableStats(),
-		requestStats:    make(chan bool),
-		responseStats:   make(chan *TableStats, 1),
-		updateStats:     make(chan func(), 10),
-		cancelStatsLoop: cancel,
+		stats:         newTableStats(),
+		requestStats:  make(chan bool),
+		responseStats: make(chan *TableStats, 1),
+		updateStats:   make(chan func(), 10),
 
 		backoff:             backoff,
 		backoffResetTimeout: backoffResetTimeout,
 	}
 
-	go pt.runStatsLoop(statsLoopCtx)
-
 	return pt
 }
 
 // SetupAndRecover sets up the partition storage and recovers to HWM
-func (p *PartitionTable) SetupAndRecover(ctx context.Context) error {
+func (p *PartitionTable) SetupAndRecover(ctx context.Context, restartOnError bool) error {
+
 	err := p.setup(ctx)
 	if err != nil {
 		return err
@@ -125,6 +121,9 @@ func (p *PartitionTable) SetupAndRecover(ctx context.Context) error {
 	default:
 	}
 
+	if restartOnError {
+		return p.loadRestarting(ctx, true)
+	}
 	return p.load(ctx, true)
 }
 
@@ -132,27 +131,43 @@
 // Option restartOnError allows the view to stay open/intact even in case of consumer errors
 func (p *PartitionTable) CatchupForever(ctx context.Context, restartOnError bool) error {
 	if restartOnError {
-		var resetTimer *time.Timer
-		for {
-			err := p.load(ctx, false)
-			if err != nil {
-				p.log.Printf("Error while catching up, but we'll try to keep it running: %v", err)
+		return p.loadRestarting(ctx, false)
+	}
+	return p.load(ctx, false)
+}
 
-				if resetTimer != nil {
-					resetTimer.Stop()
-				}
-				resetTimer = time.AfterFunc(p.backoffResetTimeout, p.backoff.Reset)
-			}
+func (p *PartitionTable) loadRestarting(ctx context.Context, stopAfterCatchup bool) error {
+	var (
+		resetTimer *time.Timer
+		retries    int
+	)
 
-			select {
-			case <-ctx.Done():
-				return nil
+	for {
+		err := p.load(ctx, stopAfterCatchup)
+		if err != nil {
+			p.log.Printf("Error while starting up: %v", err)
 
-			case <-time.After(p.backoff.Duration()):
+			retries++
+			if resetTimer != nil {
+				resetTimer.Stop()
 			}
+			resetTimer = time.AfterFunc(p.backoffResetTimeout, func() {
+				p.backoff.Reset()
+				retries = 0
+			})
+		} else {
+			return nil
+		}
+
+		retryDuration := p.backoff.Duration()
+		p.log.Printf("Will retry in %.0f seconds (retried %d times so far)", retryDuration.Seconds(), retries)
+		select {
+		case <-ctx.Done():
+			return nil
+
+		case <-time.After(retryDuration):
 		}
 	}
-	return p.load(ctx, false)
 }
 
 // Setup creates the storage for the partition table
@@ -202,7 +217,7 @@ WaitLoop:
 		case <-ticker.C:
 			p.log.Printf("creating storage for topic %s/%d for %.1f minutes ...", p.topic, p.partition, time.Since(start).Minutes())
 		case <-done:
-			p.log.Printf("finished building storage for topic %s/%d in %.1f minutes", p.topic, p.partition, time.Since(start).Minutes())
+			p.log.Debugf("finished building storage for topic %s/%d in %.1f minutes", p.topic, p.partition, time.Since(start).Minutes())
 			if err != nil {
 				return nil, fmt.Errorf("error building storage: %v", err)
 			}
@@ -267,6 +282,8 @@ func (p *PartitionTable) load(ctx context.Context, stopAfterCatchup bool) (rerr
 		return
 	}()
 
+	p.state.SetState(State(PartitionConnecting))
+
 	// fetch local offset
 	storedOffset, err = p.st.GetOffset(offsetNotStored)
 	if err != nil {
@@ -318,10 +335,6 @@ func (p *PartitionTable) load(ctx context.Context, stopAfterCatchup bool) (rerr
 
 	defer p.log.Debugf("... Loading done")
 
-	if stopAfterCatchup {
-		p.state.SetState(State(PartitionRecovering))
-	}
-
 	partConsumer, err = p.consumer.ConsumePartition(p.topic, p.partition, loadOffset)
 	if err != nil {
 		errs.Collect(fmt.Errorf("Error creating partition consumer for topic %s, partition %d, offset %d: %v", p.topic, p.partition, storedOffset, err))
@@ -337,6 +350,12 @@ func (p *PartitionTable) load(ctx context.Context, stopAfterCatchup bool) (rerr
 		p.drainConsumer(partConsumer, errs)
 	}()
 
+	if stopAfterCatchup {
+		p.state.SetState(State(PartitionRecovering))
+	} else {
+		p.state.SetState(State(PartitionRunning))
+	}
+
 	// load messages and stop when you're at HWM
 	loadErr := p.loadMessages(ctx, partConsumer, hwm, stopAfterCatchup)
 
@@ -354,6 +373,10 @@ func (p *PartitionTable) load(ctx context.Context, stopAfterCatchup bool) (rerr
 	return
 }
 
+func (p *PartitionTable) observeStateChanges() *StateChangeObserver {
+	return p.state.ObserveStateChange()
+}
+
 func (p *PartitionTable) markRecovered(ctx context.Context) error {
 	var (
 		start = time.Now()
@@ -520,10 +543,17 @@ func (p *PartitionTable) enqueueStatsUpdate(ctx context.Context, updater func())
 	select {
 	case p.updateStats <- updater:
 	case <-ctx.Done():
+	default:
+		// going to default indicates the updateStats channel is not read, so the stats
+		// loop is not actually running.
+		// We must not block here, so we'll skip the update
 	}
 }
 
-func (p *PartitionTable) runStatsLoop(ctx context.Context) {
+// RunStatsLoop starts the handler for stats requests. This loop runs detached from the
+// recover/catchup mechanism so clients can always request stats even if the partition table is not
+// running (like a processor table after it's recovered).
+func (p *PartitionTable) RunStatsLoop(ctx context.Context) {
 	updateHwmStatsTicker := time.NewTicker(statsHwmUpdateInterval)
 	defer updateHwmStatsTicker.Stop()
@@ -617,6 +647,11 @@ func (p *PartitionTable) IsRecovered() bool {
 	return p.state.IsState(State(PartitionRunning))
 }
 
+// CurrentState returns the partition's current status
+func (p *PartitionTable) CurrentState() PartitionStatus {
+	return PartitionStatus(p.state.State())
+}
+
 // WaitRecovered returns a channel that closes when the partition table enters state `PartitionRunning`
 func (p *PartitionTable) WaitRecovered() chan struct{} {
 	return p.state.WaitForState(State(PartitionRunning))
diff --git a/partition_table_test.go b/partition_table_test.go
index ab4a2d1d..e64a6cb0 100644
--- a/partition_table_test.go
+++ b/partition_table_test.go
@@ -895,7 +895,7 @@ func TestPT_SetupAndCatchupToHwm(t *testing.T) {
 		ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
 		defer cancel()
 
-		err := pt.SetupAndRecover(ctx)
+		err := pt.SetupAndRecover(ctx, false)
 		test.AssertNil(t, err)
 		test.AssertTrue(t, count == msgsToRecover)
 	})
@@ -920,7 +920,7 @@ func TestPT_SetupAndCatchupToHwm(t *testing.T) {
 		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
 		defer cancel()
 
-		err := pt.SetupAndRecover(ctx)
+		err := pt.SetupAndRecover(ctx, false)
 		test.AssertNotNil(t, err)
 	})
 }
@@ -973,7 +973,7 @@ func TestPT_SetupAndCatchupForever(t *testing.T) {
 			}
 		}()
 
-		err := pt.SetupAndRecover(ctx)
+		err := pt.SetupAndRecover(ctx, false)
 		test.AssertNil(t, err)
 		cancel()
 	})
@@ -998,7 +998,7 @@ func TestPT_SetupAndCatchupForever(t *testing.T) {
 		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
 		defer cancel()
 
-		err := pt.SetupAndRecover(ctx)
+		err := pt.SetupAndRecover(ctx, false)
 		test.AssertNotNil(t, err)
 		cancel()
 	})
 }
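The retry loop in `loadRestarting` above only relies on a backoff that provides `Duration()` and `Reset()`, with a timer that resets it after `backoffResetTimeout`. goka ships its own default backoff, so the following is merely an illustrative sketch of a type satisfying that contract (the linear-step behaviour is an assumption, not goka's implementation):

```go
package main

import (
	"fmt"
	"time"
)

// simpleBackoff waits longer after every call to Duration and starts
// over after Reset.
type simpleBackoff struct {
	attempt int
	step    time.Duration
	max     time.Duration
}

func (b *simpleBackoff) Duration() time.Duration {
	b.attempt++
	d := time.Duration(b.attempt) * b.step
	if d > b.max {
		return b.max
	}
	return d
}

func (b *simpleBackoff) Reset() { b.attempt = 0 }

func main() {
	b := &simpleBackoff{step: 2 * time.Second, max: 10 * time.Second}
	for i := 0; i < 6; i++ {
		fmt.Println(b.Duration()) // 2s, 4s, 6s, 8s, 10s, 10s
	}
	b.Reset()
	fmt.Println(b.Duration()) // 2s again after a reset
}
```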
done", session.GenerationID()) g.state.SetState(ProcStateStopping) defer g.state.SetState(ProcStateIdle) @@ -525,8 +525,8 @@ func (g *Processor) WaitForReady() { // Once the Messages() channel is closed, the Handler must finish its processing // loop and exit. func (g *Processor) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { - g.log.Printf("ConsumeClaim for topic/partition %s/%d, initialOffset=%d", claim.Topic(), claim.Partition(), claim.InitialOffset()) - defer g.log.Printf("ConsumeClaim done for topic/partition %s/%d", claim.Topic(), claim.Partition()) + g.log.Debugf("ConsumeClaim for topic/partition %s/%d, initialOffset=%d", claim.Topic(), claim.Partition(), claim.InitialOffset()) + defer g.log.Debugf("ConsumeClaim done for topic/partition %s/%d", claim.Topic(), claim.Partition()) part, has := g.partitions[claim.Partition()] if !has { return fmt.Errorf("No partition (%d) to handle input in topic %s", claim.Partition(), claim.Topic()) diff --git a/signal.go b/signal.go index 81eead81..ee2e1fa1 100644 --- a/signal.go +++ b/signal.go @@ -17,10 +17,11 @@ type waiter struct { // Signal allows synchronization on a state, waiting for that state and checking // the current state type Signal struct { - m sync.Mutex - state State - waiters []*waiter - allowedStates map[State]bool + m sync.RWMutex + state State + waiters []*waiter + stateChangeObservers []*StateChangeObserver + allowedStates map[State]bool } // NewSignal creates a new Signal based on the states @@ -44,8 +45,14 @@ func (s *Signal) SetState(state State) *Signal { panic(fmt.Errorf("trying to set illegal state %v", state)) } + // if we're already in the state, do not notify anyone + if s.state == state { + return s + } + // set the state and notify all channels waiting for it. s.state = state + var newWaiters []*waiter for _, w := range s.waiters { if w.state == state || (w.minState && state >= w.state) { @@ -56,16 +63,25 @@ func (s *Signal) SetState(state State) *Signal { } s.waiters = newWaiters + // notify the state change observers + for _, obs := range s.stateChangeObservers { + obs.notify(state) + } + return s } // IsState returns if the signal is in the requested state func (s *Signal) IsState(state State) bool { + s.m.RLock() + defer s.m.RUnlock() return s.state == state } // State returns the current state func (s *Signal) State() State { + s.m.RLock() + defer s.m.RUnlock() return s.state } @@ -88,7 +104,7 @@ func (s *Signal) WaitForStateMin(state State) chan struct{} { // state. 
 func (s *Signal) WaitForState(state State) chan struct{} {
 	s.m.Lock()
-	defer s.m.Unlock()
+	s.m.Unlock()
 
 	w := &waiter{
 		done: make(chan struct{}),
@@ -102,7 +118,7 @@ func (s *Signal) waitForWaiter(state State, w *waiter) chan struct{} {
 
 	// if the signal is currently in that state (or in a higher state if minState is set)
 	// then close the waiter immediately
-	if curState := s.State(); state == curState || (w.minState && curState >= state) {
+	if curState := s.state; state == curState || (w.minState && curState >= state) {
 		close(w.done)
 	} else {
 		s.waiters = append(s.waiters, w)
@@ -110,3 +126,67 @@
 	return w.done
 }
+
+// StateChangeObserver wraps a channel that triggers when the signal's state changes
+type StateChangeObserver struct {
+	// state notifier channel
+	c chan State
+	// closed is closed when the observer is closed to avoid sending to a closed channel
+	closed chan struct{}
+	// stop is a callback to stop the observer
+	stop func()
+}
+
+// Stop stops the observer. Its update channel will be closed and no further state changes will be delivered.
+func (s *StateChangeObserver) Stop() {
+	s.stop()
+}
+
+// C returns the channel to observe state changes
+func (s *StateChangeObserver) C() <-chan State {
+	return s.c
+}
+
+func (s *StateChangeObserver) notify(state State) {
+	select {
+	case <-s.closed:
+	case s.c <- state:
+	}
+}
+
+// ObserveStateChange returns a StateChangeObserver that receives state changes.
+// Note that the caller must take care of consuming its channel, otherwise the Signal
+// will block upon state changes.
+func (s *Signal) ObserveStateChange() *StateChangeObserver {
+	s.m.Lock()
+	defer s.m.Unlock()
+
+	observer := &StateChangeObserver{
+		c:      make(chan State, 1),
+		closed: make(chan struct{}),
+	}
+
+	// initialize the observer with the current state
+	observer.notify(s.state)
+
+	// the stop function stops the observer by closing its channel
+	// and removing it from the list of observers
+	observer.stop = func() {
+		close(observer.closed)
+		s.m.Lock()
+		defer s.m.Unlock()
+
+		// iterate over all observers and close *this* one
+		for idx, obs := range s.stateChangeObservers {
+			if obs == observer {
+				copy(s.stateChangeObservers[idx:], s.stateChangeObservers[idx+1:])
+				s.stateChangeObservers[len(s.stateChangeObservers)-1] = nil
+				s.stateChangeObservers = s.stateChangeObservers[:len(s.stateChangeObservers)-1]
+			}
+		}
+		close(observer.c)
+	}
+
+	s.stateChangeObservers = append(s.stateChangeObservers, observer)
+	return observer
+}
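The `StateChangeObserver` added above is the building block used by both the partition table and the view. A minimal, self-contained sketch of how the exported `Signal`/`StateChangeObserver` pair can be used on its own (the numeric states are arbitrary placeholders, not goka's predefined states):

```go
package main

import (
	"fmt"

	"github.com/lovoo/goka"
)

func main() {
	// a signal that allows the states 0, 1 and 2, starting at 0
	sig := goka.NewSignal(goka.State(0), goka.State(1), goka.State(2)).SetState(goka.State(0))

	obs := sig.ObserveStateChange()

	// consume the observer's channel; the first value is the state at subscription time
	done := make(chan struct{})
	go func() {
		defer close(done)
		for state := range obs.C() {
			fmt.Println("state changed to", state)
		}
	}()

	sig.SetState(goka.State(1))
	sig.SetState(goka.State(1)) // setting the same state again does not notify
	sig.SetState(goka.State(2))

	obs.Stop() // closes obs.C(), which ends the goroutine above
	<-done
}
```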
diff --git a/stats.go b/stats.go
index 37ff8dce..709c62d2 100644
--- a/stats.go
+++ b/stats.go
@@ -14,6 +14,8 @@ const (
 	// PartitionInitializing indicates that the underlying storage is initializing (e.g. opening leveldb files),
 	// and has not actually started working yet.
 	PartitionInitializing
+	// PartitionConnecting indicates the partition is trying to (re-)connect to Kafka
+	PartitionConnecting
 	// PartitionRecovering indicates the partition is recovering and the storage
 	// is writing updates in bulk-mode (if the storage implementation supports it).
 	PartitionRecovering
diff --git a/view.go b/view.go
index 28ab5d13..cf0694aa 100644
--- a/view.go
+++ b/view.go
@@ -12,15 +12,30 @@ import (
 	"github.com/lovoo/goka/storage"
 )
 
+// ViewState represents the state of the view
+type ViewState int
+
 const (
 	// ViewStateIdle - the view is not started yet
-	ViewStateIdle State = iota
-	// ViewStateCatchUp - the view is still catching up
+	ViewStateIdle ViewState = iota
+	// ViewStateInitializing - the view (i.e. at least one partition) is initializing
+	ViewStateInitializing
+	// ViewStateConnecting - the view (i.e. at least one partition) is (re-)connecting
+	ViewStateConnecting
+	// ViewStateCatchUp - the view (i.e. at least one partition) is still catching up
 	ViewStateCatchUp
-	// ViewStateRunning - the view has caught up and is running
+	// ViewStateRunning - the view (i.e. all partitions) has caught up and is running
 	ViewStateRunning
 )
 
+func newViewSignal() *Signal {
+	return NewSignal(State(ViewStateIdle),
+		State(ViewStateInitializing),
+		State(ViewStateConnecting),
+		State(ViewStateCatchUp),
+		State(ViewStateRunning)).SetState(State(ViewStateIdle))
+}
+
 // Getter functions return a value for a key or an error. If no value exists for the key, nil is returned without errors.
 type Getter func(string) (interface{}, error)
 
@@ -75,7 +90,7 @@ func NewView(brokers []string, topic Table, codec Codec, options ...ViewOption)
 		log:      opts.log.Prefix(fmt.Sprintf("View %s", topic)),
 		consumer: consumer,
 		tmgr:     tmgr,
-		state:    NewSignal(ViewStateIdle, ViewStateCatchUp, ViewStateRunning).SetState(ViewStateIdle),
+		state:    newViewSignal(),
 	}
 
 	if err = v.createPartitions(brokers); err != nil {
@@ -87,7 +102,7 @@
 // WaitRunning returns a channel that will be closed when the view enters the running state
 func (v *View) WaitRunning() <-chan struct{} {
-	return v.state.WaitForState(ViewStateRunning)
+	return v.state.WaitForState(State(ViewStateRunning))
 }
 
 func (v *View) createPartitions(brokers []string) (rerr error) {
@@ -134,6 +149,77 @@ func (v *View) createPartitions(brokers []string) (rerr error) {
 	return nil
 }
 
+func (v *View) runStateMerger(ctx context.Context) {
+
+	var (
+		states = make(map[int]PartitionStatus)
+		m      sync.Mutex
+	)
+
+	// internal callback that will be called when the state of any
+	// partition changes.
+	// Then the "lowest" state of all partitions will be selected and
+	// translated into the respective ViewState
+	updateViewState := func(idx int, state State) {
+		m.Lock()
+		defer m.Unlock()
+		states[idx] = PartitionStatus(state)
+
+		var lowestState = PartitionStatus(-1)
+
+		for _, partitionState := range states {
+			if lowestState == -1 || partitionState < lowestState {
+				lowestState = partitionState
+			}
+		}
+		var newState = ViewState(-1)
+		switch lowestState {
+		case PartitionStopped:
+			newState = ViewStateIdle
+		case PartitionInitializing:
+			newState = ViewStateInitializing
+		case PartitionConnecting:
+			newState = ViewStateConnecting
+		case PartitionRecovering:
+			newState = ViewStateCatchUp
+		case PartitionPreparing:
+			newState = ViewStateCatchUp
+		case PartitionRunning:
+			newState = ViewStateRunning
+		default:
+			v.log.Printf("State merger received unknown partition state: %v", lowestState)
+		}
+
+		if newState != -1 {
+			v.state.SetState(State(newState))
+		}
+	}
+
+	// get a state change observer for all partitions
+	for idx, partition := range v.partitions {
+		idx := idx
+		partition := partition
+
+		observer := partition.observeStateChanges()
+		// create a goroutine that updates the view state based on all partition states
+		go func() {
+			for {
+				select {
+				case newState, ok := <-observer.C():
+					if !ok {
+						return
+					}
+					// something has changed, so update the state
+					updateViewState(idx, newState)
+				case <-ctx.Done():
+					observer.Stop()
+					return
+				}
+			}
+		}()
+	}
+}
+
 // Run starts consuming the view's topic and saving updates in the local persistent cache.
 //
 // The view will shutdown in case of errors or when the context is closed.
@@ -145,8 +231,10 @@ func (v *View) Run(ctx context.Context) (rerr error) {
 	v.log.Debugf("starting")
 	defer v.log.Debugf("stopped")
 
-	v.state.SetState(ViewStateCatchUp)
-	defer v.state.SetState(ViewStateIdle)
+	// update the view state asynchronously by observing
+	// the partitions' states and translating them into the view's state
+	v.runStateMerger(ctx)
+	defer v.state.SetState(State(ViewStateIdle))
 
 	// close the view after running
 	defer func() {
@@ -160,8 +248,9 @@ func (v *View) Run(ctx context.Context) (rerr error) {
 	for _, partition := range v.partitions {
 		partition := partition
+		go partition.RunStatsLoop(ctx)
 		recoverErrg.Go(func() error {
-			return partition.SetupAndRecover(recoverCtx)
+			return partition.SetupAndRecover(recoverCtx, v.opts.autoreconnect)
 		})
 	}
 
@@ -177,8 +266,6 @@ func (v *View) Run(ctx context.Context) (rerr error) {
 	default:
 	}
 
-	v.state.SetState(ViewStateRunning)
-
 	catchupErrg, catchupCtx := multierr.NewErrGroup(ctx)
 
 	for _, partition := range v.partitions {
@@ -348,6 +435,38 @@ func (v *View) Recovered() bool {
 	return true
 }
 
+// CurrentState returns the current ViewState of the view.
+// This is useful for polling e.g. when implementing health checks or metrics
+func (v *View) CurrentState() ViewState {
+	return ViewState(v.state.State())
+}
+
+// ObserveStateChanges returns a StateChangeObserver that allows handling state changes of the view
+// by reading from a channel.
+// It is crucial to continuously read from that channel, otherwise the View might deadlock upon
+// state changes.
+// If the observer is not needed, the caller must call observer.Stop().
+//
+// Example
+//
+//  view := goka.NewView(...)
+//  go view.Run(ctx)
+//
+//  go func(){
+//    obs := view.ObserveStateChanges()
+//    defer obs.Stop()
+//    for {
+//      select{
+//      case state, ok := <-obs.C():
+//        // handle state (or closed channel)
+//      case <-ctx.Done():
+//      }
+//    }
+//  }()
+func (v *View) ObserveStateChanges() *StateChangeObserver {
+	return v.state.ObserveStateChange()
+}
+
 // Stats returns a set of performance metrics of the view.
 func (v *View) Stats(ctx context.Context) *ViewStats {
 	return v.statsWithContext(ctx)
diff --git a/view_test.go b/view_test.go
index 3c29e5bf..a4ebdb69 100644
--- a/view_test.go
+++ b/view_test.go
@@ -592,7 +592,7 @@ func TestView_Run(t *testing.T) {
 		pt.consumer = consumer
 		view.partitions = []*PartitionTable{pt}
-		view.state = NewSignal(State(ViewStateCatchUp), State(ViewStateRunning), State(ViewStateIdle)).SetState(State(ViewStateIdle))
+		view.state = newViewSignal()
 
 		bm.tmgr.EXPECT().GetOffset(pt.topic, pt.partition, sarama.OffsetOldest).Return(oldest, nil).AnyTimes()
 		bm.tmgr.EXPECT().GetOffset(pt.topic, pt.partition, sarama.OffsetNewest).Return(newest, nil).AnyTimes()
@@ -647,7 +647,7 @@ func TestView_Run(t *testing.T) {
 		pt.consumer = consumer
 		view.partitions = []*PartitionTable{pt}
-		view.state = NewSignal(State(ViewStateCatchUp), State(ViewStateRunning), State(ViewStateIdle)).SetState(State(ViewStateIdle))
+		view.state = newViewSignal()
 
 		bm.mst.EXPECT().GetOffset(gomock.Any()).Return(int64(0), retErr).AnyTimes()
 		bm.tmgr.EXPECT().GetOffset(pt.topic, pt.partition, sarama.OffsetNewest).Return(sarama.OffsetNewest, retErr).AnyTimes()
@@ -697,7 +697,7 @@ func TestView_WaitRunning(t *testing.T) {
 		view, _, ctrl := createTestView(t, NewMockAutoConsumer(t, DefaultConfig()))
 		defer ctrl.Finish()
 
-		view.state = NewSignal(State(ViewStateCatchUp), State(ViewStateRunning), State(ViewStateIdle)).SetState(State(ViewStateRunning))
+		view.state = newViewSignal().SetState(State(ViewStateRunning))
 
 		var isRunning bool
 		select {