diff --git a/CHANGELOG.md b/CHANGELOG.md index e6532a67ca6..90411d075fe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ - Make Kafka spans compatible with the Snuba span schema. ([#2917](https://github.com/getsentry/relay/pull/2917), [#2926](https://github.com/getsentry/relay/pull/2926)) - Only extract span metrics / tags when they are needed. ([#2907](https://github.com/getsentry/relay/pull/2907), [#2923](https://github.com/getsentry/relay/pull/2923), [#2924](https://github.com/getsentry/relay/pull/2924)) - Normalize metric resource identifiers in `event._metrics_summary` and `span._metrics_summary`. ([#2914](https://github.com/getsentry/relay/pull/2914)) +- Send outcomes for spans. ([#2930](https://github.com/getsentry/relay/pull/2930)) - Validate error_id and trace_id vectors in replay deserializer. ([#2931](https://github.com/getsentry/relay/pull/2931)) - Add a data category for indexed spans. ([#2937](https://github.com/getsentry/relay/pull/2937)) - Add nested Android app start span ops to span ingestion ([#2927](https://github.com/getsentry/relay/pull/2927)) diff --git a/relay-server/src/actors/outcome.rs b/relay-server/src/actors/outcome.rs index 12ec0417814..d85005a48d1 100644 --- a/relay-server/src/actors/outcome.rs +++ b/relay-server/src/actors/outcome.rs @@ -148,12 +148,12 @@ impl FromMessage for TrackOutcome { /// Defines the possible outcomes from processing an event. #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub enum Outcome { - // /// The event has been accepted and handled completely. - // /// - // /// This is never emitted by Relay as the event may be discarded by the processing pipeline - // /// after Relay. Only the `save_event` task in Sentry finally accepts an event. - // #[allow(dead_code)] - // Accepted, + /// The event has been accepted and handled completely. + /// + /// This is only emitted for items going from Relay to Snuba directly. + #[allow(dead_code)] + Accepted, + /// The event has been filtered due to a configured filter. Filtered(FilterStatKey), @@ -183,6 +183,7 @@ impl Outcome { Outcome::Invalid(_) => OutcomeId::INVALID, Outcome::Abuse => OutcomeId::ABUSE, Outcome::ClientDiscard(_) => OutcomeId::CLIENT_DISCARD, + Outcome::Accepted => OutcomeId::ACCEPTED, } } @@ -198,6 +199,7 @@ impl Outcome { .map(|code| Cow::Owned(code.as_str().into())), Outcome::ClientDiscard(ref discard_reason) => Some(Cow::Borrowed(discard_reason)), Outcome::Abuse => None, + Outcome::Accepted => None, } } @@ -229,6 +231,7 @@ impl fmt::Display for Outcome { Outcome::Invalid(reason) => write!(f, "invalid data ({reason})"), Outcome::Abuse => write!(f, "abuse limit reached"), Outcome::ClientDiscard(reason) => write!(f, "discarded by client ({reason})"), + Outcome::Accepted => write!(f, "accepted"), } } } @@ -356,6 +359,9 @@ pub enum DiscardReason { /// (Relay) Profiling related discard reasons Profiling(&'static str), + + /// (Relay) A span is not valid after normalization. + InvalidSpan, } impl DiscardReason { @@ -398,6 +404,7 @@ impl DiscardReason { DiscardReason::InvalidReplayEventPii => "invalid_replay_pii_scrubber_failed", DiscardReason::InvalidReplayRecordingEvent => "invalid_replay_recording", DiscardReason::Profiling(reason) => reason, + DiscardReason::InvalidSpan => "invalid_span", } } } diff --git a/relay-server/src/actors/outcome_aggregator.rs b/relay-server/src/actors/outcome_aggregator.rs index 3934f93ae3d..f997fd667de 100644 --- a/relay-server/src/actors/outcome_aggregator.rs +++ b/relay-server/src/actors/outcome_aggregator.rs @@ -170,6 +170,7 @@ impl OutcomeAggregator { Outcome::RateLimited(_) | Outcome::Invalid(DiscardReason::ProjectId) | Outcome::FilteredSampling(_) + | Outcome::Accepted ) } diff --git a/relay-server/src/actors/processor/span/processing.rs b/relay-server/src/actors/processor/span/processing.rs index 0b0ade16174..30997ef13ec 100644 --- a/relay-server/src/actors/processor/span/processing.rs +++ b/relay-server/src/actors/processor/span/processing.rs @@ -14,6 +14,7 @@ use relay_metrics::{aggregator::AggregatorConfig, MetricNamespace, UnixTimestamp use relay_pii::PiiProcessor; use relay_protocol::{Annotated, Empty}; +use crate::actors::outcome::{DiscardReason, Outcome}; use crate::actors::processor::{ProcessEnvelopeState, ProcessingError}; use crate::envelope::{ContentType, Item, ItemType}; use crate::metrics_extraction::generic::extract_metrics; @@ -45,7 +46,7 @@ pub fn process(state: &mut ProcessEnvelopeState, config: Arc) { Ok(otel_span) => Annotated::new(otel_span.into()), Err(err) => { relay_log::debug!("failed to parse OTel span: {}", err); - return ItemAction::DropSilently; + return ItemAction::Drop(Outcome::Invalid(DiscardReason::InvalidJson)); } } } @@ -53,7 +54,7 @@ pub fn process(state: &mut ProcessEnvelopeState, config: Arc) { Ok(span) => span, Err(err) => { relay_log::debug!("failed to parse span: {}", err); - return ItemAction::DropSilently; + return ItemAction::Drop(Outcome::Invalid(DiscardReason::InvalidJson)); } }, @@ -62,16 +63,17 @@ pub fn process(state: &mut ProcessEnvelopeState, config: Arc) { if let Err(e) = normalize(&mut annotated_span, config.clone()) { relay_log::debug!("failed to normalize span: {}", e); - return ItemAction::DropSilently; + return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal)); }; let Some(span) = annotated_span.value_mut() else { - return ItemAction::DropSilently; + return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal)); }; if let Some(config) = span_metrics_extraction_config { let metrics = extract_metrics(span, config); state.extracted_metrics.project_metrics.extend(metrics); + item.set_metrics_extracted(true); } // TODO: dynamic sampling @@ -95,7 +97,7 @@ pub fn process(state: &mut ProcessEnvelopeState, config: Arc) { Ok(res) => res, Err(err) => { relay_log::error!("invalid span: {err}"); - return ItemAction::DropSilently; + return ItemAction::Drop(Outcome::Invalid(DiscardReason::InvalidSpan)); } }; @@ -105,10 +107,11 @@ pub fn process(state: &mut ProcessEnvelopeState, config: Arc) { Ok(payload) => payload, Err(err) => { relay_log::debug!("failed to serialize span: {}", err); - return ItemAction::DropSilently; + return ItemAction::Drop(Outcome::Invalid(DiscardReason::Internal)); } }; new_item.set_payload(ContentType::Json, payload); + new_item.set_metrics_extracted(item.metrics_extracted()); *item = new_item; @@ -127,6 +130,11 @@ pub fn extract_from_event(state: &mut ProcessEnvelopeState) { Ok(span) => span, Err(e) => { relay_log::error!("Invalid span: {e}"); + state.managed_envelope.track_outcome( + Outcome::Invalid(DiscardReason::InvalidSpan), + relay_quotas::DataCategory::SpanIndexed, + 1, + ); return; } }; @@ -134,11 +142,18 @@ pub fn extract_from_event(state: &mut ProcessEnvelopeState) { Ok(span) => span, Err(e) => { relay_log::error!(error = &e as &dyn Error, "Failed to serialize span"); + state.managed_envelope.track_outcome( + Outcome::Invalid(DiscardReason::InvalidSpan), + relay_quotas::DataCategory::SpanIndexed, + 1, + ); return; } }; let mut item = Item::new(ItemType::Span); item.set_payload(ContentType::Json, span); + // If metrics extraction happened for the event, it also happened for its spans: + item.set_metrics_extracted(state.event_metrics_extracted); state.managed_envelope.envelope_mut().add_item(item); }; diff --git a/relay-server/src/actors/store.rs b/relay-server/src/actors/store.rs index 9ff81949261..5cebc7f2356 100644 --- a/relay-server/src/actors/store.rs +++ b/relay-server/src/actors/store.rs @@ -8,8 +8,9 @@ use std::time::Instant; use bytes::Bytes; use once_cell::sync::OnceCell; +use relay_base_schema::data_category::DataCategory; use relay_base_schema::project::ProjectId; -use relay_common::time::UnixTimestamp; +use relay_common::time::{instant_to_date_time, UnixTimestamp}; use relay_config::Config; use relay_event_schema::protocol::{ self, EventId, SessionAggregates, SessionStatus, SessionUpdate, @@ -259,13 +260,9 @@ impl StoreService { retention, item, )?, - ItemType::Span => self.produce_span( - scoping.organization_id, - scoping.project_id, - event_id, - retention, - item, - )?, + ItemType::Span => { + self.produce_span(scoping, start_time, event_id, retention, item)? + } _ => {} } } @@ -840,8 +837,8 @@ impl StoreService { fn produce_span( &self, - organization_id: u64, - project_id: ProjectId, + scoping: Scoping, + start_time: Instant, event_id: Option, retention_days: u16, item: &Item, @@ -855,17 +852,40 @@ impl StoreService { error = &error as &dyn std::error::Error, "failed to parse span" ); + self.outcome_aggregator.send(TrackOutcome { + category: DataCategory::SpanIndexed, + event_id: None, + outcome: Outcome::Invalid(DiscardReason::InvalidSpan), + quantity: 1, + remote_addr: None, + scoping, + timestamp: instant_to_date_time(start_time), + }); return Ok(()); } }; span.duration_ms = ((span.end_timestamp - span.start_timestamp) * 1e3) as u32; span.event_id = event_id; - span.project_id = project_id.value(); + span.project_id = scoping.project_id.value(); span.retention_days = retention_days; span.start_timestamp_ms = (span.start_timestamp * 1e3) as u64; - self.produce(KafkaTopic::Spans, organization_id, KafkaMessage::Span(span))?; + self.produce( + KafkaTopic::Spans, + scoping.organization_id, + KafkaMessage::Span(span), + )?; + + self.outcome_aggregator.send(TrackOutcome { + category: DataCategory::SpanIndexed, + event_id: None, + outcome: Outcome::Accepted, + quantity: 1, + remote_addr: None, + scoping, + timestamp: instant_to_date_time(start_time), + }); metric!( counter(RelayCounters::ProcessingMessageProduced) += 1, diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs index c89c8129e84..02fe39c6e3c 100644 --- a/relay-server/src/envelope.rs +++ b/relay-server/src/envelope.rs @@ -617,9 +617,12 @@ impl Item { ItemType::ReplayEvent | ItemType::ReplayRecording => Some(DataCategory::Replay), ItemType::ClientReport => None, ItemType::CheckIn => Some(DataCategory::Monitor), + ItemType::Span | ItemType::OtelSpan => Some(if indexed { + DataCategory::SpanIndexed + } else { + DataCategory::Span + }), ItemType::Unknown(_) => None, - ItemType::Span => None, // No outcomes, for now - ItemType::OtelSpan => None, // No outcomes, for now } } diff --git a/relay-server/src/utils/managed_envelope.rs b/relay-server/src/utils/managed_envelope.rs index d9b1fe618a3..f6e9401347d 100644 --- a/relay-server/src/utils/managed_envelope.rs +++ b/relay-server/src/utils/managed_envelope.rs @@ -10,7 +10,7 @@ use relay_system::Addr; use crate::actors::outcome::{DiscardReason, Outcome, TrackOutcome}; use crate::actors::processor::ProcessingGroup; use crate::actors::test_store::{Capture, TestStore}; -use crate::envelope::{Envelope, Item}; +use crate::envelope::{Envelope, Item, ItemType}; use crate::extractors::RequestMeta; use crate::statsd::{RelayCounters, RelayTimers}; use crate::utils::{EnvelopeSummary, SemaphorePermit}; @@ -218,14 +218,18 @@ impl ManagedEnvelope { let use_indexed = self.use_index_category(); self.envelope.retain_items(|item| match f(item) { ItemAction::Keep => true, + ItemAction::DropSilently => false, ItemAction::Drop(outcome) => { + let use_indexed = if item.ty() == &ItemType::Span { + item.metrics_extracted() + } else { + use_indexed + }; if let Some(category) = item.outcome_category(use_indexed) { outcomes.push((outcome, category, item.quantity())); - } - + }; false } - ItemAction::DropSilently => false, }); for (outcome, category, quantity) in outcomes { self.track_outcome(outcome, category, quantity); @@ -253,7 +257,7 @@ impl ManagedEnvelope { /// /// This managed envelope should be updated using [`update`](Self::update) soon after this /// operation to ensure that subsequent outcomes are consistent. - fn track_outcome(&self, outcome: Outcome, category: DataCategory, quantity: usize) { + pub fn track_outcome(&self, outcome: Outcome, category: DataCategory, quantity: usize) { self.outcome_aggregator.send(TrackOutcome { timestamp: self.received_at(), scoping: self.context.scoping, diff --git a/tests/integration/test_outcome.py b/tests/integration/test_outcome.py index bea05bd4f3e..1c2f9a7815d 100644 --- a/tests/integration/test_outcome.py +++ b/tests/integration/test_outcome.py @@ -1,20 +1,18 @@ import json +import signal +import time import uuid from copy import deepcopy from datetime import datetime, timedelta, timezone -from queue import Empty -import signal from pathlib import Path +from queue import Empty -import requests import pytest -import time - +import requests from sentry_sdk.envelope import Envelope, Item, PayloadRef from .test_metrics import metrics_by_name - RELAY_ROOT = Path(__file__).parent.parent.parent HOUR_MILLISEC = 1000 * 3600 @@ -575,7 +573,17 @@ def _get_event_payload(event_type): "type": "transaction", "timestamp": now.isoformat(), "start_timestamp": (now - timedelta(seconds=2)).isoformat(), - "spans": [], + "spans": [ + { + "op": "default", + "span_id": "968cff94913ebb07", + "segment_id": "968cff94913ebb07", + "start_timestamp": now.timestamp(), + "timestamp": now.timestamp() + 1, + "exclusive_time": 1000.0, + "trace_id": "a0fa8803753e40fd8124b21eeb2986b5", + }, + ], "contexts": { "trace": { "op": "hi", @@ -670,6 +678,19 @@ def _get_profile_payload(metadata_only=True): return profile +def _get_span_payload(): + now = datetime.utcnow() + return { + "op": "default", + "span_id": "968cff94913ebb07", + "segment_id": "968cff94913ebb07", + "start_timestamp": now.timestamp(), + "timestamp": now.timestamp() + 1, + "exclusive_time": 1000.0, + "trace_id": "a0fa8803753e40fd8124b21eeb2986b5", + } + + @pytest.mark.parametrize( "category,is_outcome_expected", [("session", False), ("transaction", True)] ) @@ -1665,3 +1686,233 @@ def assert_metrics_outcomes(n_metrics, n_outcomes): for _ in range(2): send_buckets(1) assert_metrics_outcomes(0, 1) + + +@pytest.mark.parametrize("num_intermediate_relays", [0, 1, 2]) +def test_span_outcomes( + mini_sentry, + relay, + relay_with_processing, + outcomes_consumer, + num_intermediate_relays, +): + """ + Tests that Relay reports correct outcomes for spans. + + Have a chain of many relays that eventually connect to Sentry + and verify that the outcomes sent by the first relay + are properly forwarded up to sentry. + """ + outcomes_consumer = outcomes_consumer(timeout=5) + + project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id)["config"] + + project_config.setdefault("features", []).extend( + [ + "projects:span-metrics-extraction", + "projects:span-metrics-extraction-all-modules", + ] + ) + project_config["transactionMetrics"] = { + "version": 1, + } + project_config["sampling"] = { + "version": 2, + "rules": [ + { + "id": 1, + "samplingValue": {"type": "sampleRate", "value": 0.0}, + "type": "transaction", + "condition": { + "op": "eq", + "name": "event.transaction", + "value": "hi", + }, + } + ], + } + + config = { + "outcomes": { + "emit_outcomes": True, + "batch_size": 1, + "batch_interval": 1, + "aggregator": { + "bucket_interval": 1, + "flush_interval": 1, + }, + "source": "processing-relay", + }, + "aggregator": {"bucket_interval": 1, "initial_delay": 0, "debounce_delay": 0}, + } + + # The innermost Relay needs to be in processing mode + upstream = relay_with_processing(config) + + # build a chain of relays + for i in range(num_intermediate_relays): + config = deepcopy(config) + if i == 0: + # Emulate a PoP Relay + config["outcomes"]["source"] = "pop-relay" + if i == 1: + # Emulate a customer Relay + config["outcomes"]["source"] = "external-relay" + config["outcomes"]["emit_outcomes"] = "as_client_reports" + upstream = relay(upstream, config) + + def make_envelope(transaction_name): + payload = _get_event_payload("transaction") + payload["transaction"] = transaction_name + envelope = Envelope() + envelope.add_item( + Item( + payload=PayloadRef(bytes=json.dumps(payload).encode()), + type="transaction", + ) + ) + return envelope + + upstream.send_envelope( + project_id, make_envelope("hi") + ) # should get dropped by dynamic sampling + upstream.send_envelope( + project_id, make_envelope("ho") + ) # should be kept by dynamic sampling + + outcomes = outcomes_consumer.get_outcomes() + outcomes.sort(key=lambda o: sorted(o.items())) + + expected_source = { + 0: "processing-relay", + 1: "pop-relay", + 2: "pop-relay", + }[num_intermediate_relays] + expected_outcomes = [ + { + "category": 9, # Span + "key_id": 123, + "org_id": 1, + "outcome": 1, # Filtered + "project_id": 42, + "quantity": 1, + "reason": "Sampled:1", + "source": expected_source, + }, + { + "category": 16, # SpanIndexed + "key_id": 123, + "org_id": 1, + "outcome": 0, # Accepted + "project_id": 42, + "quantity": 2, + "source": "processing-relay", + }, + ] + for outcome in outcomes: + outcome.pop("timestamp") + + assert outcomes == expected_outcomes, outcomes + + +@pytest.mark.parametrize("metrics_already_extracted", [False, True]) +def test_span_outcomes_invalid( + mini_sentry, + relay_with_processing, + outcomes_consumer, + metrics_already_extracted, +): + """ + Tests that Relay reports correct outcomes for invalid spans as `Span` or `Transaction`. + """ + outcomes_consumer = outcomes_consumer(timeout=2) + + project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id)["config"] + + project_config.setdefault("features", []).extend( + [ + "projects:span-metrics-extraction", + "projects:span-metrics-extraction-all-modules", + "organizations:standalone-span-ingestion", + ] + ) + project_config["transactionMetrics"] = { + "version": 1, + } + + config = { + "outcomes": { + "emit_outcomes": True, + "batch_size": 1, + "batch_interval": 1, + "aggregator": { + "bucket_interval": 1, + "flush_interval": 1, + }, + "source": "pop-relay", + }, + "aggregator": {"bucket_interval": 1, "initial_delay": 0, "debounce_delay": 0}, + } + + upstream = relay_with_processing(config) + + # Create an envelope with an invalid profile: + def make_envelope(): + envelope = Envelope() + payload = _get_event_payload("transaction") + payload["spans"][0].pop("span_id", None) + envelope.add_item( + Item( + payload=PayloadRef(bytes=json.dumps(payload).encode()), + type="transaction", + headers={"metrics_extracted": metrics_already_extracted}, + ) + ) + payload = _get_span_payload() + payload.pop("span_id", None) + envelope.add_item( + Item( + payload=PayloadRef(bytes=json.dumps(payload).encode()), + type="span", + headers={"metrics_extracted": metrics_already_extracted}, + ) + ) + return envelope + + envelope = make_envelope() + upstream.send_envelope(project_id, envelope) + + outcomes = outcomes_consumer.get_outcomes() + outcomes.sort(key=lambda o: sorted(o.items())) + + expected_outcomes = [ + { + "category": 9 if metrics_already_extracted else 2, + "key_id": 123, + "org_id": 1, + "outcome": 3, # Invalid + "project_id": 42, + "quantity": 1, + "reason": "invalid_transaction", + "remote_addr": "127.0.0.1", + "source": "pop-relay", + }, + { + "category": 16 if metrics_already_extracted else 12, + "key_id": 123, + "org_id": 1, + "outcome": 3, # Invalid + "project_id": 42, + "quantity": 1, + "reason": "internal", + "remote_addr": "127.0.0.1", + "source": "pop-relay", + }, + ] + for outcome in outcomes: + outcome.pop("timestamp") + outcome.pop("event_id") + + assert outcomes == expected_outcomes, outcomes