feat(spans): Send outcomes for spans #2930

Merged Jan 15, 2024 (23 commits)
Changes from 16 commits

Commits
655899f
feat(spans): Send outcomes for spans
phacops Jan 10, 2024
e636163
Add a CHANGELOG entry
phacops Jan 10, 2024
72a5caf
Use ItemAction::Accept
phacops Jan 10, 2024
a1e768a
Add a feature guard for all Accepted outcome use
phacops Jan 10, 2024
b23750c
Send Accepted outcome after producing the span
phacops Jan 11, 2024
a386f11
Remove useless change to file
phacops Jan 11, 2024
5b50350
Record an outcome for failure to parse the span to a SpanKafkaMessage
phacops Jan 11, 2024
5ac9444
Add a SpanIndexed data category and use it where appropriate
phacops Jan 11, 2024
b0c5709
Merge branch 'master' into pierre/spans-send-outcomes
phacops Jan 11, 2024
cbcfa6e
Merge branch 'master' into pierre/spans-send-outcomes
phacops Jan 11, 2024
7cd8eab
Emit an invalid outcome if span extraction fails
phacops Jan 11, 2024
872eb2e
Use item state to determine if the item is indexed or not
phacops Jan 11, 2024
5b29743
Set metrics as extracted when extracting spans from transactions
phacops Jan 11, 2024
5d8f092
Update relay-server/src/actors/processor/span/processing.rs
phacops Jan 12, 2024
f223a4e
Refactor how we choose the right category
phacops Jan 12, 2024
250c858
Refactor how we choose the right category
phacops Jan 12, 2024
28eebaa
Remove high cardinality fields from outcome to make sure they can be …
phacops Jan 12, 2024
935a512
Filter high-cardinality fields for Accepted outcomes automatically
phacops Jan 12, 2024
d0477a7
Merge branch 'master' into pierre/spans-send-outcomes
phacops Jan 12, 2024
0e1618a
Lint code
phacops Jan 12, 2024
870bbc9
Remove processing guard
phacops Jan 12, 2024
34cd0c4
Add tests
phacops Jan 15, 2024
678dc6b
Merge branch 'master' into pierre/spans-send-outcomes
phacops Jan 15, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -19,6 +19,7 @@
- Make Kafka spans compatible with the Snuba span schema. ([#2917](https://github.com/getsentry/relay/pull/2917), [#2926](https://github.com/getsentry/relay/pull/2926))
- Only extract span metrics / tags when they are needed. ([#2907](https://github.com/getsentry/relay/pull/2907), [#2923](https://github.com/getsentry/relay/pull/2923), [#2924](https://github.com/getsentry/relay/pull/2924))
- Normalize metric resource identifiers in `event._metrics_summary` and `span._metrics_summary`. ([#2914](https://github.com/getsentry/relay/pull/2914))
- Send outcomes for spans. ([#2930](https://github.com/getsentry/relay/pull/2930))
- Validate error_id and trace_id vectors in replay deserializer. ([#2931](https://github.com/getsentry/relay/pull/2931))
- Add a data category for indexed spans. ([#2937](https://github.com/getsentry/relay/pull/2937))

22 changes: 16 additions & 6 deletions relay-server/src/actors/outcome.rs
@@ -148,12 +148,12 @@ impl FromMessage<Self> for TrackOutcome {
/// Defines the possible outcomes from processing an event.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum Outcome {
// /// The event has been accepted and handled completely.
// ///
// /// This is never emitted by Relay as the event may be discarded by the processing pipeline
// /// after Relay. Only the `save_event` task in Sentry finally accepts an event.
// #[allow(dead_code)]
// Accepted,
/// The event has been accepted and handled completely.
///
/// This is only emitted for items going from Relay to Snuba directly.
#[cfg(feature = "processing")]
Accepted,

/// The event has been filtered due to a configured filter.
Filtered(FilterStatKey),

@@ -183,6 +183,8 @@ impl Outcome {
Outcome::Invalid(_) => OutcomeId::INVALID,
Outcome::Abuse => OutcomeId::ABUSE,
Outcome::ClientDiscard(_) => OutcomeId::CLIENT_DISCARD,
#[cfg(feature = "processing")]
Outcome::Accepted => OutcomeId::ACCEPTED,
}
}

@@ -198,6 +200,8 @@ impl Outcome {
.map(|code| Cow::Owned(code.as_str().into())),
Outcome::ClientDiscard(ref discard_reason) => Some(Cow::Borrowed(discard_reason)),
Outcome::Abuse => None,
#[cfg(feature = "processing")]
Outcome::Accepted => None,
}
}

@@ -229,6 +233,8 @@ impl fmt::Display for Outcome {
Outcome::Invalid(reason) => write!(f, "invalid data ({reason})"),
Outcome::Abuse => write!(f, "abuse limit reached"),
Outcome::ClientDiscard(reason) => write!(f, "discarded by client ({reason})"),
#[cfg(feature = "processing")]
Outcome::Accepted => write!(f, "accepted"),
}
}
}
@@ -356,6 +362,9 @@ pub enum DiscardReason {

/// (Relay) Profiling related discard reasons
Profiling(&'static str),

/// (Relay) A span is not valid after normalization.
InvalidSpan,
}

impl DiscardReason {
@@ -398,6 +407,7 @@ impl DiscardReason {
DiscardReason::InvalidReplayEventPii => "invalid_replay_pii_scrubber_failed",
DiscardReason::InvalidReplayRecordingEvent => "invalid_replay_recording",
DiscardReason::Profiling(reason) => reason,
DiscardReason::InvalidSpan => "invalid_span",
}
}
}
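Note on the `Outcome` changes above: the new `Accepted` variant carries no reason code and renders as `accepted`, while the new `InvalidSpan` discard reason maps to the `invalid_span` string. The following is a minimal, self-contained sketch of that mapping, not Relay's actual code. It assumes a trimmed-down pair of enums (only a few variants), omits the `processing` feature gate, and returns a plain `Option<&'static str>` from `to_reason` instead of the `Cow` used in the diff.

```rust
use std::fmt;

/// Simplified stand-in for Relay's `DiscardReason` (only two variants shown).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum DiscardReason {
    /// Profiling-related reasons carry their own static name.
    Profiling(&'static str),
    /// A span is not valid after normalization.
    InvalidSpan,
}

impl DiscardReason {
    /// Reason string reported with the outcome.
    pub fn name(self) -> &'static str {
        match self {
            DiscardReason::Profiling(reason) => reason,
            DiscardReason::InvalidSpan => "invalid_span",
        }
    }
}

/// Simplified stand-in for Relay's `Outcome` (feature gate omitted).
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum Outcome {
    Accepted,
    Invalid(DiscardReason),
    Abuse,
}

impl Outcome {
    /// Optional reason code; accepted and abuse outcomes have none.
    pub fn to_reason(&self) -> Option<&'static str> {
        match self {
            Outcome::Accepted => None,
            Outcome::Invalid(reason) => Some(reason.name()),
            Outcome::Abuse => None,
        }
    }
}

impl fmt::Display for Outcome {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Outcome::Accepted => write!(f, "accepted"),
            Outcome::Invalid(reason) => write!(f, "invalid data ({})", reason.name()),
            Outcome::Abuse => write!(f, "abuse limit reached"),
        }
    }
}
```

With these definitions, `Outcome::Invalid(DiscardReason::InvalidSpan).to_string()` yields `invalid data (invalid_span)`, and `Outcome::Accepted.to_reason()` is `None`.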
27 changes: 21 additions & 6 deletions relay-server/src/actors/processor/span/processing.rs
@@ -14,6 +14,7 @@ use relay_metrics::{aggregator::AggregatorConfig, MetricNamespace, UnixTimestamp
use relay_pii::PiiProcessor;
use relay_protocol::{Annotated, Empty};

use crate::actors::outcome::{DiscardReason, Outcome};
use crate::actors::processor::{ProcessEnvelopeState, ProcessingError};
use crate::envelope::{ContentType, Item, ItemType};
use crate::metrics_extraction::generic::extract_metrics;
@@ -45,15 +46,15 @@ pub fn process(state: &mut ProcessEnvelopeState, config: Arc<Config>) {
Ok(otel_span) => Annotated::new(otel_span.into()),
Err(err) => {
relay_log::debug!("failed to parse OTel span: {}", err);
return ItemAction::DropSilently;
return ItemAction::Drop(Outcome::Invalid(DiscardReason::InvalidJson));
}
}
}
ItemType::Span => match Annotated::<Span>::from_json_bytes(&item.payload()) {
Ok(span) => span,
Err(err) => {
relay_log::debug!("failed to parse span: {}", err);
return ItemAction::DropSilently;
return ItemAction::Drop(Outcome::Invalid(DiscardReason::InvalidJson));
}
},

@@ -62,16 +63,17 @@ pub fn process(state: &mut ProcessEnvelopeState, config: Arc<Config>) {

if let Err(e) = normalize(&mut annotated_span, config.clone()) {
relay_log::debug!("failed to normalize span: {}", e);
return ItemAction::DropSilently;
return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal));
};

let Some(span) = annotated_span.value_mut() else {
return ItemAction::DropSilently;
return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal));
};

if let Some(config) = span_metrics_extraction_config {
let metrics = extract_metrics(span, config);
state.extracted_metrics.project_metrics.extend(metrics);
item.set_metrics_extracted(true);
}

// TODO: dynamic sampling
@@ -95,7 +97,7 @@ pub fn process(state: &mut ProcessEnvelopeState, config: Arc<Config>) {
Ok(res) => res,
Err(err) => {
relay_log::error!("invalid span: {err}");
return ItemAction::DropSilently;
return ItemAction::Drop(Outcome::Invalid(DiscardReason::InvalidSpan));
}
};

@@ -105,10 +107,11 @@ pub fn process(state: &mut ProcessEnvelopeState, config: Arc<Config>) {
Ok(payload) => payload,
Err(err) => {
relay_log::debug!("failed to serialize span: {}", err);
return ItemAction::DropSilently;
return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal));
}
};
new_item.set_payload(ContentType::Json, payload);
new_item.set_metrics_extracted(item.metrics_extracted());

*item = new_item;

@@ -127,18 +130,30 @@ pub fn extract_from_event(state: &mut ProcessEnvelopeState) {
Ok(span) => span,
Err(e) => {
relay_log::error!("Invalid span: {e}");
state.managed_envelope.track_outcome(
Outcome::Invalid(DiscardReason::InvalidSpan),
relay_quotas::DataCategory::SpanIndexed,
1,
);
return;
}
};
let span = match span.to_json() {
Ok(span) => span,
Err(e) => {
relay_log::error!(error = &e as &dyn Error, "Failed to serialize span");
state.managed_envelope.track_outcome(
Outcome::Invalid(DiscardReason::InvalidSpan),
relay_quotas::DataCategory::SpanIndexed,
1,
);
return;
}
};
let mut item = Item::new(ItemType::Span);
item.set_payload(ContentType::Json, span);
// If metrics extraction happened for the event, it also happened for its spans:
item.set_metrics_extracted(state.event_metrics_extracted);
state.managed_envelope.envelope_mut().add_item(item);
};
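The hunks above replace every `ItemAction::DropSilently` in `process` with an `ItemAction::Drop` that carries an `Outcome::Invalid`. The sketch below summarizes which discard reason each failure point receives. `Stage` and `discard_reason` are hypothetical names introduced only for this illustration and do not exist in Relay; `Validate` stands for the step that logs "invalid span", whose surrounding code is not visible in the diff.

```rust
/// Discard reasons used by the span processing hunks.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DiscardReason {
    InvalidJson,
    Internal,
    InvalidSpan,
}

/// Hypothetical labels for the points where `process` can give up on a span.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Stage {
    /// Parsing the OTel or Sentry span payload failed.
    Parse,
    /// `normalize` returned an error.
    Normalize,
    /// The annotated span had no value after normalization.
    EmptyValue,
    /// The step that logs "invalid span" failed.
    Validate,
    /// Serializing the span back to JSON failed.
    Serialize,
}

/// Reason attached to the `Outcome::Invalid` emitted at each stage.
pub fn discard_reason(stage: Stage) -> DiscardReason {
    match stage {
        Stage::Parse => DiscardReason::InvalidJson,
        Stage::Normalize | Stage::EmptyValue | Stage::Serialize => DiscardReason::Internal,
        Stage::Validate => DiscardReason::InvalidSpan,
    }
}
```

Note that `extract_from_event` differs slightly: both of its failure paths, including serialization, report `DiscardReason::InvalidSpan` and always use the `SpanIndexed` category.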

41 changes: 34 additions & 7 deletions relay-server/src/actors/store.rs
@@ -2,13 +2,15 @@
//! The service uses kafka topics to forward data to Sentry

use std::collections::BTreeMap;
use std::net::IpAddr;
use std::sync::Arc;
use std::time::Instant;

use bytes::Bytes;
use once_cell::sync::OnceCell;
use relay_base_schema::data_category::DataCategory;
use relay_base_schema::project::ProjectId;
use relay_common::time::UnixTimestamp;
use relay_common::time::{instant_to_date_time, UnixTimestamp};
use relay_config::Config;
use relay_event_schema::protocol::{
self, EventId, SessionAggregates, SessionStatus, SessionUpdate,
@@ -241,11 +243,12 @@ impl StoreService {
item,
)?,
ItemType::Span => self.produce_span(
scoping.organization_id,
scoping.project_id,
scoping,
start_time,
event_id,
retention,
item,
envelope.meta().remote_addr(),
)?,
_ => {}
}
@@ -821,11 +824,12 @@ impl StoreService {

fn produce_span(
&self,
organization_id: u64,
project_id: ProjectId,
scoping: Scoping,
start_time: Instant,
event_id: Option<EventId>,
retention_days: u16,
item: &Item,
remote_addr: Option<IpAddr>,
) -> Result<(), StoreError> {
let payload = item.payload();
let d = &mut Deserializer::from_slice(&payload);
@@ -836,17 +840,40 @@ impl StoreService {
error = &error as &dyn std::error::Error,
"failed to parse span"
);
self.outcome_aggregator.send(TrackOutcome {
category: DataCategory::SpanIndexed,
event_id,
outcome: Outcome::Invalid(DiscardReason::InvalidSpan),
quantity: 1,
remote_addr,
scoping,
timestamp: instant_to_date_time(start_time),
});
return Ok(());
}
};

span.duration_ms = ((span.end_timestamp - span.start_timestamp) * 1e3) as u32;
span.event_id = event_id;
span.project_id = project_id.value();
span.project_id = scoping.project_id.value();
span.retention_days = retention_days;
span.start_timestamp_ms = (span.start_timestamp * 1e3) as u64;

self.produce(KafkaTopic::Spans, organization_id, KafkaMessage::Span(span))?;
self.produce(
KafkaTopic::Spans,
scoping.organization_id,
KafkaMessage::Span(span),
)?;

self.outcome_aggregator.send(TrackOutcome {
category: DataCategory::SpanIndexed,
event_id,
outcome: Outcome::Accepted,
quantity: 1,
remote_addr,
scoping,
timestamp: instant_to_date_time(start_time),
});

metric!(
counter(RelayCounters::ProcessingMessageProduced) += 1,
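To make the control flow of `produce_span` easier to follow: a span that fails to parse records an invalid outcome and returns early, while a span that parses is produced first and only then recorded as accepted. The sketch below models that ordering together with the two timestamp conversions from the hunk. It is an assumption-laden illustration: `SpanMessage`, `parse_span`, the `"start,end"` payload format, and the two `Vec` sinks are hypothetical stand-ins for the Kafka message, the JSON deserializer, the Kafka producer, and the outcome aggregator.

```rust
/// Outcomes recorded by the sketch (stand-ins for Relay's `Outcome` values).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Outcome {
    Accepted,
    InvalidSpan,
}

/// Hypothetical, trimmed-down span message.
#[derive(Debug, Clone, PartialEq)]
pub struct SpanMessage {
    pub start_timestamp: f64,
    pub end_timestamp: f64,
    pub duration_ms: u32,
    pub start_timestamp_ms: u64,
}

/// Hypothetical parser: expects "start,end" in seconds instead of JSON.
fn parse_span(payload: &str) -> Option<(f64, f64)> {
    let (start, end) = payload.split_once(',')?;
    Some((
        start.trim().parse::<f64>().ok()?,
        end.trim().parse::<f64>().ok()?,
    ))
}

/// Mirrors the ordering in the hunk: invalid outcome on parse failure,
/// otherwise produce the message and then record the accepted outcome.
pub fn produce_span(payload: &str, produced: &mut Vec<SpanMessage>, outcomes: &mut Vec<Outcome>) {
    let Some((start, end)) = parse_span(payload) else {
        outcomes.push(Outcome::InvalidSpan);
        return;
    };

    let message = SpanMessage {
        start_timestamp: start,
        end_timestamp: end,
        // Same arithmetic as the diff; float-to-integer `as` casts saturate,
        // so a negative duration becomes 0.
        duration_ms: ((end - start) * 1e3) as u32,
        start_timestamp_ms: (start * 1e3) as u64,
    };

    produced.push(message);
    outcomes.push(Outcome::Accepted);
}
```

For a payload of `"1.5,2.0"` this yields `duration_ms == 500` and `start_timestamp_ms == 1500`. Recording `Accepted` only after the produce step means a span that fails to reach Kafka is never counted as accepted.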
7 changes: 5 additions & 2 deletions relay-server/src/envelope.rs
@@ -617,9 +617,12 @@ impl Item {
ItemType::ReplayEvent | ItemType::ReplayRecording => Some(DataCategory::Replay),
ItemType::ClientReport => None,
ItemType::CheckIn => Some(DataCategory::Monitor),
ItemType::Span | ItemType::OtelSpan => Some(if indexed {
DataCategory::SpanIndexed
} else {
DataCategory::Span
}),
ItemType::Unknown(_) => None,
ItemType::Span => None, // No outcomes, for now
ItemType::OtelSpan => None, // No outcomes, for now
}
}
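In this hunk the two `=> None, // No outcomes, for now` arms are the removed lines; span items now resolve to a data category, so outcomes can be emitted for them. A minimal sketch of the new selection follows, assuming trimmed-down `ItemType` and `DataCategory` enums and a free function in place of the `Item::outcome_category` method.

```rust
/// Subset of item types relevant to this hunk.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ItemType {
    ClientReport,
    CheckIn,
    Span,
    OtelSpan,
}

/// Subset of data categories relevant to this hunk.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataCategory {
    Monitor,
    Span,
    SpanIndexed,
}

/// Category used when tracking an outcome for an item, if any.
pub fn outcome_category(ty: ItemType, indexed: bool) -> Option<DataCategory> {
    match ty {
        // Client reports never produce outcomes of their own.
        ItemType::ClientReport => None,
        ItemType::CheckIn => Some(DataCategory::Monitor),
        // Both span flavors share the same pair of categories.
        ItemType::Span | ItemType::OtelSpan => Some(if indexed {
            DataCategory::SpanIndexed
        } else {
            DataCategory::Span
        }),
    }
}
```

The `indexed` flag only matters for span items in this sketch; `CheckIn` maps to `Monitor` regardless of its value.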

14 changes: 9 additions & 5 deletions relay-server/src/utils/managed_envelope.rs
@@ -9,7 +9,7 @@ use relay_system::Addr;

use crate::actors::outcome::{DiscardReason, Outcome, TrackOutcome};
use crate::actors::test_store::{Capture, TestStore};
use crate::envelope::{Envelope, Item};
use crate::envelope::{Envelope, Item, ItemType};
use crate::extractors::RequestMeta;
use crate::statsd::{RelayCounters, RelayTimers};
use crate::utils::{EnvelopeSummary, SemaphorePermit};
@@ -196,14 +196,18 @@ impl ManagedEnvelope {
let use_indexed = self.use_index_category();
self.envelope.retain_items(|item| match f(item) {
ItemAction::Keep => true,
ItemAction::DropSilently => false,
ItemAction::Drop(outcome) => {
let use_indexed = if item.ty() == &ItemType::Span {
item.metrics_extracted()
} else {
use_indexed
};
if let Some(category) = item.outcome_category(use_indexed) {
outcomes.push((outcome, category, item.quantity()));
}

};
false
}
ItemAction::DropSilently => false,
});
for (outcome, category, quantity) in outcomes {
self.track_outcome(outcome, category, quantity);
@@ -231,7 +235,7 @@ impl ManagedEnvelope {
///
/// This managed envelope should be updated using [`update`](Self::update) soon after this
/// operation to ensure that subsequent outcomes are consistent.
fn track_outcome(&self, outcome: Outcome, category: DataCategory, quantity: usize) {
pub fn track_outcome(&self, outcome: Outcome, category: DataCategory, quantity: usize) {
self.outcome_aggregator.send(TrackOutcome {
timestamp: self.received_at(),
scoping: self.context.scoping,
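The `retain_items` change above decides the indexed category per item: for `ItemType::Span`, the item's own `metrics_extracted` flag is used instead of the envelope-wide `use_indexed` value. The sketch below models that rule under simplifying assumptions. `Item`, `Outcome`, and the free function `retain_items` are reduced stand-ins, outcomes are returned instead of being sent to an aggregator, and item quantity is left out.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ItemType {
    CheckIn,
    Span,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataCategory {
    Monitor,
    Span,
    SpanIndexed,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Outcome {
    Invalid,
}

pub enum ItemAction {
    Keep,
    Drop(Outcome),
    DropSilently,
}

pub struct Item {
    pub ty: ItemType,
    pub metrics_extracted: bool,
}

fn outcome_category(ty: ItemType, indexed: bool) -> Option<DataCategory> {
    match ty {
        ItemType::CheckIn => Some(DataCategory::Monitor),
        ItemType::Span if indexed => Some(DataCategory::SpanIndexed),
        ItemType::Span => Some(DataCategory::Span),
    }
}

/// Keeps or drops items according to `f` and returns the outcomes to track.
pub fn retain_items<F>(
    items: &mut Vec<Item>,
    envelope_indexed: bool,
    mut f: F,
) -> Vec<(Outcome, DataCategory)>
where
    F: FnMut(&mut Item) -> ItemAction,
{
    let mut outcomes = Vec::new();
    items.retain_mut(|item| match f(item) {
        ItemAction::Keep => true,
        ItemAction::Drop(outcome) => {
            // Spans with extracted metrics are reported in the indexed category.
            let indexed = if item.ty == ItemType::Span {
                item.metrics_extracted
            } else {
                envelope_indexed
            };
            if let Some(category) = outcome_category(item.ty, indexed) {
                outcomes.push((outcome, category));
            }
            false
        }
        ItemAction::DropSilently => false,
    });
    outcomes
}
```

Dropping two spans, one with `metrics_extracted` set and one without, therefore yields one `SpanIndexed` outcome and one `Span` outcome, independent of the envelope-wide flag.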