Skip to content

Commit

Permalink
Merge pull request #845 from golemfactory/wiezzel/custom_recv_addr
Browse files Browse the repository at this point in the history
 Payment service: allow receiving funds without account init
  • Loading branch information
Wiezzel authored Dec 7, 2020
2 parents ea893c1 + 88a3fe1 commit 60ecce9
Show file tree
Hide file tree
Showing 12 changed files with 187 additions and 73 deletions.
42 changes: 31 additions & 11 deletions core/model/src/payment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,32 @@ pub mod local {
}
}

#[derive(Clone, Debug, Serialize, Deserialize, thiserror::Error)]
#[error("")]
pub struct NoError {} // This is needed because () doesn't implement Display

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RegisterDriver {
pub driver_name: String,
pub platform: String,
pub recv_init_required: bool, // Is account initialization required for receiving payments
}

impl RpcMessage for RegisterDriver {
const ID: &'static str = "RegisterDriver";
type Item = ();
type Error = NoError;
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct UnregisterDriver(pub String);

impl RpcMessage for UnregisterDriver {
const ID: &'static str = "UnregisterDriver";
type Item = ();
type Error = NoError;
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RegisterAccount {
pub platform: String,
Expand All @@ -135,8 +161,10 @@ pub mod local {

#[derive(Clone, Debug, Serialize, Deserialize, thiserror::Error)]
pub enum RegisterAccountError {
#[error("Account already registered")]
AlreadyRegistered,
#[error("Account already registered: address={0}, driver={1}")]
AlreadyRegistered(String, String),
#[error("Driver not registered: {0}")]
DriverNotRegistered(String),
#[error("Error while registering account: {0}")]
Other(String),
}
Expand All @@ -153,18 +181,10 @@ pub mod local {
pub address: String,
}

#[derive(Clone, Debug, Serialize, Deserialize, thiserror::Error)]
pub enum UnregisterAccountError {
#[error("Account not registered")]
NotRegistered,
#[error("Error while unregistering account: {0}")]
Other(String),
}

impl RpcMessage for UnregisterAccount {
const ID: &'static str = "UnregisterAccount";
type Item = ();
type Error = UnregisterAccountError;
type Error = NoError;
}

#[derive(Clone, Debug, Serialize, Deserialize)]
Expand Down
38 changes: 25 additions & 13 deletions core/payment-driver/base/src/bus.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
/*
Collection of interactions a PaymendDriver can have with ya_service_bus
Collection of interactions a PaymentDriver can have with ya_service_bus
All interactions with the bus from the driver should go through this mod.
*/

// Extrernal crates
// External crates
use std::sync::Arc;

// Workspace uses
Expand All @@ -23,8 +23,11 @@ use ya_service_bus::{
use crate::dao::DbExecutor;
use crate::driver::PaymentDriver;

pub async fn bind_service<Driver: PaymentDriver + 'static>(db: &DbExecutor, driver: Arc<Driver>) {
log::debug!("Binding payment driver service to service bus");
pub async fn bind_service<Driver: PaymentDriver + 'static>(
db: &DbExecutor,
driver: Arc<Driver>,
) -> anyhow::Result<()> {
log::debug!("Binding payment driver service to service bus...");
let bus_id = driver_bus_id(driver.get_name());

/* Short variable names explained:
Expand All @@ -33,8 +36,8 @@ pub async fn bind_service<Driver: PaymentDriver + 'static>(db: &DbExecutor, driv
c = caller
m = message
*/
#[rustfmt::skip] // Keep move's neatly alligned
ServiceBinder::new(&bus_id, db, driver)
#[rustfmt::skip] // Keep move's neatly aligned
ServiceBinder::new(&bus_id, db, driver.clone())
.bind_with_processor(
move |db, dr, c, m| async move { dr.init(db, c, m).await }
)
Expand All @@ -57,14 +60,23 @@ pub async fn bind_service<Driver: PaymentDriver + 'static>(db: &DbExecutor, driv
move |db, dr, c, m| async move { dr.validate_allocation(db, c, m).await }
);

log::debug!("Successfully bound payment driver service to service bus");
log::debug!("Subscribing to identity events");
log::debug!("Successfully bound payment driver service to service bus.");

log::debug!("Subscribing to identity events...");
let message = identity::Subscribe { endpoint: bus_id };
let result = service(identity::BUS_ID).send(message).await;
match result {
Err(e) => log::error!("init app-key listener error: {}", e),
_ => log::debug!("Successfully subscribed payment driver service to identity events"),
}
service(identity::BUS_ID).send(message).await??;
log::debug!("Successfully subscribed payment driver service to identity events.");

log::debug!("Registering driver in payment service...");
let message = payment_srv::RegisterDriver {
driver_name: driver.get_name(),
platform: driver.get_platform(),
recv_init_required: driver.recv_init_required(),
};
service(payment_srv::BUS_ID).send(message).await?.unwrap(); // Unwrap on purpose because it's NoError
log::debug!("Successfully registered driver in payment service.");

Ok(())
}

pub async fn list_unlocked_identities() -> Result<Vec<NodeId>, GenericError> {
Expand Down
1 change: 1 addition & 0 deletions core/payment-driver/base/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub trait PaymentDriver {
// used by bus to bind service
fn get_name(&self) -> String;
fn get_platform(&self) -> String;
fn recv_init_required(&self) -> bool;

async fn get_transaction_balance(
&self,
Expand Down
1 change: 1 addition & 0 deletions core/payment-driver/dummy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub struct PaymentDriverService;
impl PaymentDriverService {
pub async fn gsb<Context>(_context: &Context) -> anyhow::Result<()> {
self::service::bind_service();
self::service::register_in_payment_service().await?;
Ok(())
}
}
14 changes: 14 additions & 0 deletions core/payment-driver/dummy/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::str::FromStr;
use uuid::Uuid;
use ya_core_model::driver::*;
use ya_core_model::payment::local as payment_srv;
use ya_service_bus::typed::service;
use ya_service_bus::{typed as bus, RpcEndpoint};

pub fn bind_service() {
Expand All @@ -22,6 +23,19 @@ pub fn bind_service() {
log::debug!("Successfully bound payment driver service to service bus");
}

pub async fn register_in_payment_service() -> anyhow::Result<()> {
log::debug!("Registering driver in payment service...");
let message = payment_srv::RegisterDriver {
driver_name: DRIVER_NAME.to_string(),
platform: PLATFORM_NAME.to_string(),
recv_init_required: false,
};
service(payment_srv::BUS_ID).send(message).await?.unwrap(); // Unwrap on purpose because it's NoError
log::debug!("Successfully registered driver in payment service.");

Ok(())
}

async fn init(_db: (), _caller: String, msg: Init) -> Result<Ack, GenericError> {
log::info!("init: {:?}", msg);

Expand Down
3 changes: 2 additions & 1 deletion core/payment-driver/gnt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ impl PaymentDriverService {
let driver = GntDriver::new(db.clone()).await?;
let processor = GNTDriverProcessor::new(driver);
self::service::bind_service(&db, processor);
self::service::subscribe_to_identity_events().await;
self::service::subscribe_to_identity_events().await?;
self::service::register_in_payment_service().await?;
Ok(())
}
}
Expand Down
27 changes: 20 additions & 7 deletions core/payment-driver/gnt/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use crate::processor::GNTDriverProcessor;
use crate::DRIVER_NAME;
use crate::{DRIVER_NAME, PLATFORM_NAME};
use bigdecimal::BigDecimal;
use ya_core_model::driver::*;
use ya_core_model::payment::local as payment_srv;
use ya_persistence::executor::DbExecutor;
use ya_service_bus::typed::service;
use ya_service_bus::{typed as bus, RpcEndpoint};

pub fn bind_service(db: &DbExecutor, processor: GNTDriverProcessor) {
Expand All @@ -20,15 +22,26 @@ pub fn bind_service(db: &DbExecutor, processor: GNTDriverProcessor) {
log::debug!("Successfully bound payment driver service to service bus");
}

pub async fn subscribe_to_identity_events() {
if let Err(e) = bus::service(ya_core_model::identity::BUS_ID)
pub async fn subscribe_to_identity_events() -> anyhow::Result<()> {
bus::service(ya_core_model::identity::BUS_ID)
.send(ya_core_model::identity::Subscribe {
endpoint: driver_bus_id(DRIVER_NAME),
})
.await
{
log::error!("init app-key listener error: {}", e)
}
.await??;
Ok(())
}

pub async fn register_in_payment_service() -> anyhow::Result<()> {
log::debug!("Registering driver in payment service...");
let message = payment_srv::RegisterDriver {
driver_name: DRIVER_NAME.to_string(),
platform: PLATFORM_NAME.to_string(),
recv_init_required: false,
};
service(payment_srv::BUS_ID).send(message).await?.unwrap(); // Unwrap on purpose because it's NoError
log::debug!("Successfully registered driver in payment service.");

Ok(())
}

async fn init(
Expand Down
4 changes: 4 additions & 0 deletions core/payment-driver/zksync/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ impl PaymentDriver for ZksyncDriver {
PLATFORM_NAME.to_string()
}

fn recv_init_required(&self) -> bool {
false
}

async fn get_transaction_balance(
&self,
_db: DbExecutor,
Expand Down
2 changes: 1 addition & 1 deletion core/payment-driver/zksync/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl ZksyncService {
let driver = ZksyncDriver::new(db.clone());
driver.load_active_accounts().await;
let driver_rc = Arc::new(driver);
bus::bind_service(&db, driver_rc.clone()).await;
bus::bind_service(&db, driver_rc.clone()).await?;
log::debug!("Driver loaded");

// Start cron
Expand Down
8 changes: 4 additions & 4 deletions core/payment/examples/payment_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,10 @@ async fn main() -> anyhow::Result<()> {
ya_sb_router::bind_gsb_router(None).await?;
log::debug!("bind_gsb_router()");

let processor = PaymentProcessor::new(db.clone());
ya_payment::service::bind_service(&db, processor);
log::debug!("bind_service()");

let (driver_name, platform) = match args.driver {
Driver::Dummy => {
start_dummy_driver().await?;
Expand All @@ -210,10 +214,6 @@ async fn main() -> anyhow::Result<()> {
}
};

let processor = PaymentProcessor::new(db.clone());
ya_payment::service::bind_service(&db, processor);
log::debug!("bind_service()");

bus::service(driver_bus_id(driver_name))
.call(Init::new(provider_id.clone(), AccountMode::RECV))
.await??;
Expand Down
Loading

0 comments on commit 60ecce9

Please sign in to comment.