Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RFC: Spill-To-Disk Object Storage Download #2205

Closed
tustvold opened this issue Apr 12, 2022 · 16 comments
Closed

RFC: Spill-To-Disk Object Storage Download #2205

tustvold opened this issue Apr 12, 2022 · 16 comments
Labels
enhancement New feature or request

Comments

@tustvold
Copy link
Contributor

tustvold commented Apr 12, 2022

Creating as high-level ticket to hopefully get consensus on the approach, before potentially creating lower level tickets

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

Currently ObjectStore::file_reader returns an Arc<dyn ObjectReader>, this in turn has a method ObjectReader::sync_chunk_reader which takes a byte range.

In the case of parquet, a ChunkObjectReader wraps this ObjectReader and adapts it to the parquet::ChunkReader trait. The result is that the parquet reader calls ObjectReader::sync_chunk_reader for the byte range of each column chunk, of which there will be one per-column per-RowGroup, which in turn performs a range request to object storage to fetch the bytes.

As pointed out by @mateuszkj on datafusion-contrib/datafusion-objectstore-s3#53 this unfortunately results in a large number of small requests to S3 (there are also metadata requests which I will cover in a separate ticket concerning catalogs).

In the case of CSV, JSON, etc... ObjectReader::sync_reader is used which is equivalent to calling sync_chunk_reader with the length of the file, and will therefore buffer the entire file in memory.

This approach therefore has two aspects that could be improved:

  • Potentially large numbers of very small requests to object storage adding latency and cost
  • Potentially large amounts of data buffered in memory

Describe the solution you'd like

The simplest solution is to download the entire file to temporary local storage. This is what IOx currently does and it works well.

The next obvious improvement would then be to use the MemoryManager and DiskManager functionality added by @yjshen in #1526 to buffer in memory initially and only spill to disk under memory pressure.

I suspect for many use-cases this will perform very well, the key observations being:

  • Data stored in non-archival object store tiers is billed on request count, and not the amount of data transferred
  • Data transfer from object storage within the same region is extremely fast (10+ Gbps)

A final extension might be to add functionality to fetch smaller byte ranges based on projection and predicate pushdown, I started experimenting with an API of what this might look like here, but I don't have a good handle on how to balance the trade-offs of making too many requests vs requesting data we don't need, and I'm personally inclined to punt on this at least initially...

I'm not very familiar with how spark, etc... solve this problem, this is just based on my intuition, and so perhaps @sunchao or someone with more familiarity with that ecosystem might be able to provide some insight here.

Describe alternatives you've considered

One option we are likely to implement for IOx is having a shared, instance-local, read-through, disk-based Object Storage cache. The idea being to use the ephemeral NVMe disk that is available on cloud provider VMs as a shared cache for one or more query engines running on that instance. This effectively works around this problem by making all IO done by the query engine to very fast local disk, with a separate process handling interaction with object storage as required. It will also accelerate repeated queries to the same "hot" dataset. I would be very happy to write up some tickets if someone wanted to take this on.

This blog post written by @jorgecarleitao proposes streaming files block-wise (thank you @xudong963 for the link). This is close to what the implementation currently does, however, it comes with the drawbacks listed above. FWIW I have also not found this approach to perform especially well on local files either, see here, but I could have been doing something wrong.

Additional context

FYI @alamb @houqp @matthewmturner

@tustvold tustvold added the enhancement New feature or request label Apr 12, 2022
@Cheappie
Copy link
Contributor

  • Data transfer from object storage within the same region is extremely fast (10+ Gbps)

From what I know such high transfer speed you can only achieve between EC2 instances in cluster placement group. For example If I remember correctly r4.2xlarge achieves ~160 MB/s transfer between ec2 and s3.

@tustvold
Copy link
Contributor Author

I thought placement groups were a mechanism to improve EC2-EC2 traffic, and not EC2-S3? I'll do some digging and report back. I had always assumed EC2-S3 was so much faster than EC2-EC2 because they had dedicated networking for it, but perhaps I was mistaken...

@tustvold
Copy link
Contributor Author

