Skip to content

Commit

Permalink
Dynamically add/remove endpoint (#32)
Browse files Browse the repository at this point in the history
Co-authored-by: David Li <[email protected]>
  • Loading branch information
belltoy and davidli2010 authored Apr 5, 2022
1 parent 8f76fc4 commit 050257a
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 11 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ tokio-stream = "0.1.8"
tower-service = "0.3.1"
http = "0.2.6"
visible = { version = "0.0.1", optional = true }
tower = "0.4.12"

[dev-dependencies]
tokio = { version = "1.17.0", features = ["full"] }
Expand Down
115 changes: 104 additions & 11 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,13 @@ use crate::rpc::maintenance::{
use crate::rpc::watch::{WatchClient, WatchOptions, WatchStream, Watcher};
#[cfg(feature = "tls")]
use crate::TlsOptions;
use http::uri::Uri;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::Sender;
use tonic::transport::{Channel, Endpoint};
use tower::discover::Change;

const HTTP_PREFIX: &str = "http://";
const HTTPS_PREFIX: &str = "https://";
Expand All @@ -51,13 +55,15 @@ pub struct Client {
maintenance: MaintenanceClient,
cluster: ClusterClient,
election: ElectionClient,
options: Option<ConnectOptions>,
tx: Sender<Change<Uri, Endpoint>>,
}

impl Client {
/// Connect to `etcd` servers from given `endpoints`.
pub async fn connect<E: AsRef<str>, S: AsRef<[E]>>(
endpoints: S,
mut options: Option<ConnectOptions>,
options: Option<ConnectOptions>,
) -> Result<Self> {
let endpoints = {
let mut eps = Vec::new();
Expand All @@ -68,14 +74,23 @@ impl Client {
eps
};

let channel = match endpoints.len() {
0 => return Err(Error::InvalidArgs(String::from("empty endpoints"))),
1 => endpoints[0].connect_lazy(),
_ => Channel::balance_list(endpoints.into_iter()),
};
if endpoints.is_empty() {
return Err(Error::InvalidArgs(String::from("empty endpoints")));
}

// Always use balance strategy even if there is only one endpoint.
let (channel, tx) = Channel::balance_channel(64);
for endpoint in endpoints {
// The rx inside `channel` won't be closed or dropped here
let _ = tx
.send(Change::Insert(endpoint.uri().clone(), endpoint))
.await
.unwrap();
}

let mut options = options;
let auth_token = Self::auth(channel.clone(), &mut options).await?;
Ok(Self::build_client(channel, auth_token))
Ok(Self::build_client(channel, tx, auth_token, options))
}

fn build_endpoint(url: &str, options: &Option<ConnectOptions>) -> Result<Endpoint> {
Expand Down Expand Up @@ -156,19 +171,27 @@ impl Client {
) -> Result<Option<Arc<http::HeaderValue>>> {
let user = match options {
None => return Ok(None),
Some(opt) => opt.user.take(),
Some(opt) => {
// Take away the user, the password should not be stored in client.
opt.user.take()
}
};

if let Some((name, password)) = user {
let mut tmp_auth = AuthClient::new(channel.clone(), None);
let mut tmp_auth = AuthClient::new(channel, None);
let resp = tmp_auth.authenticate(name, password).await?;
Ok(Some(Arc::new(resp.token().parse()?)))
} else {
Ok(None)
}
}

fn build_client(channel: Channel, auth_token: Option<Arc<http::HeaderValue>>) -> Self {
fn build_client(
channel: Channel,
tx: Sender<Change<Uri, Endpoint>>,
auth_token: Option<Arc<http::HeaderValue>>,
options: Option<ConnectOptions>,
) -> Self {
let kv = KvClient::new(channel.clone(), auth_token.clone());
let watch = WatchClient::new(channel.clone(), auth_token.clone());
let lease = LeaseClient::new(channel.clone(), auth_token.clone());
Expand All @@ -187,9 +210,44 @@ impl Client {
maintenance,
cluster,
election,
options,
tx,
}
}

/// Dynamically add an endpoint to the client.
///
/// Which can be used to add a new member to the underlying balance cache.
/// The typical scenario is that application can use a services discovery
/// to discover the member list changes and add/remove them to/from the client.
///
/// Note that the [`Client`] doesn't check the authentication before added.
/// So the etcd member of the added endpoint REQUIRES to use the same auth
/// token as when create the client. Otherwise, the underlying balance
/// services will not be able to connect to the new endpoint.
#[inline]
pub async fn add_endpoint<E: AsRef<str>>(&self, endpoint: E) -> Result<()> {
let endpoint = Self::build_endpoint(endpoint.as_ref(), &self.options)?;
let tx = &self.tx;
tx.send(Change::Insert(endpoint.uri().clone(), endpoint))
.await
.map_err(|e| Error::EndpointError(format!("failed to add endpoint because of {}", e)))
}

/// Dynamically remove an endpoint from the client.
///
/// Note that the `endpoint` str should be the same as it was added.
/// And the underlying balance services cache used the hash from the Uri,
/// which was parsed from `endpoint` str, to do the equality comparisons.
#[inline]
pub async fn remove_endpoint<E: AsRef<str>>(&self, endpoint: E) -> Result<()> {
let uri = http::Uri::from_str(endpoint.as_ref())?;
let tx = &self.tx;
tx.send(Change::Remove(uri)).await.map_err(|e| {
Error::EndpointError(format!("failed to remove endpoint because of {}", e))
})
}

/// Gets a KV client.
#[inline]
pub fn kv_client(&self) -> KvClient {
Expand Down Expand Up @@ -696,9 +754,11 @@ mod tests {
use super::*;
use crate::{Compare, CompareOp, EventType, PermissionType, TxnOp, TxnOpResponse};

const DEFAULT_TEST_ENDPOINT: &str = "localhost:2379";

/// Get client for testing.
async fn get_client() -> Result<Client> {
Client::connect(["localhost:2379"], None).await
Client::connect([DEFAULT_TEST_ENDPOINT], None).await
}

#[tokio::test]
Expand Down Expand Up @@ -1431,4 +1491,37 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_remove_and_add_endpoint() -> Result<()> {
let mut client = get_client().await?;
client.put("endpoint", "add_remove", None).await?;

// get key
{
let resp = client.get("endpoint", None).await?;
assert_eq!(resp.count(), 1);
assert!(!resp.more());
assert_eq!(resp.kvs().len(), 1);
assert_eq!(resp.kvs()[0].key(), b"endpoint");
assert_eq!(resp.kvs()[0].value(), b"add_remove");
}

// remove endpoint
client.remove_endpoint(DEFAULT_TEST_ENDPOINT).await?;
// `Client::get` will hang before adding the endpoint back
client.add_endpoint(DEFAULT_TEST_ENDPOINT).await?;

// get key after remove and add endpoint
{
let resp = client.get("endpoint", None).await?;
assert_eq!(resp.count(), 1);
assert!(!resp.more());
assert_eq!(resp.kvs().len(), 1);
assert_eq!(resp.kvs()[0].key(), b"endpoint");
assert_eq!(resp.kvs()[0].value(), b"add_remove");
}

Ok(())
}
}
4 changes: 4 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ pub enum Error {

/// Invalid header value
InvalidHeaderValue(http::header::InvalidHeaderValue),

/// Endpoint error
EndpointError(String),
}

impl Display for Error {
Expand All @@ -53,6 +56,7 @@ impl Display for Error {
Error::LeaseKeepAliveError(e) => write!(f, "lease keep alive error: {}", e),
Error::ElectError(e) => write!(f, "election error: {}", e),
Error::InvalidHeaderValue(e) => write!(f, "invalid metadata value: {}", e),
Error::EndpointError(e) => write!(f, "endpoint error: {}", e),
}
}
}
Expand Down

0 comments on commit 050257a

Please sign in to comment.