diff --git a/offset_manager.go b/offset_manager.go index e63f083a0..07b614372 100644 --- a/offset_manager.go +++ b/offset_manager.go @@ -12,6 +12,11 @@ type OffsetManager interface { // ManagePartition creates a PartitionOffsetManager on the given topic/partition. It will // return an error if this OffsetManager is already managing the given topic/partition. ManagePartition(topic string, partition int32) (PartitionOffsetManager, error) + + // Close stops the OffsetManager from managing offsets. It is required to call this function + // before an OffsetManager object passes out of scope, as it will otherwise + // leak memory. You must call this after all the PartitionOffsetManagers are closed. + Close() error } type offsetManager struct { @@ -66,6 +71,10 @@ func (om *offsetManager) ManagePartition(topic string, partition int32) (Partiti return pom, nil } +func (om *offsetManager) Close() error { + return nil +} + func (om *offsetManager) refBrokerOffsetManager(broker *Broker) *brokerOffsetManager { om.lock.Lock() defer om.lock.Unlock() diff --git a/offset_manager_test.go b/offset_manager_test.go index 574571cde..6e291c595 100644 --- a/offset_manager_test.go +++ b/offset_manager_test.go @@ -124,6 +124,7 @@ func TestOffsetManagerFetchInitialFail(t *testing.T) { coordinator.Close() newCoordinator.Close() safeClose(t, pom) + safeClose(t, om) safeClose(t, testClient) } @@ -157,6 +158,7 @@ func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) { broker.Close() coordinator.Close() safeClose(t, pom) + safeClose(t, om) safeClose(t, testClient) } @@ -173,6 +175,7 @@ func TestPartitionOffsetManagerOffset(t *testing.T) { } safeClose(t, pom) + safeClose(t, om) broker.Close() coordinator.Close() safeClose(t, testClient) @@ -197,6 +200,7 @@ func TestPartitionOffsetManagerSetOffset(t *testing.T) { } safeClose(t, pom) + safeClose(t, om) safeClose(t, testClient) broker.Close() coordinator.Close() @@ -271,6 +275,7 @@ func TestPartitionOffsetManagerCommitErr(t *testing.T) { broker.Close() coordinator.Close() newCoordinator.Close() + safeClose(t, om) safeClose(t, testClient) } @@ -298,6 +303,7 @@ func TestAbortPartitionOffsetManager(t *testing.T) { pom.SetOffset(100, "modified_meta") safeClose(t, pom) + safeClose(t, om) broker.Close() safeClose(t, testClient) }