diff --git a/target_cratedb/connector.py b/target_cratedb/connector.py index fca4181..a9d60f3 100644 --- a/target_cratedb/connector.py +++ b/target_cratedb/connector.py @@ -5,25 +5,10 @@ from builtins import issubclass from datetime import datetime -import sqlalchemy import sqlalchemy as sa from crate.client.sqlalchemy.types import ObjectType, ObjectTypeImpl, _ObjectArray from singer_sdk import typing as th from singer_sdk.helpers._typing import is_array_type, is_boolean_type, is_integer_type, is_number_type, is_object_type -from sqlalchemy.types import ( - ARRAY, - BIGINT, - BOOLEAN, - DATE, - DATETIME, - DECIMAL, - FLOAT, - INTEGER, - TEXT, - TIME, - TIMESTAMP, - VARCHAR, -) from target_postgres.connector import NOTYPE, PostgresConnector from target_cratedb.sqlalchemy.patch import polyfill_refresh_after_dml_engine @@ -39,7 +24,7 @@ class CrateDBConnector(PostgresConnector): allow_merge_upsert: bool = False # Whether MERGE UPSERT is supported. allow_temp_tables: bool = False # Whether temp tables are supported. - def create_engine(self) -> sqlalchemy.Engine: + def create_engine(self) -> sa.Engine: """ Create an SQLAlchemy engine object. @@ -50,7 +35,7 @@ def create_engine(self) -> sqlalchemy.Engine: return engine @staticmethod - def to_sql_type(jsonschema_type: dict) -> sqlalchemy.types.TypeEngine: + def to_sql_type(jsonschema_type: dict) -> sa.types.TypeEngine: """Return a JSON Schema representation of the provided type. Note: Needs to be patched to invoke other static methods on `CrateDBConnector`. @@ -112,7 +97,7 @@ def pick_individual_type(jsonschema_type: dict): if "null" in jsonschema_type["type"]: return None if "integer" in jsonschema_type["type"]: - return BIGINT() + return sa.BIGINT() if "object" in jsonschema_type["type"]: return ObjectType if "array" in jsonschema_type["type"]: @@ -157,16 +142,16 @@ def pick_individual_type(jsonschema_type: dict): # Discover/translate inner types. inner_type = resolve_array_inner_type(jsonschema_type) if inner_type is not None: - return ARRAY(inner_type) + return sa.ARRAY(inner_type) # When type discovery fails, assume `TEXT`. - return ARRAY(TEXT()) + return sa.ARRAY(sa.TEXT()) if jsonschema_type.get("format") == "date-time": - return TIMESTAMP() + return sa.TIMESTAMP() individual_type = th.to_sql_type(jsonschema_type) - if isinstance(individual_type, VARCHAR): - return TEXT() + if isinstance(individual_type, sa.VARCHAR): + return sa.TEXT() return individual_type @staticmethod @@ -182,18 +167,18 @@ def pick_best_sql_type(sql_type_array: list): An instance of the best SQL type class based on defined precedence order. """ precedence_order = [ - TEXT, - TIMESTAMP, - DATETIME, - DATE, - TIME, - DECIMAL, - FLOAT, - BIGINT, - INTEGER, - BOOLEAN, + sa.TEXT, + sa.TIMESTAMP, + sa.DATETIME, + sa.DATE, + sa.TIME, + sa.DECIMAL, + sa.FLOAT, + sa.BIGINT, + sa.INTEGER, + sa.BOOLEAN, NOTYPE, - ARRAY, + sa.ARRAY, FloatVector, ObjectTypeImpl, ] @@ -202,12 +187,12 @@ def pick_best_sql_type(sql_type_array: list): for obj in sql_type_array: if isinstance(obj, sql_type): return obj - return TEXT() + return sa.TEXT() def _sort_types( self, - sql_types: t.Iterable[sqlalchemy.types.TypeEngine], - ) -> list[sqlalchemy.types.TypeEngine]: + sql_types: t.Iterable[sa.types.TypeEngine], + ) -> list[sa.types.TypeEngine]: """Return the input types sorted from most to least compatible. Note: Needs to be patched to supply handlers for `_ObjectArray` and `NOTYPE`. @@ -227,7 +212,7 @@ def _sort_types( """ def _get_type_sort_key( - sql_type: sqlalchemy.types.TypeEngine, + sql_type: sa.types.TypeEngine, ) -> tuple[int, int]: # return rank, with higher numbers ranking first @@ -257,10 +242,10 @@ def _get_type_sort_key( def copy_table_structure( self, full_table_name: str, - from_table: sqlalchemy.Table, - connection: sqlalchemy.engine.Connection, + from_table: sa.Table, + connection: sa.engine.Connection, as_temp_table: bool = False, - ) -> sqlalchemy.Table: + ) -> sa.Table: """Copy table structure. Note: Needs to be patched to prevent `Primary key columns cannot be nullable` errors. @@ -275,17 +260,17 @@ def copy_table_structure( The new table object. """ _, schema_name, table_name = self.parse_full_table_name(full_table_name) - meta = sqlalchemy.MetaData(schema=schema_name) + meta = sa.MetaData(schema=schema_name) columns = [] if self.table_exists(full_table_name=full_table_name): raise RuntimeError("Table already exists") - column: sqlalchemy.Column + column: sa.Column for column in from_table.columns: # CrateDB: Prevent `Primary key columns cannot be nullable` errors. if column.primary_key and column.nullable: column.nullable = False columns.append(column._copy()) - new_table = sqlalchemy.Table(table_name, meta, *columns) + new_table = sa.Table(table_name, meta, *columns) new_table.create(bind=connection) return new_table @@ -299,11 +284,11 @@ def prepare_schema(self, schema_name: str) -> None: def resolve_array_inner_type(jsonschema_type: dict) -> t.Union[sa.types.TypeEngine, None]: if "items" in jsonschema_type: if is_boolean_type(jsonschema_type["items"]): - return BOOLEAN() + return sa.BOOLEAN() if is_number_type(jsonschema_type["items"]): - return FLOAT() + return sa.FLOAT() if is_integer_type(jsonschema_type["items"]): - return BIGINT() + return sa.BIGINT() if is_object_type(jsonschema_type["items"]): return ObjectType() if is_array_type(jsonschema_type["items"]): diff --git a/target_cratedb/sinks.py b/target_cratedb/sinks.py index ea43aa9..7ff080b 100644 --- a/target_cratedb/sinks.py +++ b/target_cratedb/sinks.py @@ -3,9 +3,8 @@ import time from typing import List, Optional, Union -import sqlalchemy +import sqlalchemy as sa from pendulum import now -from sqlalchemy import Column, Executable, MetaData, Table, bindparam, insert, select, update from target_postgres.sinks import PostgresSink from target_cratedb.connector import CrateDBConnector @@ -116,7 +115,7 @@ def process_batch(self, context: dict) -> None: # Use one connection so we do this all in a single transaction with self.connector._connect() as connection, connection.begin(): # Check structure of table - table: sqlalchemy.Table = self.connector.prepare_table( + table: sa.Table = self.connector.prepare_table( full_table_name=self.full_table_name, schema=self.schema, primary_keys=self.key_properties, @@ -134,7 +133,7 @@ def process_batch(self, context: dict) -> None: # FIXME: Upserts do not work yet. """ # Create a temp table (Creates from the table above) - temp_table: sqlalchemy.Table = self.connector.copy_table_structure( + temp_table: sa.Table = self.connector.copy_table_structure( full_table_name=self.temp_table_name, from_table=table, as_temp_table=True, @@ -162,11 +161,11 @@ def process_batch(self, context: dict) -> None: def upsertX( self, - from_table: sqlalchemy.Table, - to_table: sqlalchemy.Table, + from_table: sa.Table, + to_table: sa.Table, schema: dict, - join_keys: List[Column], - connection: sqlalchemy.engine.Connection, + join_keys: List[sa.Column], + connection: sa.engine.Connection, ) -> Optional[int]: """Merge upsert data from one table to another. @@ -185,30 +184,30 @@ def upsertX( if self.append_only is True: # Insert - select_stmt = select(from_table.columns).select_from(from_table) + select_stmt = sa.select(from_table.columns).select_from(from_table) insert_stmt = to_table.insert().from_select(names=list(from_table.columns), select=select_stmt) connection.execute(insert_stmt) else: join_predicates = [] for key in join_keys: - from_table_key: sqlalchemy.Column = from_table.columns[key] # type: ignore[call-overload] - to_table_key: sqlalchemy.Column = to_table.columns[key] # type: ignore[call-overload] + from_table_key: sa.Column = from_table.columns[key] # type: ignore[call-overload] + to_table_key: sa.Column = to_table.columns[key] # type: ignore[call-overload] join_predicates.append(from_table_key == to_table_key) # type: ignore[call-overload] - join_condition = sqlalchemy.and_(*join_predicates) + join_condition = sa.and_(*join_predicates) where_predicates = [] for key in join_keys: - to_table_key: sqlalchemy.Column = to_table.columns[key] # type: ignore[call-overload,no-redef] + to_table_key: sa.Column = to_table.columns[key] # type: ignore[call-overload,no-redef] where_predicates.append(to_table_key.is_(None)) - where_condition = sqlalchemy.and_(*where_predicates) + where_condition = sa.and_(*where_predicates) select_stmt = ( - select(from_table.columns) + sa.select(from_table.columns) .select_from(from_table.outerjoin(to_table, join_condition)) .where(where_condition) ) - insert_stmt = insert(to_table).from_select(names=list(from_table.columns), select=select_stmt) + insert_stmt = sa.insert(to_table).from_select(names=list(from_table.columns), select=select_stmt) connection.execute(insert_stmt) @@ -216,14 +215,14 @@ def upsertX( where_condition = join_condition update_columns = {} for column_name in self.schema["properties"].keys(): - from_table_column: sqlalchemy.Column = from_table.columns[column_name] - to_table_column: sqlalchemy.Column = to_table.columns[column_name] + from_table_column: sa.Column = from_table.columns[column_name] + to_table_column: sa.Column = to_table.columns[column_name] # Prevent: `Updating a primary key is not supported` if to_table_column.primary_key: continue update_columns[to_table_column] = from_table_column - update_stmt = update(to_table).where(where_condition).values(update_columns) + update_stmt = sa.update(to_table).where(where_condition).values(update_columns) connection.execute(update_stmt) return None @@ -264,7 +263,7 @@ def activate_version(self, new_version: int) -> None: self.logger.info("Hard delete: %s", self.config.get("hard_delete")) if self.config["hard_delete"] is True: connection.execute( - sqlalchemy.text( + sa.text( f'DELETE FROM "{self.schema_name}"."{self.table_name}" ' # noqa: S608 f'WHERE "{self.version_column_name}" <= {new_version} ' f'OR "{self.version_column_name}" IS NULL' @@ -284,7 +283,7 @@ def activate_version(self, new_version: int) -> None: connection=connection, ) # Need to deal with the case where data doesn't exist for the version column - query = sqlalchemy.text( + query = sa.text( f'UPDATE "{self.schema_name}"."{self.table_name}"\n' f'SET "{self.soft_delete_column_name}" = :deletedate \n' f'WHERE "{self.version_column_name}" < :version ' @@ -292,16 +291,16 @@ def activate_version(self, new_version: int) -> None: f' AND "{self.soft_delete_column_name}" IS NULL\n' ) query = query.bindparams( - bindparam("deletedate", value=deleted_at, type_=datetime_type), - bindparam("version", value=new_version, type_=integer_type), + sa.bindparam("deletedate", value=deleted_at, type_=datetime_type), + sa.bindparam("version", value=new_version, type_=integer_type), ) connection.execute(query) def generate_insert_statement( self, full_table_name: str, - columns: List[Column], - ) -> Union[str, Executable]: + columns: List[sa.Column], + ) -> Union[str, sa.sql.Executable]: """Generate an insert statement for the given records. Args: @@ -312,6 +311,6 @@ def generate_insert_statement( An insert statement. """ # FIXME: - metadata = MetaData(schema=self.schema_name) - table = Table(full_table_name, metadata, *columns) - return insert(table) + metadata = sa.MetaData(schema=self.schema_name) + table = sa.Table(full_table_name, metadata, *columns) + return sa.insert(table) diff --git a/target_cratedb/tests/test_standard_target.py b/target_cratedb/tests/test_standard_target.py index 09b1e02..ca55faf 100644 --- a/target_cratedb/tests/test_standard_target.py +++ b/target_cratedb/tests/test_standard_target.py @@ -7,7 +7,6 @@ import jsonschema import pytest -import sqlalchemy import sqlalchemy as sa from crate.client.sqlalchemy.types import ObjectTypeImpl from singer_sdk.exceptions import MissingKeyPropertiesError @@ -87,7 +86,7 @@ def cratedb_target(cratedb_config) -> TargetCrateDB: return TargetCrateDB(config=cratedb_config) -def create_engine(target_cratedb: TargetCrateDB) -> sqlalchemy.engine.Engine: +def create_engine(target_cratedb: TargetCrateDB) -> sa.engine.Engine: engine = TargetCrateDB.default_sink_class.connector_class(config=target_cratedb.config)._engine polyfill_refresh_after_dml_engine(engine) return engine @@ -182,7 +181,7 @@ def test_port_default_config(): target_config = TargetCrateDB(config=config).config connector = CrateDBConnector(target_config) - engine: sqlalchemy.engine.Engine = connector._engine + engine: sa.engine.Engine = connector._engine assert ( engine.url.render_as_string(hide_password=False) == f"{dialect_driver}://{user}:{password}@{host}:5432/{database}" @@ -207,7 +206,7 @@ def test_port_config(): target_config = TargetCrateDB(config=config).config connector = CrateDBConnector(target_config) - engine: sqlalchemy.engine.Engine = connector._engine + engine: sa.engine.Engine = connector._engine assert ( engine.url.render_as_string(hide_password=False) == f"{dialect_driver}://{user}:{password}@{host}:4200/{database}" @@ -379,7 +378,7 @@ def test_no_primary_keys(cratedb_target, helper): table_name = "test_no_pk" full_table_name = cratedb_target.config["default_target_schema"] + "." + table_name with engine.connect() as connection, connection.begin(): - connection.execute(sqlalchemy.text(f"DROP TABLE IF EXISTS {full_table_name}")) + connection.execute(sa.text(f"DROP TABLE IF EXISTS {full_table_name}")) file_name = f"{table_name}.singer" singer_file_to_target(file_name, cratedb_target) @@ -417,7 +416,7 @@ def test_array_boolean(cratedb_target, helper): "array_boolean", check_columns={ "id": {"type": sa.BIGINT}, - "value": {"type": sqlalchemy.types.ARRAY}, + "value": {"type": sa.ARRAY}, }, ) @@ -448,7 +447,7 @@ def test_array_number(cratedb_target, helper): "array_number", check_columns={ "id": {"type": sa.BIGINT}, - "value": {"type": sqlalchemy.types.ARRAY}, + "value": {"type": sa.ARRAY}, }, ) @@ -462,7 +461,7 @@ def test_array_string(cratedb_target, helper): "array_string", check_columns={ "id": {"type": sa.BIGINT}, - "value": {"type": sqlalchemy.types.ARRAY}, + "value": {"type": sa.ARRAY}, }, ) @@ -476,7 +475,7 @@ def test_array_timestamp(cratedb_target, helper): "array_timestamp", check_columns={ "id": {"type": sa.BIGINT}, - "value": {"type": sqlalchemy.types.ARRAY}, + "value": {"type": sa.ARRAY}, }, ) @@ -566,8 +565,8 @@ def test_anyof(cratedb_target): schema = cratedb_target.config["default_target_schema"] singer_file_to_target(file_name, cratedb_target) with engine.connect() as connection: - meta = sqlalchemy.MetaData() - table = sqlalchemy.Table("commits", meta, schema=schema, autoload_with=connection) + meta = sa.MetaData() + table = sa.Table("commits", meta, schema=schema, autoload_with=connection) # ruff: noqa: ERA001 for column in table.c: # {"type":"string"} @@ -613,27 +612,27 @@ def test_activate_version_hard_delete(cratedb_config): engine = create_engine(pg_hard_delete_true) singer_file_to_target(file_name, pg_hard_delete_true) with engine.connect() as connection: - result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) + result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 7 with engine.connect() as connection, connection.begin(): # Add a record like someone would if they weren't using the tap target combo result = connection.execute( - sqlalchemy.text(f"INSERT INTO {full_table_name}(code, \"name\") VALUES('Manual1', 'Meltano')") + sa.text(f"INSERT INTO {full_table_name}(code, \"name\") VALUES('Manual1', 'Meltano')") ) result = connection.execute( - sqlalchemy.text(f"INSERT INTO {full_table_name}(code, \"name\") VALUES('Manual2', 'Meltano')") + sa.text(f"INSERT INTO {full_table_name}(code, \"name\") VALUES('Manual2', 'Meltano')") ) # CrateDB-specific - connection.execute(sqlalchemy.text(f"REFRESH TABLE {full_table_name}")) + connection.execute(sa.text(f"REFRESH TABLE {full_table_name}")) with engine.connect() as connection: - result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) + result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 9 singer_file_to_target(file_name, pg_hard_delete_true) # Should remove the 2 records we added manually with engine.connect() as connection: - result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) + result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 7 @@ -644,38 +643,38 @@ def test_activate_version_soft_delete(cratedb_target): file_name = f"{table_name}.singer" full_table_name = cratedb_target.config["default_target_schema"] + "." + table_name with engine.connect() as connection, connection.begin(): - result = connection.execute(sqlalchemy.text(f"DROP TABLE IF EXISTS {full_table_name}")) + result = connection.execute(sa.text(f"DROP TABLE IF EXISTS {full_table_name}")) postgres_config_soft_delete = copy.deepcopy(cratedb_target._config) postgres_config_soft_delete["hard_delete"] = False pg_soft_delete = TargetCrateDB(config=postgres_config_soft_delete) singer_file_to_target(file_name, pg_soft_delete) with engine.connect() as connection: - result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) + result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 7 with engine.connect() as connection, connection.begin(): # Add a record like someone would if they weren't using the tap target combo result = connection.execute( - sqlalchemy.text(f"INSERT INTO {full_table_name}(code, \"name\") VALUES('Manual1', 'Meltano')") + sa.text(f"INSERT INTO {full_table_name}(code, \"name\") VALUES('Manual1', 'Meltano')") ) result = connection.execute( - sqlalchemy.text(f"INSERT INTO {full_table_name}(code, \"name\") VALUES('Manual2', 'Meltano')") + sa.text(f"INSERT INTO {full_table_name}(code, \"name\") VALUES('Manual2', 'Meltano')") ) # CrateDB-specific - connection.execute(sqlalchemy.text(f"REFRESH TABLE {full_table_name}")) + connection.execute(sa.text(f"REFRESH TABLE {full_table_name}")) with engine.connect() as connection: - result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) + result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 9 singer_file_to_target(file_name, pg_soft_delete) # Should have all records including the 2 we added manually with engine.connect() as connection: - result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) + result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 9 result = connection.execute( - sqlalchemy.text(f'SELECT * FROM {full_table_name} where "{METADATA_COLUMN_PREFIX}_deleted_at" is NOT NULL') + sa.text(f'SELECT * FROM {full_table_name} where "{METADATA_COLUMN_PREFIX}_deleted_at" is NOT NULL') ) assert result.rowcount == 2 @@ -687,7 +686,7 @@ def test_activate_version_deletes_data_properly(cratedb_target): file_name = f"{table_name}.singer" full_table_name = cratedb_target.config["default_target_schema"] + "." + table_name with engine.connect() as connection, connection.begin(): - result = connection.execute(sqlalchemy.text(f"DROP TABLE IF EXISTS {full_table_name}")) + result = connection.execute(sa.text(f"DROP TABLE IF EXISTS {full_table_name}")) postgres_config_soft_delete = copy.deepcopy(cratedb_target._config) postgres_config_soft_delete["hard_delete"] = True @@ -695,19 +694,19 @@ def test_activate_version_deletes_data_properly(cratedb_target): singer_file_to_target(file_name, pg_hard_delete) # Will populate us with 7 records with engine.connect() as connection: - result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) + result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 7 with engine.connect() as connection, connection.begin(): result = connection.execute( - sqlalchemy.text(f"INSERT INTO {full_table_name} (code, \"name\") VALUES('Manual1', 'Meltano')") + sa.text(f"INSERT INTO {full_table_name} (code, \"name\") VALUES('Manual1', 'Meltano')") ) result = connection.execute( - sqlalchemy.text(f"INSERT INTO {full_table_name} (code, \"name\") VALUES('Manual2', 'Meltano')") + sa.text(f"INSERT INTO {full_table_name} (code, \"name\") VALUES('Manual2', 'Meltano')") ) # CrateDB-specific - connection.execute(sqlalchemy.text(f"REFRESH TABLE {full_table_name}")) + connection.execute(sa.text(f"REFRESH TABLE {full_table_name}")) with engine.connect() as connection: - result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) + result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 9 # Only has a schema and one activate_version message, should delete all records @@ -716,8 +715,8 @@ def test_activate_version_deletes_data_properly(cratedb_target): singer_file_to_target(file_name, pg_hard_delete) with engine.connect() as connection: # CrateDB-specific - connection.execute(sqlalchemy.text(f"REFRESH TABLE {full_table_name}")) - result = connection.execute(sqlalchemy.text(f"SELECT * FROM {full_table_name}")) + connection.execute(sa.text(f"REFRESH TABLE {full_table_name}")) + result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}")) assert result.rowcount == 0