From 9930e5019965e533503fedcfe71902747fbf4d85 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 10 Dec 2021 13:49:04 +0000 Subject: [PATCH 1/3] Simplify record reader --- parquet/src/arrow/record_reader.rs | 73 ++++++++++++++---------------- 1 file changed, 34 insertions(+), 39 deletions(-) diff --git a/parquet/src/arrow/record_reader.rs b/parquet/src/arrow/record_reader.rs index 4dd7da910fd0..82c7ee98a9f3 100644 --- a/parquet/src/arrow/record_reader.rs +++ b/parquet/src/arrow/record_reader.rs @@ -43,10 +43,8 @@ pub struct RecordReader { /// Number of values `num_records` contains. num_values: usize, - values_seen: usize, /// Starts from 1, number of values have been written to buffer values_written: usize, - in_middle_of_record: bool, } impl RecordReader { @@ -75,9 +73,7 @@ impl RecordReader { column_desc: column_schema, num_records: 0, num_values: 0, - values_seen: 0, values_written: 0, - in_middle_of_record: false, } } @@ -107,21 +103,25 @@ impl RecordReader { loop { // Try to find some records from buffers that has been read into memory // but not counted as seen records. - records_read += self.split_records(num_records - records_read)?; - - // Since page reader contains complete records, so if we reached end of a - // page reader, we should reach the end of a record - if end_of_column - && self.values_seen >= self.values_written - && self.in_middle_of_record - { - self.num_records += 1; - self.num_values = self.values_seen; - self.in_middle_of_record = false; - records_read += 1; + let (record_count, value_count) = + self.count_records(num_records - records_read); + + self.num_records += record_count; + self.num_values += value_count; + records_read += record_count; + + if records_read == num_records { + break; } - if (records_read >= num_records) || end_of_column { + if end_of_column { + // Since page reader contains complete records, if we reached end of a + // page reader, we should reach the end of a record + if self.rep_levels.is_some() { + self.num_records += 1; + self.num_values = self.values_written; + records_read += 1; + } break; } @@ -265,8 +265,6 @@ impl RecordReader { self.values_written -= self.num_values; self.num_records = 0; self.num_values = 0; - self.values_seen = 0; - self.in_middle_of_record = false; } /// Returns bitmap data. @@ -367,10 +365,11 @@ impl RecordReader { Ok(values_read) } - /// Split values into records according repetition definition and returns number of - /// records read. - #[allow(clippy::unnecessary_wraps)] - fn split_records(&mut self, records_to_read: usize) -> Result { + /// Inspects the buffered repetition levels in the range `self.num_values..self.values_written` + /// and returns the number of "complete" records along with the corresponding number of values + /// + /// A "complete" record is one where the buffer contains a subsequent repetition level of 0 + fn count_records(&self, records_to_read: usize) -> (usize, usize) { let rep_levels = self.rep_levels.as_ref().map(|buf| { let (prefix, rep_levels, suffix) = unsafe { buf.as_slice().align_to::() }; @@ -381,32 +380,28 @@ impl RecordReader { match rep_levels { Some(buf) => { let mut records_read = 0; + let mut end_of_last_record = self.num_values; - while (self.values_seen < self.values_written) - && (records_read < records_to_read) - { - if buf[self.values_seen] == 0 { - if self.in_middle_of_record { + for current in self.num_values..self.values_written { + if buf[current] == 0 { + if current != end_of_last_record { records_read += 1; - self.num_records += 1; - self.num_values = self.values_seen; + end_of_last_record = current; + + if records_read == records_to_read { + break; + } } - self.in_middle_of_record = true; } - self.values_seen += 1; } - Ok(records_read) + (records_read, end_of_last_record - self.num_values) } None => { let records_read = - min(records_to_read, self.values_written - self.values_seen); - self.num_records += records_read; - self.num_values += records_read; - self.values_seen += records_read; - self.in_middle_of_record = false; + min(records_to_read, self.values_written - self.num_values); - Ok(records_read) + (records_read, records_read) } } } From cd0f759a1e8478977e5d611b16d99e25f6971e2c Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 10 Dec 2021 21:19:56 +0000 Subject: [PATCH 2/3] Fix clippy lints --- parquet/src/arrow/record_reader.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/parquet/src/arrow/record_reader.rs b/parquet/src/arrow/record_reader.rs index 82c7ee98a9f3..e4529ef6de5f 100644 --- a/parquet/src/arrow/record_reader.rs +++ b/parquet/src/arrow/record_reader.rs @@ -383,14 +383,12 @@ impl RecordReader { let mut end_of_last_record = self.num_values; for current in self.num_values..self.values_written { - if buf[current] == 0 { - if current != end_of_last_record { - records_read += 1; - end_of_last_record = current; - - if records_read == records_to_read { - break; - } + if buf[current] == 0 && current != end_of_last_record { + records_read += 1; + end_of_last_record = current; + + if records_read == records_to_read { + break; } } } From a47bff5e0980ad97fb27b071f0c3cc5725e1010d Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 13 Dec 2021 20:29:17 +0000 Subject: [PATCH 3/3] Tweak count_records predicate --- parquet/src/arrow/record_reader.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/arrow/record_reader.rs b/parquet/src/arrow/record_reader.rs index e4529ef6de5f..a5c0b47efcd2 100644 --- a/parquet/src/arrow/record_reader.rs +++ b/parquet/src/arrow/record_reader.rs @@ -383,7 +383,7 @@ impl RecordReader { let mut end_of_last_record = self.num_values; for current in self.num_values..self.values_written { - if buf[current] == 0 && current != end_of_last_record { + if buf[current] == 0 && current != self.num_values { records_read += 1; end_of_last_record = current;