Skip to content

Commit

Permalink
Removed old operation processors
Browse files Browse the repository at this point in the history
  • Loading branch information
Raalsky committed Jun 19, 2024
1 parent b8d4580 commit 1fb0685
Show file tree
Hide file tree
Showing 20 changed files with 61 additions and 957 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def __init__(
data_path: Optional[Path] = None,
serializer: Callable[[Operation], Dict[str, Any]] = lambda op: op.to_dict(),
should_print_logs: bool = True,
sleep_time: float = 5.0,
) -> None:
self._should_print_logs = should_print_logs
self._accepts_operations: bool = True
Expand All @@ -79,7 +80,7 @@ def __init__(
)

self._consumer = ConsumerThread(
sleep_time=5,
sleep_time=sleep_time,
processing_resources=self._processing_resources,
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2022, Neptune Labs Sp. z o.o.
# Copyright (c) 2024, Neptune Labs Sp. z o.o.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -21,67 +21,41 @@
from queue import Queue
from typing import TYPE_CHECKING

from neptune.core.operation_processors.async_operation_processor import AsyncOperationProcessor
from neptune.core.operation_processors.offline_operation_processor import OfflineOperationProcessor
from neptune.core.operation_processors.operation_processor import OperationProcessor
from neptune.core.operation_processors.read_only_operation_processor import ReadOnlyOperationProcessor
from neptune.core.operation_processors.sync_operation_processor import SyncOperationProcessor
from neptune.core.typing.container_type import ContainerType
from neptune.core.typing.id_formats import CustomId
from neptune.envs import NEPTUNE_ASYNC_BATCH_SIZE
from neptune.internal.backends.neptune_backend import NeptuneBackend
from neptune.internal.container_type import ContainerType
from neptune.internal.id_formats import UniqueId
from neptune.objects.mode import Mode

from .async_operation_processor import AsyncOperationProcessor
from .sync_operation_processor import SyncOperationProcessor

if TYPE_CHECKING:
from neptune.internal.signals_processing.signals import Signal


# WARNING: Be careful when changing this function. It is used in the experimental package
def build_async_operation_processor(
container_id: UniqueId,
container_type: ContainerType,
backend: NeptuneBackend,
lock: threading.RLock,
sleep_time: float,
queue: "Queue[Signal]",
) -> OperationProcessor:
return AsyncOperationProcessor(
container_id=container_id,
container_type=container_type,
backend=backend,
lock=lock,
sleep_time=sleep_time,
batch_size=int(os.environ.get(NEPTUNE_ASYNC_BATCH_SIZE) or "1000"),
queue=queue,
)


def get_operation_processor(
mode: Mode,
container_id: UniqueId,
custom_id: CustomId,
container_type: ContainerType,
backend: NeptuneBackend,
lock: threading.RLock,
flush_period: float,
queue: "Queue[Signal]",
) -> OperationProcessor:
if mode == Mode.ASYNC:
return build_async_operation_processor(
container_id=container_id,
return AsyncOperationProcessor(
custom_id=custom_id,
container_type=container_type,
backend=backend,
lock=lock,
sleep_time=flush_period,
queue=queue,
batch_size=int(os.environ.get(NEPTUNE_ASYNC_BATCH_SIZE) or "1000"),
signal_queue=queue,
)
elif mode == Mode.SYNC:
return SyncOperationProcessor(container_id, container_type, backend)
elif mode == Mode.DEBUG:
return SyncOperationProcessor(container_id, container_type, backend)
elif mode in {Mode.SYNC, Mode.DEBUG}:
return SyncOperationProcessor(custom_id=custom_id, container_type=container_type)
elif mode == Mode.OFFLINE:
# the object was returned by mocked backend and has some random ID.
return OfflineOperationProcessor(container_id, container_type, lock)
return OfflineOperationProcessor(custom_id=custom_id, container_type=container_type, lock=lock)
elif mode == Mode.READ_ONLY:
return ReadOnlyOperationProcessor()
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,32 +32,32 @@
from neptune.core.components.operation_storage import OperationStorage
from neptune.core.components.queue.disk_queue import DiskQueue
from neptune.core.operation_processors.operation_processor import OperationProcessor
from neptune.core.operations.operation import Operation
from neptune.internal.operation_processors.utils import (
from neptune.core.operation_processors.utils import (
common_metadata,
get_container_full_path,
)
from neptune.core.operations.operation import Operation
from neptune.internal.utils.disk_utilization import ensure_disk_not_overutilize

if TYPE_CHECKING:
from neptune.core.components.abstract import Resource
from neptune.internal.container_type import ContainerType
from neptune.internal.id_formats import UniqueId
from neptune.core.typing.container_type import ContainerType
from neptune.core.typing.id_formats import CustomId


serializer: Callable[[Operation], Dict[str, Any]] = lambda op: op.to_dict()


class OfflineOperationProcessor(WithResources, OperationProcessor):
def __init__(self, container_id: "UniqueId", container_type: "ContainerType", lock: "threading.RLock"):
self._data_path = get_container_full_path(OFFLINE_DIRECTORY, container_id, container_type)
def __init__(self, custom_id: "CustomId", container_type: "ContainerType", lock: "threading.RLock"):
self._data_path = get_container_full_path(OFFLINE_DIRECTORY, custom_id, container_type)

# Initialize directory
self._data_path.mkdir(parents=True, exist_ok=True)

self._metadata_file = MetadataFile(
data_path=self._data_path,
metadata=common_metadata(mode="offline", container_id=container_id, container_type=container_type),
metadata=common_metadata(mode="offline", custom_id=custom_id, container_type=container_type),
)
self._operation_storage = OperationStorage(data_path=self._data_path)
self._queue = DiskQueue(data_path=self._data_path, to_dict=serializer, from_dict=Operation.from_dict, lock=lock)
Expand Down
Loading

0 comments on commit 1fb0685

Please sign in to comment.