Skip to content

Commit

Permalink
simplify example
Browse files Browse the repository at this point in the history
  • Loading branch information
solidiquis committed Jan 31, 2025
1 parent 84f1f33 commit ff5ad24
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 78 deletions.
117 changes: 43 additions & 74 deletions rust/examples/ingestion/data.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use chrono::{DateTime, Utc};
use chrono::Utc;
use pbjson_types::Timestamp;
use rand::Rng;
use sift_rs::{
ingest::v1::{
ingest_with_config_data_channel_value::Type, IngestWithConfigDataChannelValue,
Expand All @@ -9,84 +8,54 @@ use sift_rs::{
ingestion_configs::v1::IngestionConfig,
runs::v2::Run,
};
use std::{
pin::Pin,
task::{Context, Poll},
time::{Duration, Instant},
};
use std::time::{Duration, Instant};
use tokio::{
sync::mpsc::{channel, Receiver},
sync::mpsc::unbounded_channel,
task::{self, JoinHandle},
time,
};
use tokio_stream::Stream;
use tokio_stream::wrappers::UnboundedReceiverStream;

pub struct DataSource {
pub fn init_data_source(
ingestion_config: IngestionConfig,
run: Run,
source: Option<JoinHandle<()>>,
data_rx: Receiver<(DateTime<Utc>, f64)>,
}

impl DataSource {
pub fn new(ingestion_config: IngestionConfig, run: Run) -> Self {
let (data_tx, data_rx) = channel(1);

let task_handler = task::spawn_blocking(move || {
let duration = Duration::from_secs(60);
let start = Instant::now();
let mut rng = rand::thread_rng();

while Instant::now().duration_since(start) < duration {
data_tx
.blocking_send((Utc::now(), rng.gen_range(0.0..100.0)))
.unwrap();
std::thread::sleep(Duration::from_millis(500));
}
});

let source = Some(task_handler);

Self {
ingestion_config,
run,
source,
data_rx,
) -> (
JoinHandle<()>,
UnboundedReceiverStream<IngestWithConfigDataStreamRequest>,
) {
let (data_tx, data_rx) = unbounded_channel();

let data_source_handle = task::spawn(async move {
let duration = Duration::from_secs(60);
let start = Instant::now();

// Generating a nice sine wave
let mut t: f64 = 0.0;
let amplitude = 5.0;
let offset = 5.0;
let frequency = 0.1;

while Instant::now().duration_since(start) < duration {
let value = amplitude * (t * frequency).sin() + offset;
let req = IngestWithConfigDataStreamRequest {
run_id: run.run_id.clone(),
ingestion_config_id: String::from(&ingestion_config.ingestion_config_id),
flow: String::from("velocity_reading"),
timestamp: Some(Timestamp::from(Utc::now())),
channel_values: vec![IngestWithConfigDataChannelValue {
r#type: Some(Type::Double(value)),
}],
// Set this flag to `true` only for debugging purposes to get real-time data validation from
// the Sift API. Do not use in production as it will hurt performance.
end_stream_on_validation_error: false,
..Default::default()
};
println!("Emitting value for velocity_reading");
data_tx.send(req).unwrap();
t += 1.0;
time::sleep(Duration::from_secs(1)).await;
}
}
}

impl Stream for DataSource {
type Item = IngestWithConfigDataStreamRequest;

fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.data_rx.poll_recv(ctx) {
Poll::Ready(Some((ts, val))) => {
let req = IngestWithConfigDataStreamRequest {
run_id: self.run.run_id.clone(),
ingestion_config_id: String::from(&self.ingestion_config.ingestion_config_id),
flow: String::from("velocity_reading"),
timestamp: Some(Timestamp::from(ts)),
channel_values: vec![IngestWithConfigDataChannelValue {
r#type: Some(Type::Double(val)),
}],
// Set this flag to `true` only for debugging purposes to get real-time data validation from
// the Sift API. Do not use in production as it will hurt performance.
end_stream_on_validation_error: false,
..Default::default()
};
println!("Emitting value for velocity_reading");
Poll::Ready(Some(req))
}
Poll::Ready(_) => Poll::Ready(None),
_ => Poll::Pending,
}
}
}
});

impl Drop for DataSource {
fn drop(&mut self) {
if let Some(handle) = self.source.take() {
handle.abort();
}
}
(data_source_handle, UnboundedReceiverStream::new(data_rx))
}
8 changes: 4 additions & 4 deletions rust/examples/ingestion/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use std::{env, error::Error};

/// Simulates a data source
mod data;
use data::DataSource;

/// Name of the asset that we want to ingest data for.
pub const ASSET_NAME: &str = "LV-426";
Expand Down Expand Up @@ -43,13 +42,14 @@ async fn main() -> Result<(), Box<dyn Error>> {
let run = create_run(grpc_channel.clone(), ASSET_NAME).await?;
println!("initialized run {}", &run.name);

// Initialize the data source which is an impl Stream
let data_source = DataSource::new(ingestion_config, run);
let (data_source_handle, data_source_rx) = data::init_data_source(ingestion_config, run);

IngestServiceClient::new(grpc_channel)
.ingest_with_config_data_stream(data_source)
.ingest_with_config_data_stream(data_source_rx)
.await?;

data_source_handle.await?;

println!("done.");
Ok(())
}
Expand Down

0 comments on commit ff5ad24

Please sign in to comment.