
ARROW-17299: [C++][Python] Expose the Scanner kDefaultBatchReadahead and kDefaultFragmentReadahead parameters #13799

Merged Sep 1, 2022 (16 commits). Changes from 3 commits.
27 changes: 18 additions & 9 deletions cpp/src/arrow/dataset/scanner.cc
@@ -826,15 +826,6 @@ Status ScannerBuilder::UseThreads(bool use_threads) {
return Status::OK();
}

Status ScannerBuilder::FragmentReadahead(int fragment_readahead) {
if (fragment_readahead <= 0) {
return Status::Invalid("FragmentReadahead must be greater than 0, got ",
fragment_readahead);
}
scan_options_->fragment_readahead = fragment_readahead;
return Status::OK();
}

Status ScannerBuilder::BatchSize(int64_t batch_size) {
if (batch_size <= 0) {
return Status::Invalid("BatchSize must be greater than 0, got ", batch_size);
@@ -843,6 +834,24 @@ Status ScannerBuilder::BatchSize(int64_t batch_size) {
return Status::OK();
}

Status ScannerBuilder::BatchReadahead(int32_t batch_readahead) {
if (batch_readahead < 0) {
return Status::Invalid("BatchReadahead must be greater than or equal to 0, got ",
batch_readahead);
scan_options_->batch_readahead = batch_readahead;
return Status::OK();
}

Status ScannerBuilder::FragmentReadahead(int32_t fragment_readahead) {
if (fragment_readahead < 0) {
return Status::Invalid("FragmentReadahead must be greater than or equal to 0, got ",
fragment_readahead);
}
scan_options_->fragment_readahead = fragment_readahead;
return Status::OK();
}

Status ScannerBuilder::Pool(MemoryPool* pool) {
scan_options_->pool = pool;
return Status::OK();
20 changes: 17 additions & 3 deletions cpp/src/arrow/dataset/scanner.h
@@ -373,9 +373,6 @@ class ARROW_DS_EXPORT ScannerBuilder {
/// ThreadPool found in ScanOptions;
Status UseThreads(bool use_threads = true);

/// \brief Limit how many fragments the scanner will read at once
Status FragmentReadahead(int fragment_readahead);

/// \brief Set the maximum number of rows per RecordBatch.
///
/// \param[in] batch_size the maximum number of rows.
@@ -384,6 +381,23 @@
/// This option provides a control limiting the memory owned by any RecordBatch.
Status BatchSize(int64_t batch_size);

/// \brief Set the number of batches to read ahead within a file.
///
/// \param[in] batch_readahead How many batches to read ahead within a file;
/// this may not be supported by all file formats.
/// \returns An error if this number is less than 0.
///
/// This option provides control over the RAM vs I/O tradeoff.
Status BatchReadahead(int32_t batch_readahead);

/// \brief Set the number of fragments to read ahead.
///
/// \param[in] fragment_readahead How many fragments to read ahead.
/// \returns An error if this number is less than 0.
///
/// This option provides control over the RAM vs I/O tradeoff.
Status FragmentReadahead(int32_t fragment_readahead);

/// \brief Set the pool from which materialized and scanned arrays will be allocated.
Status Pool(MemoryPool* pool);

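The doc comments above describe the new builder options as a RAM vs I/O tradeoff guarded by a non-negativity check. A minimal sketch (not part of this diff) of how that check is expected to surface once the Python bindings later in this PR are in place; the dataset path and the exact exception type are assumptions:

import pyarrow.dataset as ds

dataset = ds.dataset("data/", format="parquet")  # illustrative dataset

# Negative readahead values are rejected by the C++ builder (Status::Invalid),
# which is assumed to surface in Python as an ArrowInvalid error.
try:
    ds.Scanner.from_dataset(dataset, batch_readahead=-1)
except Exception as exc:
    print(exc)  # e.g. "BatchReadahead must be greater than or equal to 0, got -1"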
37 changes: 33 additions & 4 deletions python/pyarrow/_dataset.pyx
@@ -2168,11 +2168,14 @@ cdef class TaggedRecordBatchIterator(_Weakrefable):


_DEFAULT_BATCH_SIZE = 2**17

_DEFAULT_BATCH_READAHEAD = 16
_DEFAULT_FRAGMENT_READAHEAD = 4

cdef void _populate_builder(const shared_ptr[CScannerBuilder]& ptr,
object columns=None, Expression filter=None,
int batch_size=_DEFAULT_BATCH_SIZE,
int batch_readahead=_DEFAULT_BATCH_READAHEAD,
int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD,
bint use_threads=True, MemoryPool memory_pool=None,
FragmentScanOptions fragment_scan_options=None)\
except *:
@@ -2207,6 +2210,8 @@ cdef void _populate_builder(const shared_ptr[CScannerBuilder]& ptr,
)

check_status(builder.BatchSize(batch_size))
check_status(builder.BatchReadahead(batch_readahead))
check_status(builder.FragmentReadahead(fragment_readahead))
check_status(builder.UseThreads(use_threads))
if memory_pool:
check_status(builder.Pool(maybe_unbox_memory_pool(memory_pool)))
@@ -2254,6 +2259,12 @@ cdef class Scanner(_Weakrefable):
The maximum row count for scanned record batches. If scanned
record batches are overflowing memory then this method can be
called to reduce their size.
batch_readahead : int, default 16
The number of batches to read ahead in a file. Increasing this number
will increase RAM usage but also improve IO utilization.
fragment_readahead : int, default 4
The number of files to read ahead. Increasing this number will increase
RAM usage but also improve IO utilization.
use_threads : bool, default True
If enabled, then maximum parallelism will be used determined by
the number of available CPU cores.
@@ -2291,6 +2302,8 @@
MemoryPool memory_pool=None,
object columns=None, Expression filter=None,
int batch_size=_DEFAULT_BATCH_SIZE,
int batch_readahead=_DEFAULT_BATCH_READAHEAD,
int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD,
FragmentScanOptions fragment_scan_options=None):
"""
Create Scanner from Dataset,
@@ -2328,6 +2341,13 @@
The maximum row count for scanned record batches. If scanned
record batches are overflowing memory then this method can be
called to reduce their size.
batch_readahead : int, default 16
The number of batches to read ahead in a file. This may not be
supported by all file formats (e.g. CSV). Increasing this number
will increase RAM usage but also improve IO utilization.
fragment_readahead : int, default 4
The number of files to read ahead. Increasing this number will increase
RAM usage but also improve IO utilization.
use_threads : bool, default True
If enabled, then maximum parallelism will be used determined by
the number of available CPU cores.
@@ -2354,7 +2374,8 @@

builder = make_shared[CScannerBuilder](dataset.unwrap(), options)
_populate_builder(builder, columns=columns, filter=filter,
batch_size=batch_size, use_threads=use_threads,
batch_size=batch_size, batch_readahead=batch_readahead,
fragment_readahead=fragment_readahead, use_threads=use_threads,
memory_pool=memory_pool,
fragment_scan_options=fragment_scan_options)

@@ -2367,6 +2388,7 @@
MemoryPool memory_pool=None,
object columns=None, Expression filter=None,
int batch_size=_DEFAULT_BATCH_SIZE,
int batch_readahead=_DEFAULT_BATCH_READAHEAD,
FragmentScanOptions fragment_scan_options=None):
"""
Create Scanner from Fragment,
@@ -2406,6 +2428,10 @@
The maximum row count for scanned record batches. If scanned
record batches are overflowing memory then this method can be
called to reduce their size.
batch_readahead : int, default 16
The number of batches to read ahead in a file. This may not be
supported by all file formats (e.g. CSV). Increasing this number
will increase RAM usage but also improve IO utilization.
use_threads : bool, default True
If enabled, then maximum parallelism will be used determined by
the number of available CPU cores.
@@ -2435,7 +2461,9 @@
builder = make_shared[CScannerBuilder](pyarrow_unwrap_schema(schema),
fragment.unwrap(), options)
_populate_builder(builder, columns=columns, filter=filter,
batch_size=batch_size, use_threads=use_threads,
batch_size=batch_size, batch_readahead=batch_readahead,
fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD,
Review comment (Member):

Suggested change (remove this line):
fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD,

"I don't think we need to specify this kwarg if we're just going to specify the default."

Reply (Contributor, PR author):

"This is a Cython quirk. You have to specify all the arguments."

use_threads=use_threads,
memory_pool=memory_pool,
fragment_scan_options=fragment_scan_options)

@@ -2508,7 +2536,8 @@
FutureWarning)

_populate_builder(builder, columns=columns, filter=filter,
batch_size=batch_size, use_threads=use_threads,
batch_size=batch_size, batch_readahead=_DEFAULT_BATCH_READAHEAD,
fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, use_threads=use_threads,
memory_pool=memory_pool,
fragment_scan_options=fragment_scan_options)
scanner = GetResultValue(builder.get().Finish())
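For reference, a short usage sketch of the keyword arguments added above; the dataset path and the readahead values are illustrative only, not recommendations:

import pyarrow.dataset as ds

dataset = ds.dataset("data/", format="parquet")  # illustrative dataset

# Larger readahead values trade extra RAM for better I/O utilization.
scanner = ds.Scanner.from_dataset(
    dataset,
    batch_readahead=32,    # batches to read ahead within each file
    fragment_readahead=8,  # files (fragments) to read ahead
)
table = scanner.to_table()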
2 changes: 2 additions & 0 deletions python/pyarrow/includes/libarrow_dataset.pxd
@@ -122,6 +122,8 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
CStatus UseThreads(c_bool use_threads)
CStatus Pool(CMemoryPool* pool)
CStatus BatchSize(int64_t batch_size)
CStatus BatchReadahead(int32_t batch_readahead)
CStatus FragmentReadahead(int32_t fragment_readahead)
CStatus FragmentScanOptions(
shared_ptr[CFragmentScanOptions] fragment_scan_options)
CResult[shared_ptr[CScanner]] Finish()