WIP - Feature/smart module filter #159

Merged · 8 commits · Sep 27, 2022
1 change: 1 addition & 0 deletions Cargo.lock


3 changes: 2 additions & 1 deletion Cargo.toml
@@ -15,5 +15,6 @@ log = "^0.4.17"

[dependencies]
cpython = { version = "0.7", features = ["extension-module"] }
flate2 = "1.0.24"
fluvio = { version = "0.13" }
fluvio-future = { version = "0.4.2", features = ["task", "io"] }
fluvio-future = { version = "0.4.2", features = ["task", "io"] }
31 changes: 31 additions & 0 deletions fluvio/__init__.py
@@ -140,6 +140,37 @@ def stream(self, offset: Offset) -> PartitionConsumerStream:
'''
return PartitionConsumerStream(self._inner.stream(offset._inner))

def stream_with_config(self, offset: Offset, wasm_path: str) -> PartitionConsumerStream:
'''
Continuously streams events from a particular offset in the
consumer's partition, applying the given SmartModule WASM filter.
This returns a `PartitionConsumerStream` which is an iterator.

Streaming is one of the two ways to consume events in Fluvio. It is a
continuous request for new records arriving in a partition, beginning
at a particular offset. You specify the starting point of the stream
using an Offset and periodically receive events, either individually or
in batches.

Args:
offset: Offset
wasm_path: str - The absolute path to the WASM file

Example:
import os

wmp = os.path.abspath("somefilter.wasm")
for i in consumer.stream_with_config(Offset.beginning(), wmp):
# do something with i

Returns:
PartitionConsumerStream

'''
return PartitionConsumerStream(
self._inner.stream_with_config(offset._inner, wasm_path)
)


class TopicProducer:
'''An interface for producing events to a particular topic.
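For reference, a minimal end-to-end usage sketch of the new stream_with_config method; the topic name "my-topic" is an illustrative assumption, and "somefilter.wasm" stands in for any compiled SmartModule filter:

import os
from fluvio import Fluvio, Offset

fluvio = Fluvio.connect()
consumer = fluvio.partition_consumer("my-topic", 0)

# Absolute path to a compiled SmartModule filter (illustrative file name).
wasm_path = os.path.abspath("somefilter.wasm")

# Only records accepted by the WASM filter are yielded by the stream.
for record in consumer.stream_with_config(Offset.beginning(), wasm_path):
    print(record.value_string())
    break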
Binary file added integration-tests/smartmodule_filter_on_a.wasm
Binary file not shown.
84 changes: 54 additions & 30 deletions integration-tests/test_fluvio_python.py
@@ -1,15 +1,19 @@
from fluvio import (Fluvio, FluviorError, Offset)
from string import ascii_lowercase
from fluvio import Fluvio, FluviorError, Offset
import unittest
import uuid
import os


def create_topic(topic):
import subprocess

subprocess.run("fluvio topic create %s" % topic, shell=True)


def delete_topic(topic):
import subprocess

subprocess.run("fluvio topic delete %s" % topic, shell=True)


@@ -32,7 +36,40 @@ def test_produce(self):
for i in range(10):
producer.send_string("FOOBAR %s " % i)

def test_consume_with_iterator(self):
def test_consume_with_smart_module_iterator(self):
"""
Test adds the alphabet to a topic in the format record-[letter].

A WASM SmartModule filter is applied and the messages that pass the
filter are retrieved and stored in the records list.
We can then assert the following:

- There should be 1 item
- It should be record-a

"""

wasm_module_path = os.path.abspath(
"integration-tests/smartmodule_filter_on_a.wasm"
)

fluvio = Fluvio.connect()
producer = fluvio.topic_producer(self.topic)
for i in list(ascii_lowercase):
producer.send_string(f"record-{i}")
records = []

consumer = fluvio.partition_consumer(self.topic, 0)
records.append(
bytearray(
next(
consumer.stream_with_config(Offset.beginning(), wasm_module_path)
).value()
).decode()
)
self.assertEqual(len(records), 1)
self.assertEqual(records[0], "record-a")

def test_consume_with_iterator(self):
fluvio = Fluvio.connect()
producer = fluvio.topic_producer(self.topic)
for i in range(10):
@@ -42,10 +79,8 @@ def test_consume_with_iterator(self):
count = 0
for i in consumer.stream(Offset.beginning()):
print("THIS IS IN AN ITERATOR! %s" % i.value())
self.assertEqual(
bytearray(i.value()).decode(), 'record-%s' % count
)
self.assertEqual(i.value_string(), 'record-%s' % count)
self.assertEqual(bytearray(i.value()).decode(), "record-%s" % count)
self.assertEqual(i.value_string(), "record-%s" % count)
count += 1
if count >= 10:
break
@@ -59,18 +94,11 @@ def test_key_value(self):
consumer = fluvio.partition_consumer(self.topic, 0)
count = 0
for i in consumer.stream(Offset.beginning()):
print(
"THIS IS IN AN ITERATOR! key - %s, value - %s" % (
i.key(),
i.value()
)
)
self.assertEqual(
bytearray(i.value()).decode(), 'record-%s' % count
)
self.assertEqual(i.value_string(), 'record-%s' % count)
self.assertEqual(i.key_string(), 'foo')
self.assertEqual(i.key(), list('foo'.encode()))
print("THIS IS IN AN ITERATOR! key - %s, value - %s" % (i.key(), i.value()))
self.assertEqual(bytearray(i.value()).decode(), "record-%s" % count)
self.assertEqual(i.value_string(), "record-%s" % count)
self.assertEqual(i.key_string(), "foo")
self.assertEqual(i.key(), list("foo".encode()))

count += 1
if count >= 10:
@@ -90,12 +118,10 @@ def test_batch_produce(self):
consumer = fluvio.partition_consumer(self.topic, 0)
count = 0
for i in consumer.stream(Offset.beginning()):
self.assertEqual(
bytearray(i.value()).decode(), 'record-%s' % count
)
self.assertEqual(i.value_string(), 'record-%s' % count)
self.assertEqual(i.key_string(), ('%s' % count))
self.assertEqual(i.key(), list(('%s' % count).encode()))
self.assertEqual(bytearray(i.value()).decode(), "record-%s" % count)
self.assertEqual(i.value_string(), "record-%s" % count)
self.assertEqual(i.key_string(), ("%s" % count))
self.assertEqual(i.key(), list(("%s" % count).encode()))

count += 1
if count >= 10:
@@ -114,14 +140,11 @@ def test_produce_on_uncreated_topic(self):
fluvio.topic_producer(self.topic)
except FluviorError as e:
error = e
print('ERROR: %s' % e)
print("ERROR: %s" % e)

self.assertTrue(error is not None)
self.assertEqual(
error.args,
(
'Topic not found: %s' % self.topic, # noqa: E501
)
error.args, ("Topic not found: %s" % self.topic,) # noqa: E501
)


@@ -145,6 +168,7 @@ def test_produce_flush(self):
producer.flush()

import subprocess

result = subprocess.run(
"fluvio consume %s -B 0 -d" % self.topic,
shell=True,
@@ -154,7 +178,7 @@
)

# The CLI appends an extra newline to the output.
expected_output.append('')
expected_output.append("")
expected_output = "\n".join(expected_output)
stdout = result.stdout

3 changes: 3 additions & 0 deletions setup.cfg
@@ -0,0 +1,3 @@
[flake8]
exclude = .git,.tox
max-line-length = 119
52 changes: 52 additions & 0 deletions src/glue.rs.in
@@ -7,13 +7,22 @@ use fluvio::{
Offset,
consumer::Record,
};
use std::io::{Read, Error};
use flate2::bufread::GzEncoder;
use flate2::Compression;
use fluvio_future::{
task::run_block_on,
io::{
Stream,
StreamExt,
},
};
use fluvio::consumer::{
SmartModuleInvocation,
SmartModuleInvocationWasm,
SmartModuleKind,
ConsumerConfig
};
use std::pin::Pin;

mod _Fluvio {
@@ -52,6 +61,28 @@ foreign_class!(class Fluvio {
});


pub struct ConsumerConfigWrapper {
wasm_module: Vec<u8>,
}

impl ConsumerConfigWrapper {
fn new_config_with_wasm_filter(file: &str) -> Result<ConsumerConfigWrapper, std::io::Error> {
let raw_buffer = match std::fs::read(file) {
Review comment: not a show stopper; this could be reduced to raw_buffer = read(file)?

Ok(b) => b,
Err(error) => return Err(error)
};
let mut encoder = GzEncoder::new(raw_buffer.as_slice(), Compression::default());
let mut buffer = Vec::with_capacity(raw_buffer.len());
match encoder.read_to_end(&mut buffer) {
Review comment: same here.

Ok(encoder) => encoder,
Err(error) => return Err(error)
};
Ok(ConsumerConfigWrapper {
wasm_module: buffer,
})
}
}
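
As a side note, a minimal sketch of the simplification the reviewer suggests, assuming the same function signature and the imports already present in this file; the ? operator propagates the std::io::Error in place of both explicit match blocks:

impl ConsumerConfigWrapper {
    fn new_config_with_wasm_filter(file: &str) -> Result<ConsumerConfigWrapper, std::io::Error> {
        // `?` returns early with the io::Error instead of matching on it.
        let raw_buffer = std::fs::read(file)?;
        let mut encoder = GzEncoder::new(raw_buffer.as_slice(), Compression::default());
        let mut buffer = Vec::with_capacity(raw_buffer.len());
        // Gzip-compress the WASM bytes, again propagating any io::Error with `?`.
        encoder.read_to_end(&mut buffer)?;
        Ok(ConsumerConfigWrapper { wasm_module: buffer })
    }
}

Behavior is unchanged; the early returns on error are simply expressed through ? instead of match.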

mod _PartitionConsumer {
use super::*;
pub fn stream(
@@ -62,13 +93,34 @@ mod _PartitionConsumer {
inner: Box::pin(run_block_on(consumer.stream(offset.clone()))?)
})
}
pub fn stream_with_config(
consumer: &PartitionConsumer,
offset: &Offset,
wasm_module_path: &str
) -> Result<PartitionConsumerStream, FluvioError> {
let config_wrapper = match ConsumerConfigWrapper::new_config_with_wasm_filter(wasm_module_path) {
Ok(config) => config,
Err(error) => return Err(FluvioError::Other(error.to_string()))
};
let mut builder = ConsumerConfig::builder();
builder.smartmodule(Some(SmartModuleInvocation {
wasm: SmartModuleInvocationWasm::AdHoc(config_wrapper.wasm_module),
kind: SmartModuleKind::Filter,
params: Default::default()
}));
let config = builder.build().map_err(|err| FluvioError::Other(err.to_string()))?;
run_block_on(consumer.stream_with_config(offset.clone(), config)).map(|stream| PartitionConsumerStream { inner: Box::pin(stream) })
}
}
foreign_class!(class PartitionConsumer {
self_type PartitionConsumer;
private constructor = empty;
fn _PartitionConsumer::stream(
&self, _: &Offset
) -> Result<PartitionConsumerStream, FluvioError>;
fn _PartitionConsumer::stream_with_config(
&self, _: &Offset, _: &str
) -> Result<PartitionConsumerStream, FluvioError>;
});

type PartitionConsumerIteratorInner =