Skip to content

Commit

Permalink
Do not start different smartengines per send call (#1913)
Browse files Browse the repository at this point in the history
With this change we can create a producer with something like:

```rust
    lat data = std::fs::read(name)?;
    let params = Default::default();
    let producer = fluvio.topic_producer(&self.fluvio_topic).await?.with_map(data, params)?
```

The smartengine will be initialized when `with_filter`, `with_map`... is called and not per `send()` call
  • Loading branch information
morenol committed Nov 17, 2021
1 parent 0cd0a6e commit 0dc8082
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 44 deletions.
2 changes: 1 addition & 1 deletion crates/fluvio-smartengine/src/smartstream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ impl SmartStreamContext {
}
}

pub trait SmartStream: Send {
pub trait SmartStream: Send + Sync {
fn process(&mut self, input: SmartStreamInput) -> Result<SmartStreamOutput>;
fn params(&self) -> SmartStreamExtraParams;
}
Expand Down
3 changes: 2 additions & 1 deletion crates/fluvio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ pub mod config;
use tracing::instrument;
pub use error::FluvioError;
pub use config::FluvioConfig;
pub use producer::{TopicProducer, RecordKey, SmartStreamConfig};
pub use producer::{TopicProducer, RecordKey};

pub use consumer::{
PartitionConsumer, ConsumerConfig, MultiplePartitionConsumer, PartitionSelectionStrategy,
};
Expand Down
91 changes: 49 additions & 42 deletions crates/fluvio/src/producer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use std::sync::Arc;
use std::collections::HashMap;
#[cfg(feature = "smartengine")]
use std::sync::RwLock;
use fluvio_protocol::Encoder;
#[cfg(feature = "smartengine")]
use fluvio_smartengine::SmartStream;
use tracing::debug;
use tracing::instrument;
use once_cell::sync::Lazy;
Expand Down Expand Up @@ -32,78 +36,85 @@ pub struct TopicProducer {
topic: String,
pool: Arc<SpuPool>,
partitioner: Box<dyn Partitioner + Send + Sync>,
pub(crate) smartstream_config: SmartStreamConfig,
#[cfg(feature = "smartengine")]
pub(crate) smartengine: Option<Arc<RwLock<Box<dyn SmartStream>>>>,
}
cfg_if::cfg_if! {
if #[cfg(feature = "smartengine")] {
use fluvio_spu_schema::server::stream_fetch::{SmartStreamWasm, SmartStreamKind, SmartStreamPayload};
use std::collections::BTreeMap;

#[derive(Default)]
pub struct SmartStreamConfig {
pub(crate) wasm_module: Option<SmartStreamPayload>,
}
impl SmartStreamConfig {
use fluvio_smartengine::SmartEngine;

impl TopicProducer {
fn init_engine(&mut self, smart_payload: SmartStreamPayload) -> Result<(), FluvioError> {
let engine = SmartEngine::default();
let smartstream = engine.create_module_from_payload(
smart_payload).map_err(|e| FluvioError::Other(format!("SmartEngine - {:?}", e)))?;
self.smartengine = Some(Arc::new(RwLock::new(smartstream)));
Ok(())
}
/// Adds a SmartStream filter to this TopicProducer
pub fn wasm_filter<T: Into<Vec<u8>>>(
pub fn with_filter<T: Into<Vec<u8>>>(
mut self,
filter: T,
params: BTreeMap<String, String>,
) -> Self {
self.wasm_module = Some(SmartStreamPayload {
) -> Result<Self, FluvioError> {
let smart_payload = SmartStreamPayload {
wasm: SmartStreamWasm::Raw(filter.into()),
kind: SmartStreamKind::Filter,
params: params.into(),
});
self
};
self.init_engine(smart_payload)?;
Ok(self)
}

/// Adds a SmartStream map to this TopicProducer
pub fn wasm_map<T: Into<Vec<u8>>>(
pub fn with_map<T: Into<Vec<u8>>>(
mut self,
map: T,
params: BTreeMap<String, String>,
) -> Self {
self.wasm_module = Some(SmartStreamPayload {
) -> Result<Self, FluvioError> {
let smart_payload = SmartStreamPayload {
wasm: SmartStreamWasm::Raw(map.into()),
kind: SmartStreamKind::Map,
params: params.into(),
});
self
};
self.init_engine(smart_payload)?;
Ok(self)
}

/// Adds a SmartStream array_map to this TopicProducer
pub fn wasm_array_map<T: Into<Vec<u8>>>(
pub fn with_array_map<T: Into<Vec<u8>>>(
mut self,
map: T,
params: BTreeMap<String, String>,
) -> Self {
self.wasm_module = Some(SmartStreamPayload {
) -> Result<Self, FluvioError> {
let smart_payload = SmartStreamPayload {
wasm: SmartStreamWasm::Raw(map.into()),
kind: SmartStreamKind::ArrayMap,
params: params.into(),
});
self
};

self.init_engine(smart_payload)?;
Ok(self)
}

/// Adds a SmartStream array_map to this TopicProducer
pub fn wasm_aggregate<T: Into<Vec<u8>>>(
/// Adds a SmartStream aggregate to this TopicProducer
pub fn with_aggregate<T: Into<Vec<u8>>>(
mut self,
map: T,
params: BTreeMap<String, String>,
accumulator: Vec<u8>,
) -> Self {
self.wasm_module = Some(SmartStreamPayload {
) -> Result<Self, FluvioError> {
let smart_payload = SmartStreamPayload {
wasm: SmartStreamWasm::Raw(map.into()),
kind: SmartStreamKind::Aggregate { accumulator },
params: params.into(),
});
self
};
self.init_engine(smart_payload)?;
Ok(self)
}
}
} else {
#[derive(Default)]
pub struct SmartStreamConfig {}
}
}

Expand All @@ -114,12 +125,10 @@ impl TopicProducer {
topic,
pool,
partitioner,
smartstream_config: Default::default(),
#[cfg(feature = "smartengine")]
smartengine: Default::default(),
}
}
pub fn get_smartstream_mut(&mut self) -> &mut SmartStreamConfig {
&mut self.smartstream_config
}

/// Sends a key/value record to this producer's Topic.
///
Expand Down Expand Up @@ -175,16 +184,14 @@ impl TopicProducer {
.map::<(RecordKey, RecordData), _>(|(k, v)| (k.into(), v.into()))
.map(Record::from)
.collect::<Vec<Record>>();
use fluvio_smartengine::{SmartEngine};
use dataplane::smartstream::SmartStreamInput;
use std::convert::TryFrom;
if let Some(smart_payload) = &self.smartstream_config.wasm_module {


let engine = SmartEngine::default();
let mut smartstream = engine.create_module_from_payload(smart_payload.clone()).map_err(|e| FluvioError::Other(format!("SmartEngine - {:?}", e)))?;

let output = smartstream.process(SmartStreamInput::try_from(entries)?).map_err(|e| FluvioError::Other(format!("SmartEngine - {:?}", e)))?;
if let Some(
smartengine_ref
) = &self.smartengine {
let mut smartengine = smartengine_ref.write().map_err(|e| FluvioError::Other(format!("SmartEngine - {:?}", e)))?;
let output = smartengine.process(SmartStreamInput::try_from(entries)?).map_err(|e| FluvioError::Other(format!("SmartEngine - {:?}", e)))?;
entries = output.successes;
}
} else {
Expand Down

0 comments on commit 0dc8082

Please sign in to comment.