
Allow batched/concurrent (de)compression support #1398

Closed

akshaysubr opened this issue Apr 20, 2023 · 15 comments

@akshaysubr
Contributor

I have a proposal for an enhancement and would like to get feedback on this and potentially better ways of achieving the same goal.

In many read-I/O-bandwidth-bound use cases, such as dataloaders for AI training, compression is typically turned off because decompression would become the bottleneck. We can get upwards of 10 GB/s of read throughput either from an object store or from a distributed filesystem like Lustre, but CPU decompression typically maxes out at ~1 GB/s, and that is usually with coarse-grained multithreaded parallelism. A nice solution to this problem would be to decompress data on the GPU, which we can do at > 50 GB/s. Decompression would then no longer be the bottleneck, and every other part of the pipeline improves as well: lower storage use, less data fetched from storage, higher effective cache capacity when caching locally, and faster CPU-GPU transfers. The catch with any kind of parallel decompression is that the API needs to support batched or concurrent decompression rather than calling decompression on chunks serially in a loop.

Here is some data comparing throughput of some GPU decompression algorithms in nvcomp to multithreaded zstd on the CPU:
[Figure: decompression throughput, nvcomp GPU codecs vs. multithreaded CPU zstd]

This is conceptually similar to #547 with concurrent chunk accesses, but for compression/decompression.

The idea is to allow Codecs in numcodecs to implement a batched_encode and batched_decode in addition to the current encode and decode methods. When a codec has these methods available, zarr can dispatch a batch of chunks for encode/decode. The codec implementation can then either use a serial loop, multi-threaded parallelism or parallelize on the GPU using nvcomp. I'm envisioning this to be quite similar to getitems here:

if not hasattr(self.chunk_store, "getitems"):
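
For concreteness, a dispatch on the codec could mirror that check. This is only an illustrative sketch; decode_chunks is a hypothetical helper, not existing zarr-python code:

def decode_chunks(codec, compressed_chunks):
    # Illustrative helper only: prefer a codec-provided batched path when
    # present, otherwise fall back to the existing per-chunk loop.
    if hasattr(codec, "decode_batch"):
        return codec.decode_batch(compressed_chunks)
    return [codec.decode(c) for c in compressed_chunks]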

From a GPU (de)compression standpoint, we have thought about using the sharding transformer format in ZEP0002 as the internal format in nvcomp. But after some consideration, this batched compression approach seems much more useful because:

  1. We can start using GPU decompression with existing zarr datasets that don't have the shard structure. The new sharding format would make GPU compression/decompression much more efficient but would not be required for functionality.
  2. We can support GPU decompression when accessing a subset of chunks within a shard, or chunks from multiple shards, for example when slicing along a dimension.
  3. We wouldn't have to add any new shard-access API in zarr or any multi-dimensional-aware codec implementations.
  4. We don't need separate CPU and GPU codec implementations of the same underlying compression algorithm. For example, a single LZ4 codec can use the GPU if available in batched mode, fall back to a multi-threaded CPU path, or run single threaded on the CPU. Data compressed on a GPU is then directly compatible with CPU decompression, so users don't need a GPU to read the file.

Would really appreciate any suggestions, concerns or other ideas that I might be missing. Also cc @jakirkham since he mentioned that there might be some intersection with Blosc.

@madsbk
Contributor

madsbk commented Apr 21, 2023

I think this is a great idea!

@rabernat
Contributor

Hi @akshaysubr - thanks for sharing this idea and sorry for missing this notification the first time around.

I think your suggestion of a multithreaded download / decompression layer is a really great one. This would be useful not only for the sharded format but also for existing datasets.

Today we can achieve something similar when we use Zarr together with Dask Array in threaded mode. However, there are many scenarios where a user would want this capability (e.g. in ML workflows as you described) but where using Dask would be overkill. So I strongly support pursuing this direction.

There is a bit of design work to be done in terms of figuring out the best way to implement this feature. It might be helpful to have a synchronous brainstorming session (including @zarr-developers/python-core-devs) on this.

@kephale

kephale commented May 22, 2023

@rabernat great, thanks for pointing me here. I'd be happy to help out here because we also need multithreaded chunk fetching. Please loop me in on the brainstorming session.

@martindurant
Member

I would caution against complexity here, at least for the CPU case, for the following reasons:

  • in most cases, fetch time is dominated by IO and not CPU bound even if compressed, so no amount of threading will help particularly versus concurrent requests, which we already have with fsspec and several of its backends. My experimental rfsspec project is designed to test threading the IO part, and with zarr workflows it typically sees only small improvements, if any (though it still needs testing on a many-dask-threads setup)
  • most of the numcodecs don't release the GIL, so threads won't help decode time at all. The exception, I think, is blosc (which is the default and therefore common in the dask sphere), and it also provides multithreaded decompression within a single chunk, depending on the specific algorithm (a short sketch contrasting the two approaches follows this list)
  • dask[array] is not a heavy dependency when we already require numpy. The real problem is that each task returns array pieces even in threaded mode, and these get copied into the output, rather than decompressing/decoding directly without copy. Dask doesn't like mutation in place, but it can do it.
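
A rough sketch contrasting the two approaches mentioned above (assuming numcodecs' Blosc, Zstd and blosc.set_nthreads APIs; this is an illustration, not a benchmark):

from concurrent.futures import ThreadPoolExecutor

import numpy as np
from numcodecs import Blosc, Zstd, blosc

data = np.random.randint(0, 255, size=10_000_000, dtype=np.uint8)

# Per-chunk multithreading: blosc can decompress a single chunk with
# several internal threads.
blosc.set_nthreads(8)
blosc_codec = Blosc(cname="zstd", clevel=3)
encoded = blosc_codec.encode(data)
_ = blosc_codec.decode(encoded)

# Chunk-level threading: a Python thread pool over many chunks only pays off
# if the codec releases the GIL while decoding; otherwise the threads serialize.
zstd = Zstd(level=3)
chunks = [zstd.encode(data[i : i + 1_000_000]) for i in range(0, len(data), 1_000_000)]
with ThreadPoolExecutor(max_workers=8) as pool:
    decoded = list(pool.map(zstd.decode, chunks))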

Having said all that, the real target here is GPUs, where the cost per call is high compared to the throughput. The points are well made! Skirting, for the moment, the problem of IO directly to the GPU, I find the scenario described attractive but perhaps tricky to implement. It sort of points towards an extra layer in the current chain of operations. It ends up similar to calling dask with partition sizes larger than the native zarr chunk sizes.

I guess at this point, it would be good to see what a "batch decode" method might look like, and then we can discuss how it might replace or augment the current loop.

(a note on #547 - this feels like what kerchunk achieves?)

@JackKelly
Contributor

This sounds really interesting!

We have a use-case similar to the one outlined in the original post... This might be a bit off-topic, but perhaps it's useful to give some detail on our use-case (because I suspect we're not alone!) to help give context to the current discussion.

Our use-case:

  • We train large ML models on the GPU (with pytorch).
  • We often need to load data at multiple gigabytes per second to keep the GPU fed with data.
  • We need to load on the order of 1,000 "ML training examples" from disk every second. Each example is a random slice from a multi-dimensional dataset (e.g. a 64x64x8 slice from a 1000x1000x1000 dataset of numerical weather predictions). Each example may span multiple Zarr chunks.
  • We train on-prem (with data on a local SSD) and in the cloud (with data in a bucket).
  • We've tried hard to use dask but, for our use case, dask often doesn't seem to help (although I can't rule out the possibility that I'm doing it wrong!). My current hunch is that we shouldn't use dask during the performance-critical ML training (although we do use dask for offline data analysis!).
  • At the moment, we reluctantly prepare training batches "offline" and save these to disk, and then train from these prepared batches. But we'd far prefer to be able to train on-the-fly from the original Zarr datasets.
  • We would like to continue using "fast" compression (like zstd) because we need to limit our storage costs!

For us, I think the ideal solution would be something like this (a rough sketch follows the list):

  1. Our python code requests 1,000 random slices in one go.
  2. The underlying Zarr library immediately returns a Future to our calling code (and releases the GIL).
  3. While waiting for this data, our code can get on with ML training on the GPU (using data loaded from a previous call to the Zarr library).
  4. Under the hood, the Zarr library:
    • Requests all chunks from the storage medium in one batch. (Batching is good because SSDs are surprisingly fast at random reads as long as the SSD is given a really long queue of IO tasks in one go. And, for cloud buckets, batching is good so we only have to pay the ~100 millisecond "storage bucket latency penalty" once, instead of 1,000 times!)
    • (If the dataset uses Zarr sharding then the Zarr library could also figure out if some of the requested chunks are in a contiguous region of the shard, and hence all those chunks can be loaded in a single sequential read.)
    • Given that we requested > 1,000 chunks in one go, and they might all turn up in very quick succession, my hunch is that it will be important to decompress in parallel (using multiple CPU cores and/or the GPU).
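
A minimal sketch of that shape using a thread pool (load_examples is a hypothetical helper; zarr-python does not expose such an API today):

from concurrent.futures import ThreadPoolExecutor

def load_examples(zarr_array, slices, pool):
    # Hypothetical helper: submit every random slice at once and return
    # futures, so GPU training can overlap with fetching and decoding.
    return [pool.submit(zarr_array.__getitem__, s) for s in slices]

# e.g. request 1,000 examples, keep training, collect the results later:
# pool = ThreadPoolExecutor(max_workers=32)
# futures = load_examples(z, random_slices, pool)
# batch = [f.result() for f in futures]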

Earlier in the year, I was seriously considering attempting to implement a Zarr library in Rust (with sharding support)... but work is getting increasingly busy, so I'll struggle to get this done... and new developments might solve most of my problems.

One last thought: On the question of "how much does concurrency help Zarr read speeds", it might be interesting to benchmark tensorstore / TileDB / caterva (which all implement multithreaded reading of chunks, IIRC). I'm currently hoping to benchmark these things later this year (but no promises!).

@rabernat
Contributor

rabernat commented May 25, 2023

  • in most cases, fetch time is dominated by IO and not CPU bound even if compressed, so no amount of threading will help particularly versus concurrent requests,

I think it is at least worth trying to quantitatively examine this assumption under the new parameters that will be opened up by sharding. Specifically, say we are fetching a lot of really tiny chunks from S3 as part of a single shard: for example, a 10 MB shard containing 1000 10 KB chunks. In this case, we really only need one request to S3, so the opportunity for async concurrency is limited. In our current implementation, we would then have to loop over 1000 10 KB chunks, decompress each in serial, and assemble them into a contiguous array. On a basic level, it's very hard to imagine that parallelism of some sort would not be able to help here. Multithreaded parallelism on the CPU should be effective (provided we can find a way to release the GIL), or parallelism within the GPU.
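
As a sketch of that scenario (assuming the shard index provides (offset, length) pairs and the codec implements the proposed decode_batch method):

def decode_shard(shard_bytes, chunk_index, codec):
    # chunk_index: (offset, length) pairs for the chunks packed in this shard.
    # A single request fetched shard_bytes; the remaining work is pure
    # decompression, which is exactly where a batched codec could parallelize.
    compressed = [shard_bytes[off : off + length] for off, length in chunk_index]
    return codec.decode_batch(compressed)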

Very knowledgeable people on this thread, with a lot of close-to-the-metal experience with high-performance I/O pipelines, think this is a promising direction to explore. Someone should try to implement batched_encode and batched_decode to test these ideas.

@d-v-b
Contributor

d-v-b commented May 25, 2023

most of the numcodecs don't release the GIL, so threads won't help decode time at all.

This seems like a pretty glaring problem for performance?

@rabernat
Contributor

This seems like a pretty glaring problem for performance?

I feel like this must be solvable. Nearly all of the codecs are implemented in a compiled language: C, Cython, etc.

If not, we can implement the store / codec pipeline in rust! 🦀

The NVIDIA folks on this thread don't rely on numcodecs--they have their own GPU-native codecs.

@martindurant
Member

we would then have to loop over 1000 10 KB chunks

Noting order-of-magnitude timescales, to decompress 10_000 bytes of original data:

  • 10us with gzip
  • 600ns with zstd

Your batched decompress had better release the GIL for the entirety of the process, rather than repeatedly claim and release it...

@martindurant
Member

we can implement the store / codec pipeline in rust!

Happy to help with this; you might want to check out cramjam for de/compressors. I do believe we are approaching a tipping point where Rust is becoming easier than Cython.
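
For reference, a minimal cramjam round-trip looks roughly like this (a sketch assuming cramjam's zstd module; results are Rust-backed buffers that convert with bytes()):

import cramjam

raw = b"some chunk of data" * 1000
compressed = cramjam.zstd.compress(raw)
restored = bytes(cramjam.zstd.decompress(compressed))
assert restored == raw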

@akshaysubr
Contributor Author

There is a bit of design work to be done in terms of figuring out the best way to implement this feature. It might be helpful to have a synchronous brainstorming session (including @zarr-developers/python-core-devs) on this.

@rabernat This sounds like a good idea. Please keep me in the loop for this brainstorming discussion.

in most cases, fetch time is dominated by IO and not CPU bound even if compressed, so no amount of threading will help particularly versus concurrent requests,

I agree about having concurrent I/O requests in flight being more important than just threading. In terms of I/O being the bottleneck though, we've found this to not always be true in our ML training use cases for a few reasons, many of which are particular to the specific use case:

  1. We can sustain ~2GB/s of throughput from object storage per compute node. If only using one GPU, then this is sufficient I/O throughput, but single threaded decompression, say using deflate/zstd, cannot keep up with that (see lzbench results). This issue becomes much more severe if we are loading data from a parallel filesystem like lustre or from a node local NVMe drive which can sustain >10GB/s of throughput.
  2. In most cases, we actually use 8 GPUs per node and have 1 process per GPU for efficiency. This means that per GPU, we need at least ~5 threads (across pytorch, CUDA driver, etc.). That's 40 threads per node already. On top of that we need further asynchronous data loading threads and decompression threads and it starts to add up really fast. This limits the number of CPU threads we can use for decompression to at most 4 per GPU which again makes it hard to get high enough throughput. If we increase the thread count for decompression any further, that leads to pytorch or CUDA driver threads getting evicted for long periods of time causing very significant slowdowns on that GPU and the synchronous nature of data parallel training means all processes in the job have to pay this cost. This is one of the bigger motivations for us to try and use the GPU for decompression and allow the CPU to perform all the control tasks more efficiently.

I guess at this point, it would be good to see what a "batch decode" method might look like, and then we can discuss how it might replace or augment the current loop.

We are in the process of implementing this right now. Tagging @Alexey-Kamenev who is working on a draft implementation currently. Will post some snippets here in a couple of weeks after we have something satisfactory.

@JackKelly Your use case sounds astonishingly similar to ours. We also have the exact same issue using zarr in an ML training pipeline in PyTorch. The solution we've been using is to do some initial dataset curation before training and use DALI for asynchronous data loading and prefetching to overlap compute and I/O. We currently turn off compression mainly to avoid decompression bottlenecks. Adding GPU (de)compression and reading directly from the original zarr dataset would help all parts of the pipeline, from I/O to local cache capacity to higher effective PCIe bandwidth for CPU->GPU transfers.

One last thought: On the question of "how much does concurrency help Zarr read speeds", it might be interesting to benchmark tensorstore / TileDB / caterva (which all implement multithreaded reading of chunks, IIRC). I'm currently hoping to benchmark these things later this year (but no promises!).

@rabernat benchmarked this a while ago in this post. Here is another benchmark that we did internally:
[Figure: internal benchmark results]

The NVIDIA folks on this thread don't rely on numcodecs--they have their own GPU-native codecs.

We were actually planning on using numcodecs, similar to what's in kvikio currently (but without batched encode/decode). In nvCOMP, we have some GPU-native codecs like GDeflate, but we also support many standard codecs including LZ4, Snappy, Deflate and Zstandard. One of the big reasons for wanting batched encode and decode is that one can create a zarr store using a standard LZ4 CPU codec but then decode it in batches on the GPU without any change to the stored data.

In general, I wanted to provide some context on how GPU decompression works in nvCOMP. The idea is similar to parallel CPU decompression, with multiple chunks being decompressed independently, but we also add fine-grained parallelism so multiple CUDA threads can cooperatively process each chunk. Both these dimensions of parallelism are important for maximum throughput. Here is an image that describes this better:
[Figure: nvCOMP's two levels of parallelism, across chunks and within each chunk]

@akshaysubr
Contributor Author

Wanted to update here that an implementation of a Codec with batched encode and decode methods is now merged into kvikIO: rapidsai/kvikio#249

Here's the basic structure of the API:

from typing import Any, List, Mapping, Optional

import cupy as cp
from numcodecs.abc import Codec


class NvCompBatchCodec(Codec):
    def __init__(
        self,
        algorithm: str,
        options: Optional[Mapping[str, Any]] = None,
        stream: Optional[cp.cuda.Stream] = None,
    ) -> None:
        ...

    def encode(self, buf):
        # A single-buffer encode is just a batch of one.
        return self.encode_batch([buf])[0]

    def encode_batch(self, bufs: List[Any]) -> List[Any]:
        ...

    def decode(self, buf, out=None):
        # A single-buffer decode is just a batch of one.
        return self.decode_batch([buf], [out])[0]

    def decode_batch(
        self, bufs: List[Any], out: Optional[List[Any]] = None
    ) -> List[Any]:
        ...
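
A usage sketch (the import path follows the linked kvikio PR and requires a CUDA-capable GPU; chunk sizes here are arbitrary):

import numpy as np
from kvikio.nvcomp_codec import NvCompBatchCodec  # module path per the linked PR

codec = NvCompBatchCodec("lz4")
chunks = [
    np.random.randint(0, 255, size=1 << 20, dtype=np.uint8).tobytes()
    for _ in range(16)
]
compressed = codec.encode_batch(chunks)     # all chunks submitted to the GPU together
roundtrip = codec.decode_batch(compressed)  # likewise for decompression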

Here, the regular encode and decode methods call into the corresponding _batch methods with a single buffer. One potential way to standardize this is to implement these batched methods in the numcodecs base class in the opposite way:

from abc import abstractmethod
from typing import Any, List, Optional


class Codec:
    """Codec abstract base class."""

    @abstractmethod
    def encode(self, buf):
        ...

    @abstractmethod
    def decode(self, buf, out=None):
        ...

    def encode_batch(self, bufs: List[Any]) -> List[Any]:
        # Default: fall back to a serial loop over the existing encode method.
        return [self.encode(b) for b in bufs]

    def decode_batch(
        self, bufs: List[Any], out: Optional[List[Any]] = None
    ) -> List[Any]:
        # Default: fall back to a serial loop over the existing decode method.
        if out is None:
            out = [None] * len(bufs)
        return [self.decode(b, o) for b, o in zip(bufs, out)]

Then zarr-python can always call into the batched API, which defaults to the serial loop or, if a codec supports it, leverages parallel (de)compression.
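
As an illustration of how a codec could opt in, a hypothetical ThreadedZstd could override the batched methods with a thread pool (a sketch only; this is only worthwhile if the underlying decode releases the GIL):

from concurrent.futures import ThreadPoolExecutor

from numcodecs import Zstd


class ThreadedZstd(Zstd):
    """Hypothetical codec overriding the batched methods with a thread pool."""

    codec_id = "threaded_zstd"

    def __init__(self, level=1, num_threads=8):
        super().__init__(level=level)
        self._pool = ThreadPoolExecutor(max_workers=num_threads)

    def encode_batch(self, bufs):
        # Encode all buffers concurrently instead of the serial default.
        return list(self._pool.map(self.encode, bufs))

    def decode_batch(self, bufs, out=None):
        out = out if out is not None else [None] * len(bufs)
        return list(self._pool.map(self.decode, bufs, out))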

@JackKelly
Contributor

Just to connect up two related conversations... Over in the "Zarr Benchmarking & Performance" group, we've briefly discussed (amongst other things) batched async IO and decompression (possibly implemented in Rust). Of course, we should probably keep any focused discussion about batched decompression to this current issue. But I just wanted to flag up the connection. And it's quite likely that we'll discuss related issues in our twice-monthly calls (starting in Sept), so I'd encourage folks who are interested in batched (de)compression to register their availability for a regular meeting.

@jhamman
Member

jhamman commented Oct 18, 2024

@akshaysubr - curious if you feel that the current state of 3.0 concurrent compression is sufficient to close this. Some early performance numbers sure seem to indicate we've really moved the ball on this one.

@akshaysubr
Contributor Author

Yeah, I think we can close this issue as done. The batched Codec APIs in the way that they are exposed now are good and your performance numbers reflect that as well. If any other concerns come up, they can be raised as separate issues.
