Skip to content

Commit

Permalink
Raw output by default (#56)
Browse files Browse the repository at this point in the history
Raw output by default (#56)
  • Loading branch information
gr211 authored Apr 26, 2024
1 parent 03d8abf commit e73c6ee
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 64 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ Arch Linux.
-o, --output-file <OUTPUT_FILE> Output file to write to
-c, --concurrent <CONCURRENT> Concurrent number of shards to tail
-v, --verbose Display additional information
-n, --no-base64 Do not base64 encode the payload upon invalid UTF-8 payloads. Print it raw instead
-b, --base64-encoding Base64 encode the payload (for binary payloads)
-h, --help Print help
-V, --version Print version

Expand Down
4 changes: 2 additions & 2 deletions src/cli_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,9 @@ pub struct Opt {
#[structopt(short, long)]
pub verbose: bool,

/// Do not base64 encode the payload upon invalid UTF-8 payloads. Print it raw instead.
/// Base64 encode the payload (eg. for binary payloads)
#[structopt(short, long)]
pub no_base64: bool,
pub base64_encoding: bool,
}

pub(crate) fn selected_shards(
Expand Down
4 changes: 2 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ async fn main() -> Result<()> {
opt.print_shard_id,
opt.print_timestamp,
opt.print_delimiter,
opt.no_base64,
opt.base64_encoding,
shard_count,
file,
)
Expand All @@ -83,7 +83,7 @@ async fn main() -> Result<()> {
opt.print_shard_id,
opt.print_timestamp,
opt.print_delimiter,
opt.no_base64,
opt.base64_encoding,
shard_count,
)
.run(tx_records, rx_records)
Expand Down
122 changes: 73 additions & 49 deletions src/sink.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
use std::io;
use std::borrow::Cow;
use std::cmp::{max, min};
use std::io::{BufWriter, Write};
use std::{io, str};

use anyhow::Error;
use anyhow::Result;
use async_trait::async_trait;
use buffer_flush::BufferTicker;
use chrono::TimeZone;
use log::{debug, error, warn};
use tokio::sync::mpsc::{Receiver, Sender};

use buffer_flush::BufferTicker;

use crate::kinesis::models::{ProcessError, RecordResult, ShardProcessorADT};

pub mod console;
Expand All @@ -24,7 +27,7 @@ pub struct SinkConfig {
print_timestamp: bool,
print_delimiter: bool,
exit_after_termination: bool,
no_base64: bool,
base64_encoding: bool,
}

pub trait Configurable {
Expand Down Expand Up @@ -90,7 +93,7 @@ where
}
}

fn format_record(&self, record_result: &RecordResult) -> String;
fn format_record(&self, record_result: &RecordResult) -> Vec<u8>;

fn termination_message_and_exit(
&self,
Expand All @@ -104,7 +107,7 @@ where
impl<T, W> Sink<T, W> for T
where
W: Write + Send,
T: SinkOutput<W> + Configurable + Send + Sync,
T: SinkOutput<W> + Configurable + Send,
{
async fn run_inner(
&mut self,
Expand All @@ -115,68 +118,75 @@ where
self.delimiter(handle).unwrap();

/*
* Start the buffer ticker to flush the buffer every 5 second
* Start the buffer ticker to flush the buffer every 5 seconds.
* This is needed because if the buffer is not full (not enough message to trigger a nature flush),
* then no output is displayed until ctrl^c is pressed.
*/
BufferTicker::new(tx_records.clone()).start();

let mut count = 0;
let mut sc = self.shard_count();
let mut total_records_processed = 0;
let mut active_shards_count = self.shard_count();

self.handle_termination(tx_records.clone());

while let Some(res) = rx_records.recv().await {
match res {
Ok(adt) => match adt {
ShardProcessorADT::BeyondToTimestamp => {
if sc > 0 {
sc = sc.saturating_sub(1);
if active_shards_count > 0 {
active_shards_count = active_shards_count.saturating_sub(1);
}

if sc == 0 {
if active_shards_count == 0 {
tx_records
.send(Ok(ShardProcessorADT::Termination))
.await
.expect("Could not send termination message");
}
}
ShardProcessorADT::Progress(res) => match self.get_config().max_messages {
ShardProcessorADT::Progress(records) => match self.get_config().max_messages {
Some(max_messages) if total_records_processed >= max_messages => self
.termination_message_and_exit(
handle,
total_records_processed,
&mut rx_records,
)?,
Some(max_messages) => {
if count >= max_messages {
self.termination_message_and_exit(handle, count, &mut rx_records)?;
}

let remaining_records_to_display =
std::cmp::max(max_messages - count, 0);
max(max_messages - total_records_processed, 0);

if remaining_records_to_display > 0 && !res.is_empty() {
let split_at =
std::cmp::min(remaining_records_to_display as usize, res.len());
count += split_at as u32;
if remaining_records_to_display > 0 && !records.is_empty() {
let records_to_display_count =
min(remaining_records_to_display as usize, records.len());
total_records_processed += records_to_display_count as u32;

let (to_display, _) = res.split_at(split_at);
let (records_to_display, _) =
records.split_at(records_to_display_count);

to_display.iter().for_each(|record| {
records_to_display.iter().for_each(|record| {
let data = self.format_record(record);
writeln!(handle, "{}", data).unwrap();

let _ = handle.write(data.as_slice()).unwrap();
// writeln!(handle, "{}", data).unwrap();

self.delimiter(handle).unwrap();
});
}
}
None => {
count += res.len() as u32;
res.iter().for_each(|record| {
total_records_processed += records.len() as u32;
records.iter().for_each(|record| {
let data = self.format_record(record);
writeln!(handle, "{}", data).unwrap();
// writeln!(handle, "{}", data).unwrap();
let _ = handle.write(data.as_slice()).unwrap();
self.delimiter(handle).unwrap()
});
}
},
ShardProcessorADT::Flush => handle.flush().unwrap(),
ShardProcessorADT::Termination => {
debug!("Termination message received");
let messages_processed = count;
let messages_processed = total_records_processed;

self.termination_message_and_exit(
handle,
Expand Down Expand Up @@ -235,57 +245,71 @@ where
Ok(())
}

fn format_record(&self, record_result: &RecordResult) -> String {
let data = match std::str::from_utf8(record_result.data.as_slice()) {
Ok(payload) => payload.to_string(),
Err(_) if self.get_config().no_base64 => {
String::from_utf8_lossy(record_result.data.as_slice()).to_string()
}
Err(_) => {
fn format_record(&self, record_result: &RecordResult) -> Vec<u8> {
let line_feed = vec![b'\n'];

let payload = match &record_result.data {
payload if self.get_config().base64_encoding => {
use base64::{engine::general_purpose, Engine as _};
general_purpose::STANDARD.encode(record_result.data.as_slice())
Cow::Owned(
general_purpose::STANDARD
.encode(payload)
.as_bytes()
.to_vec(),
)
}
payload => Cow::Borrowed(payload),
};

let data = if self.get_config().print_key {
let partition_key = if self.get_config().print_key {
let key = record_result.partition_key.to_string();
let key = self.write_key(&key);

format!("{} {}", key, data)
format!("{} ", key)
} else {
data
"".to_string()
};

let data = if self.get_config().print_sequence_number {
let sequence_number = if self.get_config().print_sequence_number {
let key = record_result.sequence_id.to_string();
let key = self.write_sequence_number(&key);

format!("{} {}", key, data)
format!("{} ", key)
} else {
data
"".to_string()
};

let data = if self.get_config().print_shard_id {
let shard_id = if self.get_config().print_shard_id {
let shard_id = record_result.shard_id.to_string();
let shard_id = self.write_shard_id(&shard_id);

format!("{} {}", shard_id, data)
format!("{} ", shard_id)
} else {
data
"".to_string()
};

if self.get_config().print_timestamp {
let date = if self.get_config().print_timestamp {
let date = chrono::Utc
.timestamp_opt(record_result.datetime.secs(), 0)
.unwrap();

let date = date.format("%+").to_string();
let date = self.write_date(&date);

format!("{} {}", date, data)
format!("{} ", date)
} else {
data
}
"".to_string()
};

[
partition_key.as_bytes(),
sequence_number.as_bytes(),
shard_id.as_bytes(),
date.as_bytes(),
payload.as_slice(),
line_feed.as_slice(),
]
.concat()
}

fn termination_message_and_exit(
Expand Down
2 changes: 1 addition & 1 deletion src/sink/console.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl ConsoleSink {
print_shard_id,
print_timestamp,
print_delimiter,
no_base64,
base64_encoding: no_base64,
exit_after_termination: true,
},
shard_count,
Expand Down
25 changes: 17 additions & 8 deletions src/sink/console_tests.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use super::*;
use crate::kinesis::models::ShardProcessorADT::{BeyondToTimestamp, Progress, Termination};
use crate::sink::console::ConsoleSink;
use aws_sdk_kinesis::primitives::DateTime;
use std::sync::Arc;

use aws_sdk_kinesis::primitives::DateTime;
use tokio::sync::mpsc;

use crate::kinesis::models::ShardProcessorADT::{BeyondToTimestamp, Progress, Termination};
use crate::sink::console::ConsoleSink;

use super::*;

#[test]
fn format_nb_messages_ok() {
let console = ConsoleSink {
Expand Down Expand Up @@ -50,6 +53,7 @@ fn format_outputs() {
fn format_outputs_base64() {
let console = ConsoleSink {
config: SinkConfig {
base64_encoding: true,
no_color: true,
..Default::default()
},
Expand All @@ -66,14 +70,17 @@ fn format_outputs_base64() {
data: input.to_vec(),
};

assert_eq!(console.format_record(&record), "SGVsbG8g8JCAV29ybGQ=");
let vec = console.format_record(&record);
let result = String::from_utf8_lossy(vec.as_slice());
assert_eq!(result, "SGVsbG8g8JCAV29ybGQ=\n");
}

#[test]
fn format_outputs_no_base64() {
fn format_outputs_raw() {
let console = ConsoleSink {
config: SinkConfig {
no_base64: true,
base64_encoding: false,
no_color: true,
..Default::default()
},
shard_count: 1,
Expand All @@ -89,7 +96,9 @@ fn format_outputs_no_base64() {
data: input.to_vec(),
};

assert_eq!(console.format_record(&record), "Hello �World");
let vec = console.format_record(&record);
let result = String::from_utf8_lossy(vec.as_slice());
assert_eq!(result, "Hello �World\n");
}

#[tokio::test]
Expand Down
2 changes: 1 addition & 1 deletion src/sink/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl FileSink {
print_shard_id,
print_timestamp,
print_delimiter,
no_base64,
base64_encoding: no_base64,
exit_after_termination: true,
},
file: file.into(),
Expand Down

0 comments on commit e73c6ee

Please sign in to comment.