This AWS blog post from 2018 would suggest up to 25Gpbs EC2-S3 is possible and also highlights placement groups as a way to accelerate EC2-EC2. This support question would suggest the EC2-S3 limit has since been raised to 100Gbps.

I also found this benchmark from 2019, which shows speeds in the 1000s of MB/s, including 1,135 MB/s for the r4.2xlarge. I have not been able to find anyone complaining about the network speeds being below what is advertised.

FWIW if using VPC networking, you need to make sure you have configured a VPC Gateway and are using a region-specific endpoint for S3. Otherwise your traffic will transit an Internet Gateway or NAT gateway which will make things a lot slower (and cost a LOT of money).

@Cheappie
Copy link
Contributor

This AWS blog post from 2018 would suggest up to 25Gpbs EC2-S3 is possible and also highlights placement groups as a way to accelerate EC2-EC2. This support question would suggest the EC2-S3 limit has since been raised to 100Gbps.

I also found this benchmark from 2019, which shows speeds in the 1000s of MB/s, including 1,135 MB/s for the r4.2xlarge. I have not been able to find anyone complaining about the network speeds being below what is advertised.

FWIW if using VPC networking, you need to make sure you have configured a VPC Gateway and are using a region-specific endpoint for S3. Otherwise your traffic will transit an Internet Gateway or NAT gateway which will make things a lot slower (and cost a LOT of money).

You might be right, I don't recall testing against region specific endpoint. That's really interesting, I will have to check that.

@alamb
Copy link
Contributor

alamb commented Apr 13, 2022

I see two major, and somewhat orthogonal usecases:

Usecase: Multiple reads of unpredictable column / row group subsets of the same file (e.g. IOx)
Optimal: Read data to local file

Goal: Single read of a subset of column/row groups (e.g. Cloud Fuse, other "analytics on S3 parquet files")
Optimal: Read subset of the data that is needed into memory, discard after decode

I have been hoping our ObjectStore interface would allow for both usecases.

In terms of the "many small requests to S3" problem, I was imagining that the S3 ObjectStore implementation would implement "pre-fetching" internally (the same way local filesystems do) to coalesce multiple small requests into fewer larger ones. This strategy is particularly effective if we know what parts of the file are likely to be needed.

Conveniently, the parquet format is quite amenable to this (as once the reader has figured out it wants to scan a row group, it also knows what file data (offsets) it needs).

@alamb
Copy link
Contributor

alamb commented Apr 13, 2022

Thus, if I were doing this I would probably make the following three things

  1. Add a "prefetch_hint" certain offsets to the ObjectStore API and make the parquet reader call it
  2. Implement a "buffered" ObjectStore interface that wrapped another ObjectStore that prefetches and buffers data memory buffer
  3. Implement a "cached" ObjectStore interface that also wraps another ObjectStore that simply downloads any request to a local disk cache

With those components I think most usecases could be addressed and if someone needed custom caching logic they would likely get a good head start using the "buffered" or "cached" interfaces

@tustvold
Copy link
Contributor Author

tustvold commented Apr 13, 2022

I think we all agree on where we would like to end up, however, I worry we are trying to run before we can walk here. I would much prefer an approach that does the simplest thing possible, namely downloads the entire file, and then iteratively add functionality, such as fetching to memory, selective fetching, etc... Currently we have an approach that isn't really very effective at either...

the same way local filesystems do

I'm not sure this is a fair comparison, object storage has vastly different performance and billing characteristics from a local filesystem?

Add a "prefetch_hint" certain offsets to the ObjectStore API and make the parquet reader call it

Why would you implement this in the ObjectStore API, and not some FileScan component generic over object stores. The caching, spilling, logic, etc... is not going to vary based on object store provider? An ObjectStore API that supports fetch requests with an optional byte range should have us covered?

@alamb
Copy link
Contributor

alamb commented Apr 13, 2022

Why would you implement this in the ObjectStore API, and not some FileScan component generic over object stores. The caching, spilling, logic, etc... is not going to vary based on object store provider? An ObjectStore API that supports fetch requests with an optional byte range should have us covered?

