Skip to content

Commit

Permalink
Fix Example and Add Support for Connection String (#24)
Browse files Browse the repository at this point in the history
closes #22
  • Loading branch information
lsabi authored Oct 29, 2024
1 parent 51fd128 commit 19df16f
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 33 deletions.
6 changes: 3 additions & 3 deletions python_examples/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@ async def main():

async def consume_messages(client: IggyClient):
interval = 0.5 # 500 milliseconds in seconds for asyncio.sleep
logger.info(f"Messages will be consumed from stream: {STREAM_ID}, topic: {TOPIC_ID}, partition: {PARTITION_ID} with "
logger.info(f"Messages will be consumed from stream: {STREAM_NAME}, topic: {TOPIC_NAME}, partition: {PARTITION_ID} with "
f"interval {interval * 1000} ms.")
offset = 0
messages_per_batch = 10
while True:
try:
logger.debug("Polling for messages...")
polled_messages = client.poll_messages(
stream_id=STREAM_NAME,
topic_id=TOPIC_NAME,
stream=STREAM_NAME,
topic=TOPIC_NAME,
partition_id=PARTITION_ID,
count=messages_per_batch,
auto_commit=False
Expand Down
16 changes: 8 additions & 8 deletions python_examples/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,17 @@ async def main():

def init_system(client: IggyClient):
try:
logger.info(f"Creating stream with ID {STREAM_ID}...")
client.create_stream(stream_id=STREAM_ID, name="sample-stream")
logger.info(f"Creating stream with NAME {STREAM_NAME}...")
client.create_stream(name="sample-stream")
logger.info("Stream was created successfuly.")
except Exception as error:
logger.error(f"Error creating stream: {error}")
logger.exception(error)

try:
logger.info(f"Creating topic with ID {TOPIC_ID} in stream {STREAM_ID}")
logger.info(f"Creating topic {TOPIC_NAME} in stream {STREAM_NAME}")
client.create_topic(
stream_id=STREAM_NAME, # Assuming a method exists to create a numeric Identifier.
stream=STREAM_NAME, # Assuming a method exists to create a numeric Identifier.
partitions_count=1,
name="sample-topic",
replication_factor=1
Expand All @@ -45,7 +45,7 @@ def init_system(client: IggyClient):

async def produce_messages(client: IggyClient):
interval = 0.5 # 500 milliseconds in seconds for asyncio.sleep
logger.info(f"Messages will be sent to stream: {STREAM_ID}, topic: {TOPIC_ID}, partition: {PARTITION_ID} with interval {interval * 1000} ms.")
logger.info(f"Messages will be sent to stream: {STREAM_NAME}, topic: {TOPIC_NAME}, partition: {PARTITION_ID} with interval {interval * 1000} ms.")
current_id = 0
messages_per_batch = 10
while True:
Expand All @@ -58,8 +58,8 @@ async def produce_messages(client: IggyClient):
logger.info(f"Attempting to send batch of {messages_per_batch} messages. Batch ID: {current_id // messages_per_batch}")
try:
client.send_messages(
stream_id=STREAM_NAME,
topic_id=TOPIC_NAME,
stream=STREAM_NAME,
topic=TOPIC_NAME,
partitioning=PARTITION_ID,
messages=messages,
)
Expand Down
55 changes: 33 additions & 22 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::str::FromStr;
use iggy::client::TopicClient;
use iggy::client::{Client, MessageClient, StreamClient, UserClient};
use iggy::clients::client::IggyClient as RustIggyClient;
use iggy::clients::builder::IggyClientBuilder;
use iggy::compression::compression_algorithm::CompressionAlgorithm;
use iggy::consumer::Consumer as RustConsumer;
use iggy::identifier::Identifier;
Expand Down Expand Up @@ -50,15 +51,21 @@ impl IggyClient {
/// This initializes a new runtime for asynchronous operations.
/// Future versions might utilize asyncio for more Pythonic async.
#[new]
fn new() -> Self {
#[pyo3(signature = (conn=None))]
fn new(conn: Option<String>) -> Self {
// TODO: use asyncio
let runtime = Builder::new_multi_thread()
.worker_threads(4) // number of worker threads
.enable_all() // enables all available Tokio features
.build()
.unwrap();
let client = IggyClientBuilder::new()
.with_tcp()
.with_server_address(conn.unwrap_or("127.0.0.1:8090".to_string()))
.build()
.unwrap();
IggyClient {
inner: RustIggyClient::default(),
inner: client,
runtime,
}
}
Expand Down Expand Up @@ -91,7 +98,7 @@ impl IggyClient {
/// Returns Ok(()) on successful stream creation or a PyRuntimeError on failure.
#[pyo3(signature = (name, stream_id = None))]
fn create_stream(&self, name: String, stream_id: Option<u32>) -> PyResult<()> {
let create_stream_future = self.inner.create_stream(&name, stream_id);
let create_stream_future = self.inner.create_stream(&name, stream_id);
let _create_stream = self
.runtime
.block_on(async move { create_stream_future.await })
Expand All @@ -103,23 +110,27 @@ impl IggyClient {
///
/// Returns Ok(()) on successful topic creation or a PyRuntimeError on failure.
#[pyo3(
signature = (stream_id, name, partitions_count, compression_algorithm, topic_id = None, replication_factor = None)
signature = (stream, name, partitions_count, compression_algorithm = None, topic_id = None, replication_factor = None)
)]
fn create_topic(
&self,
stream_id: PyIdentifier,
stream: PyIdentifier,
name: String,
partitions_count: u32,
compression_algorithm: String,
compression_algorithm: Option<String>,
topic_id: Option<u32>,
replication_factor: Option<u8>,
) -> PyResult<()> {
let stream_id = Identifier::from(stream_id);
let compression_algorithm = CompressionAlgorithm::from_str(&compression_algorithm)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{:?}", e)))?;

let compression_algorithm = match compression_algorithm {
Some(algo) => CompressionAlgorithm::from_str(&algo)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{:?}", e)))?,
None => CompressionAlgorithm::default()
};

let stream = Identifier::from(stream);
let create_topic_future = self.inner.create_topic(
&stream_id,
&stream,
&name,
partitions_count,
compression_algorithm,
Expand All @@ -140,8 +151,8 @@ impl IggyClient {
/// Returns Ok(()) on successful sending or a PyRuntimeError on failure.
fn send_messages(
&self,
stream_id: PyIdentifier,
topic_id: PyIdentifier,
stream: PyIdentifier,
topic: PyIdentifier,
partitioning: u32,
messages: &Bound<'_, PyList>,
) -> PyResult<()> {
Expand All @@ -154,13 +165,13 @@ impl IggyClient {
.map(|message| message.inner)
.collect::<Vec<_>>();

let stream_id = Identifier::from(stream_id);
let topic_id = Identifier::from(topic_id);
let partitioning = Partitioning::partition_id(partitioning);
let stream = Identifier::from(stream);
let topic = Identifier::from(topic);
let partitioning = Partitioning::partition_id(partitioning);

let send_message_future =
self.inner
.send_messages(&stream_id, &topic_id, &partitioning, messages.as_mut());
.send_messages(&stream, &topic, &partitioning, messages.as_mut());
self.runtime
.block_on(async move { send_message_future.await })
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{:?}", e)))?;
Expand All @@ -172,20 +183,20 @@ impl IggyClient {
/// Returns a list of received messages or a PyRuntimeError on failure.
fn poll_messages(
&self,
stream_id: PyIdentifier,
topic_id: PyIdentifier,
stream: PyIdentifier,
topic: PyIdentifier,
partition_id: u32,
count: u32,
auto_commit: bool,
) -> PyResult<Vec<ReceiveMessage>> {
let consumer = RustConsumer::default();
let stream_id = Identifier::from(stream_id);
let topic_id = Identifier::from(topic_id);
let stream = Identifier::from(stream);
let topic = Identifier::from(topic);
let strategy = PollingStrategy::next();

let poll_messages = self.inner.poll_messages(
&stream_id,
&topic_id,
&stream,
&topic,
Some(partition_id),
&consumer,
&strategy,
Expand Down

0 comments on commit 19df16f

Please sign in to comment.