diff --git a/pdr_backend/sim/multisim_engine.py b/pdr_backend/sim/multisim_engine.py index b25105cbe..cc9b8835e 100644 --- a/pdr_backend/sim/multisim_engine.py +++ b/pdr_backend/sim/multisim_engine.py @@ -1,3 +1,4 @@ +import asyncio import copy import csv import logging @@ -18,6 +19,7 @@ from pdr_backend.util.time_types import UnixTimeMs logger = logging.getLogger("multisim_engine") +lock = asyncio.Lock() class MultisimEngine: @@ -47,18 +49,30 @@ def run(self): ss = self.ss logger.info("Multisim engine: start. # runs = %s", ss.n_runs) self.initialize_csv_with_header() - for run_i in range(ss.n_runs): - point_i = self.ss.point_i(run_i) - logger.info("Multisim run_i=%s: start. Vals=%s", run_i, point_i) - ppss = self.ppss_from_point(point_i) - feed = ppss.predictoor_ss.feeds[0] - sim_engine = SimEngine(ppss, feed=feed, multi_id=str(uuid.uuid4())) - sim_engine.run() - run_metrics = sim_engine.st.recent_metrics() + asyncio.run(self.run_async(ss.n_runs)) + + @enforce_types + async def run_async(self, n_runs): + tasks = [] + + for run_i in range(n_runs): + tasks.append(self.run_one(run_i)) + + await asyncio.gather(*tasks) + + @enforce_types + async def run_one(self, run_i: int): + point_i = self.ss.point_i(run_i) + logger.info("Multisim run_i=%s: start. Vals=%s", run_i, point_i) + ppss = self.ppss_from_point(point_i) + feed = ppss.predictoor_ss.feeds[0] + sim_engine = SimEngine(ppss, feed=feed, multi_id=str(uuid.uuid4())) + sim_engine.run() + run_metrics = sim_engine.st.recent_metrics() + async with lock: self.update_csv(run_i, run_metrics, point_i) - logger.info("Multisim run_i=%s: done", run_i) - logger.info("Multisim engine: done. Output file: %s", self.csv_file) + logger.info("Multisim run_i=%s: done", run_i) def ppss_from_point(self, point_i: Point) -> PPSS: """