Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): add directed reads feature #7668

Merged
merged 32 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
d232e11
feat(spanner): add auto generated proto changes for directed reads
harshachinta Mar 26, 2023
e1176f9
feat(spanner): add code changes for directed read feature
harshachinta Mar 31, 2023
85d88f2
feat(spanner): add integration tests for directed read options
harshachinta Apr 4, 2023
bb7aae1
feat(spanner): pass DirectedOptions set in client level for Query whe…
harshachinta Apr 5, 2023
1b0d41c
feat(spanner): add unit tests for directed reads
harshachinta Apr 6, 2023
07587e5
feat(spanner): reuse validate function for directed read options
harshachinta Apr 7, 2023
ab7b27a
feat(spanner): code refactoring and comments
harshachinta Apr 7, 2023
faef3af
feat(spanner): throw error when directed read options set in PDML
harshachinta Apr 13, 2023
9e899db
feat(spanner): add unit tests to validate errors during RW transactio…
harshachinta Apr 13, 2023
7794b62
feat(spanner): return error when Directed Read Options set for Partit…
harshachinta Apr 14, 2023
8f387d1
feat(spanner): modify error message
harshachinta Apr 14, 2023
797c2e1
feat(spanner): refactor unit test name
harshachinta Apr 15, 2023
2ba3551
Merge branch 'main' into directed-reads
harshachinta Sep 16, 2023
61bbb3f
fix(spanner): test case
harshachinta Sep 16, 2023
0fa1ea0
fix(spanner): test case
harshachinta Sep 16, 2023
75fbb32
feat(spanner): remove manual autogenerated code changes
harshachinta Nov 2, 2023
10d1971
Merge branch 'main' into directed-reads
harshachinta Nov 2, 2023
f4507bd
Merge branch 'main' into directed-reads
harshachinta Nov 2, 2023
da92956
feat(spanner): remove client side validations for directed read options
harshachinta Nov 19, 2023
1eef1e1
Merge branch 'main' into directed-reads
harshachinta Nov 19, 2023
5edff6d
Merge branch 'main' into directed-reads
harshachinta Dec 3, 2023
343327e
Merge branch 'main' into directed-reads
harshachinta Dec 6, 2023
c93e6fa
feat(spanner): skip test for emulator
harshachinta Dec 6, 2023
6ba864f
Merge branch 'main' into directed-reads
harshachinta Dec 7, 2023
783d5d6
feat(spanner): go mod tidy
harshachinta Dec 7, 2023
7da0d10
feat(spanner): consider client level dro for BatchReadOnlyTransaction…
harshachinta Dec 9, 2023
0bd2241
Merge branch 'main' into directed-reads
harshachinta Dec 9, 2023
a469eff
Merge branch 'main' into directed-reads
harshachinta Dec 22, 2023
6f8efb6
Merge branch 'main' into directed-reads
harshachinta Jan 7, 2024
123b06e
feat(spanner): code refactor
harshachinta Jan 7, 2024
eca341a
feat(spanner): remove unit test for partitioned update
harshachinta Jan 7, 2024
fd1c4f9
feat(spanner): rename test
harshachinta Jan 7, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ module cloud.google.com/go
go 1.19

