Skip to content

Commit

Permalink
Use a multiprocessing.Pool instead of concurrent.futures.ProcessPoolExecutor.
Browse files Browse the repository at this point in the history

We already need a multiprocessing.Lock, and we're not doing anything that needs concurrent.futures, so this seems more direct. There also appears to be very slightly less overhead (~0-5%).
  • Loading branch information
SyntaxColoring committed Feb 10, 2024
1 parent 14113f7 commit bea752a
Showing 1 changed file with 19 additions and 24 deletions.
43 changes: 19 additions & 24 deletions robot-server/robot_server/persistence/_migrations/up_to_3.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@
since the updated `analysis.completed_analysis` (see above) replaces it.
"""

import concurrent.futures
import multiprocessing
from contextlib import ExitStack, contextmanager
from pathlib import Path
from typing import ContextManager, Dict, Generator, Iterable, List, Optional
from typing import ContextManager, Dict, Generator, Iterable, List, Optional, Tuple

from opentrons.protocol_engine import Command, StateSummary
import pydantic
Expand Down Expand Up @@ -212,37 +211,33 @@ def _migrate_db_commands(
manager = multiprocessing.Manager()
lock = manager.Lock()

with concurrent.futures.ProcessPoolExecutor(
with multiprocessing.Pool(
# One worker per core of the OT-2's Raspberry Pi.
# We're compute-bound, so more workers would just thrash.
#
# This should be safe from a memory footprint point of view.
# Napkin math for the memory footprint:
# Suppose a very large protocol has ~10MB of commands (see e.g. RQA-443).
# The maximum number of runs at the time of writing is 20,
# so that's at most ~200MB total.
max_workers=4
# so that's at most ~200MB total, which should be fine.
processes=4
) as pool:
futures = [
pool.submit(
_migrate_db_commands_for_run,
source_db_file,
dest_db_file,
run_id,
lock,
)
for run_id in run_ids
]
for future in futures:
# TODO: See if there's a better way to await all the results.
future.result()
pool.map(
_migrate_db_commands_for_run,
((source_db_file, dest_db_file, run_id, lock) for run_id in run_ids),
)


def _migrate_db_commands_for_run(
source_db_file: Path,
dest_db_file: Path,
run_id: str,
# This is a multiprocessing.Lock, which can't be a type annotation for some reason.
lock: ContextManager[object],
args: Tuple[
Path,
Path,
str,
# This is a multiprocessing.Lock, which can't be a type annotation for some reason.
ContextManager[object],
]
) -> None:
source_db_file, dest_db_file, run_id, lock = args

with _schema_2_sql_engine(source_db_file) as source_engine, _schema_3_sql_engine(
dest_db_file
) as dest_engine:
Expand Down

0 comments on commit bea752a

Please sign in to comment.