diff --git a/CHANGELOG.md b/CHANGELOG.md
index 27e90e3ac1..589dae2fdf 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,7 @@
 **Internal**:
 
 - Remove a temporary flag from attachment kafka messages indicating rate limited crash reports to Sentry. This is now enabled by default. ([#718](https://github.com/getsentry/relay/pull/718))
+- Improve the performance of HTTP requests to the upstream by sending high priority messages first. ([#678](https://github.com/getsentry/relay/pull/678))
 
 ## 20.8.0
 
diff --git a/relay-server/src/actors/project_keys.rs b/relay-server/src/actors/project_keys.rs
index 31cc3753b6..ccadc5a7e8 100644
--- a/relay-server/src/actors/project_keys.rs
+++ b/relay-server/src/actors/project_keys.rs
@@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize};
 use relay_common::{metric, LogError, ProjectId};
 use relay_config::Config;
 
-use crate::actors::upstream::{SendQuery, UpstreamQuery, UpstreamRelay};
+use crate::actors::upstream::{RequestPriority, SendQuery, UpstreamQuery, UpstreamRelay};
 use crate::metrics::{RelayCounters, RelayTimers};
 use crate::utils::Response;
 
@@ -45,6 +45,10 @@ impl UpstreamQuery for GetProjectIds {
     fn path(&self) -> Cow<'static, str> {
         Cow::Borrowed("/api/0/relays/projectids/")
     }
+
+    fn priority() -> RequestPriority {
+        RequestPriority::High
+    }
 }
 
 #[derive(Debug)]
diff --git a/relay-server/src/actors/project_upstream.rs b/relay-server/src/actors/project_upstream.rs
index 3dd0163a6b..f270187647 100644
--- a/relay-server/src/actors/project_upstream.rs
+++ b/relay-server/src/actors/project_upstream.rs
@@ -15,7 +15,7 @@ use relay_config::Config;
 use crate::actors::project::ProjectState;
 use crate::actors::project_cache::{FetchProjectState, ProjectError, ProjectStateResponse};
-use crate::actors::upstream::{SendQuery, UpstreamQuery, UpstreamRelay};
+use crate::actors::upstream::{RequestPriority, SendQuery, UpstreamQuery, UpstreamRelay};
 use crate::metrics::{RelayCounters, RelayHistograms, RelayTimers};
 use crate::utils::{self, ErrorBoundary};
 
@@ -42,6 +42,10 @@ impl UpstreamQuery for GetProjectStates {
     fn path(&self) -> Cow<'static, str> {
         Cow::Borrowed("/api/0/relays/projectconfigs/")
     }
+
+    fn priority() -> RequestPriority {
+        RequestPriority::High
+    }
 }
 
 #[derive(Debug)]
diff --git a/relay-server/src/actors/relays.rs b/relay-server/src/actors/relays.rs
index 847901ed8c..01c61d0c75 100644
--- a/relay-server/src/actors/relays.rs
+++ b/relay-server/src/actors/relays.rs
@@ -16,7 +16,7 @@ use relay_auth::{PublicKey, RelayId};
 use relay_common::{LogError, RetryBackoff};
 use relay_config::Config;
 
-use crate::actors::upstream::{SendQuery, UpstreamQuery, UpstreamRelay};
+use crate::actors::upstream::{RequestPriority, SendQuery, UpstreamQuery, UpstreamRelay};
 use crate::utils::{self, ApiErrorResponse, Response};
 
 #[derive(Fail, Debug)]
@@ -332,6 +332,10 @@ impl UpstreamQuery for GetRelays {
     fn path(&self) -> Cow<'static, str> {
         Cow::Borrowed("/api/0/relays/publickeys/")
     }
+
+    fn priority() -> RequestPriority {
+        RequestPriority::High
+    }
 }
 
 impl Handler<GetRelays> for RelayCache {
diff --git a/relay-server/src/actors/upstream.rs b/relay-server/src/actors/upstream.rs
index 35d8047293..e5b28f8eb9 100644
--- a/relay-server/src/actors/upstream.rs
+++ b/relay-server/src/actors/upstream.rs
@@ -1,5 +1,25 @@
-//! This actor can be used for sending signed requests to the upstream relay.
+//! This module implements the `UpstreamRelay` actor that can be used for sending requests to the
+//! upstream relay via HTTP.
+//!
+//! The actor handles two main types of messages, plus some internal messages:
+//! * messages that use Relay authentication
+//!
+//!   These are requests for data originating inside Relay (either requests for some
+//!   configuration data or outcome results being passed to the upstream server).
+//!
+//! * messages that do not use Relay authentication
+//!
+//!   These are messages for requests that originate as user-sent events and use whatever
+//!   authentication headers were provided by the original request.
+//!
+//! * messages used internally by Relay
+//!
+//!   These are messages that Relay sends in order to coordinate its work and do not result
+//!   directly in an HTTP request being sent to the upstream server.
+//!
 
 use std::borrow::Cow;
+use std::collections::VecDeque;
+use std::fmt;
 use std::str;
 use std::sync::Arc;
 
@@ -10,19 +30,20 @@ use actix_web::error::{JsonPayloadError, PayloadError};
 use actix_web::http::{header, Method, StatusCode};
 use actix_web::{Error as ActixError, HttpMessage};
 use failure::Fail;
-use futures::{future, prelude::*};
+use futures::{future, prelude::*, sync::oneshot};
 use itertools::Itertools;
 use serde::de::DeserializeOwned;
 use serde::ser::Serialize;
 
 use relay_auth::{RegisterChallenge, RegisterRequest, RegisterResponse, Registration};
-use relay_common::{tryf, LogError, RetryBackoff};
+use relay_common::{metric, tryf, LogError, RetryBackoff};
 use relay_config::{Config, RelayMode};
 use relay_quotas::{
     DataCategories, QuotaScope, RateLimit, RateLimitScope, RateLimits, RetryAfter, Scoping,
 };
 
-use crate::utils::{self, ApiErrorResponse};
+use crate::metrics::RelayHistograms;
+use crate::utils::{self, ApiErrorResponse, IntoTracked, TrackedFutureFinished};
 
 #[derive(Fail, Debug)]
 pub enum UpstreamRequestError {
@@ -49,6 +70,9 @@ pub enum UpstreamRequestError {
 
     #[fail(display = "upstream request returned error {}", _0)]
     ResponseError(StatusCode, #[cause] ApiErrorResponse),
+
+    #[fail(display = "channel closed")]
+    ChannelClosed,
 }
 
 /// Represents the current auth state.
@@ -133,6 +157,54 @@ impl UpstreamRateLimits {
     }
 }
 
+/// Requests are queued and sent over the HTTP connections according to their priority.
+/// High priority messages are sent first; when no high priority message is pending,
+/// low priority messages are sent. Within the same priority, messages are sent in FIFO order.
+#[derive(Debug, Clone, Copy)]
+pub enum RequestPriority {
+    /// High priority, low volume messages (e.g. ProjectConfig, ProjectStates, Registration messages)
+    High,
+    /// Low priority, high volume messages (e.g. Events and Outcomes)
+    Low,
+}
+
+impl RequestPriority {
+    fn name(&self) -> &'static str {
+        match self {
+            RequestPriority::High => "high",
+            RequestPriority::Low => "low",
+        }
+    }
+}
+
+impl fmt::Display for RequestPriority {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}", self.name())
+    }
+}
+
+/// `UpstreamRequest` objects are queued inside the upstream actor.
+/// The objects are transformed into HTTP requests and sent to the upstream as HTTP connections
+/// become available.
+struct UpstreamRequest {
+    response_sender: oneshot::Sender<Result<ClientResponse, UpstreamRequestError>>,
+    method: Method,
+    path: String,
+    build: Box<dyn FnOnce(&mut ClientRequestBuilder) -> Result<ClientRequest, ActixError>>,
+}
+
+pub struct UpstreamRelay {
+    backoff: RetryBackoff,
+    config: Arc<Config>,
+    auth_state: AuthState,
+    max_inflight_requests: usize,
+    num_inflight_requests: usize,
+    // high priority request queue
+    high_prio_requests: VecDeque<UpstreamRequest>,
+    // low priority request queue
+    low_prio_requests: VecDeque<UpstreamRequest>,
+}
+
 /// Handles a response returned from the upstream.
 ///
 /// If the response indicates success via 2XX status codes, `Ok(response)` is returned. Otherwise,
@@ -169,7 +241,6 @@ fn handle_response(
         let upstream_limits = UpstreamRateLimits::new()
             .retry_after(retry_after)
             .rate_limits(rate_limits);
-
         Err(UpstreamRequestError::RateLimited(upstream_limits))
     } else {
         // Coerce the result into an empty `ApiErrorResponse` if parsing JSON did not succeed.
@@ -181,18 +252,16 @@ fn handle_response(
     Box::new(future)
 }
 
-pub struct UpstreamRelay {
-    backoff: RetryBackoff,
-    config: Arc<Config>,
-    auth_state: AuthState,
-}
-
 impl UpstreamRelay {
     pub fn new(config: Arc<Config>) -> Self {
         UpstreamRelay {
             backoff: RetryBackoff::new(config.http_max_retry_interval()),
+            max_inflight_requests: config.max_concurrent_requests(),
             config,
             auth_state: AuthState::Unknown,
+            num_inflight_requests: 0,
+            high_prio_requests: VecDeque::new(),
+            low_prio_requests: VecDeque::new(),
         }
     }
 
@@ -204,16 +273,18 @@ impl UpstreamRelay {
         }
     }
 
-    fn send_request<P, F>(
-        &self,
-        method: Method,
-        path: P,
-        build: F,
-    ) -> ResponseFuture<ClientResponse, UpstreamRequestError>
-    where
-        F: FnOnce(&mut ClientRequestBuilder) -> Result<ClientRequest, ActixError>,
-        P: AsRef<str>,
-    {
+    fn send_request(
+        &mut self,
+        request: UpstreamRequest,
+        ctx: &mut Context<Self>,
+    ) -> ResponseFuture<(), ()> {
+        let UpstreamRequest {
+            response_sender,
+            method,
+            path,
+            build,
+        } = request;
+
         let host_header = self
             .config
             .http_host_header()
@@ -229,8 +300,21 @@ impl UpstreamRelay {
             builder.header("X-Sentry-Relay-Id", credentials.id.to_string());
         }
 
-        let request = tryf!(build(&mut builder).map_err(UpstreamRequestError::BuildFailed));
-        let future = request
+        // try to build a ClientRequest
+        let client_request = match build(&mut builder) {
+            Err(e) => {
+                response_sender
+                    .send(Err(UpstreamRequestError::BuildFailed(e)))
+                    .ok();
+                return Box::new(futures::future::err(()));
+            }
+            Ok(client_request) => client_request,
+        };
+
+        // we are about to send an HTTP request; keep track of requests in flight
+        self.num_inflight_requests += 1;
+
+        let future = client_request
             .send()
             // We currently use the main connection pool size limit to control how many events get
             // sent out at once, and "queue" up the rest (queueing means that there are a lot of
@@ -249,18 +333,65 @@ impl UpstreamRelay {
             .conn_timeout(self.config.http_connection_timeout())
             // This is the timeout after wait + connect.
             .timeout(self.config.http_timeout())
+            .track(ctx.address().recipient())
             .map_err(UpstreamRequestError::SendFailed)
-            .and_then(handle_response);
+            .and_then(handle_response)
+            .then(|x| {
+                response_sender.send(x).ok();
+                Ok(())
+            });
 
         Box::new(future)
+    }
+
+    fn enqueue_request<P, F>(
+        &mut self,
+        priority: RequestPriority,
+        method: Method,
+        path: P,
+        build: F,
+    ) -> ResponseFuture<ClientResponse, UpstreamRequestError>
+    where
+        F: 'static + Send + FnOnce(&mut ClientRequestBuilder) -> Result<ClientRequest, ActixError>,
+        P: AsRef<str>,
+    {
+        let (tx, rx) = oneshot::channel::<Result<ClientResponse, UpstreamRequestError>>();
+
+        let request = UpstreamRequest {
+            method,
+            path: path.as_ref().to_owned(),
+            response_sender: tx,
+            build: Box::new(build),
+        };
+        match priority {
+            RequestPriority::Low => {
+                self.low_prio_requests.push_front(request);
+            }
+            RequestPriority::High => {
+                self.high_prio_requests.push_front(request);
+            }
+        };
+        let queue_size = match priority {
+            RequestPriority::High => self.high_prio_requests.len(),
+            RequestPriority::Low => self.low_prio_requests.len(),
+        };
+        metric!(
+            histogram(RelayHistograms::UpstreamMessageQueueSize) = queue_size as u64,
+            priority = priority.name()
+        );
+
+        let future = rx
+            // map errors caused by the oneshot channel being closed (unlikely)
+            .map_err(|_| UpstreamRequestError::ChannelClosed)
+            // unwrap the result (this is how we transport the HTTP failure through the channel)
+            .and_then(|result| result);
 
         Box::new(future)
     }
 
     fn send_query<Q: UpstreamQuery>(
-        &self,
+        &mut self,
         query: Q,
     ) -> ResponseFuture<Q::Response, UpstreamRequestError> {
         let method = query.method();
         let path = query.path();
+        let priority = Q::priority();
 
         let credentials = tryf!(self
             .config
@@ -272,7 +403,7 @@ impl UpstreamRelay {
         let max_response_size = self.config.max_api_payload_size();
 
         let future = self
-            .send_request(method, path, |builder| {
+            .enqueue_request(priority, method, path, |builder| {
                 builder
                     .header("X-Sentry-Relay-Signature", signature)
                     .header(header::CONTENT_TYPE, "application/json")
@@ -312,10 +443,19 @@ impl Message for Authenticate {
     type Result = Result<(), ()>;
 }
 
+/// The `Authenticate` message is sent to the `UpstreamRelay` actor at Relay startup and
+/// coordinates the authentication of the current Relay with the upstream server.
+///
+/// Any message that requires Relay authentication (i.e. `SendQuery` messages) will be sent only
+/// after Relay has successfully authenticated with the upstream server (i.e. an `Authenticate`
+/// message was successfully handled).
+///
+/// **Note:** Relay has retry functionality, outside this actor, that periodically sends
+/// `Authenticate` messages until authentication with the upstream server succeeds.
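+///
+/// An illustrative sketch of the resulting exchange (simplified; the handler below contains the
+/// actual logic):
+///
+/// ```text
+/// Relay    -> upstream: RegisterRequest         (signed, high priority query)
+/// upstream -> Relay   : RegisterChallenge       (carries a token)
+/// Relay    -> upstream: challenge.create_response()
+/// on success: auth_state = AuthState::Registered
+/// ```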
 impl Handler<Authenticate> for UpstreamRelay {
     type Result = ResponseActFuture<Self, (), ()>;
 
-    fn handle(&mut self, _msg: Authenticate, _ctx: &mut Self::Context) -> Self::Result {
+    fn handle(&mut self, _msg: Authenticate, ctx: &mut Self::Context) -> Self::Result {
         let credentials = match self.config.credentials() {
             Some(x) => x,
             None => return Box::new(fut::err(())),
@@ -332,16 +472,18 @@ impl Handler<Authenticate> for UpstreamRelay {
         let future = self
             .send_query(request)
             .into_actor(self)
-            .and_then(|challenge, slf, _ctx| {
+            .and_then(|challenge, slf, ctx| {
                 log::debug!("got register challenge (token = {})", challenge.token());
                 slf.auth_state = AuthState::RegisterChallengeResponse;
                 let challenge_response = challenge.create_response();
 
                 log::debug!("sending register challenge response");
-                slf.send_query(challenge_response).into_actor(slf)
+                let fut = slf.send_query(challenge_response).into_actor(slf);
+                ctx.notify(PumpHttpMessageQueue);
+                fut
             })
             .map(|_, slf, _ctx| {
-                log::debug!("relay successfully registered with upstream");
+                log::info!("relay successfully registered with upstream");
                 slf.auth_state = AuthState::Registered;
             })
             .map_err(|err, slf, ctx| {
@@ -367,6 +509,7 @@ impl Handler<Authenticate> for UpstreamRelay {
             }
         });
 
+        ctx.notify(PumpHttpMessageQueue);
         Box::new(future)
     }
 }
@@ -377,6 +520,10 @@ impl Message for IsAuthenticated {
     type Result = bool;
 }
 
+/// The `IsAuthenticated` message is an internal Relay message that is used to query the current
+/// state of authentication with the upstream server.
+///
+/// Currently it is only used by the `HealthCheck` actor.
 impl Handler<IsAuthenticated> for UpstreamRelay {
     type Result = bool;
 
@@ -385,14 +532,40 @@ impl Handler<IsAuthenticated> for UpstreamRelay {
     }
 }
 
-pub trait RequestBuilder: 'static {
-    fn build_request(self, _: &mut ClientRequestBuilder) -> Result<ClientRequest, ActixError>;
+/// Message sent to drive the HTTP message queue.
+struct PumpHttpMessageQueue;
+
+impl Message for PumpHttpMessageQueue {
+    type Result = ();
 }
 
-pub trait ResponseTransformer: 'static {
-    type Result: 'static;
+/// The `PumpHttpMessageQueue` is an internal Relay message that is used to drive the
+/// HTTP message queues. Requests that need to be sent over HTTP are placed on queues with
+/// various priorities. At various points in time (when requests are added to the queues or
+/// when the HTTP `ClientConnector` finishes dealing with an HTTP request), `PumpHttpMessageQueue`
+/// messages are sent in order to take messages waiting in the queues and send them over
+/// HTTP.
+///
+/// `PumpHttpMessageQueue` will end up sending messages over HTTP only when there are free
+/// connections available.
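+///
+/// Handlers that enqueue work follow the same pattern: queue the request first, then nudge the
+/// queue. Roughly (illustrative sketch):
+///
+/// ```ignore
+/// // inside a message handler of the UpstreamRelay actor:
+/// let response = self.enqueue_request(priority, method, path, build);
+/// ctx.notify(PumpHttpMessageQueue); // sent immediately if a connection is free
+/// response
+/// ```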
+impl Handler<PumpHttpMessageQueue> for UpstreamRelay {
+    type Result = ();
+
+    fn handle(&mut self, _msg: PumpHttpMessageQueue, ctx: &mut Self::Context) -> Self::Result {
+        while self.num_inflight_requests < self.max_inflight_requests {
+            if let Some(msg) = self.high_prio_requests.pop_back() {
+                self.send_request(msg, ctx).into_actor(self).spawn(ctx);
+            } else if let Some(msg) = self.low_prio_requests.pop_back() {
+                self.send_request(msg, ctx).into_actor(self).spawn(ctx);
+            } else {
+                break; // no more messages to send at this time, stop looping
+            }
+        }
+    }
+}
 
-    fn transform_response(self, _: ClientResponse) -> Self::Result;
+pub trait RequestBuilder: 'static {
+    fn build_request(self, _: &mut ClientRequestBuilder) -> Result<ClientRequest, ActixError>;
 }
 
 impl RequestBuilder for () {
@@ -416,35 +589,10 @@ where
     }
 }
 
-impl ResponseTransformer for () {
-    type Result = ResponseFuture<(), UpstreamRequestError>;
-
-    fn transform_response(self, response: ClientResponse) -> Self::Result {
-        let future = response
-            .payload()
-            .for_each(|_| Ok(()))
-            .map_err(UpstreamRequestError::PayloadFailed);
-
-        Box::new(future)
-    }
-}
-
-impl<F, T> ResponseTransformer for F
-where
-    F: FnOnce(ClientResponse) -> T + 'static,
-{
-    type Result = T;
-
-    fn transform_response(self, response: ClientResponse) -> Self::Result {
-        self(response)
-    }
-}
-
-pub struct SendRequest<B = (), T = ()> {
+pub struct SendRequest<B = ()> {
     method: Method,
     path: String,
     builder: B,
-    transformer: T,
 }
 
 impl SendRequest {
@@ -453,7 +601,6 @@ impl SendRequest {
             method,
             path: path.into(),
             builder: (),
-            transformer: (),
         }
     }
 
@@ -462,8 +609,8 @@ impl SendRequest {
     }
 }
 
-impl<B, T> SendRequest<B, T> {
-    pub fn build<F>(self, callback: F) -> SendRequest<F, T>
+impl<B> SendRequest<B> {
+    pub fn build<F>(self, callback: F) -> SendRequest<F>
     where
         F: FnOnce(&mut ClientRequestBuilder) -> Result<ClientRequest, ActixError> + 'static,
     {
@@ -471,42 +618,71 @@ impl<B, T> SendRequest<B, T> {
             method: self.method,
             path: self.path,
             builder: callback,
-            transformer: self.transformer,
         }
     }
 }
 
-impl<B, R, T, E> Message for SendRequest<B, R>
-where
-    R: ResponseTransformer,
-    R::Result: IntoFuture<Item = T, Error = E>,
-{
-    type Result = Result<T, E>;
+impl<B> Message for SendRequest<B> {
+    type Result = Result<(), UpstreamRequestError>;
 }
 
-impl<B, R, T, E> Handler<SendRequest<B, R>> for UpstreamRelay
+/// `SendRequest` messages represent external messages that need to be sent to the upstream server
+/// and do not use Relay authentication.
+///
+/// The handler adds the message to one of the message queues and tries to advance the processing
+/// by sending a `PumpHttpMessageQueue`.
+impl<B> Handler<SendRequest<B>> for UpstreamRelay
 where
-    B: RequestBuilder,
-    R: ResponseTransformer,
-    R::Result: IntoFuture<Item = T, Error = E>,
-    T: Send,
-    E: From<UpstreamRequestError> + Send,
+    B: RequestBuilder + Send,
 {
-    type Result = ResponseFuture<T, E>;
+    type Result = ResponseFuture<(), UpstreamRequestError>;
 
-    fn handle(&mut self, message: SendRequest<B, R>, _ctx: &mut Self::Context) -> Self::Result {
+    fn handle(&mut self, message: SendRequest<B>, ctx: &mut Self::Context) -> Self::Result {
         let SendRequest {
             method,
             path,
             builder,
-            transformer,
         } = message;
 
-        Box::new(
-            self.send_request(method, path, |b| builder.build_request(b))
-                .from_err()
-                .and_then(|r| transformer.transform_response(r)),
-        )
+        let ret_val = Box::new(
+            self.enqueue_request(RequestPriority::Low, method, path, |b| {
+                builder.build_request(b)
+            })
+            .and_then(|client_response| {
+                client_response
+                    .payload()
+                    .for_each(|_| Ok(()))
+                    .map_err(UpstreamRequestError::PayloadFailed)
+            }),
+        );
+
+        ctx.notify(PumpHttpMessageQueue);
+        ret_val
+    }
+}
+
+/// This handler handles messages that mark the end of an HTTP request future.
+/// The handler decrements the counter of in-flight HTTP requests (since one has just
+/// finished) and tries to pump the HTTP message queue by sending a `PumpHttpMessageQueue`.
+///
+/// Every future representing an HTTP request sent by the `ClientConnector` is wrapped so that,
+/// when it finishes or is dropped, a message is sent back to the actor to notify it that an
+/// HTTP connection was freed.
+///
+/// **Note:** An alternative, simpler implementation would have been to increment the in-flight
+/// request counter just before sending an HTTP request and to decrement it when the future
+/// representing the sent request completes (in the future's `.then()` method).
+/// While this approach would have simplified the design (no need for wrapping, no need for
+/// the notification channel or this handler), it would not have dealt with dropped futures.
+/// Whether the added complexity of this design is justified by being able to handle dropped
+/// futures is not clear to me (RaduW) at this moment.
+impl Handler<TrackedFutureFinished> for UpstreamRelay {
+    type Result = ();
+    /// Handle notifications received from the tracked future stream.
+    fn handle(&mut self, _item: TrackedFutureFinished, ctx: &mut Self::Context) {
+        // an HTTP request has finished; update the in-flight count and pump the message queue
+        self.num_inflight_requests -= 1;
+        ctx.notify(PumpHttpMessageQueue)
+    }
 }
@@ -516,6 +692,10 @@ pub trait UpstreamQuery: Serialize {
     fn method(&self) -> Method;
     fn path(&self) -> Cow<'static, str>;
+
+    fn priority() -> RequestPriority {
+        RequestPriority::Low
+    }
 }
 
 pub struct SendQuery<T: UpstreamQuery>(pub T);
@@ -524,12 +704,20 @@ impl<T: UpstreamQuery> Message for SendQuery<T> {
     type Result = Result<T::Response, UpstreamRequestError>;
 }
 
+/// `SendQuery` messages represent messages that need to be sent to the upstream server
+/// and use Relay authentication.
+///
+/// The handler ensures that Relay is authenticated with the upstream server, adds the message
+/// to one of the message queues and tries to advance the processing by
+/// sending a `PumpHttpMessageQueue`.
 impl<T: UpstreamQuery> Handler<SendQuery<T>> for UpstreamRelay {
     type Result = ResponseFuture<T::Response, UpstreamRequestError>;
 
-    fn handle(&mut self, message: SendQuery<T>, _ctx: &mut Self::Context) -> Self::Result {
+    fn handle(&mut self, message: SendQuery<T>, ctx: &mut Self::Context) -> Self::Result {
         tryf!(self.assert_authenticated());
-        self.send_query(message.0)
+        let ret_val = self.send_query(message.0);
+        ctx.notify(PumpHttpMessageQueue);
+        ret_val
     }
 }
 
@@ -542,6 +730,9 @@ impl UpstreamQuery for RegisterRequest {
     fn path(&self) -> Cow<'static, str> {
         Cow::Borrowed("/api/0/relays/register/challenge/")
     }
+    fn priority() -> RequestPriority {
+        RequestPriority::High
+    }
 }
 
 impl UpstreamQuery for RegisterResponse {
@@ -553,4 +744,7 @@ impl UpstreamQuery for RegisterResponse {
     fn path(&self) -> Cow<'static, str> {
         Cow::Borrowed("/api/0/relays/register/response/")
     }
+    fn priority() -> RequestPriority {
+        RequestPriority::High
+    }
 }
diff --git a/relay-server/src/metrics.rs b/relay-server/src/metrics.rs
index 24aabae3ba..ec6e0f2390 100644
--- a/relay-server/src/metrics.rs
+++ b/relay-server/src/metrics.rs
@@ -56,6 +56,9 @@ pub enum RelayHistograms {
     ProjectStateCacheSize,
     /// The number of upstream requests queued up for a connection in the connection pool.
     ConnectorWaitQueue,
+    /// Number of messages queued by the Upstream actor and waiting to be sent over HTTP.
+    /// This metric is tagged with a priority label (for high and low priority queues).
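+    ///
+    /// The histogram is emitted as `http_queue.size` (see the name mapping below), tagged with
+    /// `priority:high` or `priority:low`.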
+    UpstreamMessageQueueSize,
 }
 
 impl HistogramMetric for RelayHistograms {
@@ -70,6 +73,7 @@ impl HistogramMetric for RelayHistograms {
             RelayHistograms::ProjectStateReceived => "project_state.received",
             RelayHistograms::ProjectStateCacheSize => "project_cache.size",
             RelayHistograms::ConnectorWaitQueue => "connector.wait_queue",
+            RelayHistograms::UpstreamMessageQueueSize => "http_queue.size",
         }
     }
 }
diff --git a/relay-server/src/utils/mod.rs b/relay-server/src/utils/mod.rs
index 108b2c0ef5..144e14efc3 100644
--- a/relay-server/src/utils/mod.rs
+++ b/relay-server/src/utils/mod.rs
@@ -7,6 +7,7 @@ mod rate_limits;
 mod request;
 mod shutdown;
 mod timer;
+mod tracked_future;
 
 #[cfg(feature = "processing")]
 mod kafka;
@@ -23,6 +24,7 @@ pub use self::rate_limits::*;
 pub use self::request::*;
 pub use self::shutdown::*;
 pub use self::timer::*;
+pub use self::tracked_future::*;
 
 #[cfg(feature = "processing")]
 pub use self::kafka::*;
diff --git a/relay-server/src/utils/tracked_future.rs b/relay-server/src/utils/tracked_future.rs
new file mode 100644
index 0000000000..dcc4d05de2
--- /dev/null
+++ b/relay-server/src/utils/tracked_future.rs
@@ -0,0 +1,77 @@
+use actix::{Message, Recipient};
+use futures::{Async, Future, Poll};
+
+/// Message sent on the notification channel when the tracked future finishes or is dropped.
+pub struct TrackedFutureFinished;
+
+impl Message for TrackedFutureFinished {
+    type Result = ();
+}
+
+pub trait IntoTracked<F>
+where
+    F: Future,
+{
+    fn track(self, notifier: Recipient<TrackedFutureFinished>) -> TrackedFuture<F>;
+}
+
+pub struct TrackedFuture<F> {
+    notified: bool,
+    notifier: Recipient<TrackedFutureFinished>,
+    inner: F,
+}
+
+impl<F> TrackedFuture<F> {
+    fn notify(&mut self) {
+        self.notifier
+            .do_send(TrackedFutureFinished)
+            .map_err(|_| {
+                log::error!("TrackedFuture could not notify completion");
+            })
+            .ok();
+    }
+}
+
+impl<F> IntoTracked<F> for F
+where
+    F: Future,
+{
+    fn track(self, notifier: Recipient<TrackedFutureFinished>) -> TrackedFuture<F> {
+        TrackedFuture {
+            notified: false,
+            inner: self,
+            notifier,
+        }
+    }
+}
+
+impl<F> Future for TrackedFuture<F>
+where
+    F: Future,
+{
+    type Item = F::Item;
+    type Error = F::Error;
+
+    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+        let ret_val = self.inner.poll();
+
+        match ret_val {
+            Ok(Async::NotReady) => {}
+            _ => {
+                // future is finished, notify channel
+                self.notified = true;
+                self.notify();
+            }
+        }
+        ret_val
+    }
+}
+
+impl<F> Drop for TrackedFuture<F> {
+    fn drop(&mut self) {
+        if !self.notified {
+            // future dropped without being brought to completion
+            self.notify();
+        }
+    }
+}
diff --git a/tests/integration/test_basic.py b/tests/integration/test_basic.py
index 55ecd6d4a2..c3171b7e08 100644
--- a/tests/integration/test_basic.py
+++ b/tests/integration/test_basic.py
@@ -48,9 +48,19 @@ def get_project_config():
     relay.shutdown(sig=signal.SIGINT)
     pytest.raises(queue.Empty, lambda: mini_sentry.captured_events.get(timeout=1))
 
-    ((route, error),) = mini_sentry.test_failures
-    assert route == "/api/666/store/"
-    assert "Dropped unfinished future" in str(error)
+    failures = mini_sentry.test_failures
+    assert len(failures) == 2
+    # we are expecting a tracked future error and a dropped unfinished future error
+    dropped_unfinished_error_found = False
+    tracked_future_error_found = False
+    for (route, error) in failures:
+        assert route == "/api/666/store/"
+        if "Dropped unfinished future" in str(error):
+            dropped_unfinished_error_found = True
+        if "TrackedFuture" in str(error):
+            tracked_future_error_found = True
+    assert dropped_unfinished_error_found
+    assert tracked_future_error_found
     mini_sentry.test_failures.clear()