require (
cloud.google.com/go/iam v1.1.3
cloud.google.com/go/storage v1.30.1
github.com/golang/protobuf v1.5.3
github.com/google/go-cmp v0.6.0
Expand All @@ -25,6 +24,7 @@ require (
require (
cloud.google.com/go/compute v1.23.1 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v1.1.3 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
Expand Down
76 changes: 40 additions & 36 deletions spanner/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,15 @@ func (t *BatchReadOnlyTransaction) PartitionReadUsingIndexWithOptions(ctx contex
}
// Prepare ReadRequest.
req := &sppb.ReadRequest{
Session: sid,
Transaction: ts,
Table: table,
Index: index,
Columns: columns,
KeySet: kset,
RequestOptions: createRequestOptions(readOptions.Priority, readOptions.RequestTag, ""),
DataBoostEnabled: readOptions.DataBoostEnabled,
Session: sid,
Transaction: ts,
Table: table,
Index: index,
Columns: columns,
KeySet: kset,
RequestOptions: createRequestOptions(readOptions.Priority, readOptions.RequestTag, ""),
DataBoostEnabled: readOptions.DataBoostEnabled,
DirectedReadOptions: readOptions.DirectedReadOptions,
}
// Generate partitions.
for _, p := range resp.GetPartitions() {
Expand Down Expand Up @@ -215,14 +216,15 @@ func (t *BatchReadOnlyTransaction) partitionQuery(ctx context.Context, statement

// prepare ExecuteSqlRequest
r := &sppb.ExecuteSqlRequest{
Session: sid,
Transaction: ts,
Sql: statement.SQL,
Params: params,
ParamTypes: paramTypes,
QueryOptions: qOpts.Options,
RequestOptions: createRequestOptions(qOpts.Priority, qOpts.RequestTag, ""),
DataBoostEnabled: qOpts.DataBoostEnabled,
Session: sid,
Transaction: ts,
Sql: statement.SQL,
Params: params,
ParamTypes: paramTypes,
QueryOptions: qOpts.Options,
RequestOptions: createRequestOptions(qOpts.Priority, qOpts.RequestTag, ""),
DataBoostEnabled: qOpts.DataBoostEnabled,
DirectedReadOptions: qOpts.DirectedReadOptions,
}

// generate Partitions
Expand Down Expand Up @@ -313,16 +315,17 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
if p.rreq != nil {
rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
client, err := client.StreamingRead(ctx, &sppb.ReadRequest{
Session: p.rreq.Session,
Transaction: p.rreq.Transaction,
Table: p.rreq.Table,
Index: p.rreq.Index,
Columns: p.rreq.Columns,
KeySet: p.rreq.KeySet,
PartitionToken: p.pt,
RequestOptions: p.rreq.RequestOptions,
ResumeToken: resumeToken,
DataBoostEnabled: p.rreq.DataBoostEnabled,
Session: p.rreq.Session,
Transaction: p.rreq.Transaction,
Table: p.rreq.Table,
Index: p.rreq.Index,
Columns: p.rreq.Columns,
KeySet: p.rreq.KeySet,
PartitionToken: p.pt,
RequestOptions: p.rreq.RequestOptions,
ResumeToken: resumeToken,
DataBoostEnabled: p.rreq.DataBoostEnabled,
DirectedReadOptions: p.rreq.DirectedReadOptions,
})
if err != nil {
return client, err
Expand All @@ -338,16 +341,17 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
} else {
rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
client, err := client.ExecuteStreamingSql(ctx, &sppb.ExecuteSqlRequest{
Session: p.qreq.Session,
Transaction: p.qreq.Transaction,
Sql: p.qreq.Sql,
Params: p.qreq.Params,
ParamTypes: p.qreq.ParamTypes,
QueryOptions: p.qreq.QueryOptions,
PartitionToken: p.pt,
RequestOptions: p.qreq.RequestOptions,
ResumeToken: resumeToken,
DataBoostEnabled: p.qreq.DataBoostEnabled,
Session: p.qreq.Session,
Transaction: p.qreq.Transaction,
Sql: p.qreq.Sql,
Params: p.qreq.Params,
ParamTypes: p.qreq.ParamTypes,
QueryOptions: p.qreq.QueryOptions,
PartitionToken: p.pt,
RequestOptions: p.qreq.RequestOptions,
ResumeToken: resumeToken,
DataBoostEnabled: p.qreq.DataBoostEnabled,
DirectedReadOptions: p.qreq.DirectedReadOptions,
})
if err != nil {
return client, err
Expand Down
12 changes: 12 additions & 0 deletions spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ type Client struct {
bwo BatchWriteOptions
ct *commonTags
disableRouteToLeader bool
dro *sppb.DirectedReadOptions
}

// DatabaseName returns the full name of a database, e.g.,
Expand Down Expand Up @@ -186,6 +187,11 @@ type ClientConfig struct {

// BatchTimeout specifies the timeout for a batch of sessions managed sessionClient.
BatchTimeout time.Duration

// ClientConfig options used to set the DirectedReadOptions for all ReadRequests
// and ExecuteSqlRequests for the Client which indicate which replicas or regions
// should be used for non-transactional reads or queries.
DirectedReadOptions *sppb.DirectedReadOptions
asthamohta marked this conversation as resolved.
Show resolved Hide resolved
}

func contextWithOutgoingMetadata(ctx context.Context, md metadata.MD, disableRouteToLeader bool) context.Context {
Expand Down Expand Up @@ -270,6 +276,7 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf
if config.Compression == gzip.Name {
md.Append(requestsCompressionHeader, gzip.Name)
}

// Create a session client.
sc := newSessionClient(pool, database, config.UserAgent, sessionLabels, config.DatabaseRole, config.DisableRouteToLeader, md, config.BatchTimeout, config.Logger, config.CallOptions)

Expand All @@ -291,6 +298,7 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf
bwo: config.BatchWriteOptions,
ct: getCommonTags(sc),
disableRouteToLeader: config.DisableRouteToLeader,
dro: config.DirectedReadOptions,
}
return c, nil
}
Expand Down Expand Up @@ -374,6 +382,8 @@ func (c *Client) Single() *ReadOnlyTransaction {
t.sh = sh
return nil
}
t.txReadOnly.qo.DirectedReadOptions = c.dro
t.txReadOnly.ro.DirectedReadOptions = c.dro
t.ct = c.ct
return t
}
Expand All @@ -397,6 +407,8 @@ func (c *Client) ReadOnlyTransaction() *ReadOnlyTransaction {
t.txReadOnly.qo = c.qo
t.txReadOnly.ro = c.ro
t.txReadOnly.disableRouteToLeader = true
t.txReadOnly.qo.DirectedReadOptions = c.dro
t.txReadOnly.ro.DirectedReadOptions = c.dro
t.ct = c.ct
return t
}
Expand Down
Loading
Loading