Skip to content

Commit

Permalink
Payment driver: ValidateAllocation
Browse files Browse the repository at this point in the history
Added a new GSB endpoint to the payment driver API to validate whether
an allocation could be created (check available funds).

Signed-off-by: Adam Wierzbicki <[email protected]>
  • Loading branch information
Wiezzel committed Nov 16, 2020
1 parent 0c4943c commit 2a7b92b
Show file tree
Hide file tree
Showing 13 changed files with 235 additions and 30 deletions.
26 changes: 26 additions & 0 deletions core/model/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use bitflags::bitflags;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::fmt::Display;
use ya_client_model::payment::Allocation;
use ya_service_bus::RpcMessage;

pub fn driver_bus_id<T: Display>(driver_name: T) -> String {
Expand Down Expand Up @@ -210,3 +211,28 @@ impl RpcMessage for GetTransactionBalance {
type Item = BigDecimal;
type Error = GenericError;
}

// ************************** VALIDATE ALLOCATION **************************

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ValidateAllocation {
pub address: String,
pub amount: BigDecimal,
pub existing_allocations: Vec<Allocation>,
}

impl ValidateAllocation {
pub fn new(address: String, amount: BigDecimal, existing: Vec<Allocation>) -> Self {
ValidateAllocation {
address,
amount,
existing_allocations: existing,
}
}
}

impl RpcMessage for ValidateAllocation {
const ID: &'static str = "ValidateAllocation";
type Item = bool;
type Error = GenericError;
}
29 changes: 26 additions & 3 deletions core/model/src/payment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ use ya_service_bus::RpcMessage;

#[derive(Clone, Debug, Serialize, Deserialize, thiserror::Error)]
pub enum RpcMessageError {
#[error("Send error: {0}")]
#[error("{0}")]
Send(#[from] public::SendError),
#[error("Accept/reject error: {0}")]
#[error("{0}")]
AcceptReject(#[from] public::AcceptRejectError),
#[error("Cancel error: {0}")]
#[error("{0}")]
Cancel(#[from] public::CancelError),
#[error("{0}")]
Generic(#[from] local::GenericError),
#[error("{0}")]
ValidateAllocation(#[from] local::ValidateAllocationError),
}

pub mod local {
Expand Down Expand Up @@ -323,6 +325,27 @@ pub mod local {
pub requestor: InvoiceStatusNotes,
pub provider: InvoiceStatusNotes,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ValidateAllocation {
pub platform: String,
pub address: String,
pub amount: BigDecimal,
}

impl RpcMessage for ValidateAllocation {
const ID: &'static str = "ValidateAllocation";
type Item = bool;
type Error = ValidateAllocationError;
}

#[derive(Clone, Debug, Serialize, Deserialize, thiserror::Error)]
pub enum ValidateAllocationError {
#[error("Account not registered")]
AccountNotRegistered,
#[error("Error while validating allocation: {0}")]
Other(String),
}
}

pub mod public {
Expand Down
3 changes: 3 additions & 0 deletions core/payment-driver/base/src/bus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ pub async fn bind_service<Driver: PaymentDriver + 'static>(db: &DbExecutor, driv
)
.bind_with_processor(
move |db, dr, c, m| async move { dr.verify_payment(db, c, m).await }
)
.bind_with_processor(
move |db, dr, c, m| async move { dr.validate_allocation(db, c, m).await }
);

log::debug!("Successfully bound payment driver service to service bus");
Expand Down
12 changes: 8 additions & 4 deletions core/payment-driver/base/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@

// Local uses
use crate::dao::DbExecutor;
use crate::model::{
Ack, GenericError, GetAccountBalance, GetTransactionBalance, Init, PaymentDetails,
SchedulePayment, VerifyPayment,
};
use crate::model::*;

// Public revealed uses, required to implement this trait
pub use async_trait::async_trait;
Expand Down Expand Up @@ -61,4 +58,11 @@ pub trait PaymentDriver {
caller: String,
msg: VerifyPayment,
) -> Result<PaymentDetails, GenericError>;

async fn validate_allocation(
&self,
db: DbExecutor,
caller: String,
msg: ValidateAllocation,
) -> Result<bool, GenericError>;
}
11 changes: 10 additions & 1 deletion core/payment-driver/dummy/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ pub fn bind_service() {
.bind(get_account_balance)
.bind(get_transaction_balance)
.bind(schedule_payment)
.bind(verify_payment);
.bind(verify_payment)
.bind(validate_allocation);

log::debug!("Successfully bound payment driver service to service bus");
}
Expand Down Expand Up @@ -111,3 +112,11 @@ async fn verify_payment(
let details = serde_json::from_str(&json_str).unwrap();
Ok(details)
}

async fn validate_allocation(
_db: (),
_caller: String,
_msg: ValidateAllocation,
) -> Result<bool, GenericError> {
Ok(true)
}
13 changes: 12 additions & 1 deletion core/payment-driver/gnt/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ pub fn bind_service(db: &DbExecutor, processor: GNTDriverProcessor) {
.bind_with_processor(get_account_balance)
.bind_with_processor(get_transaction_balance)
.bind_with_processor(schedule_payment)
.bind_with_processor(verify_payment);
.bind_with_processor(verify_payment)
.bind_with_processor(validate_allocation);

log::debug!("Successfully bound payment driver service to service bus");
}
Expand Down Expand Up @@ -118,6 +119,16 @@ async fn verify_payment(
)
}

async fn validate_allocation(
_db: DbExecutor,
_processor: GNTDriverProcessor,
_caller: String,
_msg: ValidateAllocation,
) -> Result<bool, GenericError> {
log::debug!("Validate allocation: {:?}", _msg);
Ok(true) // TODO: Implement proper check
}

async fn account_event(
_db: DbExecutor,
processor: GNTDriverProcessor,
Expand Down
20 changes: 16 additions & 4 deletions core/payment-driver/zksync/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ use ya_payment_driver::{
dao::DbExecutor,
db::models::PaymentEntity,
driver::{async_trait, BigDecimal, IdentityError, IdentityEvent, PaymentDriver},
model::{
Ack, GenericError, GetAccountBalance, GetTransactionBalance, Init, PaymentConfirmation,
PaymentDetails, SchedulePayment, VerifyPayment,
},
model::*,
utils,
};

Expand Down Expand Up @@ -200,6 +197,21 @@ impl PaymentDriver for ZksyncDriver {
// }
from_confirmation(msg.confirmation())
}

async fn validate_allocation(
&self,
_db: DbExecutor,
_caller: String,
msg: ValidateAllocation,
) -> Result<bool, GenericError> {
let account_balance = wallet::account_balance(&msg.address).await?;
let total_allocated_amount: BigDecimal = msg
.existing_allocations
.into_iter()
.map(|allocation| allocation.remaining_amount)
.sum();
Ok(msg.amount <= (account_balance - total_allocated_amount))
}
}

#[async_trait(?Send)]
Expand Down
31 changes: 28 additions & 3 deletions core/payment/src/api/requestor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@ use crate::api::*;
use crate::dao::*;
use crate::error::{DbError, Error};
use crate::utils::{listen_for_events, response, with_timeout};
use crate::DEFAULT_PAYMENT_PLATFORM;
use actix_web::web::{delete, get, post, put, Data, Json, Path, Query};
use actix_web::{HttpResponse, Scope};
use metrics::counter;
use serde_json::value::Value::Null;
use ya_agreement_utils::{ClauseOperator, ConstraintKey, Constraints};
use ya_client_model::payment::*;
use ya_core_model::payment::local::{GetAccounts, SchedulePayment, BUS_ID as LOCAL_SERVICE};
use ya_core_model::payment::local::{
GetAccounts, SchedulePayment, ValidateAllocation, ValidateAllocationError,
BUS_ID as LOCAL_SERVICE,
};
use ya_core_model::payment::public::{
AcceptDebitNote, AcceptInvoice, AcceptRejectError, BUS_ID as PUBLIC_SERVICE,
};
Expand Down Expand Up @@ -365,12 +369,33 @@ async fn create_allocation(
id: Identity,
) -> HttpResponse {
// TODO: Handle deposits & timeouts
// TODO: Check available funds
let allocation = body.into_inner();
let node_id = id.identity;
let payment_platform = allocation
.payment_platform
.clone()
.unwrap_or(DEFAULT_PAYMENT_PLATFORM.to_string());
let address = allocation.address.clone().unwrap_or(node_id.to_string());

let validate_msg = ValidateAllocation {
platform: payment_platform.clone(),
address: address.clone(),
amount: allocation.total_amount.clone(),
};
match async move { Ok(bus::service(LOCAL_SERVICE).send(validate_msg).await??) }.await {
Ok(true) => {}
Ok(false) => return response::bad_request(&"Insufficient funds to make allocation"),
Err(Error::Rpc(RpcMessageError::ValidateAllocation(
ValidateAllocationError::AccountNotRegistered,
))) => return response::bad_request(&"Account not registered"),
Err(e) => return response::server_error(&e),
}

let dao: AllocationDao = db.as_dao();
match async move {
let allocation_id = dao.create(allocation, node_id).await?;
let allocation_id = dao
.create(allocation, node_id, payment_platform, address)
.await?;
Ok(dao.get(allocation_id, node_id).await?)
}
.await
Expand Down
26 changes: 24 additions & 2 deletions core/payment/src/dao/allocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,14 @@ pub fn spend_from_allocation(
}

impl<'c> AllocationDao<'c> {
pub async fn create(&self, allocation: NewAllocation, owner_id: NodeId) -> DbResult<String> {
let allocation = WriteObj::new(allocation, owner_id);
pub async fn create(
&self,
allocation: NewAllocation,
owner_id: NodeId,
payment_platform: String,
address: String,
) -> DbResult<String> {
let allocation = WriteObj::new(allocation, owner_id, payment_platform, address);
let allocation_id = allocation.id.clone();
do_with_transaction(self.pool, move |conn| {
diesel::insert_into(dsl::pay_allocation)
Expand Down Expand Up @@ -100,6 +106,22 @@ impl<'c> AllocationDao<'c> {
.await
}

pub async fn get_for_address(
&self,
payment_platform: String,
address: String,
) -> DbResult<Vec<Allocation>> {
readonly_transaction(self.pool, move |conn| {
let allocations: Vec<ReadObj> = dsl::pay_allocation
.filter(dsl::payment_platform.eq(payment_platform))
.filter(dsl::address.eq(address))
.filter(dsl::released.eq(false))
.load(conn)?;
Ok(allocations.into_iter().map(Into::into).collect())
})
.await
}

pub async fn release(&self, allocation_id: String, owner_id: NodeId) -> DbResult<bool> {
do_with_transaction(self.pool, move |conn| {
let num_released = diesel::update(
Expand Down
35 changes: 33 additions & 2 deletions core/payment/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use ya_core_model::payment::local::GenericError;
use ya_core_model::payment::local::{GenericError, ValidateAllocationError};
use ya_core_model::payment::public::{AcceptRejectError, CancelError, SendError};
use ya_core_model::payment::RpcMessageError;

Expand Down Expand Up @@ -100,6 +100,12 @@ impl From<GenericError> for Error {
}
}

impl From<ValidateAllocationError> for Error {
fn from(e: ValidateAllocationError) -> Self {
Into::<RpcMessageError>::into(e).into()
}
}

pub mod processor {
use super::DbError;
use crate::models::activity::ReadObj as Activity;
Expand All @@ -108,7 +114,9 @@ pub mod processor {
use bigdecimal::BigDecimal;
use std::fmt::Display;
use ya_core_model::driver::AccountMode;
use ya_core_model::payment::local::GenericError;
use ya_core_model::payment::local::{
GenericError, ValidateAllocationError as GsbValidateAllocationError,
};
use ya_core_model::payment::public::SendError;

#[derive(thiserror::Error, Debug)]
Expand Down Expand Up @@ -334,4 +342,27 @@ pub mod processor {
#[error("Error while sending payment: {0}")]
Driver(#[from] ya_core_model::driver::GenericError),
}

#[derive(thiserror::Error, Debug)]
pub enum ValidateAllocationError {
#[error("{0}")]
AccountNotRegistered(#[from] AccountNotRegistered),
#[error("Service bus error: {0}")]
ServiceBus(#[from] ya_service_bus::error::Error),
#[error("Error while sending payment: {0}")]
Driver(#[from] ya_core_model::driver::GenericError),
#[error("Database error: {0}")]
Database(#[from] DbError),
}

impl From<ValidateAllocationError> for GsbValidateAllocationError {
fn from(e: ValidateAllocationError) -> Self {
match e {
ValidateAllocationError::AccountNotRegistered(e) => {
GsbValidateAllocationError::AccountNotRegistered
}
e => GsbValidateAllocationError::Other(e.to_string()),
}
}
}
}
14 changes: 8 additions & 6 deletions core/payment/src/models/allocation.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::schema::pay_allocation;
use crate::DEFAULT_PAYMENT_PLATFORM;
use chrono::{NaiveDateTime, TimeZone, Utc};
use uuid::Uuid;
use ya_client_model::payment::{Allocation, NewAllocation};
Expand All @@ -22,14 +21,17 @@ pub struct WriteObj {
}

impl WriteObj {
pub fn new(allocation: NewAllocation, owner_id: NodeId) -> Self {
pub fn new(
allocation: NewAllocation,
owner_id: NodeId,
payment_platform: String,
address: String,
) -> Self {
Self {
id: Uuid::new_v4().to_string(),
owner_id,
payment_platform: allocation
.payment_platform
.unwrap_or(DEFAULT_PAYMENT_PLATFORM.to_string()),
address: allocation.address.unwrap_or(owner_id.to_string()),
payment_platform,
address,
total_amount: allocation.total_amount.clone().into(),
spent_amount: Default::default(),
remaining_amount: allocation.total_amount.into(),
Expand Down
Loading

0 comments on commit 2a7b92b

Please sign in to comment.