diff --git a/rust/examples/ingestion/data.rs b/rust/examples/ingestion/data.rs index dd2ed4a8..8d0a784a 100644 --- a/rust/examples/ingestion/data.rs +++ b/rust/examples/ingestion/data.rs @@ -1,6 +1,5 @@ -use chrono::{DateTime, Utc}; +use chrono::Utc; use pbjson_types::Timestamp; -use rand::Rng; use sift_rs::{ ingest::v1::{ ingest_with_config_data_channel_value::Type, IngestWithConfigDataChannelValue, @@ -9,84 +8,54 @@ use sift_rs::{ ingestion_configs::v1::IngestionConfig, runs::v2::Run, }; -use std::{ - pin::Pin, - task::{Context, Poll}, - time::{Duration, Instant}, -}; +use std::time::{Duration, Instant}; use tokio::{ - sync::mpsc::{channel, Receiver}, + sync::mpsc::unbounded_channel, task::{self, JoinHandle}, + time, }; -use tokio_stream::Stream; +use tokio_stream::wrappers::UnboundedReceiverStream; -pub struct DataSource { +pub fn init_data_source( ingestion_config: IngestionConfig, run: Run, - source: Option>, - data_rx: Receiver<(DateTime, f64)>, -} - -impl DataSource { - pub fn new(ingestion_config: IngestionConfig, run: Run) -> Self { - let (data_tx, data_rx) = channel(1); - - let task_handler = task::spawn_blocking(move || { - let duration = Duration::from_secs(60); - let start = Instant::now(); - let mut rng = rand::thread_rng(); - - while Instant::now().duration_since(start) < duration { - data_tx - .blocking_send((Utc::now(), rng.gen_range(0.0..100.0))) - .unwrap(); - std::thread::sleep(Duration::from_millis(500)); - } - }); - - let source = Some(task_handler); - - Self { - ingestion_config, - run, - source, - data_rx, +) -> ( + JoinHandle<()>, + UnboundedReceiverStream, +) { + let (data_tx, data_rx) = unbounded_channel(); + + let data_source_handle = task::spawn(async move { + let duration = Duration::from_secs(60); + let start = Instant::now(); + + // Generating a nice sine wave + let mut t: f64 = 0.0; + let amplitude = 5.0; + let offset = 5.0; + let frequency = 0.1; + + while Instant::now().duration_since(start) < duration { + let value = amplitude * (t * frequency).sin() + offset; + let req = IngestWithConfigDataStreamRequest { + run_id: run.run_id.clone(), + ingestion_config_id: String::from(&ingestion_config.ingestion_config_id), + flow: String::from("velocity_reading"), + timestamp: Some(Timestamp::from(Utc::now())), + channel_values: vec![IngestWithConfigDataChannelValue { + r#type: Some(Type::Double(value)), + }], + // Set this flag to `true` only for debugging purposes to get real-time data validation from + // the Sift API. Do not use in production as it will hurt performance. + end_stream_on_validation_error: false, + ..Default::default() + }; + println!("Emitting value for velocity_reading"); + data_tx.send(req).unwrap(); + t += 1.0; + time::sleep(Duration::from_secs(1)).await; } - } -} - -impl Stream for DataSource { - type Item = IngestWithConfigDataStreamRequest; - - fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { - match self.data_rx.poll_recv(ctx) { - Poll::Ready(Some((ts, val))) => { - let req = IngestWithConfigDataStreamRequest { - run_id: self.run.run_id.clone(), - ingestion_config_id: String::from(&self.ingestion_config.ingestion_config_id), - flow: String::from("velocity_reading"), - timestamp: Some(Timestamp::from(ts)), - channel_values: vec![IngestWithConfigDataChannelValue { - r#type: Some(Type::Double(val)), - }], - // Set this flag to `true` only for debugging purposes to get real-time data validation from - // the Sift API. Do not use in production as it will hurt performance. - end_stream_on_validation_error: false, - ..Default::default() - }; - println!("Emitting value for velocity_reading"); - Poll::Ready(Some(req)) - } - Poll::Ready(_) => Poll::Ready(None), - _ => Poll::Pending, - } - } -} + }); -impl Drop for DataSource { - fn drop(&mut self) { - if let Some(handle) = self.source.take() { - handle.abort(); - } - } + (data_source_handle, UnboundedReceiverStream::new(data_rx)) } diff --git a/rust/examples/ingestion/main.rs b/rust/examples/ingestion/main.rs index 3d261c5c..50f7aa27 100644 --- a/rust/examples/ingestion/main.rs +++ b/rust/examples/ingestion/main.rs @@ -14,7 +14,6 @@ use std::{env, error::Error}; /// Simulates a data source mod data; -use data::DataSource; /// Name of the asset that we want to ingest data for. pub const ASSET_NAME: &str = "LV-426"; @@ -43,13 +42,14 @@ async fn main() -> Result<(), Box> { let run = create_run(grpc_channel.clone(), ASSET_NAME).await?; println!("initialized run {}", &run.name); - // Initialize the data source which is an impl Stream - let data_source = DataSource::new(ingestion_config, run); + let (data_source_handle, data_source_rx) = data::init_data_source(ingestion_config, run); IngestServiceClient::new(grpc_channel) - .ingest_with_config_data_stream(data_source) + .ingest_with_config_data_stream(data_source_rx) .await?; + data_source_handle.await?; + println!("done."); Ok(()) }