Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add support for new BATCH message type in taps and targets #904

Merged
merged 18 commits into from
Sep 15, 2022
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions docs/batch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# PREVIEW - Batch Messages (A.K.A. Fast Sync)
aaronsteers marked this conversation as resolved.
Show resolved Hide resolved

```{warning}
The `BATCH` message functionality is currently in preview and is subject to change.
aaronsteers marked this conversation as resolved.
Show resolved Hide resolved
```

[The Singer message specification](https://github.com/singer-io/getting-started/blob/master/docs/SPEC.md#output) defines the three basic types of messages: `RECORD`, `STATE`, and `SCHEMA`. The `RECORD` message is used to send data from the tap to the target. The `STATE` message is used to send state data from the tap to the target. The `SCHEMA` message is used to send schema data from the tap to the target, and for example, create tables with the correct column types.

However, the Singer specification can be extended to support additional types of messages. For example, the [`ACTIVATE_VERSION`](https://sdk.meltano.com/en/latest/capabilities.html#singer_sdk.helpers.capabilities.PluginCapabilities.ACTIVATE_VERSION) message is used to manage hard deletes in the target.

This library's implementation of the `BATCH` message is used to send records in bulk from the tap to the target, using an intermediate filesystem to store _batch_ files. This is useful, for example

- when the tap outputs records at a much higher rate than the target can consume them, creating backpressure
- when the source system can directly export data in bulk (e.g. a database dump)

Currently only a local filesystem is supported, but other filesystems like AWS S3, FTP, etc. could be supported in the future.

## The `BATCH` Message

```json
{
"type": "BATCH",
"stream": "users",
"encoding": {
"format": "jsonl",
"compression": "gzip"
},
"manifest": [
"path/to/batch/file/1",
"path/to/batch/file/2"
]
edgarrmondragon marked this conversation as resolved.
Show resolved Hide resolved
}
```

### `encoding`

The `encoding` field is used to specify the format and compression of the batch files. Currently only `jsonl` and `gzip` are supported, respectively.

### `manifest`

The `manifest` field is used to specify the paths to the batch files. The paths are relative to the `root` directory specified in the [`batch_config`](#batch-configuration) storage configuration.

## Batch configuration

The batch configuration is used to specify the root directory for the batch files, and the maximum number of records per batch file.
edgarrmondragon marked this conversation as resolved.
Show resolved Hide resolved

```json
{
"encoding": {
"format": "jsonl",
"compression": "gzip",
},
"storage": {
"root": "file://tests/core/resources",
"prefix": "test-batch-",
}
}
edgarrmondragon marked this conversation as resolved.
Show resolved Hide resolved
```

## Custom batch file creation and processing

### Tap side

The tap can customize the batch file creation by implementing the [`get_batches`](singer_sdk.Stream.get_batches). This method should return a _tuple_ of an encoding and a list of batch files:
edgarrmondragon marked this conversation as resolved.
Show resolved Hide resolved

```python
class MyStream(Stream):
def get_batches(self, records):
return (
ParquetEncoding(compression="snappy"),
[
"s3://my-bucket/my-batch-file-1.parquet",
"s3://my-bucket/my-batch-file-2.parquet",
]
)
```

### Target side

The target can customize the batch file processing by implementing the [`process_batch_files`](singer_sdk.Sink.process_batch_files).
edgarrmondragon marked this conversation as resolved.
Show resolved Hide resolved

```python
class MySink(Sink):
def process_batch_files(self, encoding, storage, files):
# process the batch files
```
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ Advanced Topics
parent_streams
partitioning
stream_maps
batch
porting
sinks
CONTRIBUTING
Expand Down
40 changes: 36 additions & 4 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pipelinewise-singer-python = "1.2.0"
backoff = ">=1.8.0,<2.0"
pendulum = "^2.1.0"
click = "~=8.0"
fs = "^2.4.16"
PyJWT = "~=2.4"
requests = "^2.25.1"
cryptography = ">=3.4.6,<39.0.0"
Expand Down Expand Up @@ -148,6 +149,7 @@ exclude_lines = [
"if __name__ == .__main__.:",
'''class .*\bProtocol\):''',
'''@(abc\.)?abstractmethod''',
"if TYPE_CHECKING:",
]
fail_under = 82

Expand Down
4 changes: 4 additions & 0 deletions samples/sample_tap_countries/countries_tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,7 @@ def discover_streams(self) -> List[Stream]:
CountriesStream(tap=self),
ContinentsStream(tap=self),
]


if __name__ == "__main__":
SampleTapCountries.cli()
7 changes: 5 additions & 2 deletions samples/sample_target_sqlite/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
"""A sample implementation for SQLite."""

from typing import Any, Dict
from __future__ import annotations

from typing import Any

import sqlalchemy
from sqlalchemy.dialects.sqlite import insert

from singer_sdk import SQLConnector, SQLSink, SQLTarget
from singer_sdk import typing as th
Expand All @@ -20,7 +23,7 @@ class SQLiteConnector(SQLConnector):
allow_column_alter = False
allow_merge_upsert = True

def get_sqlalchemy_url(self, config: Dict[str, Any]) -> str:
def get_sqlalchemy_url(self, config: dict[str, Any]) -> str:
"""Generates a SQLAlchemy URL for SQLite."""
return f"sqlite:///{config[DB_PATH_CONFIG]}"

Expand Down
Loading