Skip to content

Commit

Permalink
Make EnsembleEvaluator async
Browse files Browse the repository at this point in the history
A new asyncio.loop is set in base_run_mode to run the entire EEAsync.
  • Loading branch information
xjules committed Jan 23, 2024
1 parent 797b49a commit 495aabe
Show file tree
Hide file tree
Showing 9 changed files with 485 additions and 11 deletions.
2 changes: 2 additions & 0 deletions src/ert/ensemble_evaluator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
)
from .config import EvaluatorServerConfig
from .evaluator import EnsembleEvaluator
from .evaluator_async import EnsembleEvaluatorAsync
from .evaluator_tracker import EvaluatorTracker
from .event import EndEvent, FullSnapshotEvent, SnapshotUpdateEvent
from .monitor import Monitor
Expand All @@ -17,6 +18,7 @@
"Ensemble",
"EnsembleBuilder",
"EnsembleEvaluator",
"EnsembleEvaluatorAsync",
"EvaluatorServerConfig",
"EvaluatorTracker",
"ForwardModel",
Expand Down
2 changes: 2 additions & 0 deletions src/ert/ensemble_evaluator/_builder/_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,9 @@ async def send_cloudevent(
retries: int = 10,
) -> None:
async with Client(url, token, cert, max_retries=retries) as client:
print(f"DEBUG before send {event=}")
await client._send(to_json(event, data_marshaller=evaluator_marshaller))
print(f"DEBUG after send {event=}")

def get_successful_realizations(self) -> List[int]:
return self._snapshot.get_successful_realizations()
Expand Down
27 changes: 27 additions & 0 deletions src/ert/ensemble_evaluator/_builder/_legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,32 @@ def evaluate(self, config: EvaluatorServerConfig) -> None:

threading.Thread(target=self._evaluate, name="LegacyEnsemble").start()

async def evaluate_async(self, config: EvaluatorServerConfig) -> asyncio.Task[Any]:
if not config:
raise ValueError("no config for evaluator")
self._config = config
await wait_for_evaluator(
base_url=self._config.url,
token=self._config.token,
cert=self._config.cert,
)
ce_unary_send_method_name = "_ce_unary_send"
setattr(
self.__class__,
ce_unary_send_method_name,
partialmethod(
self.__class__.send_cloudevent,
self._config.dispatch_uri,
token=self._config.token,
cert=self._config.cert,
),
)
return asyncio.create_task(
self._evaluate_inner(
cloudevent_unary_send=getattr(self, ce_unary_send_method_name)
)
)

def _evaluate(self) -> None:
"""
This method is executed on a separate thread, i.e. in parallel
Expand Down Expand Up @@ -244,6 +270,7 @@ async def _evaluate_inner( # pylint: disable=too-many-branches
await send_timeout_future

# Dispatch final result from evaluator - FAILED, CANCEL or STOPPED
print(f"DEBUG final event!@!!!!!! {result=}")
await cloudevent_unary_send(event_creator(result, None))

@property
Expand Down
Loading

0 comments on commit 495aabe

Please sign in to comment.