Skip to content

Commit

Permalink
chore(core): add more event metadata to proto
Browse files Browse the repository at this point in the history
  • Loading branch information
tobz committed Oct 13, 2023
1 parent 3c4ae86 commit ed12d34
Show file tree
Hide file tree
Showing 5 changed files with 317 additions and 113 deletions.
19 changes: 17 additions & 2 deletions lib/vector-core/proto/event.proto
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@ message Log {
// Deprecated, use value instead
map<string, Value> fields = 1;
Value value = 2;
Value metadata = 3;
Value metadata = 3 [deprecated = true];
Metadata metadata_full = 4;
}

message Trace {
map<string, Value> fields = 1;
Value metadata = 2;
Value metadata = 2 [deprecated = true];
Metadata metadata_full = 3;
}

message ValueMap {
Expand Down Expand Up @@ -75,9 +77,22 @@ message DatadogOriginMetadata {
optional uint32 origin_service = 3;
}

// Wire representation of a set of secrets: a map from a string key to a
// string value.
// NOTE(review): presumably mirrors the Rust-side `Secrets` container stored in
// event metadata -- confirm against the conversion code.
message Secrets {
// Secret entries, keyed by name. Values are carried as plain strings.
map<string, string> entries = 1;
}

// Identifies an output by a component name plus an optional port.
// NOTE(review): presumably the wire form of the Rust `OutputId`
// (`component` + `port`) -- confirm against the conversion code.
message OutputId {
// Name of the component.
string component = 1;
// Port on the component; may be left unset.
optional string port = 2;
}

// Event metadata bundle: an arbitrary metadata value plus a set of
// separately-typed metadata fields.
// NOTE(review): the field names line up with the fields of the Rust
// `EventMetadata` struct; the per-field notes below assume that mapping --
// confirm against the conversion code.
message Metadata {
// Arbitrary metadata value.
Value value = 1;
// Datadog origin metadata.
DatadogOriginMetadata datadog_origin_metadata = 2;
// Source id; may be left unset.
optional string source_id = 3;
// Source type; may be left unset.
optional string source_type = 4;
// Upstream output id (component + optional port).
OutputId upstream_id = 5;
// Secrets attached to the event.
Secrets secrets = 6;
}

message Metric {
Expand Down
9 changes: 9 additions & 0 deletions lib/vector-core/src/config/output_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,15 @@ impl From<(&ComponentKey, String)> for OutputId {
}
}

impl From<(String, Option<String>)> for OutputId {
    /// Builds an `OutputId` from a `(component, port)` pair.
    ///
    /// The component string is converted via `Into` to the type of the
    /// `component` field; the optional port is stored unchanged.
    fn from(parts: (String, Option<String>)) -> Self {
        let (name, port) = parts;
        let component = name.into();
        Self { component, port }
    }
}

// This panicking implementation is convenient for testing, but should never be enabled for use
// outside of tests.
#[cfg(any(test, feature = "test"))]
Expand Down
127 changes: 110 additions & 17 deletions lib/vector-core/src/event/metadata.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
#![deny(missing_docs)]

use std::{collections::BTreeMap, sync::Arc};
use std::{borrow::Cow, collections::BTreeMap, fmt, sync::Arc};

use serde::{Deserialize, Serialize};
use vector_common::{config::ComponentKey, EventDataEq};
use vrl::value::{Kind, Secrets, Value};
use vrl::{
compiler::SecretTarget,
value::{Kind, Value},
};

use super::{BatchNotifier, EventFinalizer, EventFinalizers, EventStatus};
use crate::{
Expand All @@ -20,27 +23,27 @@ const SPLUNK_HEC_TOKEN: &str = "splunk_hec_token";
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
pub struct EventMetadata {
/// Arbitrary data stored with an event
#[serde(default = "default_metadata_value", skip)]
value: Value,
#[serde(default = "default_metadata_value")]
pub(crate) value: Value,

/// Storage for secrets
#[serde(default, skip)]
secrets: Secrets,
#[serde(default)]
pub(crate) secrets: Secrets,

#[serde(default, skip)]
finalizers: EventFinalizers,

/// The id of the source
source_id: Option<Arc<ComponentKey>>,
pub(crate) source_id: Option<Arc<ComponentKey>>,

/// The type of the source
source_type: Option<&'static str>,
pub(crate) source_type: Option<Cow<'static, str>>,

/// The id of the component this event originated from. This is used to
/// determine which schema definition to attach to an event in transforms.
/// This should always have a value set for events in transforms. It will always be `None`
/// in a source, and there is currently no use-case for reading the value in a sink.
upstream_id: Option<Arc<OutputId>>,
pub(crate) upstream_id: Option<Arc<OutputId>>,

/// An identifier for a globally registered schema definition which provides information about
/// the event shape (type information, and semantic meaning of fields).
Expand All @@ -61,7 +64,7 @@ pub struct EventMetadata {
/// Metadata to track the origin of metrics. This is always `None` for log and trace events.
/// Only a small set of Vector sources and transforms explicitly set this field.
#[serde(default)]
datadog_origin_metadata: Option<DatadogMetricOriginMetadata>,
pub(crate) datadog_origin_metadata: Option<DatadogMetricOriginMetadata>,
}

/// Metric Origin metadata for submission to Datadog.
Expand Down Expand Up @@ -147,8 +150,8 @@ impl EventMetadata {

/// Returns a reference to the metadata source type.
#[must_use]
pub fn source_type(&self) -> Option<&'static str> {
self.source_type
pub fn source_type(&self) -> Option<&str> {
self.source_type.as_deref()
}

/// Returns a reference to the metadata parent id. This is the `OutputId`
Expand All @@ -164,8 +167,8 @@ impl EventMetadata {
}

/// Sets the `source_type` in the metadata to the provided value.
pub fn set_source_type(&mut self, source_type: &'static str) {
self.source_type = Some(source_type);
pub fn set_source_type<S: Into<Cow<'static, str>>>(&mut self, source_type: S) {
self.source_type = Some(source_type.into());
}

/// Sets the `upstream_id` in the metadata to the provided value.
Expand Down Expand Up @@ -283,8 +286,8 @@ impl EventMetadata {

/// Replaces the existing `source_type` with the given one.
#[must_use]
pub fn with_source_type(mut self, source_type: &'static str) -> Self {
self.source_type = Some(source_type);
pub fn with_source_type<S: Into<Cow<'static, str>>>(mut self, source_type: S) -> Self {
self.source_type = Some(source_type.into());
self
}

Expand Down Expand Up @@ -374,6 +377,79 @@ impl<T> WithMetadata<T> {
}
}

/// A container that holds secrets.
///
/// Secrets are stored in a `BTreeMap` keyed by name, with each value held as
/// an `Arc<str>`.
///
/// The `Debug` implementation for this type prints only the keys and replaces
/// every value with a placeholder.
// NOTE(review): the derived `Serialize` presumably emits the secret values
// verbatim (unlike `Debug`) -- confirm this is intended wherever metadata is
// serialized.
#[derive(Clone, Default, Deserialize, Eq, PartialEq, PartialOrd, Serialize)]
pub struct Secrets(BTreeMap<String, Arc<str>>);

This comment has been minimized.

Copy link
@bruceg

bruceg Oct 13, 2023

Member

I presume this was largely just copied in from the VRL type? Would it make sense to break this out into its own module like it was in VRL?


impl fmt::Debug for Secrets {
    /// Formats the container as a map of its keys, with every value replaced
    /// by a fixed placeholder so secret contents never reach debug output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_map()
            .entries(self.0.keys().map(|name| (name, "<redacted secret>")))
            .finish()
    }
}

impl Secrets {
    /// Creates a new, empty container.
    #[must_use]
    pub fn new() -> Self {
        Self(BTreeMap::new())
    }

    /// Returns `true` if the container contains no secrets.
    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    /// Gets a secret by its name.
    ///
    /// Returns `None` if no secret is stored under `key`.
    #[must_use]
    pub fn get(&self, key: &str) -> Option<&Arc<str>> {
        self.0.get(key)
    }

    /// Inserts a secret into the container.
    ///
    /// If a secret is already stored under `key`, it is replaced by `value`.
    pub fn insert(&mut self, key: impl Into<String>, value: impl Into<Arc<str>>) {
        self.0.insert(key.into(), value.into());
    }

    /// Removes the secret stored under `key`, if there is one.
    ///
    /// Removing a key that is not present is a no-op.
    pub fn remove(&mut self, key: &str) {
        // `String: Borrow<str>`, so the map can be queried with a `&str`
        // directly; no temporary `String` needs to be allocated.
        self.0.remove(key);
    }

    /// Merges `other` into `self`.
    ///
    /// If both containers hold a secret under the same key, the value from
    /// `self` is kept and the one from `other` is dropped.
    pub fn merge(&mut self, other: Self) {
        for (key, value) in other.0 {
            // `or_insert` only writes when the key is vacant, which is what
            // gives `self` precedence on collisions.
            self.0.entry(key).or_insert(value);
        }
    }
}

impl SecretTarget for Secrets {
fn get_secret(&self, key: &str) -> Option<&str> {
self.get(key).map(AsRef::as_ref)
}

fn insert_secret(&mut self, key: &str, value: &str) {
self.insert(key, value);
}

fn remove_secret(&mut self, key: &str) {
self.remove(key);
}
}

impl IntoIterator for Secrets {
type Item = (String, Arc<str>);
type IntoIter = std::collections::btree_map::IntoIter<String, Arc<str>>;

fn into_iter(self) -> Self::IntoIter {
self.0.into_iter()
}
}

#[cfg(test)]
mod test {
use super::*;
Expand All @@ -382,11 +458,28 @@ mod test {
const SECRET2: &str = "secret2";

#[test]
fn get_set_secret() {
fn metadata_hardcoded_secrets_get_set() {
let mut metadata = EventMetadata::default();
metadata.set_datadog_api_key(Arc::from(SECRET));
metadata.set_splunk_hec_token(Arc::from(SECRET2));
assert_eq!(metadata.datadog_api_key().unwrap().as_ref(), SECRET);
assert_eq!(metadata.splunk_hec_token().unwrap().as_ref(), SECRET2);
}

#[test]
fn secrets_merge() {
    // Two containers that overlap on "key-b" only.
    let mut base = Secrets::new();
    for (key, value) in [("key-a", "value-a1"), ("key-b", "value-b1")] {
        base.insert(key, value);
    }

    let mut incoming = Secrets::new();
    for (key, value) in [("key-b", "value-b2"), ("key-c", "value-c2")] {
        incoming.insert(key, value);
    }

    base.merge(incoming);

    // The colliding key keeps the receiver's value; the rest are unioned.
    let expected = [
        ("key-a", "value-a1"),
        ("key-b", "value-b1"),
        ("key-c", "value-c2"),
    ];
    for (key, value) in expected {
        assert_eq!(base.get(key).unwrap().as_ref(), value);
    }
}
}
Loading

0 comments on commit ed12d34

Please sign in to comment.