From cd7cb074b66c2fe530329b8050ac376440563dba Mon Sep 17 00:00:00 2001 From: Radu Woinaroski <5281987+RaduW@users.noreply.github.com> Date: Mon, 5 Oct 2020 10:51:31 +0200 Subject: [PATCH 1/3] ref(server): Upstream request retries & re-authentication (#788) Implement request retries and periodic re-authentication for relay upstream. --- CHANGELOG.md | 1 + relay-config/src/config.rs | 24 +- relay-server/src/actors/upstream.rs | 252 +++++++++++++----- relay-server/src/endpoints/healthcheck.rs | 6 + relay-server/src/utils/api.rs | 23 ++ tests/integration/fixtures/mini_sentry.py | 4 + tests/integration/test_healthchecks.py | 26 ++ tests/integration/test_store.py | 302 +++++++++++++++++++++- 8 files changed, 551 insertions(+), 87 deletions(-) create mode 100644 tests/integration/test_healthchecks.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 2b744bac5c..5742680249 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ - Fix issue where `$span` would not be recognized in Advanced Data Scrubbing. ([#781](https://github.com/getsentry/relay/pull/781)) - Accept big-endian minidumps. ([#789](https://github.com/getsentry/relay/pull/789)) +- Detect network outages and retry sending events instead of dropping them. ([#788](https://github.com/getsentry/relay/pull/788)) ## 20.9.0 diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index c7dfb2c5ad..9fed4972b9 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -18,6 +18,8 @@ use relay_redis::RedisConfig; use crate::byte_size::ByteSize; use crate::upstream::UpstreamDescriptor; +const DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD: u64 = 10; + /// Defines the source of a config error #[derive(Debug)] enum ConfigErrorSource { @@ -546,13 +548,12 @@ struct Http { /// /// Defaults to `600` (10 minutes). auth_interval: Option, - /// The time until Relay considers authentication dropped after experiencing errors. + /// The maximum time of experiencing uninterrupted network failures until Relay considers that + /// it has encountered a network outage in seconds. /// - /// If connection with the upstream resumes or authentication succeeds during the grace period, - /// Relay retains normal operation. If, instead, connection errors or failed re-authentication - /// attempts persist beyond the grace period, Relay suspends event submission and reverts into - /// authentication mode. - auth_grace_period: u64, + /// During a network outage relay will try to reconnect and will buffer all upstream messages + /// until it manages to reconnect. + outage_grace_period: u64, /// Content encoding to apply to upstream store requests. /// /// By default, Relay applies `gzip` content encoding to compress upstream requests. Compression @@ -578,7 +579,7 @@ impl Default for Http { max_retry_interval: 60, // 1 minute host_header: None, auth_interval: Some(600), // 10 minutes - auth_grace_period: 10, + outage_grace_period: DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD, encoding: HttpEncoding::Gzip, } } @@ -1161,11 +1162,10 @@ impl Config { } } - /// The maximum amount of time that a Relay is allowed to take to re-authenticate with - /// the upstream after which it is declared as un-authenticated (if it is not able to - /// authenticate). - pub fn http_auth_grace_period(&self) -> Duration { - Duration::from_secs(self.values.http.auth_grace_period) + /// The maximum time of experiencing uninterrupted network failures until Relay considers that + /// it has encountered a network outage. + pub fn http_outage_grace_period(&self) -> Duration { + Duration::from_secs(self.values.http.outage_grace_period) } /// Content encoding of upstream requests. diff --git a/relay-server/src/actors/upstream.rs b/relay-server/src/actors/upstream.rs index e9079a9e8e..a472321380 100644 --- a/relay-server/src/actors/upstream.rs +++ b/relay-server/src/actors/upstream.rs @@ -44,7 +44,7 @@ use relay_quotas::{ }; use crate::metrics::RelayHistograms; -use crate::utils::{self, ApiErrorResponse, IntoTracked, TrackedFutureFinished}; +use crate::utils::{self, ApiErrorResponse, IntoTracked, RelayErrorAction, TrackedFutureFinished}; #[derive(Fail, Debug)] pub enum UpstreamRequestError { @@ -81,33 +81,50 @@ impl UpstreamRequestError { _ => false, } } + + fn is_permanent_rejection(&self) -> bool { + if let Self::ResponseError(status_code, response) = self { + return *status_code == StatusCode::FORBIDDEN + && response.relay_action() == RelayErrorAction::Stop; + } + false + } } /// Represents the current auth state. #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] enum AuthState { + /// Relay is not authenticated and authentication has not started. Unknown, + + /// Relay is not authenticated and authentication is in progress. + Registering, + + /// The connection is healthy and authenticated in managed mode. Registered, - Error, -} -/// The position for enqueueing an upstream request. -#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] -enum EnqueuePosition { - Front, - Back, + /// Relay is authenticated and renewing the registration lease. During this process, Relay + /// remains authenticated, unless an error occurs. + Renewing, + + /// Authentication has been permanently denied by the Upstream. Do not attempt to retry. + Denied, } impl AuthState { /// Returns true if the state is considered authenticated. pub fn is_authenticated(self) -> bool { - // XXX: the goal of auth state is that it also tracks auth - // failures from queries. Later we will need to - // extend the states here for it. - self == AuthState::Registered + matches!(self, AuthState::Registered | AuthState::Renewing) } } +/// The position for enqueueing an upstream request. +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +enum EnqueuePosition { + Front, + Back, +} + /// Rate limits returned by the upstream. /// /// Upstream rate limits can come in two forms: @@ -165,7 +182,6 @@ impl UpstreamRateLimits { retry_after: self.retry_after, }); } - rate_limits } } @@ -223,14 +239,19 @@ struct UpstreamRequest { } pub struct UpstreamRelay { - backoff: RetryBackoff, - first_error: Option, - config: Arc, + /// backoff policy for the registration messages + auth_backoff: RetryBackoff, auth_state: AuthState, + /// backoff policy for the network outage message + outage_backoff: RetryBackoff, + /// from this instant forward we only got network errors on all our http requests + /// (any request that is sent without causing a network error resets this back to None) + first_error: Option, max_inflight_requests: usize, num_inflight_requests: usize, high_prio_requests: VecDeque, low_prio_requests: VecDeque, + config: Arc, } /// Handles a response returned from the upstream. @@ -281,11 +302,13 @@ fn handle_response( } impl UpstreamRelay { + /// Creates a new `UpstreamRelay` instance. pub fn new(config: Arc) -> Self { UpstreamRelay { - backoff: RetryBackoff::new(config.http_max_retry_interval()), - max_inflight_requests: config.max_concurrent_requests(), + auth_backoff: RetryBackoff::new(config.http_max_retry_interval()), auth_state: AuthState::Unknown, + outage_backoff: RetryBackoff::new(config.http_max_retry_interval()), + max_inflight_requests: config.max_concurrent_requests(), num_inflight_requests: 0, high_prio_requests: VecDeque::new(), low_prio_requests: VecDeque::new(), @@ -294,32 +317,91 @@ impl UpstreamRelay { } } + /// Predicate, checks if a Relay performs authentication. + fn should_authenticate(&self) -> bool { + // only managed mode relays perform authentication + self.config.relay_mode() == RelayMode::Managed + } + + /// Predicate, checks if a Relay does re-authentication. + fn should_renew_auth(&self) -> bool { + self.renew_auth_interval().is_some() + } + + /// Returns the interval at which this Relay should renew authentication. + fn renew_auth_interval(&self) -> Option { + // only relays that authenticate also re-authenticate + let should_renew_auth = self.should_authenticate() + // processing relays do NOT re-authenticate + && !self.config.processing_enabled() + // the upstream did not ban us explicitly from trying to re-authenticate + && self.auth_state != AuthState::Denied; + + if should_renew_auth { + // only relays the have a configured auth-interval reauthenticate + self.config.http_auth_interval() + } else { + None + } + } + + /// Predicate, checks if we are in an network outage situation. + fn is_network_outage(&self) -> bool { + self.outage_backoff.started() + } + + /// Returns an error message if an authentication is prohibited in this state and + /// None if it can authenticate. + fn get_auth_state_error(&self) -> Option<&'static str> { + if !self.should_authenticate() { + Some("Upstream actor trying to authenticate although it is not supposed to.") + } else if self.auth_state == AuthState::Registered && !self.should_renew_auth() { + Some("Upstream actor trying to re-authenticate although it is not supposed to.") + } else if self.auth_state == AuthState::Denied { + Some("Upstream actor trying to authenticate after authentication was denied.") + } else { + // Ok to authenticate + None + } + } + + /// Returns `true` if the connection is ready to send requests to the upstream. fn is_ready(&self) -> bool { + if self.is_network_outage() { + return false; + } + match self.auth_state { - // Relays that have auth errors cannot send messages, even in proxy mode - AuthState::Error => false, + // Relays that have auth errors cannot send messages + AuthState::Registering | AuthState::Denied => false, // Non-managed mode Relays do not authenticate and are ready immediately - AuthState::Unknown => self.config.relay_mode() != RelayMode::Managed, + AuthState::Unknown => !self.should_authenticate(), // All good in managed mode - AuthState::Registered => true, + AuthState::Registered | AuthState::Renewing => true, } } + /// Called when a message to the upstream goes through without a network error. + fn reset_network_error(&mut self) { + self.first_error = None; + self.outage_backoff.reset(); + } + + /// Records an occurrence of a network error. + /// + /// If the network errors persist throughout the http outage grace period, an outage is + /// triggered, which results in halting all network requests and starting a reconnect loop. fn handle_network_error(&mut self, ctx: &mut Context) { let now = Instant::now(); let first_error = *self.first_error.get_or_insert(now); // Only take action if we exceeded the grace period. - if first_error + self.config.http_auth_grace_period() > now { + if first_error + self.config.http_outage_grace_period() > now { return; } - // Set authentication to errored to stop sending requests. - self.auth_state = AuthState::Error; - - // There is no re-authentication scheduled, schedule one now. - if !self.backoff.started() { - ctx.notify_later(Authenticate, self.backoff.next_backoff()); + if !self.outage_backoff.started() { + ctx.notify_later(CheckUpstreamConnection, self.outage_backoff.next_backoff()); } } @@ -389,15 +471,15 @@ impl UpstreamRelay { ctx: &mut Context, ) { if matches!(send_result, Err(ref err) if err.is_network_error()) { - // TODO: Enable after fixing network error handling - // self.handle_network_error(ctx); + self.handle_network_error(ctx); if request.retry { return self.enqueue(request, ctx, EnqueuePosition::Back); } } else { - // we managed a request without a network error, reset the first time we got a network error - self.first_error = None; + // we managed a request without a network error, reset the first time we got a network + // error and resume sending events. + self.reset_network_error(); } request.response_sender.send(send_result).ok(); @@ -517,9 +599,10 @@ impl Actor for UpstreamRelay { fn started(&mut self, context: &mut Self::Context) { log::info!("upstream relay started"); - self.backoff.reset(); + self.auth_backoff.reset(); + self.outage_backoff.reset(); - if self.config.relay_mode() == RelayMode::Managed { + if self.should_authenticate() { context.notify(Authenticate); } } @@ -548,6 +631,12 @@ impl Handler for UpstreamRelay { type Result = ResponseActFuture; fn handle(&mut self, _msg: Authenticate, ctx: &mut Self::Context) -> Self::Result { + // detect incorrect authentication requests, if we detect them we have a programming error + if let Some(auth_state_error) = self.get_auth_state_error() { + log::error!("{}", auth_state_error); + return Box::new(fut::err(())); + } + let credentials = match self.config.credentials() { Some(x) => x, None => return Box::new(fut::err(())), @@ -558,8 +647,14 @@ impl Handler for UpstreamRelay { self.config.upstream_descriptor() ); + self.auth_state = if self.auth_state.is_authenticated() { + AuthState::Renewing + } else { + AuthState::Registering + }; + let request = RegisterRequest::new(&credentials.id, &credentials.public_key); - let interval = self.backoff.next_backoff(); + let interval = self.auth_backoff.next_backoff(); let future = self .enqueue_query(request, ctx) @@ -574,10 +669,10 @@ impl Handler for UpstreamRelay { .map(|_, slf, ctx| { log::info!("relay successfully registered with upstream"); slf.auth_state = AuthState::Registered; - slf.backoff.reset(); + slf.auth_backoff.reset(); - if let Some(interval) = slf.config.http_auth_interval() { - ctx.notify_later(Authenticate, interval); + if let Some(renew_interval) = slf.renew_auth_interval() { + ctx.notify_later(Authenticate, renew_interval); } // Resume sending queued requests if we suspended due to dropped authentication @@ -586,31 +681,24 @@ impl Handler for UpstreamRelay { .map_err(move |err, slf, ctx| { log::error!("authentication encountered error: {}", LogError(&err)); - // Network errors are handled separately by the generic response handler. - if !err.is_network_error() { - slf.auth_state = AuthState::Error; - } - // TODO: Remove this when fixing network error handling - else { - slf.handle_network_error(ctx); + if err.is_permanent_rejection() { + slf.auth_state = AuthState::Denied; + return; } - // Do not retry client errors including authentication failures since client errors - // are usually permanent. This allows the upstream to reject unsupported Relays - // without infinite retries. - let should_retry = match err { - UpstreamRequestError::ResponseError(code, _) => !code.is_client_error(), - _ => true, - }; - - if should_retry { - log::debug!( - "scheduling authentication retry in {} seconds", - interval.as_secs() - ); - - ctx.notify_later(Authenticate, interval); + // If the authentication request fails due to any reason other than a network error, + // go back to `Registering` which indicates that this Relay is not authenticated. + // Note that network errors are handled separately by the generic response handler. + if !err.is_network_error() { + slf.auth_state = AuthState::Registering; } + + // Even on network errors, retry authentication independently. + log::debug!( + "scheduling authentication retry in {} seconds", + interval.as_secs() + ); + ctx.notify_later(Authenticate, interval); }); Box::new(future) @@ -661,6 +749,7 @@ impl Handler for UpstreamRelay { return; } + // we are authenticated and there is no network outage, go ahead with the messages while self.num_inflight_requests < self.max_inflight_requests { if let Some(msg) = self.high_prio_requests.pop_back() { self.send_request(msg, ctx); @@ -673,6 +762,47 @@ impl Handler for UpstreamRelay { } } +/// Checks the status of the network connection with the upstream server +struct CheckUpstreamConnection; + +impl Message for CheckUpstreamConnection { + type Result = (); +} + +impl Handler for UpstreamRelay { + type Result = (); + + fn handle(&mut self, _msg: CheckUpstreamConnection, ctx: &mut Self::Context) -> Self::Result { + self.enqueue_request( + RequestPriority::Immediate, + false, + Method::GET, + "/api/0/relays/live/", + ClientRequestBuilder::finish, + ctx, + ) + .and_then(|client_response| { + // consume response bodies to ensure the connection remains usable. + client_response + .payload() + .for_each(|_| Ok(())) + .map_err(UpstreamRequestError::PayloadFailed) + }) + .into_actor(self) + .then(|result, slf, ctx| { + if matches!(result, Err(err) if err.is_network_error()) { + // still network error, schedule another attempt + ctx.notify_later(CheckUpstreamConnection, slf.outage_backoff.next_backoff()); + } else { + // resume normal messages + ctx.notify(PumpHttpMessageQueue); + } + fut::ok(()) + }) + .spawn(ctx); + } +} + pub trait RequestBuilder: 'static { fn build_request(&mut self, _: &mut ClientRequestBuilder) -> Result; } diff --git a/relay-server/src/endpoints/healthcheck.rs b/relay-server/src/endpoints/healthcheck.rs index 964b7411b3..0f7088add0 100644 --- a/relay-server/src/endpoints/healthcheck.rs +++ b/relay-server/src/endpoints/healthcheck.rs @@ -70,4 +70,10 @@ pub fn configure_app(app: ServiceApp) -> ServiceApp { r.name("internal-healthcheck-live"); r.get().with(liveness_healthcheck); }) + // live check is also used to check network connectivity by downstream relays. + // It must have the same url as Sentry (hence two urls doing the same thing) + .resource("/api/0/relays/live/", |r| { + r.name("external-healthcheck-live"); + r.get().with(liveness_healthcheck); + }) } diff --git a/relay-server/src/utils/api.rs b/relay-server/src/utils/api.rs index b096c4577c..42c71c1278 100644 --- a/relay-server/src/utils/api.rs +++ b/relay-server/src/utils/api.rs @@ -3,6 +3,21 @@ use std::fmt; use failure::Fail; use serde::{Deserialize, Serialize}; +/// Represents an action requested by the Upstream sent in an error message +#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum RelayErrorAction { + None, + Stop, + #[serde(other)] + Unknown, +} + +impl Default for RelayErrorAction { + fn default() -> Self { + RelayErrorAction::None + } +} /// An error response from an api. #[derive(Serialize, Deserialize, Default, Debug, Fail)] pub struct ApiErrorResponse { @@ -10,6 +25,8 @@ pub struct ApiErrorResponse { detail: Option, #[serde(default, skip_serializing_if = "Vec::is_empty")] causes: Vec, + #[serde(default)] + relay: RelayErrorAction, } impl ApiErrorResponse { @@ -18,6 +35,7 @@ impl ApiErrorResponse { ApiErrorResponse { detail: Some(s.as_ref().to_string()), causes: Vec::new(), + relay: RelayErrorAction::None, } } @@ -35,8 +53,13 @@ impl ApiErrorResponse { ApiErrorResponse { detail: Some(messages.remove(0)), causes: messages, + relay: RelayErrorAction::None, } } + + pub fn relay_action(&self) -> RelayErrorAction { + self.relay + } } impl fmt::Display for ApiErrorResponse { diff --git a/tests/integration/fixtures/mini_sentry.py b/tests/integration/fixtures/mini_sentry.py index ebb8364021..23aef24b6b 100644 --- a/tests/integration/fixtures/mini_sentry.py +++ b/tests/integration/fixtures/mini_sentry.py @@ -112,6 +112,10 @@ def check_challenge(): assert relay_id in authenticated_relays return jsonify({"relay_id": relay_id}) + @app.route("/api/0/relays/live/", methods=["GET"]) + def is_live(): + return jsonify({"is_healthy": True}) + @app.route("/api/666/envelope/", methods=["POST"]) def store_internal_error_event(): envelope = Envelope.deserialize(flask_request.data) diff --git a/tests/integration/test_healthchecks.py b/tests/integration/test_healthchecks.py new file mode 100644 index 0000000000..163382d2fc --- /dev/null +++ b/tests/integration/test_healthchecks.py @@ -0,0 +1,26 @@ +""" +Test the health check endpoints +""" + + +def test_live(mini_sentry, relay): + """Internal endpoint used by kubernetes """ + relay = relay(mini_sentry) + response = relay.get("/api/relay/healthcheck/live/") + assert response.status_code == 200 + + +def test_external_live(mini_sentry, relay): + """Endpoint called by a downstream to see if it has network connection to the upstream. """ + relay = relay(mini_sentry) + response = relay.get("/api/0/relays/live/") + assert response.status_code == 200 + + +def test_is_healthy(mini_sentry, relay): + """Internal endpoint used by kubernetes """ + relay = relay(mini_sentry) + # NOTE this is redundant but palced here to clearly show the exposed endpoint + # (internally the relay fixture waits for the ready health check anyway) + response = relay.get("/api/relay/healthcheck/ready/") + assert response.status_code == 200 diff --git a/tests/integration/test_store.py b/tests/integration/test_store.py index ec0f496d53..1377a5a096 100644 --- a/tests/integration/test_store.py +++ b/tests/integration/test_store.py @@ -9,7 +9,7 @@ import pytest from requests.exceptions import HTTPError -from flask import abort +from flask import abort, Response def test_store(mini_sentry, relay_chain): @@ -611,36 +611,310 @@ def network_error_endpoint(*args, **kwargs): assert event["logentry"] == {"formatted": "Hello, World!"} -@pytest.mark.skip(reason="Enable after fixing network error handling") -def test_failed_network_requests_trigger_re_authentication(relay, mini_sentry): +def test_failed_network_requests_trigger_health_check(relay, mini_sentry): + """ + Tests that consistently failing network requests will trigger relay to enter outage mode + and call on the liveliness endpoint + """ + def network_error_endpoint(*args, **kwargs): # simulate a network error raise socket.timeout() # make the store endpoint fail with a network error mini_sentry.app.view_functions["store_event"] = network_error_endpoint + original_is_live = mini_sentry.app.view_functions["is_live"] + evt = threading.Event() + + def is_live(): + evt.set() # mark is_live was called + return original_is_live() + + mini_sentry.app.view_functions["is_live"] = is_live + + # keep max backoff and the outage grace period as short as the configuration allows + + relay_options = { + "http": { + "max_retry_interval": 1, + "auth_interval": 1000, + "outage_grace_period": 1, + } + } + relay = relay(mini_sentry, relay_options) + project_config = relay.basic_project_config() + mini_sentry.project_configs[42] = project_config + + # send an event, the event should fail and trigger a liveliness check (after a second) + relay.send_event(42) + + # it did try to reestablish connection + assert evt.wait(5) + + +@pytest.mark.parametrize("mode", ["static", "proxy"]) +def test_no_auth(relay, mini_sentry, mode): + """ + Tests that relays that run in proxy and static mode do NOT authenticate + """ + project_config = mini_sentry.basic_project_config() + + old_handler = mini_sentry.app.view_functions["get_challenge"] + has_registered = [False] + + # remember if somebody has tried to register + def register_challenge(*args, **kwargs): + has_registered[0] = True + return old_handler(*args, **kwargs) + + mini_sentry.app.view_functions["get_challenge"] = register_challenge + + def configure_static_project(dir): + os.remove(dir.join("credentials.json")) + os.makedirs(dir.join("projects")) + dir.join("projects").join("42.json").write(json.dumps(project_config)) + + relay_options = {"relay": {"mode": mode}} + relay = relay(mini_sentry, options=relay_options, prepare=configure_static_project) + mini_sentry.project_configs[42] = project_config + + relay.send_event(42, {"message": "123"}) + + # sanity test that we got the event we sent + event = mini_sentry.captured_events.get(timeout=1).get_event() + assert event["logentry"] == {"formatted": "123"} + # verify that no registration took place (the register flag is not set) + assert not has_registered[0] + + +def test_processing_no_re_auth(relay_with_processing, mini_sentry): + """ + Test that processing relays only authenticate once. + + That is processing relays do NOT reauthenticate. + """ + from time import sleep + + relay_options = {"http": {"auth_interval": 1}} + + # count the number of times relay registers original_check_challenge = mini_sentry.app.view_functions["check_challenge"] counter = [0] + + def counted_check_challenge(*args, **kwargs): + counter[0] += 1 + return original_check_challenge(*args, **kwargs) + + mini_sentry.app.view_functions["check_challenge"] = counted_check_challenge + + # creates a relay (we don't need to call it explicitly it should register by itself) + relay_with_processing(options=relay_options) + + sleep(2) + # check that the registration happened only once (although it should have happened every 0.1 secs) + assert counter[0] == 1 + + +def test_re_auth(relay, mini_sentry): + """ + Tests that managed non-processing relays re-authenticate periodically. + """ + from time import sleep + + relay_options = {"http": {"auth_interval": 1}} + + # count the number of times relay registers + original_check_challenge = mini_sentry.app.view_functions["check_challenge"] + counter = [0] + + def counted_check_challenge(*args, **kwargs): + counter[0] += 1 + return original_check_challenge(*args, **kwargs) + + mini_sentry.app.view_functions["check_challenge"] = counted_check_challenge + + # creates a relay (we don't need to call it explicitly it should register by itself) + relay(mini_sentry, options=relay_options) + + sleep(2) + # check that the registration happened repeatedly + assert counter[0] > 1 + + +def test_re_auth_failure(relay, mini_sentry): + """ + Test that after a re-authentication failure, relay stops sending messages until is reauthenticated. + + That is re-authentication failure puts relay in Error state that blocks any + further message passing until authentication is re established. + """ + relay_options = {"http": {"auth_interval": 1}} + + # count the number of times relay registers + original_check_challenge = mini_sentry.app.view_functions["check_challenge"] + counter = [0] + registration_should_succeed = True evt = threading.Event() - def counted_check_challenge(): + def counted_check_challenge(*args, **kwargs): counter[0] += 1 - if counter[0] >= 2: - evt.set() # second auth attempt + evt.set() + if registration_should_succeed: + return original_check_challenge(*args, **kwargs) + else: + return Response("failed", status=500) + + mini_sentry.app.view_functions["check_challenge"] = counted_check_challenge + + # creates a relay (we don't need to call it explicitly it should register by itself) + relay = relay(mini_sentry, options=relay_options) + project_config = relay.basic_project_config() + mini_sentry.project_configs[42] = project_config + + # we have authenticated successfully + assert evt.wait(2) + auth_count_1 = counter[0] + # now fail re-authentication + registration_should_succeed = False + # wait for re-authentication try (should fail) + evt.clear() + assert evt.wait(2) + # check that we have had some authentications attempts (that failed) + auth_count_2 = counter[0] + assert auth_count_1 < auth_count_2 + + # send a message, it should not come through while the authentication has failed + relay.send_event(42, {"message": "123"}) + # sentry should have received nothing + pytest.raises(queue.Empty, lambda: mini_sentry.captured_events.get(timeout=1)) + + # set back authentication to ok + registration_should_succeed = True + # and wait for authentication to be called + evt.clear() + assert evt.wait(2) + # clear authentication errors accumulated until now + mini_sentry.test_failures.clear() + # check that we have had some auth that succeeded + auth_count_3 = counter[0] + assert auth_count_2 < auth_count_3 + + # now we should be re-authenticated and we should have the event + + # sanity test that we got the event we sent + event = mini_sentry.captured_events.get(timeout=1).get_event() + assert event["logentry"] == {"formatted": "123"} + + +def test_permanent_rejection(relay, mini_sentry): + """ + Tests that after a permanent rejection stops authentication attempts. + + That is once an authentication message detects a permanent rejection + it will not re-try to authenticate. + """ + from time import sleep + + relay_options = {"http": {"auth_interval": 1}} + + # count the number of times relay registers + original_check_challenge = mini_sentry.app.view_functions["check_challenge"] + counter = [0, 0] + registration_should_succeed = True + evt = threading.Event() - return original_check_challenge() + def counted_check_challenge(*args, **kwargs): + counter[0] += 1 + evt.set() + if registration_should_succeed: + return original_check_challenge(*args, **kwargs) + else: + counter[1] += 1 + response = Response( + json.dumps({"detail": "bad dog", "relay": "stop"}), + status=403, + content_type="application/json", + ) + print("returning RESPONSE:", response) + return response mini_sentry.app.view_functions["check_challenge"] = counted_check_challenge - # keep max backoff as short as the configuration allows (1 sec) - # make sure the re-authentication is caused by the network failure (set it to be very large) - relay_options = {"http": {"max_retry_interval": 1, "auth_interval": 1000}} + relay(mini_sentry, options=relay_options) + + # we have authenticated successfully + assert evt.wait(2) + auth_count_1 = counter[0] + # now fail re-authentication with client error + registration_should_succeed = False + # wait for re-authentication try (should fail) + evt.clear() + assert evt.wait(2) + # check that we have had some authentications attempts (that failed) + auth_count_2 = counter[0] + assert auth_count_1 < auth_count_2 + + # once we issue a client error we are never called back again + # and wait for authentication to be called + evt.clear() + # check that we were not called + assert evt.wait(2) is False + # to be sure verify that we have only been called once (after failing) + assert counter[1] == 1 + print("auth fail called ", counter[1]) + # clear authentication errors accumulated until now + mini_sentry.test_failures.clear() + + +def test_buffer_events_during_outage(relay, mini_sentry): + """ + Tests that events are buffered during network outages and then sent. + """ + + original_store_event = mini_sentry.app.view_functions["store_event"] + is_network_error = True + + def network_error_endpoint(*args, **kwargs): + if is_network_error: + # simulate a network error + raise socket.timeout() + else: + # normal processing + original_store_event(*args, **kwargs) + + # make the store endpoint fail with a network error + is_network_error = True + mini_sentry.app.view_functions["store_event"] = network_error_endpoint + original_is_live = mini_sentry.app.view_functions["is_live"] + evt = threading.Event() + + def is_live(): + evt.set() # mark is_live was called + return original_is_live() + + mini_sentry.app.view_functions["is_live"] = is_live + + # keep max backoff and the outage grace period as short as the configuration allows + relay_options = { + "http": { + "max_retry_interval": 1, + "auth_interval": 1000, + "outage_grace_period": 1, + } + } relay = relay(mini_sentry, relay_options) project_config = relay.basic_project_config() mini_sentry.project_configs[42] = project_config - # send an event, the event should fail and trigger re-authentication (after a second) - relay.send_event(42) + # send an event, the event should fail and trigger a liveliness check (after a second) + relay.send_event(42, {"message": "123"}) - # auth called at least twice - assert evt.wait(3) + # it did try to reestablish connection + assert evt.wait(5) + + # now stop network errors (let the events pass) + is_network_error = False + + # sanity test that we got the event we sent + event = mini_sentry.captured_events.get(timeout=1).get_event() + assert event["logentry"] == {"formatted": "123"} From 0415a690d8863e741209a1d45c4099da53a437a5 Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Mon, 5 Oct 2020 13:56:54 +0200 Subject: [PATCH 2/3] ref(server): Bring back upstream response transformer (#794) Brings back the `ResponseTransformer` trait for `upstream::SendRequest` for two reasons: - It will allow to deduplicate the code that consumes response bodies when we don't need to parse them. This is currently done twice: Once for `SendRequest` and once for the manual `CheckUpstreamConnection` message. - It allows other actors to call `SendRequest` with a response body. --- relay-server/src/actors/upstream.rs | 87 ++++++++++++++++++++++++----- 1 file changed, 72 insertions(+), 15 deletions(-) diff --git a/relay-server/src/actors/upstream.rs b/relay-server/src/actors/upstream.rs index a472321380..d41a3e5edf 100644 --- a/relay-server/src/actors/upstream.rs +++ b/relay-server/src/actors/upstream.rs @@ -828,10 +828,42 @@ where } } -pub struct SendRequest { +pub trait ResponseTransformer: 'static { + type Result: 'static; + + fn transform_response(self, _: ClientResponse) -> Self::Result; +} + +impl ResponseTransformer for () { + type Result = ResponseFuture<(), UpstreamRequestError>; + + fn transform_response(self, response: ClientResponse) -> Self::Result { + // consume response bodies to allow connection keep-alive + let future = response + .payload() + .for_each(|_| Ok(())) + .map_err(UpstreamRequestError::PayloadFailed); + + Box::new(future) + } +} + +impl ResponseTransformer for F +where + F: FnOnce(ClientResponse) -> T + 'static, +{ + type Result = T; + + fn transform_response(self, response: ClientResponse) -> Self::Result { + self(response) + } +} + +pub struct SendRequest { method: Method, path: String, builder: B, + transformer: T, retry: bool, } @@ -841,6 +873,7 @@ impl SendRequest { method, path: path.into(), builder: (), + transformer: (), retry: true, } } @@ -850,39 +883,67 @@ impl SendRequest { } } -impl SendRequest { - pub fn build(self, callback: F) -> SendRequest +impl SendRequest { + pub fn build(self, callback: F) -> SendRequest where F: FnMut(&mut ClientRequestBuilder) -> Result + 'static, { SendRequest { method: self.method, path: self.path, - retry: self.retry, builder: callback, + transformer: self.transformer, + retry: self.retry, + } + } + + #[allow(dead_code)] + pub fn transform(self, callback: F) -> SendRequest + where + F: FnOnce(ClientResponse) -> R, + { + SendRequest { + method: self.method, + path: self.path, + builder: self.builder, + transformer: callback, + retry: self.retry, } } } -impl Message for SendRequest { - type Result = Result<(), UpstreamRequestError>; +impl Message for SendRequest +where + R: ResponseTransformer, + R::Result: IntoFuture, +{ + type Result = Result; } +// impl Message for SendRequest { +// type Result = Result<(), UpstreamRequestError>; +// } + /// SendRequest messages represent external messages that need to be sent to the upstream server /// and do not use Relay authentication. /// /// The handler adds the message to one of the message queues. -impl Handler> for UpstreamRelay +impl Handler> for UpstreamRelay where B: RequestBuilder + Send, + R: ResponseTransformer, + R::Result: IntoFuture, + T: Send, + E: From + Send, { - type Result = ResponseFuture<(), UpstreamRequestError>; + type Result = ResponseFuture; - fn handle(&mut self, message: SendRequest, ctx: &mut Self::Context) -> Self::Result { + fn handle(&mut self, message: SendRequest, ctx: &mut Self::Context) -> Self::Result { let SendRequest { method, path, mut builder, + transformer, retry, } = message; @@ -895,12 +956,8 @@ where move |b| builder.build_request(b), ctx, ) - .and_then(|client_response| { - client_response - .payload() - .for_each(|_| Ok(())) - .map_err(UpstreamRequestError::PayloadFailed) - }); + .from_err() + .and_then(move |r| transformer.transform_response(r)); Box::new(future) } From 79ebd1e64e72f302b87f28dd6c595a6c167eea44 Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Mon, 5 Oct 2020 14:08:38 +0200 Subject: [PATCH 3/3] ref(common): Make the DSN public key Copy (#795) Creates a type for DSN public keys that implements copy. Every public key is a 32-character hexadecimal string. Storage requirements could be halved by parsing the hexadecimal contents of the key, however this is currently not enforced anywhere in Sentry. For this reason, we're taking the conservative approach using 32 bytes. The type is called `ProjectKey` to distinguish it from Relay's own public key. --- relay-common/src/lib.rs | 4 +- relay-common/src/project.rs | 77 +++++++++++++++++++++ relay-quotas/src/quota.rs | 30 ++++---- relay-quotas/src/rate_limit.rs | 32 +++++---- relay-quotas/src/redis.rs | 18 ++--- relay-server/src/actors/events.rs | 6 +- relay-server/src/actors/project.rs | 12 ++-- relay-server/src/endpoints/common.rs | 2 +- relay-server/src/envelope.rs | 10 ++- relay-server/src/extractors/request_meta.rs | 68 +++++++++++++++--- relay-server/src/utils/rate_limits.rs | 10 +-- 11 files changed, 203 insertions(+), 66 deletions(-) create mode 100644 relay-common/src/project.rs diff --git a/relay-common/src/lib.rs b/relay-common/src/lib.rs index fa3cdd8952..9814a644f8 100644 --- a/relay-common/src/lib.rs +++ b/relay-common/src/lib.rs @@ -11,6 +11,7 @@ mod cell; mod constants; mod glob; mod log; +mod project; mod retry; mod time; mod utils; @@ -19,11 +20,12 @@ pub use crate::cell::*; pub use crate::constants::*; pub use crate::glob::*; pub use crate::log::*; +pub use crate::project::*; pub use crate::retry::*; pub use crate::time::*; pub use crate::utils::*; pub use sentry_types::protocol::LATEST as PROTOCOL_VERSION; pub use sentry_types::{ - Auth, Dsn, ParseAuthError, ParseDsnError, ParseProjectIdError, ProjectId, Scheme, Uuid, + Auth, Dsn, ParseAuthError, ParseDsnError, ParseProjectIdError, Scheme, Uuid, }; diff --git a/relay-common/src/project.rs b/relay-common/src/project.rs new file mode 100644 index 0000000000..5ceeaefba1 --- /dev/null +++ b/relay-common/src/project.rs @@ -0,0 +1,77 @@ +use std::borrow::Cow; +use std::fmt; + +use serde::{Deserialize, Serialize}; + +#[doc(inline)] +pub use sentry_types::ProjectId; + +/// An error parsing [`ProjectKey`](struct.ProjectKey.html). +#[derive(Clone, Copy, Debug)] +pub struct ParseProjectKeyError; + +impl fmt::Display for ParseProjectKeyError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "invalid project key") + } +} + +impl std::error::Error for ParseProjectKeyError {} + +/// The public key used in a DSN to identify and authenticate for a project at Sentry. +/// +/// Project keys are always 32-character hexadecimal strings. +#[derive(Clone, Copy, Eq, Hash, Ord, PartialOrd, PartialEq)] +pub struct ProjectKey([u8; 32]); + +impl Serialize for ProjectKey { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + serializer.serialize_str(self.as_str()) + } +} + +impl<'de> Deserialize<'de> for ProjectKey { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let cow = Cow::::deserialize(deserializer)?; + Self::parse(&cow).map_err(serde::de::Error::custom) + } +} + +impl ProjectKey { + /// Parses a `ProjectKey` from a string. + pub fn parse(key: &str) -> Result { + if key.len() != 32 || !key.is_ascii() { + return Err(ParseProjectKeyError); + } + + let mut project_key = Self(Default::default()); + project_key.0.copy_from_slice(key.as_bytes()); + Ok(project_key) + } + + /// Returns the string representation of the project key. + #[inline] + pub fn as_str(&self) -> &str { + // Safety: The string is already validated to be of length 32 and valid ASCII when + // constructing `ProjectKey`. + unsafe { std::str::from_utf8_unchecked(&self.0) } + } +} + +impl fmt::Debug for ProjectKey { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "ProjectKey(\"{}\")", self.as_str()) + } +} + +impl fmt::Display for ProjectKey { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.as_str().fmt(f) + } +} diff --git a/relay-quotas/src/quota.rs b/relay-quotas/src/quota.rs index 0bb07bf6a2..54831a41c1 100644 --- a/relay-quotas/src/quota.rs +++ b/relay-quotas/src/quota.rs @@ -4,12 +4,12 @@ use std::str::FromStr; use serde::{Deserialize, Serialize}; use smallvec::SmallVec; -use relay_common::ProjectId; +use relay_common::{ProjectId, ProjectKey}; /// Data scoping information. /// /// This structure holds information of all scopes required for attributing an item to quotas. -#[derive(Clone, Debug)] +#[derive(Clone, Copy, Debug)] pub struct Scoping { /// The organization id. pub organization_id: u64, @@ -18,7 +18,7 @@ pub struct Scoping { pub project_id: ProjectId, /// The DSN public key. - pub public_key: String, + pub public_key: ProjectKey, /// The public key's internal id. pub key_id: Option, @@ -567,7 +567,7 @@ mod tests { scoping: &Scoping { organization_id: 42, project_id: ProjectId::new(21), - public_key: "a94ae32be2584e0bbd7a4cbb95971fee".to_owned(), + public_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: Some(17), } })); @@ -590,7 +590,7 @@ mod tests { scoping: &Scoping { organization_id: 42, project_id: ProjectId::new(21), - public_key: "a94ae32be2584e0bbd7a4cbb95971fee".to_owned(), + public_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: Some(17), } })); @@ -613,7 +613,7 @@ mod tests { scoping: &Scoping { organization_id: 42, project_id: ProjectId::new(21), - public_key: "a94ae32be2584e0bbd7a4cbb95971fee".to_owned(), + public_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: Some(17), } })); @@ -623,7 +623,7 @@ mod tests { scoping: &Scoping { organization_id: 42, project_id: ProjectId::new(21), - public_key: "a94ae32be2584e0bbd7a4cbb95971fee".to_owned(), + public_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: Some(17), } })); @@ -646,7 +646,7 @@ mod tests { scoping: &Scoping { organization_id: 42, project_id: ProjectId::new(21), - public_key: "a94ae32be2584e0bbd7a4cbb95971fee".to_owned(), + public_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: Some(17), } })); @@ -669,7 +669,7 @@ mod tests { scoping: &Scoping { organization_id: 42, project_id: ProjectId::new(21), - public_key: "a94ae32be2584e0bbd7a4cbb95971fee".to_owned(), + public_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: Some(17), } })); @@ -679,7 +679,7 @@ mod tests { scoping: &Scoping { organization_id: 0, project_id: ProjectId::new(21), - public_key: "a94ae32be2584e0bbd7a4cbb95971fee".to_owned(), + public_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: Some(17), } })); @@ -702,7 +702,7 @@ mod tests { scoping: &Scoping { organization_id: 42, project_id: ProjectId::new(21), - public_key: "a94ae32be2584e0bbd7a4cbb95971fee".to_owned(), + public_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: Some(17), } })); @@ -712,7 +712,7 @@ mod tests { scoping: &Scoping { organization_id: 42, project_id: ProjectId::new(0), - public_key: "a94ae32be2584e0bbd7a4cbb95971fee".to_owned(), + public_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: Some(17), } })); @@ -735,7 +735,7 @@ mod tests { scoping: &Scoping { organization_id: 42, project_id: ProjectId::new(21), - public_key: "a94ae32be2584e0bbd7a4cbb95971fee".to_owned(), + public_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: Some(17), } })); @@ -745,7 +745,7 @@ mod tests { scoping: &Scoping { organization_id: 42, project_id: ProjectId::new(21), - public_key: "a94ae32be2584e0bbd7a4cbb95971fee".to_owned(), + public_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: Some(0), } })); @@ -755,7 +755,7 @@ mod tests { scoping: &Scoping { organization_id: 42, project_id: ProjectId::new(21), - public_key: "a94ae32be2584e0bbd7a4cbb95971fee".to_owned(), + public_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: None, } })); diff --git a/relay-quotas/src/rate_limit.rs b/relay-quotas/src/rate_limit.rs index 2afb043dc5..78da9f085d 100644 --- a/relay-quotas/src/rate_limit.rs +++ b/relay-quotas/src/rate_limit.rs @@ -2,7 +2,7 @@ use std::fmt; use std::str::FromStr; use std::time::{Duration, Instant}; -use relay_common::ProjectId; +use relay_common::{ProjectId, ProjectKey}; use crate::quota::{DataCategories, ItemScoping, Quota, QuotaScope, ReasonCode, Scoping}; use crate::REJECT_ALL_SECS; @@ -107,7 +107,7 @@ pub enum RateLimitScope { /// A project with identifier. Project(ProjectId), /// A DSN public key. - Key(String), + Key(ProjectKey), } impl RateLimitScope { @@ -116,9 +116,9 @@ impl RateLimitScope { match scope { QuotaScope::Organization => RateLimitScope::Organization(scoping.organization_id), QuotaScope::Project => RateLimitScope::Project(scoping.project_id), - QuotaScope::Key => RateLimitScope::Key(scoping.public_key.clone()), + QuotaScope::Key => RateLimitScope::Key(scoping.public_key), // For unknown scopes, assume the most specific scope: - QuotaScope::Unknown => RateLimitScope::Key(scoping.public_key.clone()), + QuotaScope::Unknown => RateLimitScope::Key(scoping.public_key), } } @@ -379,7 +379,7 @@ mod tests { scoping: &Scoping { organization_id: 42, project_id: ProjectId::new(21), - public_key: "a94ae32be2584e0bbd7a4cbb95971fee".to_owned(), + public_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: None, } })); @@ -389,7 +389,7 @@ mod tests { scoping: &Scoping { organization_id: 42, project_id: ProjectId::new(21), - public_key: "a94ae32be2584e0bbd7a4cbb95971fee".to_owned(), + public_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: None, } })); @@ -409,7 +409,7 @@ mod tests { scoping: &Scoping { organization_id: 42, project_id: ProjectId::new(21), - public_key: "a94ae32be2584e0bbd7a4cbb95971fee".to_owned(), + public_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: None, } })); @@ -419,7 +419,7 @@ mod tests { scoping: &Scoping { organization_id: 0, project_id: ProjectId::new(21), - public_key: "a94ae32be2584e0bbd7a4cbb95971fee".to_owned(), + public_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: None, } })); @@ -439,7 +439,7 @@ mod tests { scoping: &Scoping { organization_id: 42, project_id: ProjectId::new(21), - public_key: "a94ae32be2584e0bbd7a4cbb95971fee".to_owned(), + public_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: None, } })); @@ -449,7 +449,7 @@ mod tests { scoping: &Scoping { organization_id: 42, project_id: ProjectId::new(0), - public_key: "a94ae32be2584e0bbd7a4cbb95971fee".to_owned(), + public_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: None, } })); @@ -459,7 +459,9 @@ mod tests { fn test_rate_limit_matches_key() { let rate_limit = RateLimit { categories: DataCategories::new(), - scope: RateLimitScope::Key("a94ae32be2584e0bbd7a4cbb95971fee".to_owned()), + scope: RateLimitScope::Key( + ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), + ), reason_code: None, retry_after: RetryAfter::from_secs(1), }; @@ -469,7 +471,7 @@ mod tests { scoping: &Scoping { organization_id: 42, project_id: ProjectId::new(21), - public_key: "a94ae32be2584e0bbd7a4cbb95971fee".to_owned(), + public_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: None, } })); @@ -479,7 +481,7 @@ mod tests { scoping: &Scoping { organization_id: 0, project_id: ProjectId::new(21), - public_key: "deadbeefdeadbeefdeadbeefdeadbeef".to_owned(), + public_key: ProjectKey::parse("deadbeefdeadbeefdeadbeefdeadbeef").unwrap(), key_id: None, } })); @@ -715,7 +717,7 @@ mod tests { scoping: &Scoping { organization_id: 42, project_id: ProjectId::new(21), - public_key: "a94ae32be2584e0bbd7a4cbb95971fee".to_owned(), + public_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: None, }, }); @@ -762,7 +764,7 @@ mod tests { scoping: &Scoping { organization_id: 42, project_id: ProjectId::new(21), - public_key: "a94ae32be2584e0bbd7a4cbb95971fee".to_owned(), + public_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: None, }, }; diff --git a/relay-quotas/src/redis.rs b/relay-quotas/src/redis.rs index f11d974b64..60a6a5bceb 100644 --- a/relay-quotas/src/redis.rs +++ b/relay-quotas/src/redis.rs @@ -2,10 +2,10 @@ use std::fmt; use std::sync::Arc; use failure::Fail; +use sentry::protocol::value; use relay_common::UnixTimestamp; use relay_redis::{redis::Script, RedisError, RedisPool}; -use sentry::protocol::value; use crate::quota::{ItemScoping, Quota, QuotaScope}; use crate::rate_limit::{RateLimit, RateLimits, RetryAfter}; @@ -248,7 +248,7 @@ mod tests { use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; - use relay_common::ProjectId; + use relay_common::{ProjectId, ProjectKey}; use relay_redis::redis::Commands; use crate::quota::{DataCategories, DataCategory, ReasonCode, Scoping}; @@ -295,7 +295,7 @@ mod tests { scoping: &Scoping { organization_id: 42, project_id: ProjectId::new(43), - public_key: "a94ae32be2584e0bbd7a4cbb95971fee".to_owned(), + public_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: Some(44), }, }; @@ -334,7 +334,7 @@ mod tests { scoping: &Scoping { organization_id: 42, project_id: ProjectId::new(43), - public_key: "a94ae32be2584e0bbd7a4cbb95971fee".to_owned(), + public_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: Some(44), }, }; @@ -371,7 +371,7 @@ mod tests { scoping: &Scoping { organization_id: 42, project_id: ProjectId::new(43), - public_key: "a94ae32be2584e0bbd7a4cbb95971fee".to_owned(), + public_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: Some(44), }, }; @@ -413,7 +413,7 @@ mod tests { scoping: &Scoping { organization_id: 42, project_id: ProjectId::new(43), - public_key: "a94ae32be2584e0bbd7a4cbb95971fee".to_owned(), + public_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: Some(44), }, }; @@ -460,7 +460,7 @@ mod tests { scoping: &Scoping { organization_id: 42, project_id: ProjectId::new(43), - public_key: "a94ae32be2584e0bbd7a4cbb95971fee".to_owned(), + public_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: Some(44), }, }; @@ -507,7 +507,7 @@ mod tests { scoping: &Scoping { organization_id: 69420, project_id: ProjectId::new(42), - public_key: "a94ae32be2584e0bbd7a4cbb95971fee".to_owned(), + public_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: Some(4711), }, }; @@ -534,7 +534,7 @@ mod tests { scoping: &Scoping { organization_id: 69420, project_id: ProjectId::new(42), - public_key: "a94ae32be2584e0bbd7a4cbb95971fee".to_owned(), + public_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: Some(4711), }, }; diff --git a/relay-server/src/actors/events.rs b/relay-server/src/actors/events.rs index 5f2d898e3f..e8921215cf 100644 --- a/relay-server/src/actors/events.rs +++ b/relay-server/src/actors/events.rs @@ -874,7 +874,7 @@ impl EventProcessor { } = *state; let key_id = project_state - .get_public_key_config(&envelope.meta().public_key()) + .get_public_key_config(envelope.meta().public_key()) .and_then(|k| Some(k.numeric_id?.to_string())); if key_id.is_none() { @@ -1408,7 +1408,7 @@ impl Handler for EventManager { .send(StoreEnvelope { envelope, start_time, - scoping: scoping.borrow().clone(), + scoping: *scoping.borrow(), }) .map_err(ProcessingError::ScheduleFailed) .and_then(move |result| result.map_err(ProcessingError::StoreFailed)); @@ -1525,7 +1525,7 @@ impl Handler for EventManager { if let Some(outcome) = outcome { outcome_producer.do_send(TrackOutcome { timestamp: Instant::now(), - scoping: scoping.borrow().clone(), + scoping: *scoping.borrow(), outcome, event_id, remote_addr, diff --git a/relay-server/src/actors/project.rs b/relay-server/src/actors/project.rs index d070d255fe..08c3e79644 100644 --- a/relay-server/src/actors/project.rs +++ b/relay-server/src/actors/project.rs @@ -9,7 +9,7 @@ use serde_json::Value; use url::Url; use relay_auth::PublicKey; -use relay_common::{metric, ProjectId}; +use relay_common::{metric, ProjectId, ProjectKey}; use relay_config::{Config, RelayMode}; use relay_filter::{matches_any_origin, FiltersConfig}; use relay_general::pii::{DataScrubbingConfig, PiiConfig}; @@ -185,7 +185,7 @@ impl ProjectState { } /// Returns configuration options for a public key. - pub fn get_public_key_config(&self, public_key: &str) -> Option<&PublicKeyConfig> { + pub fn get_public_key_config(&self, public_key: ProjectKey) -> Option<&PublicKeyConfig> { for key in &self.public_keys { if key.public_key == public_key { return Some(key); @@ -195,7 +195,7 @@ impl ProjectState { } /// Returns the current status of a key. - pub fn get_public_key_status(&self, public_key: &str) -> PublicKeyStatus { + pub fn get_public_key_status(&self, public_key: ProjectKey) -> PublicKeyStatus { if let Some(key) = self.get_public_key_config(public_key) { if key.is_enabled { PublicKeyStatus::Enabled @@ -276,7 +276,7 @@ impl ProjectState { // project was refetched in between. In such a case, access to key quotas is not availabe, // but we can gracefully execute all other rate limiting. scoping.key_id = self - .get_public_key_config(&scoping.public_key) + .get_public_key_config(scoping.public_key) .and_then(|config| config.numeric_id); // This is a hack covering three cases: @@ -366,7 +366,7 @@ impl ProjectState { #[serde(rename_all = "camelCase")] pub struct PublicKeyConfig { /// Public part of key (random hash). - pub public_key: String, + pub public_key: ProjectKey, /// Whether this key can be used. pub is_enabled: bool, @@ -386,7 +386,7 @@ mod limited_public_key_comfigs { #[derive(Debug, Serialize)] #[serde(rename_all = "camelCase", remote = "PublicKeyConfig")] pub struct LimitedPublicKeyConfig { - pub public_key: String, + pub public_key: ProjectKey, pub is_enabled: bool, } diff --git a/relay-server/src/endpoints/common.rs b/relay-server/src/endpoints/common.rs index 09fc98156e..cc7e068eab 100644 --- a/relay-server/src/endpoints/common.rs +++ b/relay-server/src/endpoints/common.rs @@ -468,7 +468,7 @@ where if is_event { outcome_producer.do_send(TrackOutcome { timestamp: start_time, - scoping: scoping.borrow().clone(), + scoping: *scoping.borrow(), outcome: error.to_outcome(), event_id: *event_id.borrow(), remote_addr, diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs index d73f49f2bc..b2c274e8c7 100644 --- a/relay-server/src/envelope.rs +++ b/relay-server/src/envelope.rs @@ -1033,7 +1033,10 @@ mod tests { .unwrap(); assert_eq!(*meta.dsn(), dsn); assert_eq!(meta.project_id(), ProjectId::new(42)); - assert_eq!(meta.public_key(), "e12d836b15bb49d7bbf99e64295d995b"); + assert_eq!( + meta.public_key().as_str(), + "e12d836b15bb49d7bbf99e64295d995b" + ); assert_eq!(meta.client(), Some("sentry/javascript")); assert_eq!(meta.version(), 6); assert_eq!(meta.origin(), Some(&"http://localhost/".parse().unwrap())); @@ -1155,7 +1158,10 @@ mod tests { .unwrap(); assert_eq!(*meta.dsn(), dsn); assert_eq!(meta.project_id(), ProjectId::new(42)); - assert_eq!(meta.public_key(), "e12d836b15bb49d7bbf99e64295d995b"); + assert_eq!( + meta.public_key().as_str(), + "e12d836b15bb49d7bbf99e64295d995b" + ); assert_eq!(meta.client(), Some("sentry/client")); assert_eq!(meta.version(), 7); assert_eq!(meta.origin(), Some(&"http://origin/".parse().unwrap())); diff --git a/relay-server/src/extractors/request_meta.rs b/relay-server/src/extractors/request_meta.rs index 637bddc9aa..cced34184b 100644 --- a/relay-server/src/extractors/request_meta.rs +++ b/relay-server/src/extractors/request_meta.rs @@ -11,7 +11,8 @@ use serde::{Deserialize, Serialize}; use url::Url; use relay_common::{ - tryf, Auth, Dsn, ParseAuthError, ParseDsnError, ParseProjectIdError, ProjectId, + tryf, Auth, Dsn, ParseAuthError, ParseDsnError, ParseProjectIdError, ParseProjectKeyError, + ProjectId, ProjectKey, }; use relay_quotas::Scoping; @@ -38,6 +39,9 @@ pub enum BadEventMeta { #[fail(display = "bad sentry DSN")] BadDsn(#[fail(cause)] ParseDsnError), + #[fail(display = "bad sentry DSN public key")] + BadPublicKey(ParseProjectKeyError), + #[fail(display = "bad project key: project does not exist")] BadProjectKey, @@ -51,7 +55,9 @@ impl ResponseError for BadEventMeta { Self::MissingAuth | Self::MultipleAuth | Self::BadProjectKey | Self::BadAuth(_) => { HttpResponse::Unauthorized() } - Self::BadProject(_) | Self::BadDsn(_) => HttpResponse::BadRequest(), + Self::BadProject(_) | Self::BadDsn(_) | Self::BadPublicKey(_) => { + HttpResponse::BadRequest() + } Self::ScheduleFailed => HttpResponse::ServiceUnavailable(), }; @@ -59,13 +65,54 @@ impl ResponseError for BadEventMeta { } } +/// Wrapper around a Sentry DSN with parsed public key. +/// +/// The Sentry DSN type carries a plain public key string. However, Relay handles copy `ProjectKey` +/// types internally. Converting from `String` to `ProjectKey` is fallible and should be caught when +/// deserializing the request. +/// +/// This type caches the parsed project key in addition to the DSN. Other than that, it +/// transparently serializes to and deserializes from a DSN string. +#[derive(Debug, Clone)] +pub struct FullDsn { + dsn: Dsn, + public_key: ProjectKey, +} + +impl FullDsn { + /// Ensures a valid public key in the DSN. + fn from_dsn(dsn: Dsn) -> Result { + let public_key = ProjectKey::parse(dsn.public_key())?; + Ok(Self { dsn, public_key }) + } +} + +impl<'de> Deserialize<'de> for FullDsn { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let dsn = Dsn::deserialize(deserializer)?; + Self::from_dsn(dsn).map_err(serde::de::Error::custom) + } +} + +impl Serialize for FullDsn { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + self.dsn.serialize(serializer) + } +} + const fn default_version() -> u16 { relay_common::PROTOCOL_VERSION } /// Request information for sentry ingest data, such as events, envelopes or metrics. #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct RequestMeta { +pub struct RequestMeta { /// The DSN describing the target of this envelope. dsn: D, @@ -154,7 +201,7 @@ impl RequestMeta { #[cfg(test)] pub fn new(dsn: Dsn) -> Self { RequestMeta { - dsn, + dsn: FullDsn::from_dsn(dsn).expect("invalid DSN key"), client: Some("sentry/client".to_string()), version: 7, origin: Some("http://origin/".parse().unwrap()), @@ -170,7 +217,7 @@ impl RequestMeta { /// The DSN declares the project and auth information and upstream address. When RequestMeta is /// constructed from a web request, the DSN is set to point to the upstream host. pub fn dsn(&self) -> &Dsn { - &self.dsn + &self.dsn.dsn } /// Returns the project identifier that the DSN points to. @@ -179,8 +226,8 @@ impl RequestMeta { } /// Returns the public key part of the DSN for authentication. - pub fn public_key(&self) -> &str { - &self.dsn.public_key() + pub fn public_key(&self) -> ProjectKey { + self.dsn.public_key } /// Formats the Sentry authentication header. @@ -209,7 +256,7 @@ impl RequestMeta { Scoping { organization_id: 0, project_id: self.project_id(), - public_key: self.public_key().to_owned(), + public_key: self.public_key(), key_id: None, } } @@ -368,8 +415,11 @@ fn extract_event_meta( project_id, ); + let dsn = dsn_string.parse().map_err(BadEventMeta::BadDsn)?; + let dsn = FullDsn::from_dsn(dsn).map_err(BadEventMeta::BadPublicKey)?; + Ok(RequestMeta { - dsn: dsn_string.parse().map_err(BadEventMeta::BadDsn)?, + dsn, version, client, origin, diff --git a/relay-server/src/utils/rate_limits.rs b/relay-server/src/utils/rate_limits.rs index 0d3136c26c..78b42de2da 100644 --- a/relay-server/src/utils/rate_limits.rs +++ b/relay-server/src/utils/rate_limits.rs @@ -288,7 +288,7 @@ mod tests { use smallvec::smallvec; - use relay_common::ProjectId; + use relay_common::{ProjectId, ProjectKey}; use relay_quotas::RetryAfter; use crate::envelope::{AttachmentType, ContentType}; @@ -323,7 +323,7 @@ mod tests { let scoping = Scoping { organization_id: 42, project_id: ProjectId::new(21), - public_key: "a94ae32be2584e0bbd7a4cbb95971fee".to_owned(), + public_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: Some(17), }; @@ -337,7 +337,7 @@ mod tests { let scoping = Scoping { organization_id: 42, project_id: ProjectId::new(21), - public_key: "a94ae32be2584e0bbd7a4cbb95971fee".to_owned(), + public_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: Some(17), }; @@ -377,7 +377,7 @@ mod tests { let scoping = Scoping { organization_id: 42, project_id: ProjectId::new(21), - public_key: "a94ae32be2584e0bbd7a4cbb95971fee".to_owned(), + public_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: Some(17), }; @@ -416,7 +416,7 @@ mod tests { Scoping { organization_id: 42, project_id: ProjectId::new(21), - public_key: "e12d836b15bb49d7bbf99e64295d995b".to_owned(), + public_key: ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(), key_id: Some(17), } }