Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanups #57

Merged
merged 6 commits into from
Apr 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

![Continuous integration](https://github.com/grumlimited/kinesis-tailr/workflows/Continuous%20integration/badge.svg?branch=main)

A simple tool to tail a Kinesis stream built with Rust.
A simple tool to tail a Kinesis stream. Built with Rust.

## Installation

Expand Down
12 changes: 6 additions & 6 deletions src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub trait SinkOutput<W>
where
W: Write,
{
fn output(&self) -> BufWriter<W>;
fn output(&self) -> Result<BufWriter<W>>;

fn write_date(&self, date: &str) -> String {
date.to_string()
Expand Down Expand Up @@ -73,13 +73,13 @@ where
tx_records: Sender<Result<ShardProcessorADT, ProcessError>>,
rx_records: Receiver<Result<ShardProcessorADT, ProcessError>>,
handle: &mut BufWriter<W>,
) -> io::Result<()>;
) -> Result<()>;

async fn run(
&mut self,
tx_records: Sender<Result<ShardProcessorADT, ProcessError>>,
rx_records: Receiver<Result<ShardProcessorADT, ProcessError>>,
) -> io::Result<()>;
) -> Result<()>;

fn handle_termination(&self, tx_records: Sender<Result<ShardProcessorADT, ProcessError>>);

Expand Down Expand Up @@ -114,7 +114,7 @@ where
tx_records: Sender<Result<ShardProcessorADT, ProcessError>>,
mut rx_records: Receiver<Result<ShardProcessorADT, ProcessError>>,
handle: &mut BufWriter<W>,
) -> io::Result<()> {
) -> Result<()> {
self.delimiter(handle).unwrap();

/*
Expand Down Expand Up @@ -213,8 +213,8 @@ where
&mut self,
tx_records: Sender<Result<ShardProcessorADT, ProcessError>>,
rx_records: Receiver<Result<ShardProcessorADT, ProcessError>>,
) -> io::Result<()> {
let output = &mut self.output();
) -> Result<()> {
let output = &mut self.output()?;
self.run_inner(tx_records, rx_records, output).await
}

Expand Down
18 changes: 8 additions & 10 deletions src/sink/buffer_flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,15 @@ impl BufferTicker {
pub fn start(&self) {
let tx_records = self.tx_records.clone();

tokio::spawn({
async move {
let delay = Duration::from_secs(5);
tokio::spawn(async move {
let delay = Duration::from_secs(5);

loop {
sleep(delay).await;
tx_records
.send(Ok(ShardProcessorADT::Flush))
.await
.expect("Count not send ");
}
loop {
sleep(delay).await;
tx_records
.send(Ok(ShardProcessorADT::Flush))
.await
.expect("Count not send ShardProcessorADT::Flush");
}
});
}
Expand Down
5 changes: 3 additions & 2 deletions src/sink/console.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::sink::{Configurable, SinkConfig, SinkOutput};
use anyhow::Result;
use colored::Colorize;
use std::io;
use std::io::{BufWriter, Stdout};
Expand Down Expand Up @@ -50,9 +51,9 @@ impl Configurable for ConsoleSink {
}

impl SinkOutput<Stdout> for ConsoleSink {
fn output(&self) -> BufWriter<Stdout> {
fn output(&self) -> Result<BufWriter<Stdout>> {
let stdout = io::stdout(); // get the global stdout entity
io::BufWriter::with_capacity(CONSOLE_BUF_SIZE, stdout)
Ok(BufWriter::with_capacity(CONSOLE_BUF_SIZE, stdout))
}

fn write_date(&self, date: &str) -> String {
Expand Down
2 changes: 1 addition & 1 deletion src/sink/console_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ pub struct StringSink {
}

impl SinkOutput<Vec<u8>> for StringSink {
fn output(&self) -> BufWriter<Vec<u8>> {
fn output(&self) -> Result<BufWriter<Vec<u8>>> {
unimplemented!()
}
}
Expand Down
19 changes: 9 additions & 10 deletions src/sink/file.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use anyhow::Result;
use std::fs::File;
use std::io;
use std::io::BufWriter;
Expand Down Expand Up @@ -56,15 +57,13 @@ impl Configurable for FileSink {
}

impl SinkOutput<File> for FileSink {
fn output(&self) -> BufWriter<File> {
let file = File::create(&self.file)
.map_err(|e| {
io::Error::new(
e.kind(),
format!("Could not write to file {}", self.file.display()),
)
})
.unwrap();
BufWriter::with_capacity(FILE_BUF_SIZE, file)
fn output(&self) -> Result<BufWriter<File>> {
let file = File::create(&self.file).map_err(|e| {
io::Error::new(
e.kind(),
format!("Could not write to file {}", self.file.display()),
)
})?;
Ok(BufWriter::with_capacity(FILE_BUF_SIZE, file))
}
}
Loading