Skip to content

Commit

Permalink
Feat/cluster (#138)
Browse files Browse the repository at this point in the history
* refactor(cluster): add Cluster MockImpl

* refactor(triple): use ClientBuilder to init Cluster ability

* Update builder.rs

update default direct value

* Update triple.rs

handle unused var

* Update mod.rs

comment some codes

* refactor(triple): rm unused var in clientBuilder
  • Loading branch information
yang20150702 authored May 16, 2023
1 parent d2cb383 commit 54181bf
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 58 deletions.
1 change: 1 addition & 0 deletions dubbo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ axum = "0.5.9"
async-stream = "0.3"
flate2 = "1.0"
aws-smithy-http = "0.54.1"
dyn-clone = "1.0.11"
itertools.workspace = true
urlencoding.workspace = true
lazy_static.workspace = true
Expand Down
101 changes: 89 additions & 12 deletions dubbo/src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,41 @@
* limitations under the License.
*/

use std::{sync::Arc, task::Poll};
use std::{collections::HashMap, fmt::Debug, sync::Arc, task::Poll};

use aws_smithy_http::body::SdkBody;
use tower_service::Service;
use dubbo_base::Url;

use crate::{empty_body, protocol::BoxInvoker};
use crate::{
empty_body,
invocation::RpcInvocation,
protocol::{BoxInvoker, Invoker},
};

pub mod directory;
pub mod loadbalance;
pub mod support;

pub trait Directory {
fn list(&self, meta: String) -> Vec<BoxInvoker>;
pub trait Directory: Debug {
fn list(&self, invocation: Arc<RpcInvocation>) -> Vec<BoxInvoker>;
fn is_empty(&self) -> bool;
}

type BoxDirectory = Box<dyn Directory>;
type BoxDirectory = Box<dyn Directory + Send + Sync>;

pub trait Cluster {
fn join(&self, dir: BoxDirectory) -> BoxInvoker;
}

#[derive(Debug, Default)]
pub struct MockCluster {}

impl Cluster for MockCluster {
fn join(&self, dir: BoxDirectory) -> BoxInvoker {
Box::new(FailoverCluster::new(dir))
}
}
#[derive(Clone, Debug)]
pub struct FailoverCluster {
dir: Arc<BoxDirectory>,
}
Expand All @@ -43,7 +60,7 @@ impl FailoverCluster {
}
}

impl Service<http::Request<SdkBody>> for FailoverCluster {
impl Invoker<http::Request<SdkBody>> for FailoverCluster {
type Response = http::Response<crate::BoxBody>;

type Error = crate::Error;
Expand All @@ -66,7 +83,11 @@ impl Service<http::Request<SdkBody>> for FailoverCluster {
.method(req.method().clone());
*clone_req.headers_mut().unwrap() = req.headers().clone();
let r = clone_req.body(clone_body).unwrap();
let invokers = self.dir.list("service_name".to_string());
let invokers = self.dir.list(
RpcInvocation::default()
.with_service_unique_name("hello".to_string())
.into(),
);
for mut invoker in invokers {
let fut = async move {
let res = invoker.call(r).await;
Expand All @@ -83,19 +104,75 @@ impl Service<http::Request<SdkBody>> for FailoverCluster {
.unwrap())
})
}

fn get_url(&self) -> dubbo_base::Url {
Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap()
}
}

#[derive(Debug, Default)]
pub struct MockDirectory {
// router_chain: RouterChain,
invokers: Vec<BoxInvoker>,
}

pub struct MockDirectory {}
impl MockDirectory {
pub fn new(invokers: Vec<BoxInvoker>) -> MockDirectory {
Self {
// router_chain: RouterChain::default(),
invokers,
}
}
}

impl Directory for MockDirectory {
fn list(&self, _meta: String) -> Vec<BoxInvoker> {
fn list(&self, _invo: Arc<RpcInvocation>) -> Vec<BoxInvoker> {
// tracing::info!("MockDirectory: {}", meta);
// let u = Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap();
let _u = Url::from_url("triple://127.0.0.1:8888/helloworld.Greeter").unwrap();
// vec![Box::new(TripleInvoker::new(u))]
todo!()
// self.router_chain.route(u, invo);
self.invokers.clone()
}

fn is_empty(&self) -> bool {
false
}
}

#[derive(Debug, Default)]
pub struct RouterChain {
router: HashMap<String, BoxRouter>,
invokers: Vec<BoxInvoker>,
}

impl RouterChain {
pub fn route(&self, url: Url, invo: Arc<RpcInvocation>) -> Vec<BoxInvoker> {
let r = self.router.get("mock").unwrap();
r.route(self.invokers.clone(), url, invo)
}
}

pub trait Router: Debug {
fn route(
&self,
invokers: Vec<BoxInvoker>,
url: Url,
invo: Arc<RpcInvocation>,
) -> Vec<BoxInvoker>;
}

pub type BoxRouter = Box<dyn Router + Sync + Send>;

#[derive(Debug, Default)]
pub struct MockRouter {}

impl Router for MockRouter {
fn route(
&self,
invokers: Vec<BoxInvoker>,
_url: Url,
_invo: Arc<RpcInvocation>,
) -> Vec<BoxInvoker> {
invokers
}
}
12 changes: 11 additions & 1 deletion dubbo/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::{

use async_trait::async_trait;
use aws_smithy_http::body::SdkBody;
use dyn_clone::DynClone;
use tower_service::Service;

use dubbo_base::Url;
Expand All @@ -43,7 +44,7 @@ pub trait Exporter {
fn unexport(&self);
}

pub trait Invoker<ReqBody>: Debug {
pub trait Invoker<ReqBody>: Debug + DynClone {
type Response;

type Error;
Expand All @@ -68,6 +69,15 @@ pub type BoxInvoker = Box<
+ Sync,
>;

dyn_clone::clone_trait_object!(
Invoker<
http::Request<SdkBody>,
Response = http::Response<crate::BoxBody>,
Error = crate::Error,
Future = crate::BoxFuture<http::Response<crate::BoxBody>, crate::Error>,
>
);

pub struct WrapperInvoker<T>(T);

impl<T, ReqBody> Service<http::Request<ReqBody>> for WrapperInvoker<T>
Expand Down
26 changes: 17 additions & 9 deletions dubbo/src/protocol/triple/triple_invoker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,32 @@

use aws_smithy_http::body::SdkBody;
use dubbo_base::Url;
use std::fmt::{Debug, Formatter};
use std::{
fmt::{Debug, Formatter},
str::FromStr,
};
use tower_service::Service;

use crate::{protocol::Invoker, triple::client::builder::ClientBoxService};
use crate::{
protocol::Invoker,
triple::{client::builder::ClientBoxService, transport::connection::Connection},
utils::boxed_clone::BoxCloneService,
};

#[derive(Clone)]
pub struct TripleInvoker {
url: Url,
conn: ClientBoxService,
}

impl TripleInvoker {
// pub fn new(url: Url) -> TripleInvoker {
// let uri = http::Uri::from_str(&url.to_url()).unwrap();
// Self {
// url,
// conn: ClientBuilder::from_uri(&uri).build()connect(),
// }
// }
pub fn new(url: Url) -> TripleInvoker {
let uri = http::Uri::from_str(&url.to_url()).unwrap();
Self {
url,
conn: BoxCloneService::new(Connection::new().with_host(uri)),
}
}
}

impl Debug for TripleInvoker {
Expand Down
44 changes: 30 additions & 14 deletions dubbo/src/triple/client/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,28 @@
*/

use crate::{
cluster::directory::StaticDirectory,
codegen::{ClusterInvoker, Directory, RegistryDirectory},
cluster::{directory::StaticDirectory, Cluster, MockCluster, MockDirectory},
codegen::{ClusterInvoker, Directory, RegistryDirectory, TripleInvoker},
triple::compression::CompressionEncoding,
utils::boxed::BoxService,
utils::boxed_clone::BoxCloneService,
};

use aws_smithy_http::body::SdkBody;
use dubbo_base::Url;

use super::TripleClient;

pub type ClientBoxService =
BoxService<http::Request<SdkBody>, http::Response<crate::BoxBody>, crate::Error>;
BoxCloneService<http::Request<SdkBody>, http::Response<crate::BoxBody>, crate::Error>;

#[derive(Clone, Debug, Default)]
pub struct ClientBuilder {
pub timeout: Option<u64>,
pub connector: &'static str,
directory: Option<Box<dyn Directory>>,
cluster_invoker: Option<ClusterInvoker>,
pub direct: bool,
host: String,
}

impl ClientBuilder {
Expand All @@ -44,6 +47,8 @@ impl ClientBuilder {
connector: "",
directory: None,
cluster_invoker: None,
direct: false,
host: "".to_string(),
}
}

Expand All @@ -53,15 +58,8 @@ impl ClientBuilder {
connector: "",
directory: Some(Box::new(StaticDirectory::new(&host))),
cluster_invoker: None,
}
}

pub fn from_uri(uri: &http::Uri) -> ClientBuilder {
Self {
timeout: None,
connector: "",
directory: Some(Box::new(StaticDirectory::from_uri(&uri))),
cluster_invoker: None,
direct: true,
host: host.clone().to_string(),
}
}

Expand Down Expand Up @@ -104,11 +102,29 @@ impl ClientBuilder {
}
}

pub fn with_direct(self, direct: bool) -> Self {
Self { direct, ..self }
}

pub fn build(self) -> TripleClient {
TripleClient {
let mut cli = TripleClient {
send_compression_encoding: Some(CompressionEncoding::Gzip),
directory: self.directory,
cluster_invoker: self.cluster_invoker,
invoker: None,
};
if self.direct {
cli.invoker = Some(Box::new(TripleInvoker::new(
Url::from_url(&self.host).unwrap(),
)));
return cli;
}

let cluster = MockCluster::default().join(Box::new(MockDirectory::new(vec![Box::new(
TripleInvoker::new(Url::from_url("http://127.0.0.1:8888").unwrap()),
)])));

cli.invoker = Some(cluster);
cli
}
}
33 changes: 11 additions & 22 deletions dubbo/src/triple/client/triple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::codegen::{ClusterInvoker, Directory, RpcInvocation};
use crate::{
cluster::support::cluster_invoker::ClusterRequestBuilder,
invocation::{IntoStreamingRequest, Metadata, Request, Response},
protocol::BoxInvoker,
triple::{codec::Codec, compression::CompressionEncoding, decode::Decoding, encode::encode},
};

Expand All @@ -38,11 +39,12 @@ pub struct TripleClient {
pub(crate) send_compression_encoding: Option<CompressionEncoding>,
pub(crate) directory: Option<Box<dyn Directory>>,
pub(crate) cluster_invoker: Option<ClusterInvoker>,
pub invoker: Option<BoxInvoker>,
}

impl TripleClient {
pub fn connect(host: String) -> Self {
let builder = ClientBuilder::from_static(&host);
let builder = ClientBuilder::from_static(&host).with_direct(true);

builder.build()
}
Expand Down Expand Up @@ -135,7 +137,7 @@ impl TripleClient {
req: Request<M1>,
mut codec: C,
path: http::uri::PathAndQuery,
invocation: RpcInvocation,
_invocation: RpcInvocation,
) -> Result<Response<M2>, crate::status::Status>
where
C: Codec<Encode = M1, Decode = M2>,
Expand All @@ -150,27 +152,14 @@ impl TripleClient {
)
.into_stream();
let body = hyper::Body::wrap_stream(body_stream);
let sdk_body = SdkBody::from(body);
let arc_invocation = Arc::new(invocation);
let req;
let http_uri;
if self.cluster_invoker.is_some() {
let cluster_invoker = self.cluster_invoker.as_ref().unwrap().clone();
req = cluster_invoker.build_req(self, path, arc_invocation.clone(), sdk_body);
http_uri = req.uri().clone();
} else {
let url_list = self
.directory
.as_ref()
.expect("msg")
.list(arc_invocation.clone());
let real_url = url_list.choose(&mut rand::thread_rng()).expect("msg");
http_uri =
http::Uri::from_str(&format!("http://{}:{}/", real_url.ip, real_url.port)).unwrap();
req = self.map_request(http_uri.clone(), path, sdk_body);
}
let bytes = hyper::body::to_bytes(body).await.unwrap();
let sdk_body = SdkBody::from(bytes);

// let mut conn = Connection::new().with_host(http_uri);
let mut conn = self.invoker.clone().unwrap();
let http_uri = http::Uri::from_str(&conn.get_url().to_url()).unwrap();
let req = self.map_request(http_uri.clone(), path, sdk_body);

let mut conn = Connection::new().with_host(http_uri);
let response = conn
.call(req)
.await
Expand Down

0 comments on commit 54181bf

Please sign in to comment.