Skip to content
This repository has been archived by the owner on Apr 23, 2023. It is now read-only.

Commit

Permalink
Throttle log stream to keep ui responsive
Browse files Browse the repository at this point in the history
  • Loading branch information
njust committed Feb 3, 2022
1 parent 525490f commit 80c75e8
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 64 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "ktail"
edition = "2021"
version = "0.4.4"
version = "0.4.5"
authors = ["nico.just <[email protected]>"]
description = "Kubernetes Log Viewer"
license = "Copyright © 2022 Nico Just"
Expand Down
44 changes: 23 additions & 21 deletions src/log_overview.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use gtk4_helper::{
use itertools::Itertools;

enum WorkerData {
Timestamp(DateTime<Utc>),
Timestamp(Vec<DateTime<Utc>>),
Highlight(HighlightResultData)
}

Expand All @@ -33,7 +33,7 @@ pub enum LogOverviewMsg {
Redraw,
Clear,
HighlightResults(HighlightResultData),
LogData(DateTime<Utc>),
LogData(Vec<DateTime<Utc>>),
}

impl Component for LogOverview {
Expand Down Expand Up @@ -67,29 +67,31 @@ impl Component for LogOverview {
std::thread::spawn(move|| {
while let Ok(data) = r.recv() {
match data {
WorkerData::Timestamp(timestamp) => {
if let Ok(mut chart_data) = cd.lock() {
if let Some(ts) = chart_data.start_date {
if timestamp < ts {
chart_data.start_date.replace(timestamp);
WorkerData::Timestamp(data) => {
for timestamp in data {
if let Ok(mut chart_data) = cd.lock() {
if let Some(ts) = chart_data.start_date {
if timestamp < ts {
chart_data.start_date.replace(timestamp);
}
} else {
chart_data.start_date.replace(timestamp.clone());
}
} else {
chart_data.start_date.replace(timestamp.clone());
}

if let Some(ts) = chart_data.end_date {
if timestamp > ts {
chart_data.end_date.replace(timestamp.clone());
if let Some(ts) = chart_data.end_date {
if timestamp > ts {
chart_data.end_date.replace(timestamp.clone());
}
} else {
chart_data.end_date.replace(timestamp);
}
} else {
chart_data.end_date.replace(timestamp);
}

let time = timestamp.time();
let ts = Utc.ymd(timestamp.year(),timestamp.month(),timestamp.day()).and_hms(time.hour(), time.minute(), 0);
for (_, data) in chart_data.data.iter_mut() {
if data.len() > 0 && !data.contains_key(&ts) {
data.insert(ts.clone(), 0);
let time = timestamp.time();
let ts = Utc.ymd(timestamp.year(),timestamp.month(),timestamp.day()).and_hms(time.hour(), time.minute(), 0);
for (_, data) in chart_data.data.iter_mut() {
if data.len() > 0 && !data.contains_key(&ts) {
data.insert(ts.clone(), 0);
}
}
}
}
Expand Down
96 changes: 55 additions & 41 deletions src/log_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use gtk4_helper::glib::SourceId;
use regex::Regex;
use sourceview5::Buffer;
use stream_cancel::Trigger;
use tokio_stream::wrappers::IntervalStream;
use crate::cluster_list_view::NamespaceViewData;
use crate::config::{CONFIG};
use crate::gtk::{TextIter, ToggleButton};
Expand Down Expand Up @@ -84,8 +85,8 @@ pub enum LogViewMsg {
PodSelected(Vec<PodViewData>),
ContextSelected(NamespaceViewData),
Loaded(Arc<Trigger>),
LogDataLoaded(LogData),
LogDataProcessed(i64, LogData),
LogDataLoaded(Vec<LogData>),
LogDataProcessed(Vec<(i64, LogData)>),
EnableScroll(bool),
WrapText(bool),
AppendContainerNames(bool),
Expand Down Expand Up @@ -212,7 +213,7 @@ impl LogView {
}

enum WorkerData {
ProcessLogData(LogData),
ProcessLogData(Vec<LogData>),
ProcessHighlighters(Vec<SearchData>, LogData, String),
Clear,
}
Expand Down Expand Up @@ -323,21 +324,26 @@ impl Component for LogView {
while let Ok(data) = w_rx.recv() {
match data {
WorkerData::ProcessLogData(data) => {
let timestamp = data.timestamp.timestamp_nanos();
let mut offset = search_offset(&log_entry_times, timestamp);
let len = log_entry_times.len();
while offset < len && log_entry_times[offset] == timestamp {
offset += 1;
}
log_entry_times.insert(offset, timestamp);
// We need to insert a extra entry for lines starting with a linefeed or a new line
if data.text.starts_with("\r") || data.text.starts_with("\n") {
// Sourceview seems to ignore those
if data.text != "\r\n" && data.text != "\n" {
log_entry_times.insert(offset, timestamp);
let mut res = vec![];
for datum in data {
let timestamp = datum.timestamp.timestamp_nanos();
let mut offset = search_offset(&log_entry_times, timestamp);
let len = log_entry_times.len();
while offset < len && log_entry_times[offset] == timestamp {
offset += 1;
}
log_entry_times.insert(offset, timestamp);
// We need to insert a extra entry for lines starting with a linefeed or a new line
if datum.text.starts_with("\r") || datum.text.starts_with("\n") {
// Sourceview seems to ignore those
if datum.text != "\r\n" && datum.text != "\n" {
log_entry_times.insert(offset, timestamp);
}
}
res.push((offset as i64, datum));
}
tx(LogViewMsg::LogDataProcessed(offset as i64, data))

tx(LogViewMsg::LogDataProcessed(res))
}
WorkerData::Clear => {
log_entry_times.clear();
Expand Down Expand Up @@ -395,37 +401,40 @@ impl Component for LogView {
self.exit_trigger = Some(exit_tx);
}
LogViewMsg::LogDataLoaded(data) => {
self.overview.update(LogOverviewMsg::LogData(data.timestamp.clone()));
let timestamps: Vec<DateTime<Utc>> = data.iter().map(|d| d.timestamp.clone()).collect();
self.overview.update(LogOverviewMsg::LogData(timestamps));
if let Err(e) = self.worker_action.send(WorkerData::ProcessLogData(data)) {
eprint!("Could not send msg to worker: {}", e);
}
}
LogViewMsg::LogDataProcessed(idx, data) => {
if let Some(mut insert_at) = self.text_buffer.iter_at_line(idx as i32) {
if !self.append_container_names {
self.text_buffer.insert(&mut insert_at, &format!("{} {}", data.pod, data.text));
} else {
self.text_buffer.insert(&mut insert_at, &format!("{}-{} {}", data.pod, data.container, data.text));
}
LogViewMsg::LogDataProcessed(res) => {
for (idx, data) in res {
if let Some(mut insert_at) = self.text_buffer.iter_at_line(idx as i32) {
if !self.append_container_names {
self.text_buffer.insert(&mut insert_at, &format!("{} {}", data.pod, data.text));
} else {
self.text_buffer.insert(&mut insert_at, &format!("{}-{} {}", data.pod, data.container, data.text));
}

let mut highlighters = self.highlighters.clone();
if let Some(query) = self.active_search.as_ref() {
highlighters.push(SearchData {
search: query.clone(),
name: SEARCH_TAG.to_string(),
});
}
let mut highlighters = self.highlighters.clone();
if let Some(query) = self.active_search.as_ref() {
highlighters.push(SearchData {
search: query.clone(),
name: SEARCH_TAG.to_string(),
});
}

let id = Uuid::new_v4().to_string();
if let Some(iter) = self.text_buffer.iter_at_line(insert_at.line() - 1) {
self.text_buffer.add_mark(&gtk::TextMark::new(Some(&id), false), &iter);
}
let id = Uuid::new_v4().to_string();
if let Some(iter) = self.text_buffer.iter_at_line(insert_at.line() - 1) {
self.text_buffer.add_mark(&gtk::TextMark::new(Some(&id), false), &iter);
}

if let Err(e) = self.worker_action.send(WorkerData::ProcessHighlighters(highlighters, data, id)) {
eprintln!("Could not send msg to worker: {}", e);
if let Err(e) = self.worker_action.send(WorkerData::ProcessHighlighters(highlighters, data, id)) {
eprintln!("Could not send msg to worker: {}", e);
}
} else {
eprintln!("No iter at line: {}", idx);
}
} else {
eprintln!("No iter at line: {}", idx);
}
}
LogViewMsg::HighlightResult(res) => {
Expand Down Expand Up @@ -545,10 +554,15 @@ async fn search(query: Regex, text: String) -> LogViewMsg {

async fn load_log_stream(ctx: NamespaceViewData, pods: Vec<PodViewData>, tx: Arc<dyn MsgHandler<LogViewMsg>>, since_seconds: u32) -> LogViewMsg {
let client = crate::log_stream::k8s_client(&ctx.config_path, &ctx.context);
let (mut log_stream, exit) = crate::log_stream::log_stream(&client, &ctx.name, pods, since_seconds).await;
let (log_stream, exit) = crate::log_stream::log_stream(&client, &ctx.name, pods, since_seconds).await;
let tx = tx.clone();
tokio::task::spawn(async move {
while let Some(data) = log_stream.next().await {
// Throttle the stream to keep the ui responsive.
let mut throttled_stream = StreamExt::zip(
StreamExt::ready_chunks(log_stream, 1000),
IntervalStream::new(tokio::time::interval(std::time::Duration::from_millis(50)))
);
while let Some((data, _)) = throttled_stream.next().await {
tx(LogViewMsg::LogDataLoaded(data));
}
});
Expand Down

0 comments on commit 80c75e8

Please sign in to comment.