Skip to content

Commit

Permalink
Add instrumentation
Browse files Browse the repository at this point in the history
Signed-off-by: MOZGIII <[email protected]>
  • Loading branch information
MOZGIII committed Jun 8, 2020
1 parent 2b600d8 commit 675a000
Show file tree
Hide file tree
Showing 16 changed files with 482 additions and 4 deletions.
5 changes: 4 additions & 1 deletion skaffold/manifests/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ metadata:
name: vector-config
data:
vector.toml: |
[sources.internal_metrics]
type = "internal_metrics"
[sinks.stdout]
type = "console"
inputs = ["kubernetes_logs"]
inputs = ["kubernetes_logs", "internal_metrics"]
target = "stdout"
encoding = "json"
24 changes: 24 additions & 0 deletions src/internal_events/kubernetes/api_watcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use super::InternalEvent;
use std::fmt::Debug;

#[derive(Debug)]
pub struct RequestPrepared<R> {
pub request: R,
}

impl<R: Debug> InternalEvent for RequestPrepared<R> {
fn emit_logs(&self) {
trace!(message = "request prepared", ?self.request);
}
}

#[derive(Debug)]
pub struct ResponseReceived<R> {
pub response: R,
}

impl<R: Debug> InternalEvent for ResponseReceived<R> {
fn emit_logs(&self) {
trace!(message = "got response", ?self.response);
}
}
39 changes: 39 additions & 0 deletions src/internal_events/kubernetes/instrumenting_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use super::InternalEvent;
use metrics::counter;
use std::fmt::Debug;

#[derive(Debug)]
pub struct StateItemAdded;

#[derive(Debug)]
pub struct StateItemUpdated;

#[derive(Debug)]
pub struct StateItemDeleted;

#[derive(Debug)]
pub struct StateResynced;

impl InternalEvent for StateItemAdded {
fn emit_metrics(&self) {
counter!("k8s_state_ops", 1, "op_kind" => "item_added");
}
}

impl InternalEvent for StateItemUpdated {
fn emit_metrics(&self) {
counter!("k8s_state_ops", 1, "op_kind" => "item_updated");
}
}

impl InternalEvent for StateItemDeleted {
fn emit_metrics(&self) {
counter!("k8s_state_ops", 1, "op_kind" => "item_deleted");
}
}

impl InternalEvent for StateResynced {
fn emit_metrics(&self) {
counter!("k8s_state_ops", 1, "op_kind" => "resynced");
}
}
43 changes: 43 additions & 0 deletions src/internal_events/kubernetes/instrumenting_watcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use super::InternalEvent;
use metrics::counter;
use std::fmt::Debug;

#[derive(Debug)]
pub struct WatchRequestInvoked;

impl InternalEvent for WatchRequestInvoked {
fn emit_metrics(&self) {
counter!("k8s_watch_request_invoked", 1);
}
}

#[derive(Debug)]
pub struct WatchRequestInvocationFailed<E> {
pub error: E,
}

impl<E: Debug> InternalEvent for WatchRequestInvocationFailed<E> {
fn emit_logs(&self) {
error!(message = "watch invocation failed", ?self.error);
}
}

#[derive(Debug)]
pub struct WatchStreamItemObtained;

impl InternalEvent for WatchStreamItemObtained {
fn emit_metrics(&self) {
counter!("k8s_watch_stream_items_obtained", 1);
}
}

#[derive(Debug)]
pub struct WatchStreamErrored<E> {
pub error: E,
}

impl<E: Debug> InternalEvent for WatchStreamErrored<E> {
fn emit_logs(&self) {
error!(message = "watch stream errored", ?self.error);
}
}
9 changes: 9 additions & 0 deletions src/internal_events/kubernetes/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#![cfg(feature = "kubernetes")]

use super::InternalEvent;

pub mod api_watcher;
pub mod instrumenting_state;
pub mod instrumenting_watcher;
pub mod reflector;
pub mod stream;
19 changes: 19 additions & 0 deletions src/internal_events/kubernetes/reflector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use super::InternalEvent;
use metrics::counter;

/// Emitted when reflector gets a desync from the watch command.
#[derive(Debug)]
pub struct DesyncReceived<E> {
/// The underlying error.
pub error: E,
}

