From 65646fdb326c6bd2bd1e91a172fbdfc95d888005 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Rami=CC=81rez=20Mondrago=CC=81n?= Date: Mon, 29 Aug 2022 16:51:59 -0500 Subject: [PATCH 01/17] Add SDK Batch message type --- pyproject.toml | 1 + singer_sdk/helpers/_singer.py | 133 ++++++++++++++++-- singer_sdk/io_base.py | 18 ++- singer_sdk/mapper_base.py | 14 ++ singer_sdk/streams/core.py | 219 ++++++++++++++++++++++++----- singer_sdk/target_base.py | 11 ++ tests/core/test_singer_messages.py | 57 ++++++++ 7 files changed, 394 insertions(+), 59 deletions(-) create mode 100644 tests/core/test_singer_messages.py diff --git a/pyproject.toml b/pyproject.toml index 759ba85b1..9293be739 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -148,6 +148,7 @@ exclude_lines = [ "if __name__ == .__main__.:", '''class .*\bProtocol\):''', '''@(abc\.)?abstractmethod''', + "if TYPE_CHECKING:", ] fail_under = 82 diff --git a/singer_sdk/helpers/_singer.py b/singer_sdk/helpers/_singer.py index 580491d9e..bcdca9dc2 100644 --- a/singer_sdk/helpers/_singer.py +++ b/singer_sdk/helpers/_singer.py @@ -1,20 +1,40 @@ from __future__ import annotations +import enum import logging -from dataclasses import dataclass, fields -from enum import Enum -from typing import Any, Dict, Iterable, Tuple, Union, cast +import sys +from dataclasses import asdict, dataclass, field, fields +from typing import TYPE_CHECKING, Any, ClassVar, Dict, Iterable, Tuple, Union, cast from singer.catalog import Catalog as BaseCatalog from singer.catalog import CatalogEntry as BaseCatalogEntry +from singer.messages import Message from singer_sdk.helpers._schema import SchemaPlus +if TYPE_CHECKING: + from typing_extensions import TypeAlias + + if sys.version_info >= (3, 8): + from typing import Literal + else: + from typing_extensions import Literal + Breadcrumb = Tuple[str, ...] 
logger = logging.getLogger(__name__) +class SingerMessageType(str, enum.Enum): + """Singer specification message types.""" + + RECORD = "RECORD" + SCHEMA = "SCHEMA" + STATE = "STATE" + ACTIVATE_VERSION = "ACTIVATE_VERSION" + BATCH = "BATCH" + + class SelectionMask(Dict[Breadcrumb, bool]): """Boolean mask for property selection in schemas and records.""" @@ -35,7 +55,7 @@ def __missing__(self, breadcrumb: Breadcrumb) -> bool: class Metadata: """Base stream or property metadata.""" - class InclusionType(str, Enum): + class InclusionType(str, enum.Enum): """Catalog inclusion types.""" AVAILABLE = "available" @@ -51,8 +71,8 @@ def from_dict(cls, value: dict[str, Any]): """Parse metadata dictionary.""" return cls( **{ - field.name: value.get(field.name.replace("_", "-")) - for field in fields(cls) + object_field.name: value.get(object_field.name.replace("_", "-")) + for object_field in fields(cls) } ) @@ -60,10 +80,10 @@ def to_dict(self) -> dict[str, Any]: """Convert metadata to a JSON-encodeable dictionary.""" result = {} - for field in fields(self): - value = getattr(self, field.name) + for object_field in fields(self): + value = getattr(self, object_field.name) if value is not None: - result[field.name.replace("_", "-")] = value + result[object_field.name.replace("_", "-")] = value return result @@ -78,13 +98,16 @@ class StreamMetadata(Metadata): schema_name: str | None = None -class MetadataMapping(Dict[Breadcrumb, Union[Metadata, StreamMetadata]]): +AnyMetadata: TypeAlias = Union[Metadata, StreamMetadata] + + +class MetadataMapping(Dict[Breadcrumb, AnyMetadata]): """Stream metadata mapping.""" @classmethod def from_iterable(cls, iterable: Iterable[dict[str, Any]]): """Create a metadata mapping from an iterable of metadata dictionaries.""" - mapping = cls() + mapping: dict[Breadcrumb, AnyMetadata] = cls() for d in iterable: breadcrumb = tuple(d["breadcrumb"]) metadata = d["metadata"] @@ -280,3 +303,91 @@ def add_stream(self, entry: CatalogEntry) -> None: def get_stream(self, stream_id: str) -> CatalogEntry | None: """Retrieve a stream entry from the catalog.""" return self.get(stream_id) + + +class BatchFileFormat(str, enum.Enum): + """Batch file format.""" + + JSONL = "jsonl" + """JSON Lines format.""" + + +@dataclass +class BaseBatchFileEncoding: + """Base class for batch file encodings.""" + + registered_encodings: ClassVar[dict[str, type[BaseBatchFileEncoding]]] = {} + __encoding_format__: ClassVar[str] = "OVERRIDE_ME" + + # Base encoding fields + format: str = field(init=False) + """The format of the batch file.""" + + compression: str | None = None + """The compression of the batch file.""" + + def __init_subclass__(cls, **kwargs: Any) -> None: + """Register subclasses.""" + super().__init_subclass__(**kwargs) + cls.registered_encodings[cls.__encoding_format__] = cls + + def __post_init__(self) -> None: + self.format = self.__encoding_format__ + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> BaseBatchFileEncoding: + """Create an encoding from a dictionary.""" + encoding_format = data.pop("format") + encoding_cls = cls.registered_encodings[encoding_format] + return encoding_cls(**data) + + +@dataclass +class JSONLinesEncoding(BaseBatchFileEncoding): + """JSON Lines encoding for batch files.""" + + __encoding_format__ = "jsonl" + + +@dataclass +class SDKBatchMessage(Message): + """Singer batch message in the Meltano SDK flavor.""" + + type: Literal[SingerMessageType.BATCH] = field(init=False) + """The message type.""" + + stream: str + """The stream name.""" + + encoding: 
BaseBatchFileEncoding + """The file encoding of the batch.""" + + manifest: list[str] = field(default_factory=list) + """The manifest of files in the batch.""" + + def __post_init__(self): + if isinstance(self.encoding, dict): + self.encoding = BaseBatchFileEncoding.from_dict(self.encoding) + + self.type = SingerMessageType.BATCH + + def asdict(self): + """Return a dictionary representation of the message. + + Returns: + A dictionary with the defined message fields. + """ + return asdict(self) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> SDKBatchMessage: + """Create an encoding from a dictionary. + + Args: + data: The dictionary to create the message from. + + Returns: + The created message. + """ + data.pop("type") + return cls(**data) diff --git a/singer_sdk/io_base.py b/singer_sdk/io_base.py index f5da20d19..32f48ae36 100644 --- a/singer_sdk/io_base.py +++ b/singer_sdk/io_base.py @@ -3,7 +3,6 @@ from __future__ import annotations import abc -import enum import json import logging import sys @@ -12,19 +11,11 @@ from typing import Counter as CounterType from singer_sdk.helpers._compat import final +from singer_sdk.helpers._singer import SingerMessageType logger = logging.getLogger(__name__) -class SingerMessageType(str, enum.Enum): - """Singer specification message types.""" - - RECORD = "RECORD" - SCHEMA = "SCHEMA" - STATE = "STATE" - ACTIVATE_VERSION = "ACTIVATE_VERSION" - - class SingerReader(metaclass=abc.ABCMeta): """Interface for all plugins reading Singer messages from stdin.""" @@ -95,6 +86,9 @@ def _process_lines(self, file_input: IO[str]) -> CounterType[str]: elif record_type == SingerMessageType.STATE: self._process_state_message(line_dict) + elif record_type == SingerMessageType.BATCH: + self._process_batch_message(line_dict) + else: self._process_unknown_message(line_dict) @@ -118,6 +112,10 @@ def _process_state_message(self, message_dict: dict) -> None: def _process_activate_version_message(self, message_dict: dict) -> None: ... + @abc.abstractmethod + def _process_batch_message(self, message_dict: dict) -> None: + ... + def _process_unknown_message(self, message_dict: dict) -> None: """Internal method to process unknown message types from a Singer tap. diff --git a/singer_sdk/mapper_base.py b/singer_sdk/mapper_base.py index c09d39255..abe218e5a 100644 --- a/singer_sdk/mapper_base.py +++ b/singer_sdk/mapper_base.py @@ -50,6 +50,9 @@ def _process_state_message(self, message_dict: dict) -> None: def _process_activate_version_message(self, message_dict: dict) -> None: self._write_messages(self.map_activate_version_message(message_dict)) + def _process_batch_message(self, message_dict: dict) -> None: + self._write_messages(self.map_batch_message(message_dict)) + @abc.abstractmethod def map_schema_message(self, message_dict: dict) -> Iterable[singer.Message]: """Map a schema message to zero or more new messages. @@ -89,6 +92,17 @@ def map_activate_version_message( """ ... + def map_batch_message( + self, + message_dict: dict, + ) -> Iterable[singer.Message]: + """Map a version message to zero or more new messages. + + Args: + message_dict: An ACTIVATE_VERSION message JSON dictionary. + """ + pass + @classproperty def cli(cls) -> Callable: """Execute standard CLI handler for inline mappers. 
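For orientation, here is a short sketch (not part of the patch itself) of the dict round-trip the new `SDKBatchMessage` class supports; it mirrors the behavior exercised by `tests/core/test_singer_messages.py` later in this patch:

```python
from singer_sdk.helpers._singer import JSONLinesEncoding, SDKBatchMessage

message = SDKBatchMessage(
    stream="users",
    encoding=JSONLinesEncoding(compression="gzip"),
    manifest=["path/to/file1.jsonl.gz", "path/to/file2.jsonl.gz"],
)

# asdict() serializes the nested encoding dataclass and stamps the type:
# {"type": SingerMessageType.BATCH, "stream": "users",
#  "encoding": {"format": "jsonl", "compression": "gzip"},
#  "manifest": ["path/to/file1.jsonl.gz", "path/to/file2.jsonl.gz"]}
dumped = message.asdict()

# from_dict() pops "type" and rebuilds the encoding from its dict form.
assert SDKBatchMessage.from_dict(dumped) == message
```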
diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py index c843c0ede..6195e4cab 100644 --- a/singer_sdk/streams/core.py +++ b/singer_sdk/streams/core.py @@ -5,12 +5,28 @@ import abc import copy import datetime +import gzip +import itertools import json import logging +import os from os import PathLike from pathlib import Path from types import MappingProxyType -from typing import Any, Callable, Generator, Iterable, Mapping, TypeVar, cast +from typing import ( + Any, + Callable, + Generator, + Iterable, + Iterator, + List, + Mapping, + Optional, + Tuple, + TypeVar, + cast, +) +from uuid import uuid4 import pendulum import requests @@ -23,9 +39,12 @@ from singer_sdk.helpers._flattening import get_flattening_options from singer_sdk.helpers._schema import SchemaPlus from singer_sdk.helpers._singer import ( + BaseBatchFileEncoding, Catalog, CatalogEntry, + JSONLinesEncoding, MetadataMapping, + SDKBatchMessage, SelectionMask, ) from singer_sdk.helpers._state import ( @@ -50,10 +69,32 @@ REPLICATION_LOG_BASED = "LOG_BASED" FactoryType = TypeVar("FactoryType", bound="Stream") +_T = TypeVar("_T") METRICS_LOG_LEVEL_SETTING = "metrics_log_level" +def lazy_chunked_generator( + iterable: Iterable[_T], + chunk_size: int, +) -> Generator[Iterator[_T], None, None]: + """Yield a generator for each chunk of the given iterable. + + Args: + iterable: The iterable to chunk. + chunk_size: The size of each chunk. + + Yields: + A generator for each chunk of the given iterable. + """ + iterator = iter(iterable) + while True: + chunk = list(itertools.islice(iterator, chunk_size)) + if not chunk: + break + yield iter(chunk) + + class Stream(metaclass=abc.ABCMeta): """Abstract base class for tap streams.""" @@ -67,6 +108,9 @@ class Stream(metaclass=abc.ABCMeta): # Internal API cost aggregator _sync_costs: dict[str, int] = {} + # Batch attributes + batch_size: int = 100 + def __init__( self, tap: TapBaseClass, @@ -787,6 +831,25 @@ def _write_record_message(self, record: dict) -> None: for record_message in self._generate_record_messages(record): singer.write_message(record_message) + def _write_batch_message( + self, + encoding: BaseBatchFileEncoding, + manifest: List[str], + ) -> None: + """Write out a BATCH message. + + Args: + encoding: The encoding to use for the batch. + manifest: A list of filenames for the batch. + """ + singer.write_message( + SDKBatchMessage( + stream=self.name, + encoding=encoding, + manifest=manifest, + ) + ) + @property def _metric_logging_function(self) -> Callable | None: """Return the metrics logging function. @@ -952,16 +1015,48 @@ def finalize_state_progress_markers(self, state: dict | None = None) -> None: # Private sync methods: - def _sync_records( # noqa C901 # too complex - self, context: dict | None = None + def _process_record( + self, + record: dict, + child_context: Optional[dict] = None, + partition_context: Optional[dict] = None, ) -> None: + """Process a record. + + Args: + record: The record to process. + child_context: The child context. + partition_context: The partition context. 
+ """ + partition_context = partition_context or {} + child_context = copy.copy( + self.get_child_context(record=record, context=child_context) + ) + for key, val in partition_context.items(): + # Add state context to records if not already present + if key not in record: + record[key] = val + + # Sync children, except when primary mapper filters out the record + if self.stream_maps[0].get_filter_result(record): + self._sync_children(child_context) + + def _sync_records( + self, + context: Optional[dict] = None, + write_messages: bool = True, + ) -> Generator[dict, Any, Any]: """Sync records, emitting RECORD and STATE messages. Args: context: Stream partition or context dictionary. + write_messages: Whether to write Singer messages to stdout. Raises: InvalidStreamSortException: TODO + + Yields: + Each record from the source. """ record_count = 0 current_context: dict | None @@ -978,44 +1073,45 @@ def _sync_records( # noqa C901 # too complex child_context: dict | None = ( None if current_context is None else copy.copy(current_context) ) + for record_result in self.get_records(current_context): if isinstance(record_result, tuple): # Tuple items should be the record and the child context record, child_context = record_result else: record = record_result - child_context = copy.copy( - self.get_child_context(record=record, context=child_context) - ) - for key, val in (state_partition_context or {}).items(): - # Add state context to records if not already present - if key not in record: - record[key] = val - - # Sync children, except when primary mapper filters out the record - if self.stream_maps[0].get_filter_result(record): - self._sync_children(child_context) - self._check_max_record_limit(record_count) + try: + self._process_record( + record, + child_context=child_context, + partition_context=state_partition_context, + ) + except InvalidStreamSortException as ex: + log_sort_error( + log_fn=self.logger.error, + ex=ex, + record_count=record_count + 1, + partition_record_count=partition_record_count + 1, + current_context=current_context, + state_partition_context=state_partition_context, + stream_name=self.name, + ) + raise ex + if selected: - if (record_count - 1) % self.STATE_MSG_FREQUENCY == 0: + if ( + record_count - 1 + ) % self.STATE_MSG_FREQUENCY == 0 and write_messages: self._write_state_message() - self._write_record_message(record) - try: - self._increment_stream_state(record, context=current_context) - except InvalidStreamSortException as ex: - log_sort_error( - log_fn=self.logger.error, - ex=ex, - record_count=record_count + 1, - partition_record_count=partition_record_count + 1, - current_context=current_context, - state_partition_context=state_partition_context, - stream_name=self.name, - ) - raise ex - - record_count += 1 - partition_record_count += 1 + if write_messages: + self._write_record_message(record) + self._increment_stream_state(record, context=current_context) + + yield record + + record_count += 1 + partition_record_count += 1 + if current_context == state_partition_context: # Finalize per-partition state only if 1:1 with context finalize_state_progress_markers(state) @@ -1024,8 +1120,20 @@ def _sync_records( # noqa C901 # too complex # Otherwise will be finalized by tap at end of sync. 
            finalize_state_progress_markers(self.stream_state)
         self._write_record_count_log(record_count=record_count, context=context)
-        # Reset interim bookmarks before emitting final STATE message:
-        self._write_state_message()
+
+        if write_messages:
+            # Reset interim bookmarks before emitting final STATE message:
+            self._write_state_message()
+
+    def _sync_batches(self, context: Optional[dict] = None) -> None:
+        """Sync batches, emitting BATCH messages.
+
+        Args:
+            context: Stream partition or context dictionary.
+        """
+        for encoding, manifest in self.get_batches(context):
+            self._write_batch_message(encoding=encoding, manifest=manifest)
+        self._write_state_message()
 
     # Public methods ("final", not recommended to be overridden)
 
@@ -1051,8 +1159,16 @@ def sync(self, context: dict | None = None) -> None:
         # Send a SCHEMA message to the downstream target:
         if self.selected:
             self._write_schema_message()
-        # Sync the records themselves:
-        self._sync_records(context)
+
+        # TODO: This is a temporary hack to toggle BATCH mode during development.
+        batch_mode = os.getenv("SINGER_BATCH_MODE", "false") == "true"
+
+        if batch_mode:
+            self._sync_batches(context=context)
+        else:
+            # Sync the records themselves:
+            for _ in self._sync_records(context=context):
+                pass
 
     def _sync_children(self, child_context: dict) -> None:
         for child_stream in self.child_streams:
@@ -1161,7 +1277,34 @@ def get_records(self, context: dict | None) -> Iterable[dict | tuple[dict, dict]
         """
         pass
 
-    def post_process(self, row: dict, context: dict | None = None) -> dict | None:
+    def get_batches(
+        self,
+        context: Optional[dict] = None,
+    ) -> Iterable[Tuple[BaseBatchFileEncoding, List[str]]]:
+        """Batch generator function.
+
+        Developers are encouraged to override this method to customize batching
+        behavior for databases, bulk APIs, etc.
+
+        Args:
+            context: Stream partition or context dictionary.
+
+        Yields:
+            A tuple of (encoding, manifest) for each batch.
+        """
+        for chunk in lazy_chunked_generator(
+            self._sync_records(context, write_messages=False),
+            self.batch_size,
+        ):
+            filename = f"output/{self.name}-{uuid4()}.json.gz"
+
+            # TODO: Determine compression from config.
+            with gzip.open(filename, "wb") as f:
+                f.writelines((json.dumps(record) + "\n").encode() for record in chunk)
+
+            yield JSONLinesEncoding(compression="gzip"), [filename]
+
+    def post_process(self, row: dict, context: Optional[dict] = None) -> Optional[dict]:
         """As needed, append or transform raw data to match expected structure.
 
         Optional. This method gives developers an opportunity to "clean up" the results
diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py
index 1b58e88d6..fdb99e8fc 100644
--- a/singer_sdk/target_base.py
+++ b/singer_sdk/target_base.py
@@ -16,6 +16,7 @@
 from singer_sdk.exceptions import RecordsWithoutSchemaException
 from singer_sdk.helpers._classproperty import classproperty
 from singer_sdk.helpers._compat import final
+from singer_sdk.helpers._singer import BaseBatchFileEncoding
 from singer_sdk.helpers.capabilities import CapabilitiesEnum, PluginCapabilities
 from singer_sdk.io_base import SingerMessageType, SingerReader
 from singer_sdk.mapper import PluginMapper
@@ -267,6 +268,7 @@ def _process_lines(self, file_input: IO[str]) -> Counter[str]:
         self.logger.info(
             f"Target '{self.name}' completed reading {line_count} lines of input "
             f"({counter[SingerMessageType.RECORD]} records, "
+            f"{counter[SingerMessageType.BATCH]} batch manifests, "
             f"{counter[SingerMessageType.STATE]} state messages)."
) @@ -401,6 +403,15 @@ def _process_activate_version_message(self, message_dict: dict) -> None: sink = self.get_sink(stream_name) sink.activate_version(message_dict["version"]) + def _process_batch_message(self, message_dict: dict) -> None: + """Handle the optional BATCH message extension. + + Args: + message_dict: TODO + """ + encoding = BaseBatchFileEncoding.from_dict(message_dict["encoding"]) + self.logger.info("Processing record batch encoded as %s", encoding) + # Sink drain methods @final diff --git a/tests/core/test_singer_messages.py b/tests/core/test_singer_messages.py new file mode 100644 index 000000000..185606631 --- /dev/null +++ b/tests/core/test_singer_messages.py @@ -0,0 +1,57 @@ +from dataclasses import asdict + +import pytest + +from singer_sdk.helpers._singer import ( + BaseBatchFileEncoding, + JSONLinesEncoding, + SDKBatchMessage, + SingerMessageType, +) + + +@pytest.mark.parametrize( + "encoding,expected", + [ + (JSONLinesEncoding("gzip"), {"compression": "gzip", "format": "jsonl"}), + (JSONLinesEncoding(), {"compression": None, "format": "jsonl"}), + ], + ids=["jsonl-compression-gzip", "jsonl-compression-none"], +) +def test_encoding_as_dict(encoding: BaseBatchFileEncoding, expected: dict) -> None: + """Test encoding as dict.""" + assert asdict(encoding) == expected + + +@pytest.mark.parametrize( + "message,expected", + [ + ( + SDKBatchMessage( + stream="test_stream", + encoding=JSONLinesEncoding("gzip"), + manifest=[ + "path/to/file1.jsonl.gz", + "path/to/file2.jsonl.gz", + ], + ), + { + "type": SingerMessageType.BATCH, + "stream": "test_stream", + "encoding": {"compression": "gzip", "format": "jsonl"}, + "manifest": [ + "path/to/file1.jsonl.gz", + "path/to/file2.jsonl.gz", + ], + }, + ) + ], + ids=["batch-message-jsonl"], +) +def test_batch_message_as_dict(message, expected): + """Test batch message as dict.""" + + dumped = message.asdict() + assert dumped == expected + + assert message.from_dict(dumped) == message From 478a3e88959f044c7df45c8b84385b72647d03d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Rami=CC=81rez=20Mondrago=CC=81n?= Date: Tue, 30 Aug 2022 18:28:33 -0500 Subject: [PATCH 02/17] Fix docstring --- singer_sdk/mapper_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/singer_sdk/mapper_base.py b/singer_sdk/mapper_base.py index abe218e5a..dca03cb78 100644 --- a/singer_sdk/mapper_base.py +++ b/singer_sdk/mapper_base.py @@ -99,7 +99,7 @@ def map_batch_message( """Map a version message to zero or more new messages. Args: - message_dict: An ACTIVATE_VERSION message JSON dictionary. + message_dict: A BATCH message JSON dictionary. 
""" pass From 64fbc197a2d48dc132167d018a6ec7d13ca2706f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Rami=CC=81rez=20Mondrago=CC=81n?= Date: Tue, 30 Aug 2022 21:47:43 -0500 Subject: [PATCH 03/17] Restore _check_max_record_limit call --- singer_sdk/streams/core.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py index 6195e4cab..b7ba7f001 100644 --- a/singer_sdk/streams/core.py +++ b/singer_sdk/streams/core.py @@ -1098,6 +1098,8 @@ def _sync_records( ) raise ex + self._check_max_record_limit(record_count) + if selected: if ( record_count - 1 From 700242d29ba327feb31b397a184889e8c678b5ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Rami=CC=81rez=20Mondrago=CC=81n?= Date: Wed, 31 Aug 2022 20:15:03 -0500 Subject: [PATCH 04/17] Set default batch size to 1000 --- singer_sdk/streams/core.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py index b7ba7f001..fd6f20721 100644 --- a/singer_sdk/streams/core.py +++ b/singer_sdk/streams/core.py @@ -109,7 +109,8 @@ class Stream(metaclass=abc.ABCMeta): _sync_costs: dict[str, int] = {} # Batch attributes - batch_size: int = 100 + batch_size: int = 1000 + """Max number of records to write to each batch file.""" def __init__( self, From 1c237f10d9757a9853712681a21219446b7cd306 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Rami=CC=81rez=20Mondrago=CC=81n?= Date: Wed, 31 Aug 2022 20:35:05 -0500 Subject: [PATCH 05/17] Implement Stream.get_batch_encoding --- singer_sdk/streams/core.py | 30 +++++++++++++++++++---- tests/core/test_countries_sync.py | 40 +++++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+), 5 deletions(-) diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py index fd6f20721..1cc7c7ff6 100644 --- a/singer_sdk/streams/core.py +++ b/singer_sdk/streams/core.py @@ -1280,6 +1280,20 @@ def get_records(self, context: dict | None) -> Iterable[dict | tuple[dict, dict] """ pass + def get_batch_encoding(self, config: dict) -> BaseBatchFileEncoding: + """Return the batch file encoding for this stream. + + Encoding can be determined from the tap config or hardcoded to a specific + encoding supported by the tap. + + Args: + config: Tap configuration dictionary. + + Returns: + Batch file encoding for this stream. + """ + return JSONLinesEncoding(compression="gzip") + def get_batches( self, context: Optional[dict] = None, @@ -1295,17 +1309,23 @@ def get_batches( Yields: A tuple of (encoding, manifest) for each batch. """ - for chunk in lazy_chunked_generator( - self._sync_records(context, write_messages=False), - self.batch_size, + encoding = self.get_batch_encoding(self.config) + prefix = f"{self.tap_name}--{self.name}-{uuid4()}" + + for i, chunk in enumerate( + lazy_chunked_generator( + self._sync_records(context, write_messages=False), + self.batch_size, + ), + start=1, ): - filename = f"output/{self.name}-{uuid4()}.json.gz" + filename = f".output/{prefix}-{i}.json.gz" # TODO: Determine compression from config. with gzip.open(filename, "wb") as f: f.writelines((json.dumps(record) + "\n").encode() for record in chunk) - yield JSONLinesEncoding(compression="gzip"), [filename] + yield encoding, [filename] def post_process(self, row: dict, context: Optional[dict] = None) -> Optional[dict]: """As needed, append or transform raw data to match expected structure. 
diff --git a/tests/core/test_countries_sync.py b/tests/core/test_countries_sync.py
index 39abc58fe..922a13a58 100644
--- a/tests/core/test_countries_sync.py
+++ b/tests/core/test_countries_sync.py
@@ -1,7 +1,11 @@
 """Test sample sync."""
 
 import copy
+import io
+import json
 import logging
+from contextlib import redirect_stdout
+from typing import Counter
 
 from samples.sample_tap_countries.countries_tap import SampleTapCountries
 from singer_sdk.helpers._catalog import (
@@ -82,3 +86,38 @@ def test_with_catalog_entry():
         logger=logging.getLogger(),
     )
     assert new_schema == stream.schema
+
+
+def test_batch_mode(monkeypatch, outdir):
+    """Test batch mode."""
+    tap = SampleTapCountries(config=None)
+    tap.batch_size = 100
+
+    # TODO: This is a hack to get the tap to run in batch mode.
+    monkeypatch.setenv("SINGER_BATCH_MODE", "true")
+
+    buf = io.StringIO()
+    with redirect_stdout(buf):
+        tap.sync_all()
+
+    buf.seek(0)
+    lines = buf.read().splitlines()
+    messages = [json.loads(line) for line in lines]
+
+    def tally_messages(messages: list) -> Counter:
+        """Tally messages."""
+        return Counter(
+            (message["type"], message["stream"])
+            if message["type"] != "STATE"
+            else (message["type"],)
+            for message in messages
+        )
+
+    counter = tally_messages(messages)
+    assert counter["SCHEMA", "continents"] == 1
+    assert counter["BATCH", "continents"] == 1
+
+    assert counter["SCHEMA", "countries"] == 1
+    assert counter["BATCH", "countries"] == 3
+
+    assert counter[("STATE",)] == 4

From 75e2af8206333723b96c0329ecb0a162c0aff398 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Edgar=20Rami=CC=81rez=20Mondrago=CC=81n?=
Date: Wed, 31 Aug 2022 22:20:31 -0500
Subject: [PATCH 06/17] Implement Stream.get_batch_config

---
 singer_sdk/helpers/_singer.py     | 70 +++++++++++++++++++++++++++++++
 singer_sdk/streams/core.py        | 24 +++++------
 tests/core/test_countries_sync.py | 20 +++++++--
 3 files changed, 98 insertions(+), 16 deletions(-)

diff --git a/singer_sdk/helpers/_singer.py b/singer_sdk/helpers/_singer.py
index bcdca9dc2..12e4e217c 100644
--- a/singer_sdk/helpers/_singer.py
+++ b/singer_sdk/helpers/_singer.py
@@ -337,6 +337,7 @@ def __post_init__(self) -> None:
     @classmethod
     def from_dict(cls, data: dict[str, Any]) -> BaseBatchFileEncoding:
         """Create an encoding from a dictionary."""
+        data = data.copy()
         encoding_format = data.pop("format")
         encoding_cls = cls.registered_encodings[encoding_format]
         return encoding_cls(**data)
@@ -391,3 +392,72 @@ def from_dict(cls, data: dict[str, Any]) -> SDKBatchMessage:
         """
         data.pop("type")
         return cls(**data)
+
+
+@dataclass
+class StorageTarget:
+    """Storage target."""
+
+    root: str
+    """The root directory of the storage target."""
+
+    prefix: str
+    """The file prefix."""
+
+    def asdict(self):
+        """Return a dictionary representation of the message.
+
+        Returns:
+            A dictionary with the defined message fields.
+        """
+        return asdict(self)
+
+    @classmethod
+    def from_dict(cls, data: dict[str, Any]) -> StorageTarget:
+        """Create an encoding from a dictionary.
+
+        Args:
+            data: The dictionary to create the message from.
+
+        Returns:
+            The created message.
+ """ + return cls(**data) + + +@dataclass +class BatchConfig: + """Batch configuration.""" + + encoding: BaseBatchFileEncoding + """The encoding of the batch file.""" + + storage: StorageTarget + """The storage target of the batch file.""" + + def __post_init__(self): + if isinstance(self.encoding, dict): + self.encoding = BaseBatchFileEncoding.from_dict(self.encoding) + + if isinstance(self.storage, dict): + self.storage = StorageTarget.from_dict(self.storage) + + def asdict(self): + """Return a dictionary representation of the message. + + Returns: + A dictionary with the defined message fields. + """ + return asdict(self) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> BatchConfig: + """Create an encoding from a dictionary. + + Args: + data: The dictionary to create the message from. + + Returns: + The created message. + """ + return cls(**data) diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py index 1cc7c7ff6..565db22ac 100644 --- a/singer_sdk/streams/core.py +++ b/singer_sdk/streams/core.py @@ -40,9 +40,9 @@ from singer_sdk.helpers._schema import SchemaPlus from singer_sdk.helpers._singer import ( BaseBatchFileEncoding, + BatchConfig, Catalog, CatalogEntry, - JSONLinesEncoding, MetadataMapping, SDKBatchMessage, SelectionMask, @@ -1280,19 +1280,16 @@ def get_records(self, context: dict | None) -> Iterable[dict | tuple[dict, dict] """ pass - def get_batch_encoding(self, config: dict) -> BaseBatchFileEncoding: - """Return the batch file encoding for this stream. - - Encoding can be determined from the tap config or hardcoded to a specific - encoding supported by the tap. + def get_batch_config(self, config: Mapping) -> BatchConfig: + """Return the batch config for this stream. Args: config: Tap configuration dictionary. Returns: - Batch file encoding for this stream. + Batch config for this stream. """ - return JSONLinesEncoding(compression="gzip") + return BatchConfig.from_dict(config["batch_config"]) def get_batches( self, @@ -1309,8 +1306,11 @@ def get_batches( Yields: A tuple of (encoding, manifest) for each batch. """ - encoding = self.get_batch_encoding(self.config) - prefix = f"{self.tap_name}--{self.name}-{uuid4()}" + batch_config = self.get_batch_config(self.config) + sync_id = f"{self.tap_name}--{self.name}-{uuid4()}" + + prefix = batch_config.storage.prefix + root = batch_config.storage.root for i, chunk in enumerate( lazy_chunked_generator( @@ -1319,13 +1319,13 @@ def get_batches( ), start=1, ): - filename = f".output/{prefix}-{i}.json.gz" + filename = f"{root}/{prefix}{sync_id}-{i}.json.gz" # TODO: Determine compression from config. with gzip.open(filename, "wb") as f: f.writelines((json.dumps(record) + "\n").encode() for record in chunk) - yield encoding, [filename] + yield batch_config.encoding, [filename] def post_process(self, row: dict, context: Optional[dict] = None) -> Optional[dict]: """As needed, append or transform raw data to match expected structure. 
diff --git a/tests/core/test_countries_sync.py b/tests/core/test_countries_sync.py index 922a13a58..9228c9287 100644 --- a/tests/core/test_countries_sync.py +++ b/tests/core/test_countries_sync.py @@ -91,8 +91,20 @@ def test_with_catalog_entry(): def test_batch_mode(monkeypatch, outdir): """Test batch mode.""" - tap = SampleTapCountries(config=None) - tap.batch_size = 100 + tap = SampleTapCountries( + config={ + "batch_config": { + "encoding": { + "format": "jsonl", + "compression": "gzip", + }, + "storage": { + "root": outdir, + "prefix": "pytest-countries-", + }, + } + } + ) # TODO: This is a hack to get the tap to run in batch mode. monkeypatch.setenv("SINGER_BATCH_MODE", "true") @@ -119,6 +131,6 @@ def tally_messages(messages: list) -> Counter: assert counter["BATCH", "continents"] == 1 assert counter["SCHEMA", "countries"] == 1 - assert counter["BATCH", "countries"] == 3 + assert counter["BATCH", "countries"] == 1 - assert counter[("STATE",)] == 4 + assert counter[("STATE",)] == 2 From f8aa6e531f77167dae12e1dd74872e69cdf46046 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Rami=CC=81rez=20Mondrago=CC=81n?= Date: Thu, 8 Sep 2022 16:22:43 -0500 Subject: [PATCH 07/17] Implement batch for sinks --- poetry.lock | 40 +++++++++- pyproject.toml | 1 + samples/sample_tap_countries/countries_tap.py | 4 + samples/sample_target_sqlite/__init__.py | 30 ++++++- singer_sdk/helpers/_singer.py | 36 ++++++++- singer_sdk/sinks/core.py | 73 +++++++++++++++--- singer_sdk/sinks/sql.py | 8 +- singer_sdk/streams/core.py | 44 ++++++----- singer_sdk/target_base.py | 14 +++- tests/core/resources/batch.1.jsonl.gz | Bin 0 -> 71 bytes tests/core/resources/batch.2.jsonl.gz | Bin 0 -> 71 bytes tests/core/test_countries_sync.py | 3 - tests/core/test_sqlite.py | 62 +++++++++++++++ 13 files changed, 267 insertions(+), 48 deletions(-) create mode 100644 tests/core/resources/batch.1.jsonl.gz create mode 100644 tests/core/resources/batch.2.jsonl.gz diff --git a/poetry.lock b/poetry.lock index d1665b2dd..a3b380eda 100644 --- a/poetry.lock +++ b/poetry.lock @@ -6,6 +6,14 @@ category = "main" optional = true python-versions = "*" +[[package]] +name = "appdirs" +version = "1.4.4" +description = "A small Python module for determining appropriate platform-specific dirs, e.g. a \"user data dir\"." 
+category = "main" +optional = false +python-versions = "*" + [[package]] name = "arrow" version = "1.2.2" @@ -212,7 +220,7 @@ python-versions = ">=3.6" cffi = ">=1.12" [package.extras] -docs = ["sphinx (>=1.6.5,!=1.8.0,!=3.1.0,!=3.1.1)", "sphinx-rtd-theme"] +docs = ["sphinx (>=1.6.5,!=1.8.0,!=3.1.0,!=3.1.1)", "sphinx_rtd_theme"] docstest = ["pyenchant (>=1.6.11)", "sphinxcontrib-spelling (>=4.0.1)", "twine (>=1.12.0)"] pep8test = ["black", "flake8", "flake8-import-order", "pep8-naming"] sdist = ["setuptools-rust (>=0.11.4)"] @@ -307,6 +315,22 @@ python-versions = ">=3.6" [package.dependencies] python-dateutil = ">=2.7" +[[package]] +name = "fs" +version = "2.4.16" +description = "Python's filesystem abstraction layer" +category = "main" +optional = false +python-versions = "*" + +[package.dependencies] +appdirs = ">=1.4.3,<1.5.0" +setuptools = "*" +six = ">=1.10,<2.0" + +[package.extras] +scandir = ["scandir (>=1.5,<2.0)"] + [[package]] name = "greenlet" version = "1.1.2" @@ -1262,8 +1286,8 @@ all = ["IPython", "IPython", "Pygments", "Pygments", "attrs", "cmake", "codecov" all-strict = ["IPython (==7.10.0)", "IPython (==7.23.1)", "Pygments (==2.0.0)", "Pygments (==2.4.1)", "attrs (==19.2.0)", "cmake (==3.21.2)", "codecov (==2.0.15)", "colorama (==0.4.1)", "debugpy (==1.0.0)", "debugpy (==1.0.0)", "debugpy (==1.0.0)", "debugpy (==1.3.0)", "debugpy (==1.6.0)", "ipykernel (==5.2.0)", "ipykernel (==6.0.0)", "ipython-genutils (==0.2.0)", "jedi (==0.16)", "jinja2 (==3.0.0)", "jupyter-client (==6.1.5)", "jupyter-client (==7.0.0)", "jupyter-core (==4.7.0)", "nbconvert (==6.0.0)", "ninja (==1.10.2)", "pybind11 (==2.7.1)", "pytest (==4.6.0)", "pytest (==4.6.0)", "pytest (==4.6.0)", "pytest (==4.6.0)", "pytest (==4.6.0)", "pytest (==6.2.5)", "pytest-cov (==2.8.1)", "pytest-cov (==2.8.1)", "pytest-cov (==2.9.0)", "pytest-cov (==3.0.0)", "scikit-build (==0.11.1)", "six (==1.11.0)", "typing (==3.7.4)"] colors = ["Pygments", "Pygments", "colorama"] jupyter = ["IPython", "IPython", "attrs", "debugpy", "debugpy", "debugpy", "debugpy", "debugpy", "ipykernel", "ipykernel", "ipython-genutils", "jedi", "jinja2", "jupyter-client", "jupyter-client", "jupyter-core", "nbconvert"] -optional = ["IPython", "IPython", "Pygments", "Pygments", "attrs", "colorama", "debugpy", "debugpy", "debugpy", "debugpy", "debugpy", "ipykernel", "ipykernel", "ipython-genutils", "jedi", "jinja2", "jupyter-client", "jupyter-client", "jupyter-core", "nbconvert", "pyflakes", "tomli"] -optional-strict = ["IPython (==7.10.0)", "IPython (==7.23.1)", "Pygments (==2.0.0)", "Pygments (==2.4.1)", "attrs (==19.2.0)", "colorama (==0.4.1)", "debugpy (==1.0.0)", "debugpy (==1.0.0)", "debugpy (==1.0.0)", "debugpy (==1.3.0)", "debugpy (==1.6.0)", "ipykernel (==5.2.0)", "ipykernel (==6.0.0)", "ipython-genutils (==0.2.0)", "jedi (==0.16)", "jinja2 (==3.0.0)", "jupyter-client (==6.1.5)", "jupyter-client (==7.0.0)", "jupyter-core (==4.7.0)", "nbconvert (==6.0.0)", "pyflakes (==2.2.0)", "tomli (==0.2.0)"] +optional = ["IPython", "IPython", "Pygments", "Pygments", "attrs", "colorama", "debugpy", "debugpy", "debugpy", "debugpy", "debugpy", "ipykernel", "ipykernel", "ipython-genutils", "jedi", "jinja2", "jupyter-client", "jupyter-client", "jupyter-core", "nbconvert", "tomli"] +optional-strict = ["IPython (==7.10.0)", "IPython (==7.23.1)", "Pygments (==2.0.0)", "Pygments (==2.4.1)", "attrs (==19.2.0)", "colorama (==0.4.1)", "debugpy (==1.0.0)", "debugpy (==1.0.0)", "debugpy (==1.0.0)", "debugpy (==1.3.0)", "debugpy (==1.6.0)", "ipykernel (==5.2.0)", 
"ipykernel (==6.0.0)", "ipython-genutils (==0.2.0)", "jedi (==0.16)", "jinja2 (==3.0.0)", "jupyter-client (==6.1.5)", "jupyter-client (==7.0.0)", "jupyter-core (==4.7.0)", "nbconvert (==6.0.0)", "tomli (==0.2.0)"] runtime-strict = ["six (==1.11.0)"] tests = ["cmake", "codecov", "ninja", "pybind11", "pytest", "pytest", "pytest", "pytest", "pytest", "pytest", "pytest-cov", "pytest-cov", "pytest-cov", "pytest-cov", "scikit-build", "typing"] tests-strict = ["cmake (==3.21.2)", "codecov (==2.0.15)", "ninja (==1.10.2)", "pybind11 (==2.7.1)", "pytest (==4.6.0)", "pytest (==4.6.0)", "pytest (==4.6.0)", "pytest (==4.6.0)", "pytest (==4.6.0)", "pytest (==6.2.5)", "pytest-cov (==2.8.1)", "pytest-cov (==2.8.1)", "pytest-cov (==2.9.0)", "pytest-cov (==3.0.0)", "scikit-build (==0.11.1)", "typing (==3.7.4)"] @@ -1286,13 +1310,17 @@ docs = ["sphinx", "sphinx-rtd-theme", "sphinx-copybutton", "myst-parser", "sphin [metadata] lock-version = "1.1" python-versions = "<3.11,>=3.7.1" -content-hash = "13b5614e8ae831dfb3f6d9bb07ab8a43c16954be79487c55ddfa58098ee06d91" +content-hash = "61985ae9e6f47d8b374106f977297a42233fcaeb9fbc8dfb6a29f7ee4917b13c" [metadata.files] alabaster = [ {file = "alabaster-0.7.12-py2.py3-none-any.whl", hash = "sha256:446438bdcca0e05bd45ea2de1668c1d9b032e1a9154c2c259092d77031ddd359"}, {file = "alabaster-0.7.12.tar.gz", hash = "sha256:a661d72d58e6ea8a57f7a86e37d86716863ee5e92788398526d58b26a4e4dc02"}, ] +appdirs = [ + {file = "appdirs-1.4.4-py2.py3-none-any.whl", hash = "sha256:a841dacd6b99318a741b166adb07e19ee71a274450e68237b4650ca1055ab128"}, + {file = "appdirs-1.4.4.tar.gz", hash = "sha256:7d5d0167b2b1ba821647616af46a749d1c653740dd0d2415100fe26e27afdf41"}, +] arrow = [ {file = "arrow-1.2.2-py3-none-any.whl", hash = "sha256:d622c46ca681b5b3e3574fcb60a04e5cc81b9625112d5fb2b44220c36c892177"}, {file = "arrow-1.2.2.tar.gz", hash = "sha256:05caf1fd3d9a11a1135b2b6f09887421153b94558e5ef4d090b567b47173ac2b"}, @@ -1544,6 +1572,10 @@ freezegun = [ {file = "freezegun-1.2.2-py3-none-any.whl", hash = "sha256:ea1b963b993cb9ea195adbd893a48d573fda951b0da64f60883d7e988b606c9f"}, {file = "freezegun-1.2.2.tar.gz", hash = "sha256:cd22d1ba06941384410cd967d8a99d5ae2442f57dfafeff2fda5de8dc5c05446"}, ] +fs = [ + {file = "fs-2.4.16-py2.py3-none-any.whl", hash = "sha256:660064febbccda264ae0b6bace80a8d1be9e089e0a5eb2427b7d517f9a91545c"}, + {file = "fs-2.4.16.tar.gz", hash = "sha256:ae97c7d51213f4b70b6a958292530289090de3a7e15841e108fbe144f069d313"}, +] greenlet = [ {file = "greenlet-1.1.2-cp27-cp27m-macosx_10_14_x86_64.whl", hash = "sha256:58df5c2a0e293bf665a51f8a100d3e9956febfbf1d9aaf8c0677cf70218910c6"}, {file = "greenlet-1.1.2-cp27-cp27m-manylinux1_x86_64.whl", hash = "sha256:aec52725173bd3a7b56fe91bc56eccb26fbdff1386ef123abb63c84c5b43b63a"}, diff --git a/pyproject.toml b/pyproject.toml index 9293be739..1d4f4ab2c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,6 +40,7 @@ pipelinewise-singer-python = "1.2.0" backoff = ">=1.8.0,<2.0" pendulum = "^2.1.0" click = "~=8.0" +fs = "^2.4.16" PyJWT = "~=2.4" requests = "^2.25.1" cryptography = ">=3.4.6,<39.0.0" diff --git a/samples/sample_tap_countries/countries_tap.py b/samples/sample_tap_countries/countries_tap.py index 842686117..f2ff836c2 100644 --- a/samples/sample_tap_countries/countries_tap.py +++ b/samples/sample_tap_countries/countries_tap.py @@ -28,3 +28,7 @@ def discover_streams(self) -> List[Stream]: CountriesStream(tap=self), ContinentsStream(tap=self), ] + + +if __name__ == "__main__": + SampleTapCountries.cli() diff --git 
a/samples/sample_target_sqlite/__init__.py b/samples/sample_target_sqlite/__init__.py index d4e4372bd..bb565892e 100644 --- a/samples/sample_target_sqlite/__init__.py +++ b/samples/sample_target_sqlite/__init__.py @@ -1,8 +1,11 @@ """A sample implementation for SQLite.""" -from typing import Any, Dict +from __future__ import annotations + +from typing import Any import sqlalchemy +from sqlalchemy.dialects.sqlite import insert from singer_sdk import SQLConnector, SQLSink, SQLTarget from singer_sdk import typing as th @@ -20,7 +23,7 @@ class SQLiteConnector(SQLConnector): allow_column_alter = False allow_merge_upsert = True - def get_sqlalchemy_url(self, config: Dict[str, Any]) -> str: + def get_sqlalchemy_url(self, config: dict[str, Any]) -> str: """Generates a SQLAlchemy URL for SQLite.""" return f"sqlite:///{config[DB_PATH_CONFIG]}" @@ -53,6 +56,29 @@ class SQLiteSink(SQLSink): connector_class = SQLiteConnector + def generate_insert_statement(self, full_table_name: str, schema: dict) -> str: + """Generate an insert statement for the given table and schema. + + Args: + full_table_name: The full table name to insert into. + schema: The schema of the table. + """ + engine = self.connector.create_sqlalchemy_engine() + meta = sqlalchemy.MetaData(bind=engine) + table = sqlalchemy.Table(full_table_name, meta, autoload=True) + statement = insert(table) + + if self.key_properties: + statement = statement.on_conflict_do_update( + index_elements=table.primary_key.columns, + set_={ + column.name: getattr(statement.excluded, column.name) + for column in table.columns + }, + ) + + return statement + class SQLiteTarget(SQLTarget): """The Tap class for SQLite.""" diff --git a/singer_sdk/helpers/_singer.py b/singer_sdk/helpers/_singer.py index 12e4e217c..326c7d168 100644 --- a/singer_sdk/helpers/_singer.py +++ b/singer_sdk/helpers/_singer.py @@ -3,9 +3,22 @@ import enum import logging import sys +from contextlib import contextmanager from dataclasses import asdict, dataclass, field, fields -from typing import TYPE_CHECKING, Any, ClassVar, Dict, Iterable, Tuple, Union, cast - +from typing import ( + IO, + TYPE_CHECKING, + Any, + ClassVar, + Dict, + Generator, + Iterable, + Tuple, + Union, + cast, +) + +import fs from singer.catalog import Catalog as BaseCatalog from singer.catalog import CatalogEntry as BaseCatalogEntry from singer.messages import Message @@ -424,6 +437,25 @@ def from_dict(cls, data: dict[str, Any]) -> StorageTarget: """ return cls(**data) + @contextmanager + def open(self, filename: str, mode: str = "rb") -> Generator[IO, None, None]: + """Open a file in the storage target. + + Args: + filename: The filename to open. + mode: The mode to open the file in. + + Returns: + The opened file. 
+ """ + filesystem = fs.open_fs(self.root, writeable=True, create=True) + fo = filesystem.open(filename, mode=mode) + try: + yield fo + finally: + fo.close() + filesystem.close() + @dataclass class BatchConfig: diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index 1b5a50f0f..d5b5626eb 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -1,16 +1,27 @@ """Sink classes load data to a target.""" +from __future__ import annotations + import abc import datetime +import json import time +from gzip import GzipFile +from gzip import open as gzip_open from logging import Logger from types import MappingProxyType -from typing import Any, Dict, List, Mapping, Optional, Union +from typing import IO, Any, Mapping from dateutil import parser from jsonschema import Draft4Validator, FormatChecker from singer_sdk.helpers._compat import final +from singer_sdk.helpers._singer import ( + BaseBatchFileEncoding, + BatchConfig, + BatchFileFormat, + StorageTarget, +) from singer_sdk.helpers._typing import ( DatetimeErrorTreatmentEnum, get_datelike_property_type, @@ -34,8 +45,8 @@ def __init__( self, target: PluginBase, stream_name: str, - schema: Dict, - key_properties: Optional[List[str]], + schema: dict, + key_properties: list[str] | None, ) -> None: """Initialize target sink. @@ -47,7 +58,7 @@ def __init__( """ self.logger = target.logger self._config = dict(target.config) - self._pending_batch: Optional[dict] = None + self._pending_batch: dict | None = None self.stream_name = stream_name self.logger.info(f"Initializing target sink for stream '{stream_name}'...") self.schema = schema @@ -55,11 +66,11 @@ def __init__( self._add_sdc_metadata_to_schema() else: self._remove_sdc_metadata_from_schema() - self.records_to_drain: Union[List[dict], Any] = [] - self._context_draining: Optional[dict] = None - self.latest_state: Optional[dict] = None - self._draining_state: Optional[dict] = None - self.drained_state: Optional[dict] = None + self.records_to_drain: list[dict] | Any = [] + self._context_draining: dict | None = None + self.latest_state: dict | None = None + self._draining_state: dict | None = None + self.drained_state: dict | None = None self.key_properties = key_properties or [] # Tally counters @@ -163,6 +174,16 @@ def config(self) -> Mapping[str, Any]: """ return MappingProxyType(self._config) + @property + def batch_config(self) -> BatchConfig | None: + """Get batch configuration. + + Returns: + A frozen (read-only) config dictionary map. + """ + raw = self.config.get("batch_config") + return BatchConfig.from_dict(raw) if raw else None + @property def include_sdc_metadata_properties(self) -> bool: """Check if metadata columns should be added. @@ -260,7 +281,7 @@ def _remove_sdc_metadata_from_record(self, record: dict) -> None: # Record validation - def _validate_and_parse(self, record: Dict) -> Dict: + def _validate_and_parse(self, record: dict) -> dict: """Validate or repair the record, parsing to python-native types as needed. Args: @@ -276,7 +297,7 @@ def _validate_and_parse(self, record: Dict) -> Dict: return record def _parse_timestamps_in_record( - self, record: Dict, schema: Dict, treatment: DatetimeErrorTreatmentEnum + self, record: dict, schema: dict, treatment: DatetimeErrorTreatmentEnum ) -> None: """Parse strings to datetime.datetime values, repairing or erroring on failure. 
@@ -318,7 +339,7 @@ def _after_process_record(self, context: dict) -> None: # SDK developer overrides: - def preprocess_record(self, record: Dict, context: dict) -> dict: + def preprocess_record(self, record: dict, context: dict) -> dict: """Process incoming record and return a modified result. Args: @@ -410,3 +431,31 @@ def clean_up(self) -> None: should not be relied on, it's recommended to use a uuid as well. """ pass + + def process_batch_file( + self, + encoding: BaseBatchFileEncoding, + storage: StorageTarget, + path: str, + ) -> None: + """Process a batch file with the given batch context. + + Args: + encoding: The batch file encoding. + storage: The storage target. + path: The path to the batch file. + + Raises: + NotImplementedError: If the batch file encoding is not supported. + """ + file: GzipFile | IO + if encoding.format == BatchFileFormat.JSONL: + with storage.open(path) as file: + if encoding.compression == "gzip": + file = gzip_open(file) + context = {"records": [json.loads(line) for line in file]} + self.process_batch(context) + else: + raise NotImplementedError( + f"Unsupported batch encoding format: {encoding.format}" + ) diff --git a/singer_sdk/sinks/sql.py b/singer_sdk/sinks/sql.py index 5faaf2ca4..568cd969b 100644 --- a/singer_sdk/sinks/sql.py +++ b/singer_sdk/sinks/sql.py @@ -213,11 +213,11 @@ def bulk_insert_records( full_table_name, schema, ) + if isinstance(insert_sql, str): + insert_sql = sqlalchemy.text(insert_sql) + self.logger.info("Inserting with SQL: %s", insert_sql) - self.connector.connection.execute( - sqlalchemy.text(insert_sql), - records, - ) + self.connector.connection.execute(insert_sql, records) if isinstance(records, list): return len(records) # If list, we can quickly return record count. diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py index 565db22ac..9e0200feb 100644 --- a/singer_sdk/streams/core.py +++ b/singer_sdk/streams/core.py @@ -9,7 +9,6 @@ import itertools import json import logging -import os from os import PathLike from pathlib import Path from types import MappingProxyType @@ -1128,13 +1127,18 @@ def _sync_records( # Reset interim bookmarks before emitting final STATE message: self._write_state_message() - def _sync_batches(self, context: Optional[dict] = None) -> None: + def _sync_batches( + self, + batch_config: BatchConfig, + context: dict | None = None, + ) -> None: """Sync batches, emitting BATCH messages. Args: + batch_config: The batch configuration. context: Stream partition or context dictionary. """ - for encoding, manifest in self.get_batches(context): + for encoding, manifest in self.get_batches(batch_config, context): self._write_batch_message(encoding=encoding, manifest=manifest) self._write_state_message() @@ -1163,11 +1167,9 @@ def sync(self, context: dict | None = None) -> None: if self.selected: self._write_schema_message() - # TODO: This is a temporary hack to toggle BATCH mode during development. 
-        batch_mode = os.getenv("SINGER_BATCH_MODE", "false") == "true"
-
-        if batch_mode:
-            self._sync_batches(context=context)
+        batch_config = self.get_batch_config(self.config)
+        if batch_config:
+            self._sync_batches(batch_config, context=context)
         else:
             # Sync the records themselves:
             for _ in self._sync_records(context=context):
@@ -1280,7 +1282,7 @@ def get_records(self, context: dict | None) -> Iterable[dict | tuple[dict, dict]
         """
         pass
 
-    def get_batch_config(self, config: Mapping) -> BatchConfig:
+    def get_batch_config(self, config: Mapping) -> BatchConfig | None:
         """Return the batch config for this stream.
 
         Args:
@@ -1289,28 +1291,29 @@ def get_batch_config(self, config: Mapping) -> BatchConfig:
         Returns:
             Batch config for this stream.
         """
-        return BatchConfig.from_dict(config["batch_config"])
+        raw = config.get("batch_config")
+        return BatchConfig.from_dict(raw) if raw else None
 
     def get_batches(
         self,
-        context: Optional[dict] = None,
-    ) -> Iterable[Tuple[BaseBatchFileEncoding, List[str]]]:
+        batch_config: BatchConfig,
+        context: dict | None = None,
+    ) -> Iterable[tuple[BaseBatchFileEncoding, list[str]]]:
         """Batch generator function.
 
         Developers are encouraged to override this method to customize batching
         behavior for databases, bulk APIs, etc.
 
         Args:
+            batch_config: Batch config for this stream.
             context: Stream partition or context dictionary.
 
         Yields:
             A tuple of (encoding, manifest) for each batch.
         """
-        batch_config = self.get_batch_config(self.config)
         sync_id = f"{self.tap_name}--{self.name}-{uuid4()}"
         prefix = batch_config.storage.prefix
-        root = batch_config.storage.root
 
         for i, chunk in enumerate(
             lazy_chunked_generator(
@@ -1319,11 +1322,13 @@
             start=1,
         ):
-            filename = f"{root}/{prefix}{sync_id}-{i}.json.gz"
-
-            # TODO: Determine compression from config.
-            with gzip.open(filename, "wb") as f:
-                f.writelines((json.dumps(record) + "\n").encode() for record in chunk)
+            filename = f"{prefix}{sync_id}-{i}.json.gz"
+            with batch_config.storage.open(filename, "wb") as f:
+                # TODO: Determine compression from config.
+                with gzip.GzipFile(fileobj=f, mode="wb") as gz:
+                    gz.writelines(
+                        (json.dumps(record) + "\n").encode() for record in chunk
+                    )
 
             yield batch_config.encoding, [filename]
 
diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py
index fdb99e8fc..bebfcd4c7 100644
--- a/singer_sdk/target_base.py
+++ b/singer_sdk/target_base.py
@@ -408,9 +408,21 @@
     def _process_batch_message(self, message_dict: dict) -> None:
         """Handle the optional BATCH message extension.
 
         Args:
             message_dict: TODO
+
+        Raises:
+            RuntimeError: If the batch message can not be processed.
         """
+        sink = self.get_sink(message_dict["stream"])
+        if sink.batch_config is None:
+            raise RuntimeError(
+                f"Received BATCH message for stream '{sink.stream_name}' "
+                "but no batch config was provided."
+ ) + encoding = BaseBatchFileEncoding.from_dict(message_dict["encoding"]) - self.logger.info("Processing record batch encoded as %s", encoding) + + for file in message_dict["manifest"]: + sink.process_batch_file(encoding, sink.batch_config.storage, file) # Sink drain methods diff --git a/tests/core/resources/batch.1.jsonl.gz b/tests/core/resources/batch.1.jsonl.gz new file mode 100644 index 0000000000000000000000000000000000000000..393bd295385180d04526346243b67c18657f2499 GIT binary patch literal 71 zcmV-N0J#4jiwFo}RP|y217cxxV`wfhE^2dcZfpRnR?19KvQjYAQBcZD%uNL{m7Ft* dGK-aJxvC*jMo3b=nc1m8830Cjc5O8P006O%8eRYZ literal 0 HcmV?d00001 diff --git a/tests/core/resources/batch.2.jsonl.gz b/tests/core/resources/batch.2.jsonl.gz new file mode 100644 index 0000000000000000000000000000000000000000..c06fcd59dc751d7662d9a8cbabf2cc2f3eb0b499 GIT binary patch literal 71 zcmV-N0J#4jiwFqTRP|y217cxxV`wfiE^2dcZfpRnR?19KvQjYCQBcZD%uNL{m7MaE dlxn%EArdA?5{^ZA`8g>-IRJI(2Ol*6007Z48V~>g literal 0 HcmV?d00001 diff --git a/tests/core/test_countries_sync.py b/tests/core/test_countries_sync.py index 9228c9287..9161e2c06 100644 --- a/tests/core/test_countries_sync.py +++ b/tests/core/test_countries_sync.py @@ -106,9 +106,6 @@ def test_batch_mode(monkeypatch, outdir): } ) - # TODO: This is a hack to get the tap to run in batch mode. - monkeypatch.setenv("SINGER_BATCH_MODE", "true") - buf = io.StringIO() with redirect_stdout(buf): tap.sync_all() diff --git a/tests/core/test_sqlite.py b/tests/core/test_sqlite.py index b85ff7d67..9b8d5555a 100644 --- a/tests/core/test_sqlite.py +++ b/tests/core/test_sqlite.py @@ -1,6 +1,7 @@ """Typing tests.""" import json +import sqlite3 from copy import deepcopy from io import StringIO from pathlib import Path @@ -109,6 +110,24 @@ def sqlite_sample_target_soft_delete(sqlite_target_test_config): return SQLiteTarget(conf) +@pytest.fixture +def sqlite_sample_target_batch(sqlite_target_test_config): + """Get a sample target object with hard_delete disabled.""" + conf = sqlite_target_test_config + conf["batch_config"] = { + "encoding": { + "format": "jsonl", + "compression": "gzip", + }, + "storage": { + "root": "file://tests/core/resources", + "prefix": "test-batch-", + }, + } + + return SQLiteTarget(conf) + + def _discover_and_select_all(tap: SQLTap) -> None: """Discover catalog and auto-select all streams.""" for catalog_entry in tap.catalog_dict["streams"]: @@ -388,6 +407,49 @@ def test_sqlite_column_morph(sqlite_sample_target: SQLTarget): ) +def test_sqlite_process_batch_message( + sqlite_target_test_config: dict, + sqlite_sample_target_batch: SQLiteTarget, +): + """Test handling the batch message for the SQLite target. 
+ + Test performs the following actions: + + - Sends a batch message for a table that doesn't exist (which should + have no effect) + """ + schema_message = { + "type": "SCHEMA", + "stream": "users", + "key_properties": ["id"], + "schema": { + "required": ["id"], + "type": "object", + "properties": { + "id": {"type": "integer"}, + "name": {"type": ["null", "string"]}, + }, + }, + } + batch_message = { + "type": "BATCH", + "stream": "users", + "encoding": {"format": "jsonl", "compression": "gzip"}, + "manifest": ["batch.1.jsonl.gz", "batch.2.jsonl.gz"], + } + tap_output = "\n".join([json.dumps(schema_message), json.dumps(batch_message)]) + + target_sync_test( + sqlite_sample_target_batch, + input=StringIO(tap_output), + finalize=True, + ) + db = sqlite3.connect(sqlite_target_test_config["path_to_db"]) + cursor = db.cursor() + cursor.execute("SELECT COUNT(*) as count FROM users") + assert cursor.fetchone()[0] == 4 + + def test_sqlite_column_no_morph(sqlite_sample_target: SQLTarget): """End-to-end-to-end test for SQLite tap and target. From 82531e43a3fd84e4b7380b1e3c417711f79d9c5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Rami=CC=81rez=20Mondrago=CC=81n?= Date: Thu, 8 Sep 2022 20:04:08 -0500 Subject: [PATCH 08/17] Remove custom SQLite insert --- samples/sample_target_sqlite/__init__.py | 23 ----------------------- 1 file changed, 23 deletions(-) diff --git a/samples/sample_target_sqlite/__init__.py b/samples/sample_target_sqlite/__init__.py index bb565892e..11843a101 100644 --- a/samples/sample_target_sqlite/__init__.py +++ b/samples/sample_target_sqlite/__init__.py @@ -56,29 +56,6 @@ class SQLiteSink(SQLSink): connector_class = SQLiteConnector - def generate_insert_statement(self, full_table_name: str, schema: dict) -> str: - """Generate an insert statement for the given table and schema. - - Args: - full_table_name: The full table name to insert into. - schema: The schema of the table. - """ - engine = self.connector.create_sqlalchemy_engine() - meta = sqlalchemy.MetaData(bind=engine) - table = sqlalchemy.Table(full_table_name, meta, autoload=True) - statement = insert(table) - - if self.key_properties: - statement = statement.on_conflict_do_update( - index_elements=table.primary_key.columns, - set_={ - column.name: getattr(statement.excluded, column.name) - for column in table.columns - }, - ) - - return statement - class SQLiteTarget(SQLTarget): """The Tap class for SQLite.""" From 808ebfac8ee104feca0effcdaa846f8c4463863b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Rami=CC=81rez=20Mondrago=CC=81n?= Date: Thu, 8 Sep 2022 21:44:50 -0500 Subject: [PATCH 09/17] Fix type annotation --- singer_sdk/sinks/sql.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/singer_sdk/sinks/sql.py b/singer_sdk/sinks/sql.py index 568cd969b..5f37a0236 100644 --- a/singer_sdk/sinks/sql.py +++ b/singer_sdk/sinks/sql.py @@ -1,10 +1,11 @@ """Sink classes load data to SQL targets.""" from textwrap import dedent -from typing import Any, Dict, Iterable, List, Optional, Type +from typing import Any, Dict, Iterable, List, Optional, Type, Union import sqlalchemy from pendulum import now +from sqlalchemy.sql import Executable from sqlalchemy.sql.expression import bindparam from singer_sdk.plugin_base import PluginBase @@ -167,7 +168,7 @@ def generate_insert_statement( self, full_table_name: str, schema: dict, - ) -> str: + ) -> Union[str, Executable]: """Generate an insert statement for the given records. 
Args: From 3e81324448b947819919c5e27eaf3994ac2a511d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Rami=CC=81rez=20Mondrago=CC=81n?= Date: Fri, 9 Sep 2022 17:30:34 -0500 Subject: [PATCH 10/17] Refactor Sink.process_batch_file -> Sink.process_batch_files --- singer_sdk/sinks/core.py | 29 +++++++++++++++-------------- singer_sdk/target_base.py | 8 +++++--- 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index d5b5626eb..470b4e6a6 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -10,7 +10,7 @@ from gzip import open as gzip_open from logging import Logger from types import MappingProxyType -from typing import IO, Any, Mapping +from typing import IO, Any, Mapping, Sequence from dateutil import parser from jsonschema import Draft4Validator, FormatChecker @@ -432,30 +432,31 @@ def clean_up(self) -> None: """ pass - def process_batch_file( + def process_batch_files( self, encoding: BaseBatchFileEncoding, storage: StorageTarget, - path: str, + files: Sequence[str], ) -> None: """Process a batch file with the given batch context. Args: encoding: The batch file encoding. storage: The storage target. - path: The path to the batch file. + files: The batch files to process. Raises: NotImplementedError: If the batch file encoding is not supported. """ file: GzipFile | IO - if encoding.format == BatchFileFormat.JSONL: - with storage.open(path) as file: - if encoding.compression == "gzip": - file = gzip_open(file) - context = {"records": [json.loads(line) for line in file]} - self.process_batch(context) - else: - raise NotImplementedError( - f"Unsupported batch encoding format: {encoding.format}" - ) + for path in files: + if encoding.format == BatchFileFormat.JSONL: + with storage.open(path) as file: + if encoding.compression == "gzip": + file = gzip_open(file) + context = {"records": [json.loads(line) for line in file]} + self.process_batch(context) + else: + raise NotImplementedError( + f"Unsupported batch encoding format: {encoding.format}" + ) diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index bebfcd4c7..d2c5631ba 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -420,9 +420,11 @@ def _process_batch_message(self, message_dict: dict) -> None: ) encoding = BaseBatchFileEncoding.from_dict(message_dict["encoding"]) - - for file in message_dict["manifest"]: - sink.process_batch_file(encoding, sink.batch_config.storage, file) + sink.process_batch_files( + encoding, + sink.batch_config.storage, + message_dict["manifest"], + ) # Sink drain methods From f84621dd9a060e34dc4b78a36d2726218a3f8011 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Rami=CC=81rez=20Mondrago=CC=81n?= Date: Fri, 9 Sep 2022 20:16:00 -0500 Subject: [PATCH 11/17] Address mapper feedback --- singer_sdk/mapper_base.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/singer_sdk/mapper_base.py b/singer_sdk/mapper_base.py index dca03cb78..c77a88966 100644 --- a/singer_sdk/mapper_base.py +++ b/singer_sdk/mapper_base.py @@ -96,12 +96,15 @@ def map_batch_message( self, message_dict: dict, ) -> Iterable[singer.Message]: - """Map a version message to zero or more new messages. + """Map a batch message to zero or more new messages. Args: message_dict: A BATCH message JSON dictionary. + + Raises: + NotImplementedError: if not implemented by subclass. 
""" - pass + raise NotImplementedError("BATCH messages are not supported by mappers.") @classproperty def cli(cls) -> Callable: From 3cb2ee34e3b1a4e8e4cdc7fca03fa0eac2ca2655 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Rami=CC=81rez=20Mondrago=CC=81n?= Date: Tue, 13 Sep 2022 17:51:57 -0500 Subject: [PATCH 12/17] docs: Add page for BATCH --- docs/batch.md | 86 ++++++++++++++++++++++++++++++++++++++++++++++++++ docs/index.rst | 1 + 2 files changed, 87 insertions(+) create mode 100644 docs/batch.md diff --git a/docs/batch.md b/docs/batch.md new file mode 100644 index 000000000..6ef5f3f17 --- /dev/null +++ b/docs/batch.md @@ -0,0 +1,86 @@ +# PREVIEW - Batch Messages (A.K.A. Fast Sync) + +```{warning} +The `BATCH` message functionality is currently in preview and is subject to change. +``` + +[The Singer message specification](https://github.com/singer-io/getting-started/blob/master/docs/SPEC.md#output) defines the three basic types of messages: `RECORD`, `STATE`, and `SCHEMA`. The `RECORD` message is used to send data from the tap to the target. The `STATE` message is used to send state data from the tap to the target. The `SCHEMA` message is used to send schema data from the tap to the target, and for example, create tables with the correct column types. + +However, the Singer specification can be extended to support additional types of messages. For example, the [`ACTIVATE_VERSION`](https://sdk.meltano.com/en/latest/capabilities.html#singer_sdk.helpers.capabilities.PluginCapabilities.ACTIVATE_VERSION) message is used to manage hard deletes in the target. + +This library's implementation of the `BATCH` message is used to send records in bulk from the tap to the target, using an intermediate filesystem to store _batch_ files. This is useful, for example + +- when the tap outputs records at a much higher rate than the target can consume them, creating backpressure +- when the source system can directly export data in bulk (e.g. a database dump) + +Currently only a local filesystem is supported, but other filesystems like AWS S3, FTP, etc. could be supported in the future. + +## The `BATCH` Message + +```json +{ + "type": "BATCH", + "stream": "users", + "encoding": { + "format": "jsonl", + "compression": "gzip" + }, + "manifest": [ + "path/to/batch/file/1", + "path/to/batch/file/2" + ] +} +``` + +### `encoding` + +The `encoding` field is used to specify the format and compression of the batch files. Currently only `jsonl` and `gzip` are supported, respectively. + +### `manifest` + +The `manifest` field is used to specify the paths to the batch files. The paths are relative to the `root` directory specified in the [`batch_config`](#batch-configuration) storage configuration. + +## Batch configuration + +The batch configuration is used to specify the root directory for the batch files, and the maximum number of records per batch file. + +```json +{ + "encoding": { + "format": "jsonl", + "compression": "gzip", + }, + "storage": { + "root": "file://tests/core/resources", + "prefix": "test-batch-", + } +} +``` + +## Custom batch file creation and processing + +### Tap side + +The tap can customize the batch file creation by implementing the [`get_batches`](singer_sdk.Stream.get_batches). 
This method should return a _tuple_ of an encoding and a list of batch files:
+
+```python
+class MyStream(Stream):
+    def get_batches(self, records):
+        return (
+            ParquetEncoding(compression="snappy"),
+            [
+                "s3://my-bucket/my-batch-file-1.parquet",
+                "s3://my-bucket/my-batch-file-2.parquet",
+            ]
+        )
+```
+
+### Target side
+
+The target can customize the batch file processing by implementing the [`process_batch_files`](singer_sdk.Sink.process_batch_files).
+
+```python
+class MySink(Sink):
+    def process_batch_files(self, encoding, storage, files):
+        # process the batch files
+        ...
+```
diff --git a/docs/index.rst b/docs/index.rst
index 19eed21ef..3b0baa77e 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -64,6 +64,7 @@ Advanced Topics
    parent_streams
    partitioning
    stream_maps
+   batch
    porting
    sinks
    CONTRIBUTING

From 55db88cf057e595bbbe8f37adbc5d963b41608d3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Edgar=20Rami=CC=81rez=20Mondrago=CC=81n?=
Date: Tue, 13 Sep 2022 18:00:00 -0500
Subject: [PATCH 13/17] Make linter happy

---
 singer_sdk/streams/core.py | 24 ++++++------------------
 1 file changed, 6 insertions(+), 18 deletions(-)

diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py
index 9e0200feb..075aaf685 100644
--- a/singer_sdk/streams/core.py
+++ b/singer_sdk/streams/core.py
@@ -12,19 +12,7 @@
 from os import PathLike
 from pathlib import Path
 from types import MappingProxyType
-from typing import (
-    Any,
-    Callable,
-    Generator,
-    Iterable,
-    Iterator,
-    List,
-    Mapping,
-    Optional,
-    Tuple,
-    TypeVar,
-    cast,
-)
+from typing import Any, Callable, Generator, Iterable, Iterator, Mapping, TypeVar, cast
 from uuid import uuid4
 
 import pendulum
@@ -834,7 +822,7 @@ def _write_record_message(self, record: dict) -> None:
     def _write_batch_message(
         self,
         encoding: BaseBatchFileEncoding,
-        manifest: List[str],
+        manifest: list[str],
     ) -> None:
         """Write out a BATCH message.
 
@@ -1018,8 +1006,8 @@ def finalize_state_progress_markers(self, state: dict | None = None) -> None:
     def _process_record(
         self,
         record: dict,
-        child_context: Optional[dict] = None,
-        partition_context: Optional[dict] = None,
+        child_context: dict | None = None,
+        partition_context: dict | None = None,
     ) -> None:
         """Process a record.
 
@@ -1043,7 +1031,7 @@ def _process_record(
 
     def _sync_records(
         self,
-        context: Optional[dict] = None,
+        context: dict | None = None,
         write_messages: bool = True,
     ) -> Generator[dict, Any, Any]:
         """Sync records, emitting RECORD and STATE messages.
 
@@ -1331,7 +1319,7 @@ def get_batches(
 
         yield batch_config.encoding, [filename]
 
-    def post_process(self, row: dict, context: Optional[dict] = None) -> Optional[dict]:
+    def post_process(self, row: dict, context: dict | None = None) -> dict | None:
         """As needed, append or transform raw data to match expected structure.
 
         Optional. This method gives developers an opportunity to "clean up" the results

From 03c8765eb2729a20030d89d20f45f47b7145556d Mon Sep 17 00:00:00 2001
From: "Aaron (\"AJ\") Steers"
Date: Tue, 13 Sep 2022 16:32:06 -0700
Subject: [PATCH 14/17] Apply docs suggestions from code review

---
 docs/batch.md | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/docs/batch.md b/docs/batch.md
index 6ef5f3f17..9bc898387 100644
--- a/docs/batch.md
+++ b/docs/batch.md
@@ -1,7 +1,8 @@
-# PREVIEW - Batch Messages (A.K.A. Fast Sync)
+# Batch Messages
 
 ```{warning}
 The `BATCH` message functionality is currently in preview and is subject to change.
+You can [open an issue](https://github.com/meltano/sdk/issues) or [join the discussion](https://github.com/meltano/sdk/discussions/963) on GitHub to provide feedback during the preview period. ``` [The Singer message specification](https://github.com/singer-io/getting-started/blob/master/docs/SPEC.md#output) defines the three basic types of messages: `RECORD`, `STATE`, and `SCHEMA`. The `RECORD` message is used to send data from the tap to the target. The `STATE` message is used to send state data from the tap to the target. The `SCHEMA` message is used to send schema data from the tap to the target, and for example, create tables with the correct column types. From 625e61921024b059805f1b4c507a357454995e2a Mon Sep 17 00:00:00 2001 From: "Edgar R. M" Date: Wed, 14 Sep 2022 11:08:51 -0500 Subject: [PATCH 15/17] Update docs/batch.md Co-authored-by: Aaron ("AJ") Steers --- docs/batch.md | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/docs/batch.md b/docs/batch.md index 9bc898387..3998a4551 100644 --- a/docs/batch.md +++ b/docs/batch.md @@ -45,15 +45,20 @@ The `manifest` field is used to specify the paths to the batch files. The paths The batch configuration is used to specify the root directory for the batch files, and the maximum number of records per batch file. -```json +In `config.json`: + +```js { - "encoding": { - "format": "jsonl", - "compression": "gzip", - }, - "storage": { - "root": "file://tests/core/resources", - "prefix": "test-batch-", + // ... + "batch_config": { + "encoding": { + "format": "jsonl", + "compression": "gzip", + }, + "storage": { + "root": "file://tests/core/resources", + "prefix": "test-batch-", + } } } ``` From bf06a9c0c92ef6bac78888989e3cb57279dd327e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Rami=CC=81rez=20Mondrago=CC=81n?= Date: Wed, 14 Sep 2022 17:46:09 -0500 Subject: [PATCH 16/17] Get target batch storage config from absolute manifest URLs --- singer_sdk/helpers/_batch.py | 244 +++++++++++++++++++++++++++++ singer_sdk/helpers/_singer.py | 202 +----------------------- singer_sdk/sinks/core.py | 27 ++-- singer_sdk/streams/core.py | 28 ++-- singer_sdk/target_base.py | 11 +- tests/core/test_batch.py | 40 +++++ tests/core/test_singer_messages.py | 21 +-- tests/core/test_sqlite.py | 15 +- 8 files changed, 328 insertions(+), 260 deletions(-) create mode 100644 singer_sdk/helpers/_batch.py create mode 100644 tests/core/test_batch.py diff --git a/singer_sdk/helpers/_batch.py b/singer_sdk/helpers/_batch.py new file mode 100644 index 000000000..161e0097c --- /dev/null +++ b/singer_sdk/helpers/_batch.py @@ -0,0 +1,244 @@ +"""Batch helpers.""" + +from __future__ import annotations + +import enum +import sys +from contextlib import contextmanager +from dataclasses import asdict, dataclass, field +from typing import IO, TYPE_CHECKING, Any, ClassVar, Generator +from urllib.parse import ParseResult, parse_qs, urlencode, urlparse + +import fs +from singer.messages import Message + +from singer_sdk.helpers._singer import SingerMessageType + +if TYPE_CHECKING: + from fs.base import FS + + if sys.version_info >= (3, 8): + from typing import Literal + else: + from typing_extensions import Literal + + +class BatchFileFormat(str, enum.Enum): + """Batch file format.""" + + JSONL = "jsonl" + """JSON Lines format.""" + + +@dataclass +class BaseBatchFileEncoding: + """Base class for batch file encodings.""" + + registered_encodings: ClassVar[dict[str, type[BaseBatchFileEncoding]]] = {} + __encoding_format__: 
ClassVar[str] = "OVERRIDE_ME"
+
+    # Base encoding fields
+    format: str = field(init=False)
+    """The format of the batch file."""
+
+    compression: str | None = None
+    """The compression of the batch file."""
+
+    def __init_subclass__(cls, **kwargs: Any) -> None:
+        """Register subclasses.
+
+        Args:
+            **kwargs: Keyword arguments.
+        """
+        super().__init_subclass__(**kwargs)
+        cls.registered_encodings[cls.__encoding_format__] = cls
+
+    def __post_init__(self) -> None:
+        """Post-init hook."""
+        self.format = self.__encoding_format__
+
+    @classmethod
+    def from_dict(cls, data: dict[str, Any]) -> BaseBatchFileEncoding:
+        """Create an encoding from a dictionary."""
+        data = data.copy()
+        encoding_format = data.pop("format")
+        encoding_cls = cls.registered_encodings[encoding_format]
+        return encoding_cls(**data)
+
+
+@dataclass
+class JSONLinesEncoding(BaseBatchFileEncoding):
+    """JSON Lines encoding for batch files."""
+
+    __encoding_format__ = "jsonl"
+
+
+@dataclass
+class SDKBatchMessage(Message):
+    """Singer batch message in the Meltano SDK flavor."""
+
+    type: Literal[SingerMessageType.BATCH] = field(init=False)
+    """The message type."""
+
+    stream: str
+    """The stream name."""
+
+    encoding: BaseBatchFileEncoding
+    """The file encoding of the batch."""
+
+    manifest: list[str] = field(default_factory=list)
+    """The manifest of files in the batch."""
+
+    def __post_init__(self):
+        if isinstance(self.encoding, dict):
+            self.encoding = BaseBatchFileEncoding.from_dict(self.encoding)
+
+        self.type = SingerMessageType.BATCH
+
+    def asdict(self):
+        """Return a dictionary representation of the message.
+
+        Returns:
+            A dictionary with the defined message fields.
+        """
+        return asdict(self)
+
+    @classmethod
+    def from_dict(cls, data: dict[str, Any]) -> SDKBatchMessage:
+        """Create a message from a dictionary.
+
+        Args:
+            data: The dictionary to create the message from.
+
+        Returns:
+            The created message.
+        """
+        data.pop("type")
+        return cls(**data)
+
+
+@dataclass
+class StorageTarget:
+    """Storage target."""
+
+    root: str
+    """The root directory of the storage target."""
+
+    prefix: str | None = None
+    """The file prefix."""
+
+    params: dict = field(default_factory=dict)
+    """The storage parameters."""
+
+    def asdict(self):
+        """Return a dictionary representation of the storage target.
+
+        Returns:
+            A dictionary with the defined storage target fields.
+        """
+        return asdict(self)
+
+    @classmethod
+    def from_dict(cls, data: dict[str, Any]) -> StorageTarget:
+        """Create a storage target from a dictionary.
+
+        Args:
+            data: The dictionary to create the storage target from.
+
+        Returns:
+            The created storage target.
+        """
+        return cls(**data)
+
+    @classmethod
+    def from_url(cls, url: ParseResult) -> StorageTarget:
+        """Create a storage target from a URL.
+
+        Args:
+            url: The URL to create the storage target from.
+
+        Returns:
+            The created storage target.
+        """
+        new_url = url._replace(path="", query="")
+        return cls(root=new_url.geturl(), params=parse_qs(url.query))
+
+    @property
+    def fs_url(self) -> ParseResult:
+        """Get the storage target URL.
+
+        Returns:
+            The storage target URL.
+        """
+        return urlparse(self.root)._replace(query=urlencode(self.params))
+
+    @contextmanager
+    def fs(self, **kwargs: Any) -> Generator[FS, None, None]:
+        """Get a filesystem object for the storage target.
+
+        Args:
+            kwargs: Additional arguments to pass to ``fs.open_fs``.
+
+        Returns:
+            The filesystem object.
+        """
+        filesystem = fs.open_fs(self.fs_url.geturl(), **kwargs)
+        try:
+            yield filesystem
+        finally:
+            filesystem.close()
+
+    @contextmanager
+    def open(self, filename: str, mode: str = "rb") -> Generator[IO, None, None]:
+        """Open a file in the storage target.
+
+        Args:
+            filename: The filename to open.
+            mode: The mode to open the file in.
+
+        Returns:
+            The opened file.
+        """
+        filesystem = fs.open_fs(self.root, writeable=True, create=True)
+        fo = filesystem.open(filename, mode=mode)
+        try:
+            yield fo
+        finally:
+            fo.close()
+            filesystem.close()
+
+
+@dataclass
+class BatchConfig:
+    """Batch configuration."""
+
+    encoding: BaseBatchFileEncoding
+    """The encoding of the batch file."""
+
+    storage: StorageTarget
+    """The storage target of the batch file."""
+
+    def __post_init__(self):
+        if isinstance(self.encoding, dict):
+            self.encoding = BaseBatchFileEncoding.from_dict(self.encoding)
+
+        if isinstance(self.storage, dict):
+            self.storage = StorageTarget.from_dict(self.storage)
+
+    def asdict(self):
+        """Return a dictionary representation of the batch config.
+
+        Returns:
+            A dictionary with the defined batch config fields.
+        """
+        return asdict(self)
+
+    @classmethod
+    def from_dict(cls, data: dict[str, Any]) -> BatchConfig:
+        """Create a batch config from a dictionary.
+
+        Args:
+            data: The dictionary to create the batch config from.
+
+        Returns:
+            The created batch config.
+        """
+        return cls(**data)
diff --git a/singer_sdk/helpers/_singer.py b/singer_sdk/helpers/_singer.py
index 326c7d168..c756f84a6 100644
--- a/singer_sdk/helpers/_singer.py
+++ b/singer_sdk/helpers/_singer.py
@@ -2,36 +2,17 @@
 
 import enum
 import logging
-import sys
-from contextlib import contextmanager
-from dataclasses import asdict, dataclass, field, fields
-from typing import (
-    IO,
-    TYPE_CHECKING,
-    Any,
-    ClassVar,
-    Dict,
-    Generator,
-    Iterable,
-    Tuple,
-    Union,
-    cast,
-)
-
-import fs
+from dataclasses import dataclass, fields
+from typing import TYPE_CHECKING, Any, Dict, Iterable, Tuple, Union, cast
+
 from singer.catalog import Catalog as BaseCatalog
 from singer.catalog import CatalogEntry as BaseCatalogEntry
-from singer.messages import Message
 
 from singer_sdk.helpers._schema import SchemaPlus
 
 if TYPE_CHECKING:
     from typing_extensions import TypeAlias
 
 
 Breadcrumb = Tuple[str, ...]
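Taken together, the helpers in the new `singer_sdk.helpers._batch` module are designed to round-trip between the plain dictionaries used in `config.json` (and in `BATCH` messages) and the dataclass instances above. A minimal sketch of the intended usage, assuming only the definitions introduced in this patch (the config values are illustrative and mirror this series' tests):

```python
from singer_sdk.helpers._batch import BatchConfig, JSONLinesEncoding

# Build a batch config from the plain-dict shape used in config.json;
# __post_init__ promotes the nested dicts to the registered dataclass types.
config = BatchConfig.from_dict(
    {
        "encoding": {"format": "jsonl", "compression": "gzip"},
        "storage": {"root": "file://tests/core/resources", "prefix": "test-batch-"},
    }
)

assert isinstance(config.encoding, JSONLinesEncoding)
assert config.encoding.compression == "gzip"
assert config.storage.prefix == "test-batch-"

# asdict() restores a JSON-encodable dictionary; the encoding's "format"
# field is filled in by __post_init__.
assert config.asdict()["encoding"]["format"] == "jsonl"
```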
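The `StorageTarget.from_url` classmethod is what lets a target derive its batch storage configuration from an absolute manifest URL when no explicit `batch_config` is provided (per this commit's subject). A short sketch mirroring the `test_storage_from_url` test added later in this patch; the bucket name and region are illustrative:

```python
from urllib.parse import urlparse

from singer_sdk.helpers._batch import StorageTarget

# An absolute manifest entry, shaped like the file URLs that
# Stream.get_batches emits via fs.geturl().
url = urlparse("s3://my-bucket/prefix--batch-1.jsonl.gz?region=us-east-1")

storage = StorageTarget.from_url(url)

# The scheme and bucket become the root; the path is dropped and any
# query parameters are preserved separately.
assert storage.root == "s3://my-bucket"
assert storage.params == {"region": ["us-east-1"]}
```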
@@ -316,180 +297,3 @@ def add_stream(self, entry: CatalogEntry) -> None: def get_stream(self, stream_id: str) -> CatalogEntry | None: """Retrieve a stream entry from the catalog.""" return self.get(stream_id) - - -class BatchFileFormat(str, enum.Enum): - """Batch file format.""" - - JSONL = "jsonl" - """JSON Lines format.""" - - -@dataclass -class BaseBatchFileEncoding: - """Base class for batch file encodings.""" - - registered_encodings: ClassVar[dict[str, type[BaseBatchFileEncoding]]] = {} - __encoding_format__: ClassVar[str] = "OVERRIDE_ME" - - # Base encoding fields - format: str = field(init=False) - """The format of the batch file.""" - - compression: str | None = None - """The compression of the batch file.""" - - def __init_subclass__(cls, **kwargs: Any) -> None: - """Register subclasses.""" - super().__init_subclass__(**kwargs) - cls.registered_encodings[cls.__encoding_format__] = cls - - def __post_init__(self) -> None: - self.format = self.__encoding_format__ - - @classmethod - def from_dict(cls, data: dict[str, Any]) -> BaseBatchFileEncoding: - """Create an encoding from a dictionary.""" - data = data.copy() - encoding_format = data.pop("format") - encoding_cls = cls.registered_encodings[encoding_format] - return encoding_cls(**data) - - -@dataclass -class JSONLinesEncoding(BaseBatchFileEncoding): - """JSON Lines encoding for batch files.""" - - __encoding_format__ = "jsonl" - - -@dataclass -class SDKBatchMessage(Message): - """Singer batch message in the Meltano SDK flavor.""" - - type: Literal[SingerMessageType.BATCH] = field(init=False) - """The message type.""" - - stream: str - """The stream name.""" - - encoding: BaseBatchFileEncoding - """The file encoding of the batch.""" - - manifest: list[str] = field(default_factory=list) - """The manifest of files in the batch.""" - - def __post_init__(self): - if isinstance(self.encoding, dict): - self.encoding = BaseBatchFileEncoding.from_dict(self.encoding) - - self.type = SingerMessageType.BATCH - - def asdict(self): - """Return a dictionary representation of the message. - - Returns: - A dictionary with the defined message fields. - """ - return asdict(self) - - @classmethod - def from_dict(cls, data: dict[str, Any]) -> SDKBatchMessage: - """Create an encoding from a dictionary. - - Args: - data: The dictionary to create the message from. - - Returns: - The created message. - """ - data.pop("type") - return cls(**data) - - -@dataclass -class StorageTarget: - """Storage target.""" - - root: str - """"The root directory of the storage target.""" - - prefix: str - """"The file prefix.""" - - def asdict(self): - """Return a dictionary representation of the message. - - Returns: - A dictionary with the defined message fields. - """ - return asdict(self) - - @classmethod - def from_dict(cls, data: dict[str, Any]) -> StorageTarget: - """Create an encoding from a dictionary. - - Args: - data: The dictionary to create the message from. - - Returns: - The created message. - """ - return cls(**data) - - @contextmanager - def open(self, filename: str, mode: str = "rb") -> Generator[IO, None, None]: - """Open a file in the storage target. - - Args: - filename: The filename to open. - mode: The mode to open the file in. - - Returns: - The opened file. 
- """ - filesystem = fs.open_fs(self.root, writeable=True, create=True) - fo = filesystem.open(filename, mode=mode) - try: - yield fo - finally: - fo.close() - filesystem.close() - - -@dataclass -class BatchConfig: - """Batch configuration.""" - - encoding: BaseBatchFileEncoding - """The encoding of the batch file.""" - - storage: StorageTarget - """The storage target of the batch file.""" - - def __post_init__(self): - if isinstance(self.encoding, dict): - self.encoding = BaseBatchFileEncoding.from_dict(self.encoding) - - if isinstance(self.storage, dict): - self.storage = StorageTarget.from_dict(self.storage) - - def asdict(self): - """Return a dictionary representation of the message. - - Returns: - A dictionary with the defined message fields. - """ - return asdict(self) - - @classmethod - def from_dict(cls, data: dict[str, Any]) -> BatchConfig: - """Create an encoding from a dictionary. - - Args: - data: The dictionary to create the message from. - - Returns: - The created message. - """ - return cls(**data) diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index 470b4e6a6..dfd4afce1 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -11,17 +11,18 @@ from logging import Logger from types import MappingProxyType from typing import IO, Any, Mapping, Sequence +from urllib.parse import urlparse from dateutil import parser from jsonschema import Draft4Validator, FormatChecker -from singer_sdk.helpers._compat import final -from singer_sdk.helpers._singer import ( +from singer_sdk.helpers._batch import ( BaseBatchFileEncoding, BatchConfig, BatchFileFormat, StorageTarget, ) +from singer_sdk.helpers._compat import final from singer_sdk.helpers._typing import ( DatetimeErrorTreatmentEnum, get_datelike_property_type, @@ -435,27 +436,35 @@ def clean_up(self) -> None: def process_batch_files( self, encoding: BaseBatchFileEncoding, - storage: StorageTarget, files: Sequence[str], ) -> None: """Process a batch file with the given batch context. Args: encoding: The batch file encoding. - storage: The storage target. files: The batch files to process. Raises: NotImplementedError: If the batch file encoding is not supported. 
""" file: GzipFile | IO + storage: StorageTarget | None = None + for path in files: + url = urlparse(path) + + if self.batch_config: + storage = self.batch_config.storage + else: + storage = StorageTarget.from_url(url) + if encoding.format == BatchFileFormat.JSONL: - with storage.open(path) as file: - if encoding.compression == "gzip": - file = gzip_open(file) - context = {"records": [json.loads(line) for line in file]} - self.process_batch(context) + with storage.fs(create=False) as fs: + with fs.open(url.path, mode="rb") as file: + if encoding.compression == "gzip": + file = gzip_open(file) + context = {"records": [json.loads(line) for line in file]} + self.process_batch(context) else: raise NotImplementedError( f"Unsupported batch encoding format: {encoding.format}" diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py index 075aaf685..cd78c1ffc 100644 --- a/singer_sdk/streams/core.py +++ b/singer_sdk/streams/core.py @@ -21,17 +21,19 @@ from singer import RecordMessage, Schema, SchemaMessage, StateMessage from singer_sdk.exceptions import InvalidStreamSortException, MaxRecordsLimitException +from singer_sdk.helpers._batch import ( + BaseBatchFileEncoding, + BatchConfig, + SDKBatchMessage, +) from singer_sdk.helpers._catalog import pop_deselected_record_properties from singer_sdk.helpers._compat import final from singer_sdk.helpers._flattening import get_flattening_options from singer_sdk.helpers._schema import SchemaPlus from singer_sdk.helpers._singer import ( - BaseBatchFileEncoding, - BatchConfig, Catalog, CatalogEntry, MetadataMapping, - SDKBatchMessage, SelectionMask, ) from singer_sdk.helpers._state import ( @@ -1300,7 +1302,7 @@ def get_batches( A tuple of (encoding, manifest) for each batch. """ sync_id = f"{self.tap_name}--{self.name}-{uuid4()}" - prefix = batch_config.storage.prefix + prefix = batch_config.storage.prefix or "" for i, chunk in enumerate( lazy_chunked_generator( @@ -1310,14 +1312,16 @@ def get_batches( start=1, ): filename = f"{prefix}{sync_id}-{i}.json.gz" - with batch_config.storage.open(filename, "wb") as f: - # TODO: Determine compression from config. - with gzip.GzipFile(fileobj=f, mode="wb") as gz: - gz.writelines( - (json.dumps(record) + "\n").encode() for record in chunk - ) - - yield batch_config.encoding, [filename] + with batch_config.storage.fs() as fs: + with fs.open(filename, "wb") as f: + # TODO: Determine compression from config. + with gzip.GzipFile(fileobj=f, mode="wb") as gz: + gz.writelines( + (json.dumps(record) + "\n").encode() for record in chunk + ) + file_url = fs.geturl(filename) + + yield batch_config.encoding, [file_url] def post_process(self, row: dict, context: dict | None = None) -> dict | None: """As needed, append or transform raw data to match expected structure. 
diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index d2c5631ba..0b21b04c5 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -14,9 +14,9 @@ from singer_sdk.cli import common_options from singer_sdk.exceptions import RecordsWithoutSchemaException +from singer_sdk.helpers._batch import BaseBatchFileEncoding from singer_sdk.helpers._classproperty import classproperty from singer_sdk.helpers._compat import final -from singer_sdk.helpers._singer import BaseBatchFileEncoding from singer_sdk.helpers.capabilities import CapabilitiesEnum, PluginCapabilities from singer_sdk.io_base import SingerMessageType, SingerReader from singer_sdk.mapper import PluginMapper @@ -408,21 +408,12 @@ def _process_batch_message(self, message_dict: dict) -> None: Args: message_dict: TODO - - Raises: - RuntimeError: If the batch message can not be processed. """ sink = self.get_sink(message_dict["stream"]) - if sink.batch_config is None: - raise RuntimeError( - f"Received BATCH message for stream '{sink.stream_name}' " - "but no batch config was provided." - ) encoding = BaseBatchFileEncoding.from_dict(message_dict["encoding"]) sink.process_batch_files( encoding, - sink.batch_config.storage, message_dict["manifest"], ) diff --git a/tests/core/test_batch.py b/tests/core/test_batch.py new file mode 100644 index 000000000..740f6d281 --- /dev/null +++ b/tests/core/test_batch.py @@ -0,0 +1,40 @@ +from dataclasses import asdict +from urllib.parse import urlparse + +import pytest + +from singer_sdk.helpers._batch import ( + BaseBatchFileEncoding, + JSONLinesEncoding, + StorageTarget, +) + + +@pytest.mark.parametrize( + "encoding,expected", + [ + (JSONLinesEncoding("gzip"), {"compression": "gzip", "format": "jsonl"}), + (JSONLinesEncoding(), {"compression": None, "format": "jsonl"}), + ], + ids=["jsonl-compression-gzip", "jsonl-compression-none"], +) +def test_encoding_as_dict(encoding: BaseBatchFileEncoding, expected: dict) -> None: + """Test encoding as dict.""" + assert asdict(encoding) == expected + + +def test_storage_get_url(): + storage = StorageTarget("file://root_dir") + + with storage.fs(create=True) as fs: + url = fs.geturl("prefix--file.jsonl.gz") + assert url.startswith("file://") + assert url.replace("\\", "/").endswith("root_dir/prefix--file.jsonl.gz") + + +def test_storage_from_url(): + url = urlparse("s3://bucket/path/to/file?region=us-east-1") + target = StorageTarget.from_url(url) + assert target.root == "s3://bucket" + assert target.prefix is None + assert target.params == {"region": ["us-east-1"]} diff --git a/tests/core/test_singer_messages.py b/tests/core/test_singer_messages.py index 185606631..3858731a9 100644 --- a/tests/core/test_singer_messages.py +++ b/tests/core/test_singer_messages.py @@ -2,25 +2,8 @@ import pytest -from singer_sdk.helpers._singer import ( - BaseBatchFileEncoding, - JSONLinesEncoding, - SDKBatchMessage, - SingerMessageType, -) - - -@pytest.mark.parametrize( - "encoding,expected", - [ - (JSONLinesEncoding("gzip"), {"compression": "gzip", "format": "jsonl"}), - (JSONLinesEncoding(), {"compression": None, "format": "jsonl"}), - ], - ids=["jsonl-compression-gzip", "jsonl-compression-none"], -) -def test_encoding_as_dict(encoding: BaseBatchFileEncoding, expected: dict) -> None: - """Test encoding as dict.""" - assert asdict(encoding) == expected +from singer_sdk.helpers._batch import JSONLinesEncoding, SDKBatchMessage +from singer_sdk.helpers._singer import SingerMessageType @pytest.mark.parametrize( diff --git a/tests/core/test_sqlite.py 
b/tests/core/test_sqlite.py
index 9b8d5555a..b6a24fd77 100644
--- a/tests/core/test_sqlite.py
+++ b/tests/core/test_sqlite.py
@@ -114,16 +114,6 @@ def sqlite_sample_target_soft_delete(sqlite_target_test_config):
 def sqlite_sample_target_batch(sqlite_target_test_config):
     """Get a sample target object with a batch config."""
     conf = sqlite_target_test_config
-    conf["batch_config"] = {
-        "encoding": {
-            "format": "jsonl",
-            "compression": "gzip",
-        },
-        "storage": {
-            "root": "file://tests/core/resources",
-            "prefix": "test-batch-",
-        },
-    }
 
     return SQLiteTarget(conf)
 
@@ -435,7 +425,10 @@ def test_sqlite_process_batch_message(
         "type": "BATCH",
         "stream": "users",
         "encoding": {"format": "jsonl", "compression": "gzip"},
-        "manifest": ["batch.1.jsonl.gz", "batch.2.jsonl.gz"],
+        "manifest": [
+            "file://tests/core/resources/batch.1.jsonl.gz",
+            "file://tests/core/resources/batch.2.jsonl.gz",
+        ],
     }
     tap_output = "\n".join([json.dumps(schema_message), json.dumps(batch_message)])
 

From 42d980be667c34a95a792e311323010c64c45710 Mon Sep 17 00:00:00 2001
From: "Edgar R. M"
Date: Wed, 14 Sep 2022 18:45:25 -0500
Subject: [PATCH 17/17] Apply suggestions from code review

Co-authored-by: Aaron ("AJ") Steers
---
 docs/batch.md | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/docs/batch.md b/docs/batch.md
index 3998a4551..77ca8ee7e 100644
--- a/docs/batch.md
+++ b/docs/batch.md
@@ -27,8 +27,8 @@ Currently only a local filesystem is supported, but other filesystems like AWS S
     "compression": "gzip"
   },
   "manifest": [
-    "path/to/batch/file/1",
-    "path/to/batch/file/2"
+    "file://path/to/batch/file/1",
+    "file://path/to/batch/file/2"
   ]
 }
 ```
@@ -43,7 +43,10 @@
 
 ## Batch configuration
 
-The batch configuration is used to specify the root directory for the batch files, and the maximum number of records per batch file.
+When local storage is used, targets do not require special configuration to process `BATCH` messages.
+
+Taps may be configured to specify a storage `root` directory, a file path `prefix`, and an `encoding` for batch files, using a configuration like the one below:
+
 
 In `config.json`:
@@ -67,7 +70,7 @@ In `config.json`:
 
 ### Tap side
 
-The tap can customize the batch file creation by implementing the [`get_batches`](singer_sdk.Stream.get_batches). This method should return a _tuple_ of an encoding and a list of batch files:
+Taps can optionally customize the batch file creation by implementing the [`get_batches`](singer_sdk.Stream.get_batches) method. This method should return a _tuple_ of an encoding and a list of batch files:
 
 ```python
 class MyStream(Stream):
@@ -83,7 +86,7 @@ class MyStream(Stream):
 
 ### Target side
 
-The target can customize the batch file processing by implementing the [`process_batch_files`](singer_sdk.Sink.process_batch_files).
+Targets can optionally customize the batch file processing by implementing the [`process_batch_files`](singer_sdk.Sink.process_batch_files) method.
 
 ```python
 class MySink(Sink):