Skip to content

Commit

Permalink
Implement methods for getting stream and topic with base data
Browse files Browse the repository at this point in the history
  • Loading branch information
mmodzelewski committed Oct 31, 2024
1 parent 19df16f commit 69d4599
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 43 deletions.
5 changes: 3 additions & 2 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
{
"name": "Iggy Python SDK devcontainer",
"dockerComposeFile": "docker-compose.yml",
service: "devcontainer",
"service": "devcontainer",
"features": {
"ghcr.io/devcontainers/features/rust:1": {}
}
},
"workspaceFolder": "/workspace"
}
2 changes: 2 additions & 0 deletions .devcontainer/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ services:
build:
context: .
dockerfile: Dockerfile
volumes:
- ..:/workspace:cached
network_mode: service:iggy
command: sleep infinity

Expand Down
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iggy-py"
version = "0.2.4"
version = "0.2.5"
edition = "2021"
authors = ["Dario Lencina Talarico <[email protected]>"]

Expand All @@ -11,7 +11,7 @@ crate-type = ["cdylib"]

[dependencies]
pyo3 = "0.22.0"
iggy = "0.6.30"
iggy = "0.6.31"
tokio = { version = "1", features = ["full"] }
bytes = "1.7.1"
openssl = { version = "0.10.*", features = ["vendored"] }
9 changes: 4 additions & 5 deletions python_examples/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ async def main():
logger.exception("Exception occurred in main function: {}", error)



async def consume_messages(client: IggyClient):
interval = 0.5 # 500 milliseconds in seconds for asyncio.sleep
logger.info(f"Messages will be consumed from stream: {STREAM_NAME}, topic: {TOPIC_NAME}, partition: {PARTITION_ID} with "
f"interval {interval * 1000} ms.")
logger.info(
f"Messages will be consumed from stream: {STREAM_NAME}, topic: {TOPIC_NAME}, partition: {PARTITION_ID} with "
f"interval {interval * 1000} ms.")
offset = 0
messages_per_batch = 10
while True:
Expand All @@ -37,7 +37,7 @@ async def consume_messages(client: IggyClient):
topic=TOPIC_NAME,
partition_id=PARTITION_ID,
count=messages_per_batch,
auto_commit=False
auto_commit=True
)
if not polled_messages:
logger.warning("No messages found in current poll")
Expand All @@ -58,6 +58,5 @@ def handle_message(message: ReceiveMessage):
logger.info(f"Handling message at offset: {message.offset()} with payload: {payload}...")



if __name__ == "__main__":
asyncio.run(main())
40 changes: 26 additions & 14 deletions python_examples/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from loguru import logger

# Assuming we have a Python module for iggy with similar functionality as the Rust one.
from iggy_py import IggyClient, SendMessage as Message
from iggy_py import IggyClient, SendMessage as Message, StreamDetails, TopicDetails

STREAM_NAME = "sample-stream"
TOPIC_NAME = "sample-topic"
Expand All @@ -22,30 +22,40 @@ async def main():

def init_system(client: IggyClient):
try:
logger.info(f"Creating stream with NAME {STREAM_NAME}...")
client.create_stream(name="sample-stream")
logger.info("Stream was created successfuly.")
logger.info(f"Creating stream with name {STREAM_NAME}...")
stream: StreamDetails = client.get_stream(STREAM_NAME)
if stream is None:
client.create_stream(name=STREAM_NAME)
logger.info("Stream was created successfully.")
else:
logger.info(f"Stream {stream.name} already exists with ID {stream.id}")

except Exception as error:
logger.error(f"Error creating stream: {error}")
logger.exception(error)

try:
logger.info(f"Creating topic {TOPIC_NAME} in stream {STREAM_NAME}")
client.create_topic(
stream=STREAM_NAME, # Assuming a method exists to create a numeric Identifier.
partitions_count=1,
name="sample-topic",
replication_factor=1
)
logger.info(f"Topic: was created successfullly.")
topic: TopicDetails = client.get_topic(STREAM_NAME, TOPIC_NAME)
if topic is None:
client.create_topic(
stream=STREAM_NAME, # Assuming a method exists to create a numeric Identifier.
partitions_count=1,
name=TOPIC_NAME,
replication_factor=1
)
logger.info(f"Topic was created successfully.")
else:
logger.info(f"Topic {topic.name} already exists with ID {topic.id}")
except Exception as error:
logger.error(f"Error creating topic {error}")
logger.exception(error)


