Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automatically determine optimal batch size #2178

Merged
merged 5 commits into from
Sep 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 9 additions & 17 deletions src/cmd/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,9 @@ apply options:
-j, --jobs <arg> The number of jobs to run in parallel.
When not set, the number of jobs is set to the number of CPUs detected.
-b, --batch <size> The number of rows per batch to load into memory, before running in parallel.
Set to 0 to load all rows in one batch.
Automatically determined for CSV files with more than 50000 rows.
Set to 0 to load all rows in one batch. Set to 1 to force batch optimization
even for files with less than 50000 rows.
[default: 50000]

Common options:
Expand Down Expand Up @@ -566,32 +568,22 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
#[allow(unused_assignments)]
let mut batch_record = csv::StringRecord::new();

// set RAYON_NUM_THREADS
let num_jobs = util::njobs(args.flag_jobs);

// reuse batch buffers
let batchsize: usize = if args.flag_batch == 0 {
util::count_rows(&rconfig)? as usize
} else {
args.flag_batch
};
let batchsize = util::optimal_batch_size(&rconfig, args.flag_batch, num_jobs);
let mut batch = Vec::with_capacity(batchsize);
let mut batch_results = Vec::with_capacity(batchsize);

// set RAYON_NUM_THREADS
util::njobs(args.flag_jobs);

