diff --git a/fiftyone/core/storage.py b/fiftyone/core/storage.py
index 236c32f1f7..e4a9467d4a 100644
--- a/fiftyone/core/storage.py
+++ b/fiftyone/core/storage.py
@@ -207,7 +207,7 @@ def open_files(paths, mode="r", skip_failures=False, progress=None):
         a list of open file-like objects
     """
     tasks = [(p, mode, skip_failures) for p in paths]
-    return _run(_do_open_file, tasks, progress=progress)
+    return _run(_do_open_file, tasks, return_results=True, progress=progress)
 
 
 def read_file(path, binary=False):
@@ -239,7 +239,7 @@ def read_files(paths, binary=False, skip_failures=False, progress=None):
         a list of file contents
     """
     tasks = [(p, binary, skip_failures) for p in paths]
-    return _run(_do_read_file, tasks, progress=progress)
+    return _run(_do_read_file, tasks, return_results=True, progress=progress)
 
 
 def write_file(str_or_bytes, path):
@@ -793,7 +793,7 @@ def move_files(inpaths, outpaths, skip_failures=False, progress=None):
             progress callback function to invoke instead
     """
     tasks = [(i, o, skip_failures) for i, o in zip(inpaths, outpaths)]
-    _run(_do_move_file, tasks, progress=progress)
+    _run(_do_move_file, tasks, return_results=False, progress=progress)
 
 
 def move_dir(
@@ -848,7 +848,7 @@ def delete_files(paths, skip_failures=False, progress=None):
             progress callback function to invoke instead
     """
     tasks = [(p, skip_failures) for p in paths]
-    _run(_do_delete_file, tasks, progress=progress)
+    _run(_do_delete_file, tasks, return_results=False, progress=progress)
 
 
 def delete_dir(dirpath):
@@ -861,67 +861,69 @@ def delete_dir(dirpath):
     etau.delete_dir(dirpath)
 
 
-def run(fcn, tasks, num_workers=None, progress=None):
+def run(fcn, tasks, return_results=True, num_workers=None, progress=None):
     """Applies the given function to each element of the given tasks.
 
     Args:
         fcn: a function that accepts a single argument
         tasks: an iterable of function arguments
+        return_results (True): whether to return the function results
         num_workers (None): a suggested number of threads to use
         progress (None): whether to render a progress bar (True/False), use the
             default value ``fiftyone.config.show_progress_bars`` (None), or a
             progress callback function to invoke instead
 
     Returns:
-        the list of function outputs
-    """
-    num_workers = fou.recommend_thread_pool_workers(num_workers)
-
-    try:
-        num_tasks = len(tasks)
-    except:
-        num_tasks = None
-
-    kwargs = dict(total=num_tasks, iters_str="files", progress=progress)
-
-    if num_workers <= 1:
-        with fou.ProgressBar(**kwargs) as pb:
-            results = [fcn(task) for task in pb(tasks)]
-    else:
-        with multiprocessing.dummy.Pool(processes=num_workers) as pool:
-            with fou.ProgressBar(**kwargs) as pb:
-                results = list(pb(pool.imap(fcn, tasks)))
-
-    return results
+        the list of function outputs, or None if ``return_results`` is False
+    """
+    return _run(
+        fcn,
+        tasks,
+        return_results=return_results,
+        num_workers=num_workers,
+        progress=progress,
+    )
 
 
 def _copy_files(inpaths, outpaths, skip_failures, progress):
     tasks = [(i, o, skip_failures) for i, o in zip(inpaths, outpaths)]
-    _run(_do_copy_file, tasks, progress=progress)
+    _run(_do_copy_file, tasks, return_results=False, progress=progress)
 
+
+def _run(fcn, tasks, return_results=True, num_workers=None, progress=None):
+    # `tasks` may be an iterable with no `len()` (e.g. a generator); the
+    # progress bar is then created with `total=None`
+    try:
+        num_tasks = len(tasks)
+    except TypeError:
+        num_tasks = None
 
-def _run(fcn, tasks, num_workers=None, progress=None):
-    num_tasks = len(tasks)
     if num_tasks == 0:
-        return []
+        return [] if return_results else None
 
     num_workers = fou.recommend_thread_pool_workers(num_workers)
-
     kwargs = dict(total=num_tasks, iters_str="files", progress=progress)
 
-    results = []
     if num_workers <= 1:
         with fou.ProgressBar(**kwargs) as pb:
-            for task in pb(tasks):
-                result = fcn(task)
-                results.append(result)
+            if return_results:
+                results = [fcn(task) for task in pb(tasks)]
+            else:
+                for task in pb(tasks):
+                    fcn(task)
     else:
         with multiprocessing.dummy.Pool(processes=num_workers) as pool:
             with fou.ProgressBar(**kwargs) as pb:
-                for result in pb(pool.imap_unordered(fcn, tasks)):
-                    results.append(result)
+                if return_results:
+                    # `imap` keeps task order so results align with inputs
+                    results = list(pb(pool.imap(fcn, tasks)))
+                else:
+                    # Completion order is irrelevant when results are dropped
+                    for _ in pb(pool.imap_unordered(fcn, tasks)):
+                        pass
 
-    return results
+    if return_results:
+        return results
 
 
 def _do_copy_file(arg):
@@ -1003,6 +1005,8 @@ def _copy_file(inpath, outpath, cleanup=False):
     etau.ensure_basedir(outpath)
     if cleanup:
         shutil.move(inpath, outpath)
+    else:
+        shutil.copy(inpath, outpath)
 
 
 def _delete_file(filepath):