Skip to content

Commit

Permalink
Consistently return ErrClosedClient when calling an exported function…
Browse files Browse the repository at this point in the history
… on a closed client
  • Loading branch information
wvanbergen committed Apr 11, 2015
1 parent fb1ac37 commit 987c0b9
Showing 1 changed file with 16 additions and 4 deletions.
20 changes: 16 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ func (client *client) Config() *Config {
}

func (client *client) Close() error {
// Check to see whether the client is closed
if client.Closed() {
// Chances are this is being called from a defer() and the error will go unobserved
// so we go ahead and log the event in this case.
Expand Down Expand Up @@ -177,7 +176,6 @@ func (client *client) Closed() bool {
}

func (client *client) Topics() ([]string, error) {
// Check to see whether the client is closed
if client.Closed() {
return nil, ErrClosedClient
}
Expand All @@ -194,7 +192,6 @@ func (client *client) Topics() ([]string, error) {
}

func (client *client) Partitions(topic string) ([]int32, error) {
// Check to see whether the client is closed
if client.Closed() {
return nil, ErrClosedClient
}
Expand All @@ -217,7 +214,6 @@ func (client *client) Partitions(topic string) ([]int32, error) {
}

func (client *client) WritablePartitions(topic string) ([]int32, error) {
// Check to see whether the client is closed
if client.Closed() {
return nil, ErrClosedClient
}
Expand Down Expand Up @@ -271,6 +267,10 @@ func (client *client) Replicas(topic string, partitionID int32) ([]int32, error)
}

func (client *client) Leader(topic string, partitionID int32) (*Broker, error) {
if client.Closed() {
return nil, ErrClosedClient
}

leader, err := client.cachedLeader(topic, partitionID)

if leader == nil {
Expand Down Expand Up @@ -302,6 +302,10 @@ func (client *client) RefreshMetadata(topics ...string) error {
}

func (client *client) GetOffset(topic string, partitionID int32, time int64) (int64, error) {
if client.Closed() {
return -1, ErrClosedClient
}

offset, err := client.getOffset(topic, partitionID, time)

if err != nil {
Expand All @@ -315,6 +319,10 @@ func (client *client) GetOffset(topic string, partitionID int32, time int64) (in
}

func (client *client) Coordinator(consumerGroup string) (*Broker, error) {
if client.Closed() {
return nil, ErrClosedClient
}

coordinator := client.cachedCoordinator(consumerGroup)

if coordinator == nil {
Expand All @@ -333,6 +341,10 @@ func (client *client) Coordinator(consumerGroup string) (*Broker, error) {
}

func (client *client) RefreshCoordinator(consumerGroup string) error {
if client.Closed() {
return ErrClosedClient
}

response, err := client.getConsumerMetadata(consumerGroup, client.conf.Metadata.Retry.Max)
if err != nil {
return err
Expand Down

0 comments on commit 987c0b9

Please sign in to comment.