Skip to content

Commit

Permalink
feat: separate aws support in es & prometheus sink
Browse files Browse the repository at this point in the history
  • Loading branch information
suikammd committed Aug 21, 2023
1 parent d3a6235 commit 986316d
Show file tree
Hide file tree
Showing 8 changed files with 300 additions and 209 deletions.
426 changes: 232 additions & 194 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,7 @@ sinks-datadog_events = []
sinks-datadog_logs = []
sinks-datadog_metrics = ["protobuf-build", "dep:prost-reflect"]
sinks-datadog_traces = ["protobuf-build", "dep:rmpv", "dep:rmp-serde", "dep:serde_bytes"]
sinks-elasticsearch = ["aws-core", "transforms-metric_to_log"]
sinks-elasticsearch = ["transforms-metric_to_log"]
sinks-file = ["dep:async-compression"]
sinks-gcp = ["dep:base64", "gcp"]
sinks-greptimedb = ["dep:greptimedb-client"]
Expand All @@ -714,7 +714,7 @@ sinks-nats = ["dep:async-nats", "dep:nkeys"]
sinks-new_relic_logs = ["sinks-http"]
sinks-new_relic = []
sinks-papertrail = ["dep:syslog"]
sinks-prometheus = ["aws-core", "dep:base64", "dep:prometheus-parser", "dep:snap"]
sinks-prometheus = ["dep:base64", "dep:prometheus-parser", "dep:snap"]
sinks-pulsar = ["dep:apache-avro", "dep:pulsar", "dep:lru"]
sinks-redis = ["dep:redis"]
sinks-sematext = ["sinks-elasticsearch", "sinks-influxdb"]
Expand Down Expand Up @@ -821,7 +821,7 @@ datadog-logs-integration-tests = ["sinks-datadog_logs"]
datadog-metrics-integration-tests = ["sinks-datadog_metrics"]
datadog-traces-integration-tests = ["sources-datadog_agent", "sinks-datadog_traces", "axum/tokio"]
docker-logs-integration-tests = ["sources-docker_logs", "unix"]
es-integration-tests = ["sinks-elasticsearch"]
es-integration-tests = ["sinks-elasticsearch", "aws-core"]
eventstoredb_metrics-integration-tests = ["sources-eventstoredb_metrics"]
fluent-integration-tests = ["docker", "sources-fluent"]
gcp-cloud-storage-integration-tests = ["sinks-gcp"]
Expand Down
42 changes: 34 additions & 8 deletions src/sinks/elasticsearch/common.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::collections::HashMap;

