diff --git a/pdr_backend/sim/multisim_engine.py b/pdr_backend/sim/multisim_engine.py index 71d7f132e..c3839756e 100644 --- a/pdr_backend/sim/multisim_engine.py +++ b/pdr_backend/sim/multisim_engine.py @@ -1,3 +1,4 @@ +import asyncio import copy import csv import logging @@ -18,6 +19,7 @@ from pdr_backend.util.time_types import UnixTimeMs logger = logging.getLogger("multisim_engine") +lock = asyncio.Lock() class MultisimEngine: @@ -47,15 +49,29 @@ def run(self): ss = self.ss logger.info("Multisim engine: start. # runs = %s", ss.n_runs) self.initialize_csv_with_header() - for run_i in range(ss.n_runs): - point_i = self.ss.point_i(run_i) - logger.info("Multisim run_i=%s: start. Vals=%s", run_i, point_i) - ppss = self.ppss_from_point(point_i) - feedset = ppss.predictoor_ss.predict_train_feedsets[0] - multi_id = str(uuid.uuid4()) - sim_engine = SimEngine(ppss, feedset, multi_id) - sim_engine.run() - run_metrics = sim_engine.st.recent_metrics() + asyncio.run(self.run_async(ss.n_runs)) + + @enforce_types + async def run_async(self, n_runs): + tasks = [] + + for run_i in range(n_runs): + tasks.append(self.run_one(run_i)) + + await asyncio.gather(*tasks) + + @enforce_types + async def run_one(self, run_i: int): + point_i = self.ss.point_i(run_i) + logger.info("Multisim run_i=%s: start. Vals=%s", run_i, point_i) + ppss = self.ppss_from_point(point_i) + feedset = ppss.predictoor_ss.predict_train_feedsets[0] + multi_id = str(uuid.uuid4()) + sim_engine = SimEngine(ppss, feedset, multi_id) + sim_engine.run() + run_metrics = sim_engine.st.recent_metrics() + + async with lock: self.update_csv(run_i, run_metrics, point_i) logger.info("Multisim run_i=%s: done", run_i)