Skip to content

Commit

Permalink
another ICE
Browse files Browse the repository at this point in the history
  • Loading branch information
znx3p0 committed Jan 16, 2022
1 parent 27f1892 commit 1920c7e
Show file tree
Hide file tree
Showing 3 changed files with 247 additions and 34 deletions.
3 changes: 2 additions & 1 deletion canary/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#![forbid(unsafe_code)]
#![forbid(missing_docs)]
#![deny(missing_docs)]

//! # Canary
//! Canary is a library for making communication through the network easy.
Expand Down Expand Up @@ -36,3 +36,4 @@ pub use serde::{Deserialize, Serialize};
pub use igcp::Result;
pub use providers::Addr;
pub use providers::ServiceAddr;
pub use routes::Ctx;
267 changes: 238 additions & 29 deletions canary/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use compact_str::CompactStr;
use serde_repr::{Deserialize_repr, Serialize_repr};
use std::sync::Arc;
use std::sync::{Arc, Weak};

use camino::Utf8Path;
use dashmap::DashMap;
Expand All @@ -14,16 +14,67 @@ use crate::service::{Service, Svc};
use crate::Result;

type RouteKey = CompactStr;
#[derive(Default)]
type InnerRoute = DashMap<RouteKey, Storable>;

/// used for discovering services.
/// it stores services inside with a key and it can introduce channels to services.
pub struct Route(DashMap<RouteKey, Storable>);
pub enum Route {
Owned(InnerRoute),
Static(&'static InnerRoute),
Dynamic(Weak<InnerRoute>)
}

enum Storable {
Route(Arc<Route>),
Route(Route),
Service(Svc),
}

/// context associated with a service
pub struct Ctx {
top_route: RouteRef,
}

impl Ctx {
fn new(top_route: RouteRef) -> Self {
Ctx {
top_route
}
}
}

impl std::ops::Deref for Ctx {
type Target = Route;

fn deref(&self) -> &Self::Target {
self.top_route.deref()
}
}

enum RouteRef {
Static(&'static Route), // global
Dynamic(Arc<Route>), // arc cannot outlive due to tree structure
}

impl RouteRef {
fn new_static(route: &'static Route) -> Self {
RouteRef::Static(route)
}
fn new_dynamic(route: impl Into<Arc<Route>>) -> Self {
RouteRef::Dynamic(route.into())
}
}

impl std::ops::Deref for RouteRef {
type Target = Route;

fn deref(&self) -> &Self::Target {
match self {
RouteRef::Static(route) => route,
RouteRef::Dynamic(route) => route,
}
}
}

/// has an endpoint at which a type should be registered
pub trait RegisterEndpoint {
/// inner endpoint
Expand Down Expand Up @@ -52,6 +103,22 @@ pub enum Status {
/// global route on which initial services are laid on
pub static GLOBAL_ROUTE: Lazy<Route> = Lazy::new(Default::default);

trait Context {
fn context(self) -> Ctx;
}

impl Context for &'static Route {
fn context(self) -> Ctx {
Ctx::new(RouteRef::new_static(self))
}
}

impl Context for Arc<Route> {
fn context(self) -> Ctx {
Ctx::new(RouteRef::new_dynamic(self))
}
}

impl Route {
/// adds a service at a specific id to the route
/// ```norun
Expand Down Expand Up @@ -169,50 +236,115 @@ impl Route {
pub fn register<T: Register>(&self, meta: T::Meta) -> Result<()> {
T::register(self, meta)
}

fn static_switch(
&'static self,
id: impl AsRef<Utf8Path>,
chan: impl Into<BareChannel>,
) -> ::core::result::Result<(), (igcp::Error, BareChannel)> {
let mut id = id.as_ref().into_iter();
let chan = chan.into();
let first = match id.next() {
Some(id) => id,
None => return Err((err!(invalid_data, "service name is empty"), chan))?,
};
let value = match self.0.get(first) {
Some(id) => id,
None => {
return Err((
err!(invalid_data, format!("service `{:?}` not found", id)),
chan,
))?
}
};
let ctx = self.context_static();
match value.value() {
Storable::Route(r) => {
let mut map = r.clone();
loop {
let next = match id.next() {
Some(id) => id,
None => {
return Err((
err!(not_found, format!("service `{:?}` not found", id)),
chan,
))
}
};
let next_map = {
let val = match map.0.get(next) {
Some(val) => val,
None => {
return Err((
err!(not_found, format!("service `{:?}` not found", id)),
chan,
))
}
};
match val.value() {
Storable::Route(r) => r.clone(),
Storable::Service(f) => {
f(chan, ctx);
return Ok(());
}
}
};
map = next_map;
}
}
Storable::Service(f) => {
f(chan, ctx);
Ok(())
}
}
}

// all next are used for the routing system

pub(crate) fn introduce_static(&'static self, c: BareChannel) {
let mut c: Channel = c.into();
spawn(async move {
let id = match c.rx::<RouteKey>().await {
let id = match c.receive::<RouteKey>().await {
Ok(s) => s,
Err(e) => {
tracing::error!("found error receiving id of service: {:?}", &e);
err!((other, e))?
}
};
self.introduce_service(id.as_ref(), c.bare()).await?;
self.introduce_service_static(id.as_ref(), c.bare()).await?;
Ok::<_, igcp::Error>(())
});
}

pub(crate) async fn introduce_static_unspawn(&'static self, c: BareChannel) -> Result<()> {
let mut c: Channel = c.into();
let id = match c.rx::<RouteKey>().await {
let id = match c.receive::<RouteKey>().await {
Ok(s) => s,
Err(e) => {
tracing::error!("found error receiving id of service: {:?}", &e);
err!((other, e))?
}
};
self.introduce_service(id.as_ref(), c.bare()).await?;
self.introduce_service_static(id.as_ref(), c.bare()).await?;
Ok(())
}

pub(crate) async fn introduce_service(
&self,
pub(crate) async fn introduce_service_static(
&'static self,
id: impl AsRef<Utf8Path>,
bare: BareChannel,
) -> Result<()> {
let id = id.as_ref();
if let Err((e, c)) = self.__introduce_inner(id, bare).await {
if let Err((e, c)) = self.__introduce_inner_static(id, bare).await {
let mut chan: Channel = c.into();
chan.tx(Status::NotFound).await?;
chan.send(Status::NotFound).await?;
err!((e))?
}
Ok(())
}

async fn __introduce_inner(
&self,
async fn __introduce_inner_static(
&'static self,
id: impl AsRef<Utf8Path>,
chan: BareChannel,
) -> ::core::result::Result<(), (igcp::Error, BareChannel)> {
Expand All @@ -230,6 +362,7 @@ impl Route {
))?
}
};
let ctx = self.context_static();
match value.value() {
Storable::Route(r) => {
let mut map = r.clone();
Expand Down Expand Up @@ -258,7 +391,7 @@ impl Route {
Storable::Service(f) => {
let mut chan: Channel = chan.into();
chan.tx(Status::Found).await.ok();
f(chan.bare());
f(chan.bare(), ctx);
return Ok(());
}
}
Expand All @@ -269,22 +402,98 @@ impl Route {
Storable::Service(f) => {
let mut chan: Channel = chan.into();
chan.tx(Status::Found).await.ok();
f(chan.bare());
f(chan.bare(), ctx);
Ok(())
}
}
}
/// should only be used whenever debugging
pub fn show(&self) {
for i in &self.0 {
match i.value() {
&Storable::Route(_) => {
tracing::info!("Route({:?})", i.key());
}
&Storable::Service(_) => {
tracing::info!("Service({:?})", i.key());
}
}
}
}
// pub(crate) async fn introduce_service(
// &self,
// id: impl AsRef<Utf8Path>,
// bare: BareChannel,
// ) -> Result<()> {
// let id = id.as_ref();
// if let Err((e, c)) = self.__introduce_inner(id, bare).await {
// let mut chan: Channel = c.into();
// chan.send(Status::NotFound).await?;
// err!((e))?
// }
// Ok(())
// }
// pub(crate) async fn introduce_service(
// &self,
// id: impl AsRef<Utf8Path>,
// bare: BareChannel,
// ) -> Result<()> {
// let id = id.as_ref();
// if let Err((e, c)) = self.__introduce_inner(id, bare).await {
// let mut chan: Channel = c.into();
// chan.send(Status::NotFound).await?;
// err!((e))?
// }
// Ok(())
// }
// async fn __introduce_inner(
// &self,
// id: impl AsRef<Utf8Path>,
// chan: BareChannel,
// ) -> ::core::result::Result<(), (igcp::Error, BareChannel)> {
// let mut id = id.as_ref().into_iter();
// let first = match id.next() {
// Some(id) => id,
// None => return Err((err!(invalid_data, "service name is empty"), chan))?,
// };
// let value = match self.0.get(first) {
// Some(id) => id,
// None => {
// return Err((
// err!(invalid_data, format!("service `{:?}` not found", id)),
// chan,
// ))?
// }
// };
// match value.value() {
// Storable::Route(r) => {
// let mut map = r.clone();
// loop {
// let next = match id.next() {
// Some(id) => id,
// None => {
// return Err((
// err!(not_found, format!("service `{:?}` not found", id)),
// chan,
// ))
// }
// };
// let next_map = {
// let val = match map.0.get(next) {
// Some(val) => val,
// None => {
// return Err((
// err!(not_found, format!("service `{:?}` not found", id)),
// chan,
// ))
// }
// };
// match val.value() {
// Storable::Route(r) => r.clone(),
// Storable::Service(f) => {
// let mut chan: Channel = chan.into();
// chan.tx(Status::Found).await.ok();
// f(chan.bare());
// return Ok(());
// }
// }
// };
// map = next_map;
// }
// }
// Storable::Service(f) => {
// let mut chan: Channel = chan.into();
// chan.tx(Status::Found).await.ok();
// f(chan.bare());
// Ok(())
// }
// }
// }
}
Loading

0 comments on commit 1920c7e

Please sign in to comment.