From 6fe6fdf3b16a84c01bed5de72f752ed37e514d93 Mon Sep 17 00:00:00 2001
From: MOZGIII
Date: Thu, 21 May 2020 22:18:09 +0300
Subject: [PATCH] Add kubernetes logs source implementation

Signed-off-by: MOZGIII
---
 .../kubernetes/vector-namespaced.yaml    |   6 +-
 src/internal_events/kubernetes_logs.rs   |  31 +++
 src/internal_events/mod.rs               |   4 +
 src/sources/kubernetes_logs/mod.rs       | 232 ++++++++++++++++++
 4 files changed, 270 insertions(+), 3 deletions(-)
 create mode 100644 src/internal_events/kubernetes_logs.rs

diff --git a/distribution/kubernetes/vector-namespaced.yaml b/distribution/kubernetes/vector-namespaced.yaml
index dec678f2bb6a9f..4fd6581be9f3c7 100644
--- a/distribution/kubernetes/vector-namespaced.yaml
+++ b/distribution/kubernetes/vector-namespaced.yaml
@@ -11,8 +11,8 @@ data:
     data_dir = "/vector-data-dir"
 
     # Ingest logs from Kubernetes.
-    [sources.kubernetes]
-      type = "kubernetes"
+    [sources.kubernetes_logs]
+      type = "kubernetes_logs"
 ---
 apiVersion: apps/v1
 kind: DaemonSet
@@ -28,11 +28,11 @@ spec:
     metadata:
       labels:
         name: vector
+        vector.dev/exclude: "true"
     spec:
       containers:
         - name: vector
          image: timberio/vector:latest-alpine
-          imagePullPolicy: Always
           args:
             - --config
             - /etc/vector/*.toml
diff --git a/src/internal_events/kubernetes_logs.rs b/src/internal_events/kubernetes_logs.rs
new file mode 100644
index 00000000000000..0103b6e8435014
--- /dev/null
+++ b/src/internal_events/kubernetes_logs.rs
@@ -0,0 +1,31 @@
+use super::InternalEvent;
+use metrics::counter;
+
+#[derive(Debug)]
+pub struct KubernetesLogsEventReceived<'a> {
+    pub file: &'a str,
+    pub byte_size: usize,
+}
+
+impl InternalEvent for KubernetesLogsEventReceived<'_> {
+    fn emit_logs(&self) {
+        trace!(
+            message = "received one event.",
+            %self.file,
+            rate_limit_secs = 10
+        );
+    }
+
+    fn emit_metrics(&self) {
+        counter!(
+            "events_processed", 1,
+            "component_kind" => "source",
+            "component_type" => "kubernetes_logs",
+        );
+        counter!(
+            "bytes_processed", self.byte_size as u64,
+            "component_kind" => "source",
+            "component_type" => "kubernetes_logs",
+        );
+    }
+}
diff --git a/src/internal_events/mod.rs b/src/internal_events/mod.rs
index 532039b9bf0984..3f9de6d7fbb5da 100644
--- a/src/internal_events/mod.rs
+++ b/src/internal_events/mod.rs
@@ -4,6 +4,8 @@ mod blackhole;
 mod elasticsearch;
 mod file;
 mod json;
+#[cfg(feature = "sources-kubernetes-logs")]
+mod kubernetes_logs;
 #[cfg(feature = "transforms-lua")]
 mod lua;
 #[cfg(feature = "sources-prometheus")]
@@ -22,6 +24,8 @@ pub use self::blackhole::*;
 pub use self::elasticsearch::*;
 pub use self::file::*;
 pub use self::json::*;
+#[cfg(feature = "sources-kubernetes-logs")]
+pub use self::kubernetes_logs::*;
 #[cfg(feature = "transforms-lua")]
 pub use self::lua::*;
 #[cfg(feature = "sources-prometheus")]
diff --git a/src/sources/kubernetes_logs/mod.rs b/src/sources/kubernetes_logs/mod.rs
index e7501532e6d962..dc7d6e509afffd 100644
--- a/src/sources/kubernetes_logs/mod.rs
+++ b/src/sources/kubernetes_logs/mod.rs
@@ -10,4 +10,236 @@ mod partial_events_merger;
 mod path_helpers;
 mod pod_metadata_annotator;
+use crate::event::{self, Event};
+use crate::internal_events::KubernetesLogsEventReceived;
+use crate::kubernetes as k8s;
+use crate::{
+    dns::Resolver,
+    shutdown::ShutdownSignal,
+    sources,
+    topology::config::{DataType, GlobalOptions, SourceConfig, SourceDescription},
+    transforms::Transform,
+};
+use evmap10::{self as evmap};
+use file_source::{FileServer, FileServerShutdown, Fingerprinter};
+use futures::{future::FutureExt, sink::Sink, stream::StreamExt};
+use futures01::sync::mpsc;
+use k8s_paths_provider::K8sPathsProvider;
+use pod_metadata_annotator::PodMetadataAnnotator;
+use serde::{Deserialize, Serialize};
+use std::path::PathBuf;
+use std::time::Duration;
+use tokio::task::spawn_blocking;
+
+/// Configuration for the `kubernetes_logs` source.
+#[derive(Deserialize, Serialize, Debug, Clone, Default)]
+#[serde(deny_unknown_fields, default)]
+pub struct Config {
+    self_node_name: String,
+}
+
+inventory::submit! {
+    SourceDescription::new_without_default::<Config>(COMPONENT_NAME)
+}
+
+const COMPONENT_NAME: &str = "kubernetes_logs";
+
+#[typetag::serde(name = "kubernetes_logs")]
+impl SourceConfig for Config {
+    fn build(
+        &self,
+        name: &str,
+        globals: &GlobalOptions,
+        shutdown: ShutdownSignal,
+        out: mpsc::Sender<Event>,
+    ) -> crate::Result<sources::Source> {
+        // TODO: switch to the runtime and resolver provided by the
+        // topology once the PRs for that land.
+        let rt = crate::runtime::Runtime::new()?;
+        let exec = rt.executor();
+        let resolver = Resolver::new(globals.dns_servers.clone(), exec)?;
+
+        let source = Source::init(self, resolver, globals, name)?;
+
+        // TODO: this is a workaround for the legacy futures 0.1.
+        // When the core is updated to futures 0.3 this should be
+        // simplified significantly.
+        let out = futures::compat::Compat01As03Sink::new(out);
+        let fut = source.run(out, shutdown);
+        let fut = fut.map(|result| {
+            result.map_err(|error| {
+                error!(message = "source future failed", ?error);
+            })
+        });
+        let fut = Box::pin(fut);
+        let fut = futures::compat::Compat::new(fut);
+        let fut: sources::Source = Box::new(fut);
+        Ok(fut)
+    }
+
+    fn output_type(&self) -> DataType {
+        DataType::Log
+    }
+
+    fn source_type(&self) -> &'static str {
+        COMPONENT_NAME
+    }
+}
+
+#[derive(Clone)]
+struct Source {
+    client: k8s::client::Client,
+    self_node_name: String,
+    data_dir: PathBuf,
+}
+
+impl Source {
+    fn init(
+        config: &Config,
+        resolver: Resolver,
+        globals: &GlobalOptions,
+        name: &str,
+    ) -> crate::Result<Self> {
+        let self_node_name = if config.self_node_name.is_empty() {
+            std::env::var("VECTOR_SELF_NODE_NAME")
+                .map_err(|_| "VECTOR_SELF_NODE_NAME is not set")?
+        } else {
+            config.self_node_name.clone()
+        };
+        info!(
+            message = "obtained Kubernetes Node name to collect logs for (self)",
+            ?self_node_name
+        );
+
+        let k8s_config = k8s::client::config::Config::in_cluster()?;
+        let client = k8s::client::Client::new(k8s_config, resolver)?;
+
+        let data_dir = globals.resolve_and_make_data_subdir(None, name)?;
+
+        Ok(Self {
+            client,
+            self_node_name,
+            data_dir,
+        })
+    }
+
+    async fn run<O>(self, out: O, shutdown: ShutdownSignal) -> crate::Result<()>
+    where
+        O: Sink<Event> + Send,
+    {
+        let Self {
+            client,
+            self_node_name,
+            data_dir,
+        } = self;
+
+        let field_selector = format!("spec.nodeName={}", self_node_name);
+        let label_selector = "vector.dev/exclude!=true".to_owned();
+
+        let watcher = k8s::api_watcher::ApiWatcher::new(client);
+        let (state_reader, state_writer) = evmap::new();
+
+        let mut reflector = k8s::reflector::Reflector::new(
+            watcher,
+            state_writer,
+            Some(field_selector),
+            Some(label_selector),
+            std::time::Duration::from_secs(1),
+        );
+        let reflector_process = reflector.run();
+
+        let paths_provider = K8sPathsProvider::new(state_reader.clone());
+        let annotator = PodMetadataAnnotator::new(state_reader);
+
+        // TODO: some of these parameters may have to be configurable.
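+        // A note on the hardcoded values below (an explanatory sketch of
+        // the rationale, not validated tuning advice): `max_read_bytes`
+        // bounds how much is read from one file before yielding to the
+        // others; the checksum fingerprinter identifies a file by its
+        // first 256 bytes, so read positions survive log rotation by
+        // rename; `glob_minimum_cooldown` limits how often the paths
+        // provider is re-polled for new files.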
+        let file_server = FileServer {
+            paths_provider,
+            max_read_bytes: 2048,
+            start_at_beginning: true,
+            ignore_before: None,
+            max_line_bytes: 32 * 1024, // 32 KiB
+            data_dir,
+            glob_minimum_cooldown: Duration::from_secs(10),
+            fingerprinter: Fingerprinter::Checksum {
+                fingerprint_bytes: 256,
+                ignored_header_bytes: 0,
+            },
+            oldest_first: false,
+        };
+
+        let (file_source_tx, file_source_rx) = futures::channel::mpsc::channel(100);
+
+        let span = info_span!("file_server");
+        let file_server_join_handle = spawn_blocking(move || {
+            let _enter = span.enter();
+            let result =
+                file_server.run(file_source_tx, futures::compat::Compat01As03::new(shutdown));
+            result.unwrap()
+        });
+
+        let mut parser = parser::build();
+        let mut partial_events_merger = partial_events_merger::build(true);
+
+        let events = file_source_rx.map(|(bytes, file)| {
+            emit!(KubernetesLogsEventReceived {
+                file: &file,
+                byte_size: bytes.len(),
+            });
+            create_event(bytes, file)
+        });
+        let events = events
+            .filter_map(move |event| futures::future::ready(parser.transform(event)))
+            .filter_map(move |event| futures::future::ready(partial_events_merger.transform(event)))
+            .map(move |mut event| {
+                if annotator.annotate(&mut event).is_none() {
+                    warn!(
+                        message = "failed to annotate event with pod metadata",
+                        ?event
+                    );
+                }
+                event
+            });
+
+        let event_processing_loop = events.map(Ok).forward(out);
+
+        use std::future::Future;
+        use std::pin::Pin;
+        let list: Vec<Pin<Box<dyn Future<Output = ()> + Send>>> = vec![
+            // Completion of the reflector is unexpected and not handled yet.
+            Box::pin(reflector_process.map(|_| todo!())),
+            Box::pin(file_server_join_handle.map(|result| match result {
+                Ok(FileServerShutdown) => info!(message = "file server completed gracefully"),
+                Err(error) => error!(message = "file server exited with an error", ?error),
+            })),
+            Box::pin(event_processing_loop.map(|result| {
+                match result {
+                    Ok(_) => info!(message = "event_processing_loop ok"),
+                    Err(_) => error!(message = "event_processing_loop err"),
+                };
+            })),
+        ];
+        let mut futs: futures::stream::FuturesUnordered<_> = list.into_iter().collect();
+
+        while let Some(()) = futs.next().await {
+            trace!("another future complete");
+        }
+
+        info!("Done");
+        Ok(())
+    }
+}
 
 const FILE_KEY: &str = "file";
+
+fn create_event(line: bytes::Bytes, file: String) -> Event {
+    let mut event = Event::from(line);
+
+    // Add source type.
+    event
+        .as_mut_log()
+        .insert(event::log_schema().source_type_key(), COMPONENT_NAME);
+
+    // Add file.
+    event.as_mut_log().insert(FILE_KEY, file);
+
+    event
+}
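
A deployment note, not part of the patch itself: per `Source::init` above,
when `self_node_name` is left empty in the configuration, the source falls
back to the `VECTOR_SELF_NODE_NAME` environment variable. A DaemonSet would
typically supply that variable via the Kubernetes downward API; a minimal
sketch of the addition to the `vector` container spec:

    env:
      - name: VECTOR_SELF_NODE_NAME
        valueFrom:
          fieldRef:
            fieldPath: spec.nodeName

The field selector built in `Source::run` (`spec.nodeName=...`) then scopes
the pod watch to the node the agent runs on.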