#[cfg(feature = "aws-core")]
use aws_credential_types::provider::SharedCredentialsProvider;
#[cfg(feature = "aws-core")]
use aws_types::region::Region;
use bytes::{Buf, Bytes};
use http::{Response, StatusCode, Uri};
Expand Down Expand Up @@ -32,10 +34,12 @@ pub struct ElasticsearchCommon {
pub base_url: String,
pub bulk_uri: Uri,
pub http_auth: Option<Auth>,
#[cfg(feature = "aws-core")]
pub aws_auth: Option<SharedCredentialsProvider>,
pub mode: ElasticsearchCommonMode,
pub request_builder: ElasticsearchRequestBuilder,
pub tls_settings: TlsSettings,
#[cfg(feature = "aws-core")]
pub region: Option<Region>,
pub request: RequestConfig,
pub query_params: HashMap<String, String>,
Expand Down Expand Up @@ -72,6 +76,7 @@ impl ElasticsearchCommon {
let http_auth = authorization.choose_one(&uri.auth)?;
let base_url = uri.uri.to_string().trim_end_matches('/').to_owned();

#[cfg(feature = "aws-core")]
let aws_auth = match &config.auth {
Some(ElasticsearchAuth::Basic { .. }) | None => None,
Some(ElasticsearchAuth::Aws(aws)) => {
Expand Down Expand Up @@ -126,6 +131,7 @@ impl ElasticsearchCommon {
metric_config.metric_tag_values,
);

#[cfg(feature = "aws-core")]
let region = config.aws.as_ref().and_then(|config| config.region());

let version = if let Some(version) = *version {
Expand All @@ -139,7 +145,9 @@ impl ElasticsearchCommon {
match get_version(
&base_url,
&http_auth,
#[cfg(feature = "aws-core")]
&aws_auth,
#[cfg(feature = "aws-core")]
&region,
&request,
&tls_settings,
Expand Down Expand Up @@ -198,11 +206,13 @@ impl ElasticsearchCommon {
http_auth,
base_url,
bulk_uri,
#[cfg(feature = "aws-core")]
aws_auth,
mode,
request_builder,
query_params,
request,
#[cfg(feature = "aws-core")]
region,
tls_settings,
metric_to_log,
Expand Down Expand Up @@ -249,7 +259,9 @@ impl ElasticsearchCommon {
match get(
&self.base_url,
&self.http_auth,
#[cfg(feature = "aws-core")]
&self.aws_auth,
#[cfg(feature = "aws-core")]
&self.region,
&self.request,
client,
Expand All @@ -264,6 +276,7 @@ impl ElasticsearchCommon {
}
}

#[cfg(feature = "aws-core")]
pub async fn sign_request(
request: &mut http::Request<Bytes>,
credentials_provider: &SharedCredentialsProvider,
Expand All @@ -275,8 +288,8 @@ pub async fn sign_request(
async fn get_version(
base_url: &str,
http_auth: &Option<Auth>,
aws_auth: &Option<SharedCredentialsProvider>,
region: &Option<Region>,
#[cfg(feature = "aws-core")] aws_auth: &Option<SharedCredentialsProvider>,
#[cfg(feature = "aws-core")] region: &Option<Region>,
request: &RequestConfig,
tls_settings: &TlsSettings,
proxy_config: &ProxyConfig,
Expand All @@ -291,9 +304,19 @@ async fn get_version(
}

let client = HttpClient::new(tls_settings.clone(), proxy_config)?;
let response = get(base_url, http_auth, aws_auth, region, request, client, "/")
.await
.map_err(|error| format!("Failed to get Elasticsearch API version: {}", error))?;
let response = get(
base_url,
http_auth,
#[cfg(feature = "aws-core")]
aws_auth,
#[cfg(feature = "aws-core")]
region,
request,
client,
"/",
)
.await
.map_err(|error| format!("Failed to get Elasticsearch API version: {}", error))?;

let (_, body) = response.into_parts();
let mut body = body::aggregate(body).await?;
Expand All @@ -315,8 +338,8 @@ async fn get_version(
async fn get(
base_url: &str,
http_auth: &Option<Auth>,
aws_auth: &Option<SharedCredentialsProvider>,
region: &Option<Region>,
#[cfg(feature = "aws-core")] aws_auth: &Option<SharedCredentialsProvider>,
#[cfg(feature = "aws-core")] region: &Option<Region>,
request: &RequestConfig,
client: HttpClient,
path: &str,
Expand All @@ -331,8 +354,11 @@ async fn get(
builder = builder.header(&header[..], &value[..]);
}

let mut request = builder.body(Bytes::new())?;
let request = builder.body(Bytes::new())?;
#[cfg(feature = "aws-core")]
let mut request = request;

#[cfg(feature = "aws-core")]
if let Some(credentials_provider) = aws_auth {
sign_request(&mut request, credentials_provider, region).await?;
}
Expand Down
5 changes: 4 additions & 1 deletion src/sinks/elasticsearch/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use std::{
use futures::{FutureExt, TryFutureExt};
use vector_config::configurable_component;

#[cfg(feature = "aws-core")]
use crate::aws::RegionOrEndpoint;
use crate::{
aws::RegionOrEndpoint,
codecs::Transformer,
config::{AcknowledgementsConfig, DataType, Input, SinkConfig, SinkContext},
event::{EventRef, LogEvent, Value},
Expand Down Expand Up @@ -153,6 +154,7 @@ pub struct ElasticsearchConfig {

#[serde(default)]
#[configurable(derived)]
#[cfg(feature = "aws-core")]
pub aws: Option<RegionOrEndpoint>,

#[serde(default)]
Expand Down Expand Up @@ -215,6 +217,7 @@ impl Default for ElasticsearchConfig {
request: Default::default(),
auth: None,
query: None,
#[cfg(feature = "aws-core")]
aws: None,
tls: None,
endpoint_health: None,
Expand Down
3 changes: 3 additions & 0 deletions src/sinks/elasticsearch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use snafu::Snafu;
use vector_common::sensitive_string::SensitiveString;
use vector_config::configurable_component;

#[cfg(feature = "aws-core")]
use crate::aws::AwsAuthentication;
use crate::{
event::{EventRef, LogEvent},
Expand All @@ -50,6 +51,7 @@ pub enum ElasticsearchAuth {
password: SensitiveString,
},

#[cfg(feature = "aws-core")]
/// Amazon OpenSearch Service-specific authentication.
Aws(AwsAuthentication),
}
Expand Down Expand Up @@ -210,6 +212,7 @@ pub enum ParseError {
IndexTemplate { source: TemplateParseError },
#[snafu(display("Batch action template parse error: {}", source))]
BatchActionTemplate { source: TemplateParseError },
#[cfg(feature = "aws-core")]
#[snafu(display("aws.region required when AWS authentication is in use"))]
RegionRequired,
#[snafu(display("Endpoints option must be specified"))]
Expand Down
13 changes: 12 additions & 1 deletion src/sinks/elasticsearch/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use std::{
task::{Context, Poll},
};

#[cfg(feature = "aws-core")]
use aws_credential_types::provider::SharedCredentialsProvider;
#[cfg(feature = "aws-core")]
use aws_types::region::Region;
use bytes::Bytes;
use futures::future::BoxFuture;
Expand All @@ -17,6 +19,7 @@ use vector_common::{
};
use vector_core::{stream::DriverResponse, ByteSizeOf};

#[cfg(feature = "aws-core")]
use crate::sinks::elasticsearch::sign_request;
use crate::{
event::{EventFinalizers, EventStatus, Finalizable},
Expand Down Expand Up @@ -96,10 +99,12 @@ impl ElasticsearchService {
pub struct HttpRequestBuilder {
pub bulk_uri: Uri,
pub query_params: HashMap<String, String>,
#[cfg(feature = "aws-core")]
pub region: Option<Region>,
pub compression: Compression,
pub http_request_config: RequestConfig,
pub http_auth: Option<Auth>,
#[cfg(feature = "aws-core")]
pub credentials_provider: Option<SharedCredentialsProvider>,
}

Expand All @@ -110,8 +115,10 @@ impl HttpRequestBuilder {
http_request_config: config.request.clone(),
http_auth: common.http_auth.clone(),
query_params: common.query_params.clone(),
#[cfg(feature = "aws-core")]
region: common.region.clone(),
compression: config.compression,
#[cfg(feature = "aws-core")]
credentials_provider: common.aws_auth.clone(),
}
}
Expand Down Expand Up @@ -140,10 +147,14 @@ impl HttpRequestBuilder {
builder = auth.apply_builder(builder);
}

let mut request = builder
let request = builder
.body(es_req.payload)
.expect("Invalid http request value used");

#[cfg(feature = "aws-core")]
let mut request = request;

#[cfg(feature = "aws-core")]
if let Some(credentials_provider) = &self.credentials_provider {
sign_request(&mut request, credentials_provider, &self.region).await?;
}
Expand Down
2 changes: 2 additions & 0 deletions src/sinks/prometheus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub(crate) mod remote_write;

use vector_config::configurable_component;

#[cfg(feature = "aws-core")]
use crate::aws::AwsAuthentication;

/// Authentication strategies.
Expand All @@ -33,6 +34,7 @@ pub enum PrometheusRemoteWriteAuth {
token: SensitiveString,
},

#[cfg(feature = "aws-core")]
/// Amazon Prometheus Service-specific authentication.
Aws(AwsAuthentication),
}
Expand Down
12 changes: 10 additions & 2 deletions src/sinks/prometheus/remote_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ use vector_config::configurable_component;
use vector_core::{ByteSizeOf, EstimatedJsonEncodedSizeOf};

use super::collector::{self, MetricCollector as _};
#[cfg(feature = "aws-core")]
use crate::aws::RegionOrEndpoint;
use crate::{
aws::RegionOrEndpoint,
config::{self, AcknowledgementsConfig, Input, SinkConfig},
event::{Event, Metric},
http::{Auth, HttpClient},
Expand Down Expand Up @@ -48,6 +49,7 @@ impl SinkBatchSettings for PrometheusRemoteWriteDefaultBatchSettings {
enum Errors {
#[snafu(display(r#"Prometheus remote_write sink cannot accept "set" metrics"#))]
SetMetricInvalid,
#[cfg(feature = "aws-core")]
#[snafu(display("aws.region required when AWS authentication is in use"))]
AwsRegionRequired,
}
Expand Down Expand Up @@ -116,6 +118,7 @@ pub struct RemoteWriteConfig {
#[configurable(derived)]
pub auth: Option<PrometheusRemoteWriteAuth>,

#[cfg(feature = "aws-core")]
#[configurable(derived)]
#[configurable(metadata(docs::advanced))]
pub aws: Option<RegionOrEndpoint>,
Expand Down Expand Up @@ -194,6 +197,7 @@ impl SinkConfig for RemoteWriteConfig {
None,
None,
),
#[cfg(feature = "aws-core")]
Some(PrometheusRemoteWriteAuth::Aws(aws_auth)) => {
let region = self
.aws
Expand Down Expand Up @@ -406,11 +410,14 @@ impl HttpRequestBuilder {
builder = builder.header("X-Scope-OrgID", tenant_id);
}

let mut request = builder.body(body.into()).unwrap();
let request = builder.body(body.into()).unwrap();
#[cfg(feature = "aws-core")]
let mut request = request;
if let Some(http_auth) = &self.http_auth {
http_auth.apply(&mut request);
}

#[cfg(feature = "aws-core")]
if let Some(credentials_provider) = &self.credentials_provider {
sign_request(&mut request, credentials_provider, &self.aws_region).await?;
}
Expand Down Expand Up @@ -440,6 +447,7 @@ fn compress_block(compression: Compression, data: Bytes) -> Vec<u8> {
}
}

#[cfg(feature = "aws-core")]
async fn sign_request(
request: &mut http::Request<Bytes>,
credentials_provider: &SharedCredentialsProvider,
Expand Down

0 comments on commit 986316d

Please sign in to comment.