Skip to content

Commit

Permalink
dump: autodetect and decompress gzip and zstd (#79)
Browse files Browse the repository at this point in the history
  • Loading branch information
artemkach authored Feb 1, 2024
1 parent 8138f11 commit a8f7c28
Show file tree
Hide file tree
Showing 7 changed files with 258 additions and 17 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
/.idea
/target
*.swp
*.swo
*.swn
109 changes: 109 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ keywords = ["format", "parse", "encode"]
anyhow = "1.0"
clap = { version = "4.0.17", features = ["cargo"] }
colored = "2.0.0"
flate2 = "1.0"
infer = "0.15.0"
ion-rs = "0.18.1"
memmap = "0.7.0"
tempfile = "3.2.0"
Expand All @@ -26,6 +28,7 @@ tera = { version = "1.18.1", optional = true }
convert_case = { version = "0.6.0", optional = true }
matches = "0.1.10"
thiserror = "1.0.50"
zstd = "0.13.0"

[dev-dependencies]
rstest = "~0.17.0"
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM rust:1.65-slim-buster as builder
FROM rust:1.75-slim-buster as builder
WORKDIR /usr/src/ion-cli
COPY . .
RUN cargo install --verbose --path .
Expand Down
30 changes: 20 additions & 10 deletions src/bin/ion/commands/beta/head.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::commands::{dump, IonCliCommand, WithIonCliArgument};
use anyhow::Result;
use clap::{value_parser, Arg, ArgMatches, Command};
use clap::{value_parser, Arg, ArgAction, ArgMatches, Command};

pub struct HeadCommand;

Expand All @@ -14,15 +14,25 @@ impl IonCliCommand for HeadCommand {
}

fn configure_args(&self, command: Command) -> Command {
command.with_input().with_output().with_format().arg(
Arg::new("values")
.long("values")
.short('n')
.value_parser(value_parser!(usize))
.allow_negative_numbers(false)
.default_value("10")
.help("Specifies the number of output top-level values."),
)
command
.with_input()
.with_output()
.with_format()
.arg(
Arg::new("values")
.long("values")
.short('n')
.value_parser(value_parser!(usize))
.allow_negative_numbers(false)
.default_value("10")
.help("Specifies the number of output top-level values."),
)
.arg(
Arg::new("no-auto-decompress")
.long("no-auto-decompress")
.action(ArgAction::SetTrue)
.help("Turn off automatic decompression detection."),
)
}

fn run(&self, command_path: &mut Vec<String>, args: &ArgMatches) -> Result<()> {
Expand Down
97 changes: 93 additions & 4 deletions src/bin/ion/commands/dump.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use crate::commands::{IonCliCommand, WithIonCliArgument};
use anyhow::{Context, Result};
use clap::{value_parser, Arg, ArgMatches, Command};
use clap::{value_parser, Arg, ArgAction, ArgMatches, Command};
use ion_rs::*;
use std::fs::File;
use std::io::{stdin, stdout, StdinLock, Write};
use std::io::{self, stdin, stdout, BufRead, BufReader, Chain, Cursor, Read, StdinLock, Write};

pub struct DumpCommand;

const BUF_READER_CAPACITY: usize = 2 << 20; // 1 MiB
const INFER_HEADER_LENGTH: usize = 8;

impl IonCliCommand for DumpCommand {
fn name(&self) -> &'static str {
"dump"
Expand All @@ -31,6 +34,12 @@ impl IonCliCommand for DumpCommand {
.with_input()
.with_output()
.with_format()
.arg(
Arg::new("no-auto-decompress")
.long("no-auto-decompress")
.action(ArgAction::SetTrue)
.help("Turn off automatic decompression detection."),
)
}

fn run(&self, _command_path: &mut Vec<String>, args: &ArgMatches) -> Result<()> {
Expand Down Expand Up @@ -60,12 +69,23 @@ impl IonCliCommand for DumpCommand {
for input_file in input_file_iter {
let file = File::open(input_file)
.with_context(|| format!("Could not open file '{}'", input_file))?;
let mut reader = ReaderBuilder::new().build(file)?;
let mut reader = if let Some(true) = args.get_one::<bool>("no-auto-decompress") {
ReaderBuilder::new().build(file)?
} else {
let bfile = BufReader::with_capacity(BUF_READER_CAPACITY, file);
let zfile = auto_decompressing_reader(bfile, INFER_HEADER_LENGTH)?;
ReaderBuilder::new().build(zfile)?
};
write_in_format(&mut reader, &mut output, format, values)?;
}
} else {
let input: StdinLock = stdin().lock();
let mut reader = ReaderBuilder::new().build(input)?;
let mut reader = if let Some(true) = args.get_one::<bool>("no-auto-decompress") {
ReaderBuilder::new().build(input)?
} else {
let zinput = auto_decompressing_reader(input, INFER_HEADER_LENGTH)?;
ReaderBuilder::new().build(zinput)?
};
write_in_format(&mut reader, &mut output, format, values)?;
}

Expand Down Expand Up @@ -216,3 +236,72 @@ fn transcribe_n_values<W: IonWriter>(
writer.flush()?;
Ok(index)
}

/// Autodetects a compressed byte stream and wraps the original reader
/// into a reader that transparently decompresses.
///
/// To support non-seekable readers like `Stdin`, we could have used a
/// full-blown buffering wrapper with unlimited rewinds, but since we only
/// need the first few magic bytes at offset 0, we cheat and instead make a
/// `Chain` reader from the buffered header followed by the original reader.
///
/// The choice of `Chain` type here is not quite necessary: it could have
/// been simply `dyn BufRead`, but there is no `ToIonDataSource` trait
/// implementation for `dyn BufRead` at the moment.
type AutoDecompressingReader = Chain<Box<dyn BufRead>, Box<dyn BufRead>>;

fn auto_decompressing_reader<R>(
mut reader: R,
header_len: usize,
) -> IonResult<AutoDecompressingReader>
where
R: BufRead + 'static,
{
// read header
let mut header_bytes = vec![0; header_len];
let nread = read_reliably(&mut reader, &mut header_bytes)?;
header_bytes.truncate(nread);

// detect compression type and wrap reader in a decompressor
match infer::get(&header_bytes) {
Some(t) => match t.extension() {
"gz" => {
// "rewind" to let the decompressor read magic bytes again
let header: Box<dyn BufRead> = Box::new(Cursor::new(header_bytes));
let chain = header.chain(reader);
let zreader = Box::new(BufReader::new(flate2::read::GzDecoder::new(chain)));
// must return a `Chain`, so prepend an empty buffer
let nothing: Box<dyn BufRead> = Box::new(Cursor::new(&[] as &[u8]));
Ok(nothing.chain(zreader))
}
"zst" => {
let header: Box<dyn BufRead> = Box::new(Cursor::new(header_bytes));
let chain = header.chain(reader);
let zreader = Box::new(BufReader::new(zstd::stream::read::Decoder::new(chain)?));
let nothing: Box<dyn BufRead> = Box::new(Cursor::new(&[] as &[u8]));
Ok(nothing.chain(zreader))
}
_ => {
let header: Box<dyn BufRead> = Box::new(Cursor::new(header_bytes));
Ok(header.chain(Box::new(reader)))
}
},
None => {
let header: Box<dyn BufRead> = Box::new(Cursor::new(header_bytes));
Ok(header.chain(Box::new(reader)))
}
}
}

/// same as `Read` trait's read() method, but loops in case of fragmented reads
fn read_reliably<R: Read>(reader: &mut R, buf: &mut [u8]) -> io::Result<usize> {
let mut nread = 0;
while nread < buf.len() {
match reader.read(&mut buf[nread..]) {
Ok(0) => break,
Ok(n) => nread += n,
Err(e) => return Err(e),
}
}
Ok(nread)
}
Loading

0 comments on commit a8f7c28

Please sign in to comment.