diff --git a/pdr_backend/ppss/multisim_ss.py b/pdr_backend/ppss/multisim_ss.py index ed4f2be20..00df5a91f 100644 --- a/pdr_backend/ppss/multisim_ss.py +++ b/pdr_backend/ppss/multisim_ss.py @@ -36,6 +36,9 @@ def approach(self) -> str: def sweep_params(self) -> list: return self.d["sweep_params"] + @property + def max_workers(self) -> int: + return self.d.get("max_workers", 1) # -------------------------------- # derivative properties @property diff --git a/pdr_backend/sim/multisim_engine.py b/pdr_backend/sim/multisim_engine.py index 265c0f679..7a3bcd42c 100644 --- a/pdr_backend/sim/multisim_engine.py +++ b/pdr_backend/sim/multisim_engine.py @@ -1,8 +1,9 @@ -import asyncio import copy +import concurrent.futures import csv import logging import os +import threading import uuid from typing import List, Union @@ -19,7 +20,7 @@ from pdr_backend.util.time_types import UnixTimeMs logger = logging.getLogger("multisim_engine") -lock = asyncio.Lock() +lock = threading.Lock() class MultisimEngine: @@ -49,19 +50,20 @@ def run(self): ss = self.ss logger.info("Multisim engine: start. # runs = %s", ss.n_runs) self.initialize_csv_with_header() - asyncio.run(self.run_async(ss.n_runs)) + self.run_multithreaded(ss.n_runs) @enforce_types - async def run_async(self, n_runs): - tasks = [] - - for run_i in range(n_runs): - tasks.append(self.run_one(run_i)) - - await asyncio.gather(*tasks) + def run_multithreaded(self, n_runs): + with concurrent.futures.ProcessPoolExecutor(max_workers=self.ss.max_workers) as executor: + futures = [executor.submit(self.run_one, run_i) for run_i in range(n_runs)] + for future in concurrent.futures.as_completed(futures): + try: + future.result() + except Exception as e: + logger.error(f"Run {future} generated an exception: {e}") @enforce_types - async def run_one(self, run_i: int): + def run_one(self, run_i: int): point_i = self.ss.point_i(run_i) logger.info("Multisim run_i=%s: start. Vals=%s", run_i, point_i) ppss = self.ppss_from_point(point_i) @@ -71,7 +73,7 @@ async def run_one(self, run_i: int): sim_engine.run() run_metrics = list(sim_engine.st.recent_metrics().values()) - async with lock: + with lock: self.update_csv(run_i, run_metrics, point_i) logger.info("Multisim run_i=%s: done", run_i) diff --git a/ppss.yaml b/ppss.yaml index 7dc0f51d9..189e380ea 100644 --- a/ppss.yaml +++ b/ppss.yaml @@ -63,6 +63,7 @@ sim_ss: # sim only multisim_ss: approach: SimpleSweep # SimpleSweep | FastSweep (future) | .. + max_workers: 1 sweep_params: - trader_ss.buy_amt: 1000 USD, 2000 USD - predictoor_ss.aimodel_ss.max_n_train: 500, 1000, 1500