Skip to content

Commit

Permalink
Implement Sync on OwnedHeaders
Browse files Browse the repository at this point in the history
The rationale here is the same as the rationale for implementing Sync on
BorrowedMessage (2779f77).

Fix #200.
  • Loading branch information
benesch committed Dec 31, 2019
1 parent f4ea213 commit 17db9c0
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 2 deletions.
11 changes: 9 additions & 2 deletions examples/asynchronous_processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,18 @@ use crate::example_utils::setup_logger;

mod example_utils;

async fn record_message_receipt(msg: &BorrowedMessage<'_>) {
async fn record_borrowed_message_receipt(msg: &BorrowedMessage<'_>) {
// Simulate some work that must be done in the same order as messages are
// received; i.e., before truly parallel processing can begin.
info!("Message received: {}", msg.offset());
}

async fn record_owned_message_receipt(msg: &OwnedMessage) {
// Like `record_borrowed_message_receipt`, but takes an `OwnedMessage`
// instead, as in a real-world use case an `OwnedMessage` might be more
// convenient than a `BorrowedMessage`.
}

// Emulates an expensive, synchronous computation.
fn expensive_computation<'a>(msg: OwnedMessage) -> String {
info!("Starting expensive computation on message {}", msg.offset());
Expand Down Expand Up @@ -79,10 +85,11 @@ async fn run_async_processor(
let output_topic = output_topic.to_string();
async move {
// Process each message
record_message_receipt(&borrowed_message).await;
record_borrowed_message_receipt(&borrowed_message).await;
// Borrowed messages can't outlive the consumer they are received from, so they need to
// be owned in order to be sent to a separate thread.
let owned_message = borrowed_message.detach();
record_owned_message_receipt(&owned_message).await;
tokio::spawn(async move {
// The body of this block will be executed on the main thread pool,
// but we perform `expensive_computation` on a separate thread pool
Expand Down
1 change: 1 addition & 0 deletions src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ pub struct OwnedHeaders {
}

unsafe impl Send for OwnedHeaders {}
unsafe impl Sync for OwnedHeaders {}

impl OwnedHeaders {
/// Create a new `OwnedHeaders` struct with initial capacity 5.
Expand Down

0 comments on commit 17db9c0

Please sign in to comment.