Skip to content

Commit

Permalink
Merge pull request #518 from Shopify/offset-manager-close
Browse files Browse the repository at this point in the history
Add Close() OffsetManager interface
  • Loading branch information
wvanbergen committed Aug 19, 2015
2 parents 1e19f5c + 1e8c4f2 commit de8e312
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 0 deletions.
9 changes: 9 additions & 0 deletions offset_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ type OffsetManager interface {
// ManagePartition creates a PartitionOffsetManager on the given topic/partition. It will
// return an error if this OffsetManager is already managing the given topic/partition.
ManagePartition(topic string, partition int32) (PartitionOffsetManager, error)

// Close stops the OffsetManager from managing offsets. It is required to call this function
// before an OffsetManager object passes out of scope, as it will otherwise
// leak memory. You must call this after all the PartitionOffsetManagers are closed.
Close() error
}

type offsetManager struct {
Expand Down Expand Up @@ -66,6 +71,10 @@ func (om *offsetManager) ManagePartition(topic string, partition int32) (Partiti
return pom, nil
}

func (om *offsetManager) Close() error {
return nil
}

func (om *offsetManager) refBrokerOffsetManager(broker *Broker) *brokerOffsetManager {
om.lock.Lock()
defer om.lock.Unlock()
Expand Down
6 changes: 6 additions & 0 deletions offset_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ func TestOffsetManagerFetchInitialFail(t *testing.T) {
coordinator.Close()
newCoordinator.Close()
safeClose(t, pom)
safeClose(t, om)
safeClose(t, testClient)
}

Expand Down Expand Up @@ -157,6 +158,7 @@ func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) {
broker.Close()
coordinator.Close()
safeClose(t, pom)
safeClose(t, om)
safeClose(t, testClient)
}

Expand All @@ -173,6 +175,7 @@ func TestPartitionOffsetManagerOffset(t *testing.T) {
}

safeClose(t, pom)
safeClose(t, om)
broker.Close()
coordinator.Close()
safeClose(t, testClient)
Expand All @@ -197,6 +200,7 @@ func TestPartitionOffsetManagerSetOffset(t *testing.T) {
}

safeClose(t, pom)
safeClose(t, om)
safeClose(t, testClient)
broker.Close()
coordinator.Close()
Expand Down Expand Up @@ -271,6 +275,7 @@ func TestPartitionOffsetManagerCommitErr(t *testing.T) {
broker.Close()
coordinator.Close()
newCoordinator.Close()
safeClose(t, om)
safeClose(t, testClient)
}

Expand Down Expand Up @@ -298,6 +303,7 @@ func TestAbortPartitionOffsetManager(t *testing.T) {
pom.SetOffset(100, "modified_meta")

safeClose(t, pom)
safeClose(t, om)
broker.Close()
safeClose(t, testClient)
}

0 comments on commit de8e312

Please sign in to comment.