Skip to content

Commit

Permalink
ref: Remove actix-web client, make reqwest the new http client (#938)
Browse files Browse the repository at this point in the history

Remove any use of actix-web's HTTP client. We still need to clean up error types and remove unnecessary abstractions, but this should alleviate most of the pain that comes from having to support two http client implementations. We should do the real refactor in a follow-up so as to not introduce accidental behavior changes new error grouping.
  • Loading branch information
untitaker authored Feb 24, 2021
1 parent 5b3f321 commit 5aa8930
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 346 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

**Features**:

- Relay now picks up HTTP proxies from environment variables. This is made possible by switching to a different HTTP client library.

**Bug Fixes**:

- Fix a problem with Data Scrubbing source names (PII selectors) that caused `$frame.abs_path` to match, but not `$frame.abs_path || **` or `$frame.abs_path && **`. ([#932](https://github.com/getsentry/relay/pull/932))
Expand All @@ -11,6 +15,7 @@
**Internal**:

- Emit the `category` field for outcomes of events. This field disambiguates error events, security events and transactions. As a side-effect, Relay no longer emits outcomes for broken JSON payloads or network errors. ([#931](https://github.com/getsentry/relay/pull/931))
- The undocumented `http._client` option has been removed. ([#938](https://github.com/getsentry/relay/pull/938))

## 21.2.0

Expand Down
22 changes: 0 additions & 22 deletions relay-config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,17 +488,6 @@ pub enum HttpEncoding {
Br,
}

/// (unstable) Http client to use for upstream store requests.
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum HttpClient {
/// Use actix http client, the default.
Actix,
/// Use reqwest. Necessary for HTTP proxy support (standard envvars are picked up
/// automatically)
Reqwest,
}

/// Controls authentication with upstream.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
Expand Down Expand Up @@ -547,11 +536,6 @@ struct Http {
/// - `gzip` (default): Compression using gzip.
/// - `br`: Compression using the brotli algorithm.
encoding: HttpEncoding,
/// (unstable) Which HTTP client to use. Can be "actix" or "reqwest", with "actix" being the
/// default. Switching to "reqwest" is required to get experimental HTTP proxy support.
///
/// Note that this option will be removed in the future once "reqwest" is the default.
_client: HttpClient,
}

impl Default for Http {
Expand All @@ -564,7 +548,6 @@ impl Default for Http {
auth_interval: Some(600), // 10 minutes
outage_grace_period: DEFAULT_NETWORK_OUTAGE_GRACE_PERIOD,
encoding: HttpEncoding::Gzip,
_client: HttpClient::Actix,
}
}
}
Expand Down Expand Up @@ -1139,11 +1122,6 @@ impl Config {
self.values.http.encoding
}

/// (unstable) HTTP client to use for upstream requests.
pub fn http_client(&self) -> HttpClient {
self.values.http._client
}

/// Returns whether this Relay should emit outcomes.
///
/// This is `true` either if `outcomes.emit_outcomes` is explicitly enabled, or if this Relay is
Expand Down
128 changes: 47 additions & 81 deletions relay-server/src/actors/upstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use std::time::Instant;

use ::actix::fut;
use ::actix::prelude::*;
use actix_web::client::{ClientRequest, SendRequestError};
use actix_web::http::{header, Method, StatusCode};
use failure::Fail;
use futures::{future, prelude::*, sync::oneshot};
Expand All @@ -36,7 +35,7 @@ use serde::ser::Serialize;

use relay_auth::{RegisterChallenge, RegisterRequest, RegisterResponse, Registration};
use relay_common::{metric, tryf, RetryBackoff};
use relay_config::{Config, HttpClient, RelayMode};
use relay_config::{Config, RelayMode};
use relay_log::LogError;
use relay_quotas::{
DataCategories, QuotaScope, RateLimit, RateLimitScope, RateLimits, RetryAfter, Scoping,
Expand All @@ -47,12 +46,8 @@ use crate::metrics::{RelayHistograms, RelayTimers};
use crate::utils::{self, ApiErrorResponse, IntoTracked, RelayErrorAction, TrackedFutureFinished};

#[derive(Fail, Debug)]
pub enum UpstreamSendRequestError {
#[fail(display = "could not send request using reqwest")]
Reqwest(#[cause] reqwest::Error),
#[fail(display = "could not send request using actix-web client")]
Actix(#[cause] SendRequestError),
}
#[fail(display = "could not send request using reqwest")]
pub struct UpstreamSendRequestError(#[cause] reqwest::Error);

#[derive(Fail, Debug)]
pub enum UpstreamRequestError {
Expand Down Expand Up @@ -330,7 +325,10 @@ pub struct UpstreamRelay {
high_prio_requests: VecDeque<UpstreamRequest>,
low_prio_requests: VecDeque<UpstreamRequest>,
config: Arc<Config>,
reqwest_client: Option<(tokio::runtime::Runtime, reqwest::Client)>,
reqwest_client: reqwest::Client,
/// "reqwest runtime" as this tokio runtime is currently only spawned such that reqwest can
/// run.
reqwest_runtime: tokio::runtime::Runtime,
}

/// Handles a response returned from the upstream.
Expand Down Expand Up @@ -394,23 +392,25 @@ fn handle_response(
impl UpstreamRelay {
/// Creates a new `UpstreamRelay` instance.
pub fn new(config: Arc<Config>) -> Self {
let reqwest_client = match config.http_client() {
HttpClient::Actix => None,
HttpClient::Reqwest => Some((
tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.enable_all()
.build()
.unwrap(),
reqwest::ClientBuilder::new()
.connect_timeout(config.http_connection_timeout())
.timeout(config.http_timeout())
.gzip(true)
.trust_dns(true)
.build()
.unwrap(),
)),
};
let reqwest_runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.enable_all()
.build()
.unwrap();

let reqwest_client = reqwest::ClientBuilder::new()
.connect_timeout(config.http_connection_timeout())
.timeout(config.http_timeout())
// In actix-web client this option could be set on a per-request basis. In reqwest
// this option can only be set per-client. For non-forwarded upstream requests that is
// desirable, so we have it enabled.
//
// In the forward endpoint, this means that content negotiation is done twice, and the
// response body is first decompressed by reqwest, then re-compressed by actix-web.
.gzip(true)
.trust_dns(true)
.build()
.unwrap();

UpstreamRelay {
auth_backoff: RetryBackoff::new(config.http_max_retry_interval()),
Expand All @@ -422,6 +422,8 @@ impl UpstreamRelay {
low_prio_requests: VecDeque::new(),
first_error: None,
config,
reqwest_runtime,

reqwest_client,
}
}
Expand Down Expand Up @@ -538,21 +540,9 @@ impl UpstreamRelay {
.http_host_header()
.unwrap_or_else(|| self.config.upstream_descriptor().host());

let mut builder = match self.reqwest_client {
None => {
let mut builder = ClientRequest::build();
builder.method(request.method.clone()).uri(uri);

RequestBuilder::actix(builder)
}
Some((ref _runtime, ref client)) => {
let method =
reqwest::Method::from_bytes(request.method.as_ref().as_bytes()).unwrap();
let builder = client.request(method, uri);

RequestBuilder::reqwest(builder)
}
};
let method = reqwest::Method::from_bytes(request.method.as_ref().as_bytes()).unwrap();
let builder = self.reqwest_client.request(method, uri);
let mut builder = RequestBuilder::reqwest(builder);

builder.header("Host", host_header.as_bytes());

Expand All @@ -578,46 +568,22 @@ impl UpstreamRelay {

request.send_start = Some(Instant::now());

let future = match client_request {
Request::Actix(client_request) => {
let future = client_request
.send()
.wait_timeout(self.config.event_buffer_expiry())
.conn_timeout(self.config.http_connection_timeout())
// This is the timeout after wait + connect.
.timeout(self.config.http_timeout())
.map_err(UpstreamSendRequestError::Actix)
.map_err(UpstreamRequestError::SendFailed)
.map(Response::Actix);

Box::new(future) as Box<dyn Future<Item = _, Error = _>>
}
Request::Reqwest(client_request) => {
let (runtime, client) = self
.reqwest_client
.as_ref()
.expect("Constructed request request without having a client.");

let client = client.clone();

let (tx, rx) = oneshot::channel();
runtime.spawn(async move {
let res = client
.execute(client_request)
.await
.map_err(UpstreamSendRequestError::Reqwest)
.map_err(UpstreamRequestError::SendFailed);
tx.send(res)
});

let future = rx
.map_err(|_| UpstreamRequestError::ChannelClosed)
.flatten()
.map(Response::Reqwest);

Box::new(future) as Box<dyn Future<Item = _, Error = _>>
}
};
let client = self.reqwest_client.clone();

let (tx, rx) = oneshot::channel();
self.reqwest_runtime.spawn(async move {
let res = client
.execute(client_request.0)
.await
.map_err(UpstreamSendRequestError)
.map_err(UpstreamRequestError::SendFailed);
tx.send(res)
});

let future = rx
.map_err(|_| UpstreamRequestError::ChannelClosed)
.flatten()
.map(Response);

let max_response_size = self.config.max_api_payload_size();

Expand Down
45 changes: 10 additions & 35 deletions relay-server/src/endpoints/forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

use ::actix::prelude::*;
use actix_web::error::ResponseError;
use actix_web::http::{header, header::HeaderName, uri::PathAndQuery, ContentEncoding, StatusCode};
use actix_web::http::{header, header::HeaderName, uri::PathAndQuery, StatusCode};
use actix_web::{AsyncResponder, Error, HttpMessage, HttpRequest, HttpResponse};
use failure::Fail;
use futures::prelude::*;
Expand Down Expand Up @@ -155,17 +155,6 @@ pub fn forward_upstream(
builder.header(key, value);
}

// actix-web specific workarounds. We don't remember why no_default_headers was
// necessary, but we suspect that actix sends out headers twice instead of
// having them be overridden (user-agent?)
//
// Need for disabling decompression is demonstrated by the
// `test_forwarding_content_encoding` integration test.
if let RequestBuilder::Actix(ref mut builder) = builder {
builder.no_default_headers();
builder.disable_decompress();
}

builder.header("X-Forwarded-For", forwarded_for.as_ref());

builder
Expand All @@ -175,10 +164,9 @@ pub fn forward_upstream(
.transform(move |response: Response| {
let status = response.status();
let headers = response.clone_headers();
let is_actix = matches!(response, Response::Actix(_));
response
.bytes(max_response_size)
.and_then(move |body| Ok((is_actix, status, headers, body)))
.and_then(move |body| Ok((status, headers, body)))
.map_err(UpstreamRequestError::Http)
});

Expand All @@ -189,29 +177,9 @@ pub fn forward_upstream(
})
})
.and_then(move |result: Result<_, UpstreamRequestError>| {
let (is_actix, status, headers, body) =
result.map_err(ForwardedUpstreamRequestError::from)?;
let (status, headers, body) = result.map_err(ForwardedUpstreamRequestError::from)?;
let mut forwarded_response = HttpResponse::build(status);

if is_actix {
// For actix-web we called ClientRequestBuilder::disable_decompress(), therefore
// the response body is already compressed and the headers contain the correct
// content-encoding
//
// The content negotiation has effectively happened between *our* upstream and
// *our* client, with us just forwarding raw bytes.
//
// Set content-encoding to identity such that actix-web will not to compress again
forwarded_response.content_encoding(ContentEncoding::Identity);
} else {
// For reqwest the option to disable automatic response decompression can only be
// set per-client. For non-forwarded upstream requests that is desirable, so we
// keep it enabled.
//
// Essentially this means that content negotiation is done twice, and the response
// body is first decompressed by reqwest, then re-compressed by actix-web.
}

let mut has_content_type = false;

for (key, value) in headers {
Expand All @@ -227,6 +195,13 @@ pub fn forward_upstream(
forwarded_response.header(&key, &*value);
}

// For reqwest the option to disable automatic response decompression can only be
// set per-client. For non-forwarded upstream requests that is desirable, so we
// keep it enabled.
//
// Essentially this means that content negotiation is done twice, and the response
// body is first decompressed by reqwest, then re-compressed by actix-web.

Ok(if has_content_type {
forwarded_response.body(body)
} else {
Expand Down
Loading

0 comments on commit 5aa8930

Please sign in to comment.