I was thinking that keeping things behind an ObjectStore API makes sense because:

  1. the economies and performance of S3, glacier, HDFS, local Minio could be quite different so the amount of consolidation, number of requests, aggressiveness of caching, might vary by object store implementation (not sure)
  2. Some caching strategies / implementations (e.g. redis, for example) might not be appropriate to include in the core datafusion

So in other words, binding details of caching / resource usage to DataFusion seemed to be unecessary

@tustvold
Copy link
Contributor Author

I think this makes sense, the situation I'm trying to avoid is:

  • If you have the file on local disk we don't want to have to buffer it in memory before we can read it
  • If you have the file in memory, we don't want to copy the byte ranges, and just want to slice a Bytes object

Currently the trait returns Box<dyn Read> which is likely sub-optimal for both.

If we can find some way to handle this, that sounds good to me 😀

@sunchao
Copy link
Member

sunchao commented Apr 14, 2022

FWIW within each Spark task, it currently process each row group in a sequential manner, and for each of these it'll read all the projected column chunks (with filtered pages after column index), buffer them in memory and then start decompressing + decoding. For interacting with S3/HDFS/etc, it relies on the Hadoop's FileSystem API. @steveloughran is the expert here on the S3 client implementation.

@steveloughran
Copy link

choosing when/how to scan and prefetch in object stores is a real tricky business

abfs and gcs connectors do forward prefetch in block sizes you can config in hadoop site/job settings, cache into memory. The more prefetching you do, the more likely a large process will run out of memory.

s3a doesn't and we've been getting complaints about lack of buffering in the client. it does have different seek policies, look at fs.s3a.experimental.fadvise and fs.s3a.readahead.range

You can set seek policy cluster-wise, or, if you use the openFile() api, when opening specific files.

we have two big bits of work on going there how to help mitigate things, both in feature branches right now

  • HADOOP-18103 vectored IO API. It will be available for all FSDataInputStream; object stores can improve with range coalescing and fetching of different ranges in parallel (s3a will be first for this).
  • HADOOP-18028. High performance S3A input stream with prefetching & caching to local disk. feature branch works, but for broader adoption we again need to deal with memory/buffer use and some other issues.
    Really good to have you involved in reviewing/testing the vectored IO API (yes, we want a native binding too), the prefetching work, and indeed if we can get good traces of how your library reads files.

Note also s3a and abfs connectors connect/report stats through the IOStatistics interface. Even if you build against Hadoop versions which don't have that,

  1. if you call toString() on the streams you get a good summary of what IO took place in that stream only. log this, at debug
  2. on hadoop 3.3.2, set "fs.iostatistics.logging.level"; to info and you get full fs stats dump when the fs instance is closed.

@steveloughran
Copy link

if the api you came up with mapped well to that vectored api which is not yet too late to freeze, then it'd be really good, even if you don't yet compile against releases with that api.

see https://github.com/apache/hadoop/blob/feature-vectored-io/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md#default-void-readvectoredlist-extends-filerange-ranges-intfunctionbytebuffer-allocate

readVectored(List<? extends FileRange> ranges, IntFunction<ByteBuffer> allocate)

where each file range returns a completable future to a byte buffer allocated with the allocator function you supplied.

@alamb
Copy link
Contributor

alamb commented Apr 15, 2022

Thank you @steveloughran

It seems like something any of those pre-fetching systems requires is something to tell what to prefetch (or what ranges are needed in scatter / gather or vectored IO.

Maybe that is a good place to start @tustvold

@mukund-thakur
Copy link

Do try out the vectored api from the feature brach. Any feedback or improvements is highly appreciated. Thanks.
Here is the uber jira https://issues.apache.org/jira/browse/HADOOP-18103

@tustvold
Copy link
Contributor Author

It seems like something any of those pre-fetching systems requires is something to tell what to prefetch (or what ranges are needed in scatter / gather or vectored IO.

Proposal in apache/arrow-rs#1605, thank you all for your very helpful feedback 👍

@tustvold
Copy link
Contributor Author

DataFusion is now using the async parquet interface, which automatically handles buffering, and so this can be closed

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

6 participants