Skip to content

Commit

Permalink
feat: separate aws support in es & prometheus sink
Browse files Browse the repository at this point in the history
  • Loading branch information
suikammd committed Aug 22, 2023
1 parent 61c0ae8 commit c543d9d
Show file tree
Hide file tree
Showing 11 changed files with 359 additions and 304 deletions.
426 changes: 232 additions & 194 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 8 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ path = "src/config/loading/secret_backend_example.rs"
test = false
bench = false

# Debug symbols end up chewing up several GB of disk space, so better to just
# disable them.
[profile.dev]
debug = false

# CI-based builds use full release optimization. See scripts/environment/release-flags.sh.
# This results in roughly a 5% reduction in performance when compiling locally vs when
# compiled via the CI pipeline.
Expand Down Expand Up @@ -696,7 +701,7 @@ sinks-datadog_events = []
sinks-datadog_logs = []
sinks-datadog_metrics = ["protobuf-build", "dep:prost-reflect"]
sinks-datadog_traces = ["protobuf-build", "dep:rmpv", "dep:rmp-serde", "dep:serde_bytes"]
sinks-elasticsearch = ["aws-core", "transforms-metric_to_log"]
sinks-elasticsearch = ["transforms-metric_to_log"]
sinks-file = ["dep:async-compression"]
sinks-gcp = ["dep:base64", "gcp"]
sinks-greptimedb = ["dep:greptimedb-client"]
Expand All @@ -711,7 +716,7 @@ sinks-nats = ["dep:async-nats", "dep:nkeys"]
sinks-new_relic_logs = ["sinks-http"]
sinks-new_relic = []
sinks-papertrail = ["dep:syslog"]
sinks-prometheus = ["aws-core", "dep:base64", "dep:prometheus-parser", "dep:snap"]
sinks-prometheus = ["dep:base64", "dep:prometheus-parser", "dep:snap"]
sinks-pulsar = ["dep:apache-avro", "dep:pulsar", "dep:lru"]
sinks-redis = ["dep:redis"]
sinks-sematext = ["sinks-elasticsearch", "sinks-influxdb"]
Expand Down Expand Up @@ -818,7 +823,7 @@ datadog-logs-integration-tests = ["sinks-datadog_logs"]
datadog-metrics-integration-tests = ["sinks-datadog_metrics"]
datadog-traces-integration-tests = ["sources-datadog_agent", "sinks-datadog_traces", "axum/tokio"]
docker-logs-integration-tests = ["sources-docker_logs", "unix"]
es-integration-tests = ["sinks-elasticsearch"]
es-integration-tests = ["sinks-elasticsearch", "aws-core"]
eventstoredb_metrics-integration-tests = ["sources-eventstoredb_metrics"]
fluent-integration-tests = ["docker", "sources-fluent"]
gcp-cloud-storage-integration-tests = ["sinks-gcp"]
Expand Down
4 changes: 2 additions & 2 deletions src/sinks/axiom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use vector_config::configurable_component;
use crate::{
config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
sinks::{
elasticsearch::{ElasticsearchApiVersion, ElasticsearchAuth, ElasticsearchConfig},
elasticsearch::{ElasticsearchApiVersion, ElasticsearchAuthConfig, ElasticsearchConfig},
util::{http::RequestConfig, Compression},
Healthcheck, VectorSink,
},
Expand Down Expand Up @@ -95,7 +95,7 @@ impl SinkConfig for AxiomConfig {
let elasticsearch_config = ElasticsearchConfig {
endpoints: vec![self.build_endpoint()],
compression: self.compression,
auth: Some(ElasticsearchAuth::Basic {
auth: Some(ElasticsearchAuthConfig::Basic {
user: "axiom".to_string(),
password: self.token.clone(),
}),
Expand Down
96 changes: 44 additions & 52 deletions src/sinks/elasticsearch/common.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::collections::HashMap;

#[cfg(feature = "aws-core")]
use aws_credential_types::provider::SharedCredentialsProvider;
#[cfg(feature = "aws-core")]
use aws_types::region::Region;
use bytes::{Buf, Bytes};
use http::{Response, StatusCode, Uri};
Expand All @@ -15,11 +17,12 @@ use super::{
InvalidHostSnafu, Request,
};
use crate::{
http::{Auth, HttpClient, MaybeAuth},
http::{HttpClient, MaybeAuth},
sinks::{
elasticsearch::{
ElasticsearchAuth, ElasticsearchCommonMode, ElasticsearchConfig, ParseError,
ElasticsearchAuthConfig, ElasticsearchCommonMode, ElasticsearchConfig, ParseError,
},
util::auth::Auth,
util::{http::RequestConfig, TowerRequestConfig, UriSerde},
HealthcheckError,
},
Expand All @@ -31,12 +34,10 @@ use crate::{
pub struct ElasticsearchCommon {
pub base_url: String,
pub bulk_uri: Uri,
pub http_auth: Option<Auth>,
pub aws_auth: Option<SharedCredentialsProvider>,
pub auth: Option<Auth>,
pub mode: ElasticsearchCommonMode,
pub request_builder: ElasticsearchRequestBuilder,
pub tls_settings: TlsSettings,
pub region: Option<Region>,
pub request: RequestConfig,
pub query_params: HashMap<String, String>,
pub metric_to_log: MetricToLog,
Expand All @@ -61,31 +62,35 @@ impl ElasticsearchCommon {
.into());
}

let authorization = match &config.auth {
Some(ElasticsearchAuth::Basic { user, password }) => Some(Auth::Basic {
user: user.clone(),
password: password.clone(),
}),
_ => None,
};
let uri = endpoint.parse::<UriSerde>()?;
let http_auth = authorization.choose_one(&uri.auth)?;
let base_url = uri.uri.to_string().trim_end_matches('/').to_owned();

let aws_auth = match &config.auth {
Some(ElasticsearchAuth::Basic { .. }) | None => None,
Some(ElasticsearchAuth::Aws(aws)) => {
let auth = match &config.auth {
Some(ElasticsearchAuthConfig::Basic { user, password }) => {
let auth = Some(crate::http::Auth::Basic {
user: user.clone(),
password: password.clone(),
});
// basic auth must be some for now
let auth = auth.choose_one(&uri.auth)?.unwrap();
Some(Auth::Basic(auth))
}
#[cfg(feature = "aws-core")]
Some(ElasticsearchAuthConfig::Aws(aws)) => {
let region = config
.aws
.as_ref()
.map(|config| config.region())
.ok_or(ParseError::RegionRequired)?
.ok_or(ParseError::RegionRequired)?;

Some(aws.credentials_provider(region).await?)
Some(Auth::Aws {
provider: aws.credentials_provider(region.clone()).await?,
region,
})
}
None => None,
};

let base_url = uri.uri.to_string().trim_end_matches('/').to_owned();

let mode = config.common_mode()?;

let tower_request = config
Expand Down Expand Up @@ -126,8 +131,6 @@ impl ElasticsearchCommon {
metric_config.metric_tag_values,
);

let region = config.aws.as_ref().and_then(|config| config.region());

let version = if let Some(version) = *version {
version
} else {
Expand All @@ -136,16 +139,7 @@ impl ElasticsearchCommon {
ElasticsearchApiVersion::V7 => 7,
ElasticsearchApiVersion::V8 => 8,
ElasticsearchApiVersion::Auto => {
match get_version(
&base_url,
&http_auth,
&aws_auth,
&region,
&request,
&tls_settings,
proxy_config,
)
.await
match get_version(&base_url, &auth, &request, &tls_settings, proxy_config).await
{
Ok(version) => {
debug!(message = "Auto-detected Elasticsearch API version.", %version);
Expand Down Expand Up @@ -195,15 +189,13 @@ impl ElasticsearchCommon {
};

Ok(Self {
http_auth,
auth,
base_url,
bulk_uri,
aws_auth,
mode,
request_builder,
query_params,
request,
region,
tls_settings,
metric_to_log,
})
Expand Down Expand Up @@ -248,9 +240,7 @@ impl ElasticsearchCommon {
pub async fn healthcheck(self, client: HttpClient) -> crate::Result<()> {
match get(
&self.base_url,
&self.http_auth,
&self.aws_auth,
&self.region,
&self.auth,
&self.request,
client,
"/_cluster/health",
Expand All @@ -264,6 +254,7 @@ impl ElasticsearchCommon {
}
}

#[cfg(feature = "aws-core")]
pub async fn sign_request(
request: &mut http::Request<Bytes>,
credentials_provider: &SharedCredentialsProvider,
Expand All @@ -274,9 +265,7 @@ pub async fn sign_request(

async fn get_version(
base_url: &str,
http_auth: &Option<Auth>,
aws_auth: &Option<SharedCredentialsProvider>,
region: &Option<Region>,
auth: &Option<Auth>,
request: &RequestConfig,
tls_settings: &TlsSettings,
proxy_config: &ProxyConfig,
Expand All @@ -291,7 +280,7 @@ async fn get_version(
}

let client = HttpClient::new(tls_settings.clone(), proxy_config)?;
let response = get(base_url, http_auth, aws_auth, region, request, client, "/")
let response = get(base_url, auth, request, client, "/")
.await
.map_err(|error| format!("Failed to get Elasticsearch API version: {}", error))?;

Expand All @@ -314,28 +303,31 @@ async fn get_version(

async fn get(
base_url: &str,
http_auth: &Option<Auth>,
aws_auth: &Option<SharedCredentialsProvider>,
region: &Option<Region>,
auth: &Option<Auth>,
request: &RequestConfig,
client: HttpClient,
path: &str,
) -> crate::Result<Response<Body>> {
let mut builder = Request::get(format!("{}{}", base_url, path));

if let Some(authorization) = &http_auth {
builder = authorization.apply_builder(builder);
}

for (header, value) in &request.headers {
builder = builder.header(&header[..], &value[..]);
}

let mut request = builder.body(Bytes::new())?;

if let Some(credentials_provider) = aws_auth {
sign_request(&mut request, credentials_provider, region).await?;
if let Some(auth) = auth {
match auth {
Auth::Basic(http_auth) => {
http_auth.apply(&mut request);
}
#[cfg(feature = "aws-core")]
Auth::Aws { provider, region } => {
let region = region.clone();
sign_request(&mut request, provider, &Some(region)).await?;
}
}
}

client
.send(request.map(hyper::Body::from))
.await
Expand Down
9 changes: 6 additions & 3 deletions src/sinks/elasticsearch/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use std::{
use futures::{FutureExt, TryFutureExt};
use vector_config::configurable_component;

#[cfg(feature = "aws-core")]
use crate::aws::RegionOrEndpoint;
use crate::{
aws::RegionOrEndpoint,
codecs::Transformer,
config::{AcknowledgementsConfig, DataType, Input, SinkConfig, SinkContext},
event::{EventRef, LogEvent, Value},
Expand All @@ -19,7 +20,7 @@ use crate::{
retry::ElasticsearchRetryLogic,
service::{ElasticsearchService, HttpRequestBuilder},
sink::ElasticsearchSink,
ElasticsearchApiVersion, ElasticsearchAuth, ElasticsearchCommon,
ElasticsearchApiVersion, ElasticsearchAuthConfig, ElasticsearchCommon,
ElasticsearchCommonMode, ElasticsearchMode,
},
util::{
Expand Down Expand Up @@ -142,7 +143,7 @@ pub struct ElasticsearchConfig {
pub request: RequestConfig,

#[configurable(derived)]
pub auth: Option<ElasticsearchAuth>,
pub auth: Option<ElasticsearchAuthConfig>,

/// Custom parameters to add to the query string for each HTTP request sent to Elasticsearch.
#[serde(default)]
Expand All @@ -153,6 +154,7 @@ pub struct ElasticsearchConfig {

#[serde(default)]
#[configurable(derived)]
#[cfg(feature = "aws-core")]
pub aws: Option<RegionOrEndpoint>,

#[serde(default)]
Expand Down Expand Up @@ -215,6 +217,7 @@ impl Default for ElasticsearchConfig {
request: Default::default(),
auth: None,
query: None,
#[cfg(feature = "aws-core")]
aws: None,
tls: None,
endpoint_health: None,
Expand Down
5 changes: 4 additions & 1 deletion src/sinks/elasticsearch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use snafu::Snafu;
use vector_common::sensitive_string::SensitiveString;
use vector_config::configurable_component;

#[cfg(feature = "aws-core")]
use crate::aws::AwsAuthentication;
use crate::{
event::{EventRef, LogEvent},
Expand All @@ -36,7 +37,7 @@ use crate::{
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields, rename_all = "snake_case", tag = "strategy")]
#[configurable(metadata(docs::enum_tag_description = "The authentication strategy to use."))]
pub enum ElasticsearchAuth {
pub enum ElasticsearchAuthConfig {
/// HTTP Basic Authentication.
Basic {
/// Basic authentication username.
Expand All @@ -50,6 +51,7 @@ pub enum ElasticsearchAuth {
password: SensitiveString,
},

#[cfg(feature = "aws-core")]
/// Amazon OpenSearch Service-specific authentication.
Aws(AwsAuthentication),
}
Expand Down Expand Up @@ -210,6 +212,7 @@ pub enum ParseError {
IndexTemplate { source: TemplateParseError },
#[snafu(display("Batch action template parse error: {}", source))]
BatchActionTemplate { source: TemplateParseError },
#[cfg(feature = "aws-core")]
#[snafu(display("aws.region required when AWS authentication is in use"))]
RegionRequired,
#[snafu(display("Endpoints option must be specified"))]
Expand Down
Loading

0 comments on commit c543d9d

Please sign in to comment.