From 6af82ada77427da08a176f390d8d0c686da5576a Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Sat, 16 Dec 2023 06:15:46 +0100 Subject: [PATCH] chore: Harmonize imports of sqlalchemy module, use `sa` where applicable It follows a convention to import SQLAlchemy like `import sqlalchemy as sa`. In this spirit, all references, even simple ones like symbols to SQLAlchemy base types like `TEXT`, or `BIGINT`, will be referenced by `sa.TEXT`, `sa.BIGINT`, etc., so it is easy to tell them apart when harmonizing type definitions coming from SA's built-in dialects vs. type definitions coming from 3rd-party dialects. --- target_postgres/connector.py | 161 ++++++++---------- target_postgres/sinks.py | 77 +++++---- target_postgres/tests/core.py | 4 +- target_postgres/tests/test_target_postgres.py | 89 +++++----- 4 files changed, 153 insertions(+), 178 deletions(-) diff --git a/target_postgres/connector.py b/target_postgres/connector.py index 59314d60..b10af6e2 100644 --- a/target_postgres/connector.py +++ b/target_postgres/connector.py @@ -12,24 +12,11 @@ import paramiko import simplejson -import sqlalchemy +import sqlalchemy as sa from singer_sdk import SQLConnector from singer_sdk import typing as th from sqlalchemy.dialects.postgresql import ARRAY, BIGINT, JSONB -from sqlalchemy.engine import URL -from sqlalchemy.engine.url import make_url -from sqlalchemy.types import ( - BOOLEAN, - DATE, - DATETIME, - DECIMAL, - INTEGER, - TEXT, - TIME, - TIMESTAMP, - VARCHAR, - TypeDecorator, -) +from sqlalchemy.sql.type_api import TypeEngine from sshtunnel import SSHTunnelForwarder @@ -48,7 +35,7 @@ def __init__(self, config: dict) -> None: Args: config: Configuration for the connector. """ - url: URL = make_url(self.get_sqlalchemy_url(config=config)) + url: sa.engine.URL = sa.engine.make_url(self.get_sqlalchemy_url(config=config)) ssh_config = config.get("ssh_tunnel", {}) self.ssh_tunnel: SSHTunnelForwarder @@ -84,10 +71,10 @@ def prepare_table( # type: ignore[override] full_table_name: str, schema: dict, primary_keys: list[str], - connection: sqlalchemy.engine.Connection, + connection: sa.engine.Connection, partition_keys: list[str] | None = None, as_temp_table: bool = False, - ) -> sqlalchemy.Table: + ) -> sa.Table: """Adapt target table to provided schema if possible. Args: @@ -102,8 +89,8 @@ def prepare_table( # type: ignore[override] The table object. """ _, schema_name, table_name = self.parse_full_table_name(full_table_name) - meta = sqlalchemy.MetaData(schema=schema_name) - table: sqlalchemy.Table + meta = sa.MetaData(schema=schema_name) + table: sa.Table if not self.table_exists(full_table_name=full_table_name): table = self.create_empty_table( table_name=table_name, @@ -144,10 +131,10 @@ def prepare_table( # type: ignore[override] def copy_table_structure( self, full_table_name: str, - from_table: sqlalchemy.Table, - connection: sqlalchemy.engine.Connection, + from_table: sa.Table, + connection: sa.engine.Connection, as_temp_table: bool = False, - ) -> sqlalchemy.Table: + ) -> sa.Table: """Copy table structure. Args: @@ -160,21 +147,19 @@ def copy_table_structure( The new table object. """ _, schema_name, table_name = self.parse_full_table_name(full_table_name) - meta = sqlalchemy.MetaData(schema=schema_name) - new_table: sqlalchemy.Table + meta = sa.MetaData(schema=schema_name) + new_table: sa.Table columns = [] if self.table_exists(full_table_name=full_table_name): raise RuntimeError("Table already exists") for column in from_table.columns: columns.append(column._copy()) if as_temp_table: - new_table = sqlalchemy.Table( - table_name, meta, *columns, prefixes=["TEMPORARY"] - ) + new_table = sa.Table(table_name, meta, *columns, prefixes=["TEMPORARY"]) new_table.create(bind=connection) return new_table else: - new_table = sqlalchemy.Table(table_name, meta, *columns) + new_table = sa.Table(table_name, meta, *columns) new_table.create(bind=connection) return new_table @@ -185,35 +170,33 @@ def _connect(self) -> t.Iterator[sqlalchemy.engine.Connection]: yield conn engine.dispose() - def drop_table( - self, table: sqlalchemy.Table, connection: sqlalchemy.engine.Connection - ): + def drop_table(self, table: sa.Table, connection: sa.engine.Connection): """Drop table data.""" table.drop(bind=connection) def clone_table( self, new_table_name, table, metadata, connection, temp_table - ) -> sqlalchemy.Table: + ) -> sa.Table: """Clone a table.""" new_columns = [] for column in table.columns: new_columns.append( - sqlalchemy.Column( + sa.Column( column.name, column.type, ) ) if temp_table is True: - new_table = sqlalchemy.Table( + new_table = sa.Table( new_table_name, metadata, *new_columns, prefixes=["TEMPORARY"] ) else: - new_table = sqlalchemy.Table(new_table_name, metadata, *new_columns) + new_table = sa.Table(new_table_name, metadata, *new_columns) new_table.create(bind=connection) return new_table @staticmethod - def to_sql_type(jsonschema_type: dict) -> sqlalchemy.types.TypeEngine: + def to_sql_type(jsonschema_type: dict) -> TypeEngine: """Return a JSON Schema representation of the provided type. By default will call `typing.to_sql_type()`. @@ -282,10 +265,10 @@ def pick_individual_type(jsonschema_type: dict): if "array" in jsonschema_type["type"]: return ARRAY(JSONB()) if jsonschema_type.get("format") == "date-time": - return TIMESTAMP() + return sa.TIMESTAMP() individual_type = th.to_sql_type(jsonschema_type) - if isinstance(individual_type, VARCHAR): - return TEXT() + if isinstance(individual_type, sa.VARCHAR): + return sa.TEXT() return individual_type @staticmethod @@ -301,15 +284,15 @@ def pick_best_sql_type(sql_type_array: list): precedence_order = [ ARRAY, JSONB, - TEXT, - TIMESTAMP, - DATETIME, - DATE, - TIME, - DECIMAL, + sa.TEXT, + sa.TIMESTAMP, + sa.DATETIME, + sa.DATE, + sa.TIME, + sa.DECIMAL, BIGINT, - INTEGER, - BOOLEAN, + sa.INTEGER, + sa.BOOLEAN, NOTYPE, ] @@ -317,18 +300,18 @@ def pick_best_sql_type(sql_type_array: list): for obj in sql_type_array: if isinstance(obj, sql_type): return obj - return TEXT() + return sa.TEXT() def create_empty_table( # type: ignore[override] self, table_name: str, - meta: sqlalchemy.MetaData, + meta: sa.MetaData, schema: dict, - connection: sqlalchemy.engine.Connection, + connection: sa.engine.Connection, primary_keys: list[str] | None = None, partition_keys: list[str] | None = None, as_temp_table: bool = False, - ) -> sqlalchemy.Table: + ) -> sa.Table: """Create an empty target table. Args: @@ -342,7 +325,7 @@ def create_empty_table( # type: ignore[override] NotImplementedError: if temp tables are unsupported and as_temp_table=True. RuntimeError: if a variant schema is passed with no properties defined. """ - columns: list[sqlalchemy.Column] = [] + columns: list[sa.Column] = [] primary_keys = primary_keys or [] try: properties: dict = schema["properties"] @@ -355,7 +338,7 @@ def create_empty_table( # type: ignore[override] for property_name, property_jsonschema in properties.items(): is_primary_key = property_name in primary_keys columns.append( - sqlalchemy.Column( + sa.Column( property_name, self.to_sql_type(property_jsonschema), primary_key=is_primary_key, @@ -363,24 +346,22 @@ def create_empty_table( # type: ignore[override] ) ) if as_temp_table: - new_table = sqlalchemy.Table( - table_name, meta, *columns, prefixes=["TEMPORARY"] - ) + new_table = sa.Table(table_name, meta, *columns, prefixes=["TEMPORARY"]) new_table.create(bind=connection) return new_table - new_table = sqlalchemy.Table(table_name, meta, *columns) + new_table = sa.Table(table_name, meta, *columns) new_table.create(bind=connection) return new_table def prepare_column( # type: ignore[override] self, schema_name: str, - table: sqlalchemy.Table, + table: sa.Table, column_name: str, - sql_type: sqlalchemy.types.TypeEngine, - connection: sqlalchemy.engine.Connection, - column_object: sqlalchemy.Column | None = None, + sql_type: TypeEngine, + connection: sa.engine.Connection, + column_object: sa.Column | None = None, ) -> None: """Adapt target table to provided schema if possible. @@ -397,7 +378,7 @@ def prepare_column( # type: ignore[override] if not column_exists: self._create_empty_column( - # We should migrate every function to use sqlalchemy.Table + # We should migrate every function to use sa.Table # instead of having to know what the function wants table_name=table.name, column_name=column_name, @@ -421,8 +402,8 @@ def _create_empty_column( # type: ignore[override] schema_name: str, table_name: str, column_name: str, - sql_type: sqlalchemy.types.TypeEngine, - connection: sqlalchemy.engine.Connection, + sql_type: TypeEngine, + connection: sa.engine.Connection, ) -> None: """Create a new column. @@ -451,8 +432,8 @@ def get_column_add_ddl( # type: ignore[override] table_name: str, schema_name: str, column_name: str, - column_type: sqlalchemy.types.TypeEngine, - ) -> sqlalchemy.DDL: + column_type: TypeEngine, + ) -> sa.DDL: """Get the create column DDL statement. Args: @@ -464,9 +445,9 @@ def get_column_add_ddl( # type: ignore[override] Returns: A sqlalchemy DDL instance. """ - column = sqlalchemy.Column(column_name, column_type) + column = sa.Column(column_name, column_type) - return sqlalchemy.DDL( + return sa.DDL( ( 'ALTER TABLE "%(schema_name)s"."%(table_name)s"' "ADD COLUMN %(column_name)s %(column_type)s" @@ -484,9 +465,9 @@ def _adapt_column_type( # type: ignore[override] schema_name: str, table_name: str, column_name: str, - sql_type: sqlalchemy.types.TypeEngine, - connection: sqlalchemy.engine.Connection, - column_object: sqlalchemy.Column | None, + sql_type: TypeEngine, + connection: sa.engine.Connection, + column_object: sa.Column | None, ) -> None: """Adapt table column type to support the new JSON schema type. @@ -498,9 +479,9 @@ def _adapt_column_type( # type: ignore[override] Raises: NotImplementedError: if altering columns is not supported. """ - current_type: sqlalchemy.types.TypeEngine + current_type: TypeEngine if column_object is not None: - current_type = t.cast(sqlalchemy.types.TypeEngine, column_object.type) + current_type = t.cast(TypeEngine, column_object.type) else: current_type = self._get_column_type( schema_name=schema_name, @@ -551,8 +532,8 @@ def get_column_alter_ddl( # type: ignore[override] schema_name: str, table_name: str, column_name: str, - column_type: sqlalchemy.types.TypeEngine, - ) -> sqlalchemy.DDL: + column_type: TypeEngine, + ) -> sa.DDL: """Get the alter column DDL statement. Override this if your database uses a different syntax for altering columns. @@ -565,8 +546,8 @@ def get_column_alter_ddl( # type: ignore[override] Returns: A sqlalchemy DDL instance. """ - column = sqlalchemy.Column(column_name, column_type) - return sqlalchemy.DDL( + column = sa.Column(column_name, column_type) + return sa.DDL( ( 'ALTER TABLE "%(schema_name)s"."%(table_name)s"' "ALTER COLUMN %(column_name)s %(column_type)s" @@ -589,7 +570,7 @@ def get_sqlalchemy_url(self, config: dict) -> str: return cast(str, config["sqlalchemy_url"]) else: - sqlalchemy_url = URL.create( + sqlalchemy_url = sa.engine.URL.create( drivername=config["dialect+driver"], username=config["user"], password=config["password"], @@ -717,8 +698,8 @@ def _get_column_type( # type: ignore[override] schema_name: str, table_name: str, column_name: str, - connection: sqlalchemy.engine.Connection, - ) -> sqlalchemy.types.TypeEngine: + connection: sa.engine.Connection, + ) -> TypeEngine: """Get the SQL type of the declared column. Args: @@ -744,15 +725,15 @@ def _get_column_type( # type: ignore[override] ) raise KeyError(msg) from ex - return t.cast(sqlalchemy.types.TypeEngine, column.type) + return t.cast(TypeEngine, column.type) def get_table_columns( # type: ignore[override] self, schema_name: str, table_name: str, - connection: sqlalchemy.engine.Connection, + connection: sa.engine.Connection, column_names: list[str] | None = None, - ) -> dict[str, sqlalchemy.Column]: + ) -> dict[str, sa.Column]: """Return a list of table columns. Overrode to support schema_name @@ -765,11 +746,11 @@ def get_table_columns( # type: ignore[override] Returns: An ordered list of column objects. """ - inspector = sqlalchemy.inspect(connection) + inspector = sa.inspect(connection) columns = inspector.get_columns(table_name, schema_name) return { - col_meta["name"]: sqlalchemy.Column( + col_meta["name"]: sa.Column( col_meta["name"], col_meta["type"], nullable=col_meta.get("nullable", False), @@ -783,7 +764,7 @@ def column_exists( # type: ignore[override] self, full_table_name: str, column_name: str, - connection: sqlalchemy.engine.Connection, + connection: sa.engine.Connection, ) -> bool: """Determine if the target column already exists. @@ -802,10 +783,10 @@ def column_exists( # type: ignore[override] ) -class NOTYPE(TypeDecorator): +class NOTYPE(sa.TypeDecorator): """Type to use when none is provided in the schema.""" - impl = TEXT + impl = sa.TEXT cache_ok = True def process_bind_param(self, value, dialect): @@ -824,4 +805,4 @@ def python_type(self): def as_generic(self, *args: t.Any, **kwargs: t.Any): """Return the generic type for this column.""" - return TEXT() + return sa.TEXT() diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py index 5b5f7c99..1b9a7c71 100644 --- a/target_postgres/sinks.py +++ b/target_postgres/sinks.py @@ -3,12 +3,9 @@ import uuid from typing import Any, Dict, Iterable, List, Optional, Union, cast -import sqlalchemy +import sqlalchemy as sa from pendulum import now from singer_sdk.sinks import SQLSink -from sqlalchemy import Column, MetaData, Table, insert, select, update -from sqlalchemy.sql import Executable -from sqlalchemy.sql.expression import bindparam from target_postgres.connector import PostgresConnector @@ -72,10 +69,10 @@ def process_batch(self, context: dict) -> None: Args: context: Stream partition or context dictionary. """ - # Use one connection so we do this all in a single transaction + # Use one connection, so we do this all in a single transaction with self.connector._connect() as connection, connection.begin(): # Check structure of table - table: sqlalchemy.Table = self.connector.prepare_table( + table: sa.Table = self.connector.prepare_table( full_table_name=self.full_table_name, schema=self.schema, primary_keys=self.key_properties, @@ -83,7 +80,7 @@ def process_batch(self, context: dict) -> None: connection=connection, ) # Create a temp table (Creates from the table above) - temp_table: sqlalchemy.Table = self.connector.copy_table_structure( + temp_table: sa.Table = self.connector.copy_table_structure( full_table_name=self.temp_table_name, from_table=table, as_temp_table=True, @@ -119,11 +116,11 @@ def generate_temp_table_name(self): def bulk_insert_records( # type: ignore[override] self, - table: sqlalchemy.Table, + table: sa.Table, schema: dict, records: Iterable[Dict[str, Any]], primary_keys: List[str], - connection: sqlalchemy.engine.Connection, + connection: sa.engine.Connection, ) -> Optional[int]: """Bulk insert records to an existing destination table. @@ -174,11 +171,11 @@ def bulk_insert_records( # type: ignore[override] def upsert( self, - from_table: sqlalchemy.Table, - to_table: sqlalchemy.Table, + from_table: sa.Table, + to_table: sa.Table, schema: dict, join_keys: List[str], - connection: sqlalchemy.engine.Connection, + connection: sa.engine.Connection, ) -> Optional[int]: """Merge upsert data from one table to another. @@ -196,33 +193,33 @@ def upsert( """ if self.append_only is True: # Insert - select_stmt = select(from_table.columns).select_from(from_table) + select_stmt = sa.select(from_table.columns).select_from(from_table) insert_stmt = to_table.insert().from_select( names=from_table.columns, select=select_stmt ) connection.execute(insert_stmt) else: join_predicates = [] - to_table_key: sqlalchemy.Column + to_table_key: sa.Column for key in join_keys: - from_table_key: sqlalchemy.Column = from_table.columns[key] + from_table_key: sa.Column = from_table.columns[key] to_table_key = to_table.columns[key] join_predicates.append(from_table_key == to_table_key) - join_condition = sqlalchemy.and_(*join_predicates) + join_condition = sa.and_(*join_predicates) where_predicates = [] for key in join_keys: to_table_key = to_table.columns[key] where_predicates.append(to_table_key.is_(None)) - where_condition = sqlalchemy.and_(*where_predicates) + where_condition = sa.and_(*where_predicates) select_stmt = ( - select(from_table.columns) + sa.select(from_table.columns) .select_from(from_table.outerjoin(to_table, join_condition)) .where(where_condition) ) - insert_stmt = insert(to_table).from_select( + insert_stmt = sa.insert(to_table).from_select( names=from_table.columns, select=select_stmt ) @@ -232,11 +229,13 @@ def upsert( where_condition = join_condition update_columns = {} for column_name in self.schema["properties"].keys(): - from_table_column: sqlalchemy.Column = from_table.columns[column_name] - to_table_column: sqlalchemy.Column = to_table.columns[column_name] + from_table_column: sa.Column = from_table.columns[column_name] + to_table_column: sa.Column = to_table.columns[column_name] update_columns[to_table_column] = from_table_column - update_stmt = update(to_table).where(where_condition).values(update_columns) + update_stmt = ( + sa.update(to_table).where(where_condition).values(update_columns) + ) connection.execute(update_stmt) return None @@ -244,12 +243,12 @@ def upsert( def column_representation( self, schema: dict, - ) -> List[Column]: + ) -> List[sa.Column]: """Return a sqlalchemy table representation for the current schema.""" - columns: list[Column] = [] + columns: list[sa.Column] = [] for property_name, property_jsonschema in schema["properties"].items(): columns.append( - Column( + sa.Column( property_name, self.connector.to_sql_type(property_jsonschema), ) @@ -259,8 +258,8 @@ def column_representation( def generate_insert_statement( self, full_table_name: str, - columns: List[Column], # type: ignore[override] - ) -> Union[str, Executable]: + columns: List[sa.Column], # type: ignore[override] + ) -> Union[str, sa.sql.Executable]: """Generate an insert statement for the given records. Args: @@ -270,9 +269,9 @@ def generate_insert_statement( Returns: An insert statement. """ - metadata = MetaData() - table = Table(full_table_name, metadata, *columns) - return insert(table) + metadata = sa.MetaData() + table = sa.Table(full_table_name, metadata, *columns) + return sa.insert(table) def conform_name(self, name: str, object_type: Optional[str] = None) -> str: """Conforming names of tables, schemas, column names.""" @@ -343,8 +342,8 @@ def activate_version(self, new_version: int) -> None: connection=connection, ) - metadata = MetaData() - target_table = Table( + metadata = sa.MetaData() + target_table = sa.Table( self.table_name, metadata, autoload_with=connection.engine, @@ -353,8 +352,8 @@ def activate_version(self, new_version: int) -> None: self.logger.info("Hard delete: %s", self.config.get("hard_delete")) if self.config["hard_delete"] is True: - delete_stmt = sqlalchemy.delete(target_table).where( - sqlalchemy.or_( + delete_stmt = sa.delete(target_table).where( + sa.or_( target_table.c[self.version_column_name].is_(None), target_table.c[self.version_column_name] <= new_version, ) @@ -375,19 +374,19 @@ def activate_version(self, new_version: int) -> None: ) # Need to deal with the case where data doesn't exist for the version column update_stmt = ( - update(target_table) + sa.update(target_table) .values( { - target_table.c[self.soft_delete_column_name]: bindparam( + target_table.c[self.soft_delete_column_name]: sa.bindparam( "deletedate" ) } ) .where( - sqlalchemy.and_( - sqlalchemy.or_( + sa.and_( + sa.or_( target_table.c[self.version_column_name] - < bindparam("version"), + < sa.bindparam("version"), target_table.c[self.version_column_name].is_(None), ), target_table.c[self.soft_delete_column_name].is_(None), diff --git a/target_postgres/tests/core.py b/target_postgres/tests/core.py index ba5662be..dc0ece69 100644 --- a/target_postgres/tests/core.py +++ b/target_postgres/tests/core.py @@ -1,6 +1,6 @@ """ Config and base values for target-postgres testing """ # flake8: noqa -import sqlalchemy +import sqlalchemy as sa from target_postgres.target import TargetPostgres @@ -53,7 +53,7 @@ def postgres_config_ssh_tunnel(): } -def create_engine(target_postgres: TargetPostgres) -> sqlalchemy.engine.Engine: +def create_engine(target_postgres: TargetPostgres) -> sa.engine.Engine: return TargetPostgres.default_sink_class.connector_class( config=target_postgres.config )._engine diff --git a/target_postgres/tests/test_target_postgres.py b/target_postgres/tests/test_target_postgres.py index d20fcf0a..81a0f0f6 100644 --- a/target_postgres/tests/test_target_postgres.py +++ b/target_postgres/tests/test_target_postgres.py @@ -8,11 +8,10 @@ import jsonschema import pytest -import sqlalchemy +import sqlalchemy as sa from singer_sdk.exceptions import MissingKeyPropertiesError from singer_sdk.testing import get_target_test_class, sync_end_to_end from sqlalchemy.dialects.postgresql import ARRAY, JSONB -from sqlalchemy.types import BIGINT, TEXT, TIMESTAMP from target_postgres.connector import PostgresConnector from target_postgres.target import TargetPostgres @@ -113,7 +112,7 @@ def verify_data( if primary_key is not None and check_data is not None: if isinstance(check_data, dict): result = connection.execute( - sqlalchemy.text( + sa.text( f"SELECT * FROM {full_table_name} ORDER BY {primary_key}" ) ) @@ -122,7 +121,7 @@ def verify_data( assert result_dict == check_data elif isinstance(check_data, list): result = connection.execute( - sqlalchemy.text( + sa.text( f"SELECT * FROM {full_table_name} ORDER BY {primary_key}" ) ) @@ -136,7 +135,7 @@ def verify_data( raise ValueError("Invalid check_data - not dict or list of dicts") else: result = connection.execute( - sqlalchemy.text(f"SELECT COUNT(*) FROM {full_table_name}") + sa.text(f"SELECT COUNT(*) FROM {full_table_name}") ) assert result.first()[0] == number_of_rows @@ -156,8 +155,8 @@ def verify_schema( """ schema = self.target.config["default_target_schema"] with self.engine.connect() as connection: - meta = sqlalchemy.MetaData() - table = sqlalchemy.Table( + meta = sa.MetaData() + table = sa.Table( table_name, meta, schema=schema, autoload_with=connection ) for column in table.c: @@ -224,7 +223,7 @@ def test_port_default_config(): target_config = TargetPostgres(config=config).config connector = PostgresConnector(target_config) - engine: sqlalchemy.engine.Engine = connector._engine + engine: sa.engine.Engine = connector._engine assert ( engine.url.render_as_string(hide_password=False) == f"{dialect_driver}://{user}:{password}@{host}:5432/{database}" @@ -249,7 +248,7 @@ def test_port_config(): target_config = TargetPostgres(config=config).config connector = PostgresConnector(target_config) - engine: sqlalchemy.engine.Engine = connector._engine + engine: sa.engine.Engine = connector._engine assert ( engine.url.render_as_string(hide_password=False) == f"{dialect_driver}://{user}:{password}@{host}:5433/{database}" @@ -431,7 +430,7 @@ def test_no_primary_keys(postgres_target, helper): table_name = "test_no_pk" full_table_name = postgres_target.config["default_target_schema"] + "." + table_name with engine.connect() as connection, connection.begin(): - connection.execute(sqlalchemy.text(f"DROP TABLE IF EXISTS {full_table_name}")) + connection.execute(sa.text(f"DROP TABLE IF EXISTS {full_table_name}")) file_name = f"{table_name}.singer" singer_file_to_target(file_name, postgres_target) @@ -467,7 +466,7 @@ def test_array_boolean(postgres_target, helper): helper.verify_schema( "array_boolean", check_columns={ - "id": {"type": BIGINT}, + "id": {"type": sa.BIGINT}, "value": {"type": ARRAY}, }, ) @@ -481,7 +480,7 @@ def test_array_number(postgres_target, helper): helper.verify_schema( "array_number", check_columns={ - "id": {"type": BIGINT}, + "id": {"type": sa.BIGINT}, "value": {"type": ARRAY}, }, ) @@ -495,7 +494,7 @@ def test_array_string(postgres_target, helper): helper.verify_schema( "array_string", check_columns={ - "id": {"type": BIGINT}, + "id": {"type": sa.BIGINT}, "value": {"type": ARRAY}, }, ) @@ -509,7 +508,7 @@ def test_array_timestamp(postgres_target, helper): helper.verify_schema( "array_timestamp", check_columns={ - "id": {"type": BIGINT}, + "id": {"type": sa.BIGINT}, "value": {"type": ARRAY}, }, ) @@ -536,7 +535,7 @@ def test_object_mixed(postgres_target, helper): helper.verify_schema( "object_mixed", check_columns={ - "id": {"type": BIGINT}, + "id": {"type": sa.BIGINT}, "value": {"type": JSONB}, }, ) @@ -593,21 +592,21 @@ def test_anyof(postgres_target, helper): table_name, check_columns={ # {"type":"string"} - "id": {"type": TEXT}, + "id": {"type": sa.TEXT}, # Any of nullable date-time. # Note that postgres timestamp is equivalent to jsonschema date-time. # {"anyOf":[{"type":"string","format":"date-time"},{"type":"null"}]} - "authored_date": {"type": TIMESTAMP}, - "committed_date": {"type": TIMESTAMP}, + "authored_date": {"type": sa.TIMESTAMP}, + "committed_date": {"type": sa.TIMESTAMP}, # Any of nullable array of strings or single string. # {"anyOf":[{"type":"array","items":{"type":["null","string"]}},{"type":"string"},{"type":"null"}]} "parent_ids": {"type": ARRAY}, # Any of nullable string. # {"anyOf":[{"type":"string"},{"type":"null"}]} - "commit_message": {"type": TEXT}, + "commit_message": {"type": sa.TEXT}, # Any of nullable string or integer. # {"anyOf":[{"type":"string"},{"type":"integer"},{"type":"null"}]} - "legacy_id": {"type": TEXT}, + "legacy_id": {"type": sa.TEXT}, }, ) @@ -629,29 +628,29 @@ def test_activate_version_hard_delete(postgres_config_no_ssl): engine = create_engine(pg_hard_delete_true) singer_file_to_target(file_name, pg_hard_delete_true) with engine.connect() as connection: - result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) + result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 7 with engine.connect() as connection, connection.begin(): # Add a record like someone would if they weren't using the tap target combo result = connection.execute( - sqlalchemy.text( + sa.text( f"INSERT INTO {full_table_name}(code, \"name\") VALUES('Manual1', 'Meltano')" ) ) result = connection.execute( - sqlalchemy.text( + sa.text( f"INSERT INTO {full_table_name}(code, \"name\") VALUES('Manual2', 'Meltano')" ) ) with engine.connect() as connection: - result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) + result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 9 singer_file_to_target(file_name, pg_hard_delete_true) # Should remove the 2 records we added manually with engine.connect() as connection: - result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) + result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 7 @@ -662,42 +661,40 @@ def test_activate_version_soft_delete(postgres_target): file_name = f"{table_name}.singer" full_table_name = postgres_target.config["default_target_schema"] + "." + table_name with engine.connect() as connection, connection.begin(): - result = connection.execute( - sqlalchemy.text(f"DROP TABLE IF EXISTS {full_table_name}") - ) + result = connection.execute(sa.text(f"DROP TABLE IF EXISTS {full_table_name}")) postgres_config_soft_delete = copy.deepcopy(postgres_target._config) postgres_config_soft_delete["hard_delete"] = False pg_soft_delete = TargetPostgres(config=postgres_config_soft_delete) singer_file_to_target(file_name, pg_soft_delete) with engine.connect() as connection: - result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) + result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 7 with engine.connect() as connection, connection.begin(): # Add a record like someone would if they weren't using the tap target combo result = connection.execute( - sqlalchemy.text( + sa.text( f"INSERT INTO {full_table_name}(code, \"name\") VALUES('Manual1', 'Meltano')" ) ) result = connection.execute( - sqlalchemy.text( + sa.text( f"INSERT INTO {full_table_name}(code, \"name\") VALUES('Manual2', 'Meltano')" ) ) with engine.connect() as connection: - result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) + result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 9 singer_file_to_target(file_name, pg_soft_delete) # Should have all records including the 2 we added manually with engine.connect() as connection: - result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) + result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 9 result = connection.execute( - sqlalchemy.text( + sa.text( f"SELECT * FROM {full_table_name} where {METADATA_COLUMN_PREFIX}_deleted_at is NOT NULL" ) ) @@ -711,9 +708,7 @@ def test_activate_version_deletes_data_properly(postgres_target): file_name = f"{table_name}.singer" full_table_name = postgres_target.config["default_target_schema"] + "." + table_name with engine.connect() as connection, connection.begin(): - result = connection.execute( - sqlalchemy.text(f"DROP TABLE IF EXISTS {full_table_name}") - ) + result = connection.execute(sa.text(f"DROP TABLE IF EXISTS {full_table_name}")) postgres_config_soft_delete = copy.deepcopy(postgres_target._config) postgres_config_soft_delete["hard_delete"] = True @@ -721,27 +716,27 @@ def test_activate_version_deletes_data_properly(postgres_target): singer_file_to_target(file_name, pg_hard_delete) # Will populate us with 7 records with engine.connect() as connection: - result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) + result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 7 with engine.connect() as connection, connection.begin(): result = connection.execute( - sqlalchemy.text( + sa.text( f"INSERT INTO {full_table_name} (code, \"name\") VALUES('Manual1', 'Meltano')" ) ) result = connection.execute( - sqlalchemy.text( + sa.text( f"INSERT INTO {full_table_name} (code, \"name\") VALUES('Manual2', 'Meltano')" ) ) with engine.connect() as connection: - result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) + result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 9 # Only has a schema and one activate_version message, should delete all records as it's a higher version than what's currently in the table file_name = f"{table_name}_2.singer" singer_file_to_target(file_name, pg_hard_delete) with engine.connect() as connection: - result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) + result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 0 @@ -782,7 +777,7 @@ def test_postgres_ssl_no_config(postgres_config_no_ssl): postgres_config_modified = copy.deepcopy(postgres_config_no_ssl) postgres_config_modified["port"] = 5432 - with pytest.raises(sqlalchemy.exc.OperationalError): + with pytest.raises(sa.exc.OperationalError): target = TargetPostgres(config=postgres_config_modified) sync_end_to_end(tap, target) @@ -808,8 +803,8 @@ def test_postgres_ssl_public_pkey(postgres_config): postgres_config_modified["ssl_client_private_key"] = "./ssl/public_pkey.key" # If the private key exists but access is too public, the target won't fail until - # the it attempts to establish a connection to the database. - with pytest.raises(sqlalchemy.exc.OperationalError): + # it attempts to establish a connection to the database. + with pytest.raises(sa.exc.OperationalError): target = TargetPostgres(config=postgres_config_modified) sync_end_to_end(tap, target) @@ -838,7 +833,7 @@ def test_postgres_ssl_invalid_cn(postgres_config): postgres_config_modified["host"] = "127.0.0.1" postgres_config_modified["ssl_mode"] = "verify-full" - with pytest.raises(sqlalchemy.exc.OperationalError): + with pytest.raises(sa.exc.OperationalError): target = TargetPostgres(config=postgres_config_modified) sync_end_to_end(tap, target) @@ -871,7 +866,7 @@ def test_postgres_ssl_unsupported(postgres_config): postgres_config_modified = copy.deepcopy(postgres_config) postgres_config_modified["port"] = 5433 # Alternate service: postgres_no_ssl - with pytest.raises(sqlalchemy.exc.OperationalError): + with pytest.raises(sa.exc.OperationalError): target = TargetPostgres(config=postgres_config_modified) sync_end_to_end(tap, target)