Skip to content

Commit

Permalink
docs: add sample for connectorx (#2228)
Browse files Browse the repository at this point in the history
  • Loading branch information
olavloite authored Aug 27, 2024
1 parent 17c620d commit 99e1486
Show file tree
Hide file tree
Showing 7 changed files with 255 additions and 1 deletion.
5 changes: 5 additions & 0 deletions .github/workflows/samples.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ jobs:
run: |
pip install -r requirements.txt
python sample.py
- name: Run connectorx sample tests
working-directory: ./samples/python/connectorx
run: |
pip install -r requirements.txt
python connectorx_sample.py
nodejs-samples:
runs-on: ubuntu-latest
steps:
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ PGAdapter can be used with the following drivers and clients:
1. `pgx`: Version 4.15 and higher are supported. See [pgx support](docs/pgx.md) for more details.
1. `psycopg2`: Version 2.9.3 and higher are supported. See [psycopg2](docs/psycopg2.md) for more details.
1. `psycopg3`: Version 3.1.x and higher are supported. See [psycopg3 support](docs/psycopg3.md) for more details.
1. `connectorx`: Version 0.3.3 and higher have experimental support. See [connectorx sample](samples/python/connectorx) for more details.
1. `node-postgres`: Version 8.8.0 and higher are supported. See [node-postgres support](docs/node-postgres.md) for more details.
1. `npgsql`: Version 6.0.x and higher have experimental support. See [npgsql support](docs/npgsql.md) for more details.
1. `PDO_PGSQL`: The PHP PDO driver has experimental support. See [PHP PDO](docs/pdo.md) for more details.
Expand Down
20 changes: 20 additions & 0 deletions samples/python/connectorx/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# connectorx sample

This sample application shows how to connect to Cloud Spanner through PGAdapter using
[connectorx](https://sfu-db.github.io/connector-x/intro.html).
PGAdapter is automatically started in a Docker container by the sample application.

Run the sample on the Spanner Emulator with the following command:

```shell
python connectorx_sample.py
```

Run the sample on a real Spanner database with the following command:

```shell
python connectorx_sample.py \
-p my-project \
-i my-instance \
-i my-database
```
98 changes: 98 additions & 0 deletions samples/python/connectorx/connectorx_sample.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# Copyright 2024 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Sample for connecting to PGAdapter with connectorx
This sample application shows how to connect to PGAdapter and Cloud Spanner
with connectorx.
The sample starts PGAdapter in an embedded Docker container and then connects
through this container to the Cloud Spanner emulator or real Cloud Spanner.
The sample uses the Cloud Spanner emulator by default.
This sample requires Docker to be installed on the local environment.
Usage (emulator):
python connectorx_sample.py
Usage (Cloud Spanner):
python connectorx_sample.py -p my-project -i my-instance -d my-database
"""

import argparse
import connectorx as cx
from pgadapter import start_pgadapter

parser = argparse.ArgumentParser(
prog='PGAdapter connectorx sample',
description='Sample application for using connectorx with PGAdapter')
parser.add_argument('-e', '--emulator', required=False, default=None,
help="Set this option to 'True' to force PGAdapter to connect to the emulator.")
parser.add_argument('-p', '--project', default="my-project",
help="The Google Cloud project containing the Cloud Spanner instance that PGAdapter should connect to.")
parser.add_argument('-i', '--instance', default="my-instance",
help="The Cloud Spanner instance that PGAdapter should connect to.")
parser.add_argument('-d', '--database', default="my-database",
help="The Cloud Spanner database that connectorx should connect to.")
parser.add_argument('-c', '--credentials', required=False,
help="The credentials file that PGAdapter should use to connect to Cloud Spanner. If None, then the sample application will try to use the default credentials in the environment.")
args = parser.parse_args()

if (args.project == "my-project"
and args.instance == "my-instance"
and args.database == "my-database"
and args.emulator is None):
use_emulator = True
else:
if args.emulator is None:
use_emulator = False
else:
use_emulator = args.emulator == "True"

# Start PGAdapter in an embedded container.
container, port = start_pgadapter(args.project,
args.instance,
use_emulator,
args.credentials)
try:
print("PGAdapter running on port ", port, "\n")

# Connect to Cloud Spanner using connectorx by connecting to PGAdapter that is
# running in the embedded container. The username/password combination is
# ignored by PGAdapter.
postgres_url = ("postgresql://localhost:{port}/{database}?sslmode=disable"
.format(port=port, database=args.database))
query = "SELECT 'Hello World!' as greeting"
result = cx.read_sql(postgres_url, query)
print("Greeting from Cloud Spanner PostgreSQL:\n", result, "\n")

# connectorx by default uses COPY TO STDOUT. PGAdapter by default tries to
# use PartitionQuery for COPY TO STDOUT operations. This is the most efficient
# for large data sets, as the partitions can be read in parallel by PGAdapter.
# You can instruct PGAdapter to use the ExecuteStreamingSql endpoint directly
# for queries that you know will not benefit from PartitionQuery.
# See https://cloud.google.com/spanner/docs/reads#read_data_in_parallel for
# more information on PartitionQuery.
# Add 'options=-c spanner.copy_partition_query=false' to the connection URL
# to use ExecuteStreamingSql directly.
postgres_url = ("postgresql://localhost:{port}/{database}?sslmode=disable"
"&options=-c spanner.copy_partition_query=false"
.format(port=port, database=args.database))
query = "SELECT 'Hello World - Non Partitioned!' as non_partitioned_greeting"
result = cx.read_sql(postgres_url, query)
print("Greeting from Cloud Spanner PostgreSQL:\n", result, "\n")

finally:
if container is not None:
container.stop()
123 changes: 123 additions & 0 deletions samples/python/connectorx/pgadapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
# Copyright 2023 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Utility for starting and stopping PGAdapter in an embedded container
Defines functions for starting and stopping PGAdapter in an embedded Docker
container. Requires that Docker is installed on the local system.
"""

import io
import json
import os
import socket
import time
import google.auth
import google.oauth2.credentials
import google.oauth2.service_account
from testcontainers.core.container import DockerContainer
from testcontainers.core.waiting_utils import wait_for_logs


def start_pgadapter(project: str,
instance: str,
emulator: bool = False,
credentials: str = None) -> (DockerContainer, str):
"""Starts PGAdapter in an embedded Docker container
Starts PGAdapter in an embedded Docker container and returns the TCP port
number where PGAdapter is listening for incoming connections. You can Use any
standard PostgreSQL driver to connect to this port.
Parameters
----------
project : str
The Google Cloud project that PGAdapter should connect to.
instance : str
The Cloud Spanner instance that PGAdapter should connect to.
emulator: bool
Whether PGAdapter should connect to the Cloud Spanner emulator or real
Cloud Spanner.
credentials : str or None
The credentials file that PGAdapter should use. If None, then this
function will try to load the default credentials from the environment.
Returns
-------
container, port : tuple[DockerContainer, str]
The Docker container running PGAdapter and
the port where PGAdapter is listening. Connect to this port on localhost
with a standard PostgreSQL driver to connect to Cloud Spanner.
"""

if emulator:
# Start PGAdapter with the Cloud Spanner emulator in a Docker container
container =(
DockerContainer("gcr.io/cloud-spanner-pg-adapter/pgadapter-emulator")
.with_exposed_ports(5432)
.with_command("-p " + project + " -i " + instance))
container.start()
else:
# Start PGAdapter in a Docker container
container = DockerContainer("gcr.io/cloud-spanner-pg-adapter/pgadapter") \
.with_exposed_ports(5432) \
.with_command(" -p " + project
+ " -i " + instance
+ " -x -c /credentials.json")
container.start()
# Determine the credentials that should be used by PGAdapter and write these
# to a file in the container.
credentials_info = _determine_credentials(credentials)
container.exec("sh -c 'cat <<EOT >> /credentials.json\n"
+ json.dumps(credentials_info, indent=0)
+ "\nEOT'")
# Wait until PGAdapter has started and is listening on the exposed port.
wait_for_logs(container, "PostgreSQL version:")
port = container.get_exposed_port("5432")
_wait_for_port(port=int(port))
return container, port


def _determine_credentials(credentials: str):
if credentials is None:
explicit_file = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
else:
explicit_file = credentials
if explicit_file is None:
credentials, _ = google.auth.default()
if type(credentials).__name__ == \
google.oauth2.credentials.Credentials.__name__:
info = json.loads(credentials.to_json())
info["type"] = "authorized_user"
else:
raise ValueError("GOOGLE_APPLICATION_CREDENTIALS has not been set "
"and no explicit credentials were supplied")
else:
with io.open(explicit_file, "r") as file_obj:
info = json.load(file_obj)
return info


def _wait_for_port(port: int, poll_interval: float = 0.1, timeout: float = 5.0):
start = time.time()
while True:
try:
with socket.create_connection(("localhost", port), timeout=timeout):
break
except OSError:
duration = time.time() - start
if timeout and duration > timeout:
raise TimeoutError("container did not listen on port {} in {} seconds"
.format(port, timeout))
time.sleep(poll_interval)
6 changes: 6 additions & 0 deletions samples/python/connectorx/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
connectorx==0.3.3
pandas==2.2.2
testcontainers~=4.8.0
requests==2.32.3
google~=3.0.0
google.auth~=2.34.0
Original file line number Diff line number Diff line change
Expand Up @@ -1063,7 +1063,8 @@ public void initSessionSetting(String name, String value) {
AbstractStatementParser statementParser =
AbstractStatementParser.getInstance(Dialect.POSTGRESQL);
if ("options".equalsIgnoreCase(name)) {
String[] commands = value.split("-c\\s+");
// Some drivers encode spaces as '+'.
String[] commands = value.split("-c[\\s+]+");
for (String command : commands) {
// Special case: If the setting is one that is handled by the Connection API, then we need
// to execute the statement on the connection instead.
Expand Down

0 comments on commit 99e1486

Please sign in to comment.