From 19df16fbbcf0d3c7c2984f3fede964ca3c0b94b9 Mon Sep 17 00:00:00 2001 From: lsabi <13497689+lsabi@users.noreply.github.com> Date: Tue, 29 Oct 2024 22:17:31 +0100 Subject: [PATCH] Fix Example and Add Support for Connection String (#24) closes #22 --- python_examples/consumer.py | 6 ++-- python_examples/producer.py | 16 +++++------ src/client.rs | 55 ++++++++++++++++++++++--------------- 3 files changed, 44 insertions(+), 33 deletions(-) diff --git a/python_examples/consumer.py b/python_examples/consumer.py index a4f13bf..2e73978 100644 --- a/python_examples/consumer.py +++ b/python_examples/consumer.py @@ -25,7 +25,7 @@ async def main(): async def consume_messages(client: IggyClient): interval = 0.5 # 500 milliseconds in seconds for asyncio.sleep - logger.info(f"Messages will be consumed from stream: {STREAM_ID}, topic: {TOPIC_ID}, partition: {PARTITION_ID} with " + logger.info(f"Messages will be consumed from stream: {STREAM_NAME}, topic: {TOPIC_NAME}, partition: {PARTITION_ID} with " f"interval {interval * 1000} ms.") offset = 0 messages_per_batch = 10 @@ -33,8 +33,8 @@ async def consume_messages(client: IggyClient): try: logger.debug("Polling for messages...") polled_messages = client.poll_messages( - stream_id=STREAM_NAME, - topic_id=TOPIC_NAME, + stream=STREAM_NAME, + topic=TOPIC_NAME, partition_id=PARTITION_ID, count=messages_per_batch, auto_commit=False diff --git a/python_examples/producer.py b/python_examples/producer.py index 45b8cf0..22fc395 100644 --- a/python_examples/producer.py +++ b/python_examples/producer.py @@ -22,17 +22,17 @@ async def main(): def init_system(client: IggyClient): try: - logger.info(f"Creating stream with ID {STREAM_ID}...") - client.create_stream(stream_id=STREAM_ID, name="sample-stream") + logger.info(f"Creating stream with NAME {STREAM_NAME}...") + client.create_stream(name="sample-stream") logger.info("Stream was created successfuly.") except Exception as error: logger.error(f"Error creating stream: {error}") logger.exception(error) - + try: - logger.info(f"Creating topic with ID {TOPIC_ID} in stream {STREAM_ID}") + logger.info(f"Creating topic {TOPIC_NAME} in stream {STREAM_NAME}") client.create_topic( - stream_id=STREAM_NAME, # Assuming a method exists to create a numeric Identifier. + stream=STREAM_NAME, # Assuming a method exists to create a numeric Identifier. partitions_count=1, name="sample-topic", replication_factor=1 @@ -45,7 +45,7 @@ def init_system(client: IggyClient): async def produce_messages(client: IggyClient): interval = 0.5 # 500 milliseconds in seconds for asyncio.sleep - logger.info(f"Messages will be sent to stream: {STREAM_ID}, topic: {TOPIC_ID}, partition: {PARTITION_ID} with interval {interval * 1000} ms.") + logger.info(f"Messages will be sent to stream: {STREAM_NAME}, topic: {TOPIC_NAME}, partition: {PARTITION_ID} with interval {interval * 1000} ms.") current_id = 0 messages_per_batch = 10 while True: @@ -58,8 +58,8 @@ async def produce_messages(client: IggyClient): logger.info(f"Attempting to send batch of {messages_per_batch} messages. Batch ID: {current_id // messages_per_batch}") try: client.send_messages( - stream_id=STREAM_NAME, - topic_id=TOPIC_NAME, + stream=STREAM_NAME, + topic=TOPIC_NAME, partitioning=PARTITION_ID, messages=messages, ) diff --git a/src/client.rs b/src/client.rs index aa33dd7..e58062d 100644 --- a/src/client.rs +++ b/src/client.rs @@ -3,6 +3,7 @@ use std::str::FromStr; use iggy::client::TopicClient; use iggy::client::{Client, MessageClient, StreamClient, UserClient}; use iggy::clients::client::IggyClient as RustIggyClient; +use iggy::clients::builder::IggyClientBuilder; use iggy::compression::compression_algorithm::CompressionAlgorithm; use iggy::consumer::Consumer as RustConsumer; use iggy::identifier::Identifier; @@ -50,15 +51,21 @@ impl IggyClient { /// This initializes a new runtime for asynchronous operations. /// Future versions might utilize asyncio for more Pythonic async. #[new] - fn new() -> Self { + #[pyo3(signature = (conn=None))] + fn new(conn: Option) -> Self { // TODO: use asyncio let runtime = Builder::new_multi_thread() .worker_threads(4) // number of worker threads .enable_all() // enables all available Tokio features .build() .unwrap(); + let client = IggyClientBuilder::new() + .with_tcp() + .with_server_address(conn.unwrap_or("127.0.0.1:8090".to_string())) + .build() + .unwrap(); IggyClient { - inner: RustIggyClient::default(), + inner: client, runtime, } } @@ -91,7 +98,7 @@ impl IggyClient { /// Returns Ok(()) on successful stream creation or a PyRuntimeError on failure. #[pyo3(signature = (name, stream_id = None))] fn create_stream(&self, name: String, stream_id: Option) -> PyResult<()> { - let create_stream_future = self.inner.create_stream(&name, stream_id); + let create_stream_future = self.inner.create_stream(&name, stream_id); let _create_stream = self .runtime .block_on(async move { create_stream_future.await }) @@ -103,23 +110,27 @@ impl IggyClient { /// /// Returns Ok(()) on successful topic creation or a PyRuntimeError on failure. #[pyo3( - signature = (stream_id, name, partitions_count, compression_algorithm, topic_id = None, replication_factor = None) + signature = (stream, name, partitions_count, compression_algorithm = None, topic_id = None, replication_factor = None) )] fn create_topic( &self, - stream_id: PyIdentifier, + stream: PyIdentifier, name: String, partitions_count: u32, - compression_algorithm: String, + compression_algorithm: Option, topic_id: Option, replication_factor: Option, ) -> PyResult<()> { - let stream_id = Identifier::from(stream_id); - let compression_algorithm = CompressionAlgorithm::from_str(&compression_algorithm) - .map_err(|e| PyErr::new::(format!("{:?}", e)))?; + let compression_algorithm = match compression_algorithm { + Some(algo) => CompressionAlgorithm::from_str(&algo) + .map_err(|e| PyErr::new::(format!("{:?}", e)))?, + None => CompressionAlgorithm::default() + }; + + let stream = Identifier::from(stream); let create_topic_future = self.inner.create_topic( - &stream_id, + &stream, &name, partitions_count, compression_algorithm, @@ -140,8 +151,8 @@ impl IggyClient { /// Returns Ok(()) on successful sending or a PyRuntimeError on failure. fn send_messages( &self, - stream_id: PyIdentifier, - topic_id: PyIdentifier, + stream: PyIdentifier, + topic: PyIdentifier, partitioning: u32, messages: &Bound<'_, PyList>, ) -> PyResult<()> { @@ -154,13 +165,13 @@ impl IggyClient { .map(|message| message.inner) .collect::>(); - let stream_id = Identifier::from(stream_id); - let topic_id = Identifier::from(topic_id); - let partitioning = Partitioning::partition_id(partitioning); + let stream = Identifier::from(stream); + let topic = Identifier::from(topic); + let partitioning = Partitioning::partition_id(partitioning); let send_message_future = self.inner - .send_messages(&stream_id, &topic_id, &partitioning, messages.as_mut()); + .send_messages(&stream, &topic, &partitioning, messages.as_mut()); self.runtime .block_on(async move { send_message_future.await }) .map_err(|e| PyErr::new::(format!("{:?}", e)))?; @@ -172,20 +183,20 @@ impl IggyClient { /// Returns a list of received messages or a PyRuntimeError on failure. fn poll_messages( &self, - stream_id: PyIdentifier, - topic_id: PyIdentifier, + stream: PyIdentifier, + topic: PyIdentifier, partition_id: u32, count: u32, auto_commit: bool, ) -> PyResult> { let consumer = RustConsumer::default(); - let stream_id = Identifier::from(stream_id); - let topic_id = Identifier::from(topic_id); + let stream = Identifier::from(stream); + let topic = Identifier::from(topic); let strategy = PollingStrategy::next(); let poll_messages = self.inner.poll_messages( - &stream_id, - &topic_id, + &stream, + &topic, Some(partition_id), &consumer, &strategy,