From 4a59eac61daf9dd72ad28f213cb609fa92126760 Mon Sep 17 00:00:00 2001
From: Michael Macias
Date: Mon, 22 Jan 2024 15:58:08 -0600
Subject: [PATCH] vcf/async/reader: Buffer lazy record read

This allows the blocking lazy record reader to be used.
---
 noodles-vcf/src/async/reader.rs       | 89 ++++++---------------------
 noodles-vcf/src/reader.rs             |  2 +-
 noodles-vcf/src/reader/lazy_record.rs |  2 +-
 3 files changed, 22 insertions(+), 71 deletions(-)

diff --git a/noodles-vcf/src/async/reader.rs b/noodles-vcf/src/async/reader.rs
index 09d1ecda5..1a88a7764 100644
--- a/noodles-vcf/src/async/reader.rs
+++ b/noodles-vcf/src/async/reader.rs
@@ -1,8 +1,6 @@
 mod header;
 mod query;
 
-use std::str;
-
 use futures::{stream, Stream};
 use noodles_bgzf as bgzf;
 use noodles_core::Region;
@@ -228,7 +226,7 @@ where
     /// # }
     /// ```
     pub async fn read_lazy_record(&mut self, record: &mut lazy::Record) -> io::Result<usize> {
-        read_lazy_record(&mut self.inner, record).await
+        read_lazy_record(&mut self.inner, &mut self.buf, record).await
     }
 
     /// Returns an (async) stream over records starting from the current (input) stream position.
@@ -412,76 +410,22 @@ where
     }
 }
 
-async fn read_lazy_record<R>(reader: &mut R, record: &mut lazy::Record) -> io::Result<usize>
+async fn read_lazy_record<R>(
+    reader: &mut R,
+    buf: &mut String,
+    record: &mut lazy::Record,
+) -> io::Result<usize>
 where
     R: AsyncBufRead + Unpin,
 {
-    record.buf.clear();
-
-    let mut len = 0;
-
-    len += read_field(reader, &mut record.buf).await?;
-    record.bounds.chromosome_end = record.buf.len();
-
-    len += read_field(reader, &mut record.buf).await?;
-    record.bounds.position_end = record.buf.len();
-
-    len += read_field(reader, &mut record.buf).await?;
-    record.bounds.ids_end = record.buf.len();
-
-    len += read_field(reader, &mut record.buf).await?;
-    record.bounds.reference_bases_end = record.buf.len();
-
-    len += read_field(reader, &mut record.buf).await?;
-    record.bounds.alternate_bases_end = record.buf.len();
+    buf.clear();
 
-    len += read_field(reader, &mut record.buf).await?;
-    record.bounds.quality_score_end = record.buf.len();
-
-    len += read_field(reader, &mut record.buf).await?;
-    record.bounds.filters_end = record.buf.len();
-
-    len += read_field(reader, &mut record.buf).await?;
-    record.bounds.info_end = record.buf.len();
-
-    len += read_line(reader, &mut record.buf).await?;
-
-    Ok(len)
-}
-
-async fn read_field<R>(reader: &mut R, dst: &mut String) -> io::Result<usize>
-where
-    R: AsyncBufRead + Unpin,
-{
-    const DELIMITER: u8 = b'\t';
-
-    let mut is_delimiter = false;
-    let mut len = 0;
-
-    loop {
-        let src = reader.fill_buf().await?;
-
-        if is_delimiter || src.is_empty() {
-            break;
-        }
-
-        let (buf, n) = match src.iter().position(|&b| b == DELIMITER) {
-            Some(i) => {
-                is_delimiter = true;
-                (&src[..i], i + 1)
-            }
-            None => (src, src.len()),
-        };
-
-        let s = str::from_utf8(buf).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
-        dst.push_str(s);
-
-        len += n;
-
-        reader.consume(n);
+    if reader.read_line(buf).await? == 0 {
+        return Ok(0);
     }
 
-    Ok(len)
+    let mut buf = buf.as_bytes();
+    crate::reader::lazy_record::read_lazy_record(&mut buf, record)
 }
 
 #[cfg(test)]
@@ -508,10 +452,11 @@ mod tests {
 
     #[tokio::test]
     async fn test_read_lazy_record() -> io::Result<()> {
-        let mut src = &b"sq0\t1\t.\tA\t.\t.\tPASS\t."[..];
+        let mut src = &b"sq0\t1\t.\tA\t.\t.\tPASS\t.\n"[..];
+        let mut buf = String::new();
 
         let mut record = lazy::Record::default();
-        read_lazy_record(&mut src, &mut record).await?;
+        read_lazy_record(&mut src, &mut buf, &mut record).await?;
 
         assert_eq!(record.buf, "sq01.A..PASS.");
 
@@ -524,6 +469,12 @@ mod tests {
         assert_eq!(record.bounds.filters_end, 12);
         assert_eq!(record.bounds.info_end, 13);
 
+        let mut src = &b"\n"[..];
+        assert!(matches!(
+            read_lazy_record(&mut src, &mut buf, &mut record).await,
+            Err(e) if e.kind() == io::ErrorKind::InvalidData,
+        ));
+
         Ok(())
     }
 }
diff --git a/noodles-vcf/src/reader.rs b/noodles-vcf/src/reader.rs
index 1f458c3c8..c544c8246 100644
--- a/noodles-vcf/src/reader.rs
+++ b/noodles-vcf/src/reader.rs
@@ -2,7 +2,7 @@
 
 mod builder;
 mod header;
-mod lazy_record;
+pub(crate) mod lazy_record;
 pub(crate) mod query;
 pub mod record;
 mod records;
diff --git a/noodles-vcf/src/reader/lazy_record.rs b/noodles-vcf/src/reader/lazy_record.rs
index 38bc53219..babfe9fed 100644
--- a/noodles-vcf/src/reader/lazy_record.rs
+++ b/noodles-vcf/src/reader/lazy_record.rs
@@ -6,7 +6,7 @@ use std::{
 use super::read_line;
 use crate::lazy;
 
-pub(super) fn read_lazy_record<R>(reader: &mut R, record: &mut lazy::Record) -> io::Result<usize>
+pub(crate) fn read_lazy_record<R>(reader: &mut R, record: &mut lazy::Record) -> io::Result<usize>
 where
     R: BufRead,
 {