Skip to content

Commit

Permalink
Merge pull request #6957 from karlmcdowall/sort_merge_chunking_rework
Browse files Browse the repository at this point in the history
sort: Rework merge batching logic
  • Loading branch information
sylvestre authored Dec 21, 2024
2 parents bb2fb66 + 6bdcad3 commit 913d5d4
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 42 deletions.
4 changes: 2 additions & 2 deletions src/uu/sort/src/ext_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,12 @@ fn reader_writer<
)?;
match read_result {
ReadResult::WroteChunksToFile { tmp_files } => {
let merger = merge::merge_with_file_limit::<_, _, Tmp>(
merge::merge_with_file_limit::<_, _, Tmp>(
tmp_files.into_iter().map(|c| c.reopen()),
settings,
output,
tmp_dir,
)?;
merger.write_all(settings, output)?;
}
ReadResult::SortedSingleChunk(chunk) => {
if settings.unique {
Expand Down
77 changes: 40 additions & 37 deletions src/uu/sort/src/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use std::{
};

use compare::Compare;
use itertools::Itertools;
use uucore::error::UResult;

use crate::{
Expand Down Expand Up @@ -67,58 +66,63 @@ fn replace_output_file_in_input_files(
///
/// If `settings.merge_batch_size` is greater than the length of `files`, intermediate files will be used.
/// If `settings.compress_prog` is `Some`, intermediate files will be compressed with it.
pub fn merge<'a>(
pub fn merge(
files: &mut [OsString],
settings: &'a GlobalSettings,
output: Option<&str>,
settings: &GlobalSettings,
output: Output,
tmp_dir: &mut TmpDirWrapper,
) -> UResult<FileMerger<'a>> {
replace_output_file_in_input_files(files, output, tmp_dir)?;
) -> UResult<()> {
replace_output_file_in_input_files(files, output.as_output_name(), tmp_dir)?;
let files = files
.iter()
.map(|file| open(file).map(|file| PlainMergeInput { inner: file }));
if settings.compress_prog.is_none() {
merge_with_file_limit::<_, _, WriteablePlainTmpFile>(
files
.iter()
.map(|file| open(file).map(|file| PlainMergeInput { inner: file })),
settings,
tmp_dir,
)
merge_with_file_limit::<_, _, WriteablePlainTmpFile>(files, settings, output, tmp_dir)
} else {
merge_with_file_limit::<_, _, WriteableCompressedTmpFile>(
files
.iter()
.map(|file| open(file).map(|file| PlainMergeInput { inner: file })),
settings,
tmp_dir,
)
merge_with_file_limit::<_, _, WriteableCompressedTmpFile>(files, settings, output, tmp_dir)
}
}

// Merge already sorted `MergeInput`s.
pub fn merge_with_file_limit<
'a,
M: MergeInput + 'static,
F: ExactSizeIterator<Item = UResult<M>>,
Tmp: WriteableTmpFile + 'static,
>(
files: F,
settings: &'a GlobalSettings,
settings: &GlobalSettings,
output: Output,
tmp_dir: &mut TmpDirWrapper,
) -> UResult<FileMerger<'a>> {
if files.len() > settings.merge_batch_size {
let mut remaining_files = files.len();
let batches = files.chunks(settings.merge_batch_size);
let mut batches = batches.into_iter();
) -> UResult<()> {
if files.len() <= settings.merge_batch_size {
let merger = merge_without_limit(files, settings);
merger?.write_all(settings, output)
} else {
let mut temporary_files = vec![];
while remaining_files != 0 {
// Work around the fact that `Chunks` is not an `ExactSizeIterator`.
remaining_files = remaining_files.saturating_sub(settings.merge_batch_size);
let merger = merge_without_limit(batches.next().unwrap(), settings)?;
let mut batch = vec![];
for file in files {
batch.push(file);
if batch.len() >= settings.merge_batch_size {
assert_eq!(batch.len(), settings.merge_batch_size);
let merger = merge_without_limit(batch.into_iter(), settings)?;
batch = vec![];

let mut tmp_file =
Tmp::create(tmp_dir.next_file()?, settings.compress_prog.as_deref())?;
merger.write_all_to(settings, tmp_file.as_write())?;
temporary_files.push(tmp_file.finished_writing()?);
}
}
// Merge any remaining files that didn't get merged in a full batch above.
if !batch.is_empty() {
assert!(batch.len() < settings.merge_batch_size);
let merger = merge_without_limit(batch.into_iter(), settings)?;

let mut tmp_file =
Tmp::create(tmp_dir.next_file()?, settings.compress_prog.as_deref())?;
merger.write_all_to(settings, tmp_file.as_write())?;
temporary_files.push(tmp_file.finished_writing()?);
}
assert!(batches.next().is_none());
merge_with_file_limit::<_, _, Tmp>(
temporary_files
.into_iter()
Expand All @@ -127,10 +131,9 @@ pub fn merge_with_file_limit<
dyn FnMut(Tmp::Closed) -> UResult<<Tmp::Closed as ClosedTmpFile>::Reopened>,
>),
settings,
output,
tmp_dir,
)
} else {
merge_without_limit(files, settings)
}
}

Expand Down Expand Up @@ -260,7 +263,7 @@ struct PreviousLine {
}

/// Merges files together. This is **not** an iterator because of lifetime problems.
pub struct FileMerger<'a> {
struct FileMerger<'a> {
heap: binary_heap_plus::BinaryHeap<MergeableFile, FileComparator<'a>>,
request_sender: Sender<(usize, RecycledChunk)>,
prev: Option<PreviousLine>,
Expand All @@ -269,12 +272,12 @@ pub struct FileMerger<'a> {

impl FileMerger<'_> {
/// Write the merged contents to the output file.
pub fn write_all(self, settings: &GlobalSettings, output: Output) -> UResult<()> {
fn write_all(self, settings: &GlobalSettings, output: Output) -> UResult<()> {
let mut out = output.into_write();
self.write_all_to(settings, &mut out)
}

pub fn write_all_to(mut self, settings: &GlobalSettings, out: &mut impl Write) -> UResult<()> {
fn write_all_to(mut self, settings: &GlobalSettings, out: &mut impl Write) -> UResult<()> {
while self.write_next(settings, out) {}
drop(self.request_sender);
self.reader_join_handle.join().unwrap()
Expand Down
3 changes: 1 addition & 2 deletions src/uu/sort/src/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1567,8 +1567,7 @@ fn exec(
tmp_dir: &mut TmpDirWrapper,
) -> UResult<()> {
if settings.merge {
let file_merger = merge::merge(files, settings, output.as_output_name(), tmp_dir)?;
file_merger.write_all(settings, output)
merge::merge(files, settings, output, tmp_dir)
} else if settings.check {
if files.len() > 1 {
Err(UUsageError::new(2, "only one file allowed with -c"))
Expand Down
27 changes: 26 additions & 1 deletion tests/by-util/test_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// For the full copyright and license information, please view the LICENSE
// file that was distributed with this source code.

// spell-checker:ignore (words) ints
// spell-checker:ignore (words) ints (linux) NOFILE
#![allow(clippy::cast_possible_wrap)]

use std::time::Duration;
Expand Down Expand Up @@ -1084,6 +1084,31 @@ fn test_merge_batch_size() {
.stdout_only_fixture("merge_ints_interleaved.expected");
}

#[test]
#[cfg(any(target_os = "linux", target_os = "android"))]
fn test_merge_batch_size_with_limit() {
use rlimit::Resource;
// Currently need...
// 3 descriptors for stdin, stdout, stderr
// 2 descriptors for CTRL+C handling logic (to be reworked at some point)
// 2 descriptors for the input files (i.e. batch-size of 2).
let limit_fd = 3 + 2 + 2;
TestScenario::new(util_name!())
.ucmd()
.limit(Resource::NOFILE, limit_fd, limit_fd)
.arg("--batch-size=2")
.arg("-m")
.arg("--unique")
.arg("merge_ints_interleaved_1.txt")
.arg("merge_ints_interleaved_2.txt")
.arg("merge_ints_interleaved_3.txt")
.arg("merge_ints_interleaved_3.txt")
.arg("merge_ints_interleaved_2.txt")
.arg("merge_ints_interleaved_1.txt")
.succeeds()
.stdout_only_fixture("merge_ints_interleaved.expected");
}

#[test]
fn test_sigpipe_panic() {
let mut cmd = new_ucmd!();
Expand Down

0 comments on commit 913d5d4

Please sign in to comment.