Skip to content

Commit

Permalink
feat(k8s/log): Library now supports logging with per-line granularity.
Browse files Browse the repository at this point in the history
  • Loading branch information
sarub0b0 committed Jul 28, 2023
1 parent c786b51 commit 5d0a6b9
Showing 1 changed file with 7 additions and 62 deletions.
69 changes: 7 additions & 62 deletions src/event/kubernetes/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use anyhow::{anyhow, bail, Result};
use async_trait::async_trait;
use chrono::Local;
use crossbeam::channel::Sender;
use futures::{future::try_join_all, StreamExt, TryStreamExt};
use futures::{future::try_join_all, AsyncBufReadExt, StreamExt, TryStreamExt};
use k8s_openapi::{
api::core::v1::{Container, ContainerState, ContainerStatus, Event as v1Event, Pod},
apimachinery::pkg::apis::meta::v1::Time,
Expand Down Expand Up @@ -832,75 +832,20 @@ impl Worker for FetchLogStream {

let prefix = self.prefix.clone().map(|p| p + " ").unwrap_or_default();

let mut logs = self.pod_api.log_stream(&self.pod_name, &lp).await?.boxed();
let mut logs = self.pod_api.log_stream(&self.pod_name, &lp).await?.lines();

let mut line_buffer = None;

// NOTE:
// ログが行区切りで取得できないため、行区切りになるように処理する。
// 受信したBytesの最後が改行コードでない場合に、中途半端な最終行をline_bufferに保存し次のループ時に先頭に追加する。
while let Some(bytes) = logs.try_next().await? {
let logs = String::from_utf8_lossy(&bytes);
while let Some(line) = logs.try_next().await? {
let mut buf = self.buf.write().await;

let mut lines: Vec<&str> = logs.lines().collect();

let mut remaining = None;
if let Some(b'\n') = bytes.last() {
// Do nothing
} else if let Some(pop) = lines.pop() {
remaining = Some(pop.to_string());
}
buf.push(format!("{}{}", prefix, line));

logger!(
debug,
"Container log stream [{}:{}] - logs: {}",
"Container log stream [{}:{}] - line: {}",
self.pod_name,
self.container_name,
logs
line
);

let mut buf = self.buf.write().await;

for line in lines {
if let Some(remaining) = line_buffer.take() {
buf.push(format!("{}{}{}", prefix, remaining, line));

logger!(
debug,
"Container log stream [{}:{}] - remaining: {}",
self.pod_name,
self.container_name,
remaining,
);
logger!(
debug,
"Container log stream [{}:{}] - line: {}",
self.pod_name,
self.container_name,
line
);
} else {
buf.push(format!("{}{}", prefix, line));

logger!(
debug,
"Container log stream [{}:{}] - line: {}",
self.pod_name,
self.container_name,
line
);
}
}

if let Some(remaining) = remaining {
line_buffer = Some(remaining);
}
}

if let Some(line) = line_buffer {
let mut buf = self.buf.write().await;

buf.push(line);
}

logger!(
Expand Down

0 comments on commit 5d0a6b9

Please sign in to comment.