RUST-1509 SDAM Logging (#918)
isabelatkinson authored Jul 27, 2023
1 parent 85956f7 commit 219b934
Showing 86 changed files with 5,108 additions and 385 deletions.
9 changes: 5 additions & 4 deletions src/cmap/conn.rs
@@ -65,11 +65,13 @@ pub struct ConnectionInfo {
#[derivative(Debug)]
pub(crate) struct Connection {
/// Driver-generated ID for the connection.
pub(super) id: u32,
pub(crate) id: u32,

/// Server-generated ID for the connection.
pub(crate) server_id: Option<i64>,

pub(crate) address: ServerAddress,

pub(crate) generation: ConnectionGeneration,

/// The cached StreamDescription from the connection's handshake.
@@ -164,9 +166,8 @@ impl Connection {

/// Create a connection intended for monitoring purposes.
/// TODO: RUST-1454 Rename this to just `new`, drop the pooling-specific data.
pub(crate) fn new_monitoring(address: ServerAddress, stream: AsyncStream) -> Self {
// Monitoring connections don't have IDs, so just use 0 as a placeholder here.
Self::new(address, stream, 0, ConnectionGeneration::Monitoring)
pub(crate) fn new_monitoring(address: ServerAddress, stream: AsyncStream, id: u32) -> Self {
Self::new(address, stream, id, ConnectionGeneration::Monitoring)
}

pub(crate) fn info(&self) -> ConnectionInfo {
3 changes: 2 additions & 1 deletion src/cmap/establish.rs
@@ -146,9 +146,10 @@ impl ConnectionEstablisher {
pub(crate) async fn establish_monitoring_connection(
&self,
address: ServerAddress,
id: u32,
) -> Result<(Connection, HelloReply)> {
let stream = self.make_stream(address.clone()).await?;
let mut connection = Connection::new_monitoring(address, stream);
let mut connection = Connection::new_monitoring(address, stream, id);

let hello_reply = self.handshaker.handshake(&mut connection, None).await?;

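Taken together, the two changes above replace the hard-coded placeholder id of 0 with a caller-supplied, driver-generated id and record the server-assigned id reported during the handshake. A minimal standalone sketch of that threading pattern, using simplified stand-in types rather than the driver's internal ones:

// Simplified stand-in types (not the driver's internals): a monitoring
// connection carries the driver-generated id it was created with, and the
// server-assigned id is filled in after the handshake.
struct MonitoringConnection {
    driver_id: u32,
    server_id: Option<i64>, // reported by servers 4.2+ in the hello reply
}

impl MonitoringConnection {
    fn new(driver_id: u32) -> Self {
        Self { driver_id, server_id: None }
    }

    fn record_handshake(&mut self, server_connection_id: Option<i64>) {
        self.server_id = server_connection_id;
    }
}

fn main() {
    let mut conn = MonitoringConnection::new(1);
    conn.record_handshake(Some(42));
    assert_eq!((conn.driver_id, conn.server_id), (1, Some(42)));
}
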
23 changes: 23 additions & 0 deletions src/event/sdam.rs
@@ -121,6 +121,14 @@ pub struct ServerHeartbeatStartedEvent {

/// Determines if this heartbeat event is from an awaitable `hello`.
pub awaited: bool,

/// The driver-generated ID for the connection used for the heartbeat.
pub driver_connection_id: u32,

/// The server-generated ID for the connection used for the heartbeat. This value is only
/// present on server versions 4.2+. If this event corresponds to the first heartbeat on a
/// new monitoring connection, this value will not be present.
pub server_connection_id: Option<i64>,
}

/// Published when a server monitor's `hello` or legacy hello command succeeds.
@@ -139,6 +147,13 @@ pub struct ServerHeartbeatSucceededEvent {

/// Determines if this heartbeat event is from an awaitable `hello`.
pub awaited: bool,

/// The driver-generated ID for the connection used for the heartbeat.
pub driver_connection_id: u32,

/// The server-generated ID for the connection used for the heartbeat. This value is only
/// present for server versions 4.2+.
pub server_connection_id: Option<i64>,
}

/// Published when a server monitor's `hello` or legacy hello command fails.
@@ -158,6 +173,14 @@ pub struct ServerHeartbeatFailedEvent {

/// Determines if this heartbeat event is from an awaitable `hello`.
pub awaited: bool,

/// The driver-generated ID for the connection used for the heartbeat.
pub driver_connection_id: u32,

/// The server-generated ID for the connection used for the heartbeat. This value is only
/// present on server versions 4.2+. If this event corresponds to the first heartbeat on a
/// new monitoring connection, this value will not be present.
pub server_connection_id: Option<i64>,
}

#[derive(Clone, Debug)]
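Because these heartbeat events are part of the driver's public event API, applications can observe the new id fields directly. A hedged sketch, assuming the `SdamEventHandler` trait and `ClientOptions::sdam_event_handler` option as exposed by the 2.x driver (method and field names approximate, not confirmed by this diff):

use std::sync::Arc;

use mongodb::{
    event::sdam::{SdamEventHandler, ServerHeartbeatStartedEvent, ServerHeartbeatSucceededEvent},
    options::ClientOptions,
    Client,
};

// Logs the driver- and server-generated connection ids carried by the new
// heartbeat event fields; all other trait methods keep their default no-op
// implementations.
struct HeartbeatLogger;

impl SdamEventHandler for HeartbeatLogger {
    fn handle_server_heartbeat_started_event(&self, event: ServerHeartbeatStartedEvent) {
        println!(
            "heartbeat started: driver id {}, server id {:?}",
            event.driver_connection_id, event.server_connection_id
        );
    }

    fn handle_server_heartbeat_succeeded_event(&self, event: ServerHeartbeatSucceededEvent) {
        println!(
            "heartbeat succeeded: driver id {}, server id {:?}",
            event.driver_connection_id, event.server_connection_id
        );
    }
}

async fn client_with_heartbeat_logging() -> mongodb::error::Result<Client> {
    let mut options = ClientOptions::parse("mongodb://localhost:27017").await?;
    options.sdam_event_handler = Some(Arc::new(HeartbeatLogger));
    Client::with_options(options)
}
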
41 changes: 34 additions & 7 deletions src/sdam/monitor.rs
@@ -1,9 +1,13 @@
use std::{
sync::Arc,
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
time::{Duration, Instant},
};

use bson::doc;
use lazy_static::lazy_static;
use tokio::sync::watch;

use super::{
@@ -26,6 +30,13 @@ use crate::{
runtime::{self, stream::DEFAULT_CONNECT_TIMEOUT, WorkerHandle, WorkerHandleListener},
};

fn next_monitoring_connection_id() -> u32 {
lazy_static! {
static ref MONITORING_CONNECTION_ID: AtomicU32 = AtomicU32::new(0);
}
MONITORING_CONNECTION_ID.fetch_add(1, Ordering::SeqCst)
}

pub(crate) const DEFAULT_HEARTBEAT_FREQUENCY: Duration = Duration::from_secs(10);
pub(crate) const MIN_HEARTBEAT_FREQUENCY: Duration = Duration::from_millis(500);

@@ -162,10 +173,18 @@ impl Monitor {
}

async fn perform_hello(&mut self) -> HelloResult {
let driver_connection_id = self
.connection
.as_ref()
.map(|c| c.id)
.unwrap_or(next_monitoring_connection_id());

self.emit_event(|| {
SdamEvent::ServerHeartbeatStarted(ServerHeartbeatStartedEvent {
server_address: self.address.clone(),
awaited: self.topology_version.is_some(),
driver_connection_id,
server_connection_id: self.connection.as_ref().and_then(|c| c.server_id),
})
});

@@ -215,7 +234,7 @@ impl Monitor {
let start = Instant::now();
let res = self
.connection_establisher
.establish_monitoring_connection(self.address.clone())
.establish_monitoring_connection(self.address.clone(), driver_connection_id)
.await;
match res {
Ok((conn, hello_reply)) => {
@@ -264,6 +283,8 @@ impl Monitor {
reply,
server_address: self.address.clone(),
awaited: self.topology_version.is_some(),
driver_connection_id,
server_connection_id: self.connection.as_ref().and_then(|c| c.server_id),
})
});

@@ -272,18 +293,21 @@
self.topology_version = r.command_response.topology_version;
}
HelloResult::Err(ref e) | HelloResult::Cancelled { reason: ref e } => {
// Per the spec, cancelled requests and errors both require the monitoring
// connection to be closed.
self.connection = None;
self.rtt_monitor_handle.reset_average_rtt();
self.emit_event(|| {
SdamEvent::ServerHeartbeatFailed(ServerHeartbeatFailedEvent {
duration,
failure: e.clone(),
server_address: self.address.clone(),
awaited: self.topology_version.is_some(),
driver_connection_id,
server_connection_id: self.connection.as_ref().and_then(|c| c.server_id),
})
});

// Per the spec, cancelled requests and errors both require the monitoring
// connection to be closed.
self.connection = None;
self.rtt_monitor_handle.reset_average_rtt();
self.topology_version.take();
}
}
@@ -402,7 +426,10 @@ impl RttMonitor {
None => {
let connection = self
.connection_establisher
.establish_monitoring_connection(self.address.clone())
.establish_monitoring_connection(
self.address.clone(),
next_monitoring_connection_id(),
)
.await?
.0;
self.connection = Some(connection);
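The `next_monitoring_connection_id` helper added above is a process-wide atomic counter, so every monitoring connection (including the RTT monitor's) gets a unique driver-generated id without locking. A dependency-free restatement of the same pattern; a plain `static` suffices here because `AtomicU32::new` is a `const fn`, whereas the driver wraps it in `lazy_static` to match its existing style:

use std::sync::atomic::{AtomicU32, Ordering};

fn next_monitoring_connection_id() -> u32 {
    static COUNTER: AtomicU32 = AtomicU32::new(0);
    // fetch_add returns the value *before* the increment, so ids start at 0
    // and increase monotonically for the lifetime of the process.
    COUNTER.fetch_add(1, Ordering::SeqCst)
}

fn main() {
    let first = next_monitoring_connection_id();
    let second = next_monitoring_connection_id();
    assert!(second > first);
}
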
93 changes: 65 additions & 28 deletions src/sdam/topology.rs
@@ -43,6 +43,9 @@ use crate::{
TopologyType,
};

#[cfg(feature = "tracing-unstable")]
use crate::trace::topology::TopologyTracingEventEmitter;

use super::{
monitor::{MonitorManager, MonitorRequestReceiver},
srv_polling::SrvPollingMonitor,
@@ -67,22 +70,36 @@ pub(crate) struct Topology {
impl Topology {
pub(crate) fn new(options: ClientOptions) -> Result<Topology> {
let description = TopologyDescription::default();
let id = ObjectId::new();

let event_emitter = options.sdam_event_handler.as_ref().map(|handler| {
let (tx, mut rx) = mpsc::unbounded_channel::<AcknowledgedMessage<SdamEvent>>();

// Spin up a task to handle events so that a user's event handling code can't block the
// TopologyWorker.
let handler = handler.clone();
runtime::execute(async move {
while let Some(event) = rx.recv().await {
let (event, ack) = event.into_parts();
handle_sdam_event(handler.as_ref(), event);
ack.acknowledge(());
}
});
SdamEventEmitter { sender: tx }
});
let event_emitter =
if options.sdam_event_handler.is_some() || cfg!(feature = "tracing-unstable") {
let user_handler = options.sdam_event_handler.clone();

#[cfg(feature = "tracing-unstable")]
let tracing_emitter =
TopologyTracingEventEmitter::new(options.tracing_max_document_length_bytes, id);
let (tx, mut rx) = mpsc::unbounded_channel::<AcknowledgedMessage<SdamEvent>>();
runtime::execute(async move {
while let Some(event) = rx.recv().await {
let (event, ack) = event.into_parts();

if let Some(ref user_handler) = user_handler {
#[cfg(feature = "tracing-unstable")]
handle_sdam_event(user_handler.as_ref(), event.clone());
#[cfg(not(feature = "tracing-unstable"))]
handle_sdam_event(user_handler.as_ref(), event);
}
#[cfg(feature = "tracing-unstable")]
handle_sdam_event(&tracing_emitter, event);

ack.acknowledge(());
}
});
Some(SdamEventEmitter { sender: tx })
} else {
None
};

let (updater, update_receiver) = TopologyUpdater::channel();
let (worker_handle, handle_listener) = WorkerHandleListener::channel();
@@ -95,8 +112,6 @@
let connection_establisher =
ConnectionEstablisher::new(EstablisherOptions::from_client_options(&options))?;

let id = ObjectId::new();

let worker = TopologyWorker {
id,
topology_description: description,
@@ -375,18 +390,41 @@ impl TopologyWorker {
// indicate to the topology watchers that the topology is no longer alive
drop(self.publisher);

// close all the monitors.
let mut close_futures = self
.servers
.into_values()
.map(|server| {
drop(server.inner);
server.monitor_manager.close_monitor()
})
.collect::<FuturesUnordered<_>>();
// Close all the monitors.
let mut close_futures = FuturesUnordered::new();
for (address, server) in self.servers.into_iter() {
if let Some(ref emitter) = self.event_emitter {
emitter
.emit(SdamEvent::ServerClosed(ServerClosedEvent {
address,
topology_id: self.id,
}))
.await;
}
drop(server.inner);
close_futures.push(server.monitor_manager.close_monitor());
}
while close_futures.next().await.is_some() {}

if let Some(emitter) = self.event_emitter {
if !self.topology_description.servers.is_empty()
&& self.options.load_balanced != Some(true)
{
let previous_description = self.topology_description;
let mut new_description = previous_description.clone();
new_description.servers.clear();

emitter
.emit(SdamEvent::TopologyDescriptionChanged(Box::new(
TopologyDescriptionChangedEvent {
topology_id: self.id,
previous_description: previous_description.into(),
new_description: new_description.into(),
},
)))
.await;
}

emitter
.emit(SdamEvent::TopologyClosed(TopologyClosedEvent {
topology_id: self.id,
Expand Down Expand Up @@ -436,11 +474,10 @@ impl TopologyWorker {
let diff = old_description.diff(&self.topology_description);
let changed = diff.is_some();
if let Some(diff) = diff {
// For ordering of events in tests, sort the addresses.

#[cfg(not(test))]
let changed_servers = diff.changed_servers;

// For ordering of events in tests, sort the addresses.
#[cfg(test)]
let changed_servers = {
let mut servers = diff.changed_servers.into_iter().collect::<Vec<_>>();
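The reworked constructor above keeps the existing decoupling strategy: SDAM events are pushed onto an unbounded channel and drained on a separate task so that neither user callbacks nor the new tracing emitter can block the TopologyWorker, and each event is now fanned out to both consumers. A simplified sketch of that dispatch pattern with assumed stand-in types; tokio::spawn stands in for the driver's internal runtime::execute:

use std::sync::Arc;

use tokio::sync::mpsc;

#[derive(Clone, Debug)]
enum SketchEvent {
    TopologyClosed,
}

// Spawns a dispatcher task that drains the channel and fans each event out to
// an optional user handler; a tracing emitter would be invoked at the marked
// point when the `tracing-unstable` feature is enabled.
fn spawn_event_dispatcher(
    user_handler: Option<Arc<dyn Fn(SketchEvent) + Send + Sync>>,
) -> mpsc::UnboundedSender<SketchEvent> {
    let (tx, mut rx) = mpsc::unbounded_channel::<SketchEvent>();
    tokio::spawn(async move {
        while let Some(event) = rx.recv().await {
            if let Some(ref handler) = user_handler {
                handler(event.clone());
            }
            // A tracing emitter would also receive `event` here.
        }
    });
    tx
}

#[tokio::main]
async fn main() {
    let tx = spawn_event_dispatcher(Some(Arc::new(|e: SketchEvent| {
        println!("sdam event: {:?}", e)
    })));
    let _ = tx.send(SketchEvent::TopologyClosed);
    // Give the dispatcher a moment to run before the process exits.
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
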
6 changes: 6 additions & 0 deletions src/test/spec.rs
@@ -71,6 +71,12 @@ pub(crate) fn deserialize_spec_tests<T: DeserializeOwned>(
continue;
};

if let Ok(unskipped_filename) = std::env::var("TEST_FILE") {
if filename != unskipped_filename {
continue;
}
}

if let Some(skipped_files) = skipped_files {
if skipped_files.contains(&filename) {
log_uncaptured(format!("Skipping deserializing {:?}", &path));
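The new TEST_FILE check makes it possible to run a single spec file in isolation by exporting TEST_FILE=<file name> before running the test suite. A standalone restatement of the gate:

// When TEST_FILE is set, only the spec file whose name matches it exactly is
// deserialized; otherwise every file is considered.
fn should_deserialize(filename: &str) -> bool {
    match std::env::var("TEST_FILE") {
        Ok(only) => filename == only,
        Err(_) => true,
    }
}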