Skip to content

Commit

Permalink
chore(transforms): add TransformConfig::build_async (vectordotdev#3138)
Browse files Browse the repository at this point in the history
Signed-off-by: Luke Steensen <[email protected]>
Signed-off-by: Brian Menges <[email protected]>
  • Loading branch information
lukesteensen authored and Brian Menges committed Dec 9, 2020
1 parent 43861aa commit f32ad73
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 13 deletions.
2 changes: 1 addition & 1 deletion src/topology/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ pub async fn build_pieces(
let cx = TransformContext { resolver };

let input_type = transform.inner.input_type();
let transform = match transform.inner.build(cx) {
let transform = match transform.inner.build_async(cx).await {
Err(error) => {
errors.push(format!("Transform \"{}\": {}", name, error));
continue;
Expand Down
8 changes: 8 additions & 0 deletions src/topology/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,18 @@ pub struct TransformOuter {
pub inner: Box<dyn TransformConfig>,
}

#[async_trait::async_trait]
#[typetag::serde(tag = "type")]
pub trait TransformConfig: core::fmt::Debug + Send + Sync {
fn build(&self, cx: TransformContext) -> crate::Result<Box<dyn transforms::Transform>>;

async fn build_async(
&self,
cx: TransformContext,
) -> crate::Result<Box<dyn transforms::Transform>> {
self.build(cx)
}

fn input_type(&self) -> DataType;

fn output_type(&self) -> DataType;
Expand Down
47 changes: 35 additions & 12 deletions src/transforms/aws_ec2_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,14 @@ inventory::submit! {
TransformDescription::new_without_default::<Ec2Metadata>("aws_ec2_metadata")
}

#[async_trait::async_trait]
#[typetag::serde(name = "aws_ec2_metadata")]
impl TransformConfig for Ec2Metadata {
fn build(&self, cx: TransformContext) -> crate::Result<Box<dyn Transform>> {
fn build(&self, _cx: TransformContext) -> crate::Result<Box<dyn Transform>> {
unimplemented!()
}

async fn build_async(&self, cx: TransformContext) -> crate::Result<Box<dyn Transform>> {
let (read, write) = evmap::new();

// Check if the namespace is set to `""` which should mean that we do
Expand Down Expand Up @@ -147,11 +152,13 @@ impl TransformConfig for Ec2Metadata {

let http_client = HttpClient::new(cx.resolver(), None)?;

let mut client =
MetadataClient::new(http_client, host, keys, write, refresh_interval, fields);

client.refresh_metadata().await?;

tokio::spawn(
async move {
let mut client =
MetadataClient::new(http_client, host, keys, write, refresh_interval, fields);

client.run().await;
}
// TODO: Once #1338 is done we can fetch the current span
Expand Down Expand Up @@ -513,8 +520,12 @@ mod integration_tests {
host: Some(HOST.clone()),
..Default::default()
};
let mut transform =
rt.block_on_std(async move { config.build(TransformContext::new_test()).unwrap() });
let mut transform = rt.block_on_std(async move {
config
.build_async(TransformContext::new_test())
.await
.unwrap()
});

// We need to sleep to let the background task fetch the data.
std::thread::sleep(std::time::Duration::from_secs(1));
Expand Down Expand Up @@ -562,8 +573,12 @@ mod integration_tests {
fields: Some(vec!["public-ipv4".into(), "region".into()]),
..Default::default()
};
let mut transform =
rt.block_on_std(async move { config.build(TransformContext::new_test()).unwrap() });
let mut transform = rt.block_on_std(async move {
config
.build_async(TransformContext::new_test())
.await
.unwrap()
});

// We need to sleep to let the background task fetch the data.
std::thread::sleep(std::time::Duration::from_secs(1));
Expand Down Expand Up @@ -593,8 +608,12 @@ mod integration_tests {
namespace: Some("ec2.metadata".into()),
..Default::default()
};
let mut transform =
rt.block_on_std(async move { config.build(TransformContext::new_test()).unwrap() });
let mut transform = rt.block_on_std(async move {
config
.build_async(TransformContext::new_test())
.await
.unwrap()
});

// We need to sleep to let the background task fetch the data.
std::thread::sleep(std::time::Duration::from_secs(1));
Expand All @@ -619,8 +638,12 @@ mod integration_tests {
namespace: Some("".into()),
..Default::default()
};
let mut transform =
rt.block_on_std(async move { config.build(TransformContext::new_test()).unwrap() });
let mut transform = rt.block_on_std(async move {
config
.build_async(TransformContext::new_test())
.await
.unwrap()
});

// We need to sleep to let the background task fetch the data.
std::thread::sleep(std::time::Duration::from_secs(1));
Expand Down

0 comments on commit f32ad73

Please sign in to comment.