Skip to content

Commit

Permalink
test pour VM DEV
Browse files Browse the repository at this point in the history
  • Loading branch information
qlevavasseur committed Jun 20, 2023
1 parent 831623e commit b83af66
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 29 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html),
and is generated by [Changie](https://github.com/miniscruff/changie).

## Unreleased
### Fixed
* Manage new workers connections with common offset
* Waiting for workers to finish their jobs bfore closing streams

## 0.6.6 - 2023-04-20
### Added
Expand Down
64 changes: 46 additions & 18 deletions target_bigquery/storage_write.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
NOTE: This is naive and will vary drastically based on network speed, for example on a GCP VM.
"""
import os
from threading import Lock
from time import sleep
from multiprocessing import Process
from multiprocessing.connection import Connection
from multiprocessing.dummy import Process as _Thread
Expand Down Expand Up @@ -46,13 +48,14 @@
from target_bigquery.core import BaseBigQuerySink, BaseWorker, Denormalized, storage_client_factory
from target_bigquery.proto_gen import proto_schema_factory_v2

logger = logging.getLogger(__name__)

# Stream specific constant
MAX_IN_FLIGHT = 15
"""Maximum number of concurrent requests per worker be processed by grpc before awaiting."""


Dispatcher = Callable[[types.AppendRowsRequest], writer.AppendRowsFuture]
StreamComponents = Tuple[str, writer.AppendRowsStream, Dispatcher]
StreamComponents = Tuple[str, writer.AppendRowsStream, Lock, Dispatcher]


def get_application_stream(client: BigQueryWriteClient, job: "Job") -> StreamComponents:
Expand All @@ -62,7 +65,8 @@ def get_application_stream(client: BigQueryWriteClient, job: "Job") -> StreamCom
write_stream = client.create_write_stream(parent=job.parent, write_stream=write_stream)
job.template.write_stream = write_stream.name
append_rows_stream = writer.AppendRowsStream(client, job.template)
rv = (write_stream.name, append_rows_stream)
lock = Lock()
rv = (write_stream.name, append_rows_stream, lock)
job.stream_notifier.send(rv)
return *rv, retry(
append_rows_stream.send,
Expand All @@ -78,7 +82,8 @@ def get_default_stream(client: BigQueryWriteClient, job: "Job") -> StreamCompone
**BigQueryWriteClient.parse_table_path(job.parent), stream="_default"
)
append_rows_stream = writer.AppendRowsStream(client, job.template)
rv = (job.template.write_stream, append_rows_stream)
lock = Lock()
rv = (job.template.write_stream, append_rows_stream, lock)
job.stream_notifier.send(rv)
return *rv, retry(
append_rows_stream.send,
Expand Down Expand Up @@ -122,14 +127,23 @@ def generate_template(message: Type[Message]):
return template


class Job(NamedTuple):
class Job():
parent: str
template: types.AppendRowsRequest
stream_notifier: Connection
data: types.ProtoRows
offset: int = 0
attempts: int = 1


def __init__(self,
parent,
template,
stream_notifier,
data):
"""Initialize the worker process."""
self.parent = parent
self.template = template
self.stream_notifier = stream_notifier
self.data = data

class StorageWriteBatchWorker(BaseWorker):
"""Worker process for the storage write API."""
Expand All @@ -141,6 +155,8 @@ def __init__(self, *args, **kwargs):
self.awaiting: List[writer.AppendRowsFuture] = []
self.cache: Dict[str, StreamComponents] = {}
self.max_errors_before_recycle = 5
self.offsets: Dict[str, int] = {}
self.logger=logger

def run(self):
"""Run the worker process."""
Expand All @@ -155,19 +171,25 @@ def run(self):
break
if job is None:
break
if job.parent not in self.cache:
if job.parent not in self.cache or self.cache[job.parent][1]._closed:
self.cache[job.parent] = self.get_stream_components(client, job)
write_stream, _, dispatch = cast(StreamComponents, self.cache[job.parent])
self.offsets[job.parent] = 0
write_stream, _, _, dispatch = cast(StreamComponents, self.cache[job.parent])

try:
if self.cache[job.parent][1]._closed:
raise Exception("Connection closed before locking.")
kwargs = {}
if write_stream.endswith("_default"):
kwargs["offset"] = None
kwargs["path"] = write_stream
else:
kwargs["offset"] = job.offset
kwargs["offset"] = self.offsets[job.parent]
self.awaiting.append(dispatch(generate_request(job.data, **kwargs)))

except Exception as exc:
job.attempts += 1
self.logger.info(f"job.attempts : {job.attempts}")
self.max_errors_before_recycle -= 1
if job.attempts > 3:
# TODO: add a metric for this + a DLQ & wrap exception type
Expand All @@ -186,20 +208,23 @@ def run(self):
else:
self.log_notifier.send(
f"[{self.ext_id}] Sent {len(job.data.serialized_rows)} rows to {write_stream}"
f" with offset {job.offset}."
f" with offset {self.offsets[job.parent]}."
)
self.offsets[job.parent] += len(job.data.serialized_rows)
if len(self.awaiting) > MAX_IN_FLIGHT:
self.wait()
finally:
self.queue.task_done()
# Wait for all in-flight requests to complete after poison pill
self.logger.info(f"[{self.ext_id}] : {self.offsets}")
self.wait(drain=True)
self.close_cached_streams()
self.logger.info("Worker process exiting.")
self.log_notifier.send("Worker process exiting.")

def close_cached_streams(self) -> None:
"""Close all cached streams."""
for _, stream, _ in self.cache.values():
for _, stream, _, _ in self.cache.values():
try:
stream.close()
except Exception as exc:
Expand Down Expand Up @@ -240,6 +265,7 @@ class StorageWriteProcessBatchWorker(StorageWriteBatchWorker, Process):

class BigQueryStorageWriteSink(BaseBigQuerySink):
MAX_WORKERS = os.cpu_count() * 2
MAX_JOBS_QUEUED = MAX_WORKERS * 2
WORKER_CAPACITY_FACTOR = 10
WORKER_CREATION_MIN_INTERVAL = 1.0

Expand Down Expand Up @@ -276,7 +302,6 @@ def __init__(
)
self.stream_notification, self.stream_notifier = target.pipe_cls(False)
self.template = generate_template(self.proto_schema)
self.offset = 0

@property
def proto_schema(self) -> Type[Message]:
Expand All @@ -300,17 +325,19 @@ def process_record(self, record: Dict[str, Any], context: Dict[str, Any]) -> Non
)

def process_batch(self, context: Dict[str, Any]) -> None:
while self.global_queue.qsize() >= self.MAX_JOBS_QUEUED:
self.logger.warn(f"Max jobs enqueued reached ({self.MAX_JOBS_QUEUED})")
sleep(1)

self.global_queue.put(
Job(
parent=self.parent,
template=self.template,
data=self.proto_rows,
stream_notifier=self.stream_notifier,
offset=self.offset,
)
)
self.increment_jobs_enqueued()
self.offset += len(self.proto_rows.serialized_rows)

def commit_streams(self) -> None:
while self.stream_notification.poll():
Expand All @@ -320,17 +347,18 @@ def commit_streams(self) -> None:
if not self.open_streams:
return
self.open_streams = [
(name, stream) for name, stream in self.open_streams if not name.endswith("_default")
(name, stream, lock) for name, stream, lock in self.open_streams if not name.endswith("_default")
]
if self.open_streams:
committer = storage_client_factory(self._credentials)
for name, stream in self.open_streams:
for name, stream, _ in self.open_streams:
#self.logger.info(f"YO !!!!!!!! : AFTER LOCK INSTANT KILL {name}")
stream.close()
committer.finalize_write_stream(name=name)
write = committer.batch_commit_write_streams(
types.BatchCommitWriteStreamsRequest(
parent=self.parent,
write_streams=[name for name, _ in self.open_streams],
write_streams=[name for name, _, _ in self.open_streams],
)
)
self.logger.info(f"Batch commit time: {write.commit_time}")
Expand Down
15 changes: 11 additions & 4 deletions target_bigquery/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
# The above copyright notice and this permission notice shall be included in all copies or
# substantial portions of the Software.
"""BigQuery target class."""
import os
import copy
import time
import uuid
Expand Down Expand Up @@ -37,15 +38,18 @@
# Defaults for target worker pool parameters
MAX_WORKERS = 15
"""Maximum number of workers to spawn."""
MAX_JOBS_QUEUED = 30
"""Maximum number of jobs placed in the global queue to avoid memory overload."""
WORKER_CAPACITY_FACTOR = 5
"""Jobs enqueued must exceed the number of active workers times this number."""
WORKER_CREATION_MIN_INTERVAL = 5
"""Minimum time between worker creation attempts."""


class TargetBigQuery(Target):
"""Target for BigQuery."""

_MAX_RECORD_AGE_IN_MINUTES = 5.0

name = "target-bigquery"
config_jsonschema = th.PropertiesList(
th.Property(
Expand Down Expand Up @@ -484,6 +488,7 @@ def get_sink(

def drain_one(self, sink: Sink) -> None: # type: ignore
"""Drain a sink. Includes a hook to manage the worker pool and notifications."""
#self.logger.info(f"Jobs queued : {self.queue.qsize()} | Max nb jobs queued : {os.cpu_count() * 4} | Nb workers : {len(self.workers)} | Max nb workers : {os.cpu_count() * 2}")
self.resize_worker_pool()
while self.job_notification.poll():
ext_id = self.job_notification.recv()
Expand Down Expand Up @@ -519,13 +524,15 @@ def drain_all(self, is_endofpipe: bool = False) -> None: # type: ignore
for worker in self.workers:
if worker.is_alive():
self.queue.put(None)
for worker in self.workers:
while len(self.workers):
worker.join()
worker = self.workers.pop()
for sink in self._sinks_active.values():
sink.clean_up()
else:
for worker in self.workers:
worker.join()
for sink in self._sinks_active.values():
sink.pre_state_hook()
if state:
self._write_state_message(state)
self._write_state_message(state)
self._reset_max_record_age()
57 changes: 50 additions & 7 deletions target_bigquery/tests/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,38 @@
{"type": "RECORD", "stream": "{stream_name}", "record": {"id": 2, "rep_key": 2}, "time_extracted": "2022-11-11T20:40:56.807796+00:00"}
{"type": "RECORD", "stream": "{stream_name}", "record": {"id": 3, "rep_key": 3}, "time_extracted": "2022-11-11T20:40:56.808289+00:00"}
{"type": "RECORD", "stream": "{stream_name}", "record": {"id": 4, "rep_key": 4}, "time_extracted": "2022-11-11T20:40:56.808782+00:00"}
{"type": "RECORD", "stream": "{stream_name}", "record": {"id": 5, "rep_key": 5}, "time_extracted": "2022-11-11T20:40:56.807284+00:00"}
{"type": "RECORD", "stream": "{stream_name}", "record": {"id": 6, "rep_key": 6}, "time_extracted": "2022-11-11T20:40:56.807796+00:00"}
{"type": "RECORD", "stream": "{stream_name}", "record": {"id": 7, "rep_key": 7}, "time_extracted": "2022-11-11T20:40:56.808289+00:00"}
{"type": "RECORD", "stream": "{stream_name}", "record": {"id": 8, "rep_key": 8}, "time_extracted": "2022-11-11T20:40:56.808782+00:00"}
{"type": "RECORD", "stream": "{stream_name}", "record": {"id": 9, "rep_key": 9}, "time_extracted": "2022-11-11T20:40:56.808782+00:00"}
{"type": "RECORD", "stream": "{stream_name}", "record": {"id": 10, "rep_key": 10}, "time_extracted": "2022-11-11T20:40:56.808782+00:00"}
{"type": "RECORD", "stream": "{stream_name}", "record": {"id": 11, "rep_key": 11}, "time_extracted": "2022-11-11T20:40:56.808782+00:00"}
{"type": "RECORD", "stream": "{stream_name}", "record": {"id": 12, "rep_key": 12}, "time_extracted": "2022-11-11T20:40:56.808782+00:00"}
{"type": "RECORD", "stream": "{stream_name}", "record": {"id": 13, "rep_key": 13}, "time_extracted": "2022-11-11T20:40:56.808782+00:00"}
{"type": "RECORD", "stream": "{stream_name}", "record": {"id": 14, "rep_key": 14}, "time_extracted": "2022-11-11T20:40:56.808782+00:00"}
{"type": "RECORD", "stream": "{stream_name}", "record": {"id": 15, "rep_key": 15}, "time_extracted": "2022-11-11T20:40:56.808782+00:00"}
{"type": "RECORD", "stream": "{stream_name}", "record": {"id": 16, "rep_key": 16}, "time_extracted": "2022-11-11T20:40:56.808782+00:00"}
{"type": "RECORD", "stream": "{stream_name}", "record": {"id": 17, "rep_key": 17}, "time_extracted": "2022-11-11T20:40:56.808782+00:00"}
{"type": "RECORD", "stream": "{stream_name}", "record": {"id": 18, "rep_key": 18}, "time_extracted": "2022-11-11T20:40:56.808782+00:00"}
""".strip()


@pytest.mark.parametrize(
"method",
["batch_job", "streaming_insert", "storage_write_api", "gcs_stage"],
ids=["batch_job", "streaming_insert", "storage_write_api", "gcs_stage"],
[
#"batch_job",
#"streaming_insert",
"storage_write_api",
#"gcs_stage"
],
ids=[
#"batch_job",
#"streaming_insert",
"storage_write_api",
#"gcs_stage"
],
)
@pytest.mark.parametrize("batch_mode", [False, True], ids=["no_batch_mode", "batch_mode"])
def test_basic_sync(method, batch_mode):
Expand Down Expand Up @@ -73,7 +98,11 @@ def test_basic_sync(method, batch_mode):
**OPTS,
},
)

