Skip to content

Commit

Permalink
fix sending from forked container (#1692)
Browse files Browse the repository at this point in the history
Co-authored-by: Szymon Sadkowski <[email protected]>
  • Loading branch information
szysad and Szymon Sadkowski authored Mar 25, 2024
1 parent e025183 commit 855df0b
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 24 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
### Fixes
- Restored support for SSL verification exception ([#1661](https://github.com/neptune-ai/neptune-client/pull/1661))
- Allow user to control logging level ([#1679](https://github.com/neptune-ai/neptune-client/pull/1679))
- Fix sending data with forked container ([#1692](https://github.com/neptune-ai/neptune-client/pull/1692))

### Changes
- Improve dependency installation checking ([#1670](https://github.com/neptune-ai/neptune-client/pull/1670))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,26 @@

def trigger_evaluation(method: Callable[..., RT]) -> Callable[..., RT]:
def _wrapper(self: LazyOperationProcessorWrapper, *args: Any, **kwargs: Any) -> RT:
if self._operation_processor is None:
self._operation_processor = self._operation_processor_getter()
if self._post_trigger_side_effect is not None:
self._post_trigger_side_effect()
self.evaluate()
return method(self, *args, **kwargs)

return _wrapper


def noop_if_not_triggered(method: Callable[..., RT]) -> Callable[..., RT]:
def _wrapper(self: LazyOperationProcessorWrapper, *args: Any, **kwargs: Any) -> RT:
if self._operation_processor is not None:
def noop_if_not_evaluated(method: Callable[..., RT]) -> Callable[..., Optional[RT]]:
def _wrapper(self: LazyOperationProcessorWrapper, *args: Any, **kwargs: Any) -> Optional[RT]:
if self.is_evaluated:
return method(self, *args, **kwargs)
return None

return _wrapper


def noop_if_evaluated(method: Callable[..., RT]) -> Callable[..., Optional[RT]]:
def _wrapper(self: LazyOperationProcessorWrapper, *args: Any, **kwargs: Any) -> Optional[RT]:
if not self.is_evaluated:
return method(self, *args, **kwargs)
return None

return _wrapper

Expand All @@ -63,8 +70,13 @@ def __init__(
self._post_trigger_side_effect = post_trigger_side_effect
self._operation_processor: OperationProcessor = None # type: ignore

@noop_if_evaluated
def evaluate(self) -> None:
self._operation_processor = self._operation_processor_getter()
self._operation_processor.start()

@property
def evaluated(self) -> bool:
def is_evaluated(self) -> bool:
return self._operation_processor is not None

@trigger_evaluation
Expand All @@ -88,26 +100,26 @@ def data_path(self) -> Path:
def start(self) -> None:
self._operation_processor.start()

@noop_if_not_triggered
@noop_if_not_evaluated
def pause(self) -> None:
self._operation_processor.pause()

@noop_if_not_triggered
@noop_if_not_evaluated
def resume(self) -> None:
self._operation_processor.resume()

@noop_if_not_triggered
@noop_if_not_evaluated
def flush(self) -> None:
self._operation_processor.flush()

@noop_if_not_triggered
@noop_if_not_evaluated
def wait(self) -> None:
self._operation_processor.wait()

@noop_if_not_triggered
@noop_if_not_evaluated
def stop(self, seconds: Optional[float] = None) -> None:
self._operation_processor.stop(seconds=seconds)

@noop_if_not_triggered
@noop_if_not_evaluated
def close(self) -> None:
self._operation_processor.close()
2 changes: 0 additions & 2 deletions src/neptune/metadata_containers/metadata_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,9 +256,7 @@ def _handle_fork_in_child(self):
flush_period=self._flush_period,
queue=self._signals_queue,
),
post_trigger_side_effect=self._op_processor.start,
)

# TODO: Every implementation of background job should handle fork by itself.
jobs = []
if self._mode == Mode.ASYNC:
Expand Down
41 changes: 41 additions & 0 deletions tests/e2e/standard/test_multiprocessing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import os
import signal
import unittest
from multiprocessing import Barrier

import pytest

from neptune.common.utils import IS_WINDOWS
from tests.e2e.base import AVAILABLE_CONTAINERS
from tests.e2e.utils import (
Environment,
initialize_container,
)


@unittest.skipIf(IS_WINDOWS, "Windows does not support fork")
@pytest.mark.parametrize("container_type", AVAILABLE_CONTAINERS)
def test_fork_child_parent_info_exchange(container_type: str, environment: Environment):
barrier = Barrier(2)
with initialize_container(container_type=container_type, project=environment.project) as container:
child_pid = os.fork()
if child_pid == 0:
# child process exec
container["child_key"] = "child_value"
container.wait()
barrier.wait() # after barrier both processes have sent data

container.sync()
assert container["parent_key"].fetch() == "parent_value"

os.kill(os.getpid(), signal.SIGTERM) # kill child process, as it has cloned testing runtime
else:
# parent process exec
container["parent_key"] = "parent_value"
container.wait()
barrier.wait() # after barrier both processes have sent data

container.sync()
assert container["child_key"].fetch() == "child_value"

os.waitpid(child_pid, 0)
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ def test_lazy_initialization():

# then
operation_processor_getter.assert_not_called()
assert not lazy_wrapper.evaluated
assert not lazy_wrapper.is_evaluated

# when
lazy_wrapper.enqueue_operation(mock.Mock(), wait=False)
lazy_wrapper.enqueue_operation(mock.Mock(), wait=False)

# then
operation_processor_getter.assert_called_once()
assert lazy_wrapper.evaluated
assert lazy_wrapper.is_evaluated


def test_call_propagation_to_wrapped():
Expand All @@ -47,24 +47,24 @@ def test_call_propagation_to_wrapped():
# then
operation_storage.assert_called_once()

for method in ["start", "pause", "resume", "flush", "wait", "stop", "close"]:
for method in ["pause", "resume", "flush", "wait", "stop", "close"]:
# when
getattr(lazy_wrapper, method)()

# then
getattr(operation_processor, method).assert_called_once()


def test_post_init_trigger_side_effect_called():
def test_op_processor_started_after_evaluation():

# given
operation_processor = mock.Mock(spec=OperationProcessor)
operation_processor_getter = mock.Mock(return_value=operation_processor)
post_trigger_side_effect = mock.Mock()
lazy_wrapper = LazyOperationProcessorWrapper(operation_processor_getter, post_trigger_side_effect)
lazy_wrapper = LazyOperationProcessorWrapper(operation_processor_getter)

# when
lazy_wrapper.enqueue_operation(mock.Mock(), wait=False)
lazy_wrapper.evaluate()
lazy_wrapper.evaluate()

# then
post_trigger_side_effect.assert_called_once()
operation_processor.start.assert_called_once()

0 comments on commit 855df0b

Please sign in to comment.