impl<E: std::fmt::Debug> InternalEvent for DesyncReceived<E> {
fn emit_logs(&self) {
warn!(message = "handling desync", error = ?self.error);
}

fn emit_metrics(&self) {
counter!("k8s_reflector_desyncs", 1);
}
}
14 changes: 14 additions & 0 deletions src/internal_events/kubernetes/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use super::InternalEvent;
use metrics::counter;

#[derive(Debug)]
pub struct ChunkProcessed {
pub byte_size: usize,
}

impl InternalEvent for ChunkProcessed {
fn emit_metrics(&self) {
counter!("k8s_stream_chunks_processed", 1);
counter!("k8s_stream_bytes_processed", self.byte_size as u64);
}
}
2 changes: 2 additions & 0 deletions src/internal_events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ mod udp;
mod unix;
mod vector;

pub mod kubernetes;

pub use self::add_fields::*;
pub use self::aws_kinesis_streams::*;
pub use self::blackhole::*;
Expand Down
7 changes: 5 additions & 2 deletions src/kubernetes/api_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use super::{
watch_request_builder::WatchRequestBuilder,
watcher::{self, Watcher},
};
use crate::internal_events::kubernetes::api_watcher as internal_events;
use futures::{
future::BoxFuture,
stream::{BoxStream, Stream},
Expand Down Expand Up @@ -59,15 +60,17 @@ where
.request_builder
.build(watch_optional)
.context(invocation::RequestPreparation)?;
trace!(message = "request prepared", ?request);
emit!(internal_events::RequestPrepared { request: &request });

// Send request, get response.
let response = self
.client
.send(request)
.await
.context(invocation::Request)?;
trace!(message = "got response", ?response);
emit!(internal_events::ResponseReceived {
response: &response
});

// Handle response status code.
let status = response.status();
Expand Down
67 changes: 67 additions & 0 deletions src/kubernetes/instrumenting_watcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//! A watcher that adds instrumentation.
use super::watcher::{self, Watcher};
use crate::internal_events::kubernetes::instrumenting_watcher as internal_events;
use futures::{future::BoxFuture, stream::BoxStream, FutureExt, StreamExt};
use k8s_openapi::{WatchOptional, WatchResponse};

/// A watcher that wraps another watcher with instrumentation calls.
pub struct InstrumentingWatcher<T>
where
T: Watcher,
{
inner: T,
}

impl<T> InstrumentingWatcher<T>
where
T: Watcher,
{
/// Create a new [`InstrumentingWatcher`].
pub fn new(inner: T) -> Self {
Self { inner }
}
}

impl<T> Watcher for InstrumentingWatcher<T>
where
T: Watcher,
<T as Watcher>::Stream: 'static,
{
type Object = <T as Watcher>::Object;

type InvocationError = <T as Watcher>::InvocationError;

type StreamError = <T as Watcher>::StreamError;
type Stream = BoxStream<'static, Result<WatchResponse<Self::Object>, Self::StreamError>>;

fn watch<'a>(
&'a mut self,
watch_optional: WatchOptional<'a>,
) -> BoxFuture<'a, Result<Self::Stream, watcher::invocation::Error<Self::InvocationError>>>
{
Box::pin(self.inner.watch(watch_optional).map(|result| {
result
.map(|stream| {
emit!(internal_events::WatchRequestInvoked);
Box::pin(stream.map(|item_result| {
item_result
.map(|item| {
emit!(internal_events::WatchStreamItemObtained);
item
})
.map_err(|error| {
emit!(internal_events::WatchRequestInvocationFailed {
error: &error
});
error
})
})) as BoxStream<'static, _>
})
.map_err(|error| {
emit!(internal_events::WatchRequestInvocationFailed { error: &error });
error
})
}))
}
}
1 change: 1 addition & 0 deletions src/kubernetes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub mod api_watcher;
pub mod client;
pub mod delayed_delete;
pub mod hash_value;
pub mod instrumenting_watcher;
pub mod mock_watcher;
pub mod multi_response_decoder;
pub mod reflector;
Expand Down
13 changes: 12 additions & 1 deletion src/kubernetes/reflector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use super::{
resource_version,
watcher::{self, Watcher},
};
use crate::internal_events::kubernetes::reflector as internal_events;
use futures::{
pin_mut,
stream::{Stream, StreamExt},
Expand Down Expand Up @@ -90,7 +91,7 @@ where
let stream = match invocation_result {
Ok(val) => val,
Err(watcher::invocation::Error::Desync { source }) => {
warn!(message = "handling desync", error = ?source);
emit!(internal_events::DesyncReceived { error: source });
// We got desynced, reset the state and retry fetching.
// By omiting the flush here, we cache the results from the
// previous run until flush is issued when the new events
Expand Down Expand Up @@ -278,6 +279,7 @@ mod tests {
};
use crate::{
kubernetes::{
instrumenting_watcher::InstrumentingWatcher,
mock_watcher::{self, MockWatcher},
state,
},
Expand Down Expand Up @@ -356,11 +358,13 @@ mod tests {
let (state_events_tx, _state_events_rx) = mpsc::channel(0);
let (_state_action_tx, state_action_rx) = mpsc::channel(0);
let state_writer = state::mock::Writer::new(state_events_tx, state_action_rx);
let state_writer = state::instrumenting::Writer::new(state_writer);

// Prepare watcher.
let (watcher_events_tx, mut watcher_events_rx) = mpsc::channel(0);
let (mut watcher_invocations_tx, watcher_invocations_rx) = mpsc::channel(0);
let watcher = MockWatcher::<Pod>::new(watcher_events_tx, watcher_invocations_rx);
let watcher = InstrumentingWatcher::new(watcher);

// Prepare reflector.
let mut reflector = Reflector::new(
Expand Down Expand Up @@ -521,6 +525,7 @@ mod tests {
let (watcher_events_tx, mut watcher_events_rx) = mpsc::channel(0);
let (mut watcher_invocations_tx, watcher_invocations_rx) = mpsc::channel(0);
let watcher = MockWatcher::<Pod>::new(watcher_events_tx, watcher_invocations_rx);
let watcher = InstrumentingWatcher::new(watcher);

// Prepare reflector.
let mut reflector = Reflector::new(
Expand Down Expand Up @@ -592,11 +597,13 @@ mod tests {
let (state_events_tx, mut state_events_rx) = mpsc::channel(0);
let (mut state_action_tx, state_action_rx) = mpsc::channel(0);
let state_writer = state::mock::Writer::new(state_events_tx, state_action_rx);
let state_writer = state::instrumenting::Writer::new(state_writer);

// Prepare watcher.
let (watcher_events_tx, mut watcher_events_rx) = mpsc::channel(0);
let (mut watcher_invocations_tx, watcher_invocations_rx) = mpsc::channel(0);
let watcher = MockWatcher::<Pod>::new(watcher_events_tx, watcher_invocations_rx);
let watcher = InstrumentingWatcher::new(watcher);

// Prepare reflector.
let deletion_delay = Duration::from_secs(600);
Expand Down Expand Up @@ -745,11 +752,13 @@ mod tests {
let (state_events_tx, _state_events_rx) = mpsc::channel(0);
let (_state_action_tx, state_action_rx) = mpsc::channel(0);
let state_writer = state::mock::Writer::new(state_events_tx, state_action_rx);
let state_writer = state::instrumenting::Writer::new(state_writer);

// Prepare watcher.
let (watcher_events_tx, mut watcher_events_rx) = mpsc::channel(0);
let (mut watcher_invocations_tx, watcher_invocations_rx) = mpsc::channel(0);
let watcher = MockWatcher::<Pod>::new(watcher_events_tx, watcher_invocations_rx);
let watcher = InstrumentingWatcher::new(watcher);

// Prepare reflector.
let mut reflector = Reflector::new(
Expand Down Expand Up @@ -821,13 +830,15 @@ mod tests {
// Prepare state.
let (state_reader, state_writer) = evmap10::new();
let state_writer = Writer::new(state_writer);
let state_writer = state::instrumenting::Writer::new(state_writer);
let resulting_state_reader = state_reader.clone();

// Prepare watcher.
let (watcher_events_tx, mut watcher_events_rx) = mpsc::channel(0);
let (mut watcher_invocations_tx, watcher_invocations_rx) = mpsc::channel(0);
let watcher: MockWatcher<Pod> =
MockWatcher::new(watcher_events_tx, watcher_invocations_rx);
let watcher = InstrumentingWatcher::new(watcher);

// Prepare reflector.
let mut reflector = Reflector::new(
Expand Down
Loading

0 comments on commit 675a000

Please sign in to comment.