Skip to content

Commit

Permalink
enhancement(es sink): separate aws support in es & prometheus sink (v…
Browse files Browse the repository at this point in the history
…ectordotdev#18288)

* feat: separate aws support in es & prometheus sink

* remove redundant aws-core feature

* modify auth aws feature

* format code

* fix clippy
  • Loading branch information
suikammd authored Aug 31, 2023
1 parent 749594c commit e652ea4
Show file tree
Hide file tree
Showing 12 changed files with 169 additions and 151 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,7 @@ sinks-datadog_events = []
sinks-datadog_logs = []
sinks-datadog_metrics = ["protobuf-build", "dep:prost-reflect"]
sinks-datadog_traces = ["protobuf-build", "dep:rmpv", "dep:rmp-serde", "dep:serde_bytes"]
sinks-elasticsearch = ["aws-core", "transforms-metric_to_log"]
sinks-elasticsearch = ["transforms-metric_to_log"]
sinks-file = ["dep:async-compression"]
sinks-gcp = ["dep:base64", "gcp"]
sinks-greptimedb = ["dep:greptimedb-client"]
Expand All @@ -721,7 +721,7 @@ sinks-nats = ["dep:async-nats", "dep:nkeys"]
sinks-new_relic_logs = ["sinks-http"]
sinks-new_relic = []
sinks-papertrail = ["dep:syslog"]
sinks-prometheus = ["aws-core", "dep:base64", "dep:prometheus-parser", "dep:snap"]
sinks-prometheus = ["dep:base64", "dep:prometheus-parser", "dep:snap"]
sinks-pulsar = ["dep:apache-avro", "dep:pulsar", "dep:lru"]
sinks-redis = ["dep:redis"]
sinks-sematext = ["sinks-elasticsearch", "sinks-influxdb"]
Expand Down Expand Up @@ -828,7 +828,7 @@ datadog-logs-integration-tests = ["sinks-datadog_logs"]
datadog-metrics-integration-tests = ["sinks-datadog_metrics"]
datadog-traces-integration-tests = ["sources-datadog_agent", "sinks-datadog_traces", "axum/tokio"]
docker-logs-integration-tests = ["sources-docker_logs", "unix"]
es-integration-tests = ["sinks-elasticsearch"]
es-integration-tests = ["sinks-elasticsearch", "aws-core"]
eventstoredb_metrics-integration-tests = ["sources-eventstoredb_metrics"]
fluent-integration-tests = ["docker", "sources-fluent"]
gcp-cloud-storage-integration-tests = ["sinks-gcp"]
Expand Down
6 changes: 4 additions & 2 deletions src/sinks/aws_kinesis/firehose/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ use crate::{
aws::{create_client, AwsAuthentication, ImdsAuthentication, RegionOrEndpoint},
config::{ProxyConfig, SinkConfig, SinkContext},
sinks::{
elasticsearch::{BulkConfig, ElasticsearchAuth, ElasticsearchCommon, ElasticsearchConfig},
elasticsearch::{
BulkConfig, ElasticsearchAuthConfig, ElasticsearchCommon, ElasticsearchConfig,
},
util::{BatchConfig, Compression, TowerRequestConfig},
},
template::Template,
Expand Down Expand Up @@ -73,7 +75,7 @@ async fn firehose_put_records() {
sleep(Duration::from_secs(5)).await;

let config = ElasticsearchConfig {
auth: Some(ElasticsearchAuth::Aws(AwsAuthentication::Default {
auth: Some(ElasticsearchAuthConfig::Aws(AwsAuthentication::Default {
load_timeout_secs: Some(5),
imds: ImdsAuthentication::default(),
region: None,
Expand Down
4 changes: 2 additions & 2 deletions src/sinks/axiom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use vector_config::configurable_component;
use crate::{
config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
sinks::{
elasticsearch::{ElasticsearchApiVersion, ElasticsearchAuth, ElasticsearchConfig},
elasticsearch::{ElasticsearchApiVersion, ElasticsearchAuthConfig, ElasticsearchConfig},
util::{http::RequestConfig, Compression},
Healthcheck, VectorSink,
},
Expand Down Expand Up @@ -95,7 +95,7 @@ impl SinkConfig for AxiomConfig {
let elasticsearch_config = ElasticsearchConfig {
endpoints: vec![self.build_endpoint()],
compression: self.compression,
auth: Some(ElasticsearchAuth::Basic {
auth: Some(ElasticsearchAuthConfig::Basic {
user: "axiom".to_string(),
password: self.token.clone(),
}),
Expand Down
103 changes: 47 additions & 56 deletions src/sinks/elasticsearch/common.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use std::collections::HashMap;

use aws_credential_types::provider::SharedCredentialsProvider;
use aws_types::region::Region;
use bytes::{Buf, Bytes};
use http::{Response, StatusCode, Uri};
use hyper::{body, Body};
Expand All @@ -15,11 +13,12 @@ use super::{
InvalidHostSnafu, Request,
};
use crate::{
http::{Auth, HttpClient, MaybeAuth},
http::{HttpClient, MaybeAuth},
sinks::{
elasticsearch::{
ElasticsearchAuth, ElasticsearchCommonMode, ElasticsearchConfig, ParseError,
ElasticsearchAuthConfig, ElasticsearchCommonMode, ElasticsearchConfig, ParseError,
},
util::auth::Auth,
util::{http::RequestConfig, TowerRequestConfig, UriSerde},
HealthcheckError,
},
Expand All @@ -31,12 +30,10 @@ use crate::{
pub struct ElasticsearchCommon {
pub base_url: String,
pub bulk_uri: Uri,
pub http_auth: Option<Auth>,
pub aws_auth: Option<SharedCredentialsProvider>,
pub auth: Option<Auth>,
pub mode: ElasticsearchCommonMode,
pub request_builder: ElasticsearchRequestBuilder,
pub tls_settings: TlsSettings,
pub region: Option<Region>,
pub request: RequestConfig,
pub query_params: HashMap<String, String>,
pub metric_to_log: MetricToLog,
Expand All @@ -61,31 +58,35 @@ impl ElasticsearchCommon {
.into());
}

let authorization = match &config.auth {
Some(ElasticsearchAuth::Basic { user, password }) => Some(Auth::Basic {
user: user.clone(),
password: password.clone(),
}),
_ => None,
};
let uri = endpoint.parse::<UriSerde>()?;
let http_auth = authorization.choose_one(&uri.auth)?;
let base_url = uri.uri.to_string().trim_end_matches('/').to_owned();

let aws_auth = match &config.auth {
Some(ElasticsearchAuth::Basic { .. }) | None => None,
Some(ElasticsearchAuth::Aws(aws)) => {
let auth = match &config.auth {
Some(ElasticsearchAuthConfig::Basic { user, password }) => {
let auth = Some(crate::http::Auth::Basic {
user: user.clone(),
password: password.clone(),
});
// basic auth must be some for now
let auth = auth.choose_one(&uri.auth)?.unwrap();
Some(Auth::Basic(auth))
}
#[cfg(feature = "aws-core")]
Some(ElasticsearchAuthConfig::Aws(aws)) => {
let region = config
.aws
.as_ref()
.map(|config| config.region())
.ok_or(ParseError::RegionRequired)?
.ok_or(ParseError::RegionRequired)?;

Some(aws.credentials_provider(region).await?)
Some(Auth::Aws {
credentials_provider: aws.credentials_provider(region.clone()).await?,
region,
})
}
None => None,
};

let base_url = uri.uri.to_string().trim_end_matches('/').to_owned();

let mode = config.common_mode()?;

let tower_request = config
Expand Down Expand Up @@ -126,8 +127,6 @@ impl ElasticsearchCommon {
metric_config.metric_tag_values,
);

let region = config.aws.as_ref().and_then(|config| config.region());

let version = if let Some(version) = *version {
version
} else {
Expand All @@ -136,16 +135,7 @@ impl ElasticsearchCommon {
ElasticsearchApiVersion::V7 => 7,
ElasticsearchApiVersion::V8 => 8,
ElasticsearchApiVersion::Auto => {
match get_version(
&base_url,
&http_auth,
&aws_auth,
&region,
&request,
&tls_settings,
proxy_config,
)
.await
match get_version(&base_url, &auth, &request, &tls_settings, proxy_config).await
{
Ok(version) => {
debug!(message = "Auto-detected Elasticsearch API version.", %version);
Expand Down Expand Up @@ -195,15 +185,13 @@ impl ElasticsearchCommon {
};

Ok(Self {
http_auth,
auth,
base_url,
bulk_uri,
aws_auth,
mode,
request_builder,
query_params,
request,
region,
tls_settings,
metric_to_log,
})
Expand Down Expand Up @@ -248,9 +236,7 @@ impl ElasticsearchCommon {
pub async fn healthcheck(self, client: HttpClient) -> crate::Result<()> {
match get(
&self.base_url,
&self.http_auth,
&self.aws_auth,
&self.region,
&self.auth,
&self.request,
client,
"/_cluster/health",
Expand All @@ -264,19 +250,18 @@ impl ElasticsearchCommon {
}
}

#[cfg(feature = "aws-core")]
pub async fn sign_request(
request: &mut http::Request<Bytes>,
credentials_provider: &SharedCredentialsProvider,
region: &Option<Region>,
credentials_provider: &aws_credential_types::provider::SharedCredentialsProvider,
region: &Option<aws_types::region::Region>,
) -> crate::Result<()> {
crate::aws::sign_request("es", request, credentials_provider, region).await
}

async fn get_version(
base_url: &str,
http_auth: &Option<Auth>,
aws_auth: &Option<SharedCredentialsProvider>,
region: &Option<Region>,
auth: &Option<Auth>,
request: &RequestConfig,
tls_settings: &TlsSettings,
proxy_config: &ProxyConfig,
Expand All @@ -291,7 +276,7 @@ async fn get_version(
}

let client = HttpClient::new(tls_settings.clone(), proxy_config)?;
let response = get(base_url, http_auth, aws_auth, region, request, client, "/")
let response = get(base_url, auth, request, client, "/")
.await
.map_err(|error| format!("Failed to get Elasticsearch API version: {}", error))?;

Expand All @@ -314,28 +299,34 @@ async fn get_version(

async fn get(
base_url: &str,
http_auth: &Option<Auth>,
aws_auth: &Option<SharedCredentialsProvider>,
region: &Option<Region>,
auth: &Option<Auth>,
request: &RequestConfig,
client: HttpClient,
path: &str,
) -> crate::Result<Response<Body>> {
let mut builder = Request::get(format!("{}{}", base_url, path));

if let Some(authorization) = &http_auth {
builder = authorization.apply_builder(builder);
}

for (header, value) in &request.headers {
builder = builder.header(&header[..], &value[..]);
}

let mut request = builder.body(Bytes::new())?;

if let Some(credentials_provider) = aws_auth {
sign_request(&mut request, credentials_provider, region).await?;
if let Some(auth) = auth {
match auth {
Auth::Basic(http_auth) => {
http_auth.apply(&mut request);
}
#[cfg(feature = "aws-core")]
Auth::Aws {
credentials_provider: provider,
region,
} => {
let region = region.clone();
sign_request(&mut request, provider, &Some(region)).await?;
}
}
}

client
.send(request.map(hyper::Body::from))
.await
Expand Down
9 changes: 5 additions & 4 deletions src/sinks/elasticsearch/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use futures::{FutureExt, TryFutureExt};
use vector_config::configurable_component;

use crate::{
aws::RegionOrEndpoint,
codecs::Transformer,
config::{AcknowledgementsConfig, DataType, Input, SinkConfig, SinkContext},
event::{EventRef, LogEvent, Value},
Expand All @@ -19,7 +18,7 @@ use crate::{
retry::ElasticsearchRetryLogic,
service::{ElasticsearchService, HttpRequestBuilder},
sink::ElasticsearchSink,
ElasticsearchApiVersion, ElasticsearchAuth, ElasticsearchCommon,
ElasticsearchApiVersion, ElasticsearchAuthConfig, ElasticsearchCommon,
ElasticsearchCommonMode, ElasticsearchMode,
},
util::{
Expand Down Expand Up @@ -142,7 +141,7 @@ pub struct ElasticsearchConfig {
pub request: RequestConfig,

#[configurable(derived)]
pub auth: Option<ElasticsearchAuth>,
pub auth: Option<ElasticsearchAuthConfig>,

/// Custom parameters to add to the query string for each HTTP request sent to Elasticsearch.
#[serde(default)]
Expand All @@ -153,7 +152,8 @@ pub struct ElasticsearchConfig {

#[serde(default)]
#[configurable(derived)]
pub aws: Option<RegionOrEndpoint>,
#[cfg(feature = "aws-core")]
pub aws: Option<crate::aws::RegionOrEndpoint>,

#[serde(default)]
#[configurable(derived)]
Expand Down Expand Up @@ -215,6 +215,7 @@ impl Default for ElasticsearchConfig {
request: Default::default(),
auth: None,
query: None,
#[cfg(feature = "aws-core")]
aws: None,
tls: None,
endpoint_health: None,
Expand Down
Loading

0 comments on commit e652ea4

Please sign in to comment.