diff --git a/partition_table.go b/partition_table.go
index 550adb64..770452fb 100644
--- a/partition_table.go
+++ b/partition_table.go
@@ -217,7 +217,7 @@ WaitLoop:
 		case <-ticker.C:
 			p.log.Printf("creating storage for topic %s/%d for %.1f minutes ...", p.topic, p.partition, time.Since(start).Minutes())
 		case <-done:
-			p.log.Printf("finished building storage for topic %s/%d in %.1f minutes", p.topic, p.partition, time.Since(start).Minutes())
+			p.log.Debugf("finished building storage for topic %s/%d in %.1f minutes", p.topic, p.partition, time.Since(start).Minutes())
 			if err != nil {
 				return nil, fmt.Errorf("error building storage: %v", err)
 			}
@@ -282,6 +282,8 @@ func (p *PartitionTable) load(ctx context.Context, stopAfterCatchup bool) (rerr
 		return
 	}()
 
+	p.state.SetState(State(PartitionConnecting))
+
 	// fetch local offset
 	storedOffset, err = p.st.GetOffset(offsetNotStored)
 	if err != nil {
@@ -333,8 +335,6 @@
 
 	defer p.log.Debugf("... Loading done")
 
-	p.state.SetState(State(PartitionConnecting))
-
 	partConsumer, err = p.consumer.ConsumePartition(p.topic, p.partition, loadOffset)
 	if err != nil {
 		errs.Collect(fmt.Errorf("Error creating partition consumer for topic %s, partition %d, offset %d: %v", p.topic, p.partition, storedOffset, err))
diff --git a/processor.go b/processor.go
index 88927cfc..84c99239 100644
--- a/processor.go
+++ b/processor.go
@@ -438,8 +438,8 @@ func (g *Processor) assignmentFromSession(session sarama.ConsumerGroupSession) (
 func (g *Processor) Setup(session sarama.ConsumerGroupSession) error {
 	g.state.SetState(ProcStateSetup)
 	defer g.state.SetState(ProcStateRunning)
-	g.log.Printf("setup generation %d, claims=%#v", session.GenerationID(), session.Claims())
-	defer g.log.Printf("setup generation %d ... done", session.GenerationID())
+	g.log.Debugf("setup generation %d, claims=%#v", session.GenerationID(), session.Claims())
+	defer g.log.Debugf("setup generation %d ... done", session.GenerationID())
 
 	assignment, err := g.assignmentFromSession(session)
 	if err != nil {
@@ -479,8 +479,8 @@ func (g *Processor) Setup(session sarama.ConsumerGroupSession) error {
 // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
 // but before the offsets are committed for the very last time.
 func (g *Processor) Cleanup(session sarama.ConsumerGroupSession) error {
-	g.log.Printf("Cleaning up for %d", session.GenerationID())
-	defer g.log.Printf("Cleaning up for %d ... done", session.GenerationID())
+	g.log.Debugf("Cleaning up for %d", session.GenerationID())
+	defer g.log.Debugf("Cleaning up for %d ... done", session.GenerationID())
 
 	g.state.SetState(ProcStateStopping)
 	defer g.state.SetState(ProcStateIdle)
@@ -525,8 +525,8 @@ func (g *Processor) WaitForReady() {
 // Once the Messages() channel is closed, the Handler must finish its processing
 // loop and exit.
 func (g *Processor) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
-	g.log.Printf("ConsumeClaim for topic/partition %s/%d, initialOffset=%d", claim.Topic(), claim.Partition(), claim.InitialOffset())
-	defer g.log.Printf("ConsumeClaim done for topic/partition %s/%d", claim.Topic(), claim.Partition())
+	g.log.Debugf("ConsumeClaim for topic/partition %s/%d, initialOffset=%d", claim.Topic(), claim.Partition(), claim.InitialOffset())
+	defer g.log.Debugf("ConsumeClaim done for topic/partition %s/%d", claim.Topic(), claim.Partition())
 	part, has := g.partitions[claim.Partition()]
 	if !has {
 		return fmt.Errorf("No partition (%d) to handle input in topic %s", claim.Partition(), claim.Topic())
diff --git a/signal.go b/signal.go
index 7414af7c..ee2e1fa1 100644
--- a/signal.go
+++ b/signal.go
@@ -17,7 +17,7 @@ type waiter struct {
 // Signal allows synchronization on a state, waiting for that state and checking
 // the current state
 type Signal struct {
-	m                    sync.Mutex
+	m                    sync.RWMutex
 	state                State
 	waiters              []*waiter
 	stateChangeObservers []*StateChangeObserver
@@ -45,8 +45,14 @@
 		panic(fmt.Errorf("trying to set illegal state %v", state))
 	}
 
+	// if we're already in the state, do not notify anyone
+	if s.state == state {
+		return s
+	}
+
 	// set the state and notify all channels waiting for it.
 	s.state = state
+
 	var newWaiters []*waiter
 	for _, w := range s.waiters {
 		if w.state == state || (w.minState && state >= w.state) {
@@ -67,11 +73,15 @@
 
 // IsState returns if the signal is in the requested state
 func (s *Signal) IsState(state State) bool {
+	s.m.RLock()
+	defer s.m.RUnlock()
 	return s.state == state
 }
 
 // State returns the current state
 func (s *Signal) State() State {
+	s.m.RLock()
+	defer s.m.RUnlock()
 	return s.state
 }
 
@@ -108,7 +118,7 @@ func (s *Signal) waitForWaiter(state State, w *waiter) chan struct{} {
 
 	// if the signal is currently in that state (or in a higher state if minState is set)
 	// then close the waiter immediately
-	if curState := s.State(); state == curState || (w.minState && curState >= state) {
+	if curState := s.state; state == curState || (w.minState && curState >= state) {
 		close(w.done)
 	} else {
 		s.waiters = append(s.waiters, w)
@@ -157,7 +167,7 @@ func (s *Signal) ObserveStateChange() *StateChangeObserver {
 	}
 
 	// initialize the observer with the current state
-	observer.notify(s.State())
+	observer.notify(s.state)
 
 	// the stop funtion stops the observer by closing its channel
 	// and removing it from the list of observers