Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a config option to store only minimal metadata #937

Merged
merged 1 commit into from
Aug 24, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 25 additions & 13 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,18 +141,20 @@ func NewClient(addrs []string, conf *Config) (Client, error) {
client.seedBrokers = append(client.seedBrokers, NewBroker(addrs[index]))
}

// do an initial fetch of all cluster metadata by specifying an empty list of topics
err := client.RefreshMetadata()
switch err {
case nil:
break
case ErrLeaderNotAvailable, ErrReplicaNotAvailable, ErrTopicAuthorizationFailed, ErrClusterAuthorizationFailed:
// indicates that maybe part of the cluster is down, but is not fatal to creating the client
Logger.Println(err)
default:
close(client.closed) // we haven't started the background updater yet, so we have to do this manually
_ = client.Close()
return nil, err
if conf.Metadata.Full {
// do an initial fetch of all cluster metadata by specifying an empty list of topics
err := client.RefreshMetadata()
switch err {
case nil:
break
case ErrLeaderNotAvailable, ErrReplicaNotAvailable, ErrTopicAuthorizationFailed, ErrClusterAuthorizationFailed:
// indicates that maybe part of the cluster is down, but is not fatal to creating the client
Logger.Println(err)
default:
close(client.closed) // we haven't started the background updater yet, so we have to do this manually
_ = client.Close()
return nil, err
}
}
go withRecover(client.backgroundMetadataUpdater)

Expand Down Expand Up @@ -605,7 +607,17 @@ func (client *client) backgroundMetadataUpdater() {
for {
select {
case <-ticker.C:
if err := client.RefreshMetadata(); err != nil {
topics := []string{}
if !client.conf.Metadata.Full {
if specificTopics, err := client.Topics(); err != nil {
Logger.Println("Client background metadata topic load:", err)
break
} else {
topics = specificTopics
}
}

if err := client.RefreshMetadata(topics...); err != nil {
Logger.Println("Client background metadata update:", err)
}
case <-client.closer:
Expand Down
7 changes: 7 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ type Config struct {
// Defaults to 10 minutes. Set to 0 to disable. Similar to
// `topic.metadata.refresh.interval.ms` in the JVM version.
RefreshFrequency time.Duration

// Whether to maintain a full set of metadata for all topics, or just
// the minimal set that has been necessary so far. The full set is simpler
// and usually more convenient, but can take up a substantial amount of
// memory if you have many topics and partitions. Defaults to true.
Full bool
}

// Producer is the namespace for configuration related to producing messages,
Expand Down Expand Up @@ -263,6 +269,7 @@ func NewConfig() *Config {
c.Metadata.Retry.Max = 3
c.Metadata.Retry.Backoff = 250 * time.Millisecond
c.Metadata.RefreshFrequency = 10 * time.Minute
c.Metadata.Full = true

c.Producer.MaxMessageBytes = 1000000
c.Producer.RequiredAcks = WaitForLocal
Expand Down