Skip to content

Commit

Permalink
Add lazyobject and lazy_get api.
Browse files Browse the repository at this point in the history
Signed-off-by: Ye Cao <[email protected]>
  • Loading branch information
dashanji committed Dec 27, 2024
1 parent fa7f690 commit b24c1eb
Show file tree
Hide file tree
Showing 2 changed files with 175 additions and 0 deletions.
127 changes: 127 additions & 0 deletions python/vineyard/core/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import warnings
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
from concurrent.futures import Future
import threading
from typing import Any
from typing import Dict
from typing import List
Expand All @@ -43,9 +45,94 @@
from vineyard._C import _connect
from vineyard.core.builder import BuilderContext
from vineyard.core.builder import put
from vineyard.core.resolver import get_current_resolvers
from vineyard.core.resolver import ResolverContext
from vineyard.core.resolver import get

class LazyObject:
    """Handle for a vineyard object that is fetched on a background thread.

    Constructing a ``LazyObject`` immediately submits ``client.get(...)`` to
    the client's ``_lazy_get_thread_pool``. Callers poll :meth:`is_ready` and
    then call :meth:`get`; calling :meth:`get` before the fetch has finished
    attempts to cancel the fetch and raises.
    """

    def __init__(
        self,
        client,
        object_id: Optional["ObjectID"] = None,
        name: Optional[str] = None,
        resolver: Optional["ResolverContext"] = None,
        fetch: bool = False,
        **kwargs,
    ):
        """
        Initialize the LazyObject and start fetching in the background.

        Args:
            client (Client): The vineyard client; must expose ``get`` and a
                ``_lazy_get_thread_pool`` executor.
            object_id (Optional[ObjectID]): The object id to fetch.
            name (Optional[str]): The name of the object to fetch.
            resolver (Optional[ResolverContext]): The resolver context that is
                forwarded to ``client.get``.
            fetch (bool): Forwarded unchanged as the ``fetch`` argument of
                ``client.get``.
            **kwargs: Extra keyword arguments forwarded to ``client.get``.
        """
        self.client = client
        self.object_id = object_id
        self.name = name
        self.resolver = resolver
        # `_fetch` reads this attribute; without storing it every background
        # fetch would fail with AttributeError.
        self.fetch = fetch
        self.kwargs = kwargs
        self._result: Optional[Any] = None
        self._exception: Optional[BaseException] = None
        self._ready_event = threading.Event()

        # No task has been submitted yet; `_start_fetch` fills this in.
        self.future: Optional[Future] = None

        # Start the asynchronous fetch
        self._start_fetch()

    def _start_fetch(self):
        """Submit the fetch to the client's thread pool (at most once)."""
        if self.future is None:
            self.future = self.client._lazy_get_thread_pool.submit(self._fetch)
            self.future.add_done_callback(self._callback)

    def _fetch(self) -> Any:
        """Run ``client.get`` on the worker thread and record its result."""
        obj = self.client.get(
            object_id=self.object_id,
            name=self.name,
            resolver=self.resolver,
            fetch=self.fetch,
            **self.kwargs,
        )
        self._result = obj
        # Also return the value so the future itself carries the result.
        return obj

    def _callback(self, fut: Future):
        """Record the outcome and flag readiness once the future completes."""
        try:
            fut.result()  # This will raise if the fetch failed
        except Exception as e:
            self._exception = e
        finally:
            self._ready_event.set()

    def get(self) -> Any:
        """
        Retrieve the object without blocking.

        Returns:
            Any: The fetched object.

        Raises:
            RuntimeError: If the fetch operation was never started.
            Exception: If the fetch has not finished yet (a cancellation of
                the pending fetch is attempted first), or the error raised by
                the fetch itself.
        """
        if self.future is None:
            raise RuntimeError("Fetch operation was not started.")

        if not self.future.done():
            # Note: a task that is already running cannot be cancelled by
            # `Future.cancel`; only a still-queued one can.
            self.cancel()
            raise Exception("Data not ready, fetch operation has been canceled.")

        # Read the outcome from the future itself. Done-callbacks run only
        # after the future is marked done, so `_result` / `_exception` may not
        # be populated yet at this point.
        return self.future.result()

    def is_ready(self) -> bool:
        """
        Check whether the fetch has completed (successfully or not).

        Returns:
            bool: True once the completion callback has run, False otherwise.
        """
        return self._ready_event.is_set()

    def cancel(self):
        """Try to cancel the fetch operation if it has not finished."""
        if self.future and not self.future.done():
            self.future.cancel()


def _apply_docstring(func):
def _apply(fn):
Expand Down Expand Up @@ -168,6 +255,7 @@ def __init__(
session: int = None,
username: str = None,
password: str = None,
max_workers: int = 8,
config: str = None,
):
"""Connects to the vineyard IPC socket and RPC socket.
Expand Down Expand Up @@ -211,6 +299,8 @@ def __init__(
is enabled.
password: Optional, the required password of vineyardd when authentication
is enabled.
max_workers: Optional, the maximum number of threads that can be used to
asynchronously get/put objects from/to vineyard. Default is 8.
config: Optional, can either be a path to a YAML configuration file or
a path to a directory containing the default config file
`vineyard-config.yaml`. Also, the environment variable
Expand Down Expand Up @@ -292,6 +382,12 @@ def __init__(

self._spread = False
self._compression = True

# Initialize thread pool for lazy_get
self._lazy_get_thread_pool = ThreadPoolExecutor(max_workers=max_workers)
self._lazy_get_futures: Dict[Optional[ObjectID, str], Any] = {}
self._lazy_get_lock = threading.Lock()

if self._ipc_client is None and self._rpc_client is None:
raise ConnectionError(
"Failed to connect to vineyard via both IPC and RPC connection. "
Expand Down Expand Up @@ -874,5 +970,36 @@ def with_spread(self, enabled: bool = True):
yield
self.spread = tmp_spread

def lazy_get(
    self,
    object_id: Optional[ObjectID] = None,
    name: Optional[str] = None,
    resolver: Optional[ResolverContext] = None,
    fetch: bool = False,
    **kwargs,
) -> LazyObject:
    """Asynchronously fetch an object by object id and/or name.

    Requests for an id or name that already has a fetch in flight return the
    existing ``LazyObject`` instead of starting a second fetch. The entry is
    forgotten once its fetch finishes, fails, or is cancelled.

    Args:
        object_id (Optional[ObjectID]): The object id to fetch.
        name (Optional[str]): The name of the object to fetch.
        resolver (Optional[ResolverContext]): The resolver context to use;
            defaults to ``get_current_resolvers()`` evaluated in the calling
            thread.
        fetch (bool): Forwarded unchanged to ``LazyObject``.
        **kwargs: Extra keyword arguments forwarded to ``LazyObject``.

    Returns:
        LazyObject: An object that represents the future result of the fetch
        operation.
    """
    # Track the request under every identifier the caller supplied so that a
    # later lookup by either the id or the name finds it.
    keys = [key for key in (object_id, name) if key is not None]

    with self._lazy_get_lock:
        for key in keys:
            pending = self._lazy_get_futures.get(key)
            if pending is not None:
                return pending

        resolver = resolver or get_current_resolvers()
        lazy_object = LazyObject(self, object_id, name, resolver, fetch, **kwargs)
        for key in keys:
            self._lazy_get_futures[key] = lazy_object

    def _cleanup(fut):
        with self._lazy_get_lock:
            for key in keys:
                # Only evict our own entry; a newer request may have reused
                # the same key in the meantime.
                if self._lazy_get_futures.get(key) is lazy_object:
                    del self._lazy_get_futures[key]

    # Register the cleanup only after releasing the lock: if the future is
    # already done, `add_done_callback` invokes the callback immediately in
    # this thread, and `_cleanup` needs to take the (non-reentrant) lock.
    if lazy_object.future:
        lazy_object.future.add_done_callback(_cleanup)

    return lazy_object

__all__ = ['Client']
48 changes: 48 additions & 0 deletions python/vineyard/core/tests/test_lazy_get.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import vineyard
import torch
from vineyard.contrib.ml.torch import torch_context

# Manual smoke check for `Client.lazy_get`: connects to the vineyard IPC
# socket path below and requests an object by name.
client = vineyard.connect('/var/run/vineyard.sock1')

# Optional seeding step (disabled): store a random tensor under a name so
# that there is something to fetch.
#   x = torch.rand(100, 100, 100)
#   client.delete(name="test_lazy_tensor")
#   with torch_context(client):
#       client.put(x, name="test_lazy_tensor", persist=True)

# Extra keyword arguments, passed through `lazy_get` as **kwargs.
test_kwargs = {'test': 'test'}
with torch_context(client):
    lazy_tensor = client.lazy_get(name="test_lazy_tensor111", **test_kwargs)
"""
try:
for i in range(8):
name = "test_lazy_tensor" + str(i)
print("Creating tensor", name)
lazy_tensor = client.lazy_get("test_lazy_tensor")
except Exception as e:
print(e)
"""

# Check readiness once, right after issuing the request; any error raised
# while checking is printed rather than propagated.
try:
    print("Tensor is ready" if lazy_tensor.is_ready() else "Tensor is not ready")
    # Disabled follow-up for the ready case:
    #   try:
    #       tensor = lazy_tensor.get()
    #       print(tensor)
    #   except RuntimeError as e:
    #       pass
    # Disabled follow-up for the not-ready case:
    #   import time
    #   time.sleep(5)
    #   print("Waiting for 5 seconds")
    #   if lazy_tensor.is_ready():
    #       print("Tensor is ready")
    #       tensor = lazy_tensor.get()
    #       print(tensor)
except Exception as e:
    print(e)

print("Done")

0 comments on commit b24c1eb

Please sign in to comment.