
ARROW-17299: [C++][Python] Expose the Scanner kDefaultBatchReadahead and kDefaultFragmentReadahead parameters #13799

Merged: 16 commits into apache:master, Sep 1, 2022

Conversation

@marsupialtail (Contributor):

This exposes the Fragment Readahead and Batch Readahead flags in the C++ Scanner to Python users.

These flags can be used to fine-tune RAM usage and IO utilization when downloading large files from S3 or other network sources. I believe the default settings are overly conservative for small-RAM configurations, and I observe less than 20% IO utilization on some AWS instances.

The Python API exposes these flags only on the methods where they make sense. Scanning from a RecordBatchIterator needs neither flag, and only the latter flag (batch readahead) makes sense when building a scanner from a fragment.
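For reference, a minimal sketch of how these knobs surface in Python, based on the description above (method and parameter names may differ slightly between pyarrow versions; the S3 path is a placeholder):

```
import pyarrow.dataset as ds

# Placeholder dataset URI; any multi-file dataset works the same way.
dataset = ds.dataset("s3://TPC", format="parquet")

# Dataset-level scanning: both knobs apply.
scanner = ds.Scanner.from_dataset(
    dataset,
    batch_readahead=16,     # batches to read ahead within each fragment
    fragment_readahead=8,   # fragments (files) to read ahead concurrently
)

# Scanning a single fragment: only batch readahead is meaningful, since
# there is no list of fragments to read ahead over.
fragment = next(iter(dataset.get_fragments()))
frag_scanner = ds.Scanner.from_fragment(fragment, batch_readahead=16)
```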

To test this, set up an i3.2xlarge instance on AWS:

```
import pyarrow
import pyarrow.dataset as ds
import pyarrow.csv as csv
import time

pyarrow.set_cpu_count(8)
pyarrow.set_io_thread_count(16)

lineitem_scheme = ["l_orderkey", "l_partkey", "l_suppkey", "l_linenumber", "l_quantity",
                   "l_extendedprice", "l_discount", "l_tax", "l_returnflag", "l_linestatus",
                   "l_shipdate", "l_commitdate", "l_receiptdate", "l_shipinstruct",
                   "l_shipmode", "l_comment", "null"]
csv_format = ds.CsvFileFormat(
    read_options=csv.ReadOptions(column_names=lineitem_scheme,
                                 block_size=32 * 1024 * 1024),
    parse_options=csv.ParseOptions(delimiter="|"))
dataset = ds.dataset("s3://TPC", format=csv_format)

s = dataset.to_batches(batch_size=1000000000)
count = 0
while count < 100:
    z = next(s)
    count += 1
```

For our purposes, let the TPC dataset consist of hundreds of Parquet files, each with one row group (the kind of layout Spark would generate). This script gets somewhere around 1 Gbps. If you now do

```
s = dataset.to_batches(batch_size=1000000000, fragment_readahead=16)
```

you can get up to 2.5 Gbps, which is the advertised steady rate cap for this instance type.
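A rough way to put numbers on that comparison, reusing the `dataset` object from the script above (a quick sketch, not a rigorous benchmark):

```
import time

def measure_gbps(n_batches=100, **scan_kwargs):
    # Pull n_batches record batches and report effective throughput in Gbps.
    batches = dataset.to_batches(batch_size=1000000000, **scan_kwargs)
    start, nbytes = time.time(), 0
    for _, batch in zip(range(n_batches), batches):
        nbytes += batch.nbytes
    return nbytes * 8 / (time.time() - start) / 1e9

print("default:               ", measure_gbps())
print("fragment_readahead=16: ", measure_gbps(fragment_readahead=16))
```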

github-actions bot commented Aug 4, 2022:

Thanks for opening a pull request!

If this is not a minor PR, could you open an issue for this pull request on JIRA? https://issues.apache.org/jira/browse/ARROW

Opening JIRAs ahead of time contributes to the Openness of the Apache Arrow project.

Could you then also rename the pull request title in the following format?

ARROW-${JIRA_ID}: [${COMPONENT}] ${SUMMARY}

or

MINOR: [${COMPONENT}] ${SUMMARY}


@westonpace (Member) left a comment:

Thanks for adding this. I took a quick pass at review.

python/pyarrow/_dataset.pyx:

```
@@ -2435,7 +2462,9 @@ cdef class Scanner(_Weakrefable):
        builder = make_shared[CScannerBuilder](pyarrow_unwrap_schema(schema),
                                               fragment.unwrap(), options)
        _populate_builder(builder, columns=columns, filter=filter,
                          batch_size=batch_size, use_threads=use_threads,
                          batch_size=batch_size, batch_readahead=batch_readahead,
                          fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD,
```
Member (review comment):

Suggested change (drop this line):

```
fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD,
```

I don't think we need to specify this kwarg if we're just going to specify the default.

@marsupialtail (Contributor, Author) replied:

This is a Cython quirk: you have to specify all the arguments.

@marsupialtail changed the title from "Arrow 17299: [C++][Python] Expose the Scanner kDefaultBatchReadahead and kDefaultFragmentReadahead parameters" to "Arrow-17299: [C++][Python] Expose the Scanner kDefaultBatchReadahead and kDefaultFragmentReadahead parameters" on Aug 4, 2022.
@pitrou (Member) commented Aug 9, 2022:

@westonpace @bkietz Why exactly does ScannerBuilder allow setting the same things that can be set in ScanOptions?

@bkietz changed the title from "Arrow-17299: [C++][Python] Expose the Scanner kDefaultBatchReadahead and kDefaultFragmentReadahead parameters" to "ARROW-17299: [C++][Python] Expose the Scanner kDefaultBatchReadahead and kDefaultFragmentReadahead parameters" on Aug 9, 2022.
@bkietz (Member) commented Aug 9, 2022:

@pitrou @westonpace IIUC, ScannerBuilder is at this point mostly a wrapper around ScanOptions. Once upon a time it was needed to mediate the difference between single-threaded and async scanners, and to guard construction of a dataset wrapping a record batch reader, but this becomes less and less necessary as more datasets functionality is subsumed by the compute engine. (For example, I'd say there's no longer a motivation to support constructing datasets from record batch readers, since the compute engine can use them as sources directly.) In short, I think what you're observing is ScannerBuilder on a gentle walk toward deprecation.
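To make that observation concrete, here is a Python analogy (not the actual Arrow C++ classes; the defaults shown are placeholders): a builder whose methods only copy values into an options object adds little over filling in the options directly.

```
from dataclasses import dataclass

@dataclass
class ScanOptionsLike:
    batch_readahead: int = 16      # placeholder defaults, not Arrow's constants
    fragment_readahead: int = 4

class ScannerBuilderLike:
    def __init__(self):
        self._options = ScanOptionsLike()

    def batch_readahead(self, n):
        self._options.batch_readahead = n
        return self

    def fragment_readahead(self, n):
        self._options.fragment_readahead = n
        return self

    def finish(self):
        # Stands in for constructing the Scanner from the accumulated options.
        return self._options

# The two spellings produce the same options:
via_builder = ScannerBuilderLike().batch_readahead(32).finish()
direct = ScanOptionsLike(batch_readahead=32)
assert via_builder == direct
```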

@westonpace (Member):

Yes, ScannerBuilder is on its way out, I hope, as part of #13782 (well, probably a follow-up). At the moment it still serves a slight purpose: the projection option is a little hard to specify, and it is something of a thorn when it comes to augmented fields.

I also agree with your other point. We spent considerable effort at one point making various things look like a dataset, because datasets were the primary interface to the compute engine (e.g. filtering & projection). The record batch reader is a good example. I'd even go so far as to say the InMemoryDataset is probably superfluous and a better option in the future would be a "table_source" node. The scanner should be reserved for the case where you have multiple sources of data with the same (or devolved versions of the same) schema.

All that being said, I don't think readahead is going away. However, in the near future (again, #13782) I was pondering whether we should reframe readahead as "roughly how many bytes of data should the scanner attempt to read ahead" instead of "batch readahead and fragment readahead".
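For illustration only, one way such a byte-oriented knob could relate to the current two (my own sketch of the idea, not an Arrow API):

```
def approx_readahead_bytes(batch_readahead, fragment_readahead, avg_batch_bytes):
    # Upper bound on buffered data if every fragment being read ahead also
    # buffers its full batch-readahead window.
    return batch_readahead * fragment_readahead * avg_batch_bytes

# e.g. 16 batches x 8 fragments x 32 MiB per batch ~= 4 GiB of readahead
print(approx_readahead_bytes(16, 8, 32 * 1024 * 1024) / 2**30, "GiB")
```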

@marsupialtail (Contributor, Author) commented Aug 12, 2022:

I believe this is ready to be merged. @pitrou @westonpace

@westonpace self-requested a review on August 15, 2022.
@marsupialtail (Contributor, Author):

There is a potential problem with this: you can't increase the fragment readahead by too much, or else the first batch will be significantly delayed. I'm not sure how much of a problem this is, though.
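One way to sanity-check that trade-off, again reusing the `dataset` from the PR description (a sketch; the values are arbitrary):

```
import time

for fr in (4, 16, 64):
    batches = dataset.to_batches(batch_size=1000000000, fragment_readahead=fr)
    start = time.time()
    next(batches)  # how long until the first batch is available?
    print(f"fragment_readahead={fr}: first batch after {time.time() - start:.2f}s")
```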

@westonpace (Member) left a comment:

A few grammatical suggestions, but otherwise I think this is a good addition. This may change to bytes_readahead / fragment_readahead before the release, but it will be nice to have this in place already.

@marsupialtail (Contributor, Author):

Don't think the failed checks have anything to do with me.

@pitrou (Member) commented Aug 24, 2022:

> Don't think the failed checks have anything to do with me.

Indeed, they don't.

@pitrou (Member) commented Aug 24, 2022:

@marsupialtail Would you like to address @westonpace's suggestions? Then I think we're good to go.

@marsupialtail (Contributor, Author):

OK, I committed all the changes. @pitrou @westonpace

@pitrou (Member) left a comment:

Thanks for the update, just two suggestions below.

@marsupialtail (Contributor, Author):

Done.

@pitrou (Member) left a comment:

LGTM. Thank you @marsupialtail!

@pitrou merged commit ec7e250 into apache:master on Sep 1, 2022.
ursabot commented Sep 1, 2022:

Benchmark runs are scheduled for baseline = 46f38dc and contender = ec7e250. ec7e250 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Finished ⬇️0.0% ⬆️0.0%] ec2-t3-xlarge-us-east-2
[Failed] test-mac-arm
[Failed ⬇️0.27% ⬆️0.0%] ursa-i9-9960x
[Finished ⬇️0.75% ⬆️0.11%] ursa-thinkcentre-m75q
Buildkite builds:
[Finished] ec7e250c ec2-t3-xlarge-us-east-2
[Failed] ec7e250c test-mac-arm
[Failed] ec7e250c ursa-i9-9960x
[Finished] ec7e250c ursa-thinkcentre-m75q
[Finished] 46f38dca ec2-t3-xlarge-us-east-2
[Failed] 46f38dca test-mac-arm
[Failed] 46f38dca ursa-i9-9960x
[Finished] 46f38dca ursa-thinkcentre-m75q
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

zagto pushed a commit to zagto/arrow that referenced this pull request Oct 7, 2022
fatemehp pushed a commit to fatemehp/arrow that referenced this pull request Oct 17, 2022