Added custom streams based on provided sql query
jlloyd-widen committed Jan 12, 2024
1 parent 0a4cd47 commit aebf20f
Showing 7 changed files with 1,271 additions and 578 deletions.
44 changes: 36 additions & 8 deletions README.md
@@ -5,7 +5,7 @@
Built with the [Meltano Tap SDK](https://sdk.meltano.com) for Singer Taps.

The primary advantage of this version of `tap-mysql` is that it emphasizes and completes
the `LOG_BASED` replication method, whereas other variants have buggy or incomplete
implementations of such. Other advantages include inheriting the capabilities of a tap
built on the Meltano Tap SDK.

@@ -34,26 +34,52 @@ pipx install git+https://github.com/ORG_NAME/tap-mysql.git@main
### Accepted Config Options

<!--
Developer TODO: Provide a list of config options accepted by the tap.
This section can be created by copy-pasting the CLI output from:
```
tap-mysql --about --format=markdown
```
-->

| Setting | Required | Default | Description |
|:---------------------|:--------:|:-------:|:--------------------------------------------------------------------------------------------------------------------------------------------|
| host | True | None | The hostname of the MySQL instance. |
| port | False | 3306 | The port number of the MySQL instance. |
| user | True | None | The username |
| password | True | None | The password for the user |
| custom_streams | False | None | An array of customized streams to use. |
| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). |
| stream_map_config | False | None | User-defined config values to be used within map expressions. |
| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. |
| flattening_max_depth | False | None | The max depth to flatten schemas. |
| batch_config | False | None | |

A full list of supported settings and capabilities for this
tap is available by running:

```bash
tap-mysql --about
```

#### `custom_streams` configuration

Custom streams are defined in the `custom_streams` configuration option. This option is
an array of objects, each of which defines a custom stream. Each custom stream object
has the following properties:

| Property | Required | Default | Description |
|:-------------|:--------:|:-------:|:--------------------------------------------------------------------------------------------------------------------------------------------------------------|
| name | True | None | The name of the custom stream. |
| db_schemas | False | None | An array of schema names of the MySQL instance that is being queried. The same query will be run against each schema. |
| sql          | True     | None    | The custom SQL query to use for this stream. Any occurrence of the string `{db_schema}` will be replaced with each schema name from the `db_schemas` property. |
| primary_keys | False | None | The primary keys of the custom stream. |
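
For example, a minimal `config.json` passed via `--config` might look like the following sketch (the host, credentials, and queried table are illustrative, not part of this commit):

```json
{
  "host": "localhost",
  "user": "replicator",
  "password": "********",
  "custom_streams": [
    {
      "name": "recent_files",
      "db_schemas": ["example1", "example2"],
      "sql": "SELECT id, path FROM {db_schema}.filesondisk LIMIT 5",
      "primary_keys": ["id"]
    }
  ]
}
```

Because two schemas are listed, this single entry yields two streams, one per schema, each running the query with `{db_schema}` substituted accordingly.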

### Configure using environment variables

This Singer tap will automatically import any environment variables within the working
directory's `.env` if the `--config=ENV` is provided, such that config values will be
considered if a matching environment variable is set either in the terminal context or
in the `.env` file.
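
For example, using the Meltano SDK's standard `TAP_MYSQL_<SETTING>` naming convention (values below are illustrative; array settings are typically passed as JSON strings):

```bash
# .env
TAP_MYSQL_HOST=localhost
TAP_MYSQL_PORT=3306
TAP_MYSQL_USER=replicator
TAP_MYSQL_PASSWORD=********
TAP_MYSQL_CUSTOM_STREAMS='[{"name": "recent_files", "db_schemas": ["example1"], "sql": "SELECT id, path FROM {db_schema}.filesondisk LIMIT 5"}]'
```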

### Source Authentication and Authorization
@@ -64,7 +90,8 @@ Developer TODO: If your tap requires special access on the source system, or any

## Usage

You can easily run `tap-mysql` by itself or in a pipeline
using [Meltano](https://meltano.com/).

### Executing the Tap Directly

@@ -88,7 +115,7 @@ poetry install
### Create and Run Tests

Create tests within the `tests` subfolder and then run:

```bash
poetry run pytest
@@ -132,5 +159,6 @@ meltano elt tap-mysql target-jsonl

### SDK Dev Guide

See the [dev guide](https://sdk.meltano.com/en/latest/dev_guide.html) for more
instructions on how to use the SDK to develop your own taps and targets.
16 changes: 16 additions & 0 deletions meltano.yml
@@ -21,6 +21,22 @@ plugins:
    - name: user
    - name: password
      kind: password
    - name: custom_streams
      kind: array
    config:
      custom_streams:
      - name: example_table_name
        db_schemas:
        - example1
        - example2
        sql: SELECT * FROM {db_schema}.filesondisk LIMIT 5
      - name: example_table_name2
        db_schemas:
        - example1
        sql: SELECT * FROM example1.filesondisk LIMIT 5
    select:
    - '*-example_table_name.*'
    - 'example1-example_table_name2.*'
  loaders:
  - name: target-jsonl
    variant: andyh1203
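
With this configuration, each `(db_schema, name)` pair surfaces as its own stream whose ID follows the `{db_schema}-{name}` pattern (e.g. `example1-example_table_name`), which is what the `select` entries above target. A quick way to verify, using standard Meltano commands (output will vary by environment):

```bash
# list stream/attribute selection, including the generated custom streams
meltano select tap-mysql --list
# run the pipeline end to end
meltano run tap-mysql target-jsonl
```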
34 changes: 34 additions & 0 deletions plugins/loaders/target-jsonl--andyh1203.lock
@@ -0,0 +1,34 @@
{
  "plugin_type": "loaders",
  "name": "target-jsonl",
  "namespace": "target_jsonl",
  "variant": "andyh1203",
  "label": "JSON Lines (JSONL)",
  "docs": "https://hub.meltano.com/loaders/target-jsonl--andyh1203",
  "repo": "https://github.com/andyh1203/target-jsonl",
  "pip_url": "target-jsonl",
  "description": "JSONL loader",
  "logo_url": "https://hub.meltano.com/assets/logos/loaders/jsonl.png",
  "settings": [
    {
      "name": "destination_path",
      "kind": "string",
      "value": "output",
      "label": "Destination Path",
      "description": "Sets the destination path the JSONL files are written to, relative\nto the project root.\n\nThe directory needs to exist already, it will not be created\nautomatically.\n\nTo write JSONL files to the project root, set an empty string (`\"\"`).\n"
    },
    {
      "name": "do_timestamp_file",
      "kind": "boolean",
      "value": false,
      "label": "Include Timestamp in File Names",
      "description": "Specifies if the files should get timestamped.\n\nBy default, the resulting file will not have a timestamp in the file name (i.e. `exchange_rate.jsonl`).\n\nIf this option gets set to `true`, the resulting file will have a timestamp associated with it (i.e. `exchange_rate-{timestamp}.jsonl`).\n"
    },
    {
      "name": "custom_name",
      "kind": "string",
      "label": "Custom File Name Override",
      "description": "Specifies a custom name for the filename, instead of the stream name.\n\nThe file name will be `{custom_name}-{timestamp}.jsonl`, if `do_timestamp_file` is `true`.\nOtherwise the file name will be `{custom_name}.jsonl`.\n\nIf custom name is not provided, the stream name will be used.\n"
    }
  ]
}
1,496 changes: 962 additions & 534 deletions poetry.lock


4 changes: 2 additions & 2 deletions pyproject.toml
@@ -15,7 +15,7 @@ packages = [

[tool.poetry.dependencies]
python = "<3.12,>=3.7.1"
singer-sdk = { version="^0.24.0" }
singer-sdk = { version="^0.34.0" }
fs-s3fs = { version = "^1.1.1", optional = true }
pymysql = "^1.0.3"

@@ -27,7 +27,7 @@ black = "^23.1.0"
pyupgrade = "^3.3.1"
mypy = "^1.0.0"
isort = "^5.11.5"
singer-sdk = { version="^0.24.0", extras = ["testing"] }
singer-sdk = { version="^0.34.0", extras = ["testing"] }
sqlalchemy-stubs = "^0.4"

[tool.poetry.extras]
185 changes: 152 additions & 33 deletions tap_mysql/client.py
@@ -6,8 +6,8 @@
from __future__ import annotations

import collections
import datetime
import itertools
import typing as t
from typing import Iterator, cast

import sqlalchemy
@@ -32,6 +32,9 @@
class MySQLConnector(SQLConnector):
"""Connects to the MySQL SQL source."""

def __init__(self, config: dict | None = None):
super().__init__(config, self.get_sqlalchemy_url(config))

def get_sqlalchemy_url(cls, config: dict) -> str:
"""Concatenate a SQLAlchemy URL for use in connecting to the source.
@@ -167,6 +170,8 @@ def discover_catalog_entries(self) -> list[dict]:
entries: list[dict] = []

with self._engine.connect() as connection:
# break discovery into 2 queries for performance
# Get the table definition
table_query = text(
"""
SELECT
@@ -192,6 +197,7 @@ def discover_catalog_entries(self) -> list[dict]:

table_defs[mysql_schema][table] = {"is_view": table_type == "VIEW"}

# Get the column definitions
col_query = text(
"""
SELECT
@@ -209,31 +215,65 @@ def discover_catalog_entries(self) -> list[dict]:
, 'sys'
)
ORDER BY table_schema, table_name, column_name
-- LIMIT 40
"""
)
col_result = connection.execute(col_query)

-# Parse data into useable python objects
+# Parse data into usable python objects and append to entries
columns = []
rec = col_result.fetchone()
while rec is not None:
columns.append(Column(*rec))
rec = col_result.fetchone()

for k, cols in itertools.groupby(
columns, lambda c: (c.table_schema, c.table_name)
):
mysql_schema, table_name = k

entry = self.create_catalog_entry(
db_schema_name=mysql_schema,
table_name=table_name,
table_def=table_defs,
columns=cols,
)
entries.append(entry.to_dict())

# append custom stream catalog entries
# fall back to an empty list when no custom streams are configured
custom_streams = self.config.get("custom_streams", [])
for stream_config in custom_streams:
for table_schema in stream_config.get("db_schemas", []):
table_name = stream_config.get("name")
primary_keys = stream_config.get("primary_keys") or []

query = text(
stream_config.get("sql").replace("{db_schema}", table_schema)
)
custom_result = connection.execute(query)
custom_rec = custom_result.fetchone()
if custom_rec is None:
# the query returned no rows for this schema; skip it
continue
# inject the table_schema into the list of columns
custom_rec_keys = list(custom_rec.keys()) + ["mysql_schema"]

custom_columns = []
for col in custom_rec_keys:
custom_columns.append(
Column(
table_schema=table_schema,
table_name=table_name,
column_name=col,
column_type="STRING",
is_nullable="YES",
column_key="PRI" if col in primary_keys else None,
)
)

entry = self.create_catalog_entry(
db_schema_name=table_schema,
table_name=table_name,
table_def={table_schema: {table_name: {"is_view": False}}},
columns=iter(custom_columns),
)
entries.append(entry.to_dict())

return entries

@@ -243,20 +283,99 @@ class MySQLStream(SQLStream):

connector_class = MySQLConnector

# def get_records(self, partition: dict | None) -> Iterable[dict[str, Any]]:
# """Return a generator of record-type dictionary objects.
#
# Developers may optionally add custom logic before calling the default
# implementation inherited from the base class.
#
# Args:
# partition: If provided, will read specifically from this data slice.
#
# Yields:
# One dict per record.
# """
# # Optionally, add custom logic instead of calling the super().
# # This is helpful if the source database provides batch-optimized record
# # retrieval.
# # If no overrides or optimizations are needed, you may delete this method.
# yield from super().get_records(partition)

class CustomMySQLStream(SQLStream):
"""Custom stream class for MySQL streams."""

connector_class = MySQLConnector
name = None
query = None

def __init__(
self,
tap,
catalog_entry: dict,
stream_config: dict,
mysql_schema: str,
) -> None:
"""Initialize the stream."""
super().__init__(
tap=tap,
catalog_entry=catalog_entry,
)
self.mysql_schema = mysql_schema
self.query = stream_config.get("sql").replace("{db_schema}", mysql_schema)

def get_records(self, context: dict | None) -> t.Iterable[dict[str, t.Any]]:
"""Return a generator of record-type dictionary objects.
If the stream has a replication_key value defined, records will be sorted by the
incremental key. If the stream also has an available starting bookmark, the
records will be filtered for values greater than or equal to the bookmark value.
Args:
context: If partition context is provided, will read specifically from this
data slice.
Yields:
One dict per record.
Raises:
NotImplementedError: If partition is passed in context and the stream does
not support partitioning.
"""
if context:
msg = f"Stream '{self.name}' does not support partitioning."
raise NotImplementedError(msg)

query = text(self.query)

if self.replication_key:
self.logger.info(
f"A replication key was provided but will be ignored for "
f"the custom stream '{self.tap_stream_id}'."
)

if self.ABORT_AT_RECORD_COUNT is not None:
# Limit record count to one greater than the abort threshold. This ensures
# `MaxRecordsLimitException` is properly raised by caller
# `Stream._sync_records()` if more records are available than can be
# processed. A raw `text()` clause has no `.limit()` method, so wrap the
# configured SQL in a subquery and apply the LIMIT there.
query = text(
f"SELECT * FROM ({self.query}) AS abort_limit_wrapper "
f"LIMIT {self.ABORT_AT_RECORD_COUNT + 1}"
)

with self.connector._connect() as conn: # noqa: SLF001
for record in conn.execute(query).mappings():
# TODO: Standardize record mapping type
# https://github.com/meltano/sdk/issues/2096
transformed_record = self.post_process(dict(record))
if transformed_record is None:
# Record filtered out during post_process()
continue
yield transformed_record

def post_process(
self,
row: dict,
context: dict | None = None, # noqa: ARG002
) -> dict | None:
"""As needed, append or transform raw data to match expected structure.
Optional. This method gives developers an opportunity to "clean up" the results
prior to returning records to the downstream tap - for instance: cleaning,
renaming, or appending properties to the raw record result returned from the
API.
Developers may also return `None` from this method to filter out
invalid or not-applicable records from the stream.
Args:
row: Individual record in the stream.
context: Stream partition or context dictionary.
Returns:
The resulting record dict, or `None` if the record should be excluded.
"""
new_row = {k: str(v) for k, v in row.items()}
# inject the mysql_schema into the record
new_row["mysql_schema"] = self.mysql_schema
return new_row
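
To make the behavior above concrete, here is a small illustration (not part of the commit; the query and row values are invented) of how `CustomMySQLStream` builds its per-schema query and what `post_process` does to each row:

```python
# Sketch of the custom-stream record flow, mirroring the diff above.
sql_template = "SELECT id, path FROM {db_schema}.filesondisk LIMIT 5"

for db_schema in ["example1", "example2"]:
    # __init__ substitutes each configured schema into the SQL
    query = sql_template.replace("{db_schema}", db_schema)
    # e.g. "SELECT id, path FROM example1.filesondisk LIMIT 5"

    raw_row = {"id": 42, "path": "/tmp/a.bin"}  # as yielded by conn.execute(...).mappings()

    # post_process(): every value is coerced to str, and the source
    # schema is injected under the "mysql_schema" key
    record = {k: str(v) for k, v in raw_row.items()}
    record["mysql_schema"] = db_schema
    print(record)  # {'id': '42', 'path': '/tmp/a.bin', 'mysql_schema': 'example1'}
```

This also explains why discovery types every custom-stream column as `STRING`: the records themselves are stringified before being emitted.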
70 changes: 69 additions & 1 deletion tap_mysql/tap.py
@@ -7,14 +7,46 @@
from singer_sdk import SQLTap, Stream
from singer_sdk import typing as th

-from tap_mysql.client import MySQLStream
+from tap_mysql.client import CustomMySQLStream, MySQLStream


class TapMySQL(SQLTap):
"""MySQL tap class."""

name = "tap-mysql"
default_stream_class = MySQLStream
custom_stream_class = CustomMySQLStream

custom_stream_config = th.PropertiesList(
th.Property(
"name",
th.StringType,
required=False,
description="The name of the custom stream",
),
th.Property(
"db_schemas",
th.ArrayType(th.StringType),
required=False,
description="An array of schema names of the MySQL instance that is being "
"queried. The same query will be run against each schema.",
),
th.Property(
"sql",
th.StringType,
required=False,
description="The custom sql query to use for this stream. If provided, the "
"string `{db_schema}` will be replaced with the schema name(s) "
"from the `db_schemas` property.}`",
),
th.Property(
"primary_keys",
th.ArrayType(th.StringType),
default=[],
required=False,
description="The primary keys of the custom stream.",
),
)

config_jsonschema = th.PropertiesList(
th.Property(
@@ -42,8 +74,44 @@ class TapMySQL(SQLTap):
secret=True,
description="The password for the user",
),
th.Property(
"custom_streams",
th.ArrayType(custom_stream_config),
required=False,
description="An array of customized streams to use.",
),
).to_dict()

def discover_streams(self) -> list[Stream]:
"""Initialize all available streams and return them as a list.
Returns:
List of discovered Stream objects.
"""
result: list[Stream] = []
# fall back to an empty list when no custom streams are configured
custom_configs = self.config.get("custom_streams", [])
custom_stream_names = []
for stream in custom_configs:
for db_schema in stream.get("db_schemas", []):
custom_stream_names.append(f"{db_schema}-{stream['name']}")

for catalog_entry in self.catalog_dict["streams"]:
stream_id = catalog_entry["tap_stream_id"]
# if it's a custom stream treat it differently
if stream_id in custom_stream_names:
for stream in custom_configs:
for db_schema in stream.get("db_schemas", []):
if stream_id == f"{db_schema}-{stream['name']}":
result.append(
self.custom_stream_class(
self, catalog_entry, stream, db_schema
)
)
else:
result.append(self.default_stream_class(self, catalog_entry))

return result

# not supposed to do this but the logs of deselected streams are a drag
@final
def sync_all(self) -> None:
