
Allow data stores to prepare accessing inert resources #1093

Closed
forman opened this issue Nov 29, 2024 · 4 comments · Fixed by #1097
Comments


forman commented Nov 29, 2024

Is your feature request related to a problem? Please describe.

Some data APIs provide only limited data access performance. Hence, the xcube data store's store.open_data(data_id, ...) approach doesn't apply well, because (1) the method is blocking, meaning it is not async, and (2) it may take a long time before it returns. The reasons for data APIs to be slow are manifold: access is intended to be by file download, access is bandwidth-limited, or the API does not allow for subsetting.

Describe the solution you'd like

Give users the opportunity to prepare selected datasets for faster access. To this end, the data store interface should be extended by a new method preload_data(). How the preparation is done depends heavily on the specific data API; however, the data store framework could help implement it, e.g., by providing the means to download and cache data in generic filesystems (using fsspec).
The input to the method should be the data identifiers to be prepared, plus data-store-specific preload parameters.
The return value of preload_data() should be a handle that represents the preloading process.
A second method, get_preload_params_schema(), should describe the allowed parameters passed to preload_data(). New method signatures:

def preload_data(self, *data_ids: str, **preload_params: Any) -> PreloadHandle:
    """Preaload data for faster access."""
    return NullPreloadHandle()  # No effect

def get_preload_params_schema(self) -> JsonObjectSchema:
    """Get the JSON schema that describes the parameters passed to `preload_data()`."""
    return JsonObjectSchema(additional_properties=False)
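
For illustration, a concrete store could describe its own preload parameters and realize the caching with fsspec. The following is only a sketch: the cache_dir parameter, the helper name _download_to_cache, and the simplecache usage are assumptions, not part of this proposal.

import fsspec
from xcube.util.jsonschema import JsonObjectSchema, JsonStringSchema

def get_preload_params_schema(self) -> JsonObjectSchema:
    # Hypothetical store-specific preload parameter: a local cache directory.
    return JsonObjectSchema(
        properties=dict(cache_dir=JsonStringSchema(default="~/.cache/my-store")),
        additional_properties=False,
    )

def _download_to_cache(self, url: str, cache_dir: str) -> str:
    # fsspec's "simplecache::" chaining downloads the remote file once and
    # returns a local path; subsequent opens are served from the cache.
    return fsspec.open_local(f"simplecache::{url}",
                             simplecache={"cache_storage": cache_dir})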

The PreloadHandle interface:

@property
def is_cancelled(self) -> bool:
    """Whether the preload process has been cancelled."""

def cancel(self):
    """Cancel the preload process."""

@property
def is_closed(self) -> bool:
    """Whether the preload process has been closed."""

def close(self):
    """Close the preload process and release any bound resources."""

@property
def progress(self) -> float:
    """Get the progress as a number in the range 0 to 1."""

def subscribe(self, listener: Callable[[str, float, str], None]) -> Callable[[], None]:
    """Subscribe to state changes of the preload process.

    Args:
        listener: a listener that receives status, progress, and a message
    Returns:
        A function to unsubscribe from the preload process.
    """

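For example, a client could observe the process like this (a sketch; handle stands for a PreloadHandle returned by preload_data()):

def on_change(status: str, progress: float, message: str) -> None:
    # Receives each state change: status, progress in [0, 1], and a message.
    print(f"[{status}] {progress:.0%} {message}")

unsubscribe = handle.subscribe(on_change)
# ... later, when no longer interested:
unsubscribe()
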
The PreloadHandle interface should also have an intuitive Jupyter Notebook representation to let users observe a long-running preload process.
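
This could be realized via IPython's standard _repr_html_ hook, e.g. (a minimal sketch; the actual HTML layout is an open design choice):

def _repr_html_(self) -> str:
    # Jupyter calls this hook to render the handle as HTML.
    pct = round(self.progress * 100)
    return f"<progress value='{pct}' max='100'></progress> {pct}%"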


forman commented Nov 29, 2024

FYI @konstntokas @b-yogesh

@b-yogesh

Thanks @forman for defining this in detail.
I have a few questions:

  • PreloadHandle.progress - you mention that this should return a float from 0 to 1, but how do we quantify the waiting time + download + extraction for a tuple of data_ids? And what does this number mean to the user?
  • close vs cancel - what is the difference here? When should close be used, and what is the expected behaviour? If the preload is in progress and the user calls close, does it mean we have to internally first cancel the preload and then release the resources?
  • If the user calls cancel, do we cancel the preload activities for all the data_ids? What if some of the requests are already completed? Or, do we add parameters to the cancel method for cancelling requests of an individual data_id?


forman commented Dec 9, 2024

@b-yogesh

  • PreloadHandle.progress - you mention that this should return a float from 0 to 1, but how do we quantify the waiting time + download + extraction for a tuple of data_ids?

This is up to the specific implementation. A naive implementation would compute it as current_data_index / (len(data_ids) - 1). But usually you can achieve a higher progress resolution by recording progress within the preloading of individual data items.
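
For example, a handle could average per-item fractions (a sketch; item_progress is a hypothetical list holding one fraction in [0, 1] per data ID):

def progress(item_progress: list[float]) -> float:
    # Each data item contributes equally; finer-grained per-item reporting
    # directly raises the resolution of the overall number.
    return sum(item_progress) / len(item_progress)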

And what does this number mean to the user?

The preload process is 100 * progress percent completed.

  • close vs cancel - what is the difference here? When should close be used, and what is the expected behaviour? If the preload is in progress and the user calls close, does it mean we have to internally first cancel the preload and then release the resources?

I believe the original idea I had was to allow implementing PreloadHandle as a context manager:

# Blocks until preloading is done, then executes the code block, then closes.
with store.preload_data("my_id_1", "my_id_2"):
    ds1 = store.open_data("my_id_1")
    ds2 = store.open_data("my_id_2")
    # do something with ds1, ds2

vs.

# Non-blocking call.
plh = store.preload_data("my_id_1", "my_id_2")

# Use the handle to observe the preload process, e.g., show a progress bar in JupyterLab.
# If it takes too long for your taste, call plh.cancel().

ds1 = store.open_data("my_id_1")
ds2 = store.open_data("my_id_2")
# do something with ds1, ds2

# Call close when you are done to release any resource, e.g., temp files.
plh.close()

  • If the user calls cancel, do we cancel the preload activities for all the data_ids? What if some of the requests are already completed? Or, do we add parameters to the cancel method for cancelling requests of an individual data_id?

Cancellation refers to the handle, hence to the process covering all the passed data_ids.


konstntokas commented Dec 13, 2024

@forman, here is our suggestion based on the current implementations in xcube-clms and xcube-zenodo:

Software design of preload method

Preload Handle

This can go into xcube.

from concurrent.futures import ThreadPoolExecutor
from typing import Type


class PreloadHandle:

    def __init__(
        self,
        cache_store: MutableDataStore,
        preload_task: Type[PreloadTask],
        *data_ids,
        **preload_params,
    ):
        self._cache_store = cache_store
        self._data_ids = data_ids
        self._preload_params = preload_params
        self._is_cancelled = False
        self._is_closed = False
        self._tasks = [
            preload_task(cache_store, data_id, **preload_params)
            for data_id in data_ids
        ]
        executor = ThreadPoolExecutor()
        for task in self._tasks:
            executor.submit(task.run_workflow)

    @property
    def is_cancelled(self) -> bool:
        """Whether the preload process has been cancelled."""
        return self._is_cancelled

    def cancel(self):
        """Cancel the preload process."""
        self._is_cancelled = True
        for task in self._tasks:
            task.update("Canceled", "--", "Preload canceled by the user.")
            task.cancel_workflow()

    @property
    def is_closed(self) -> bool:
        """Whether the preload process has been closed."""
        return self._is_closed

    def close(self):
        """Close the preload process and release any bound resources."""
        self._is_closed = True
        for task in self._tasks:
            task.update("Closed", "--", "")
            task.cleanup()

    def progress(self):
        """Generate a table which shows the progress for each data ID and visualize it."""

    # Skipped for now
    # def subscribe(self, listener: Callable[[str, float, str], None]) -> Callable[[], None]:
    #     """Subscribe to state changes of the preload process.
    #
    #     Args:
    #         listener: a listener that receives status, progress, and a message
    #     Returns:
    #         A function to unsubscribe from the preload process.
    #     """
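
For illustration, progress() could render a plain-text table like this (a sketch; the column layout is an assumption):

def progress(self) -> str:
    """Generate a table which shows the progress for each data ID."""
    header = f"{'Data ID':<20} {'Status':<12} {'Progress':<10} Message"
    rows = [
        f"{t.data_id:<20} {t.status:<12} {str(t.progress):<10} {t.message}"
        for t in self._tasks
    ]
    return "\n".join([header, *rows])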

Task Manager

This will be an abstract class in xcube, from which we can inherit when implementing a data store plugin.

from abc import ABC, abstractmethod


class PreloadTask(ABC):

    def __init__(self, cache_store, data_id, **preload_params):
        self._cache_store = cache_store
        self.data_id = data_id
        self._preload_params = preload_params
        self.status = "Not started"
        self.progress = 0.0
        self.message = "Preloading not started yet."

    def run_workflow(self):
        self.download()
        self.processing()

    @abstractmethod
    def cleanup(self):
        """Called when the preload is finished or canceled, e.g., to clean up temp files."""

    def cancel_workflow(self):
        """Cancel the workflow and clean up afterwards."""
        # implementation
        self.cleanup()

    @abstractmethod
    def download(self):
        """Handle the download."""

    @abstractmethod
    def processing(self):
        """Process the data from the downloaded file, e.g., extraction from zip,
        conversion to another format like zarr."""

    def update(self, status: str, progress: float | str, message: str):
        """Update the task and trigger the visualization."""
        self.status = status
        self.progress = progress
        self.message = message
        # some function to trigger the table creation
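
Regarding cancel_workflow(), one way to fill in the "# implementation" part is cooperative cancellation via a threading.Event that download() checks between chunks. A minimal sketch (the subclass name, the event attribute, the chunk loop, and the no-op bodies are assumptions):

import threading

class CancellableTaskSketch(PreloadTask):  # hypothetical subclass

    def __init__(self, cache_store, data_id, **preload_params):
        super().__init__(cache_store, data_id, **preload_params)
        self._cancel_event = threading.Event()

    def cancel_workflow(self):
        # Signal the worker thread, then clean up; download() observes
        # the flag between chunks and returns early.
        self._cancel_event.set()
        self.cleanup()

    def download(self):
        for _chunk in range(100):  # placeholder for a chunked download loop
            if self._cancel_event.is_set():
                return
            # fetch and write one chunk here (omitted in this sketch)
            pass

    def processing(self):
        pass  # no-op for the sketch

    def cleanup(self):
        pass  # no-op for the sketch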

Usage of PreloadTask in data store implementation

class ClmsPreloadTask(PreloadTask):

    def download(self):
        """Handle the download."""

    def processing(self):
        """Process the data from the downloaded file, e.g., extraction from zip,
        conversion to another format like zarr."""

    def cleanup(self):
        """Called when the preload is finished or canceled, e.g., to clean up temp files."""

Example in the ClmsDataStore class

def preload_data(self, *data_ids: str, **preload_params: Any) -> PreloadHandle:
    """Preload data for faster access."""
    # ClmsPreloadTask is the PreloadTask subclass implemented by this data store plugin.
    return PreloadHandle(cache_store, ClmsPreloadTask, *data_ids, **preload_params)

def get_preload_params_schema(self) -> JsonObjectSchema:
    """Get the JSON schema that describes the parameters passed to `preload_data()`."""
    return JsonObjectSchema(additional_properties=False)
