Skip to content

Commit

Permalink
vcf/async/reader: Buffer lazy record read
Browse files Browse the repository at this point in the history
Reading the whole line into a reusable buffer first allows the blocking lazy record reader to be used to parse it.
  • Loading branch information
zaeleus committed Jan 22, 2024
1 parent 3907108 commit 4a59eac
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 71 deletions.
89 changes: 20 additions & 69 deletions noodles-vcf/src/async/reader.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
mod header;
mod query;

use std::str;

use futures::{stream, Stream};
use noodles_bgzf as bgzf;
use noodles_core::Region;
Expand Down Expand Up @@ -228,7 +226,7 @@ where
/// # }
/// ```
pub async fn read_lazy_record(&mut self, record: &mut lazy::Record) -> io::Result<usize> {
read_lazy_record(&mut self.inner, record).await
read_lazy_record(&mut self.inner, &mut self.buf, record).await
}

/// Returns an (async) stream over records starting from the current (input) stream position.
Expand Down Expand Up @@ -412,76 +410,22 @@ where
}
}

async fn read_lazy_record<R>(reader: &mut R, record: &mut lazy::Record) -> io::Result<usize>
async fn read_lazy_record<R>(
reader: &mut R,
buf: &mut String,
record: &mut lazy::Record,
) -> io::Result<usize>
where
R: AsyncBufRead + Unpin,
{
record.buf.clear();

let mut len = 0;

len += read_field(reader, &mut record.buf).await?;
record.bounds.chromosome_end = record.buf.len();

len += read_field(reader, &mut record.buf).await?;
record.bounds.position_end = record.buf.len();

len += read_field(reader, &mut record.buf).await?;
record.bounds.ids_end = record.buf.len();

len += read_field(reader, &mut record.buf).await?;
record.bounds.reference_bases_end = record.buf.len();

len += read_field(reader, &mut record.buf).await?;
record.bounds.alternate_bases_end = record.buf.len();
buf.clear();

len += read_field(reader, &mut record.buf).await?;
record.bounds.quality_score_end = record.buf.len();

len += read_field(reader, &mut record.buf).await?;
record.bounds.filters_end = record.buf.len();

len += read_field(reader, &mut record.buf).await?;
record.bounds.info_end = record.buf.len();

len += read_line(reader, &mut record.buf).await?;

Ok(len)
}

async fn read_field<R>(reader: &mut R, dst: &mut String) -> io::Result<usize>
where
R: AsyncBufRead + Unpin,
{
const DELIMITER: u8 = b'\t';

let mut is_delimiter = false;
let mut len = 0;

loop {
let src = reader.fill_buf().await?;

if is_delimiter || src.is_empty() {
break;
}

let (buf, n) = match src.iter().position(|&b| b == DELIMITER) {
Some(i) => {
is_delimiter = true;
(&src[..i], i + 1)
}
None => (src, src.len()),
};

let s = str::from_utf8(buf).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
dst.push_str(s);

len += n;

reader.consume(n);
if reader.read_line(buf).await? == 0 {
return Ok(0);
}

Ok(len)
let mut buf = buf.as_bytes();
crate::reader::lazy_record::read_lazy_record(&mut buf, record)
}

#[cfg(test)]
Expand All @@ -508,10 +452,11 @@ mod tests {

#[tokio::test]
async fn test_read_lazy_record() -> io::Result<()> {
let mut src = &b"sq0\t1\t.\tA\t.\t.\tPASS\t."[..];
let mut src = &b"sq0\t1\t.\tA\t.\t.\tPASS\t.\n"[..];
let mut buf = String::new();

let mut record = lazy::Record::default();
read_lazy_record(&mut src, &mut record).await?;
read_lazy_record(&mut src, &mut buf, &mut record).await?;

assert_eq!(record.buf, "sq01.A..PASS.");

Expand All @@ -524,6 +469,12 @@ mod tests {
assert_eq!(record.bounds.filters_end, 12);
assert_eq!(record.bounds.info_end, 13);

let mut src = &b"\n"[..];
assert!(matches!(
read_lazy_record(&mut src, &mut buf, &mut record).await,
Err(e) if e.kind() == io::ErrorKind::InvalidData,
));

Ok(())
}
}
2 changes: 1 addition & 1 deletion noodles-vcf/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
mod builder;
mod header;
mod lazy_record;
pub(crate) mod lazy_record;
pub(crate) mod query;
pub mod record;
mod records;
Expand Down
2 changes: 1 addition & 1 deletion noodles-vcf/src/reader/lazy_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
use super::read_line;
use crate::lazy;

pub(super) fn read_lazy_record<R>(reader: &mut R, record: &mut lazy::Record) -> io::Result<usize>
pub(crate) fn read_lazy_record<R>(reader: &mut R, record: &mut lazy::Record) -> io::Result<usize>
where
R: BufRead,
{
Expand Down

0 comments on commit 4a59eac

Please sign in to comment.