Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bump memcache expiration #1640

Merged
merged 6 commits into from
Jul 4, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions app/multitenant/aws_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,10 @@ import (
)

const (
hourField = "hour"
tsField = "ts"
reportField = "report"
reportCacheSize = (15 / 3) * 10 * 5 // (window size * report rate) * number of hosts per user * number of users
reportCacheExpiration = 15 * time.Second
natsTimeout = 10 * time.Second
hourField = "hour"
tsField = "ts"
reportField = "report"
natsTimeout = 10 * time.Second
)

var (
Expand Down Expand Up @@ -102,6 +100,7 @@ type AWSCollectorConfig struct {
S3Store *S3Store
NatsHost string
MemcacheClient *MemcacheClient
Window time.Duration
}

type awsCollector struct {
Expand All @@ -112,6 +111,7 @@ type awsCollector struct {
merger app.Merger
inProcess inProcessStore
memcache *MemcacheClient
window time.Duration

nats *nats.Conn
waitersLock sync.Mutex
Expand Down Expand Up @@ -144,14 +144,17 @@ func NewAWSCollector(config AWSCollectorConfig) (AWSCollector, error) {
}
}

// (window * report rate) * number of hosts per user * number of users
reportCacheSize := (int(config.Window.Seconds()) / 3) * 10 * 5
return &awsCollector{
db: dynamodb.New(session.New(config.DynamoDBConfig)),
s3: config.S3Store,
userIDer: config.UserIDer,
tableName: config.DynamoTable,
merger: app.NewSmartMerger(),
inProcess: newInProcessStore(reportCacheSize, reportCacheExpiration),
inProcess: newInProcessStore(reportCacheSize, config.Window),
memcache: config.MemcacheClient,
window: config.Window,
nats: nc,
waiters: map[watchKey]*nats.Subscription{},
}, nil
Expand Down Expand Up @@ -294,7 +297,7 @@ func (c *awsCollector) getReports(reportKeys []string) ([]report.Report, error)
func (c *awsCollector) Report(ctx context.Context) (report.Report, error) {
var (
now = time.Now()
start = now.Add(-15 * time.Second)
start = now.Add(-c.window)
rowStart, rowEnd = start.UnixNano() / time.Hour.Nanoseconds(), now.UnixNano() / time.Hour.Nanoseconds()
userid, err = c.userIDer(ctx)
)
Expand Down Expand Up @@ -323,6 +326,7 @@ func (c *awsCollector) Report(ctx context.Context) (report.Report, error) {
}
}

log.Debugf("Fetching %d reports from %v to %v", len(reportKeys), start, now)
reports, err := c.getReports(reportKeys)
if err != nil {
return report.MakeReport(), err
Expand Down
28 changes: 21 additions & 7 deletions app/multitenant/memcache_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,28 +53,37 @@ type MemcacheClient struct {
wait sync.WaitGroup
}

// MemcacheConfig defines how a MemcacheClient should be constructed.
type MemcacheConfig struct {
Host string
Service string
Timeout time.Duration
UpdateInterval time.Duration
Expiration time.Duration
}

// NewMemcacheClient creates a new MemcacheClient that gets its server list
// from SRV and updates the server list on a regular basis.
func NewMemcacheClient(host string, timeout time.Duration, service string, updateInterval time.Duration, expiration int32) *MemcacheClient {
func NewMemcacheClient(config MemcacheConfig) *MemcacheClient {
var servers memcache.ServerList
client := memcache.NewFromSelector(&servers)
client.Timeout = timeout
client.Timeout = config.Timeout

newClient := &MemcacheClient{
client: client,
serverList: &servers,
expiration: expiration,
hostname: host,
service: service,
expiration: int32(config.Expiration.Seconds()),
hostname: config.Host,
service: config.Service,
quit: make(chan struct{}),
}
err := newClient.updateMemcacheServers()
if err != nil {
log.Errorf("Error setting memcache servers to '%v': %v", host, err)
log.Errorf("Error setting memcache servers to '%v': %v", config.Host, err)
}

newClient.wait.Add(1)
go newClient.updateLoop(updateInterval)
go newClient.updateLoop(config.UpdateInterval)
return newClient
}

Expand Down Expand Up @@ -179,6 +188,11 @@ func (c *MemcacheClient) FetchReports(keys []string) (map[string]report.Report,
}
}

if len(missing) > 0 {
sort.Strings(missing)
log.Warningf("Missing %d reports from memcache: %v", len(missing), missing)
}

memcacheHits.Add(float64(len(reports)))
memcacheRequests.Add(float64(len(keys)))
return reports, missing, nil
Expand Down
15 changes: 10 additions & 5 deletions prog/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
)

const (
memcacheExpiration = 15 // seconds
memcacheUpdateInterval = 1 * time.Minute
)

Expand Down Expand Up @@ -81,7 +80,7 @@ func awsConfigFromURL(url *url.URL) (*aws.Config, error) {
return config, nil
}

func collectorFactory(userIDer multitenant.UserIDer, collectorURL, s3URL, natsHostname, memcachedHostname string, memcachedTimeout time.Duration, memcachedService string, window time.Duration, createTables bool) (app.Collector, error) {
func collectorFactory(userIDer multitenant.UserIDer, collectorURL, s3URL, natsHostname, memcachedHostname string, memcachedTimeout time.Duration, memcachedService string, memcachedExpiration, window time.Duration, createTables bool) (app.Collector, error) {
if collectorURL == "local" {
return app.NewCollector(window), nil
}
Expand Down Expand Up @@ -110,8 +109,13 @@ func collectorFactory(userIDer multitenant.UserIDer, collectorURL, s3URL, natsHo
var memcacheClient *multitenant.MemcacheClient
if memcachedHostname != "" {
memcacheClient = multitenant.NewMemcacheClient(
memcachedHostname, memcachedTimeout, memcachedService,
memcacheUpdateInterval, memcacheExpiration,
multitenant.MemcacheConfig{
Host: memcachedHostname,
Timeout: memcachedTimeout,
Expiration: memcachedExpiration,
UpdateInterval: memcacheUpdateInterval,
Service: memcachedService,
},
)
}
awsCollector, err := multitenant.NewAWSCollector(
Expand All @@ -122,6 +126,7 @@ func collectorFactory(userIDer multitenant.UserIDer, collectorURL, s3URL, natsHo
S3Store: &s3Store,
NatsHost: natsHostname,
MemcacheClient: memcacheClient,
Window: window,
},
)
if err != nil {
Expand Down Expand Up @@ -205,7 +210,7 @@ func appMain(flags appFlags) {

collector, err := collectorFactory(
userIDer, flags.collectorURL, flags.s3URL, flags.natsHostname, flags.memcachedHostname,
flags.memcachedTimeout, flags.memcachedService, flags.window, flags.awsCreateTables)
flags.memcachedTimeout, flags.memcachedService, flags.memcachedExpiration, flags.window, flags.awsCreateTables)
if err != nil {
log.Fatalf("Error creating collector: %v", err)
return
Expand Down
20 changes: 11 additions & 9 deletions prog/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,16 @@ type appFlags struct {
containerName string
dockerEndpoint string

collectorURL string
s3URL string
controlRouterURL string
pipeRouterURL string
natsHostname string
memcachedHostname string
memcachedTimeout time.Duration
memcachedService string
userIDHeader string
collectorURL string
s3URL string
controlRouterURL string
pipeRouterURL string
natsHostname string
memcachedHostname string
memcachedTimeout time.Duration
memcachedService string
memcachedExpiration time.Duration
userIDHeader string

awsCreateTables bool
consulInf string
Expand Down Expand Up @@ -190,6 +191,7 @@ func main() {
flag.StringVar(&flags.app.natsHostname, "app.nats", "", "Hostname for NATS service to use for shortcut reports. If empty, shortcut reporting will be disabled.")
flag.StringVar(&flags.app.memcachedHostname, "app.memcached.hostname", "", "Hostname for memcached service to use when caching reports. If empty, no memcached will be used.")
flag.DurationVar(&flags.app.memcachedTimeout, "app.memcached.timeout", 100*time.Millisecond, "Maximum time to wait before giving up on memcached requests.")
flag.DurationVar(&flags.app.memcachedExpiration, "app.memcached.expiration", 2*15*time.Second, "How long reports stay in the memcache.")
flag.StringVar(&flags.app.memcachedService, "app.memcached.service", "memcached", "SRV service used to discover memcache servers.")
flag.StringVar(&flags.app.userIDHeader, "app.userid.header", "", "HTTP header to use as userid")

Expand Down