Skip to content

Commit

Permalink
Merge pull request #937 from Shopify/optional-minimal-metadata-set
Browse files Browse the repository at this point in the history
Add a config option to store only minimal metadata
  • Loading branch information
eapache authored Aug 24, 2017
2 parents 3fee590 + 38673c0 commit 55b3f6a
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 13 deletions.
38 changes: 25 additions & 13 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,18 +141,20 @@ func NewClient(addrs []string, conf *Config) (Client, error) {
client.seedBrokers = append(client.seedBrokers, NewBroker(addrs[index]))
}

// do an initial fetch of all cluster metadata by specifying an empty list of topics
err := client.RefreshMetadata()
switch err {
case nil:
break
case ErrLeaderNotAvailable, ErrReplicaNotAvailable, ErrTopicAuthorizationFailed, ErrClusterAuthorizationFailed:
// indicates that maybe part of the cluster is down, but is not fatal to creating the client
Logger.Println(err)
default:
close(client.closed) // we haven't started the background updater yet, so we have to do this manually
_ = client.Close()
return nil, err
if conf.Metadata.Full {
// do an initial fetch of all cluster metadata by specifying an empty list of topics
err := client.RefreshMetadata()
switch err {
case nil:
break
case ErrLeaderNotAvailable, ErrReplicaNotAvailable, ErrTopicAuthorizationFailed, ErrClusterAuthorizationFailed:
// indicates that maybe part of the cluster is down, but is not fatal to creating the client
Logger.Println(err)
default:
close(client.closed) // we haven't started the background updater yet, so we have to do this manually
_ = client.Close()
return nil, err
}
}
go withRecover(client.backgroundMetadataUpdater)

Expand Down Expand Up @@ -605,7 +607,17 @@ func (client *client) backgroundMetadataUpdater() {
for {
select {
case <-ticker.C:
if err := client.RefreshMetadata(); err != nil {
topics := []string{}
if !client.conf.Metadata.Full {
if specificTopics, err := client.Topics(); err != nil {
Logger.Println("Client background metadata topic load:", err)
break
} else {
topics = specificTopics
}
}

if err := client.RefreshMetadata(topics...); err != nil {
Logger.Println("Client background metadata update:", err)
}
case <-client.closer:
Expand Down
7 changes: 7 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ type Config struct {
// Defaults to 10 minutes. Set to 0 to disable. Similar to
// `topic.metadata.refresh.interval.ms` in the JVM version.
RefreshFrequency time.Duration

// Whether to maintain a full set of metadata for all topics, or just
// the minimal set that has been necessary so far. The full set is simpler
// and usually more convenient, but can take up a substantial amount of
// memory if you have many topics and partitions. Defaults to true.
Full bool
}

// Producer is the namespace for configuration related to producing messages,
Expand Down Expand Up @@ -263,6 +269,7 @@ func NewConfig() *Config {
c.Metadata.Retry.Max = 3
c.Metadata.Retry.Backoff = 250 * time.Millisecond
c.Metadata.RefreshFrequency = 10 * time.Minute
c.Metadata.Full = true

c.Producer.MaxMessageBytes = 1000000
c.Producer.RequiredAcks = WaitForLocal
Expand Down

0 comments on commit 55b3f6a

Please sign in to comment.