async def produce_messages(client: IggyClient):
interval = 0.5 # 500 milliseconds in seconds for asyncio.sleep
logger.info(f"Messages will be sent to stream: {STREAM_NAME}, topic: {TOPIC_NAME}, partition: {PARTITION_ID} with interval {interval * 1000} ms.")
logger.info(
f"Messages will be sent to stream: {STREAM_NAME}, topic: {TOPIC_NAME}, partition: {PARTITION_ID} with interval {interval * 1000} ms.")
current_id = 0
messages_per_batch = 10
while True:
Expand All @@ -55,15 +65,17 @@ async def produce_messages(client: IggyClient):
payload = f"message-{current_id}"
message = Message(payload) # Assuming a method exists to convert str to Message.
messages.append(message)
logger.info(f"Attempting to send batch of {messages_per_batch} messages. Batch ID: {current_id // messages_per_batch}")
logger.info(
f"Attempting to send batch of {messages_per_batch} messages. Batch ID: {current_id // messages_per_batch}")
try:
client.send_messages(
stream=STREAM_NAME,
topic=TOPIC_NAME,
partitioning=PARTITION_ID,
messages=messages,
)
logger.info(f"Succesffuly sent batch of {messages_per_batch} messages. Batch ID: {current_id // messages_per_batch}")
logger.info(
f"Successfully sent batch of {messages_per_batch} messages. Batch ID: {current_id // messages_per_batch}")
except Exception as error:
logger.error(f"Exception type: {type(error).__name__}, message: {error}")
logger.exception(error)
Expand Down
63 changes: 46 additions & 17 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use std::str::FromStr;

use iggy::client::TopicClient;
use iggy::client::{Client, MessageClient, StreamClient, UserClient};
use iggy::clients::client::IggyClient as RustIggyClient;
use iggy::clients::builder::IggyClientBuilder;
use iggy::clients::client::IggyClient as RustIggyClient;
use iggy::compression::compression_algorithm::CompressionAlgorithm;
use iggy::consumer::Consumer as RustConsumer;
use iggy::identifier::Identifier;
Expand All @@ -17,6 +17,8 @@ use tokio::runtime::{Builder, Runtime};

use crate::receive_message::ReceiveMessage;
use crate::send_message::SendMessage;
use crate::stream::StreamDetails;
use crate::topic::TopicDetails;

