Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - chore: smartmodule latest interface adoption #3661

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions smartmodule/examples/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions smartmodule/examples/aggregate-average/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use serde::{Serialize, Deserialize};
use fluvio_smartmodule::{smartmodule, Result, Record, RecordData};
use fluvio_smartmodule::{smartmodule, Result, SmartModuleRecord, RecordData};

#[derive(Default, Serialize, Deserialize)]
struct IncrementalAverage {
Expand All @@ -22,7 +22,7 @@ impl IncrementalAverage {
}

#[smartmodule(aggregate)]
pub fn aggregate(accumulator: RecordData, current: &Record) -> Result<RecordData> {
pub fn aggregate(accumulator: RecordData, current: &SmartModuleRecord) -> Result<RecordData> {
// Parse the average from JSON
let mut average: IncrementalAverage =
serde_json::from_slice(accumulator.as_ref()).unwrap_or_default();
Expand Down
4 changes: 2 additions & 2 deletions smartmodule/examples/aggregate-init/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::OnceLock;

use fluvio_smartmodule::{
dataplane::smartmodule::SmartModuleExtraParams, smartmodule, Record, RecordData, Result,
dataplane::smartmodule::SmartModuleExtraParams, smartmodule, SmartModuleRecord, RecordData, Result,
};

static INITIAL_VALUE: OnceLock<UseOnce<RecordData>> = OnceLock::new();

const PARAM_NAME: &str = "initial_value";

#[smartmodule(aggregate)]
pub fn aggregate(accumulator: RecordData, current: &Record) -> Result<RecordData> {
pub fn aggregate(accumulator: RecordData, current: &SmartModuleRecord) -> Result<RecordData> {
let accumulator = if let Some(initial_value) = INITIAL_VALUE.get() {
initial_value.get_or(&accumulator)
} else {
Expand Down
4 changes: 2 additions & 2 deletions smartmodule/examples/aggregate-json/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::collections::HashMap;
use fluvio_smartmodule::{smartmodule, Result, Record, RecordData};
use fluvio_smartmodule::{smartmodule, Result, SmartModuleRecord, RecordData};
use serde::{Serialize, Deserialize};

#[derive(Default, Serialize, Deserialize)]
Expand All @@ -20,7 +20,7 @@ impl std::ops::Add for GithubStars {
}

#[smartmodule(aggregate)]
pub fn aggregate(accumulator: RecordData, current: &Record) -> Result<RecordData> {
pub fn aggregate(accumulator: RecordData, current: &SmartModuleRecord) -> Result<RecordData> {
// Parse accumulator
let accumulated_stars: GithubStars =
serde_json::from_slice(accumulator.as_ref()).unwrap_or_default();
Expand Down
4 changes: 2 additions & 2 deletions smartmodule/examples/aggregate-sum/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use fluvio_smartmodule::{smartmodule, Result, Record, RecordData};
use fluvio_smartmodule::{smartmodule, Result, SmartModuleRecord, RecordData};

#[smartmodule(aggregate)]
pub fn aggregate(accumulator: RecordData, current: &Record) -> Result<RecordData> {
pub fn aggregate(accumulator: RecordData, current: &SmartModuleRecord) -> Result<RecordData> {
// Parse the accumulator and current record as strings
let accumulator_string = std::str::from_utf8(accumulator.as_ref())?;
let current_string = std::str::from_utf8(current.value.as_ref())?;
Expand Down
4 changes: 2 additions & 2 deletions smartmodule/examples/aggregate/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use fluvio_smartmodule::{smartmodule, Result, Record, RecordData};
use fluvio_smartmodule::{smartmodule, Result, SmartModuleRecord, RecordData};

/// This aggregate concatenate accumulator and current value
/// values: "a","b"
// accumulator: "1",
// "1a","1ab"
#[smartmodule(aggregate)]
pub fn aggregate(accumulator: RecordData, current: &Record) -> Result<RecordData> {
pub fn aggregate(accumulator: RecordData, current: &SmartModuleRecord) -> Result<RecordData> {
let mut acc = String::from_utf8(accumulator.as_ref().to_vec())?;
let next = std::str::from_utf8(current.value.as_ref())?;
acc.push_str(next);
Expand Down
4 changes: 2 additions & 2 deletions smartmodule/examples/array_map_json_array/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@
//! "Cranberry"
//! ```

use fluvio_smartmodule::{smartmodule, Record, RecordData, Result};
use fluvio_smartmodule::{smartmodule, SmartModuleRecord, RecordData, Result};

#[smartmodule(array_map)]
pub fn array_map(record: &Record) -> Result<Vec<(Option<RecordData>, RecordData)>> {
pub fn array_map(record: &SmartModuleRecord) -> Result<Vec<(Option<RecordData>, RecordData)>> {
// Deserialize a JSON array with any kind of values inside
let array: Vec<serde_json::Value> = serde_json::from_slice(record.value.as_ref())?;

Expand Down
4 changes: 2 additions & 2 deletions smartmodule/examples/array_map_json_object/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@
//! [c] "Cranberry"
//! ```

use fluvio_smartmodule::{smartmodule, Record, RecordData, Result};
use fluvio_smartmodule::{smartmodule, SmartModuleRecord, RecordData, Result};
use serde_json::{Map, Value};

#[smartmodule(array_map)]
pub fn array_map(record: &Record) -> Result<Vec<(Option<RecordData>, RecordData)>> {
pub fn array_map(record: &SmartModuleRecord) -> Result<Vec<(Option<RecordData>, RecordData)>> {
// Deserialize a JSON object (Map) with any kind of values inside
let object: Map<String, Value> = serde_json::from_slice(record.value.as_ref())?;

Expand Down
4 changes: 2 additions & 2 deletions smartmodule/examples/array_map_json_reddit/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! zero or many output records. This example showcases taking a stream of Reddit API
//! responses and converting it into a stream of the individual posts.

use fluvio_smartmodule::{smartmodule, Record, RecordData, Result};
use fluvio_smartmodule::{smartmodule, SmartModuleRecord, RecordData, Result};
use serde::{Serialize, Deserialize};

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -33,7 +33,7 @@ struct RedditPostData {
}

#[smartmodule(array_map)]
pub fn array_map(record: &Record) -> Result<Vec<(Option<RecordData>, RecordData)>> {
pub fn array_map(record: &SmartModuleRecord) -> Result<Vec<(Option<RecordData>, RecordData)>> {
// Deserialize a RedditListing from JSON
let listing: RedditListing = serde_json::from_slice(record.value.as_ref())?;

Expand Down
4 changes: 2 additions & 2 deletions smartmodule/examples/filter/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use fluvio_smartmodule::{smartmodule, Record, Result};
use fluvio_smartmodule::{smartmodule, SmartModuleRecord, Result};

#[smartmodule(filter)]
pub fn filter(record: &Record) -> Result<bool> {
pub fn filter(record: &SmartModuleRecord) -> Result<bool> {
let string = std::str::from_utf8(record.value.as_ref())?;
Ok(string.contains('a'))
}
6 changes: 3 additions & 3 deletions smartmodule/examples/filter_hashset/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,19 @@ use std::{
};

use fluvio_smartmodule::{
smartmodule, Record, Result, dataplane::smartmodule::SmartModuleExtraParams, eyre,
smartmodule, SmartModuleRecord, Result, dataplane::smartmodule::SmartModuleExtraParams, eyre,
};

static SET: OnceLock<BoundedHashSet<String>> = OnceLock::new();

#[smartmodule(filter)]
pub fn filter(record: &Record) -> Result<bool> {
pub fn filter(record: &SmartModuleRecord) -> Result<bool> {
let string = std::str::from_utf8(record.value.as_ref())?;
Ok(get_mut_set()?.insert(string.to_owned()))
}

#[smartmodule(look_back)]
pub fn look_back(record: &Record) -> Result<()> {
pub fn look_back(record: &SmartModuleRecord) -> Result<()> {
let string = std::str::from_utf8(record.value.as_ref())?;
get_mut_set()?.insert(string.to_owned());
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions smartmodule/examples/filter_init/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::OnceLock;

use fluvio_smartmodule::{
smartmodule, Record, Result, eyre,
smartmodule, SmartModuleRecord, Result, eyre,
dataplane::smartmodule::{SmartModuleExtraParams, SmartModuleInitError},
};

Expand All @@ -19,7 +19,7 @@ fn init(params: SmartModuleExtraParams) -> Result<()> {
}

#[smartmodule(filter)]
pub fn filter(record: &Record) -> Result<bool> {
pub fn filter(record: &SmartModuleRecord) -> Result<bool> {
let string = std::str::from_utf8(record.value.as_ref())?;
Ok(string.contains(CRITERIA.get().unwrap()))
}
4 changes: 2 additions & 2 deletions smartmodule/examples/filter_json/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
//! {"level":"error","message":"Unable to connect to database"}
//! ```

use fluvio_smartmodule::{smartmodule, Record, Result};
use fluvio_smartmodule::{smartmodule, SmartModuleRecord, Result};

#[derive(PartialEq, Eq, PartialOrd, Ord, serde::Deserialize)]
#[serde(rename_all = "lowercase")]
Expand All @@ -64,7 +64,7 @@ struct StructuredLog {
}

#[smartmodule(filter)]
pub fn filter_log_level(record: &Record) -> Result<bool> {
pub fn filter_log_level(record: &SmartModuleRecord) -> Result<bool> {
let log = serde_json::from_slice::<StructuredLog>(record.value.as_ref())?;
Ok(log.level > LogLevel::Debug)
}
6 changes: 3 additions & 3 deletions smartmodule/examples/filter_look_back/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::sync::atomic::{AtomicI32, Ordering::SeqCst};

use fluvio_smartmodule::{smartmodule, Record, Result};
use fluvio_smartmodule::{smartmodule, SmartModuleRecord, Result};

static PREV: AtomicI32 = AtomicI32::new(0);

#[smartmodule(filter)]
pub fn filter(record: &Record) -> Result<bool> {
pub fn filter(record: &SmartModuleRecord) -> Result<bool> {
let string = std::str::from_utf8(record.value.as_ref())?;
let current: i32 = string.parse()?;
let last = PREV.load(SeqCst);
Expand All @@ -18,7 +18,7 @@ pub fn filter(record: &Record) -> Result<bool> {
}

#[smartmodule(look_back)]
pub fn look_back(record: &Record) -> Result<()> {
pub fn look_back(record: &SmartModuleRecord) -> Result<()> {
let string = std::str::from_utf8(record.value.as_ref())?;
let last: i32 = string.parse()?;
PREV.store(last, SeqCst);
Expand Down
4 changes: 2 additions & 2 deletions smartmodule/examples/filter_map/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
//! This SmartModule filters out all odd numbers, and divides all even numbers by 2.

use fluvio_smartmodule::{smartmodule, Record, RecordData, Result};
use fluvio_smartmodule::{smartmodule, SmartModuleRecord, RecordData, Result};

#[smartmodule(filter_map)]
pub fn filter_map(record: &Record) -> Result<Option<(Option<RecordData>, RecordData)>> {
pub fn filter_map(record: &SmartModuleRecord) -> Result<Option<(Option<RecordData>, RecordData)>> {
let key = record.key.clone();
let string = String::from_utf8_lossy(record.value.as_ref()).to_string();
let int: i32 = string.parse()?;
Expand Down
4 changes: 2 additions & 2 deletions smartmodule/examples/filter_odd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
//! filter_odd/src/lib.rs:45:38
//! ```

use fluvio_smartmodule::{smartmodule, Record, Result};
use fluvio_smartmodule::{smartmodule, SmartModuleRecord, Result};

#[derive(Debug, thiserror::Error)]
pub enum SecondErrorWrapper {
Expand All @@ -37,7 +37,7 @@ pub enum FirstErrorWrapper {
}

#[smartmodule(filter)]
pub fn filter(record: &Record) -> Result<bool> {
pub fn filter(record: &SmartModuleRecord) -> Result<bool> {
let string = std::str::from_utf8(record.value.as_ref())?;
let int = string
.parse::<i32>()
Expand Down
4 changes: 2 additions & 2 deletions smartmodule/examples/filter_regex/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use fluvio_smartmodule::{smartmodule, Record, Result};
use fluvio_smartmodule::{smartmodule, SmartModuleRecord, Result};
use regex::Regex;

#[smartmodule(filter)]
pub fn filter(record: &Record) -> Result<bool> {
pub fn filter(record: &SmartModuleRecord) -> Result<bool> {
let string = std::str::from_utf8(record.value.as_ref())?;

// Check whether the Record contains a Social Security number
Expand Down
4 changes: 2 additions & 2 deletions smartmodule/examples/filter_with_param/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::OnceLock;

use fluvio_smartmodule::{
smartmodule, Record, Result, eyre,
smartmodule, SmartModuleRecord, Result, eyre,
dataplane::smartmodule::{SmartModuleExtraParams},
};

Expand All @@ -22,7 +22,7 @@ fn init(params: SmartModuleExtraParams) -> Result<()> {
}

#[smartmodule(filter)]
pub fn filter(record: &Record) -> Result<bool> {
pub fn filter(record: &SmartModuleRecord) -> Result<bool> {
let string = std::str::from_utf8(record.value.as_ref())?;
Ok(string.contains(CRITERIA.get().unwrap()))
}
4 changes: 2 additions & 2 deletions smartmodule/examples/map/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use fluvio_smartmodule::{smartmodule, Record, RecordData, Result};
use fluvio_smartmodule::{smartmodule, SmartModuleRecord, RecordData, Result};

#[smartmodule(map)]
pub fn map(record: &Record) -> Result<(Option<RecordData>, RecordData)> {
pub fn map(record: &SmartModuleRecord) -> Result<(Option<RecordData>, RecordData)> {
let key = record.key.clone();
let mut value = Vec::from(record.value.as_ref());

Expand Down
4 changes: 2 additions & 2 deletions smartmodule/examples/map_double/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use fluvio_smartmodule::{smartmodule, Record, RecordData, Result};
use fluvio_smartmodule::{smartmodule, SmartModuleRecord, RecordData, Result};

#[smartmodule(map)]
pub fn map(record: &Record) -> Result<(Option<RecordData>, RecordData)> {
pub fn map(record: &SmartModuleRecord) -> Result<(Option<RecordData>, RecordData)> {
let key = record.key.clone();

let string = std::str::from_utf8(record.value.as_ref())?;
Expand Down
4 changes: 2 additions & 2 deletions smartmodule/examples/map_json/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use fluvio_smartmodule::{smartmodule, Record, RecordData, Result};
use fluvio_smartmodule::{smartmodule, SmartModuleRecord, RecordData, Result};

#[smartmodule(map)]
pub fn map(record: &Record) -> Result<(Option<RecordData>, RecordData)> {
pub fn map(record: &SmartModuleRecord) -> Result<(Option<RecordData>, RecordData)> {
let json = serde_json::from_slice::<serde_json::Value>(record.value.as_ref())?;
let yaml_bytes = serde_yaml::to_string(&json)?.into_bytes();

Expand Down
4 changes: 2 additions & 2 deletions smartmodule/examples/map_regex/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use regex::Regex;
use once_cell::sync::Lazy;
use fluvio_smartmodule::{smartmodule, Result, Record, RecordData};
use fluvio_smartmodule::{smartmodule, Result, SmartModuleRecord, RecordData};

static SSN_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"\d{3}-\d{2}-\d{4}").unwrap());

#[smartmodule(map)]
pub fn map(record: &Record) -> Result<(Option<RecordData>, RecordData)> {
pub fn map(record: &SmartModuleRecord) -> Result<(Option<RecordData>, RecordData)> {
let key = record.key.clone();

let string = std::str::from_utf8(record.value.as_ref())?;
Expand Down
4 changes: 2 additions & 2 deletions smartmodule/regex-filter/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use regex::Regex;


use fluvio_smartmodule::{
smartmodule, Record, Result, eyre,
smartmodule, SmartModuleRecord, Result, eyre,
dataplane::smartmodule::{SmartModuleExtraParams, SmartModuleInitError},
};

Expand All @@ -22,7 +22,7 @@ fn init(params: SmartModuleExtraParams) -> Result<()> {
}

#[smartmodule(filter)]
pub fn filter(record: &Record) -> Result<bool> {
pub fn filter(record: &SmartModuleRecord) -> Result<bool> {
let string = std::str::from_utf8(record.value.as_ref())?;
Ok(REGEX.get().unwrap().is_match(string))
}