Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP - Feature/smart module filter #159

Merged
merged 8 commits into from
Sep 27, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ log = "^0.4.17"

[dependencies]
cpython = { version = "0.7", features = ["extension-module"] }
flate2 = "1.0.24"
fluvio = { version = "0.12.6" }
fluvio-future = { version = "0.4.1", features = ["task", "io"] }

Expand Down
29 changes: 29 additions & 0 deletions fluvio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,35 @@ def stream(self, offset: Offset) -> PartitionConsumerStream:
'''
return PartitionConsumerStream(self._inner.stream(offset._inner))

def stream_with_config(self, offset: Offset, wasm_module_path: str) -> PartitionConsumerStream:
'''
Continuously streams events from a particular offset with a SmartModule WASM module in the consumer’s
partition. This returns a `PartitionConsumerStream` which is an
iterator.

Streaming is one of the two ways to consume events in Fluvio. It is a
continuous request for new records arriving in a partition, beginning
at a particular offset. You specify the starting point of the stream
using an Offset and periodically receive events, either individually or
in batches.

Args:
offset: Offset
wasm_module_path: str - The absolute path to the WASM file

Example:
import os

wasm_module_path = os.path.abspath("somefilter.wasm")
for i in consumer.stream_with_config(Offset.beginning(), wasm_module_path):
# do something with i

Returns:
PartionConsumerStream

'''
return PartitionConsumerStream(self._inner.stream_with_config(offset._inner, wasm_module_path))


class TopicProducer:
'''An interface for producing events to a particular topic.
Expand Down
Binary file added integration-tests/smartmodule_filter_on_a.wasm
Binary file not shown.
31 changes: 30 additions & 1 deletion integration-tests/test_fluvio_python.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from string import ascii_lowercase
from fluvio import (Fluvio, FluviorError, Offset)
import unittest
import uuid
import os


def create_topic(topic):
Expand Down Expand Up @@ -32,7 +34,34 @@ def test_produce(self):
for i in range(10):
producer.send_string("FOOBAR %s " % i)

def test_consume_with_iterator(self):
def test_consume_with_smart_module_iterator(self):
"""
Test adds a the alphabet into a topic in the format of record-[letter]

A wasm smart module is added to the filter and a all messages are retrieved and stored in the records list
We can then assert the following:

- There should be 1 item
- It should be record-a

"""

wasm_module_path = os.path.abspath("integration-tests/smartmodule_filter_on_a.wasm")

fluvio = Fluvio.connect()
producer = fluvio.topic_producer(self.topic)
for i in list(ascii_lowercase):
producer.send_string(f"record-{i}")

records = []

consumer = fluvio.partition_consumer(self.topic, 0)
records.append(bytearray(next(consumer.stream_with_config(Offset.beginning(), wasm_module_path)).value()).decode())

self.assertEqual(len(records), 1)
self.assertEqual(records[0], "record-a")

def test_consumer_with_interator(self):
fluvio = Fluvio.connect()
producer = fluvio.topic_producer(self.topic)
for i in range(10):
Expand Down
48 changes: 48 additions & 0 deletions src/glue.rs.in
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,22 @@ use fluvio::{
Offset,
consumer::Record,
};
use std::io::Read;
use flate2::bufread::GzEncoder;
use flate2::Compression;
use fluvio_future::{
task::run_block_on,
io::{
Stream,
StreamExt,
},
};
use fluvio::consumer::{
SmartModuleInvocation,
SmartModuleInvocationWasm,
SmartModuleKind,
ConsumerConfig
};
use std::pin::Pin;