/// A Python class representing the Iggy client.
/// It wraps the RustIggyClient and provides asynchronous functionality
Expand Down Expand Up @@ -51,19 +53,19 @@ impl IggyClient {
/// This initializes a new runtime for asynchronous operations.
/// Future versions might utilize asyncio for more Pythonic async.
#[new]
#[pyo3(signature = (conn=None))]
#[pyo3(signature = (conn=None))]
fn new(conn: Option<String>) -> Self {
// TODO: use asyncio
let runtime = Builder::new_multi_thread()
.worker_threads(4) // number of worker threads
.enable_all() // enables all available Tokio features
.build()
.unwrap();
let client = IggyClientBuilder::new()
.with_tcp()
.with_server_address(conn.unwrap_or("127.0.0.1:8090".to_string()))
.build()
.unwrap();
let client = IggyClientBuilder::new()
.with_tcp()
.with_server_address(conn.unwrap_or("127.0.0.1:8090".to_string()))
.build()
.unwrap();
IggyClient {
inner: client,
runtime,
Expand Down Expand Up @@ -98,14 +100,27 @@ impl IggyClient {
/// Returns Ok(()) on successful stream creation or a PyRuntimeError on failure.
#[pyo3(signature = (name, stream_id = None))]
fn create_stream(&self, name: String, stream_id: Option<u32>) -> PyResult<()> {
let create_stream_future = self.inner.create_stream(&name, stream_id);
let create_stream_future = self.inner.create_stream(&name, stream_id);
let _create_stream = self
.runtime
.block_on(async move { create_stream_future.await })
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{:?}", e)))?;
Ok(())
}

/// Gets stream by id.
///
/// Returns Option of stream details or a PyRuntimeError on failure.
fn get_stream(&self, stream_id: PyIdentifier) -> PyResult<Option<StreamDetails>> {
let stream_id = Identifier::from(stream_id);
let stream_future = self.inner.get_stream(&stream_id);
let stream = self
.runtime
.block_on(async move { stream_future.await })
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{:?}", e)))?;
Ok(stream.map(StreamDetails::from))
}

/// Creates a new topic with the given parameters.
///
/// Returns Ok(()) on successful topic creation or a PyRuntimeError on failure.
Expand All @@ -114,21 +129,21 @@ impl IggyClient {
)]
fn create_topic(
&self,
stream: PyIdentifier,
stream: PyIdentifier,
name: String,
partitions_count: u32,
compression_algorithm: Option<String>,
topic_id: Option<u32>,
replication_factor: Option<u8>,
) -> PyResult<()> {
let compression_algorithm = match compression_algorithm {
Some(algo) => CompressionAlgorithm::from_str(&algo).map_err(|e| {
PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{:?}", e))
})?,
None => CompressionAlgorithm::default(),
};

let compression_algorithm = match compression_algorithm {
Some(algo) => CompressionAlgorithm::from_str(&algo)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{:?}", e)))?,
None => CompressionAlgorithm::default()
};

let stream = Identifier::from(stream);
let stream = Identifier::from(stream);
let create_topic_future = self.inner.create_topic(
&stream,
&name,
Expand All @@ -146,6 +161,20 @@ impl IggyClient {
PyResult::Ok(())
}

/// Gets topic by stream and id.
///
/// Returns Option of topic details or a PyRuntimeError on failure.
fn get_topic(&self, stream_id: PyIdentifier, topic_id: PyIdentifier) -> PyResult<Option<TopicDetails>> {
let stream_id = Identifier::from(stream_id);
let topic_id = Identifier::from(topic_id);
let topic_future = self.inner.get_topic(&stream_id, &topic_id);
let topic = self
.runtime
.block_on(async move { topic_future.await })
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{:?}", e)))?;
Ok(topic.map(TopicDetails::from))
}

/// Sends a list of messages to the specified topic.
///
/// Returns Ok(()) on successful sending or a PyRuntimeError on failure.
Expand All @@ -167,7 +196,7 @@ impl IggyClient {

let stream = Identifier::from(stream);
let topic = Identifier::from(topic);
let partitioning = Partitioning::partition_id(partitioning);
let partitioning = Partitioning::partition_id(partitioning);

let send_message_future =
self.inner
Expand Down
6 changes: 6 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
mod client;
mod receive_message;
mod send_message;
mod stream;
mod topic;

use client::IggyClient;
use pyo3::prelude::*;
use receive_message::ReceiveMessage;
use send_message::SendMessage;
use stream::StreamDetails;
use topic::TopicDetails;

/// A Python module implemented in Rust.
#[pymodule]
fn iggy_py(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<SendMessage>()?;
m.add_class::<ReceiveMessage>()?;
m.add_class::<IggyClient>()?;
m.add_class::<StreamDetails>()?;
m.add_class::<TopicDetails>()?;
Ok(())
}
38 changes: 38 additions & 0 deletions src/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use iggy::models::stream::StreamDetails as RustStreamDetails;
use pyo3::prelude::*;

#[pyclass]
pub struct StreamDetails {
pub(crate) inner: RustStreamDetails,
}

impl From<RustStreamDetails> for StreamDetails {
fn from(stream_details: RustStreamDetails) -> Self {
StreamDetails {
inner: stream_details,
}
}
}

#[pymethods]
impl StreamDetails {
#[getter]
pub fn id(&self) -> u32 {
self.inner.id
}

#[getter]
pub fn name(&self) -> String {
self.inner.name.to_string()
}

#[getter]
pub fn messages_count(&self) -> u64 {
self.inner.messages_count
}

#[getter]
pub fn topics_count(&self) -> u32 {
self.inner.topics_count
}
}
Loading

0 comments on commit 69d4599

Please sign in to comment.