Skip to content

Commit

Permalink
chore(observability): consolidate EventCountTags with `TaggedEvents…
Browse files Browse the repository at this point in the history
…Sent` (#17865)

@bruceg pointed out that the structs `EventCountTags` and
`TaggedEventsSent` are basically the same, so there is no need for them
to be separate structs.

This consolidates the two structs. The main change has been updating the
`registered_event` macro to derive `Clone, PartialEq, Eq, PartialOrd,
Ord, Hash` to the event structs. This hasn't caused anything to break,
but maybe it could cause problems down the line? `PartialOrd` and `Ord`
weren't strictly needed, but they are used in a test to sort the events
to ensure the test is deterministic.

---------

Signed-off-by: Stephen Wakely <[email protected]>
  • Loading branch information
StephenWakely authored Jul 21, 2023
1 parent 0bf6abd commit 81f5c50
Show file tree
Hide file tree
Showing 12 changed files with 62 additions and 83 deletions.
7 changes: 4 additions & 3 deletions lib/vector-common/src/internal_event/cached_event.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{
collections::BTreeMap,
collections::HashMap,
hash::Hash,
sync::{Arc, RwLock},
};

Expand All @@ -22,7 +23,7 @@ pub struct RegisteredEventCache<T, Event: RegisterTaggedInternalEvent> {
fixed_tags: T,
cache: Arc<
RwLock<
BTreeMap<
HashMap<
<Event as RegisterTaggedInternalEvent>::Tags,
<Event as RegisterInternalEvent>::Handle,
>,
Expand All @@ -48,7 +49,7 @@ impl<Event, EventHandle, Data, Tags, FixedTags> RegisteredEventCache<FixedTags,
where
Data: Sized,
EventHandle: InternalEventHandle<Data = Data>,
Tags: Ord + Clone,
Tags: Clone + Eq + Hash,
FixedTags: Clone,
Event: RegisterInternalEvent<Handle = EventHandle>
+ RegisterTaggedInternalEvent<Tags = Tags, Fixed = FixedTags>,
Expand Down
22 changes: 14 additions & 8 deletions lib/vector-common/src/internal_event/events_sent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use metrics::{register_counter, Counter};
use tracing::trace;

use crate::{config::ComponentKey, request_metadata::EventCountTags};
use crate::config::ComponentKey;

use super::{CountByteSize, OptionalTag, Output, SharedString};

Expand Down Expand Up @@ -91,19 +91,25 @@ crate::registered_event!(
self.event_bytes.increment(byte_size.get() as u64);
}

fn register(_fixed: (), tags: EventCountTags) {
super::register(TaggedEventsSent::new(
tags,
))
fn register(_fixed: (), tags: TaggedEventsSent) {
super::register(tags)
}
);

impl TaggedEventsSent {
#[must_use]
pub fn new(tags: EventCountTags) -> Self {
pub fn new_empty() -> Self {
Self {
source: tags.source,
service: tags.service,
source: OptionalTag::Specified(None),
service: OptionalTag::Specified(None),
}
}

#[must_use]
pub fn new_unspecified() -> Self {
Self {
source: OptionalTag::Ignored,
service: OptionalTag::Ignored,
}
}
}
4 changes: 2 additions & 2 deletions lib/vector-common/src/internal_event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,15 +194,15 @@ impl From<Protocol> for SharedString {
macro_rules! registered_event {
// A registered event struct with no fields (zero-sized type).
($event:ident => $($tail:tt)*) => {
#[derive(Debug)]
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct $event;

$crate::registered_event!(=> $event $($tail)*);
};

// A normal registered event struct.
($event:ident { $( $field:ident: $type:ty, )* } => $($tail:tt)*) => {
#[derive(Debug)]
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct $event {
$( pub $field: $type, )*
}
Expand Down
62 changes: 20 additions & 42 deletions lib/vector-common/src/request_metadata.rs
Original file line number Diff line number Diff line change
@@ -1,44 +1,18 @@
use std::collections::HashMap;
use std::ops::Add;
use std::{collections::HashMap, sync::Arc};

use crate::{
config::ComponentKey,
internal_event::{
CountByteSize, InternalEventHandle, OptionalTag, RegisterTaggedInternalEvent,
RegisteredEventCache,
CountByteSize, InternalEventHandle, RegisterTaggedInternalEvent, RegisteredEventCache,
TaggedEventsSent,
},
json_size::JsonSize,
};

/// Tags that are used to group the events within a batch for emitting telemetry.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct EventCountTags {
pub source: OptionalTag<Arc<ComponentKey>>,
pub service: OptionalTag<String>,
}

impl EventCountTags {
#[must_use]
pub fn new_empty() -> Self {
Self {
source: OptionalTag::Specified(None),
service: OptionalTag::Specified(None),
}
}

#[must_use]
pub fn new_unspecified() -> Self {
Self {
source: OptionalTag::Ignored,
service: OptionalTag::Ignored,
}
}
}

/// Must be implemented by events to get the tags that will be attached to
/// the `component_sent_event_*` emitted metrics.
pub trait GetEventCountTags {
fn get_tags(&self) -> EventCountTags;
fn get_tags(&self) -> TaggedEventsSent;
}

/// Keeps track of the estimated json size of a given batch of events by
Expand All @@ -48,7 +22,7 @@ pub enum GroupedCountByteSize {
/// When we need to keep track of the events by certain tags we use this
/// variant.
Tagged {
sizes: HashMap<EventCountTags, CountByteSize>,
sizes: HashMap<TaggedEventsSent, CountByteSize>,
},
/// If we don't need to track the events by certain tags we can use
/// this variant to avoid allocating a `HashMap`,
Expand Down Expand Up @@ -86,7 +60,7 @@ impl GroupedCountByteSize {
/// Returns `None` if we are not tracking by tags.
#[must_use]
#[cfg(test)]
pub fn sizes(&self) -> Option<&HashMap<EventCountTags, CountByteSize>> {
pub fn sizes(&self) -> Option<&HashMap<TaggedEventsSent, CountByteSize>> {
match self {
Self::Tagged { sizes } => Some(sizes),
Self::Untagged { .. } => None,
Expand Down Expand Up @@ -131,7 +105,7 @@ impl GroupedCountByteSize {
/// Emits our counts to a `RegisteredEvent` cached event.
pub fn emit_event<T, H>(&self, event_cache: &RegisteredEventCache<(), T>)
where
T: RegisterTaggedInternalEvent<Tags = EventCountTags, Fixed = (), Handle = H>,
T: RegisterTaggedInternalEvent<Tags = TaggedEventsSent, Fixed = (), Handle = H>,
H: InternalEventHandle<Data = CountByteSize>,
{
match self {
Expand All @@ -141,7 +115,7 @@ impl GroupedCountByteSize {
}
}
GroupedCountByteSize::Untagged { size } => {
event_cache.emit(&EventCountTags::new_unspecified(), *size);
event_cache.emit(&TaggedEventsSent::new_unspecified(), *size);
}
}
}
Expand Down Expand Up @@ -177,21 +151,21 @@ impl<'a> Add<&'a GroupedCountByteSize> for GroupedCountByteSize {

// The following two scenarios shouldn't really occur in practice, but are provided for completeness.
(Self::Tagged { mut sizes }, Self::Untagged { size }) => {
match sizes.get_mut(&EventCountTags::new_empty()) {
match sizes.get_mut(&TaggedEventsSent::new_empty()) {
Some(empty_size) => *empty_size += *size,
None => {
sizes.insert(EventCountTags::new_empty(), *size);
sizes.insert(TaggedEventsSent::new_empty(), *size);
}
}

Self::Tagged { sizes }
}
(Self::Untagged { size }, Self::Tagged { sizes }) => {
let mut sizes = sizes.clone();
match sizes.get_mut(&EventCountTags::new_empty()) {
match sizes.get_mut(&TaggedEventsSent::new_empty()) {
Some(empty_size) => *empty_size += size,
None => {
sizes.insert(EventCountTags::new_empty(), size);
sizes.insert(TaggedEventsSent::new_empty(), size);
}
}

Expand Down Expand Up @@ -307,6 +281,10 @@ pub trait MetaDescriptive {

#[cfg(test)]
mod tests {
use std::sync::Arc;

use crate::{config::ComponentKey, internal_event::OptionalTag};

use super::*;

struct DummyEvent {
Expand All @@ -315,8 +293,8 @@ mod tests {
}

impl GetEventCountTags for DummyEvent {
fn get_tags(&self) -> EventCountTags {
EventCountTags {
fn get_tags(&self) -> TaggedEventsSent {
TaggedEventsSent {
source: self.source.clone(),
service: self.service.clone(),
}
Expand Down Expand Up @@ -380,14 +358,14 @@ mod tests {
assert_eq!(
vec![
(
EventCountTags {
TaggedEventsSent {
source: OptionalTag::Ignored,
service: Some("cabbage".to_string()).into()
},
CountByteSize(2, JsonSize::new(78))
),
(
EventCountTags {
TaggedEventsSent {
source: OptionalTag::Ignored,
service: Some("tomato".to_string()).into()
},
Expand Down
8 changes: 4 additions & 4 deletions lib/vector-core/src/event/log_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ use lookup::lookup_v2::TargetPath;
use lookup::PathPrefix;
use serde::{Deserialize, Serialize, Serializer};
use vector_common::{
internal_event::OptionalTag,
internal_event::{OptionalTag, TaggedEventsSent},
json_size::{JsonSize, NonZeroJsonSize},
request_metadata::{EventCountTags, GetEventCountTags},
request_metadata::GetEventCountTags,
EventDataEq,
};

Expand Down Expand Up @@ -215,7 +215,7 @@ impl EstimatedJsonEncodedSizeOf for LogEvent {
}

impl GetEventCountTags for LogEvent {
fn get_tags(&self) -> EventCountTags {
fn get_tags(&self) -> TaggedEventsSent {
let source = if telemetry().tags().emit_source {
self.metadata().source_id().cloned().into()
} else {
Expand All @@ -230,7 +230,7 @@ impl GetEventCountTags for LogEvent {
OptionalTag::Ignored
};

EventCountTags { source, service }
TaggedEventsSent { source, service }
}
}

Expand Down
8 changes: 4 additions & 4 deletions lib/vector-core/src/event/metric/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ use std::{

use chrono::{DateTime, Utc};
use vector_common::{
internal_event::OptionalTag,
internal_event::{OptionalTag, TaggedEventsSent},
json_size::JsonSize,
request_metadata::{EventCountTags, GetEventCountTags},
request_metadata::GetEventCountTags,
EventDataEq,
};
use vector_config::configurable_component;
Expand Down Expand Up @@ -483,7 +483,7 @@ impl Finalizable for Metric {
}

impl GetEventCountTags for Metric {
fn get_tags(&self) -> EventCountTags {
fn get_tags(&self) -> TaggedEventsSent {
let source = if telemetry().tags().emit_source {
self.metadata().source_id().cloned().into()
} else {
Expand All @@ -500,7 +500,7 @@ impl GetEventCountTags for Metric {
OptionalTag::Ignored
};

EventCountTags { source, service }
TaggedEventsSent { source, service }
}
}

Expand Down
9 changes: 3 additions & 6 deletions lib/vector-core/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,8 @@ use serde::{Deserialize, Serialize};
pub use trace::TraceEvent;
use vector_buffers::EventCount;
use vector_common::{
config::ComponentKey,
finalization,
json_size::JsonSize,
request_metadata::{EventCountTags, GetEventCountTags},
EventDataEq,
config::ComponentKey, finalization, internal_event::TaggedEventsSent, json_size::JsonSize,
request_metadata::GetEventCountTags, EventDataEq,
};
pub use vrl::value::Value;
#[cfg(feature = "vrl")]
Expand Down Expand Up @@ -97,7 +94,7 @@ impl Finalizable for Event {
}

impl GetEventCountTags for Event {
fn get_tags(&self) -> EventCountTags {
fn get_tags(&self) -> TaggedEventsSent {
match self {
Event::Log(log) => log.get_tags(),
Event::Metric(metric) => metric.get_tags(),
Expand Down
5 changes: 2 additions & 3 deletions lib/vector-core/src/event/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ use lookup::lookup_v2::TargetPath;
use serde::{Deserialize, Serialize};
use vector_buffers::EventCount;
use vector_common::{
json_size::JsonSize,
request_metadata::{EventCountTags, GetEventCountTags},
internal_event::TaggedEventsSent, json_size::JsonSize, request_metadata::GetEventCountTags,
EventDataEq,
};

Expand Down Expand Up @@ -149,7 +148,7 @@ impl AsMut<LogEvent> for TraceEvent {
}

impl GetEventCountTags for TraceEvent {
fn get_tags(&self) -> EventCountTags {
fn get_tags(&self) -> TaggedEventsSent {
self.0.get_tags()
}
}
5 changes: 3 additions & 2 deletions src/sinks/elasticsearch/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ use std::{io, io::Write};
use serde::Serialize;
use vector_buffers::EventCount;
use vector_common::{
internal_event::TaggedEventsSent,
json_size::JsonSize,
request_metadata::{EventCountTags, GetEventCountTags, GroupedCountByteSize},
request_metadata::{GetEventCountTags, GroupedCountByteSize},
};
use vector_core::{config::telemetry, event::Event, ByteSizeOf, EstimatedJsonEncodedSizeOf};

Expand Down Expand Up @@ -51,7 +52,7 @@ impl EventCount for ProcessedEvent {
}

impl GetEventCountTags for ProcessedEvent {
fn get_tags(&self) -> EventCountTags {
fn get_tags(&self) -> TaggedEventsSent {
self.log.get_tags()
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/sinks/loki/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ pub struct LokiRecord {
pub event: LokiEvent,
pub json_byte_size: JsonSize,
pub finalizers: EventFinalizers,
pub event_count_tags: EventCountTags,
pub event_count_tags: TaggedEventsSent,
}

impl ByteSizeOf for LokiRecord {
Expand Down Expand Up @@ -191,7 +191,7 @@ impl Finalizable for LokiRecord {
}

impl GetEventCountTags for LokiRecord {
fn get_tags(&self) -> EventCountTags {
fn get_tags(&self) -> TaggedEventsSent {
self.event_count_tags.clone()
}
}
Expand Down
6 changes: 2 additions & 4 deletions src/sinks/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,9 @@ pub use tower::{Service, ServiceBuilder};
pub use vector_buffers::EventCount;
pub use vector_common::{
finalization::{EventFinalizers, EventStatus, Finalizable},
internal_event::CountByteSize,
internal_event::{CountByteSize, TaggedEventsSent},
json_size::JsonSize,
request_metadata::{
EventCountTags, GetEventCountTags, GroupedCountByteSize, MetaDescriptive, RequestMetadata,
},
request_metadata::{GetEventCountTags, GroupedCountByteSize, MetaDescriptive, RequestMetadata},
};
pub use vector_config::configurable_component;
pub use vector_core::{
Expand Down
Loading

0 comments on commit 81f5c50

Please sign in to comment.