Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Cosmos] Adds global endpoint manager policy and links GEM to client #22223

Merged
merged 13 commits into from
Jan 17, 2024
38 changes: 35 additions & 3 deletions sdk/data/azcosmos/cosmos_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
type Client struct {
endpoint string
pipeline azruntime.Pipeline
gem *globalEndpointManager
}

// Endpoint used to create the client.
Expand All @@ -36,7 +37,14 @@ func (c *Client) Endpoint() string {
// cred - The credential used to authenticate with the cosmos service.
// options - Optional Cosmos client options. Pass nil to accept default values.
func NewClientWithKey(endpoint string, cred KeyCredential, o *ClientOptions) (*Client, error) {
return &Client{endpoint: endpoint, pipeline: newPipeline(newSharedKeyCredPolicy(cred), o)}, nil
internalClient := &Client{endpoint: endpoint, pipeline: newInternalPipeline(newSharedKeyCredPolicy(cred), o), gem: &globalEndpointManager{}}

//need to pass in preferredRegions from options here once those changes are merged
gem, err := newGlobalEndpointManager(internalClient, []string{}, 0)
if err != nil {
return nil, err
}
return &Client{endpoint: endpoint, pipeline: newPipeline(newSharedKeyCredPolicy(cred), gem, o), gem: gem}, nil
}

// NewClient creates a new instance of Cosmos client with Azure AD access token authentication. It uses the default pipeline configuration.
Expand All @@ -48,7 +56,16 @@ func NewClient(endpoint string, cred azcore.TokenCredential, o *ClientOptions) (
if err != nil {
return nil, err
}
return &Client{endpoint: endpoint, pipeline: newPipeline(newCosmosBearerTokenPolicy(cred, scope, nil), o)}, nil

internalClient := &Client{endpoint: endpoint, pipeline: newInternalPipeline(newCosmosBearerTokenPolicy(cred, scope, nil), o), gem: &globalEndpointManager{}}

//need to pass in preferredRegions from options here once those changes are merged
gem, err := newGlobalEndpointManager(internalClient, []string{}, 0)
if err != nil {
return nil, err
}

return &Client{endpoint: endpoint, pipeline: newPipeline(newCosmosBearerTokenPolicy(cred, scope, nil), gem, o), gem: gem}, nil
}

// NewClientFromConnectionString creates a new instance of Cosmos client from connection string. It uses the default pipeline configuration.
Expand Down Expand Up @@ -87,7 +104,7 @@ func NewClientFromConnectionString(connectionString string, o *ClientOptions) (*
return NewClientWithKey(endpoint, cred, o)
}

func newPipeline(authPolicy policy.Policy, options *ClientOptions) azruntime.Pipeline {
func newPipeline(authPolicy policy.Policy, gem *globalEndpointManager, options *ClientOptions) azruntime.Pipeline {
if options == nil {
options = &ClientOptions{}
}
Expand All @@ -98,7 +115,21 @@ func newPipeline(authPolicy policy.Policy, options *ClientOptions) azruntime.Pip
&headerPolicies{
enableContentResponseOnWrite: options.EnableContentResponseOnWrite,
},
&globalEndpointManagerPolicy{gem: gem},
},
PerRetry: []policy.Policy{
authPolicy,
},
},
&options.ClientOptions)
}

func newInternalPipeline(authPolicy policy.Policy, options *ClientOptions) azruntime.Pipeline {
if options == nil {
options = &ClientOptions{}
}
return azruntime.NewPipeline("azcosmos", serviceLibVersion,
azruntime.PipelineOptions{
PerRetry: []policy.Policy{
authPolicy,
},
Expand Down Expand Up @@ -176,6 +207,7 @@ func (c *Client) CreateDatabase(
if err != nil {
return DatabaseResponse{}, err
}
fmt.Printf("- db create succeeded with code %d", azResponse.StatusCode)

return newDatabaseResponse(azResponse)
}
Expand Down
25 changes: 25 additions & 0 deletions sdk/data/azcosmos/cosmos_global_endpoint_manager_policy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package azcosmos

import (
"context"
"net/http"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
)

type globalEndpointManagerPolicy struct {
gem *globalEndpointManager
}

func (p *globalEndpointManagerPolicy) Do(req *policy.Request) (*http.Response, error) {
shouldRefresh := p.gem.ShouldRefresh()
if shouldRefresh {
go func() {
_ = p.gem.Update(context.Background())
}()
}
return req.Next()
}