Skip to content

Commit

Permalink
WIP - Feature/smart module filter (#159)
Browse files Browse the repository at this point in the history
* initial spike of feature

* updated docs and worked through sample more

* tidied up python interface and added test

* PR comments fix

* rust changes implemented from PR feedback

* better error handling for consumerconfigwrapper

* fixed linting errors

Co-authored-by: simlay <[email protected]>
  • Loading branch information
bencleary and simlay authored Sep 27, 2022
1 parent 91fe8a1 commit 44274db
Show file tree
Hide file tree
Showing 7 changed files with 143 additions and 31 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ log = "^0.4.17"

[dependencies]
cpython = { version = "0.7", features = ["extension-module"] }
flate2 = "1.0.24"
fluvio = { version = "0.13" }
fluvio-future = { version = "0.4.2", features = ["task", "io"] }
fluvio-future = { version = "0.4.2", features = ["task", "io"] }
31 changes: 31 additions & 0 deletions fluvio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,37 @@ def stream(self, offset: Offset) -> PartitionConsumerStream:
'''
return PartitionConsumerStream(self._inner.stream(offset._inner))

def stream_with_config(self, offset: Offset, wasm_path: str) -> PartitionConsumerStream:
    '''
    Continuously streams events from a particular offset in the consumer's
    partition, passing them through a SmartModule WASM module. This returns
    a `PartitionConsumerStream` which is an iterator.

    Streaming is one of the two ways to consume events in Fluvio. It is a
    continuous request for new records arriving in a partition, beginning
    at a particular offset. You specify the starting point of the stream
    using an Offset and periodically receive events, either individually or
    in batches.

    Args:
        offset: Offset - The position in the partition to start from
        wasm_path: str - The absolute path to the WASM file

    Example:
        import os

        wmp = os.path.abspath("somefilter.wasm")
        for i in consumer.stream_with_config(Offset.beginning(), wmp):
            # do something with i

    Returns:
        PartitionConsumerStream
    '''
    inner_stream = self._inner.stream_with_config(offset._inner, wasm_path)
    return PartitionConsumerStream(inner_stream)


class TopicProducer:
'''An interface for producing events to a particular topic.
Expand Down
Binary file added integration-tests/smartmodule_filter_on_a.wasm
Binary file not shown.
84 changes: 54 additions & 30 deletions integration-tests/test_fluvio_python.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
from fluvio import (Fluvio, FluviorError, Offset)
from string import ascii_lowercase
from fluvio import Fluvio, FluviorError, Offset
import unittest
import uuid
import os


def create_topic(topic):
    """Create the Fluvio topic named `topic` by invoking the `fluvio` CLI through the shell."""
    import subprocess

    command = "fluvio topic create %s" % topic
    subprocess.run(command, shell=True)


def delete_topic(topic):
    """Delete the Fluvio topic named `topic` by invoking the `fluvio` CLI through the shell."""
    import subprocess

    command = "fluvio topic delete %s" % topic
    subprocess.run(command, shell=True)


Expand All @@ -32,7 +36,40 @@ def test_produce(self):
for i in range(10):
producer.send_string("FOOBAR %s " % i)

def test_consume_with_iterator(self):
def test_consume_with_smart_module_iterator(self):
    """
    Produce one record per lowercase letter, formatted as record-[letter],
    then open a stream from the beginning of the topic with the
    `smartmodule_filter_on_a.wasm` SmartModule attached.

    Only the first record yielded by that stream is read and stored in the
    records list. We then assert the following:
        - There should be 1 item
        - It should be record-a
    """
    wasm_module_path = os.path.abspath("integration-tests/smartmodule_filter_on_a.wasm")

    fluvio = Fluvio.connect()
    producer = fluvio.topic_producer(self.topic)
    for letter in ascii_lowercase:
        producer.send_string(f"record-{letter}")

    records = []
    consumer = fluvio.partition_consumer(self.topic, 0)
    filtered_stream = consumer.stream_with_config(Offset.beginning(), wasm_module_path)
    # Pull a single record: the stream is continuous, so it is not drained.
    first_record = next(filtered_stream)
    records.append(bytearray(first_record.value()).decode())

    self.assertEqual(len(records), 1)
    self.assertEqual(records[0], "record-a")

def test_consumer_with_interator(self):
fluvio = Fluvio.connect()
producer = fluvio.topic_producer(self.topic)
for i in range(10):
Expand All @@ -42,10 +79,8 @@ def test_consume_with_iterator(self):
count = 0
for i in consumer.stream(Offset.beginning()):
print("THIS IS IN AN ITERATOR! %s" % i.value())
self.assertEqual(
bytearray(i.value()).decode(), 'record-%s' % count
)
self.assertEqual(i.value_string(), 'record-%s' % count)
self.assertEqual(bytearray(i.value()).decode(), "record-%s" % count)
self.assertEqual(i.value_string(), "record-%s" % count)
count += 1
if count >= 10:
break
Expand All @@ -59,18 +94,11 @@ def test_key_value(self):
consumer = fluvio.partition_consumer(self.topic, 0)
count = 0
for i in consumer.stream(Offset.beginning()):
print(
"THIS IS IN AN ITERATOR! key - %s, value - %s" % (
i.key(),
i.value()
)
)
self.assertEqual(
bytearray(i.value()).decode(), 'record-%s' % count
)
self.assertEqual(i.value_string(), 'record-%s' % count)
self.assertEqual(i.key_string(), 'foo')
self.assertEqual(i.key(), list('foo'.encode()))
print("THIS IS IN AN ITERATOR! key - %s, value - %s" % (i.key(), i.value()))
self.assertEqual(bytearray(i.value()).decode(), "record-%s" % count)
self.assertEqual(i.value_string(), "record-%s" % count)
self.assertEqual(i.key_string(), "foo")
self.assertEqual(i.key(), list("foo".encode()))

count += 1
if count >= 10:
Expand All @@ -90,12 +118,10 @@ def test_batch_produce(self):
consumer = fluvio.partition_consumer(self.topic, 0)
count = 0
for i in consumer.stream(Offset.beginning()):
self.assertEqual(
bytearray(i.value()).decode(), 'record-%s' % count
)
self.assertEqual(i.value_string(), 'record-%s' % count)
self.assertEqual(i.key_string(), ('%s' % count))
self.assertEqual(i.key(), list(('%s' % count).encode()))
self.assertEqual(bytearray(i.value()).decode(), "record-%s" % count)
self.assertEqual(i.value_string(), "record-%s" % count)
self.assertEqual(i.key_string(), ("%s" % count))
self.assertEqual(i.key(), list(("%s" % count).encode()))

count += 1
if count >= 10:
Expand All @@ -114,14 +140,11 @@ def test_produce_on_uncreated_topic(self):
fluvio.topic_producer(self.topic)
except FluviorError as e:
error = e
print('ERROR: %s' % e)
print("ERROR: %s" % e)

self.assertTrue(error is not None)
self.assertEqual(
error.args,
(
'Topic not found: %s' % self.topic, # noqa: E501
)
error.args, ("Topic not found: %s" % self.topic,) # noqa: E501
)


Expand All @@ -145,6 +168,7 @@ def test_produce_flush(self):
producer.flush()

import subprocess

result = subprocess.run(
"fluvio consume %s -B 0 -d" % self.topic,
shell=True,
Expand All @@ -154,7 +178,7 @@ def test_produce_flush(self):
)

# The CLI appends an extra newline to the output.
expected_output.append('')
expected_output.append("")
expected_output = "\n".join(expected_output)
stdout = result.stdout

Expand Down
3 changes: 3 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[flake8]
exclude = .git,.tox
max-line-length = 119
52 changes: 52 additions & 0 deletions src/glue.rs.in
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,22 @@ use fluvio::{
Offset,
consumer::Record,
};
use std::io::{Read, Error};
use flate2::bufread::GzEncoder;
use flate2::Compression;
use fluvio_future::{
task::run_block_on,
io::{
Stream,
StreamExt,
},
};
use fluvio::consumer::{
SmartModuleInvocation,
SmartModuleInvocationWasm,
SmartModuleKind,
ConsumerConfig
};
use std::pin::Pin;

mod _Fluvio {
Expand Down Expand Up @@ -52,6 +61,28 @@ foreign_class!(class Fluvio {
});


/// Holds the gzip-compressed bytes of a SmartModule WASM file so they can be
/// attached to a consumer configuration.
pub struct ConsumerConfigWrapper {
    wasm_module: Vec<u8>,
}

impl ConsumerConfigWrapper {
    /// Reads the WASM file at `file` and gzip-compresses its contents with the
    /// default compression level.
    ///
    /// # Errors
    ///
    /// Returns the underlying `std::io::Error` if the file cannot be read or
    /// if reading from the encoder fails.
    fn new_config_with_wasm_filter(file: &str) -> Result<ConsumerConfigWrapper, std::io::Error> {
        let raw_buffer = std::fs::read(file)?;
        let mut encoder = GzEncoder::new(raw_buffer.as_slice(), Compression::default());
        // The compressed size is not known up front; the raw size is only a
        // capacity hint and the vector grows or stays short as needed.
        let mut buffer = Vec::with_capacity(raw_buffer.len());
        // `read_to_end` returns the number of bytes read, which is not needed
        // here because `buffer` already carries its own length.
        encoder.read_to_end(&mut buffer)?;
        Ok(ConsumerConfigWrapper {
            wasm_module: buffer,
        })
    }
}

mod _PartitionConsumer {
use super::*;
pub fn stream(
Expand All @@ -62,13 +93,34 @@ mod _PartitionConsumer {
inner: Box::pin(run_block_on(consumer.stream(offset.clone()))?)
})
}
pub fn stream_with_config(
consumer: &PartitionConsumer,
offset: &Offset,
wasm_module_path: &str
) -> Result<PartitionConsumerStream, FluvioError> {
let config_wrapper = match ConsumerConfigWrapper::new_config_with_wasm_filter(wasm_module_path) {
Ok(config) => config,
Err(error) => return Err(FluvioError::Other(error.to_string()))
};
let mut builder = ConsumerConfig::builder();
builder.smartmodule(Some(SmartModuleInvocation {
wasm: SmartModuleInvocationWasm::AdHoc(config_wrapper.wasm_module),
kind: SmartModuleKind::Filter,
params: Default::default()
}));
let config = builder.build().map_err(|err| FluvioError::Other(err.to_string()))?;
run_block_on(consumer.stream_with_config(offset.clone(), config)).map(|stream| PartitionConsumerStream { inner: Box::pin(stream) })
}
}
// Binding declaration for `PartitionConsumer`: its `stream` and
// `stream_with_config` methods are mapped to the free functions of the same
// names in the `_PartitionConsumer` module, each taking `&self` first.
// NOTE(review): `private constructor = empty` presumably means instances are
// not constructible from the foreign side — confirm against the binding
// generator's documentation.
foreign_class!(class PartitionConsumer {
self_type PartitionConsumer;
private constructor = empty;
fn _PartitionConsumer::stream(
&self, _: &Offset
) -> Result<PartitionConsumerStream, FluvioError>;
fn _PartitionConsumer::stream_with_config(
&self, _: &Offset, _: &str
) -> Result<PartitionConsumerStream, FluvioError>;
});

type PartitionConsumerIteratorInner =
Expand Down

0 comments on commit 44274db

Please sign in to comment.