Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

physical/gcs: use separate client for updating locks #9424

Merged
merged 3 commits into from
Jul 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 30 additions & 18 deletions physical/gcs/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,23 @@ type Backend struct {
// chunkSize is the chunk size to use for requests.
chunkSize int

// client is the underlying API client for talking to gcs.
client *storage.Client
// client is the API client and permitPool is the allowed concurrent uses of
// the client.
client *storage.Client
permitPool *physical.PermitPool

// haEnabled indicates if HA is enabled.
haEnabled bool

// logger and permitPool are internal constructs
logger log.Logger
permitPool *physical.PermitPool
// haClient is the API client. This is managed separately from the main client
// because a flood of requests should not block refreshing the TTLs on the
// lock.
//
// This value will be nil if haEnabled is false.
haClient *storage.Client

// logger is an internal logger.
logger log.Logger
}

// NewBackend constructs a Google Cloud Storage backend with the given
Expand Down Expand Up @@ -115,6 +123,7 @@ func NewBackend(c map[string]string, logger log.Logger) (physical.Backend, error
chunkSize = chunkSize * 1024

// HA configuration
haClient := (*storage.Client)(nil)
haEnabled := false
haEnabledStr := os.Getenv(envHAEnabled)
if haEnabledStr == "" {
Expand All @@ -127,6 +136,15 @@ func NewBackend(c map[string]string, logger log.Logger) (physical.Backend, error
return nil, errwrap.Wrapf("failed to parse HA enabled: {{err}}", err)
}
}
if haEnabled {
logger.Debug("creating client")
var err error
ctx := context.Background()
haClient, err = storage.NewClient(ctx, option.WithUserAgent(useragent.String()))
if err != nil {
return nil, errwrap.Wrapf("failed to create HA storage client: {{err}}", err)
}
}

// Max parallel
maxParallel, err := extractInt(c["max_parallel"])
Expand All @@ -140,30 +158,24 @@ func NewBackend(c map[string]string, logger log.Logger) (physical.Backend, error
"ha_enabled", haEnabled,
"max_parallel", maxParallel,
)
logger.Debug("creating client")

// Client
opts := []option.ClientOption{option.WithUserAgent(useragent.String())}
if credentialsFile := c["credentials_file"]; credentialsFile != "" {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has been deprecated for a very, very long time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kalafut this might be fine to remove since we've had this notice for ~2yrs, but we should call this out in the changelog.

logger.Warn("specifying credentials_file as an option is " +
"deprecated. Please use the GOOGLE_APPLICATION_CREDENTIALS environment " +
"variable or instance credentials instead.")
opts = append(opts, option.WithCredentialsFile(credentialsFile))
}

logger.Debug("creating client")
ctx := context.Background()
client, err := storage.NewClient(ctx, opts...)
client, err := storage.NewClient(ctx, option.WithUserAgent(useragent.String()))
if err != nil {
return nil, errwrap.Wrapf("failed to create storage client: {{err}}", err)
}

return &Backend{
bucket: bucket,
haEnabled: haEnabled,
chunkSize: chunkSize,
client: client,
permitPool: physical.NewPermitPool(maxParallel),
logger: logger,

haEnabled: haEnabled,
haClient: haClient,

logger: logger,
}, nil
}

Expand Down
16 changes: 4 additions & 12 deletions physical/gcs/gcs_ha.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ var (
// metricLockUnlock is the metric to register for a lock delete.
metricLockUnlock = []string{"gcs", "lock", "unlock"}

// metricLockGet is the metric to register for a lock get.
// metricLockLock is the metric to register for a lock get.
metricLockLock = []string{"gcs", "lock", "lock"}

// metricLockValue is the metric to register for a lock create/update.
Expand Down Expand Up @@ -194,7 +194,7 @@ func (l *Lock) Unlock() error {
MetagenerationMatch: r.attrs.Metageneration,
}

obj := l.backend.client.Bucket(l.backend.bucket).Object(l.key)
obj := l.backend.haClient.Bucket(l.backend.bucket).Object(l.key)
if err := obj.If(conds).Delete(ctx); err != nil {
// If the pre-condition failed, it means that someone else has already
// acquired the lock and we don't want to delete it.
Expand Down Expand Up @@ -324,10 +324,6 @@ OUTER:
// - if key is empty or identity is the same or timestamp exceeds TTL
// - update the lock to self
func (l *Lock) writeLock() (bool, error) {
// Pooling
l.backend.permitPool.Acquire()
defer l.backend.permitPool.Release()

// Create a transaction to read and the update (maybe)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -376,7 +372,7 @@ func (l *Lock) writeLock() (bool, error) {
}

// Write the object
obj := l.backend.client.Bucket(l.backend.bucket).Object(l.key)
obj := l.backend.haClient.Bucket(l.backend.bucket).Object(l.key)
w := obj.If(conds).NewWriter(ctx)
w.ObjectAttrs.CacheControl = "no-cache; no-store; max-age=0"
w.ObjectAttrs.Metadata = map[string]string{
Expand All @@ -395,12 +391,8 @@ func (l *Lock) writeLock() (bool, error) {

// get retrieves the value for the lock.
func (l *Lock) get(ctx context.Context) (*LockRecord, error) {
// Pooling
l.backend.permitPool.Acquire()
defer l.backend.permitPool.Release()

// Read
attrs, err := l.backend.client.Bucket(l.backend.bucket).Object(l.key).Attrs(ctx)
attrs, err := l.backend.haClient.Bucket(l.backend.bucket).Object(l.key).Attrs(ctx)
if err == storage.ErrObjectNotExist {
return nil, nil
}
Expand Down