Skip to content

Commit

Permalink
Connection string config for replication.
Browse files Browse the repository at this point in the history
Co-authored-by: Petros Angelatos <[email protected]>
  • Loading branch information
jeff-davis and petrosagg committed Jan 13, 2023
1 parent 32cfa91 commit b7755b2
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 1 deletion.
45 changes: 45 additions & 0 deletions tokio-postgres/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,21 @@ pub enum ChannelBinding {
Require,
}

/// Replication mode configuration.
///
/// It is recommended that you use a PostgreSQL server patch version
/// of at least: 14.0, 13.2, 12.6, 11.11, 10.16, 9.6.21, or
/// 9.5.25. Earlier patch levels have a bug that doesn't properly
/// handle pipelined requests after streaming has stopped.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum ReplicationMode {
/// Physical replication.
Physical,
/// Logical replication.
Logical,
}

/// A host specification.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Host {
Expand Down Expand Up @@ -177,6 +192,7 @@ pub struct Config {
pub(crate) keepalive_config: KeepaliveConfig,
pub(crate) target_session_attrs: TargetSessionAttrs,
pub(crate) channel_binding: ChannelBinding,
pub(crate) replication_mode: Option<ReplicationMode>,
}

impl Default for Config {
Expand Down Expand Up @@ -210,6 +226,7 @@ impl Config {
keepalive_config,
target_session_attrs: TargetSessionAttrs::Any,
channel_binding: ChannelBinding::Prefer,
replication_mode: None,
}
}

Expand Down Expand Up @@ -479,6 +496,22 @@ impl Config {
self.channel_binding
}

/// Set replication mode.
///
/// It is recommended that you use a PostgreSQL server patch version
/// of at least: 14.0, 13.2, 12.6, 11.11, 10.16, 9.6.21, or
/// 9.5.25. Earlier patch levels have a bug that doesn't properly
/// handle pipelined requests after streaming has stopped.
pub fn replication_mode(&mut self, replication_mode: ReplicationMode) -> &mut Config {
self.replication_mode = Some(replication_mode);
self
}

/// Get replication mode.
pub fn get_replication_mode(&self) -> Option<ReplicationMode> {
self.replication_mode
}

fn param(&mut self, key: &str, value: &str) -> Result<(), Error> {
match key {
"user" => {
Expand Down Expand Up @@ -608,6 +641,17 @@ impl Config {
};
self.channel_binding(channel_binding);
}
"replication" => {
let mode = match value {
"off" => None,
"true" => Some(ReplicationMode::Physical),
"database" => Some(ReplicationMode::Logical),
_ => return Err(Error::config_parse(Box::new(InvalidValue("replication")))),
};
if let Some(mode) = mode {
self.replication_mode(mode);
}
}
key => {
return Err(Error::config_parse(Box::new(UnknownOption(
key.to_string(),
Expand Down Expand Up @@ -685,6 +729,7 @@ impl fmt::Debug for Config {
.field("keepalives_retries", &self.keepalive_config.retries)
.field("target_session_attrs", &self.target_session_attrs)
.field("channel_binding", &self.channel_binding)
.field("replication", &self.replication_mode)
.finish()
}
}
Expand Down
8 changes: 7 additions & 1 deletion tokio-postgres/src/connect_raw.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::codec::{BackendMessage, BackendMessages, FrontendMessage, PostgresCodec};
use crate::config::{self, Config};
use crate::config::{self, Config, ReplicationMode};
use crate::connect_tls::connect_tls;
use crate::maybe_tls_stream::MaybeTlsStream;
use crate::tls::{TlsConnect, TlsStream};
Expand Down Expand Up @@ -124,6 +124,12 @@ where
if let Some(application_name) = &config.application_name {
params.push(("application_name", &**application_name));
}
if let Some(replication_mode) = &config.replication_mode {
match replication_mode {
ReplicationMode::Physical => params.push(("replication", "true")),
ReplicationMode::Logical => params.push(("replication", "database")),
}
}

let mut buf = BytesMut::new();
frontend::startup_message(params, &mut buf).map_err(Error::encode)?;
Expand Down

0 comments on commit b7755b2

Please sign in to comment.