mod _Fluvio {
Expand Down Expand Up @@ -52,6 +61,22 @@ foreign_class!(class Fluvio {
});


pub struct ConsumerConfigWrapper {
wasm_module: Vec<u8>,
}

impl ConsumerConfigWrapper {
fn new_config_with_wasm_filter(file: &str) -> ConsumerConfigWrapper {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't catch it until I saw the stacktrace in CI but we should remove the unwraps. Due to the fact that std::fs::read returns a Result<Vec<u8>, std::io::Error>, let's make this return something like Result<ConsumerConfigWrapper, std::io::Error>.

I'm a little hesitant to suggest it but given that FluvioError implements From<IoError>, this function could also just return a Result<ConsumerConfigWrapper, FluvioError> and have raw_buffer.as_ref().unwrap() be raw_buffer.as_ref()?. The ? here is called the question mark operator and is used to bubble up and handle various error types. Basically, it's syntactic sugar that roughly expands to:

let good_val = match my_result_type { 
    Err(e) => return FluvioError::from(e)
    Ok(val) => val
};

The reason I'm a little hesitant about using the base FluvioError is that the conversions to a FluvioError (really just any rust error type) are done in that rust crate and are exposed upward. Something about having a library/crate that converts an io error downward a dependency's error rather than it's own error type feels wrong. That being said, I think I've done it before.

Anyway, the unwrap causes this to segfault/panic rather than bubble up to the python as an exception. The Flapigen macros turns these error types into Strings (though, it can also return a typed error if we want) and raises them on the python side.

What do you think? We could do the simple change on the error type in this PR and then create an issue or have a follow up PR exposing a more expressive type to the Python side.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will implement your first suggestion and return, sorry about the delay, i was having a weekend away from the computer!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have implemented your suggestion, it now looks like this:

impl ConsumerConfigWrapper {
    fn new_config_with_wasm_filter(file: &str) -> Result<ConsumerConfigWrapper, std::io::Error> {
        let raw_buffer = match std::fs::read(file) {
            Ok(b) => b,
            Err(error) => return Err(error)
        };
        let mut encoder = GzEncoder::new(raw_buffer.as_slice(), Compression::default());
        let mut buffer = Vec::with_capacity(raw_buffer.len());
        match encoder.read_to_end(&mut buffer) {
            Ok(encoder) => encoder,
            Err(error) => return Err(error)
        };
        Ok(ConsumerConfigWrapper {
            wasm_module: buffer,
        })
    }
}

The only bit i was unsure on, was in the stream_with_config method, i was unsure of how to handle the error from the result so i settled on this:

pub fn stream_with_config(
        consumer: &PartitionConsumer,
        offset: &Offset,
        wasm_module_path: &str
    ) -> Result<PartitionConsumerStream, FluvioError> {
        let config_wrapper = match ConsumerConfigWrapper::new_config_with_wasm_filter(wasm_module_path) {
            Ok(config) => config,
            Err(error) => return Err(FluvioError::Other(error.to_string()))
        };
        let mut builder = ConsumerConfig::builder();
        builder.smartmodule(Some(SmartModuleInvocation {
            wasm: SmartModuleInvocationWasm::AdHoc(config_wrapper.wasm_module),
            kind: SmartModuleKind::Filter,
            params: Default::default()
        }));
        let config = builder.build().map_err(|err| FluvioError::Other(err.to_string()))?;
        run_block_on(consumer.stream_with_config(offset.clone(), config)).map(|stream| PartitionConsumerStream { inner: Box::pin(stream) })
    }

let raw_buffer = std::fs::read(file);
let mut encoder = GzEncoder::new(raw_buffer.as_ref().unwrap().as_slice(), Compression::default());
let mut buffer = Vec::with_capacity(raw_buffer.as_ref().unwrap().len());
encoder.read_to_end(&mut buffer);
ConsumerConfigWrapper {
wasm_module: buffer,
}
}
}

mod _PartitionConsumer {
use super::*;
pub fn stream(
Expand All @@ -62,13 +87,36 @@ mod _PartitionConsumer {
inner: Box::pin(run_block_on(consumer.stream(offset.clone()))?)
})
}
pub fn stream_with_config(
consumer: &PartitionConsumer,
offset: &Offset,
wasm_module_path: &str
) -> Result<PartitionConsumerStream, FluvioError> {
let config_wrapper = ConsumerConfigWrapper::new_config_with_wasm_filter(wasm_module_path);
let mut builder = ConsumerConfig::builder();
builder.smartmodule(Some(SmartModuleInvocation {
wasm: SmartModuleInvocationWasm::AdHoc(config_wrapper.wasm_module),
kind: SmartModuleKind::Filter,
params: Default::default()
}));
let config = builder.build().expect("Failed to create config");
bencleary marked this conversation as resolved.
Show resolved Hide resolved
match run_block_on(consumer.stream_with_config(offset.clone(), config)) {
bencleary marked this conversation as resolved.
Show resolved Hide resolved
Ok(stream) => Ok(PartitionConsumerStream {
inner: Box::pin(stream),
}),
Err(e) => Err(e),
}
}
}
foreign_class!(class PartitionConsumer {
self_type PartitionConsumer;
private constructor = empty;
fn _PartitionConsumer::stream(
&self, _: &Offset
) -> Result<PartitionConsumerStream, FluvioError>;
fn _PartitionConsumer::stream_with_config(
&self, _: &Offset, _: &str
) -> Result<PartitionConsumerStream, FluvioError>;
});

type PartitionConsumerIteratorInner =
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#![allow(non_snake_case, unused)]

include!(concat!(env!("OUT_DIR"), "/glue.rs"));
include!(concat!(env!("OUT_DIR"), "/glue.rs"));
bencleary marked this conversation as resolved.
Show resolved Hide resolved