Skip to content

Commit

Permalink
Add support for reading/writing commit metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
dsabadie-datadog authored and benesch committed Oct 16, 2021
1 parent 3aa7f92 commit 2392a97
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 1 deletion.
21 changes: 20 additions & 1 deletion src/topic_partition_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ use std::ffi::{CStr, CString};
use std::fmt;
use std::slice;

use libc::c_void;
use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;

use crate::error::{IsError, KafkaError, KafkaResult};
use crate::util::{KafkaDrop, NativePtr};
use crate::message::ToBytes;
use crate::util::{self, KafkaDrop, NativePtr};

const PARTITION_UNASSIGNED: i32 = -1;

Expand Down Expand Up @@ -134,6 +136,23 @@ impl<'a> TopicPartitionListElem<'a> {
)),
}
}

/// Returns the optional metadata associated with the entry.
pub fn metadata(&self) -> &[u8] {
unsafe { util::ptr_to_slice(self.ptr.metadata, self.ptr.metadata_size) }
}

/// Sets the optional metadata associated with the entry.
pub fn set_metadata<B>(&mut self, metadata: &B)
where
B: ToBytes + ?Sized,
{
let metadata = metadata.to_bytes();
let buf = unsafe { libc::malloc(metadata.len()) };
unsafe { libc::memcpy(buf, metadata.as_ptr() as *const c_void, metadata.len()) };
self.ptr.metadata = buf;
self.ptr.metadata_size = metadata.len();
}
}

impl<'a> PartialEq for TopicPartitionListElem<'a> {
Expand Down
56 changes: 56 additions & 0 deletions tests/test_high_consumers.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Test data consumption using high level consumers.
use std::collections::HashMap;
use std::error::Error;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -372,3 +373,58 @@ async fn test_consumer_store_offset_commit() {
.unwrap();
assert_eq!(position, consumer.position().unwrap());
}

#[tokio::test(flavor = "multi_thread")]
async fn test_consumer_commit_metadata() -> Result<(), Box<dyn Error>> {
let _ = env_logger::try_init();

let topic_name = rand_test_topic();
let group_name = rand_test_group();
populate_topic(&topic_name, 10, &value_fn, &key_fn, None, None).await;

let create_consumer = || async {
// Disable auto-commit so we can manually drive the commits.
let mut config = HashMap::new();
config.insert("enable.auto.commit", "false");
let consumer = create_stream_consumer(&group_name, Some(config));

// Subscribe to the topic and wait for at least one message, which
// ensures that the consumer group has been joined and such.
consumer.subscribe(&[topic_name.as_str()])?;
let _ = consumer.stream().next().await;

Ok::<_, Box<dyn Error>>(consumer)
};

// Create a topic partition list where each element has some associated
// metadata.
let tpl = {
let mut tpl = TopicPartitionList::new();
let mut tpl1 = tpl.add_partition(&topic_name, 0);
tpl1.set_offset(Offset::Offset(1))?;
tpl1.set_metadata(b"one");
let mut tpl2 = tpl.add_partition(&topic_name, 1);
tpl2.set_offset(Offset::Offset(1))?;
tpl2.set_metadata("t\x00wo");
let mut tpl3 = tpl.add_partition(&topic_name, 2);
tpl3.set_offset(Offset::Offset(1))?;
tpl3.set_metadata(b"badutf8\x80");
tpl
};

// Ensure that the commit state immediately includes the metadata.
{
let consumer = create_consumer().await?;
consumer.commit(&tpl, CommitMode::Sync)?;
assert_eq!(consumer.committed(None)?, tpl);
}

// Ensure that the commit state on a new consumer in the same group
// can see the same metadata.
{
let consumer = create_consumer().await?;
assert_eq!(consumer.committed(None)?, tpl);
}

Ok(())
}

0 comments on commit 2392a97

Please sign in to comment.