Skip to content

Commit

Permalink
RUST-1631 Always use polling monitoring when running in a FaaS enviro…
Browse files Browse the repository at this point in the history
…nment (#1030)
  • Loading branch information
abr-egn authored Feb 21, 2024
1 parent 8d09980 commit 4d9a320
Show file tree
Hide file tree
Showing 16 changed files with 834 additions and 24 deletions.
4 changes: 3 additions & 1 deletion .evergreen/aws-lambda-test/mongodb/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ impl Stats {

fn handle_sdam(&mut self, event: &SdamEvent) {
match event {
SdamEvent::ServerHeartbeatStarted(_) => {
SdamEvent::ServerHeartbeatStarted(ev) => {
assert!(!ev.awaited);
self.heartbeats_started += 1;
}
SdamEvent::ServerHeartbeatFailed(ev) => {
assert!(!ev.awaited);
self.failed_heartbeat_durations_millis.push(ev.duration.as_millis());
}
_ => (),
Expand Down
46 changes: 46 additions & 0 deletions src/client/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ const URI_OPTIONS: &[&str] = &[
"replicaset",
"retrywrites",
"retryreads",
"servermonitoringmode",
"serverselectiontimeoutms",
"sockettimeoutms",
"tls",
Expand Down Expand Up @@ -512,6 +513,12 @@ pub struct ClientOptions {
#[builder(default)]
pub retry_writes: Option<bool>,

/// Configures which server monitoring protocol to use.
///
/// The default is [`Auto`](ServerMonitoringMode::Auto).
#[builder(default)]
pub server_monitoring_mode: Option<ServerMonitoringMode>,

/// The handler that should process all Server Discovery and Monitoring events.
#[derivative(Debug = "ignore", PartialEq = "ignore")]
#[builder(default, setter(strip_option))]
Expand Down Expand Up @@ -683,6 +690,8 @@ impl Serialize for ClientOptions {

retrywrites: &'a Option<bool>,

servermonitoringmode: Option<String>,

#[serde(
flatten,
serialize_with = "SelectionCriteria::serialize_for_client_options"
Expand Down Expand Up @@ -723,6 +732,10 @@ impl Serialize for ClientOptions {
replicaset: &self.repl_set_name,
retryreads: &self.retry_reads,
retrywrites: &self.retry_writes,
servermonitoringmode: self
.server_monitoring_mode
.as_ref()
.map(|m| format!("{:?}", m).to_lowercase()),
selectioncriteria: &self.selection_criteria,
serverselectiontimeoutms: &self.server_selection_timeout,
sockettimeoutms: &self.socket_timeout,
Expand Down Expand Up @@ -844,6 +857,11 @@ pub struct ConnectionString {
/// The default value is true.
pub retry_writes: Option<bool>,

/// Configures which server monitoring protocol to use.
///
/// The default is [`Auto`](ServerMonitoringMode::Auto).
pub server_monitoring_mode: Option<ServerMonitoringMode>,

/// Specifies whether the Client should directly connect to a single host rather than
/// autodiscover all servers in the cluster.
///
Expand Down Expand Up @@ -1340,6 +1358,7 @@ impl ClientOptions {
connect_timeout: conn_str.connect_timeout,
retry_reads: conn_str.retry_reads,
retry_writes: conn_str.retry_writes,
server_monitoring_mode: conn_str.server_monitoring_mode,
socket_timeout: conn_str.socket_timeout,
direct_connection: conn_str.direct_connection,
default_database: conn_str.default_database,
Expand Down Expand Up @@ -2182,6 +2201,19 @@ impl ConnectionString {
k @ "retryreads" => {
self.retry_reads = Some(get_bool!(value, k));
}
"servermonitoringmode" => {
self.server_monitoring_mode = Some(match value.to_lowercase().as_str() {
"stream" => ServerMonitoringMode::Stream,
"poll" => ServerMonitoringMode::Poll,
"auto" => ServerMonitoringMode::Auto,
other => {
return Err(Error::invalid_argument(format!(
"{:?} is not a valid server monitoring mode",
other
)));
}
});
}
k @ "serverselectiontimeoutms" => {
self.server_selection_timeout = Some(Duration::from_millis(get_duration!(value, k)))
}
Expand Down Expand Up @@ -2875,3 +2907,17 @@ pub struct TransactionOptions {
)]
pub max_commit_time: Option<Duration>,
}

/// Which server monitoring protocol to use.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
#[non_exhaustive]
pub enum ServerMonitoringMode {
/// The client will use the streaming protocol when the server supports it and fall back to the
/// polling protocol otherwise.
Stream,
/// The client will use the polling protocol.
Poll,
/// The client will use the polling protocol when running on a FaaS platform and behave the
/// same as `Stream` otherwise.
Auto,
}
4 changes: 4 additions & 0 deletions src/cmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,7 @@ impl ConnectionPool {
self.manager.broadcast(msg)
}
}

pub(crate) fn is_faas() -> bool {
establish::handshake::FaasEnvironmentName::new().is_some()
}
4 changes: 2 additions & 2 deletions src/cmap/establish/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ struct RuntimeEnvironment {
}

#[derive(Copy, Clone, Debug, PartialEq)]
enum FaasEnvironmentName {
pub(crate) enum FaasEnvironmentName {
AwsLambda,
AzureFunc,
GcpFunc,
Expand Down Expand Up @@ -221,7 +221,7 @@ fn var_set(name: &str) -> bool {
}

impl FaasEnvironmentName {
fn new() -> Option<Self> {
pub(crate) fn new() -> Option<Self> {
use FaasEnvironmentName::*;
let mut found: Option<Self> = None;
let lambda_env = env::var_os("AWS_EXECUTION_ENV")
Expand Down
51 changes: 40 additions & 11 deletions src/sdam/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use super::{
TopologyWatcher,
};
use crate::{
client::options::ServerMonitoringMode,
cmap::{establish::ConnectionEstablisher, Connection},
error::{Error, Result},
event::sdam::{
Expand Down Expand Up @@ -48,11 +49,15 @@ pub(crate) struct Monitor {
sdam_event_emitter: Option<SdamEventEmitter>,
client_options: ClientOptions,

/// Whether this monitor is allowed to use the streaming protocol.
allow_streaming: bool,

/// The most recent topology version returned by the server in a hello response.
/// If some, indicates that this monitor should use the streaming protocol. If none, it should
/// use the polling protocol.
topology_version: Option<TopologyVersion>,

/// The RTT monitor; once it's started this is None.
pending_rtt_monitor: Option<RttMonitor>,

/// Handle to the RTT monitor, used to get the latest known round trip time for a given server
/// and to reset the RTT when the monitor disconnects from the server.
rtt_monitor_handle: RttMonitorHandle,
Expand All @@ -79,21 +84,31 @@ impl Monitor {
connection_establisher.clone(),
client_options.clone(),
);
let allow_streaming = match client_options
.server_monitoring_mode
.clone()
.unwrap_or(ServerMonitoringMode::Auto)
{
ServerMonitoringMode::Stream => true,
ServerMonitoringMode::Poll => false,
ServerMonitoringMode::Auto => !crate::cmap::is_faas(),
};
let monitor = Self {
address,
client_options,
connection_establisher,
topology_updater,
topology_watcher,
sdam_event_emitter,
pending_rtt_monitor: Some(rtt_monitor),
rtt_monitor_handle,
request_receiver: manager_receiver,
connection: None,
allow_streaming,
topology_version: None,
};

runtime::execute(monitor.execute());
runtime::execute(rtt_monitor.execute());
}

async fn execute(mut self) {
Expand All @@ -102,13 +117,19 @@ impl Monitor {
while self.is_alive() {
let check_succeeded = self.check_server().await;

if self.topology_version.is_some() && self.allow_streaming {
if let Some(rtt_monitor) = self.pending_rtt_monitor.take() {
runtime::execute(rtt_monitor.execute());
}
}

// In the streaming protocol, we read from the socket continuously
// rather than polling at specific intervals, unless the most recent check
// failed.
//
// We only go to sleep when using the polling protocol (i.e. server never returned a
// topologyVersion) or when the most recent check failed.
if self.topology_version.is_none() || !check_succeeded {
if self.topology_version.is_none() || !check_succeeded || !self.allow_streaming {
self.request_receiver
.wait_for_check_request(
self.client_options.min_heartbeat_frequency(),
Expand Down Expand Up @@ -180,7 +201,7 @@ impl Monitor {
self.emit_event(|| {
SdamEvent::ServerHeartbeatStarted(ServerHeartbeatStartedEvent {
server_address: self.address.clone(),
awaited: self.topology_version.is_some(),
awaited: self.topology_version.is_some() && self.allow_streaming,
driver_connection_id,
server_connection_id: self.connection.as_ref().and_then(|c| c.server_id),
})
Expand Down Expand Up @@ -213,10 +234,14 @@ impl Monitor {
} else {
// If the initial handshake returned a topology version, send it back to the
// server to begin streaming responses.
let opts = self.topology_version.map(|tv| AwaitableHelloOptions {
topology_version: tv,
max_await_time: heartbeat_frequency,
});
let opts = if self.allow_streaming {
self.topology_version.map(|tv| AwaitableHelloOptions {
topology_version: tv,
max_await_time: heartbeat_frequency,
})
} else {
None
};

let command = hello_command(
self.client_options.server_api.as_ref(),
Expand Down Expand Up @@ -266,8 +291,12 @@ impl Monitor {
};
let duration = start.elapsed();

let awaited = self.topology_version.is_some() && self.allow_streaming;
match result {
HelloResult::Ok(ref r) => {
if !awaited {
self.rtt_monitor_handle.add_sample(duration);
}
self.emit_event(|| {
let mut reply = r
.raw_command_response
Expand All @@ -280,7 +309,7 @@ impl Monitor {
duration,
reply,
server_address: self.address.clone(),
awaited: self.topology_version.is_some(),
awaited,
driver_connection_id,
server_connection_id: self.connection.as_ref().and_then(|c| c.server_id),
})
Expand All @@ -296,7 +325,7 @@ impl Monitor {
duration,
failure: e.clone(),
server_address: self.address.clone(),
awaited: self.topology_version.is_some(),
awaited,
driver_connection_id,
server_connection_id: self.connection.as_ref().and_then(|c| c.server_id),
})
Expand Down
4 changes: 2 additions & 2 deletions src/test/spec/json/server-discovery-and-monitoring/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ Integration Tests

Integration tests are provided in the "unified" directory and are
written in the `Unified Test Format
<../unified-test-format/unified-test-format.rst>`_.
<../../unified-test-format/unified-test-format.md>`_.

Prose Tests
-----------
Expand Down Expand Up @@ -264,4 +264,4 @@ Run the following test(s) on MongoDB 4.4+.

.. Section for links.
.. _Server Description Equality: /source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#server-description-equality
.. _Server Description Equality: /source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#server-description-equality
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ tests:
event:
poolClearedEvent: {}
count: 1
# Perform an operation to ensure the node still useable.
# Perform an operation to ensure the node still usable.
- name: insertOne
object: *collection
arguments:
Expand Down
Loading

0 comments on commit 4d9a320

Please sign in to comment.