From e3b6463993519eafc28d57eb56cd69ff1db3fe20 Mon Sep 17 00:00:00 2001 From: dfguerrerom Date: Wed, 21 Aug 2024 15:45:17 +0200 Subject: [PATCH 1/4] feat: draft async get assets --- sepal_ui/scripts/gee.py | 57 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 55 insertions(+), 2 deletions(-) diff --git a/sepal_ui/scripts/gee.py b/sepal_ui/scripts/gee.py index b3807e2f..703b679a 100644 --- a/sepal_ui/scripts/gee.py +++ b/sepal_ui/scripts/gee.py @@ -1,5 +1,7 @@ """All the heleper methods to interface Google Earthengine with sepal-ui.""" +import asyncio +from concurrent.futures import ThreadPoolExecutor import time from pathlib import Path from typing import List, Union @@ -83,8 +85,59 @@ def is_running(task_descripsion: str) -> ee.batch.Task: return current_task +async def list_assets_concurrent(folders): + with ThreadPoolExecutor() as executor: + loop = asyncio.get_running_loop() + tasks = [ + loop.run_in_executor(executor, ee.data.listAssets, {"parent": folder}) + for folder in folders + ] + results = await asyncio.gather(*tasks) + return results + + +async def get_assets_async_concurrent(folder: str) -> list: + folder_queue = asyncio.Queue() + await folder_queue.put(folder) + asset_list = [] + + while not folder_queue.empty(): + current_folders = [await folder_queue.get() for _ in range(folder_queue.qsize())] + assets_groups = await list_assets_concurrent(current_folders) + + for assets in assets_groups: + for asset in assets.get("assets", []): + asset_list.append({"type": asset["type"], "name": asset["name"], "id": asset["id"]}) + if asset["type"] == "FOLDER": + await folder_queue.put(asset["name"]) + + return asset_list + + @sd.need_ee -def get_assets(folder: Union[str, Path] = "") -> List[dict]: +def get_assets(folder: Union[str, Path] = "", async_=True) -> List[dict]: + """Get all the assets from the parameter folder. every nested asset will be displayed. + + Args: + + folder: the initial GEE folder + async_: whether or not the function should be executed asynchronously + + Returns: + the asset list. each asset is a dict with 3 keys: 'type', 'name' and 'id' + + """ + + folder = str(folder) or f"projects/{ee.data._cloud_api_user_project}/assets/" + + if async_: + return asyncio.run(get_assets_async_concurrent(folder)) + + return get_assets_sync(folder) + + +@sd.need_ee +def get_assets_sync(folder: Union[str, Path] = "") -> List[dict]: """Get all the assets from the parameter folder. every nested asset will be displayed. Args: @@ -95,7 +148,6 @@ def get_assets(folder: Union[str, Path] = "") -> List[dict]: """ # set the folder and init the list asset_list = [] - folder = str(folder) or f"projects/{ee.data._cloud_api_user_project}/assets/" def _recursive_get(folder, asset_list): @@ -151,6 +203,7 @@ def delete_assets(asset_id: str, dry_run: bool = True) -> None: asset_id: the Id of the asset or a folder dry_run: whether or not a dry run should be launched. dry run will only display the files name without deleting them. """ + # define the action to execute for each asset based on the dry run mode def delete(id: str) -> None: if dry_run is True: From 28c8e7ce5f8a703e757ea88b44a2efdf390f7c4d Mon Sep 17 00:00:00 2001 From: dfguerrerom Date: Wed, 21 Aug 2024 16:39:27 +0200 Subject: [PATCH 2/4] feat: fallback to sync call in case async fails --- sepal_ui/scripts/gee.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/sepal_ui/scripts/gee.py b/sepal_ui/scripts/gee.py index 703b679a..9893fabc 100644 --- a/sepal_ui/scripts/gee.py +++ b/sepal_ui/scripts/gee.py @@ -11,6 +11,9 @@ from sepal_ui.message import ms from sepal_ui.scripts import decorator as sd +import nest_asyncio + +nest_asyncio.apply() @sd.need_ee @@ -131,7 +134,13 @@ def get_assets(folder: Union[str, Path] = "", async_=True) -> List[dict]: folder = str(folder) or f"projects/{ee.data._cloud_api_user_project}/assets/" if async_: - return asyncio.run(get_assets_async_concurrent(folder)) + try: + return asyncio.run(get_assets_async_concurrent(folder)) + except Exception as e: + # Log the exception for future debugging + print(f"Error occurred in get_assets_async_concurrent: {e}") + # Fallback to synchronous method + return get_assets_sync(folder) return get_assets_sync(folder) From 4bb79a2a0320ce24a6a15d711cde205a7e50ca27 Mon Sep 17 00:00:00 2001 From: dfguerrerom Date: Wed, 21 Aug 2024 16:42:17 +0200 Subject: [PATCH 3/4] style: add docstring --- sepal_ui/scripts/gee.py | 28 +++++++++++++++++++++------- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/sepal_ui/scripts/gee.py b/sepal_ui/scripts/gee.py index 9893fabc..9d8b60d4 100644 --- a/sepal_ui/scripts/gee.py +++ b/sepal_ui/scripts/gee.py @@ -1,17 +1,17 @@ """All the heleper methods to interface Google Earthengine with sepal-ui.""" import asyncio -from concurrent.futures import ThreadPoolExecutor import time +from concurrent.futures import ThreadPoolExecutor from pathlib import Path from typing import List, Union import ee import ipyvuetify as v +import nest_asyncio from sepal_ui.message import ms from sepal_ui.scripts import decorator as sd -import nest_asyncio nest_asyncio.apply() @@ -88,7 +88,15 @@ def is_running(task_descripsion: str) -> ee.batch.Task: return current_task -async def list_assets_concurrent(folders): +async def list_assets_concurrent(folders: list) -> list: + """List assets concurrently using ThreadPoolExecutor. + + Args: + folders: list of folders to list assets from + + Returns: + list of assets for each folder + """ with ThreadPoolExecutor() as executor: loop = asyncio.get_running_loop() tasks = [ @@ -99,7 +107,16 @@ async def list_assets_concurrent(folders): return results -async def get_assets_async_concurrent(folder: str) -> list: +async def get_assets_async_concurrent(folder: str) -> List[dict]: + """Get all the assets from the parameter folder. every nested asset will be displayed. + + Args: + folder: the initial GEE folder + + Returns: + the asset list. each asset is a dict with 3 keys: 'type', 'name' and 'id' + + """ folder_queue = asyncio.Queue() await folder_queue.put(folder) asset_list = [] @@ -122,7 +139,6 @@ def get_assets(folder: Union[str, Path] = "", async_=True) -> List[dict]: """Get all the assets from the parameter folder. every nested asset will be displayed. Args: - folder: the initial GEE folder async_: whether or not the function should be executed asynchronously @@ -130,7 +146,6 @@ def get_assets(folder: Union[str, Path] = "", async_=True) -> List[dict]: the asset list. each asset is a dict with 3 keys: 'type', 'name' and 'id' """ - folder = str(folder) or f"projects/{ee.data._cloud_api_user_project}/assets/" if async_: @@ -212,7 +227,6 @@ def delete_assets(asset_id: str, dry_run: bool = True) -> None: asset_id: the Id of the asset or a folder dry_run: whether or not a dry run should be launched. dry run will only display the files name without deleting them. """ - # define the action to execute for each asset based on the dry run mode def delete(id: str) -> None: if dry_run is True: From 4ce5069a6539d7a7d450a57efdc9d0f04f551474 Mon Sep 17 00:00:00 2001 From: dfguerrerom Date: Fri, 23 Aug 2024 13:09:43 -0700 Subject: [PATCH 4/4] feat: limit the number of async tasks based on memory and earthengine rate limits --- sepal_ui/scripts/gee.py | 34 ++++++++++++++++++++++++---------- 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/sepal_ui/scripts/gee.py b/sepal_ui/scripts/gee.py index 9d8b60d4..4c7adb63 100644 --- a/sepal_ui/scripts/gee.py +++ b/sepal_ui/scripts/gee.py @@ -1,6 +1,7 @@ """All the heleper methods to interface Google Earthengine with sepal-ui.""" import asyncio +import os import time from concurrent.futures import ThreadPoolExecutor from pathlib import Path @@ -9,10 +10,12 @@ import ee import ipyvuetify as v import nest_asyncio +import psutil from sepal_ui.message import ms from sepal_ui.scripts import decorator as sd +# This I have to add because of the error: RuntimeError: This event loop is already running when using jupyter notebook nest_asyncio.apply() @@ -88,7 +91,7 @@ def is_running(task_descripsion: str) -> ee.batch.Task: return current_task -async def list_assets_concurrent(folders: list) -> list: +async def list_assets_concurrent(folders: list, semaphore: asyncio.Semaphore) -> list: """List assets concurrently using ThreadPoolExecutor. Args: @@ -97,14 +100,15 @@ async def list_assets_concurrent(folders: list) -> list: Returns: list of assets for each folder """ - with ThreadPoolExecutor() as executor: - loop = asyncio.get_running_loop() - tasks = [ - loop.run_in_executor(executor, ee.data.listAssets, {"parent": folder}) - for folder in folders - ] - results = await asyncio.gather(*tasks) - return results + async with semaphore: + with ThreadPoolExecutor() as executor: + loop = asyncio.get_running_loop() + tasks = [ + loop.run_in_executor(executor, ee.data.listAssets, {"parent": folder}) + for folder in folders + ] + results = await asyncio.gather(*tasks) + return results async def get_assets_async_concurrent(folder: str) -> List[dict]: @@ -121,9 +125,19 @@ async def get_assets_async_concurrent(folder: str) -> List[dict]: await folder_queue.put(folder) asset_list = [] + # Determine system resources + cpu_count = os.cpu_count() + available_memory = psutil.virtual_memory().available + + # 50 MB per task + max_concurrent_tasks = min(30, cpu_count, available_memory // (50 * 1024 * 1024)) + + # Create a semaphore to limit the number of concurrent tasks + semaphore = asyncio.Semaphore(max_concurrent_tasks) + while not folder_queue.empty(): current_folders = [await folder_queue.get() for _ in range(folder_queue.qsize())] - assets_groups = await list_assets_concurrent(current_folders) + assets_groups = await list_assets_concurrent(current_folders, semaphore) for assets in assets_groups: for asset in assets.get("assets", []):