core.py
"""Sink classes load data to a target."""
from __future__ import annotations
import abc
import copy
import datetime
import json
import time
import typing as t
from gzip import GzipFile
from gzip import open as gzip_open
from types import MappingProxyType
from dateutil import parser
from jsonschema import Draft7Validator, FormatChecker
from singer_sdk.exceptions import MissingKeyPropertiesError
from singer_sdk.helpers._batch import (
BaseBatchFileEncoding,
BatchConfig,
BatchFileFormat,
StorageTarget,
)
from singer_sdk.helpers._compat import final
from singer_sdk.helpers._typing import (
DatetimeErrorTreatmentEnum,
get_datelike_property_type,
handle_invalid_timestamp_in_record,
)
if t.TYPE_CHECKING:
from logging import Logger
from singer_sdk.target_base import Target
JSONSchemaValidator = Draft7Validator


class Sink(metaclass=abc.ABCMeta):
    """Abstract base class for target sinks."""

    # max timestamp/datetime supported, used to reset invalid dates

    logger: Logger

    MAX_SIZE_DEFAULT = 10000

    def __init__(
        self,
        target: Target,
        stream_name: str,
        schema: dict,
        key_properties: list[str] | None,
    ) -> None:
        """Initialize target sink.

        Args:
            target: Target instance.
            stream_name: Name of the stream to sink.
            schema: Schema of the stream to sink.
            key_properties: Primary key of the stream to sink.
        """
        self.logger = target.logger
        self.sync_started_at = target.initialized_at
        self._config = dict(target.config)
        self._pending_batch: dict | None = None
        self.stream_name = stream_name
        self.logger.info(
            "Initializing target sink for stream '%s'...",
            stream_name,
        )
        self.original_schema = copy.deepcopy(schema)
        self.schema = schema
        if self.include_sdc_metadata_properties:
            self._add_sdc_metadata_to_schema()
        else:
            self._remove_sdc_metadata_from_schema()

        self.records_to_drain: list[dict] | t.Any = []
        self._context_draining: dict | None = None
        self.latest_state: dict | None = None
        self._draining_state: dict | None = None
        self.drained_state: dict | None = None
        self._key_properties = key_properties or []

        # Tally counters
        self._total_records_written: int = 0
        self._total_dupe_records_merged: int = 0
        self._total_records_read: int = 0
        self._batch_records_read: int = 0
        self._batch_dupe_records_merged: int = 0

        self._validator = Draft7Validator(schema, format_checker=FormatChecker())

    def _get_context(self, record: dict) -> dict:  # noqa: ARG002
        """Return an empty dictionary by default.

        NOTE: Future versions of the SDK may expand the available context attributes.

        Args:
            record: Individual record in the stream.

        Returns:
            An empty dictionary, which serves as the default per-record context.
        """
        return {}

    # Size properties

    @property
    def max_size(self) -> int:
        """Get max batch size.

        Returns:
            Max number of records to batch before `is_full=True`.
        """
        return self.MAX_SIZE_DEFAULT

    @property
    def current_size(self) -> int:
        """Get current batch size.

        Returns:
            The number of records to drain.
        """
        return self._batch_records_read

    @property
    def is_full(self) -> bool:
        """Check against size limit.

        Returns:
            True if the sink needs to be drained.
        """
        return self.current_size >= self.max_size
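
    # NOTE (editorial sketch, not part of the SDK source): the owning Target
    # typically drives these size properties together with the drain hooks
    # defined further below, roughly:
    #
    #     if sink.is_full:
    #         context = sink.start_drain()
    #         sink.process_batch(context)
    #         sink.mark_drained()
    #
    # The actual orchestration lives in `singer_sdk.target_base.Target`.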

    # Tally methods

    @final
    def tally_record_read(self, count: int = 1) -> None:
        """Increment the records read tally.

        This method is called automatically by the SDK when records are read.

        Args:
            count: Number to increase record count by.
        """
        self._total_records_read += count
        self._batch_records_read += count

    @final
    def tally_record_written(self, count: int = 1) -> None:
        """Increment the records written tally.

        This method is called automatically by the SDK after
        :meth:`~singer_sdk.Sink.process_record()`
        or :meth:`~singer_sdk.Sink.process_batch()`.

        Args:
            count: Number to increase record count by.
        """
        self._total_records_written += count

    @final
    def tally_duplicate_merged(self, count: int = 1) -> None:
        """Increment the records merged tally.

        This method should be called directly by the Target implementation.

        Args:
            count: Number to increase record count by.
        """
        self._total_dupe_records_merged += count
        self._batch_dupe_records_merged += count

    # Properties

    @property
    def config(self) -> t.Mapping[str, t.Any]:
        """Get plugin configuration.

        Returns:
            A frozen (read-only) config dictionary map.
        """
        return MappingProxyType(self._config)

    @property
    def batch_config(self) -> BatchConfig | None:
        """Get batch configuration.

        Returns:
            The batch configuration if `batch_config` is set in config, otherwise None.
        """
        raw = self.config.get("batch_config")
        return BatchConfig.from_dict(raw) if raw else None

    @property
    def include_sdc_metadata_properties(self) -> bool:
        """Check if metadata columns should be added.

        Returns:
            True if metadata columns should be added.
        """
        return self.config.get("add_record_metadata", False)
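
    # Example (hypothetical target config enabling the `_sdc_*` metadata columns):
    #
    #     {
    #         "add_record_metadata": true
    #     }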

    @property
    def datetime_error_treatment(self) -> DatetimeErrorTreatmentEnum:
        """Return a treatment to use for datetime parse errors: ERROR, MAX, or NULL.

        Returns:
            The datetime error treatment for this sink (ERROR by default).
        """
        return DatetimeErrorTreatmentEnum.ERROR

    @property
    def key_properties(self) -> list[str]:
        """Return key properties.

        Override this method to return a list of key properties in a format that is
        compatible with the target.

        Returns:
            A list of stream key properties.
        """
        return self._key_properties

    # Record processing

    def _add_sdc_metadata_to_record(
        self,
        record: dict,
        message: dict,
        context: dict,
    ) -> None:
        """Populate metadata _sdc columns from incoming record message.

        Record metadata specs documented at:
        https://sdk.meltano.com/en/latest/implementation/record_metadata.html

        Args:
            record: Individual record in the stream.
            message: The record message.
            context: Stream partition or context dictionary.
        """
        record["_sdc_extracted_at"] = message.get("time_extracted")
        record["_sdc_received_at"] = datetime.datetime.now(
            tz=datetime.timezone.utc,
        ).isoformat()
        record["_sdc_batched_at"] = (
            context.get("batch_start_time", None)
            or datetime.datetime.now(tz=datetime.timezone.utc)
        ).isoformat()
        record["_sdc_deleted_at"] = record.get("_sdc_deleted_at")
        record["_sdc_sequence"] = int(round(time.time() * 1000))
        record["_sdc_table_version"] = message.get("version")
        record["_sdc_sync_started_at"] = self.sync_started_at

    def _add_sdc_metadata_to_schema(self) -> None:
        """Add _sdc metadata columns.

        Record metadata specs documented at:
        https://sdk.meltano.com/en/latest/implementation/record_metadata.html
        """
        properties_dict = self.schema["properties"]
        for col in (
            "_sdc_extracted_at",
            "_sdc_received_at",
            "_sdc_batched_at",
            "_sdc_deleted_at",
        ):
            properties_dict[col] = {
                "type": ["null", "string"],
                "format": "date-time",
            }
        for col in ("_sdc_sequence", "_sdc_table_version", "_sdc_sync_started_at"):
            properties_dict[col] = {"type": ["null", "integer"]}

    def _remove_sdc_metadata_from_schema(self) -> None:
        """Remove _sdc metadata columns.

        Record metadata specs documented at:
        https://sdk.meltano.com/en/latest/implementation/record_metadata.html
        """
        properties_dict = self.schema["properties"]
        for col in (
            "_sdc_extracted_at",
            "_sdc_received_at",
            "_sdc_batched_at",
            "_sdc_deleted_at",
            "_sdc_sequence",
            "_sdc_table_version",
            "_sdc_sync_started_at",
        ):
            properties_dict.pop(col, None)

    def _remove_sdc_metadata_from_record(self, record: dict) -> None:
        """Remove metadata _sdc columns from incoming record message.

        Record metadata specs documented at:
        https://sdk.meltano.com/en/latest/implementation/record_metadata.html

        Args:
            record: Individual record in the stream.
        """
        record.pop("_sdc_extracted_at", None)
        record.pop("_sdc_received_at", None)
        record.pop("_sdc_batched_at", None)
        record.pop("_sdc_deleted_at", None)
        record.pop("_sdc_sequence", None)
        record.pop("_sdc_table_version", None)
        record.pop("_sdc_sync_started_at", None)

    # Record validation

    def _validate_and_parse(self, record: dict) -> dict:
        """Validate or repair the record, parsing to python-native types as needed.

        Args:
            record: Individual record in the stream.

        Returns:
            The validated record, with datetime-like values parsed.
        """
        self._validator.validate(record)
        self._parse_timestamps_in_record(
            record=record,
            schema=self.schema,
            treatment=self.datetime_error_treatment,
        )
        return record

    def _singer_validate_message(self, record: dict) -> None:
        """Ensure record conforms to Singer Spec.

        Args:
            record: Record (after parsing, schema validations and transformations).

        Raises:
            MissingKeyPropertiesError: If record is missing one or more key properties.
        """
        if any(key_property not in record for key_property in self._key_properties):
            msg = (
                f"Record is missing one or more key_properties. \n"
                f"Key Properties: {self._key_properties}, "
                f"Record Keys: {list(record.keys())}"
            )
            raise MissingKeyPropertiesError(
                msg,
            )

    def _parse_timestamps_in_record(
        self,
        record: dict,
        schema: dict,
        treatment: DatetimeErrorTreatmentEnum,
    ) -> None:
        """Parse strings to datetime.datetime values, repairing or erroring on failure.

        Attempts to parse every field that is of type date/datetime/time. If its value
        is out of range, repair logic will be driven by the `treatment` input arg:
        MAX, NULL, or ERROR.

        Args:
            record: Individual record in the stream.
            schema: The JSON Schema for the stream.
            treatment: The treatment to apply to unparsable datetime values.
        """
        for key, value in record.items():
            if key not in schema["properties"]:
                self.logger.warning("No schema for record field '%s'", key)
                continue
            datelike_type = get_datelike_property_type(schema["properties"][key])
            if datelike_type:
                date_val = value
                try:
                    if value is not None:
                        date_val = parser.parse(date_val)
                except parser.ParserError as ex:
                    date_val = handle_invalid_timestamp_in_record(
                        record,
                        [key],
                        date_val,
                        datelike_type,
                        ex,
                        treatment,
                        self.logger,
                    )
                record[key] = date_val

    def _after_process_record(self, context: dict) -> None:
        """Perform post-processing and record keeping. Internal hook.

        Args:
            context: Stream partition or context dictionary.
        """
        self.logger.debug("Processed record: %s", context)

    # SDK developer overrides:

    def preprocess_record(self, record: dict, context: dict) -> dict:  # noqa: ARG002
        """Process incoming record and return a modified result.

        Args:
            record: Individual record in the stream.
            context: Stream partition or context dictionary.

        Returns:
            A new, processed record.
        """
        return record

    @abc.abstractmethod
    def process_record(self, record: dict, context: dict) -> None:
        """Load the latest record from the stream.

        Implementations may either load to the `context` dict for staging (the
        default behavior for Batch types), or permanently write out to the target.

        Anything appended to :attr:`singer_sdk.Sink.records_to_drain` will be
        automatically passed to
        :meth:`~singer_sdk.Sink.process_batch()` to be permanently written during the
        process_batch operation.

        If duplicates are merged, these can be tracked via
        :meth:`~singer_sdk.Sink.tally_duplicate_merged()`.

        Args:
            record: Individual record in the stream.
            context: Stream partition or context dictionary.
        """

    def start_drain(self) -> dict:
        """Set and return `self._context_draining`.

        Returns:
            The new draining context: the pending batch, or an empty dictionary.
        """
        self._context_draining = self._pending_batch or {}
        self._pending_batch = None
        return self._context_draining

    @abc.abstractmethod
    def process_batch(self, context: dict) -> None:
        """Process all records per the batch's `context` dictionary.

        If duplicates are merged, these can optionally be tracked via
        `tally_duplicate_merged()`.

        Args:
            context: Stream partition or context dictionary.

        Raises:
            NotImplementedError: If derived class does not override this method.
        """
        msg = "No handling exists for process_batch()."
        raise NotImplementedError(msg)

    def mark_drained(self) -> None:
        """Reset `records_to_drain` and any other tracking."""
        self.drained_state = self._draining_state
        self._draining_state = None
        self._context_draining = None
        if self._batch_records_read:
            self.tally_record_written(
                self._batch_records_read - self._batch_dupe_records_merged,
            )
        self._batch_records_read = 0

    def activate_version(self, new_version: int) -> None:
        """Bump the active version of the target table.

        This method should be overridden by developers if a custom implementation is
        expected.

        Args:
            new_version: The version number to activate.
        """
        _ = new_version
        self.logger.warning(
            "ACTIVATE_VERSION message received but not implemented by this target. "
            "Ignoring.",
        )

    def setup(self) -> None:
        """Perform any setup actions at the beginning of a Stream.

        Setup is executed once per Sink instance, after instantiation. If a Schema
        change is detected, a new Sink is instantiated and this method is called again.
        """
        self.logger.info("Setting up %s", self.stream_name)

    def clean_up(self) -> None:
        """Perform any clean up actions required at end of a stream.

        Implementations should ensure that clean up does not affect resources
        that may be in use from other instances of the same sink. Stream name alone
        should not be relied on; it's recommended to use a uuid as well.
        """
        self.logger.info("Cleaning up %s", self.stream_name)

    def process_batch_files(
        self,
        encoding: BaseBatchFileEncoding,
        files: t.Sequence[str],
    ) -> None:
        """Process a batch file with the given batch context.

        Args:
            encoding: The batch file encoding.
            files: The batch files to process.

        Raises:
            NotImplementedError: If the batch file encoding is not supported.
        """
        file: GzipFile | t.IO
        storage: StorageTarget | None = None

        for path in files:
            head, tail = StorageTarget.split_url(path)

            if self.batch_config:
                storage = self.batch_config.storage
            else:
                storage = StorageTarget.from_url(head)

            if encoding.format == BatchFileFormat.JSONL:
                with storage.fs(create=False) as batch_fs, batch_fs.open(
                    tail,
                    mode="rb",
                ) as file:
                    open_file = (
                        gzip_open(file) if encoding.compression == "gzip" else file
                    )
                    context = {
                        "records": [
                            json.loads(line)
                            for line in open_file  # type: ignore[attr-defined]
                        ],
                    }
                    self.process_batch(context)
            else:
                msg = f"Unsupported batch encoding format: {encoding.format}"
                raise NotImplementedError(msg)
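

# ---------------------------------------------------------------------------
# Editorial illustration (not part of the SDK source): a minimal sketch of how
# a target developer might subclass `Sink`. The class name `_ExampleJSONLSink`
# and the `output_path` setting are hypothetical, shown only to clarify the
# `process_record()` / `process_batch()` contract documented above.
# ---------------------------------------------------------------------------
class _ExampleJSONLSink(Sink):
    """Hypothetical sink that appends each drained batch to a local JSONL file."""

    def process_record(self, record: dict, context: dict) -> None:
        # Stage records on the batch context; they are written in process_batch().
        context.setdefault("records", []).append(record)

    def process_batch(self, context: dict) -> None:
        # Assumed setting name; a real target would declare it in its config schema.
        output_path = self.config.get("output_path", f"{self.stream_name}.jsonl")
        with open(output_path, "a", encoding="utf-8") as out:
            for record in context.get("records", []):
                out.write(json.dumps(record) + "\n")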