
Multi-thread hashing of large S3 files #1788

Merged: 1 commit merged into master on Oct 13, 2020
Conversation

@sir-sigurd (Member) commented Sep 7, 2020

Benchmarks on m5a.large

Single/several large files of the same size

Before

In [1]: import quilt3; quilt3.data_transfer.calculate_sha256(
    [quilt3.util.PhysicalKey('ai2-datasets', 'arc/ARC-V1-Feb2018.zip', None)], [680841265])
Hashing: 100%|██████████████████████████████████████████████████████████████| 681M/681M [00:28<00:00, 23.7MB/s]

In [7]: import quilt3; quilt3.data_transfer.calculate_sha256(
    [quilt3.util.PhysicalKey('ai2-datasets', 'arc/ARC-V1-Feb2018.zip', None)]*4, [680841265]*4)
Hashing: 100%|████████████████████████████████████████████████████████████| 2.72G/2.72G [00:32<00:00, 85.1MB/s]

In [7]: import quilt3; quilt3.data_transfer.calculate_sha256(
    [quilt3.util.PhysicalKey('ai2-datasets', 'arc/ARC-V1-Feb2018.zip', None)]*10, [680841265]*10)
Hashing: 100%|█████████████████████████████████████████████████████████████| 6.81G/6.81G [01:04<00:00, 106MB/s]

After

In [2]: import quilt3; quilt3.data_transfer.calculate_sha256(
    [quilt3.util.PhysicalKey('ai2-datasets', 'arc/ARC-V1-Feb2018.zip', None)], [680841265])
Hashing: 100%|███████████████████████████████████████████████████████████████| 681M/681M [00:04<00:00, 141MB/s]

In [5]: import quilt3; quilt3.data_transfer.calculate_sha256(
    [quilt3.util.PhysicalKey('ai2-datasets', 'arc/ARC-V1-Feb2018.zip', None)]*4, [680841265]*4)
Hashing: 100%|█████████████████████████████████████████████████████████████| 2.72G/2.72G [00:15<00:00, 174MB/s]

In [8]: import quilt3; quilt3.data_transfer.calculate_sha256(
    [quilt3.util.PhysicalKey('ai2-datasets', 'arc/ARC-V1-Feb2018.zip', None)]*10, [680841265]*10)
Hashing: 100%|█████████████████████████████████████████████████████████████| 6.81G/6.81G [00:37<00:00, 184MB/s]

Multiple small files, a single large file

Before

In [3]: import quilt3
   ...: pkg = quilt3.Package.browse('ai2/arc', registry='s3://ai2-datasets')['extracted']
   ...: for _, e in pkg.walk():
   ...:     e.hash = None
   ...: pkg.build('test/test-hash')
Loading manifest: 100%|████████████████████████████████████████████████| 19/19 [00:00<00:00, 19848.51entries/s]
Hashing: 100%|████████████████████████████████████████████████████████████| 1.49G/1.49G [01:10<00:00, 21.1MB/s]

After

In [11]: import quilt3
    ...: pkg = quilt3.Package.browse('ai2/arc', registry='s3://ai2-datasets')['extracted']
    ...: for _, e in pkg.walk():
    ...:     e.hash = None
    ...: pkg.build('test/test-hash')
Loading manifest: 100%|████████████████████████████████████████████████| 19/19 [00:00<00:00, 21099.23entries/s]
Hashing: 100%|█████████████████████████████████████████████████████████████| 1.49G/1.49G [00:09<00:00, 151MB/s]

Multiple small files, two large files

Before

In [6]: import quilt3
   ...: pkg = quilt3.Package.browse('ai2/arc', registry='s3://ai2-datasets')
   ...: for _, e in pkg.walk():
   ...:     e.hash = None
   ...: pkg.build('test/test-hash')
Loading manifest: 100%|████████████████████████████████████████████████| 19/19 [00:00<00:00, 19828.76entries/s]
Hashing: 100%|████████████████████████████████████████████████████████████| 2.17G/2.17G [01:17<00:00, 28.1MB/s]

After

In [14]: import quilt3
    ...: pkg = quilt3.Package.browse('ai2/arc', registry='s3://ai2-datasets')
    ...: for _, e in pkg.walk():
    ...:     e.hash = None
    ...: pkg.build('test/test-hash')
Loading manifest: 100%|████████████████████████████████████████████████| 19/19 [00:00<00:00, 21342.20entries/s]
Hashing: 100%|█████████████████████████████████████████████████████████████| 2.17G/2.17G [00:12<00:00, 170MB/s]

This PR changes the number of parallel S3 requests: previously it was the same as max_workers of the ThreadPoolExecutor; now it's the same as for other multi-threaded operations (10).
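The concurrency cap described above can be sketched roughly as follows (illustrative names only, not the actual quilt3 implementation): a semaphore limits in-flight part requests to a fixed count, independent of the thread pool's size.

```python
import asyncio

# Cap on simultaneous S3 part requests, matching the limit used by other
# multi-threaded transfer operations (10), per the PR description.
MAX_REQUEST_CONCURRENCY = 10

async def fetch_parts(fetch_part, part_numbers):
    """Fetch all parts concurrently, but with at most
    MAX_REQUEST_CONCURRENCY requests in flight at any moment."""
    sem = asyncio.Semaphore(MAX_REQUEST_CONCURRENCY)

    async def fetch_one(n):
        async with sem:  # blocks when 10 requests are already in flight
            return await fetch_part(n)

    return await asyncio.gather(*(fetch_one(n) for n in part_numbers))
```

Here `fetch_part` stands in for whatever coroutine performs the ranged S3 GET; the point is only that the limit is a separate constant rather than the hashing pool's worker count.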

TODO

  • finer progress bar updates
  • test performance with multiple files
  • finer concurrency control
  • tests
  • clean up code in util; get rid of asyncio
  • add comments regarding cancelling enqueued tasks

@sir-sigurd sir-sigurd force-pushed the multi-thread-hashing branch 2 times, most recently from 34c38d6 to f46af33 Compare September 7, 2020 18:15
@codecov-commenter commented Sep 7, 2020

Codecov Report

Merging #1788 into master will increase coverage by 0.09%.
The diff coverage is 93.47%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master    #1788      +/-   ##
==========================================
+ Coverage   89.44%   89.53%   +0.09%     
==========================================
  Files          62       62              
  Lines        7190     7284      +94     
==========================================
+ Hits         6431     6522      +91     
- Misses        759      762       +3     
Flag Coverage Δ
#api-python 88.05% <93.47%> (+0.17%) ⬆️
#lambda 92.23% <ø> (ø)

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
api/python/quilt3/data_transfer.py 79.46% <90.00%> (+1.52%) ⬆️
api/python/tests/test_data_transfer.py 100.00% <100.00%> (ø)
api/python/tests/utils.py 100.00% <100.00%> (ø)

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 2018aa2...1056a19. Read the comment docs.

@akarve akarve requested a review from dimaryaz September 7, 2020 23:27
@dimaryaz (Contributor) left a comment:

My biggest concern here is introducing asyncio for just one function, while using regular threadpools everywhere else.

I think asyncio is great and we should probably switch everything to it - but it's a fairly big and risky change, so we should do it separately, not as part of a performance fix for one function. We should think about proper design, and possibly even expose async functions to the user. But, that will require lots of testing. For now, I think it's best to fix the download the "old" way.

One potential problem with parallel downloads: because hashing has to be done sequentially and cannot be parallelized (for a given file), there is a possibility of running out of memory. If the network is very fast and CPU cannot keep up with the download, this can potentially load every chunk of the file into memory at the same time.
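The out-of-memory scenario above has a standard mitigation, which can be sketched like this (a minimal illustration with hypothetical names, not the PR's code): a semaphore caps how far downloads may run ahead of the in-order hasher, so at most a fixed number of chunks have been fetched but not yet consumed.

```python
import asyncio
import hashlib

async def hash_parts_bounded(fetch_part, num_parts, max_pending=40):
    """Download parts concurrently, but hash them strictly in order and
    never let more than max_pending fetched-but-unhashed chunks accumulate."""
    sem = asyncio.Semaphore(max_pending)

    async def fetch(i):
        await sem.acquire()        # blocks new downloads once the hasher falls behind
        return await fetch_part(i)

    tasks = [asyncio.ensure_future(fetch(i)) for i in range(num_parts)]
    digest = hashlib.sha256()
    for task in tasks:
        chunk = await task         # parts are consumed in order
        digest.update(chunk)
        sem.release()              # permit returned; another download may start
    return digest.hexdigest()
```

With a fast network and a slow CPU, downloads simply stall on the semaphore instead of buffering the whole file.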

@akarve (Member) commented Sep 8, 2020

@sir-sigurd I talked a bit more with Dima about this and we're OK with asyncio if that's your choice; we just need to test it and unit test it carefully as it's a new dependency for us.

@sir-sigurd sir-sigurd force-pushed the multi-thread-hashing branch 6 times, most recently from 408f619 to 550ed9a Compare September 16, 2020 19:02
@sir-sigurd sir-sigurd marked this pull request as ready for review September 16, 2020 19:07
@akarve (Member) left a comment:
Tested on large complex package and got the same top-hash (nice work); a few comments inline

s3_client_provider = S3ClientProvider()
# This controls how many parts can be stored in the memory.
# This includes the ones that are being downloaded or hashed.
s3_max_pending_parts = s3_transfer_config.max_request_concurrency * 4
Member:

why * 4?

Member:

noted; it should be a named constant; might even make sense to adjust it based on available memory

@akarve (Member) commented Sep 16, 2020

I got the following error in Jupyter (can you repro in IPython as well?). A lot of our users use quilt3 in Jupyter/IPython, so we'll have to consider that:

---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-11-91e1a0c98333> in <module>
      2     "sequencer/GUID",
      3     message="Initial instrument data upload",
----> 4     registry=f"s3://{buckets['raw']}"
      5 )

~/code/quilt/api/python/quilt3/telemetry.py in decorated(*args, **kwargs)
    129             ApiTelemetry.report_api_use(self.api_name, ApiTelemetry.session_id)
    130 
--> 131             results = func(*args, **kwargs)
    132             # print(f"{len(ApiTelemetry.pending_reqs)} request(s) pending!")
    133 

~/code/quilt/api/python/quilt3/packages.py in push(self, name, registry, dest, message, selector_fn)
   1257                 )
   1258 
-> 1259         self._fix_sha256()
   1260 
   1261         pkg = self.__class__()

~/code/quilt/api/python/quilt3/packages.py in _fix_sha256(self)
    886             sizes.append(entry.size)
    887 
--> 888         results = calculate_sha256(physical_keys, sizes)
    889         exc = None
    890         for entry, obj_hash in zip(self._incomplete_entries, results):

~/code/quilt/api/python/quilt3/data_transfer.py in calculate_sha256(src_list, sizes)
    859     if not src_list:
    860         return []
--> 861     return _calculate_sha256_internal(src_list, sizes, [None] * len(src_list))
    862 
    863 

~/anaconda3/envs/de/lib/python3.7/site-packages/tenacity/__init__.py in wrapped_f(*args, **kw)
    327         @_utils.wraps(f)
    328         def wrapped_f(*args, **kw):
--> 329             return self.call(f, *args, **kw)
    330 
    331         def retry_with(*args, **kwargs):

~/anaconda3/envs/de/lib/python3.7/site-packages/tenacity/__init__.py in call(self, fn, *args, **kwargs)
    407             retry_object=self, fn=fn, args=args, kwargs=kwargs)
    408         while True:
--> 409             do = self.iter(retry_state=retry_state)
    410             if isinstance(do, DoAttempt):
    411                 try:

~/anaconda3/envs/de/lib/python3.7/site-packages/tenacity/__init__.py in iter(self, retry_state)
    354             and isinstance(retry_state.outcome.exception(), TryAgain)
    355         if not (is_explicit_retry or self.retry(retry_state=retry_state)):
--> 356             return fut.result()
    357 
    358         if self.after is not None:

~/anaconda3/envs/de/lib/python3.7/concurrent/futures/_base.py in result(self, timeout)
    426                 raise CancelledError()
    427             elif self._state == FINISHED:
--> 428                 return self.__get_result()
    429 
    430             self._condition.wait(timeout)

~/anaconda3/envs/de/lib/python3.7/concurrent/futures/_base.py in __get_result(self)
    382     def __get_result(self):
    383         if self._exception:
--> 384             raise self._exception
    385         else:
    386             return self._result

~/anaconda3/envs/de/lib/python3.7/site-packages/tenacity/__init__.py in call(self, fn, *args, **kwargs)
    410             if isinstance(do, DoAttempt):
    411                 try:
--> 412                     result = fn(*args, **kwargs)
    413                 except BaseException:
    414                     retry_state.set_exception(sys.exc_info())

~/code/quilt/api/python/quilt3/data_transfer.py in _calculate_sha256_internal(src_list, sizes, results)
   1005                 results[i] = await f
   1006 
-> 1007     util.asyncio_run(main())
   1008     return results
   1009 

~/anaconda3/envs/de/lib/python3.7/asyncio/runners.py in run(main, debug)
     32     if events._get_running_loop() is not None:
     33         raise RuntimeError(
---> 34             "asyncio.run() cannot be called from a running event loop")
     35 
     36     if not coroutines.iscoroutine(main):

RuntimeError: asyncio.run() cannot be called from a running event loop
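The traceback above is the classic failure mode of asyncio.run() inside Jupyter/IPython, where a loop is already running. One common workaround can be sketched like this (an illustrative helper, not what quilt3 ships): detect the running loop and fall back to running the coroutine on a dedicated thread with its own loop.

```python
import asyncio
import threading

def run_sync(coro):
    """Run a coroutine to completion even if an event loop is already
    running in this thread (e.g. under Jupyter). Exceptions raised on the
    fallback thread are not re-raised here; this is only a sketch."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return asyncio.run(coro)   # no loop running: the normal path

    result = {}

    def runner():
        # A fresh thread has no running loop, so asyncio.run() is legal here.
        result["value"] = asyncio.run(coro)

    thread = threading.Thread(target=runner)
    thread.start()
    thread.join()
    return result["value"]
```

This keeps the library's synchronous API usable in both plain scripts and notebook environments.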

@dimaryaz (Contributor) left a comment:

Regarding Jupyter: that's my concern about adding asyncio :-\ We don't know what's going to break.

@akarve (Member) commented Sep 17, 2020

Regarding Jupyter: that's my concern about adding asyncio :-\ We don't know what's going to break.

This might be solved in the new Jupyter. Still seems kinda strict to require a specific version of Jupyter.

How different, exactly, would this code look with threading primitives?
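For comparison, the same bounded-pipeline scheme with threading primitives might look roughly like this (a hypothetical sketch, not the PR's implementation): a thread pool downloads parts while a semaphore bounds how many fetched chunks can be outstanding ahead of the in-order hasher.

```python
import hashlib
import threading
from concurrent.futures import ThreadPoolExecutor

def hash_parts_threaded(fetch_part, num_parts, max_pending=40):
    """Threaded equivalent of the asyncio design: workers download parts,
    the main thread hashes them strictly in submission order, and the
    semaphore stops downloads from running too far ahead."""
    sem = threading.Semaphore(max_pending)

    def fetch(i):
        sem.acquire()              # a worker blocks once max_pending parts are ahead
        return fetch_part(i)

    digest = hashlib.sha256()
    with ThreadPoolExecutor(max_workers=10) as pool:
        futures = [pool.submit(fetch, i) for i in range(num_parts)]
        for future in futures:
            digest.update(future.result())  # consume strictly in order
            sem.release()                   # permit returned after hashing
    return digest.hexdigest()
```

The structure is similar; the main difference is that blocked workers here tie up pool threads, whereas blocked coroutines in the asyncio version cost almost nothing.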

@codecov-io commented Oct 8, 2020

Codecov Report

Merging #1788 into master will increase coverage by 0.12%.
The diff coverage is 93.47%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master    #1788      +/-   ##
==========================================
+ Coverage   88.72%   88.85%   +0.12%     
==========================================
  Files          93       93              
  Lines       11772    11958     +186     
==========================================
+ Hits        10445    10625     +180     
- Misses       1327     1333       +6     
Flag Coverage Δ
#api-python 87.87% <93.47%> (+0.18%) ⬆️
#lambda 92.30% <ø> (ø)

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
api/python/quilt3/data_transfer.py 79.31% <90.00%> (+1.53%) ⬆️
api/python/tests/test_data_transfer.py 99.08% <100.00%> (+0.05%) ⬆️
api/python/tests/utils.py 100.00% <100.00%> (ø)
...:/Users/circleci/project/api/python/tests/utils.py 100.00% <0.00%> (ø)
...eci/project/api/python/tests/test_data_transfer.py 99.53% <0.00%> (+0.02%) ⬆️
...ircleci/project/api/python/quilt3/data_transfer.py 79.19% <0.00%> (+1.52%) ⬆️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 80cc412...c1bfcda. Read the comment docs.

@kevinemoore (Contributor) left a comment:

I'm not confident in my ability to review these changes, but I don't see any issues. It would be great to get @dimaryaz to take a look.

else:
stopped_event.set()
finally:
if not f.cancelled():
Contributor:

Why is the semaphore release only called if not f.cancelled?

Contributor:

Is that so the hashing of that part will be retried?

Member Author:

A future can only be cancelled before it starts running, so the semaphore had not been acquired when f.cancelled() is true.

Contributor:

It's still not great to have acquire and release in different places - if nothing else, it's hard to prove that the logic is correct.

Theoretically, if logger.debug('%r part %s: download enqueued', src, part_number) throws an exception, then release will be called without an acquire. It's pretty difficult to make that happen, so kind of a negligible risk, but still...
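One way to narrow the concern raised above is to keep the failure-path release lexically next to the acquire, so the permit only escapes the function when a chunk is successfully handed off to the consumer. A minimal sketch (illustrative names, not the actual code under review):

```python
import asyncio

async def enqueue_part(sem, queue, fetch_part, part_number):
    """Acquire a permit, fetch a part, and hand it to the hashing consumer.
    On any failure the permit is released right here; on success the
    consumer that hashes the chunk is responsible for releasing it."""
    await sem.acquire()
    try:
        chunk = await fetch_part(part_number)
        await queue.put((part_number, chunk))  # consumer releases after hashing
    except BaseException:
        sem.release()   # nothing was handed off; undo the acquire locally
        raise
```

The acquire/release pairing still spans two components, but every path out of this function either hands the permit to exactly one consumer or returns it, which is easier to audit.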

@dimaryaz (Contributor) left a comment:

It's still a bit more complicated than I'd like, but I think it's correct, so don't want to block on that. We can improve it in the future.


@sir-sigurd sir-sigurd merged commit 9ceb8e6 into master Oct 13, 2020
@sir-sigurd sir-sigurd deleted the multi-thread-hashing branch October 13, 2020 15:30