Skip to content

Commit

Permalink
feat: Add support for Directed Reads (#1000)
Browse files Browse the repository at this point in the history
* changes

* changes

* docs

* docs

* linting

* feat(spanner): remove client side validations for directed read options

* feat(spanner): update the auto_failover_disabled field

* feat(spanner): update unit tests

* feat(spanner): update test

* feat(spanner): update documentation

* feat(spanner): add system test to validate exception in case of RW transaction

* feat(spanner): update unit test

* feat(spanner): add dro for batchsnapshot and update system tests

* feat(spanner): fix unit tests for batchsnapshot

* feat(spanner): add unit tests for partition read and query

* feat(spanner): lint fixes

* feat(spanner): code refactor remove TransactionType

* feat(spanner): comment refactor

* feat(spanner): remove comments

---------

Co-authored-by: Sri Harsha CH <[email protected]>
Co-authored-by: Sri Harsha CH <[email protected]>
  • Loading branch information
3 people authored Jan 9, 2024
1 parent 07a0202 commit c4210b2
Show file tree
Hide file tree
Showing 14 changed files with 564 additions and 8 deletions.
2 changes: 2 additions & 0 deletions google/cloud/spanner_v1/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from .types.spanner import CommitRequest
from .types.spanner import CreateSessionRequest
from .types.spanner import DeleteSessionRequest
from .types.spanner import DirectedReadOptions
from .types.spanner import ExecuteBatchDmlRequest
from .types.spanner import ExecuteBatchDmlResponse
from .types.spanner import ExecuteSqlRequest
Expand Down Expand Up @@ -108,6 +109,7 @@
"CommitResponse",
"CreateSessionRequest",
"DeleteSessionRequest",
"DirectedReadOptions",
"ExecuteBatchDmlRequest",
"ExecuteBatchDmlResponse",
"ExecuteSqlRequest",
Expand Down
30 changes: 30 additions & 0 deletions google/cloud/spanner_v1/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ class Client(ClientWithProject):
disable leader aware routing. Disabling leader aware routing would
route all requests in RW/PDML transactions to the closest region.
:type directed_read_options: :class:`~google.cloud.spanner_v1.DirectedReadOptions`
or :class:`dict`
:param directed_read_options: (Optional) Client options used to set the directed_read_options
for all ReadRequests and ExecuteSqlRequests that indicates which replicas
or regions should be used for non-transactional reads or queries.
:raises: :class:`ValueError <exceptions.ValueError>` if both ``read_only``
and ``admin`` are :data:`True`
"""
Expand All @@ -139,6 +145,7 @@ def __init__(
client_options=None,
query_options=None,
route_to_leader_enabled=True,
directed_read_options=None,
):
self._emulator_host = _get_spanner_emulator_host()

Expand Down Expand Up @@ -179,6 +186,7 @@ def __init__(
warnings.warn(_EMULATOR_HOST_HTTP_SCHEME)

self._route_to_leader_enabled = route_to_leader_enabled
self._directed_read_options = directed_read_options

@property
def credentials(self):
Expand Down Expand Up @@ -260,6 +268,17 @@ def route_to_leader_enabled(self):
"""
return self._route_to_leader_enabled

@property
def directed_read_options(self):
"""Getter for directed_read_options.
:rtype:
:class:`~google.cloud.spanner_v1.DirectedReadOptions`
or :class:`dict`
:returns: The directed_read_options for the client.
"""
return self._directed_read_options

def copy(self):
"""Make a copy of this client.
Expand Down Expand Up @@ -383,3 +402,14 @@ def list_instances(self, filter_="", page_size=None):
request=request, metadata=metadata
)
return page_iter

@directed_read_options.setter
def directed_read_options(self, directed_read_options):
"""Sets directed_read_options for the client
:type directed_read_options: :class:`~google.cloud.spanner_v1.DirectedReadOptions`
or :class:`dict`
:param directed_read_options: Client options used to set the directed_read_options
for all ReadRequests and ExecuteSqlRequests that indicates which replicas
or regions should be used for non-transactional reads or queries.
"""
self._directed_read_options = directed_read_options
17 changes: 17 additions & 0 deletions google/cloud/spanner_v1/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ def __init__(
self._route_to_leader_enabled = self._instance._client.route_to_leader_enabled
self._enable_drop_protection = enable_drop_protection
self._reconciling = False
self._directed_read_options = self._instance._client.directed_read_options

if pool is None:
pool = BurstyPool(database_role=database_role)
Expand Down Expand Up @@ -1226,6 +1227,7 @@ def generate_read_batches(
partition_size_bytes=None,
max_partitions=None,
data_boost_enabled=False,
directed_read_options=None,
*,
retry=gapic_v1.method.DEFAULT,
timeout=gapic_v1.method.DEFAULT,
Expand Down Expand Up @@ -1265,6 +1267,12 @@ def generate_read_batches(
(Optional) If this is for a partitioned read and this field is
set ``true``, the request will be executed via offline access.
:type directed_read_options: :class:`~google.cloud.spanner_v1.DirectedReadOptions`
or :class:`dict`
:param directed_read_options: (Optional) Request level option used to set the directed_read_options
for ReadRequests that indicates which replicas
or regions should be used for non-transactional reads.
:type retry: :class:`~google.api_core.retry.Retry`
:param retry: (Optional) The retry settings for this request.
Expand Down Expand Up @@ -1293,6 +1301,7 @@ def generate_read_batches(
"keyset": keyset._to_dict(),
"index": index,
"data_boost_enabled": data_boost_enabled,
"directed_read_options": directed_read_options,
}
for partition in partitions:
yield {"partition": partition, "read": read_info.copy()}
Expand Down Expand Up @@ -1337,6 +1346,7 @@ def generate_query_batches(
max_partitions=None,
query_options=None,
data_boost_enabled=False,
directed_read_options=None,
*,
retry=gapic_v1.method.DEFAULT,
timeout=gapic_v1.method.DEFAULT,
Expand Down Expand Up @@ -1388,6 +1398,12 @@ def generate_query_batches(
(Optional) If this is for a partitioned query and this field is
set ``true``, the request will be executed via offline access.
:type directed_read_options: :class:`~google.cloud.spanner_v1.DirectedReadOptions`
or :class:`dict`
:param directed_read_options: (Optional) Request level option used to set the directed_read_options
for ExecuteSqlRequests that indicates which replicas
or regions should be used for non-transactional queries.
:type retry: :class:`~google.api_core.retry.Retry`
:param retry: (Optional) The retry settings for this request.
Expand All @@ -1412,6 +1428,7 @@ def generate_query_batches(
query_info = {
"sql": sql,
"data_boost_enabled": data_boost_enabled,
"directed_read_options": directed_read_options,
}
if params:
query_info["params"] = params
Expand Down
26 changes: 26 additions & 0 deletions google/cloud/spanner_v1/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ def read(
partition=None,
request_options=None,
data_boost_enabled=False,
directed_read_options=None,
*,
retry=gapic_v1.method.DEFAULT,
timeout=gapic_v1.method.DEFAULT,
Expand Down Expand Up @@ -224,6 +225,12 @@ def read(
``partition_token``, the API will return an
``INVALID_ARGUMENT`` error.
:type directed_read_options: :class:`~google.cloud.spanner_v1.DirectedReadOptions`
or :class:`dict`
:param directed_read_options: (Optional) Request level option used to set the directed_read_options
for all ReadRequests and ExecuteSqlRequests that indicates which replicas
or regions should be used for non-transactional reads or queries.
:rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
:returns: a result set instance which can be used to consume rows.
Expand Down Expand Up @@ -253,6 +260,11 @@ def read(
if self._read_only:
# Transaction tags are not supported for read only transactions.
request_options.transaction_tag = None
if (
directed_read_options is None
and database._directed_read_options is not None
):
directed_read_options = database._directed_read_options
elif self.transaction_tag is not None:
request_options.transaction_tag = self.transaction_tag

Expand All @@ -266,6 +278,7 @@ def read(
partition_token=partition,
request_options=request_options,
data_boost_enabled=data_boost_enabled,
directed_read_options=directed_read_options,
)
restart = functools.partial(
api.streaming_read,
Expand Down Expand Up @@ -322,6 +335,7 @@ def execute_sql(
retry=gapic_v1.method.DEFAULT,
timeout=gapic_v1.method.DEFAULT,
data_boost_enabled=False,
directed_read_options=None,
):
"""Perform an ``ExecuteStreamingSql`` API request.
Expand Down Expand Up @@ -379,6 +393,12 @@ def execute_sql(
``partition_token``, the API will return an
``INVALID_ARGUMENT`` error.
:type directed_read_options: :class:`~google.cloud.spanner_v1.DirectedReadOptions`
or :class:`dict`
:param directed_read_options: (Optional) Request level option used to set the directed_read_options
for all ReadRequests and ExecuteSqlRequests that indicates which replicas
or regions should be used for non-transactional reads or queries.
:raises ValueError:
for reuse of single-use snapshots, or if a transaction ID is
already pending for multiple-use snapshots.
Expand Down Expand Up @@ -419,6 +439,11 @@ def execute_sql(
if self._read_only:
# Transaction tags are not supported for read only transactions.
request_options.transaction_tag = None
if (
directed_read_options is None
and database._directed_read_options is not None
):
directed_read_options = database._directed_read_options
elif self.transaction_tag is not None:
request_options.transaction_tag = self.transaction_tag

Expand All @@ -433,6 +458,7 @@ def execute_sql(
query_options=query_options,
request_options=request_options,
data_boost_enabled=data_boost_enabled,
directed_read_options=directed_read_options,
)
restart = functools.partial(
api.execute_streaming_sql,
Expand Down
76 changes: 76 additions & 0 deletions samples/samples/snippets.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from google.cloud import spanner
from google.cloud.spanner_admin_instance_v1.types import spanner_instance_admin
from google.cloud.spanner_v1 import param_types
from google.cloud.spanner_v1 import DirectedReadOptions
from google.type import expr_pb2
from google.iam.v1 import policy_pb2
from google.cloud.spanner_v1.data_types import JsonObject
Expand Down Expand Up @@ -2723,6 +2724,78 @@ def drop_sequence(instance_id, database_id):

# [END spanner_drop_sequence]


def directed_read_options(
instance_id,
database_id,
):
"""
Shows how to run an execute sql request with directed read options.
Only one of exclude_replicas or include_replicas can be set
Each accepts a list of replicaSelections which contains location and type
* `location` - The location must be one of the regions within the
multi-region configuration of your database.
* `type_` - The type of the replica
Some examples of using replica_selectors are:
* `location:us-east1` --> The "us-east1" replica(s) of any available type
will be used to process the request.
* `type:READ_ONLY` --> The "READ_ONLY" type replica(s) in nearest
available location will be used to process the
request.
* `location:us-east1 type:READ_ONLY` --> The "READ_ONLY" type replica(s)
in location "us-east1" will be used to process
the request.
include_replicas also contains an option for auto_failover_disabled which when set
Spanner will not route requests to a replica outside the
include_replicas list when all the specified replicas are unavailable
or unhealthy. The default value is `false`
"""
# [START spanner_directed_read]
# instance_id = "your-spanner-instance"
# database_id = "your-spanner-db-id"

directed_read_options_for_client = {
"exclude_replicas": {
"replica_selections": [
{
"location": "us-east4",
},
],
},
}

# directed_read_options can be set at client level and will be used in all
# read-only transaction requests
spanner_client = spanner.Client(
directed_read_options=directed_read_options_for_client
)
instance = spanner_client.instance(instance_id)
database = instance.database(database_id)

directed_read_options_for_request = {
"include_replicas": {
"replica_selections": [
{
"type_": DirectedReadOptions.ReplicaSelection.Type.READ_ONLY,
},
],
"auto_failover_disabled": True,
},
}

with database.snapshot() as snapshot:
# Read rows while passing directed_read_options directly to the query.
# These will override the options passed at Client level.
results = snapshot.execute_sql(
"SELECT SingerId, AlbumId, AlbumTitle FROM Albums",
directed_read_options=directed_read_options_for_request,
)

for row in results:
print("SingerId: {}, AlbumId: {}, AlbumTitle: {}".format(*row))
# [END spanner_directed_read]


if __name__ == "__main__": # noqa: C901
parser = argparse.ArgumentParser(
description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
Expand Down Expand Up @@ -2862,6 +2935,7 @@ def drop_sequence(instance_id, database_id):
"--database_role", default="new_parent"
)
enable_fine_grained_access_parser.add_argument("--title", default="condition title")
subparsers.add_parser("directed_read_options", help=directed_read_options.__doc__)

args = parser.parse_args()

Expand Down Expand Up @@ -2993,3 +3067,5 @@ def drop_sequence(instance_id, database_id):
args.database_role,
args.title,
)
elif args.command == "directed_read_options":
directed_read_options(args.instance_id, args.database_id)
7 changes: 7 additions & 0 deletions samples/samples/snippets_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -852,3 +852,10 @@ def test_drop_sequence(capsys, instance_id, bit_reverse_sequence_database):
"Altered Customers table to drop DEFAULT from CustomerId column and dropped the Seq sequence on database"
in out
)


@pytest.mark.dependency(depends=["insert_data"])
def test_directed_read_options(capsys, instance_id, sample_database):
snippets.directed_read_options(instance_id, sample_database.database_id)
out, _ = capsys.readouterr()
assert "SingerId: 1, AlbumId: 1, AlbumTitle: Total Junk" in out
Loading

0 comments on commit c4210b2

Please sign in to comment.