Skip to content

Commit

Permalink
fix: flush BufWriter (#22)
Browse files Browse the repository at this point in the history
  • Loading branch information
sgbalogh authored Nov 7, 2024
1 parent 75a187c commit bdd94c1
Showing 1 changed file with 10 additions and 2 deletions.
12 changes: 10 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use tokio::{
io::{self, AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter},
};
use tokio_stream::StreamExt;
use tracing::trace;
use tracing_subscriber::{fmt::format::FmtSpan, layer::SubscriberExt, util::SubscriberInitExt};
use types::{BasinConfig, StreamConfig, RETENTION_POLICY_PATH, STORAGE_CLASS_PATH};

Expand Down Expand Up @@ -237,16 +238,20 @@ impl RecordsIO {
pub async fn into_writer(&self) -> io::Result<Box<dyn AsyncWrite + Send + Unpin>> {
match self {
RecordsIO::File(path) => {
trace!(?path, "opening file writer");
let file = OpenOptions::new()
.write(true)
.create(true)
.append(true)
.open(path)
.await?;

Ok(Box::new(tokio::io::BufWriter::new(file)))
Ok(Box::new(BufWriter::new(file)))
}
RecordsIO::Stdout => {
trace!("stdout writer");
Ok(Box::new(BufWriter::new(tokio::io::stdout())))
}
RecordsIO::Stdout => Ok(Box::new(BufWriter::new(tokio::io::stdout()))),
RecordsIO::Stdin => panic!("unsupported record source"),
}
}
Expand Down Expand Up @@ -538,6 +543,9 @@ async fn run() -> Result<(), S2CliError> {
);
}
}
if let Some(ref mut writer) = writer {
writer.flush().await.expect("writer flush");
}
}
}
}
Expand Down

0 comments on commit bdd94c1

Please sign in to comment.