diff --git a/physical/gcs/gcs.go b/physical/gcs/gcs.go index da9c3b1fb0dc..2e63b37185a8 100644 --- a/physical/gcs/gcs.go +++ b/physical/gcs/gcs.go @@ -71,15 +71,23 @@ type Backend struct { // chunkSize is the chunk size to use for requests. chunkSize int - // client is the underlying API client for talking to gcs. - client *storage.Client + // client is the API client and permitPool is the allowed concurrent uses of + // the client. + client *storage.Client + permitPool *physical.PermitPool // haEnabled indicates if HA is enabled. haEnabled bool - // logger and permitPool are internal constructs - logger log.Logger - permitPool *physical.PermitPool + // haClient is the API client. This is managed separately from the main client + // because a flood of requests should not block refreshing the TTLs on the + // lock. + // + // This value will be nil if haEnabled is false. + haClient *storage.Client + + // logger is an internal logger. + logger log.Logger } // NewBackend constructs a Google Cloud Storage backend with the given @@ -115,6 +123,7 @@ func NewBackend(c map[string]string, logger log.Logger) (physical.Backend, error chunkSize = chunkSize * 1024 // HA configuration + haClient := (*storage.Client)(nil) haEnabled := false haEnabledStr := os.Getenv(envHAEnabled) if haEnabledStr == "" { @@ -127,6 +136,15 @@ func NewBackend(c map[string]string, logger log.Logger) (physical.Backend, error return nil, errwrap.Wrapf("failed to parse HA enabled: {{err}}", err) } } + if haEnabled { + logger.Debug("creating client") + var err error + ctx := context.Background() + haClient, err = storage.NewClient(ctx, option.WithUserAgent(useragent.String())) + if err != nil { + return nil, errwrap.Wrapf("failed to create HA storage client: {{err}}", err) + } + } // Max parallel maxParallel, err := extractInt(c["max_parallel"]) @@ -140,30 +158,24 @@ func NewBackend(c map[string]string, logger log.Logger) (physical.Backend, error "ha_enabled", haEnabled, "max_parallel", maxParallel, ) - logger.Debug("creating client") - - // Client - opts := []option.ClientOption{option.WithUserAgent(useragent.String())} - if credentialsFile := c["credentials_file"]; credentialsFile != "" { - logger.Warn("specifying credentials_file as an option is " + - "deprecated. Please use the GOOGLE_APPLICATION_CREDENTIALS environment " + - "variable or instance credentials instead.") - opts = append(opts, option.WithCredentialsFile(credentialsFile)) - } + logger.Debug("creating client") ctx := context.Background() - client, err := storage.NewClient(ctx, opts...) + client, err := storage.NewClient(ctx, option.WithUserAgent(useragent.String())) if err != nil { return nil, errwrap.Wrapf("failed to create storage client: {{err}}", err) } return &Backend{ bucket: bucket, - haEnabled: haEnabled, chunkSize: chunkSize, client: client, permitPool: physical.NewPermitPool(maxParallel), - logger: logger, + + haEnabled: haEnabled, + haClient: haClient, + + logger: logger, }, nil } diff --git a/physical/gcs/gcs_ha.go b/physical/gcs/gcs_ha.go index bb6105959002..84d2dcb3a91a 100644 --- a/physical/gcs/gcs_ha.go +++ b/physical/gcs/gcs_ha.go @@ -44,7 +44,7 @@ var ( // metricLockUnlock is the metric to register for a lock delete. metricLockUnlock = []string{"gcs", "lock", "unlock"} - // metricLockGet is the metric to register for a lock get. + // metricLockLock is the metric to register for a lock get. metricLockLock = []string{"gcs", "lock", "lock"} // metricLockValue is the metric to register for a lock create/update. @@ -194,7 +194,7 @@ func (l *Lock) Unlock() error { MetagenerationMatch: r.attrs.Metageneration, } - obj := l.backend.client.Bucket(l.backend.bucket).Object(l.key) + obj := l.backend.haClient.Bucket(l.backend.bucket).Object(l.key) if err := obj.If(conds).Delete(ctx); err != nil { // If the pre-condition failed, it means that someone else has already // acquired the lock and we don't want to delete it. @@ -324,10 +324,6 @@ OUTER: // - if key is empty or identity is the same or timestamp exceeds TTL // - update the lock to self func (l *Lock) writeLock() (bool, error) { - // Pooling - l.backend.permitPool.Acquire() - defer l.backend.permitPool.Release() - // Create a transaction to read and the update (maybe) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -376,7 +372,7 @@ func (l *Lock) writeLock() (bool, error) { } // Write the object - obj := l.backend.client.Bucket(l.backend.bucket).Object(l.key) + obj := l.backend.haClient.Bucket(l.backend.bucket).Object(l.key) w := obj.If(conds).NewWriter(ctx) w.ObjectAttrs.CacheControl = "no-cache; no-store; max-age=0" w.ObjectAttrs.Metadata = map[string]string{ @@ -395,12 +391,8 @@ func (l *Lock) writeLock() (bool, error) { // get retrieves the value for the lock. func (l *Lock) get(ctx context.Context) (*LockRecord, error) { - // Pooling - l.backend.permitPool.Acquire() - defer l.backend.permitPool.Release() - // Read - attrs, err := l.backend.client.Bucket(l.backend.bucket).Object(l.key).Attrs(ctx) + attrs, err := l.backend.haClient.Bucket(l.backend.bucket).Object(l.key).Attrs(ctx) if err == storage.ErrObjectNotExist { return nil, nil }