Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: draft async get assets #934

Merged
merged 4 commits into from
Aug 23, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 92 additions & 2 deletions sepal_ui/scripts/gee.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
"""All the heleper methods to interface Google Earthengine with sepal-ui."""

import asyncio
import os
import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import List, Union

import ee
import ipyvuetify as v
import nest_asyncio
import psutil

from sepal_ui.message import ms
from sepal_ui.scripts import decorator as sd

# This I have to add because of the error: RuntimeError: This event loop is already running when using jupyter notebook
nest_asyncio.apply()


@sd.need_ee
def wait_for_completion(task_descripsion: str, widget_alert: v.Alert = None) -> str:
Expand Down Expand Up @@ -83,8 +91,91 @@ def is_running(task_descripsion: str) -> ee.batch.Task:
return current_task


async def list_assets_concurrent(folders: list, semaphore: asyncio.Semaphore) -> list:
"""List assets concurrently using ThreadPoolExecutor.

Args:
folders: list of folders to list assets from

Returns:
list of assets for each folder
"""
async with semaphore:
with ThreadPoolExecutor() as executor:
loop = asyncio.get_running_loop()
tasks = [
loop.run_in_executor(executor, ee.data.listAssets, {"parent": folder})
for folder in folders
]
results = await asyncio.gather(*tasks)
return results


async def get_assets_async_concurrent(folder: str) -> List[dict]:
"""Get all the assets from the parameter folder. every nested asset will be displayed.

Args:
folder: the initial GEE folder

Returns:
the asset list. each asset is a dict with 3 keys: 'type', 'name' and 'id'

"""
folder_queue = asyncio.Queue()
await folder_queue.put(folder)
asset_list = []

# Determine system resources
cpu_count = os.cpu_count()
available_memory = psutil.virtual_memory().available

# 50 MB per task
max_concurrent_tasks = min(30, cpu_count, available_memory // (50 * 1024 * 1024))

# Create a semaphore to limit the number of concurrent tasks
semaphore = asyncio.Semaphore(max_concurrent_tasks)

while not folder_queue.empty():
current_folders = [await folder_queue.get() for _ in range(folder_queue.qsize())]
assets_groups = await list_assets_concurrent(current_folders, semaphore)

for assets in assets_groups:
for asset in assets.get("assets", []):
asset_list.append({"type": asset["type"], "name": asset["name"], "id": asset["id"]})
if asset["type"] == "FOLDER":
await folder_queue.put(asset["name"])

return asset_list


@sd.need_ee
def get_assets(folder: Union[str, Path] = "", async_=True) -> List[dict]:
"""Get all the assets from the parameter folder. every nested asset will be displayed.

Args:
folder: the initial GEE folder
async_: whether or not the function should be executed asynchronously

Returns:
the asset list. each asset is a dict with 3 keys: 'type', 'name' and 'id'

"""
folder = str(folder) or f"projects/{ee.data._cloud_api_user_project}/assets/"

if async_:
try:
return asyncio.run(get_assets_async_concurrent(folder))
except Exception as e:
# Log the exception for future debugging
print(f"Error occurred in get_assets_async_concurrent: {e}")
# Fallback to synchronous method
return get_assets_sync(folder)

return get_assets_sync(folder)


@sd.need_ee
def get_assets(folder: Union[str, Path] = "") -> List[dict]:
def get_assets_sync(folder: Union[str, Path] = "") -> List[dict]:
"""Get all the assets from the parameter folder. every nested asset will be displayed.

Args:
Expand All @@ -95,7 +186,6 @@ def get_assets(folder: Union[str, Path] = "") -> List[dict]:
"""
# set the folder and init the list
asset_list = []
folder = str(folder) or f"projects/{ee.data._cloud_api_user_project}/assets/"

def _recursive_get(folder, asset_list):

Expand Down
Loading