// main loop to read CSV and construct batches for parallel processing.
// each batch is processed via Rayon parallel iterator.
// loop exits when batch is empty.
'batch_loop: loop {
for _ in 0..batchsize {
match rdr.read_record(&mut batch_record) {
Ok(has_data) => {
if has_data {
batch.push(std::mem::take(&mut batch_record));
} else {
// nothing else to add to batch
break;
}
},
Ok(true) => batch.push(std::mem::take(&mut batch_record)),
Ok(false) => break, // nothing else to add to batch
Err(e) => {
return fail_clierror!("Error reading file: {e}");
},
Expand Down
26 changes: 9 additions & 17 deletions src/cmd/datefmt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ datefmt options:
-j, --jobs <arg> The number of jobs to run in parallel.
When not set, the number of jobs is set to the number of CPUs detected.
-b, --batch <size> The number of rows per batch to load into memory, before running in parallel.
Set to 0 to load all rows in one batch.
Automatically determined for CSV files with more than 50000 rows.
Set to 0 to load all rows in one batch. Set to 1 to force batch optimization
even for files with less than 50000 rows.
[default: 50000]

Common options:
Expand Down Expand Up @@ -252,12 +254,11 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
#[allow(unused_assignments)]
let mut batch_record = csv::StringRecord::new();

// set RAYON_NUM_THREADS
let num_jobs = util::njobs(args.flag_jobs);

// reuse batch buffers
let batchsize: usize = if args.flag_batch == 0 {
util::count_rows(&rconfig)? as usize
} else {
args.flag_batch
};
let batchsize = util::optimal_batch_size(&rconfig, args.flag_batch, num_jobs);
let mut batch = Vec::with_capacity(batchsize);
let mut batch_results = Vec::with_capacity(batchsize);

Expand Down Expand Up @@ -316,23 +317,14 @@ pub fn run(argv: &[&str]) -> CliResult<()> {

let is_output_utc = output_tz == chrono_tz::UTC;

// set RAYON_NUM_THREADS
util::njobs(args.flag_jobs);

// main loop to read CSV and construct batches for parallel processing.
// each batch is processed via Rayon parallel iterator.
// loop exits when batch is empty.
'batch_loop: loop {
for _ in 0..batchsize {
match rdr.read_record(&mut batch_record) {
Ok(has_data) => {
if has_data {
batch.push(std::mem::take(&mut batch_record));
} else {
// nothing else to add to batch
break;
}
},
Ok(true) => batch.push(std::mem::take(&mut batch_record)),
Ok(false) => break, // nothing else to add to batch
Err(e) => {
return fail_clierror!("Error reading file: {e}");
},
Expand Down
30 changes: 12 additions & 18 deletions src/cmd/tojsonl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,12 @@ Tojsonl options:
When not set, the number of jobs is set to the
number of CPUs detected.
-b, --batch <size> The number of rows per batch to load into memory,
before running in parallel. Set to 0 to load all
rows in one batch. [default: 50000]
before running in parallel. Automatically determined
for CSV files with more than 50000 rows.
Set to 0 to load all rows in one batch.
Set to 1 to force batch optimization even for files with
less than 50000 rows.
[default: 50000]

Common options:
-h, --help Display this message
Expand Down Expand Up @@ -255,32 +259,22 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
#[allow(unused_assignments)]
let mut batch_record = csv::StringRecord::new();

// set RAYON_NUM_THREADS
let num_jobs = util::njobs(args.flag_jobs);

// reuse batch buffers
let batchsize: usize = if args.flag_batch == 0 {
record_count as usize
} else {
args.flag_batch
};
let batchsize = util::optimal_batch_size(&conf, args.flag_batch, num_jobs);
let mut batch = Vec::with_capacity(batchsize);
let mut batch_results = Vec::with_capacity(batchsize);

// set RAYON_NUM_THREADS
util::njobs(args.flag_jobs);

// main loop to read CSV and construct batches for parallel processing.
// each batch is processed via Rayon parallel iterator.
// loop exits when batch is empty.
'batch_loop: loop {
for _ in 0..batchsize {
match rdr.read_record(&mut batch_record) {
Ok(has_data) => {
if has_data {
batch.push(batch_record.clone());
} else {
// nothing else to add to batch
break;
}
},
Ok(true) => batch.push(std::mem::take(&mut batch_record)),
Ok(false) => break, // nothing else to add to batch
Err(e) => {
return fail_clierror!("Error reading file: {e}");
},
Expand Down
20 changes: 10 additions & 10 deletions src/cmd/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,11 @@ Validate options:
When not set, the number of jobs is set to the
number of CPUs detected.
-b, --batch <size> The number of rows per batch to load into memory,
before running in parallel. Set to 0 to load all rows in one batch.
[default: 50000]
before running in parallel. Automatically determined
for CSV files with more than 50000 rows.
Set to 0 to load all rows in one batch.
Set to 1 to force batch optimization even for files with
less than 50000 rows. [default: 50000]
--timeout <seconds> Timeout for downloading json-schemas on URLs and for
'dynamicEnum' lookups on URLs. [default: 30]

Expand Down Expand Up @@ -647,21 +650,18 @@ pub fn run(argv: &[&str]) -> CliResult<()> {

// amortize memory allocation by reusing record
let mut record = csv::ByteRecord::with_capacity(500, header_len);

// set RAYON_NUM_THREADS
let num_jobs = util::njobs(args.flag_jobs);

// reuse batch buffer
let batch_size = if args.flag_batch == 0 {
util::count_rows(&rconfig)? as usize
} else {
args.flag_batch
};
let batch_size = util::optimal_batch_size(&rconfig, args.flag_batch, num_jobs);
let mut batch = Vec::with_capacity(batch_size);
let mut validation_results = Vec::with_capacity(batch_size);
let mut valid_flags: Vec<bool> = Vec::with_capacity(batch_size);
let mut validation_error_messages: Vec<String> = Vec::with_capacity(50);
let flag_trim = args.flag_trim;

// set RAYON_NUM_THREADS
util::njobs(args.flag_jobs);

// amortize buffer allocation
let mut buffer = itoa::Buffer::new();

Expand Down
27 changes: 27 additions & 0 deletions src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ macro_rules! regex_oncelock {
// leave at least 20% of the available memory free
const DEFAULT_FREEMEMORY_HEADROOM_PCT: u8 = 20;

const DEFAULT_BATCH_SIZE: usize = 50_000;

static ROW_COUNT: OnceLock<Option<u64>> = OnceLock::new();

pub type ByteString = Vec<u8>;
Expand Down Expand Up @@ -2207,6 +2209,31 @@ pub fn csv_to_jsonl(
Ok(writer.flush()?)
}

/// Get the optimal batch size for parallel processing.
///
/// The requested `batch_size` is interpreted as follows:
/// * `0` — return the CSV's row count, effectively disabling batching
///   (all rows are loaded in one batch).
/// * `1` — force batch-size optimization even when the file has fewer rows
///   than `DEFAULT_BATCH_SIZE`. Not normally recommended, as small files
///   gain little from parallel processing.
/// * `DEFAULT_BATCH_SIZE` (the docopt default) — when the CSV has more rows
///   than `DEFAULT_BATCH_SIZE`, compute a size that spreads the rows evenly
///   across `num_jobs`; otherwise, keep `DEFAULT_BATCH_SIZE`.
/// * any other explicit value — return it unchanged, as documented.
#[inline]
pub fn optimal_batch_size(rconfig: &Config, batch_size: usize, num_jobs: usize) -> usize {
    // Honor an explicit, non-sentinel batch size as-is.
    // BUGFIX: the previous guard (`batch_size < DEFAULT_BATCH_SIZE`) also
    // captured the 0 and 1 sentinels, making the dedicated handling below
    // unreachable, and silently overrode any explicit value below the default
    // instead of returning "the requested batch_size" as documented.
    if batch_size > 1 && batch_size != DEFAULT_BATCH_SIZE {
        return batch_size;
    }

    // If the row count cannot be determined, fall back to the default so the
    // caller still gets a sane buffer capacity.
    let num_rows = count_rows(rconfig).unwrap_or(DEFAULT_BATCH_SIZE as u64) as usize;

    if batch_size == 0 {
        // Disable batching: load everything in one batch.
        num_rows
    } else if batch_size == 1 || num_rows > DEFAULT_BATCH_SIZE {
        // Spread the rows evenly across the worker threads; `+ 1` rounds up
        // so the final batch isn't a one-row remainder. `max(1)` guards
        // against division by zero (util::njobs is expected to always return
        // at least 1 — NOTE(review): confirm against its implementation).
        (num_rows / num_jobs.max(1)) + 1
    } else {
        // batch_size == DEFAULT_BATCH_SIZE and the file is small enough that
        // optimization isn't worthwhile: keep the default.
        batch_size
    }
}

// comment out for now as this is still WIP
// pub fn create_json_record(
// no_headers: bool,
Expand Down
Loading