diff --git a/.github/workflows/test_oonipipeline.yml b/.github/workflows/test_oonipipeline.yml
index 371d334a..17fe6ea3 100644
--- a/.github/workflows/test_oonipipeline.yml
+++ b/.github/workflows/test_oonipipeline.yml
@@ -44,6 +44,11 @@ jobs:
           sudo apt-get update
           sudo apt-get install -y clickhouse-server clickhouse-client
 
+      - name: Install temporal
+        run: |
+          curl -sSf https://temporal.download/cli.sh | sh
+          echo "$HOME/.temporalio/bin" >> $GITHUB_PATH
+
       - name: Run all tests
         run: hatch run cov
         working-directory: ./oonipipeline/
diff --git a/.gitignore b/.gitignore
index 999050a9..52d8bbe3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,3 +9,4 @@ coverage.xml
 /output
 /attic
 /prof
+/clickhouse-data
diff --git a/oonipipeline/Readme.md b/oonipipeline/Readme.md
new file mode 100644
index 00000000..8775c9b2
--- /dev/null
+++ b/oonipipeline/Readme.md
@@ -0,0 +1,39 @@
+# OONI Pipeline v5
+
+This is the fifth major iteration of the OONI Data Pipeline.
+
+For historical context, these are the major revisions:
+* `v0` - The "pipeline" was basically just writing the RAW json files into a public `www` directory. Used until ~2013.
+* `v1` - OONI Pipeline based on custom CLI scripts using mongodb as a backend. Used until ~2015.
+* `v2` - OONI Pipeline based on [luigi](https://luigi.readthedocs.io/en/stable/). Used until ~2017.
+* `v3` - OONI Pipeline based on [airflow](https://airflow.apache.org/). Used until ~2020.
+* `v4` - OONI Pipeline based on custom scripts and systemd units (aka fastpath). Currently in use in production.
+* `v5` - Next generation OONI Pipeline, the one this readme describes. Expected to be in production by Q4 2024.
+
+## Setup
+
+In order to run the pipeline you should set up the following dependencies:
+* [Temporal for python](https://learn.temporal.io/getting_started/python/dev_environment/)
+* [Clickhouse](https://clickhouse.com/docs/en/install)
+* [hatch](https://hatch.pypa.io/1.9/install/)
+
+
+### Quick start
+
+Start the temporal dev server:
+```
+temporal server start-dev
+```
+
+Start the clickhouse server:
+```
+mkdir -p clickhouse-data
+clickhouse server
+```
+
+You can then start the desired workflow, for example to create signal observations for the US:
+```
+hatch run oonipipeline mkobs --probe-cc US --test-name signal --start-day 2024-01-01 --end-day 2024-01-02
+```
+
+Monitor the workflow execution by accessing: http://localhost:8233/
diff --git a/oonipipeline/debug-temporal.sh b/oonipipeline/debug-temporal.sh
deleted file mode 100644
index bbf43acc..00000000
--- a/oonipipeline/debug-temporal.sh
+++ /dev/null
@@ -1,12 +0,0 @@
-# >>> json.dumps(asdict(ObservationsWorkflowParams(probe_cc=["IT"], start_day="2024-01-01", end_day="2024-01-02", clickhouse="clickhouse://localhost/", data_dir="/Users/art/repos/ooni/data/tests/data/", parallelism=10, fast_fail=False, test_name=["signal"])))
-#
-#
-INPUT_JSON="{\"probe_cc\": [\"IT\"], \"test_name\": [\"signal\"], \"start_day\": \"2024-01-01\", \"end_day\": \"2024-01-20\", \"clickhouse\": \"clickhouse://localhost/\", \"data_dir\": \"$(pwd)/tests/data/datadir/\", \"parallelism\": 10, \"fast_fail\": false, \"log_level\": 20}"
-
-echo $INPUT_JSON
-temporal workflow start \
-  --task-queue oonidatapipeline-task-queue \
-  --type ObservationsWorkflow \
-  --namespace default \
-  --input "$INPUT_JSON"
-
diff --git a/oonipipeline/pyproject.toml b/oonipipeline/pyproject.toml
index c5ee396d..f6ae82a5 100644
--- a/oonipipeline/pyproject.toml
+++ b/oonipipeline/pyproject.toml
@@ -63,6 +63,7 @@ path = ".venv/"
 path = "src/oonipipeline/__about__.py"
 
 [tool.hatch.envs.default.scripts]
+oonipipeline = "python -m oonipipeline.main {args}"
 test = "pytest {args:tests}"
 test-cov = "pytest -s --full-trace --log-level=INFO --log-cli-level=INFO -v --setup-show --cov=./ --cov-report=xml --cov-report=html --cov-report=term {args:tests}"
 cov-report = ["coverage report"]
diff --git a/oonipipeline/src/oonipipeline/cli/__init__.py b/oonipipeline/src/oonipipeline/cli/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/oonipipeline/src/oonipipeline/cli/commands.py b/oonipipeline/src/oonipipeline/cli/commands.py
new file mode 100644
index 00000000..072948b0
--- /dev/null
+++ b/oonipipeline/src/oonipipeline/cli/commands.py
@@ -0,0 +1,427 @@
+import logging
+import multiprocessing
+from pathlib import Path
+import sys
+from typing import List, Optional
+from datetime import date, timedelta, datetime
+from typing import List, Optional
+
+import click
+from click_loglevel import LogLevel
+
+from ..__about__ import VERSION
+from ..db.connections import ClickhouseConnection
+from ..db.create_tables import create_queries, list_all_table_diffs
+from ..netinfo import NetinfoDB
+
+log = logging.getLogger("oonidata")
+
+import asyncio
+
+import concurrent.futures
+
+from temporalio.client import Client as TemporalClient
+from temporalio.worker import Worker, SharedStateManager
+
+from temporalio.types import MethodAsyncSingleParam, SelfType, ParamType, ReturnType
+
+from ..workflows.observations import (
+    ObservationsWorkflow,
+    ObservationsWorkflowParams,
+    make_observation_in_day,
+)
+
+from ..workflows.ground_truths import (
+    GroundTruthsWorkflow,
+    GroundTruthsWorkflowParams,
+    make_ground_truths_in_day,
+)
+
+from ..workflows.analysis import (
+    AnalysisWorkflow,
+    AnalysisWorkflowParams,
+    make_analysis_in_a_day,
+)
+
+
+TASK_QUEUE_NAME = "oonipipeline-task-queue"
+
+
+async def run_workflow(
+    workflow: MethodAsyncSingleParam[SelfType, ParamType, ReturnType],
+    arg: ParamType,
+    parallelism: int = 5,
+    temporal_address: str = "localhost:7233",
+):
+    client = await TemporalClient.connect(temporal_address)
+    async with Worker(
+        client,
+        task_queue=TASK_QUEUE_NAME,
+        workflows=[
+            ObservationsWorkflow,
+            GroundTruthsWorkflow,
+            AnalysisWorkflow,
+        ],
+        activities=[
+            make_observation_in_day,
+            make_ground_truths_in_day,
+            make_analysis_in_a_day,
+        ],
+        activity_executor=concurrent.futures.ProcessPoolExecutor(parallelism + 2),
+        max_concurrent_activities=parallelism,
+        shared_state_manager=SharedStateManager.create_from_multiprocessing(
+            multiprocessing.Manager()
+        ),
+    ):
+        await client.execute_workflow(
+            workflow,
+            arg,
+            id=TASK_QUEUE_NAME,
+            task_queue=TASK_QUEUE_NAME,
+        )
+
+
+def _parse_csv(ctx, param, s: Optional[str]) -> List[str]:
+    if s:
+        return s.split(",")
+    return []
+
+
+probe_cc_option = click.option(
+    "--probe-cc",
+    callback=_parse_csv,
+    help="two letter country code, can be comma separated for a list (eg. IT,US). If omitted it will process all countries.",
+)
+test_name_option = click.option(
+    "--test-name",
+    type=str,
+    callback=_parse_csv,
+    help="test_name to process, can be comma separated for a list (eg. web_connectivity,whatsapp). If omitted it will process all test names.",
+)
+start_day_option = click.option(
+    "--start-day",
+    default=(date.today() - timedelta(days=14)).strftime("%Y-%m-%d"),
+    help="""the timestamp of the day at which we should start processing data (inclusive).
+
+    Note: this is the upload date, which doesn't necessarily match the measurement date.
+    """,
+)
+end_day_option = click.option(
+    "--end-day",
+    default=(date.today() + timedelta(days=1)).strftime("%Y-%m-%d"),
+    help="""the timestamp of the day at which we should stop processing data (inclusive).
+
+    Note: this is the upload date, which doesn't necessarily match the measurement date.
+    """,
+)
+
+clickhouse_option = click.option(
+    "--clickhouse", type=str, required=True, default="clickhouse://localhost"
+)
+
+datadir_option = click.option(
+    "--data-dir",
+    type=str,
+    required=True,
+    default="tests/data/datadir",
+    help="data directory to store fingerprint and geoip databases",
+)
+
+
+@click.group()
+@click.option("--error-log-file", type=Path)
+@click.option(
+    "-l",
+    "--log-level",
+    type=LogLevel(),
+    default="INFO",
+    help="Set logging level",
+    show_default=True,
+)
+@click.version_option(VERSION)
+def cli(error_log_file: Path, log_level: int):
+    log.addHandler(logging.StreamHandler(sys.stderr))
+    log.setLevel(log_level)
+    if error_log_file:
+        logging.basicConfig(
+            filename=error_log_file, encoding="utf-8", level=logging.ERROR
+        )
+
+
+@cli.command()
+@probe_cc_option
+@test_name_option
+@start_day_option
+@end_day_option
+@clickhouse_option
+@datadir_option
+@click.option(
+    "--parallelism",
+    type=int,
+    default=multiprocessing.cpu_count() + 2,
+    help="number of processes to use. Only works when writing to a database",
+)
+@click.option(
+    "--fast-fail",
+    is_flag=True,
+    help="should we fail immediately when we encounter an error?",
+)
+@click.option(
+    "--create-tables",
+    is_flag=True,
+    help="should we attempt to create the required clickhouse tables",
+)
+@click.option(
+    "--drop-tables",
+    is_flag=True,
+    help="should we drop tables before creating them",
+)
+def mkobs(
+    probe_cc: List[str],
+    test_name: List[str],
+    start_day: str,
+    end_day: str,
+    clickhouse: str,
+    data_dir: str,
+    parallelism: int,
+    fast_fail: bool,
+    create_tables: bool,
+    drop_tables: bool,
+):
+    """
+    Make observations for OONI measurements and write them into clickhouse or a CSV file
+    """
+    if create_tables:
+        if drop_tables:
+            click.confirm(
+                "Are you sure you want to drop the tables before creation?", abort=True
+            )
+
+        with ClickhouseConnection(clickhouse) as db:
+            for query, table_name in create_queries:
+                if drop_tables:
+                    db.execute(f"DROP TABLE IF EXISTS {table_name};")
+                db.execute(query)
+
+    click.echo("Starting to process observations")
+    NetinfoDB(datadir=Path(data_dir), download=True)
+    click.echo("downloaded netinfodb")
+
+    arg = ObservationsWorkflowParams(
+        probe_cc=probe_cc,
+        test_name=test_name,
+        start_day=start_day,
+        end_day=end_day,
+        clickhouse=clickhouse,
+        data_dir=str(data_dir),
+        fast_fail=fast_fail,
+    )
+    click.echo(f"starting to make observations with arg={arg}")
+    asyncio.run(
+        run_workflow(
+            ObservationsWorkflow.run,
+            arg,
+            parallelism=parallelism,
+        )
+    )
+
+
+@cli.command()
+@probe_cc_option
+@test_name_option
+@start_day_option
+@end_day_option
+@clickhouse_option
+@datadir_option
+@click.option(
+    "--parallelism",
+    type=int,
+    default=multiprocessing.cpu_count() + 2,
+    help="number of processes to use. Only works when writing to a database",
+)
+@click.option(
+    "--fast-fail",
+    is_flag=True,
+    help="should we fail immediately when we encounter an error?",
+)
+@click.option(
+    "--create-tables",
+    is_flag=True,
+    help="should we attempt to create the required clickhouse tables",
+)
+@click.option(
+    "--rebuild-ground-truths",
+    is_flag=True,
+    help="should we force the rebuilding of ground truths",
+)
+def mkanalysis(
+    probe_cc: List[str],
+    test_name: List[str],
+    start_day: str,
+    end_day: str,
+    clickhouse: str,
+    data_dir: Path,
+    parallelism: int,
+    fast_fail: bool,
+    create_tables: bool,
+    rebuild_ground_truths: bool,
+):
+    if create_tables:
+        with ClickhouseConnection(clickhouse) as db:
+            for query, table_name in create_queries:
+                click.echo(f"Running create query for {table_name}")
+                db.execute(query)
+
+    click.echo("Starting to perform analysis")
+    NetinfoDB(datadir=Path(data_dir), download=True)
+    click.echo("downloaded netinfodb")
+
+    arg = AnalysisWorkflowParams(
+        probe_cc=probe_cc,
+        test_name=test_name,
+        start_day=start_day,
+        end_day=end_day,
+        clickhouse=clickhouse,
+        data_dir=str(data_dir),
+        parallelism=parallelism,
+        fast_fail=fast_fail,
+        rebuild_ground_truths=rebuild_ground_truths,
+    )
+    click.echo(f"starting to make analysis with arg={arg}")
+    asyncio.run(
+        run_workflow(
+            AnalysisWorkflow.run,
+            arg,
+            parallelism=parallelism,
+        )
+    )
+
+
+@cli.command()
+@start_day_option
+@end_day_option
+@clickhouse_option
+@datadir_option
+def mkgt(
+    start_day: str,
+    end_day: str,
+    clickhouse: str,
+    data_dir: Path,
+):
+    click.echo("Starting to build ground truths")
+    NetinfoDB(datadir=Path(data_dir), download=True)
+    click.echo("downloaded netinfodb")
+
+    arg = GroundTruthsWorkflowParams(
+        start_day=start_day,
+        end_day=end_day,
+        clickhouse=clickhouse,
+        data_dir=str(data_dir),
+    )
+    click.echo(f"starting to make ground truths with arg={arg}")
+    asyncio.run(
+        run_workflow(
+            GroundTruthsWorkflow.run,
+            arg,
+        )
+    )
+
+
+@cli.command()
+@probe_cc_option
+@test_name_option
+@start_day_option
+@end_day_option
+@clickhouse_option
+@datadir_option
+@click.option("--archives-dir", type=Path, required=True)
+@click.option(
+    "--parallelism",
+    type=int,
+    default=multiprocessing.cpu_count() + 2,
+    help="number of processes to use. Only works when writing to a database",
+)
+def mkbodies(
+    probe_cc: List[str],
+    test_name: List[str],
+    start_day: date,
+    end_day: date,
+    clickhouse: str,
+    data_dir: Path,
+    archives_dir: Path,
+    parallelism: int,
+):
+    """
+    Make response body archives
+    """
+    # start_response_archiver(
+    #     probe_cc=probe_cc,
+    #     test_name=test_name,
+    #     start_day=start_day,
+    #     end_day=end_day,
+    #     data_dir=data_dir,
+    #     archives_dir=archives_dir,
+    #     clickhouse=clickhouse,
+    #     parallelism=parallelism,
+    # )
+    raise NotImplementedError("TODO(art)")
+
+
+@cli.command()
+@datadir_option
+@click.option("--archives-dir", type=Path, required=True)
+@click.option(
+    "--parallelism",
+    type=int,
+    default=multiprocessing.cpu_count() + 2,
+    help="number of processes to use",
+)
+def fphunt(data_dir: Path, archives_dir: Path, parallelism: int):
+    click.echo("🏹 starting the hunt for blockpage fingerprints!")
+    # start_fingerprint_hunter(
+    #     archives_dir=archives_dir,
+    #     data_dir=data_dir,
+    #     parallelism=parallelism,
+    # )
+    raise NotImplementedError("TODO(art)")
+
+
+@cli.command()
+@click.option("--clickhouse", type=str)
+@click.option(
+    "--create-tables",
+    is_flag=True,
+    help="should we attempt to create the required clickhouse tables",
+)
+@click.option(
+    "--drop-tables",
+    is_flag=True,
+    help="should we drop tables before creating them",
+)
+def checkdb(
+    clickhouse: Optional[str],
+    create_tables: bool,
+    drop_tables: bool,
+):
+    """
+    Check if the database tables require migrations. If the create-tables flag
+    is not specified, it will not perform any operations.
+    """
+
+    if create_tables:
+        if not clickhouse:
+            click.echo("--clickhouse needs to be specified when creating tables")
+            return 1
+        if drop_tables:
+            click.confirm(
+                "Are you sure you want to drop the tables before creation?", abort=True
+            )
+
+        with ClickhouseConnection(clickhouse) as db:
+            for query, table_name in create_queries:
+                if drop_tables:
+                    db.execute(f"DROP TABLE IF EXISTS {table_name};")
+                db.execute(query)
+
+    with ClickhouseConnection(clickhouse) as db:
+        list_all_table_diffs(db)
diff --git a/oonipipeline/src/oonipipeline/main.py b/oonipipeline/src/oonipipeline/main.py
index ed0e4cfc..5d678bab 100644
--- a/oonipipeline/src/oonipipeline/main.py
+++ b/oonipipeline/src/oonipipeline/main.py
@@ -1,31 +1,4 @@
-import asyncio
-
-import concurrent.futures
-
-from temporalio.client import Client
-from temporalio.worker import Worker
-
-from .workflows.observations import ObservationsWorkflow
-from .workflows.observations import make_observation_in_day
-
-
-async def async_main():
-    client = await Client.connect("localhost:7233")
-    with concurrent.futures.ThreadPoolExecutor(max_workers=100) as activity_executor:
-        worker = Worker(
-            client,
-            task_queue="oonipipeline-task-queue",
-            workflows=[ObservationsWorkflow],
-            activities=[make_observation_in_day],
-            activity_executor=activity_executor,
-        )
-
-        await worker.run()
-
-
-def main():
-    asyncio.run(async_main())
-
+from .cli.commands import cli
 
 if __name__ == "__main__":
-    main()
+    cli()
diff --git a/oonipipeline/src/oonipipeline/workflows/analysis.py b/oonipipeline/src/oonipipeline/workflows/analysis.py
index a971e8e6..76f854e2 100644
--- a/oonipipeline/src/oonipipeline/workflows/analysis.py
+++ b/oonipipeline/src/oonipipeline/workflows/analysis.py
@@ -1,5 +1,6 @@
 import asyncio
 import dataclasses
+from dataclasses import dataclass
 import logging
 import pathlib
 
@@ -8,63 +9,59 @@
 from temporalio import workflow, activity
 
-import orjson
-import statsd
+with workflow.unsafe.imports_passed_through():
+    import clickhouse_driver
 
-from oonidata.dataclient import date_interval
-from oonidata.datautils import PerfTimer
-from oonidata.models.analysis import WebAnalysis
-from oonidata.models.experiment_result import MeasurementExperimentResult
+    import orjson
+    import statsd
 
-from ..analysis.control import BodyDB, WebGroundTruthDB
-from ..analysis.datasources import iter_web_observations
-from ..analysis.web_analysis import make_web_analysis
-from ..analysis.website_experiment_results import make_website_experiment_results
-from ..db.connections import ClickhouseConnection
-from ..fingerprintdb import FingerprintDB
+    from oonidata.dataclient import date_interval
+    from oonidata.datautils import PerfTimer
+    from oonidata.models.analysis import WebAnalysis
+    from oonidata.models.experiment_result import MeasurementExperimentResult
 
-from ..netinfo import NetinfoDB
-from .ground_truths import maybe_build_web_ground_truth
+    from ..analysis.control import BodyDB, WebGroundTruthDB
+    from ..analysis.datasources import iter_web_observations
+    from ..analysis.web_analysis import make_web_analysis
+    from ..analysis.website_experiment_results import make_website_experiment_results
+    from ..db.connections import ClickhouseConnection
+    from ..fingerprintdb import FingerprintDB
 
-from .common import (
-    get_obs_count_by_cc,
-    get_prev_range,
-    make_db_rows,
-    maybe_delete_prev_range,
-    optimize_all_tables,
-)
+    from .ground_truths import make_ground_truths_in_day, MakeGroundTruthsParams
 
-from .observations import ObservationsWorkflowParams, MakeObservationsParams
+    from .common import (
+        get_obs_count_by_cc,
+        get_prev_range,
+        make_db_rows,
+        maybe_delete_prev_range,
+        optimize_all_tables,
+    )
 
 log = logging.getLogger("oonidata.processing")
 
 
-def make_ctrl(
-    clickhouse: str,
-    data_dir: pathlib.Path,
-    rebuild_ground_truths: bool,
-    day: date,
-):
-    netinfodb = NetinfoDB(datadir=data_dir, download=False)
-    db_lookup = ClickhouseConnection(clickhouse)
-    maybe_build_web_ground_truth(
-        db=db_lookup,
-        netinfodb=netinfodb,
-        day=day,
-        data_dir=data_dir,
-        rebuild_ground_truths=rebuild_ground_truths,
-    )
-    db_lookup.close()
+@dataclass
+class AnalysisWorkflowParams:
+    probe_cc: List[str]
+    test_name: List[str]
+    start_day: str
+    end_day: str
+    clickhouse: str
+    data_dir: str
+    parallelism: int
+    fast_fail: bool
+    rebuild_ground_truths: bool
+    log_level: int = logging.INFO
 
 
-@dataclasses.dataclass
+@dataclass
 class MakeAnalysisParams:
     probe_cc: List[str]
     test_name: List[str]
     clickhouse: str
     data_dir: str
     fast_fail: bool
-    day: date
+    day: str
 
 
 @activity.defn
@@ -74,7 +71,7 @@ def make_analysis_in_a_day(params: MakeAnalysisParams) -> dict:
     optimize_all_tables(params.clickhouse)
     data_dir = pathlib.Path(params.data_dir)
     clickhouse = params.clickhouse
-    day = params.day
+    day = datetime.strptime(params.day, "%Y-%m-%d").date()
     probe_cc = params.probe_cc
     test_name = params.test_name
 
@@ -256,12 +253,17 @@
     return cc_batches
 
 
-@workflow.defn
+# TODO(art)
+# We disable the sandbox for this whole workflow, since otherwise pytz, which is
+# a requirement for clickhouse, fails to work.
+# This is most likely due to it doing an open() in order to read the timezone
+# definitions.
+# I spent some time debugging this, but eventually gave up. We should eventually
+# look into making this run OK inside of the sandbox.
+@workflow.defn(sandboxed=False)
 class AnalysisWorkflow:
     @workflow.run
-    async def run(self, params: ObservationsWorkflowParams) -> dict:
-        # TODO(art): should this be a parameter or is it better to remove it?
-        rebuild_ground_truths = True
+    async def run(self, params: AnalysisWorkflowParams) -> dict:
         t_total = PerfTimer()
         t = PerfTimer()
 
@@ -269,14 +271,22 @@ async def run(self, params: ObservationsWorkflowParams) -> dict:
         end_day = datetime.strptime(params.end_day, "%Y-%m-%d").date()
 
         log.info("building ground truth databases")
-        for day in date_interval(start_day, end_day):
-            make_ctrl(
-                clickhouse=params.clickhouse,
-                data_dir=pathlib.Path(params.data_dir),
-                rebuild_ground_truths=rebuild_ground_truths,
-                day=day,
-            )
-        log.info(f"built ground truth db in {t.pretty}")
+
+        async with asyncio.TaskGroup() as tg:
+            for day in date_interval(start_day, end_day):
+                tg.create_task(
+                    workflow.execute_activity(
+                        make_ground_truths_in_day,
+                        MakeGroundTruthsParams(
+                            day=day.strftime("%Y-%m-%d"),
+                            clickhouse=params.clickhouse,
+                            data_dir=params.data_dir,
+                            rebuild_ground_truths=params.rebuild_ground_truths,
+                        ),
+                        start_to_close_timeout=timedelta(minutes=2),
+                    )
+                )
+        log.info(f"built ground truth db in {t.pretty}")
 
         with ClickhouseConnection(params.clickhouse) as db:
             cnt_by_cc = get_obs_count_by_cc(
@@ -300,12 +310,12 @@ async def run(self, params: ObservationsWorkflowParams) -> dict:
                     workflow.execute_activity(
                         make_analysis_in_a_day,
                         MakeAnalysisParams(
-                            probe_cc=params.probe_cc,
+                            probe_cc=probe_cc,
                             test_name=params.test_name,
                             clickhouse=params.clickhouse,
                             data_dir=params.data_dir,
                             fast_fail=params.fast_fail,
-                            day=day,
+                            day=day.strftime("%Y-%m-%d"),
                         ),
                         start_to_close_timeout=timedelta(minutes=30),
                     )
diff --git a/oonipipeline/src/oonipipeline/workflows/ground_truths.py b/oonipipeline/src/oonipipeline/workflows/ground_truths.py
index 3d679afa..eda81727 100644
--- a/oonipipeline/src/oonipipeline/workflows/ground_truths.py
+++ b/oonipipeline/src/oonipipeline/workflows/ground_truths.py
@@ -1,34 +1,51 @@
-import queue
+import asyncio
+from dataclasses import dataclass
 import pathlib
 import logging
-import multiprocessing as mp
-from multiprocessing.synchronize import Event as EventClass
+from datetime import datetime, timedelta
 
-from threading import Thread
-from datetime import date
-from oonidata.dataclient import date_interval
+from temporalio import workflow, activity
 
-from oonidata.datautils import PerfTimer
+with workflow.unsafe.imports_passed_through():
+    import clickhouse_driver
 
-from ..analysis.control import WebGroundTruthDB, iter_web_ground_truths
-from ..netinfo import NetinfoDB
-
-from ..db.connections import (
-    ClickhouseConnection,
-)
-from .common import run_progress_thread
+    from oonidata.dataclient import date_interval
+    from oonidata.datautils import PerfTimer
+    from ..analysis.control import WebGroundTruthDB, iter_web_ground_truths
+    from ..netinfo import NetinfoDB
+    from ..db.connections import (
+        ClickhouseConnection,
+    )
 
 log = logging.getLogger("oonidata.processing")
 
 
-def maybe_build_web_ground_truth(
-    db: ClickhouseConnection,
-    netinfodb: NetinfoDB,
-    day: date,
-    data_dir: pathlib.Path,
-    rebuild_ground_truths: bool = False,
-):
+@dataclass
+class GroundTruthsWorkflowParams:
+    start_day: str
+    end_day: str
+    clickhouse: str
+    data_dir: str
+
+
+@dataclass
+class MakeGroundTruthsParams:
+    clickhouse: str
+    data_dir: str
+    day: str
+    rebuild_ground_truths: bool
+
+
+@activity.defn
+def make_ground_truths_in_day(params: MakeGroundTruthsParams):
+    clickhouse = params.clickhouse
+    day = datetime.strptime(params.day, "%Y-%m-%d").date()
+    data_dir = pathlib.Path(params.data_dir)
+    rebuild_ground_truths = params.rebuild_ground_truths
+
+    db = ClickhouseConnection(clickhouse)
+    netinfodb = NetinfoDB(datadir=data_dir, download=False)
     ground_truth_dir = data_dir / "ground_truths"
     ground_truth_dir.mkdir(exist_ok=True)
     dst_path = ground_truth_dir / f"web-{day.strftime('%Y-%m-%d')}.sqlite3"
@@ -45,105 +62,29 @@
     log.info(f"built ground truth DB {day} in {t.pretty}")
 
 
-class GroundTrutherWorker(mp.Process):
-    def __init__(
+@workflow.defn
+class GroundTruthsWorkflow:
+    @workflow.run
+    async def run(
         self,
-        day_queue: mp.JoinableQueue,
-        progress_queue: mp.Queue,
-        clickhouse: str,
-        shutdown_event: EventClass,
-        data_dir: pathlib.Path,
-        log_level: int = logging.INFO,
+        params: GroundTruthsWorkflowParams,
     ):
-        super().__init__(daemon=True)
-        self.day_queue = day_queue
-        self.progress_queue = progress_queue
-        self.data_dir = data_dir
-        self.clickhouse = clickhouse
-
-        self.shutdown_event = shutdown_event
-        log.setLevel(log_level)
-
-    def run(self):
-        db = ClickhouseConnection(self.clickhouse)
-        netinfodb = NetinfoDB(datadir=self.data_dir, download=False)
-
-        while not self.shutdown_event.is_set():
-            try:
-                day = self.day_queue.get(block=True, timeout=0.1)
-            except queue.Empty:
-                continue
-
-            try:
-                maybe_build_web_ground_truth(
-                    db=db,
-                    netinfodb=netinfodb,
-                    day=day,
-                    data_dir=self.data_dir,
-                    rebuild_ground_truths=True,
+        task_list = []
+        start_day = datetime.strptime(params.start_day, "%Y-%m-%d").date()
+        end_day = datetime.strptime(params.end_day, "%Y-%m-%d").date()
+
+        async with asyncio.TaskGroup() as tg:
+            for day in date_interval(start_day, end_day):
+                task = tg.create_task(
+                    workflow.execute_activity(
+                        make_ground_truths_in_day,
+                        MakeGroundTruthsParams(
+                            clickhouse=params.clickhouse,
+                            data_dir=params.data_dir,
+                            day=day.strftime("%Y-%m-%d"),
+                            rebuild_ground_truths=True,
+                        ),
+                        start_to_close_timeout=timedelta(minutes=30),
+                    )
                 )
-            except:
-                log.error(f"failed to build ground truth for {day}", exc_info=True)
-
-            finally:
-                self.day_queue.task_done()
-                self.progress_queue.put(1)
-
-
-def start_ground_truth_builder(
-    start_day: date,
-    end_day: date,
-    clickhouse: str,
-    data_dir: pathlib.Path,
-    parallelism: int,
-    log_level: int = logging.INFO,
-):
-    shutdown_event = mp.Event()
-    worker_shutdown_event = mp.Event()
-
-    progress_queue = mp.JoinableQueue()
-
-    progress_thread = Thread(
-        target=run_progress_thread,
-        args=(progress_queue, shutdown_event, "generating ground truths"),
-    )
-    progress_thread.start()
-
-    workers = []
-    day_queue = mp.JoinableQueue()
-    for _ in range(parallelism):
-        worker = GroundTrutherWorker(
-            day_queue=day_queue,
-            progress_queue=progress_queue,
-            shutdown_event=worker_shutdown_event,
-            clickhouse=clickhouse,
-            data_dir=data_dir,
-            log_level=log_level,
-        )
-        worker.start()
-        log.info(f"started worker {worker.pid}")
-        workers.append(worker)
-
-    for day in date_interval(start_day, end_day):
-        day_queue.put(day)
-
-    log.info("waiting for the day queue to finish")
-    day_queue.join()
-
-    log.info(f"sending shutdown signal to workers")
-    worker_shutdown_event.set()
-
-    log.info("waiting for progress queue to finish")
-    progress_queue.join()
-
-    log.info(f"waiting for ground truth workers to finish running")
-    for idx, p in enumerate(workers):
-        log.info(f"waiting worker {idx} to join")
-        p.join()
-        log.info(f"waiting worker {idx} to close")
-        p.close()
-
-    log.info("sending shutdown event progress thread")
-    shutdown_event.set()
-    log.info("waiting on progress queue")
-    progress_thread.join()
+                task_list.append(task)
diff --git a/oonipipeline/src/oonipipeline/workflows/observations.py b/oonipipeline/src/oonipipeline/workflows/observations.py
index 1b6529be..1232165a 100644
--- a/oonipipeline/src/oonipipeline/workflows/observations.py
+++ b/oonipipeline/src/oonipipeline/workflows/observations.py
@@ -129,7 +129,6 @@ class ObservationsWorkflowParams:
     end_day: str
     clickhouse: str
     data_dir: str
-    parallelism: int
     fast_fail: bool
     log_level: int = logging.INFO
 
@@ -178,7 +177,6 @@ def make_observation_in_day(params: MakeObservationsParams) -> dict:
     total_msmt_count = 0
     for batch in file_entry_batches:
-        # TODO(art): add extra parallelism here
         msmt_cnt = make_observations_for_file_entry_batch(
             batch,
             params.clickhouse,
diff --git a/oonipipeline/tests/conftest.py b/oonipipeline/tests/conftest.py
index 030b8bb3..571d05cd 100644
--- a/oonipipeline/tests/conftest.py
+++ b/oonipipeline/tests/conftest.py
@@ -1,12 +1,15 @@
 import os
+import subprocess
 from pathlib import Path
 from datetime import date
 
-from click.testing import CliRunner
+import time
+
 
 import pytest
 import orjson
 
+from click.testing import CliRunner
 from clickhouse_driver import Client as ClickhouseClient
 
 from oonidata.dataclient import sync_measurements
@@ -43,6 +46,15 @@ def clickhouse_server(docker_ip, docker_services):
     yield url
 
 
+@pytest.fixture(scope="session")
+def temporal_dev_server(request):
+    proc = subprocess.Popen(["temporal", "server", "start-dev"])
+    time.sleep(2)
+    assert not proc.poll()
+    yield proc
+    request.addfinalizer(proc.kill)
+
+
 @pytest.fixture
 def datadir():
     return DATA_DIR
diff --git a/oonipipeline/tests/test_worfklows.py b/oonipipeline/tests/test_workflows.py
similarity index 85%
rename from oonipipeline/tests/test_worfklows.py
rename to oonipipeline/tests/test_workflows.py
index 5a11a9d6..4305c993 100644
--- a/oonipipeline/tests/test_worfklows.py
+++ b/oonipipeline/tests/test_workflows.py
@@ -1,8 +1,29 @@
+import asyncio
+from multiprocessing import Process
 from pathlib import Path
+import time
 
+from oonipipeline.cli.commands import cli
 
-def _test_full_workflow(
-    db, cli_runner, fingerprintdb, netinfodb, datadir, tmp_path: Path
+
+def wait_for_mutations(db, table_name):
+    while True:
+        res = db.execute(
+            f"SELECT * FROM system.mutations WHERE is_done=0 AND table='{table_name}';"
+        )
+        if len(res) == 0:  # type: ignore
+            break
+        time.sleep(1)
+
+
+def test_full_workflow(
+    db,
+    cli_runner,
+    fingerprintdb,
+    netinfodb,
+    datadir,
+    tmp_path: Path,
+    temporal_dev_server,
 ):
     result = cli_runner.invoke(
         cli,
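
Not part of the patch — a minimal sketch of how the new click entry point can be driven from a test, mirroring the renamed `test_full_workflow` above. It assumes a reachable `temporal server start-dev` and a local clickhouse instance (as provided by the `temporal_dev_server` and `clickhouse_server` fixtures); the flag values below are illustrative only.

```python
# Hypothetical usage sketch: exercise the `mkobs` command through click's CliRunner.
# Flag values (country, dates, clickhouse URL, data dir) are assumptions for illustration.
from click.testing import CliRunner

from oonipipeline.cli.commands import cli

runner = CliRunner()
result = runner.invoke(
    cli,
    [
        "mkobs",
        "--probe-cc", "US",
        "--test-name", "signal",
        "--start-day", "2024-01-01",
        "--end-day", "2024-01-02",
        "--clickhouse", "clickhouse://localhost/",
        "--data-dir", "tests/data/datadir",
        "--create-tables",
    ],
)
# The command blocks until the ObservationsWorkflow run completes on the worker.
assert result.exit_code == 0
```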