diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 9e1e554943fc8..c95d3916569cf 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -177,7 +177,7 @@ pub async fn build_pieces( let cx = TransformContext { resolver }; let input_type = transform.inner.input_type(); - let transform = match transform.inner.build(cx) { + let transform = match transform.inner.build_async(cx).await { Err(error) => { errors.push(format!("Transform \"{}\": {}", name, error)); continue; diff --git a/src/topology/config/mod.rs b/src/topology/config/mod.rs index e6e1b3f5e4ce8..ca297f935a929 100644 --- a/src/topology/config/mod.rs +++ b/src/topology/config/mod.rs @@ -204,10 +204,18 @@ pub struct TransformOuter { pub inner: Box, } +#[async_trait::async_trait] #[typetag::serde(tag = "type")] pub trait TransformConfig: core::fmt::Debug + Send + Sync { fn build(&self, cx: TransformContext) -> crate::Result>; + async fn build_async( + &self, + cx: TransformContext, + ) -> crate::Result> { + self.build(cx) + } + fn input_type(&self) -> DataType; fn output_type(&self) -> DataType; diff --git a/src/transforms/aws_ec2_metadata.rs b/src/transforms/aws_ec2_metadata.rs index 24d317a743c26..de8dc2179f4df 100644 --- a/src/transforms/aws_ec2_metadata.rs +++ b/src/transforms/aws_ec2_metadata.rs @@ -112,9 +112,14 @@ inventory::submit! { TransformDescription::new_without_default::("aws_ec2_metadata") } +#[async_trait::async_trait] #[typetag::serde(name = "aws_ec2_metadata")] impl TransformConfig for Ec2Metadata { - fn build(&self, cx: TransformContext) -> crate::Result> { + fn build(&self, _cx: TransformContext) -> crate::Result> { + unimplemented!() + } + + async fn build_async(&self, cx: TransformContext) -> crate::Result> { let (read, write) = evmap::new(); // Check if the namespace is set to `""` which should mean that we do @@ -147,11 +152,13 @@ impl TransformConfig for Ec2Metadata { let http_client = HttpClient::new(cx.resolver(), None)?; + let mut client = + MetadataClient::new(http_client, host, keys, write, refresh_interval, fields); + + client.refresh_metadata().await?; + tokio::spawn( async move { - let mut client = - MetadataClient::new(http_client, host, keys, write, refresh_interval, fields); - client.run().await; } // TODO: Once #1338 is done we can fetch the current span @@ -513,8 +520,12 @@ mod integration_tests { host: Some(HOST.clone()), ..Default::default() }; - let mut transform = - rt.block_on_std(async move { config.build(TransformContext::new_test()).unwrap() }); + let mut transform = rt.block_on_std(async move { + config + .build_async(TransformContext::new_test()) + .await + .unwrap() + }); // We need to sleep to let the background task fetch the data. std::thread::sleep(std::time::Duration::from_secs(1)); @@ -562,8 +573,12 @@ mod integration_tests { fields: Some(vec!["public-ipv4".into(), "region".into()]), ..Default::default() }; - let mut transform = - rt.block_on_std(async move { config.build(TransformContext::new_test()).unwrap() }); + let mut transform = rt.block_on_std(async move { + config + .build_async(TransformContext::new_test()) + .await + .unwrap() + }); // We need to sleep to let the background task fetch the data. std::thread::sleep(std::time::Duration::from_secs(1)); @@ -593,8 +608,12 @@ mod integration_tests { namespace: Some("ec2.metadata".into()), ..Default::default() }; - let mut transform = - rt.block_on_std(async move { config.build(TransformContext::new_test()).unwrap() }); + let mut transform = rt.block_on_std(async move { + config + .build_async(TransformContext::new_test()) + .await + .unwrap() + }); // We need to sleep to let the background task fetch the data. std::thread::sleep(std::time::Duration::from_secs(1)); @@ -619,8 +638,12 @@ mod integration_tests { namespace: Some("".into()), ..Default::default() }; - let mut transform = - rt.block_on_std(async move { config.build(TransformContext::new_test()).unwrap() }); + let mut transform = rt.block_on_std(async move { + config + .build_async(TransformContext::new_test()) + .await + .unwrap() + }); // We need to sleep to let the background task fetch the data. std::thread::sleep(std::time::Duration::from_secs(1));