Skip to content

Commit

Permalink
Merge pull request #80 from vertexclique/manual-commit
Browse files Browse the repository at this point in the history
Implement manual commit
  • Loading branch information
vertexclique authored May 8, 2022
2 parents f72f9fb + 8be2175 commit efe9f8a
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 5 deletions.
54 changes: 54 additions & 0 deletions examples/manual-commit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use callysto::futures::StreamExt;
use callysto::prelude::message::*;
use callysto::prelude::*;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

#[derive(Clone)]
struct SharedState {
value: Arc<AtomicU32>,
}

impl SharedState {
fn new() -> Self {
Self {
value: Arc::new(AtomicU32::default()),
}
}
}

async fn manual_commit_counter_agent(mut stream: CStream, ctx: Context<SharedState>) -> Result<()> {
while let Some(msg) = stream.next().await {
let m = msg.unwrap();
// Read the incoming bytes as string
let strm = m.payload_view::<str>().unwrap().unwrap().to_owned();
println!("Received payload: `{}`", strm);

// Increment message counter and print it.
// Show how you can store an application state.
let state = ctx.state();
let msgcount = state.value.fetch_add(1, Ordering::AcqRel);
println!("Message count: `{}`", msgcount);

// We are done here.
ctx.commit_async(m).await?;
}

Ok(())
}

fn main() {
let mut app = Callysto::with_state(SharedState::new());

let mut config = Config::default();
config.kafka_config.enable_auto_commit = false;

app.with_name("manual-commit").with_config(config);
app.agent(
"manual_commit_counter_agent",
app.topic("example"),
manual_commit_counter_agent,
);

app.run();
}
2 changes: 1 addition & 1 deletion src/kafka/cconsumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl Stream for CStream {
}

impl CConsumer {
fn consumer_instance(&self) -> Arc<BaseConsumer<CConsumerContext>> {
pub(crate) fn consumer_instance(&self) -> Arc<BaseConsumer<CConsumerContext>> {
self.consumer.clone()
}

Expand Down
3 changes: 2 additions & 1 deletion src/types/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ where
}
let stream = consumer.cstream();
let state = state.clone();
let context = Context::new(state);
let mut context = Context::new(state);
context.set_consumer(consumer.consumer_instance());
if let Err(e) = Agent::<State>::call(self, stream, context).await {
error!("CAgent failed: {}", e);
self.crash().await;
Expand Down
60 changes: 58 additions & 2 deletions src/types/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@ use super::service::{Service, ServiceState};
use crate::errors::Result as CResult;
use crate::errors::*;
use crate::kafka::ctopic::*;
use crate::prelude::CConsumerContext;
use crate::types::table::CTable;
use async_trait::*;
use futures::future::{BoxFuture, TryFutureExt};
use futures::FutureExt;
use rdkafka::message::OwnedMessage;
use rdkafka::consumer::{BaseConsumer, CommitMode, Consumer};
use rdkafka::message::{BorrowedMessage, OwnedMessage};
use rdkafka::Offset::Offset;
use rdkafka::{Message, TopicPartitionList};
use std::collections::HashMap;
use std::future::Future;
use std::io::Read;
Expand All @@ -26,20 +30,72 @@ where
State: Clone + Send + Sync + 'static,
{
state: State,
consumer: Option<Arc<BaseConsumer<CConsumerContext>>>,
}

impl<State> Context<State>
where
State: Clone + Send + Sync + 'static,
{
///
/// Constructor of the [Context].
pub fn new(state: State) -> Self {
Self { state }
Self {
state,
consumer: None,
}
}

///
/// Borrow state immutably
pub fn state(&self) -> &State {
&self.state
}

pub(crate) fn set_consumer(&mut self, consumer: Arc<BaseConsumer<CConsumerContext>>) {
self.consumer = Some(consumer);
}

pub(crate) fn get_consumer(&self) -> Option<Arc<BaseConsumer<CConsumerContext>>> {
self.consumer.clone()
}

///
/// Check context has the consumer instance or not.
pub fn has_consumer(&self) -> bool {
self.consumer.is_some()
}

///
/// Commit specific offset of a given message with given mode.
fn commit_internal(&self, msg: OwnedMessage, commit_mode: CommitMode) -> Result<()> {
let consumer = self.get_consumer().ok_or_else(|| {
CallystoError::GeneralError("No consumer instance set in context.".into())
})?;

let mut tpl = TopicPartitionList::new();
tpl.add_partition_offset(msg.topic(), msg.partition(), Offset(msg.offset()))?;
consumer.commit(&tpl, commit_mode);

Ok(())
}

///
/// Commit specific offset of a given message synchronously.
/// NOTE: This will also automatically commit every message with lower offset within the same partition.
pub fn commit(&self, msg: OwnedMessage) -> Result<()> {
self.commit_internal(msg, CommitMode::Sync)
}

///
/// Commit specific offset of a given message asynchronously.
/// NOTE: This will also automatically commit every message with lower offset within the same partition.
pub async fn commit_async(&self, msg: OwnedMessage) -> Result<()> {
self.commit_internal(msg, CommitMode::Async)
}

///
/// Borrow the state mutably.
pub fn state_mut(&mut self) -> &mut State {
&mut self.state
}
Expand Down
3 changes: 2 additions & 1 deletion src/types/table_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ where
// Error while receiving from Kafka.
break 'fallback;
}
let context = Context::new(state);
let mut context = Context::new(state);
context.set_consumer(consumer.consumer_instance());
if let Err(e) = TableAgent::<State>::call(self, message, tables, context).await
{
error!("CTableAgent failed: {}", e);
Expand Down

0 comments on commit efe9f8a

Please sign in to comment.