Skip to content

Commit

Permalink
Redefine ExcService and add SendExcService (#72)
Browse files Browse the repository at this point in the history
  • Loading branch information
Nouzan authored Dec 4, 2023
1 parent cb1963f commit 3f08ff1
Show file tree
Hide file tree
Showing 26 changed files with 410 additions and 81 deletions.
7 changes: 5 additions & 2 deletions examples/examples/box_exc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use std::{ops::RangeBounds, str::FromStr, time::Duration};

use clap::Parser;
use exc::{
core::types::{Candle, SubscribeTickers, SubscribeTrades, Ticker, Trade},
core::{
types::{Candle, SubscribeTickers, SubscribeTrades, Ticker, Trade},
IntoService,
},
prelude::*,
types::instrument::InstrumentMeta,
util::instrument::PollInstrumentsLayer,
Expand Down Expand Up @@ -48,7 +51,7 @@ impl Exchange {
let trade_svc =
exc::Okx::endpoint()
.connect_exc()
.into_layered(&layer_fn(|svc: exc::Okx| {
.into_layered(&layer_fn(|svc: IntoService<_, _>| {
svc
// We use the `adapt` method to convert the request type to `SubscribeTickers`.
.adapt::<SubscribeTickers>()
Expand Down
11 changes: 6 additions & 5 deletions exc-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@
#![deny(missing_docs)]

/// Core services.
pub use exc_service as service;

/// The definition of an exchange.
pub mod exchange;

Expand All @@ -24,6 +21,10 @@ pub mod util;
/// Exc Symbol.
pub use exc_symbol as symbol;

pub use self::service::{Adaptor, Exc, ExcLayer, ExcService, ExcServiceExt, IntoExc, Request};
pub use exc_service::{error::InstrumentError, ExchangeError};
pub use self::service::{
traits::{AsService, IntoService},
Adaptor, Exc, ExcLayer, ExcService, ExcServiceExt, IntoExc, Request,
};
pub use exc_service::{self as service, error::InstrumentError, ExchangeError, SendExcService};

pub use positions::prelude::{Asset, Instrument, ParseAssetError, ParseSymbolError, Str, Symbol};
11 changes: 6 additions & 5 deletions exc-core/src/util/fetch_candles.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use async_stream::try_stream;
use exc_service::ExcServiceExt;
use futures::{future::BoxFuture, FutureExt, StreamExt};
use std::ops::Bound;
use tower::{Layer, Service, ServiceExt};
Expand All @@ -8,7 +9,7 @@ use crate::{
candle::{CandleStream, QueryCandles, QueryLastCandles},
QueryFirstCandles,
},
ExcService, ExchangeError,
ExcService, ExchangeError, IntoService,
};

use std::num::NonZeroUsize;
Expand Down Expand Up @@ -50,7 +51,7 @@ where

fn layer(&self, inner: S) -> Self::Service {
FetchCandlesBackward {
svc: Buffer::new(inner, self.bound),
svc: Buffer::new(inner.into_service(), self.bound),
limit: self.limit,
}
}
Expand All @@ -61,7 +62,7 @@ pub struct FetchCandlesBackward<S>
where
S: ExcService<QueryLastCandles> + 'static,
{
svc: Buffer<S, QueryLastCandles>,
svc: Buffer<IntoService<S, QueryLastCandles>, QueryLastCandles>,
limit: NonZeroUsize,
}

Expand Down Expand Up @@ -145,7 +146,7 @@ where

fn layer(&self, inner: S) -> Self::Service {
FetchCandlesForward {
svc: Buffer::new(inner, self.bound),
svc: Buffer::new(inner.into_service(), self.bound),
limit: self.limit,
}
}
Expand All @@ -156,7 +157,7 @@ pub struct FetchCandlesForward<S>
where
S: ExcService<QueryFirstCandles> + 'static,
{
svc: Buffer<S, QueryFirstCandles>,
svc: Buffer<IntoService<S, QueryFirstCandles>, QueryFirstCandles>,
limit: NonZeroUsize,
}

Expand Down
9 changes: 7 additions & 2 deletions exc-core/src/util/poll_instruments.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::time::Duration;

use async_stream::stream;
use exc_service::{ExcService, ExchangeError};
use exc_service::{ExcService, ExcServiceExt, ExchangeError};
use exc_types::{FetchInstruments, SubscribeInstruments};
use futures::{
future::{ready, Ready},
Expand Down Expand Up @@ -46,7 +46,12 @@ where
interval.tick().await;
}
};
let stream = self.inner.clone().call_all(req).try_flatten();
let stream = self
.inner
.clone()
.into_service()
.call_all(req)
.try_flatten();
ready(Ok(stream.boxed()))
}
}
Expand Down
8 changes: 4 additions & 4 deletions exc-core/src/util/trade_bid_ask.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,20 @@ where
S: Clone + Send + 'static,
S: ExcService<SubscribeTrades>,
S: ExcService<SubscribeBidAsk>,
<S as Service<SubscribeTrades>>::Future: Send,
<S as Service<SubscribeBidAsk>>::Future: Send,
<S as ExcService<SubscribeTrades>>::Future: Send,
<S as ExcService<SubscribeBidAsk>>::Future: Send,
{
type Response = TickerStream;
type Error = ExchangeError;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Service::<SubscribeTrades>::poll_ready(&mut self.svc, cx)
Service::<SubscribeTrades>::poll_ready(&mut self.svc.as_service(), cx)
}

fn call(&mut self, req: SubscribeTickers) -> Self::Future {
let trade = Service::<SubscribeTrades>::call(
&mut self.svc,
&mut self.svc.as_service(),
SubscribeTrades {
instrument: req.instrument.clone(),
},
Expand Down
2 changes: 1 addition & 1 deletion exc-make/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ description.workspace = true
rust-version.workspace = true

[dependencies]
exc-service = { workspace = true }
exc-service = { workspace = true, features = ["send"] }
exc-types = { workspace = true }
futures = { workspace = true }
tower-make = { workspace = true }
Expand Down
6 changes: 3 additions & 3 deletions exc-make/src/cancel.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use exc_service::{ExcService, ExchangeError, Request};
use exc_service::{ExchangeError, Request, SendExcService};
use exc_types::CancelOrder;
use futures::{future::MapErr, TryFutureExt};
use std::{
Expand All @@ -14,7 +14,7 @@ pub struct MakeCancelOrderOptions {}
/// Make a service to cancel orders.
pub trait MakeCancelOrder {
/// Service to cancel orders.
type Service: ExcService<CancelOrder>;
type Service: SendExcService<CancelOrder>;

/// The future of the service.
type Future: Future<Output = Result<Self::Service, ExchangeError>>;
Expand Down Expand Up @@ -42,7 +42,7 @@ where
Response = <CancelOrder as Request>::Response,
Error = ExchangeError,
>,
M::Service: ExcService<CancelOrder>,
M::Service: SendExcService<CancelOrder>,
M::MakeError: Into<ExchangeError>,
{
type Service = M::Service;
Expand Down
6 changes: 3 additions & 3 deletions exc-make/src/candles.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use exc_service::{ExcService, ExchangeError, Request};
use exc_service::{ExchangeError, Request, SendExcService};
use exc_types::QueryCandles;
use futures::{future::MapErr, TryFutureExt};
use std::{
Expand All @@ -19,7 +19,7 @@ pub struct MakeFetchCandlesOptions {
/// Make a service to subscribe instruments.
pub trait MakeFetchCandles {
/// Service to fetch candles.
type Service: ExcService<QueryCandles>;
type Service: SendExcService<QueryCandles>;

/// The future of the service.
type Future: Future<Output = Result<Self::Service, ExchangeError>>;
Expand Down Expand Up @@ -47,7 +47,7 @@ where
Response = <QueryCandles as Request>::Response,
Error = ExchangeError,
>,
M::Service: ExcService<QueryCandles>,
M::Service: SendExcService<QueryCandles>,
M::MakeError: Into<ExchangeError>,
{
type Service = M::Service;
Expand Down
6 changes: 3 additions & 3 deletions exc-make/src/check.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use exc_service::{ExcService, ExchangeError, Request};
use exc_service::{ExchangeError, Request, SendExcService};
use exc_types::GetOrder;
use futures::{future::MapErr, TryFutureExt};
use std::{
Expand All @@ -14,7 +14,7 @@ pub struct MakeCheckOrderOptions {}
/// Make a service to check orders.
pub trait MakeCheckOrder {
/// Service to check orders.
type Service: ExcService<GetOrder>;
type Service: SendExcService<GetOrder>;

/// The future of the service.
type Future: Future<Output = Result<Self::Service, ExchangeError>>;
Expand Down Expand Up @@ -42,7 +42,7 @@ where
Response = <GetOrder as Request>::Response,
Error = ExchangeError,
>,
M::Service: ExcService<GetOrder>,
M::Service: SendExcService<GetOrder>,
M::MakeError: Into<ExchangeError>,
{
type Service = M::Service;
Expand Down
6 changes: 3 additions & 3 deletions exc-make/src/instruments.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use exc_service::{ExcService, ExchangeError, Request};
use exc_service::{ExchangeError, Request, SendExcService};
use exc_types::SubscribeInstruments;
use futures::{future::MapErr, TryFutureExt};
use std::{
Expand All @@ -14,7 +14,7 @@ pub struct MakeInstrumentsOptions {}
/// Make a service to subscribe instruments.
pub trait MakeInstruments {
/// Service to subscribe instruments.
type Service: ExcService<SubscribeInstruments>;
type Service: SendExcService<SubscribeInstruments>;

/// The future of the service.
type Future: Future<Output = Result<Self::Service, ExchangeError>>;
Expand Down Expand Up @@ -42,7 +42,7 @@ where
Response = <SubscribeInstruments as Request>::Response,
Error = ExchangeError,
>,
M::Service: ExcService<SubscribeInstruments>,
M::Service: SendExcService<SubscribeInstruments>,
M::MakeError: Into<ExchangeError>,
{
type Service = M::Service;
Expand Down
6 changes: 3 additions & 3 deletions exc-make/src/orders.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use exc_service::{ExcService, ExchangeError, Request};
use exc_service::{ExchangeError, Request, SendExcService};
use exc_types::SubscribeOrders;
use futures::{future::MapErr, TryFutureExt};
use std::{
Expand All @@ -14,7 +14,7 @@ pub struct MakeSubscribeOrdersOptions {}
/// Make a service to subscribe orders.
pub trait MakeSubscribeOrders {
/// Service to subscribe orders.
type Service: ExcService<SubscribeOrders>;
type Service: SendExcService<SubscribeOrders>;

/// The future of the service.
type Future: Future<Output = Result<Self::Service, ExchangeError>>;
Expand Down Expand Up @@ -42,7 +42,7 @@ where
Response = <SubscribeOrders as Request>::Response,
Error = ExchangeError,
>,
M::Service: ExcService<SubscribeOrders>,
M::Service: SendExcService<SubscribeOrders>,
M::MakeError: Into<ExchangeError>,
{
type Service = M::Service;
Expand Down
6 changes: 3 additions & 3 deletions exc-make/src/place.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use exc_service::{ExcService, ExchangeError, Request};
use exc_service::{ExchangeError, Request, SendExcService};
use exc_types::PlaceOrder;
use futures::{future::MapErr, TryFutureExt};
use std::{
Expand All @@ -14,7 +14,7 @@ pub struct MakePlaceOrderOptions {}
/// Make a service to place orders.
pub trait MakePlaceOrder {
/// Service to place orders.
type Service: ExcService<PlaceOrder>;
type Service: SendExcService<PlaceOrder>;

/// The future of the service.
type Future: Future<Output = Result<Self::Service, ExchangeError>>;
Expand Down Expand Up @@ -42,7 +42,7 @@ where
Response = <PlaceOrder as Request>::Response,
Error = ExchangeError,
>,
M::Service: ExcService<PlaceOrder>,
M::Service: SendExcService<PlaceOrder>,
M::MakeError: Into<ExchangeError>,
{
type Service = M::Service;
Expand Down
6 changes: 3 additions & 3 deletions exc-make/src/tickers.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use exc_service::{ExcService, ExchangeError, Request};
use exc_service::{ExchangeError, Request, SendExcService};
use exc_types::SubscribeTickers;
use futures::{future::MapErr, TryFutureExt};
use std::{
Expand All @@ -14,7 +14,7 @@ pub struct MakeTickersOptions {}
/// Make a service to subscribe tickers.
pub trait MakeTickers {
/// Service to subscribe tickers.
type Service: ExcService<SubscribeTickers>;
type Service: SendExcService<SubscribeTickers>;

/// The future of the service.
type Future: Future<Output = Result<Self::Service, ExchangeError>>;
Expand Down Expand Up @@ -42,7 +42,7 @@ where
Response = <SubscribeTickers as Request>::Response,
Error = ExchangeError,
>,
M::Service: ExcService<SubscribeTickers>,
M::Service: SendExcService<SubscribeTickers>,
M::MakeError: Into<ExchangeError>,
{
type Service = M::Service;
Expand Down
4 changes: 4 additions & 0 deletions exc-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ retry = ["tower/retry", "humantime", "tokio/time", "tracing"]
limit = ["tower/limit"]
http = ["hyper"]

# Add [`SendExcSerivce`] which is a [`ExcService`] that is `Send`.
# as a workaround for https://github.com/rust-lang/rust/issues/20671
send = []

[dependencies]
tower = { workspace = true, default-features = false, features = ["util"] }
futures = { workspace = true }
Expand Down
Loading

0 comments on commit 3f08ff1

Please sign in to comment.