diff --git a/src/change_stream/options.rs b/src/change_stream/options.rs index 70a587102..6a3351496 100644 --- a/src/change_stream/options.rs +++ b/src/change_stream/options.rs @@ -114,6 +114,7 @@ pub enum FullDocumentType { /// The /// [`ChangeStreamEvent::full_document`](crate::change_stream::event::ChangeStreamEvent::full_document) /// field will be populated with a copy of the entire document that was updated. + #[serde(rename = "updateLookup")] UpdateLookup, /// User-defined other types for forward compatibility. diff --git a/src/test/documentation_examples/mod.rs b/src/test/documentation_examples/mod.rs index 3b6938daf..8cb0f872d 100644 --- a/src/test/documentation_examples/mod.rs +++ b/src/test/documentation_examples/mod.rs @@ -9,7 +9,7 @@ use crate::{ bson::{doc, Bson}, error::{ErrorKind, Result}, options::{ClientOptions, FindOptions, ServerApi, ServerApiVersion}, - test::{TestClient, DEFAULT_URI, LOCK}, + test::{log_uncaptured, TestClient, DEFAULT_URI, LOCK}, Client, Collection, }; @@ -1441,7 +1441,6 @@ async fn versioned_api_examples() -> GenericResult<()> { use std::{error::Error, result::Result}; - use crate::test::log_uncaptured; // Start Versioned API Example 5 // With the `bson-chrono-0_4` feature enabled, this function can be dropped in favor of using // `chrono::DateTime` values directly. @@ -1755,6 +1754,81 @@ async fn index_examples() -> Result<()> { Ok(()) } +async fn change_streams_examples() -> Result<()> { + use crate::{change_stream::options::FullDocumentType, options::ChangeStreamOptions, RUNTIME}; + use std::time::Duration; + + let client = TestClient::new().await; + if !client.is_replica_set() && !client.is_sharded() { + log_uncaptured("skipping change_streams_examples due to unsupported topology"); + return Ok(()); + } + let db = client.database("change_streams_examples"); + db.drop(None).await?; + let inventory = db.collection::("inventory"); + // Populate an item so the collection exists for the change stream to watch. + inventory.insert_one(doc! {}, None).await?; + + // Background writer thread so that the `stream.next()` calls return something. + let (tx, mut rx) = tokio::sync::oneshot::channel(); + let writer_inventory = inventory.clone(); + let handle = RUNTIME + .spawn(async move { + let mut interval = RUNTIME.interval(Duration::from_millis(100)); + loop { + tokio::select! { + _ = interval.tick() => { + writer_inventory.insert_one(doc! {}, None).await?; + } + _ = &mut rx => break, + } + } + Result::Ok(()) + }) + .unwrap(); + + #[allow(unused_variables, unused_imports)] + { + { + // Start Changestream Example 1 + use futures::stream::TryStreamExt; + let mut stream = inventory.watch(None, None).await?; + let next = stream.try_next().await?; + // End Changestream Example 1 + } + + { + // Start Changestream Example 2 + use futures::stream::TryStreamExt; + let options = ChangeStreamOptions::builder() + .full_document(Some(FullDocumentType::UpdateLookup)) + .build(); + let mut stream = inventory.watch(None, options).await?; + let next = stream.try_next().await?; + // End Changestream Example 2 + } + + { + let stream = inventory.watch(None, None).await?; + // Start Changestream Example 3 + use futures::stream::TryStreamExt; + let resume_token = stream.resume_token(); + let options = ChangeStreamOptions::builder() + .resume_after(resume_token) + .build(); + let mut stream = inventory.watch(None, options).await?; + stream.try_next().await?; + // End Changestream Example 3 + } + } + + // Shut down the writer thread. + let _ = tx.send(()); + handle.await?; + + Ok(()) +} + #[cfg_attr(feature = "tokio-runtime", tokio::test)] #[cfg_attr(feature = "async-std-runtime", async_std::test)] async fn test() { @@ -1780,4 +1854,5 @@ async fn test() { aggregation_examples().await.unwrap(); run_command_examples().await.unwrap(); index_examples().await.unwrap(); + change_streams_examples().await.unwrap(); }