target.get_sink_class().MAX_WORKERS = 1
target.get_sink_class().MAX_JOBS_QUEUED = 2
target.get_sink_class().WORKER_CAPACITY_FACTOR = 1
target.get_sink_class().WORKER_CREATION_MIN_INTERVAL = 1

client = bigquery_client_factory(BigQueryCredentials(json=target.config["credentials_json"]))
try:
client.query(f"TRUNCATE TABLE {target.config['dataset']}.{table_name}").result()
Expand Down Expand Up @@ -102,7 +131,7 @@ def test_basic_sync(method, batch_mode):
]

assert len(records) == 5
assert len(records_2) == 5
assert len(records_2) == 19
assert records == [
{
"data": '{"Col_10_string":"9a164a0b-c6c7-46c5-8224-1ea0f7de485d","Col_11_datetime":"2022-11-11T13:40:56.806233+00:00","Col_12_int":1024423793,"Col_13_float":0.71921051655093,"Col_14_string":"c3630f45-660d-42c1-baf9-c58434d68786","Col_15_datetime":"2022-11-11T13:40:56.806238+00:00","Col_16_int":556025207,"Col_17_float":0.3965419777404805,"Col_18_string":"84ebc035-4b0c-40be-a9a3-61cbf7e0f01e","Col_19_datetime":"2022-11-11T13:40:56.806243+00:00","Col_20_int":661621821,"Col_21_float":0.37192361356880477,"Col_22_string":"41613825-9170-4a55-bfd6-c64222292573","Col_23_datetime":"2022-11-11T13:40:56.806248+00:00","Col_24_int":1716807152,"Col_25_float":0.9639895917683756,"Col_26_string":"87025878-196b-4990-9a65-ec06799fe34e","Col_27_datetime":"2022-11-11T13:40:56.806253+00:00","Col_28_int":542678613,"Col_29_float":0.4722333859761568,"Col_2_string":"55fcfce1-f7ce-4f90-8a54-e92de8dae40d","Col_3_datetime":"2022-11-11T13:40:56.806208+00:00","Col_4_int":1475701382,"Col_5_float":0.7920345506520963,"Col_6_string":"744dddf7-d07b-4b09-baf1-8b1e4914632b","Col_7_datetime":"2022-11-11T13:40:56.806225+00:00","Col_8_int":60612870,"Col_9_float":0.35286203712175723,"id":0,"rep_key":0}'
Expand All @@ -124,15 +153,25 @@ def test_basic_sync(method, batch_mode):

@pytest.mark.parametrize(
"method",
["batch_job", "streaming_insert", "gcs_stage", "storage_write_api"],
ids=["batch_job", "streaming_insert", "gcs_stage", "storage_write_api"],
[
#"batch_job",
#"streaming_insert",
#"gcs_stage",
"storage_write_api"
],
ids=[
#"batch_job",
#"streaming_insert",
#"gcs_stage",
"storage_write_api"
],
)
def test_basic_denorm_sync(method):
OPTS = {
"method": method,
"denormalized": True,
"generate_view": False,
"batch_size": 2, # force multiple batches
"batch_size": 3, # force multiple batches
}

table_name = OPTS["method"]
Expand All @@ -154,6 +193,10 @@ def test_basic_denorm_sync(method):
**OPTS,
},
)
target.get_sink_class().MAX_WORKERS = 1
target.get_sink_class().MAX_JOBS_QUEUED = 2
target.get_sink_class().WORKER_CAPACITY_FACTOR = 1
target.get_sink_class().WORKER_CREATION_MIN_INTERVAL = 1

client = bigquery_client_factory(BigQueryCredentials(json=target.config["credentials_json"]))
try:
Expand Down

0 comments on commit b83af66

Please sign in to comment.