diff --git a/core/model/src/driver.rs b/core/model/src/driver.rs index 4b35918834..1ff3601efa 100644 --- a/core/model/src/driver.rs +++ b/core/model/src/driver.rs @@ -3,6 +3,7 @@ use bitflags::bitflags; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use std::fmt::Display; +use ya_client_model::payment::Allocation; use ya_service_bus::RpcMessage; pub fn driver_bus_id(driver_name: T) -> String { @@ -210,3 +211,28 @@ impl RpcMessage for GetTransactionBalance { type Item = BigDecimal; type Error = GenericError; } + +// ************************** VALIDATE ALLOCATION ************************** + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ValidateAllocation { + pub address: String, + pub amount: BigDecimal, + pub existing_allocations: Vec, +} + +impl ValidateAllocation { + pub fn new(address: String, amount: BigDecimal, existing: Vec) -> Self { + ValidateAllocation { + address, + amount, + existing_allocations: existing, + } + } +} + +impl RpcMessage for ValidateAllocation { + const ID: &'static str = "ValidateAllocation"; + type Item = bool; + type Error = GenericError; +} diff --git a/core/model/src/payment.rs b/core/model/src/payment.rs index eefaae7e19..98bc04ae93 100644 --- a/core/model/src/payment.rs +++ b/core/model/src/payment.rs @@ -4,14 +4,16 @@ use ya_service_bus::RpcMessage; #[derive(Clone, Debug, Serialize, Deserialize, thiserror::Error)] pub enum RpcMessageError { - #[error("Send error: {0}")] + #[error("{0}")] Send(#[from] public::SendError), - #[error("Accept/reject error: {0}")] + #[error("{0}")] AcceptReject(#[from] public::AcceptRejectError), - #[error("Cancel error: {0}")] + #[error("{0}")] Cancel(#[from] public::CancelError), #[error("{0}")] Generic(#[from] local::GenericError), + #[error("{0}")] + ValidateAllocation(#[from] local::ValidateAllocationError), } pub mod local { @@ -323,6 +325,27 @@ pub mod local { pub requestor: InvoiceStatusNotes, pub provider: InvoiceStatusNotes, } + + #[derive(Clone, Debug, Serialize, Deserialize)] + pub struct ValidateAllocation { + pub platform: String, + pub address: String, + pub amount: BigDecimal, + } + + impl RpcMessage for ValidateAllocation { + const ID: &'static str = "ValidateAllocation"; + type Item = bool; + type Error = ValidateAllocationError; + } + + #[derive(Clone, Debug, Serialize, Deserialize, thiserror::Error)] + pub enum ValidateAllocationError { + #[error("Account not registered")] + AccountNotRegistered, + #[error("Error while validating allocation: {0}")] + Other(String), + } } pub mod public { diff --git a/core/payment-driver/base/src/bus.rs b/core/payment-driver/base/src/bus.rs index 1b35c91dba..8e4f571f01 100644 --- a/core/payment-driver/base/src/bus.rs +++ b/core/payment-driver/base/src/bus.rs @@ -52,6 +52,9 @@ pub async fn bind_service(db: &DbExecutor, driv ) .bind_with_processor( move |db, dr, c, m| async move { dr.verify_payment(db, c, m).await } + ) + .bind_with_processor( + move |db, dr, c, m| async move { dr.validate_allocation(db, c, m).await } ); log::debug!("Successfully bound payment driver service to service bus"); diff --git a/core/payment-driver/base/src/driver.rs b/core/payment-driver/base/src/driver.rs index 6379755d0c..c308bd812f 100644 --- a/core/payment-driver/base/src/driver.rs +++ b/core/payment-driver/base/src/driver.rs @@ -8,10 +8,7 @@ // Local uses use crate::dao::DbExecutor; -use crate::model::{ - Ack, GenericError, GetAccountBalance, GetTransactionBalance, Init, PaymentDetails, - SchedulePayment, VerifyPayment, -}; +use crate::model::*; // Public revealed uses, required to implement this trait pub use async_trait::async_trait; @@ -61,4 +58,11 @@ pub trait PaymentDriver { caller: String, msg: VerifyPayment, ) -> Result; + + async fn validate_allocation( + &self, + db: DbExecutor, + caller: String, + msg: ValidateAllocation, + ) -> Result; } diff --git a/core/payment-driver/dummy/src/service.rs b/core/payment-driver/dummy/src/service.rs index 5a488bc6a8..be6c4c554b 100644 --- a/core/payment-driver/dummy/src/service.rs +++ b/core/payment-driver/dummy/src/service.rs @@ -16,7 +16,8 @@ pub fn bind_service() { .bind(get_account_balance) .bind(get_transaction_balance) .bind(schedule_payment) - .bind(verify_payment); + .bind(verify_payment) + .bind(validate_allocation); log::debug!("Successfully bound payment driver service to service bus"); } @@ -111,3 +112,11 @@ async fn verify_payment( let details = serde_json::from_str(&json_str).unwrap(); Ok(details) } + +async fn validate_allocation( + _db: (), + _caller: String, + _msg: ValidateAllocation, +) -> Result { + Ok(true) +} diff --git a/core/payment-driver/gnt/src/service.rs b/core/payment-driver/gnt/src/service.rs index c41d641a0b..77f7ff5923 100644 --- a/core/payment-driver/gnt/src/service.rs +++ b/core/payment-driver/gnt/src/service.rs @@ -14,7 +14,8 @@ pub fn bind_service(db: &DbExecutor, processor: GNTDriverProcessor) { .bind_with_processor(get_account_balance) .bind_with_processor(get_transaction_balance) .bind_with_processor(schedule_payment) - .bind_with_processor(verify_payment); + .bind_with_processor(verify_payment) + .bind_with_processor(validate_allocation); log::debug!("Successfully bound payment driver service to service bus"); } @@ -118,6 +119,16 @@ async fn verify_payment( ) } +async fn validate_allocation( + _db: DbExecutor, + _processor: GNTDriverProcessor, + _caller: String, + _msg: ValidateAllocation, +) -> Result { + log::debug!("Validate allocation: {:?}", _msg); + Ok(true) // TODO: Implement proper check +} + async fn account_event( _db: DbExecutor, processor: GNTDriverProcessor, diff --git a/core/payment-driver/zksync/src/driver.rs b/core/payment-driver/zksync/src/driver.rs index 67a735f200..6528381e52 100644 --- a/core/payment-driver/zksync/src/driver.rs +++ b/core/payment-driver/zksync/src/driver.rs @@ -16,10 +16,7 @@ use ya_payment_driver::{ dao::DbExecutor, db::models::PaymentEntity, driver::{async_trait, BigDecimal, IdentityError, IdentityEvent, PaymentDriver}, - model::{ - Ack, GenericError, GetAccountBalance, GetTransactionBalance, Init, PaymentConfirmation, - PaymentDetails, SchedulePayment, VerifyPayment, - }, + model::*, utils, }; @@ -200,6 +197,21 @@ impl PaymentDriver for ZksyncDriver { // } from_confirmation(msg.confirmation()) } + + async fn validate_allocation( + &self, + _db: DbExecutor, + _caller: String, + msg: ValidateAllocation, + ) -> Result { + let account_balance = wallet::account_balance(&msg.address).await?; + let total_allocated_amount: BigDecimal = msg + .existing_allocations + .into_iter() + .map(|allocation| allocation.remaining_amount) + .sum(); + Ok(msg.amount <= (account_balance - total_allocated_amount)) + } } #[async_trait(?Send)] diff --git a/core/payment/src/api/requestor.rs b/core/payment/src/api/requestor.rs index fcf50de06e..634a61a6e0 100644 --- a/core/payment/src/api/requestor.rs +++ b/core/payment/src/api/requestor.rs @@ -2,13 +2,17 @@ use crate::api::*; use crate::dao::*; use crate::error::{DbError, Error}; use crate::utils::{listen_for_events, response, with_timeout}; +use crate::DEFAULT_PAYMENT_PLATFORM; use actix_web::web::{delete, get, post, put, Data, Json, Path, Query}; use actix_web::{HttpResponse, Scope}; use metrics::counter; use serde_json::value::Value::Null; use ya_agreement_utils::{ClauseOperator, ConstraintKey, Constraints}; use ya_client_model::payment::*; -use ya_core_model::payment::local::{GetAccounts, SchedulePayment, BUS_ID as LOCAL_SERVICE}; +use ya_core_model::payment::local::{ + GetAccounts, SchedulePayment, ValidateAllocation, ValidateAllocationError, + BUS_ID as LOCAL_SERVICE, +}; use ya_core_model::payment::public::{ AcceptDebitNote, AcceptInvoice, AcceptRejectError, BUS_ID as PUBLIC_SERVICE, }; @@ -365,12 +369,33 @@ async fn create_allocation( id: Identity, ) -> HttpResponse { // TODO: Handle deposits & timeouts - // TODO: Check available funds let allocation = body.into_inner(); let node_id = id.identity; + let payment_platform = allocation + .payment_platform + .clone() + .unwrap_or(DEFAULT_PAYMENT_PLATFORM.to_string()); + let address = allocation.address.clone().unwrap_or(node_id.to_string()); + + let validate_msg = ValidateAllocation { + platform: payment_platform.clone(), + address: address.clone(), + amount: allocation.total_amount.clone(), + }; + match async move { Ok(bus::service(LOCAL_SERVICE).send(validate_msg).await??) }.await { + Ok(true) => {} + Ok(false) => return response::bad_request(&"Insufficient funds to make allocation"), + Err(Error::Rpc(RpcMessageError::ValidateAllocation( + ValidateAllocationError::AccountNotRegistered, + ))) => return response::bad_request(&"Account not registered"), + Err(e) => return response::server_error(&e), + } + let dao: AllocationDao = db.as_dao(); match async move { - let allocation_id = dao.create(allocation, node_id).await?; + let allocation_id = dao + .create(allocation, node_id, payment_platform, address) + .await?; Ok(dao.get(allocation_id, node_id).await?) } .await diff --git a/core/payment/src/dao/allocation.rs b/core/payment/src/dao/allocation.rs index df037dcc23..113e53c4a3 100644 --- a/core/payment/src/dao/allocation.rs +++ b/core/payment/src/dao/allocation.rs @@ -44,8 +44,14 @@ pub fn spend_from_allocation( } impl<'c> AllocationDao<'c> { - pub async fn create(&self, allocation: NewAllocation, owner_id: NodeId) -> DbResult { - let allocation = WriteObj::new(allocation, owner_id); + pub async fn create( + &self, + allocation: NewAllocation, + owner_id: NodeId, + payment_platform: String, + address: String, + ) -> DbResult { + let allocation = WriteObj::new(allocation, owner_id, payment_platform, address); let allocation_id = allocation.id.clone(); do_with_transaction(self.pool, move |conn| { diesel::insert_into(dsl::pay_allocation) @@ -100,6 +106,22 @@ impl<'c> AllocationDao<'c> { .await } + pub async fn get_for_address( + &self, + payment_platform: String, + address: String, + ) -> DbResult> { + readonly_transaction(self.pool, move |conn| { + let allocations: Vec = dsl::pay_allocation + .filter(dsl::payment_platform.eq(payment_platform)) + .filter(dsl::address.eq(address)) + .filter(dsl::released.eq(false)) + .load(conn)?; + Ok(allocations.into_iter().map(Into::into).collect()) + }) + .await + } + pub async fn release(&self, allocation_id: String, owner_id: NodeId) -> DbResult { do_with_transaction(self.pool, move |conn| { let num_released = diesel::update( diff --git a/core/payment/src/error.rs b/core/payment/src/error.rs index aa590a088c..4aed6bf414 100644 --- a/core/payment/src/error.rs +++ b/core/payment/src/error.rs @@ -1,4 +1,4 @@ -use ya_core_model::payment::local::GenericError; +use ya_core_model::payment::local::{GenericError, ValidateAllocationError}; use ya_core_model::payment::public::{AcceptRejectError, CancelError, SendError}; use ya_core_model::payment::RpcMessageError; @@ -100,6 +100,12 @@ impl From for Error { } } +impl From for Error { + fn from(e: ValidateAllocationError) -> Self { + Into::::into(e).into() + } +} + pub mod processor { use super::DbError; use crate::models::activity::ReadObj as Activity; @@ -108,7 +114,9 @@ pub mod processor { use bigdecimal::BigDecimal; use std::fmt::Display; use ya_core_model::driver::AccountMode; - use ya_core_model::payment::local::GenericError; + use ya_core_model::payment::local::{ + GenericError, ValidateAllocationError as GsbValidateAllocationError, + }; use ya_core_model::payment::public::SendError; #[derive(thiserror::Error, Debug)] @@ -334,4 +342,27 @@ pub mod processor { #[error("Error while sending payment: {0}")] Driver(#[from] ya_core_model::driver::GenericError), } + + #[derive(thiserror::Error, Debug)] + pub enum ValidateAllocationError { + #[error("{0}")] + AccountNotRegistered(#[from] AccountNotRegistered), + #[error("Service bus error: {0}")] + ServiceBus(#[from] ya_service_bus::error::Error), + #[error("Error while sending payment: {0}")] + Driver(#[from] ya_core_model::driver::GenericError), + #[error("Database error: {0}")] + Database(#[from] DbError), + } + + impl From for GsbValidateAllocationError { + fn from(e: ValidateAllocationError) -> Self { + match e { + ValidateAllocationError::AccountNotRegistered(e) => { + GsbValidateAllocationError::AccountNotRegistered + } + e => GsbValidateAllocationError::Other(e.to_string()), + } + } + } } diff --git a/core/payment/src/models/allocation.rs b/core/payment/src/models/allocation.rs index e0a21c737b..da996d9b2c 100644 --- a/core/payment/src/models/allocation.rs +++ b/core/payment/src/models/allocation.rs @@ -1,5 +1,4 @@ use crate::schema::pay_allocation; -use crate::DEFAULT_PAYMENT_PLATFORM; use chrono::{NaiveDateTime, TimeZone, Utc}; use uuid::Uuid; use ya_client_model::payment::{Allocation, NewAllocation}; @@ -22,14 +21,17 @@ pub struct WriteObj { } impl WriteObj { - pub fn new(allocation: NewAllocation, owner_id: NodeId) -> Self { + pub fn new( + allocation: NewAllocation, + owner_id: NodeId, + payment_platform: String, + address: String, + ) -> Self { Self { id: Uuid::new_v4().to_string(), owner_id, - payment_platform: allocation - .payment_platform - .unwrap_or(DEFAULT_PAYMENT_PLATFORM.to_string()), - address: allocation.address.unwrap_or(owner_id.to_string()), + payment_platform, + address, total_amount: allocation.total_amount.clone().into(), spent_amount: Default::default(), remaining_amount: allocation.total_amount.into(), diff --git a/core/payment/src/processor.rs b/core/payment/src/processor.rs index 08caab00a0..8c402acea5 100644 --- a/core/payment/src/processor.rs +++ b/core/payment/src/processor.rs @@ -1,7 +1,7 @@ -use crate::dao::{ActivityDao, AgreementDao, OrderDao, PaymentDao}; +use crate::dao::{ActivityDao, AgreementDao, AllocationDao, OrderDao, PaymentDao}; use crate::error::processor::{ AccountNotRegistered, DriverNotRegistered, GetStatusError, NotifyPaymentError, - OrderValidationError, SchedulePaymentError, VerifyPaymentError, + OrderValidationError, SchedulePaymentError, ValidateAllocationError, VerifyPaymentError, }; use crate::models::order::ReadObj as DbOrder; use bigdecimal::{BigDecimal, Zero}; @@ -12,7 +12,7 @@ use std::collections::HashMap; use std::sync::Arc; use ya_client_model::payment::{ActivityPayment, AgreementPayment, Payment}; use ya_core_model::driver::{ - self, driver_bus_id, AccountMode, PaymentConfirmation, PaymentDetails, + self, driver_bus_id, AccountMode, PaymentConfirmation, PaymentDetails, ValidateAllocation, }; use ya_core_model::payment::local::{ Account, NotifyPayment, RegisterAccount, RegisterAccountError, SchedulePayment, @@ -387,4 +387,29 @@ impl PaymentProcessor { .await??; Ok(amount) } + + pub async fn validate_allocation( + &self, + platform: String, + address: String, + amount: BigDecimal, + ) -> Result { + let existing_allocations = self + .db_executor + .as_dao::() + .get_for_address(platform.clone(), address.clone()) + .await?; + let driver = + self.registry + .lock() + .await + .driver(&platform, &address, AccountMode::empty())?; + let msg = ValidateAllocation { + address, + amount, + existing_allocations, + }; + let result = driver_endpoint(&driver).send(msg).await??; + Ok(result) + } } diff --git a/core/payment/src/service.rs b/core/payment/src/service.rs index bef6cfd3a5..e54d2bb9bf 100644 --- a/core/payment/src/service.rs +++ b/core/payment/src/service.rs @@ -31,7 +31,8 @@ mod local { .bind_with_processor(notify_payment) .bind_with_processor(get_status) .bind_with_processor(get_invoice_stats) - .bind_with_processor(get_accounts); + .bind_with_processor(get_accounts) + .bind_with_processor(validate_allocation); // Initialize counters to 0 value. Otherwise they won't appear on metrics endpoint // until first change to value will be made. @@ -199,6 +200,17 @@ mod local { } Ok(output_stats) } + + async fn validate_allocation( + db: DbExecutor, + processor: PaymentProcessor, + sender: String, + msg: ValidateAllocation, + ) -> Result { + Ok(processor + .validate_allocation(msg.platform, msg.address, msg.amount) + .await?) + } } mod public {