diff --git a/Cargo.lock b/Cargo.lock index 719090fd3c..4527072d54 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7522,12 +7522,23 @@ dependencies = [ name = "ya-payment-driver" version = "0.1.0" dependencies = [ + "actix", + "anyhow", "async-trait", "bigdecimal", "chrono", + "diesel", + "diesel_migrations", + "ethereum-types 0.6.0", + "hex", "log 0.4.11", + "num", + "r2d2", + "thiserror", + "tokio 0.2.22", "ya-client-model 0.2.0", "ya-core-model", + "ya-persistence", "ya-service-bus", ] @@ -7898,7 +7909,6 @@ dependencies = [ name = "ya-zksync-driver" version = "0.1.0" dependencies = [ - "actix", "anyhow", "async-trait", "awc 2.0.1", @@ -7918,6 +7928,7 @@ dependencies = [ "uuid 0.8.1", "ya-client-model 0.2.0", "ya-payment-driver", + "ya-service-api-interfaces", "zksync", "zksync_eth_signer", ] diff --git a/core/payment-driver/base/Cargo.toml b/core/payment-driver/base/Cargo.toml index 413e1140fe..ada62500e5 100644 --- a/core/payment-driver/base/Cargo.toml +++ b/core/payment-driver/base/Cargo.toml @@ -8,14 +8,25 @@ edition = "2018" default = [] [dependencies] +actix = "0.9" +anyhow = "1.0" async-trait = "0.1" bigdecimal = { version = "0.1.0" } chrono = { version = "0.4", features = ["serde"] } +diesel = { version = "1.4", features = ["sqlite", "r2d2", "chrono"] } +diesel_migrations = "1.4" +ethereum-types = "0.6.0" +hex = "0.4" log = "0.4.8" +num = { version = "0.2", features = ["serde"] } +r2d2 = "0.8" +thiserror = "1.0" +tokio = { version = "0.2", features = ["macros"] } ## yagna dependencies ya-client-model = "0.2.0" ya-core-model = { version = "0.2.1", features = ["driver", "identity", "payment"] } +ya-persistence = "0.2" ya-service-bus = "0.2" [dev-dependencies] diff --git a/core/payment-driver/base/diesel.toml b/core/payment-driver/base/diesel.toml new file mode 100644 index 0000000000..71215dbf76 --- /dev/null +++ b/core/payment-driver/base/diesel.toml @@ -0,0 +1,5 @@ +# For documentation on how to configure this file, +# see diesel.rs/guides/configuring-diesel-cli + +[print_schema] +file = "src/db/schema.rs" diff --git a/core/payment-driver/base/migrations/2020-02-24-085334_init/down.sql b/core/payment-driver/base/migrations/2020-02-24-085334_init/down.sql new file mode 100644 index 0000000000..4c5dfbeacb --- /dev/null +++ b/core/payment-driver/base/migrations/2020-02-24-085334_init/down.sql @@ -0,0 +1,9 @@ +DROP TABLE payment; + +DROP TABLE transaction; + +DROP TABLE payment_status; + +DROP TABLE transaction_status; + +DROP TABLE transaction_type; diff --git a/core/payment-driver/base/migrations/2020-02-24-085334_init/up.sql b/core/payment-driver/base/migrations/2020-02-24-085334_init/up.sql new file mode 100644 index 0000000000..cdd00f9ffa --- /dev/null +++ b/core/payment-driver/base/migrations/2020-02-24-085334_init/up.sql @@ -0,0 +1,87 @@ +CREATE TABLE `payment_status` +( + status_id INTEGER NOT NULL PRIMARY KEY, + status VARCHAR(50) NOT NULL +); + +INSERT INTO `payment_status` + (status_id, status) +VALUES(1, "REQUESTED"); +INSERT INTO `payment_status` + (status_id, status) +VALUES(2, "DONE"); +INSERT INTO `payment_status` + (status_id, status) +VALUES(3, "NOT_ENOUGH_FUNDS"); +INSERT INTO `payment_status` + (status_id, status) +VALUES(4, "NOT_ENOUGH_GAS"); +INSERT INTO `payment_status` + (status_id, status) +VALUES(5, "FAILED"); + + +CREATE TABLE `transaction_status` +( + status_id INTEGER NOT NULL PRIMARY KEY, + status VARCHAR(50) NOT NULL +); + +INSERT INTO `transaction_status` + (status_id, status) +VALUES(1, "CREATED"); +INSERT INTO `transaction_status` + (status_id, status) +VALUES(2, "SENT"); +INSERT INTO `transaction_status` + (status_id, status) +VALUES(3, "CONFIRMED"); +INSERT INTO `transaction_status` + (status_id, status) +VALUES(0, "FAILED"); + + +CREATE TABLE transaction_type +( + type_id INTEGER NOT NULL PRIMARY KEY, + tx_type VARCHAR(50) NOT NULL +); + +INSERT INTO `transaction_type` + (type_id, tx_type) +VALUES(0, "FAUCET"); +INSERT INTO `transaction_type` + (type_id, tx_type) +VALUES(1, "TRANSFER"); + +CREATE TABLE `transaction` +( + tx_id VARCHAR(128) NOT NULL PRIMARY KEY, + sender VARCHAR(40) NOT NULL, + -- U256 in big endian hex + nonce VARCHAR(64) NOT NULL, + timestamp DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + status INTEGER NOT NULL, + tx_type INTEGER NOT NULL, + encoded VARCHAR (8000) NOT NULL, + signature VARCHAR (130) NOT NULL, + tx_hash VARCHAR(64) NULL UNIQUE, + FOREIGN KEY(status) REFERENCES transaction_status (status_id), + FOREIGN KEY(tx_type) REFERENCES transaction_type (type_id) +); + +CREATE TABLE `payment` +( + order_id VARCHAR(50) NOT NULL PRIMARY KEY, + -- U256 in big endian hex + amount VARCHAR(64) NOT NULL, + -- U256 in big endian hex + gas VARCHAR(64) NOT NULL, + sender VARCHAR(40) NOT NULL, + recipient VARCHAR(40) NOT NULL, + payment_due_date DATETIME NOT NULL, + status INTEGER NOT NULL, + tx_id VARCHAR(128), + FOREIGN KEY(tx_id) REFERENCES `transaction` (tx_id), + FOREIGN KEY(status) REFERENCES `payment_status` (status_id) +); diff --git a/core/payment-driver/base/src/account.rs b/core/payment-driver/base/src/account.rs index 42d55f62e5..991d53a41c 100644 --- a/core/payment-driver/base/src/account.rs +++ b/core/payment-driver/base/src/account.rs @@ -1,19 +1,5 @@ /* Helper to store active accounts in the driver by . - - To use Accounts on your driver: - - Add type AccountsRc to the struct, for example: - struct SomePaymentDriver { - active_accounts: AccountsRc, - } - - Implement get_accounts: - fn get_accounts(&self) -> AccountsRefMut { - self.active_accounts.borrow_mut() - } - - Make sure your "DriverService" subscribes to identity events - bus::subscribe_to_identity_events(driver).await; - - The PaymentDriver trait will keep the list updated - */ // External crates @@ -65,7 +51,7 @@ impl Accounts { } } - fn add_account(&mut self, account: NodeId) { + pub fn add_account(&mut self, account: NodeId) { self.accounts.insert(account.to_string(), account); log::info!("Account: {:?} is unlocked", account.to_string()); } diff --git a/core/payment-driver/base/src/bus.rs b/core/payment-driver/base/src/bus.rs index 3968f43d40..1b35c91dba 100644 --- a/core/payment-driver/base/src/bus.rs +++ b/core/payment-driver/base/src/bus.rs @@ -20,11 +20,11 @@ use ya_service_bus::{ }; // Local uses +use crate::dao::DbExecutor; use crate::driver::PaymentDriver; -pub async fn bind_service(driver: D) { +pub async fn bind_service(db: &DbExecutor, driver: Arc) { log::debug!("Binding payment driver service to service bus"); - let driver = Arc::new(driver); let bus_id = driver_bus_id(driver.get_name()); /* Short variable names explained: @@ -34,7 +34,7 @@ pub async fn bind_service(driver: D) { m = message */ #[rustfmt::skip] // Keep move's neatly alligned - ServiceBinder::new(&bus_id, &(), driver) + ServiceBinder::new(&bus_id, db, driver) .bind_with_processor( move |db, dr, c, m| async move { dr.init(db, c, m).await } ) @@ -64,6 +64,26 @@ pub async fn bind_service(driver: D) { } } +pub async fn list_unlocked_identities() -> Result, GenericError> { + log::debug!("list_unlocked_identities"); + let message = identity::List {}; + let result = service(identity::BUS_ID) + .send(message) + .await + .map_err(GenericError::new)? + .map_err(GenericError::new)?; + let unlocked_list = result + .iter() + .filter(|n| !n.is_locked) + .map(|n| n.node_id) + .collect(); + log::debug!( + "list_unlocked_identities completed. result={:?}", + unlocked_list + ); + Ok(unlocked_list) +} + pub async fn register_account( driver: &(dyn PaymentDriver), address: &str, diff --git a/core/payment-driver/base/src/cron.rs b/core/payment-driver/base/src/cron.rs new file mode 100644 index 0000000000..3e4ca35cf4 --- /dev/null +++ b/core/payment-driver/base/src/cron.rs @@ -0,0 +1,64 @@ +/* + Manage PaymentDriver tasks to be ran on set intervals. +*/ + +// Extrernal crates +use actix::Arbiter; +use actix::AsyncContext; +use actix::{ + prelude::{Addr, Context}, + Actor, +}; +use std::sync::Arc; +use std::time::Duration; + +pub use async_trait::async_trait; + +#[async_trait(?Send)] +pub trait PaymentDriverCron { + async fn confirm_payments(&self); + async fn process_payments(&self); +} + +pub struct Cron { + driver: Arc, +} + +impl Cron { + pub fn new(driver: Arc) -> Addr { + log::trace!("Creating Cron for PaymentDriver."); + let me = Self { driver }; + me.start() + } + + fn start_confirmation_job(&mut self, ctx: &mut Context) { + let _ = ctx.run_interval(Duration::from_secs(10), |act, _ctx| { + log::trace!("Spawning confirmation job."); + let driver = act.driver.clone(); + Arbiter::spawn(async move { + driver.confirm_payments().await; + log::trace!("Confirmation job finished."); + }); + }); + } + + fn start_payment_job(&mut self, ctx: &mut Context) { + let _ = ctx.run_interval(Duration::from_secs(30), |act, _ctx| { + log::trace!("Spawning payment job."); + let driver = act.driver.clone(); + Arbiter::spawn(async move { + driver.process_payments().await; + log::trace!("Payment job finished."); + }); + }); + } +} + +impl Actor for Cron { + type Context = Context; + + fn started(&mut self, ctx: &mut Self::Context) { + self.start_confirmation_job(ctx); + self.start_payment_job(ctx); + } +} diff --git a/core/payment-driver/base/src/dao/error.rs b/core/payment-driver/base/src/dao/error.rs new file mode 100644 index 0000000000..4f9828a08a --- /dev/null +++ b/core/payment-driver/base/src/dao/error.rs @@ -0,0 +1,15 @@ +/* + Database errors. +*/ + +#[derive(thiserror::Error, Debug)] +pub enum DbError { + #[error("Database connection error: {0}")] + Connection(#[from] r2d2::Error), + #[error("Database query error: {0}")] + Query(#[from] diesel::result::Error), + #[error("Runtime error: {0}")] + Runtime(#[from] tokio::task::JoinError), + #[error("{0}")] + InvalidData(String), +} diff --git a/core/payment-driver/base/src/dao/mod.rs b/core/payment-driver/base/src/dao/mod.rs new file mode 100644 index 0000000000..8c3e035f65 --- /dev/null +++ b/core/payment-driver/base/src/dao/mod.rs @@ -0,0 +1,20 @@ +/* + Shared Data Access Object tools. + + Contains: Errors, Dao's, Migration tools. +*/ + +// Private +mod error; + +pub use error::DbError; +pub mod payment; +pub mod transaction; + +pub use ya_persistence::executor::DbExecutor; + +pub type DbResult = Result; + +pub async fn init(db: &DbExecutor) -> anyhow::Result<()> { + db.apply_migration(crate::db::migrations::run_with_output) +} diff --git a/core/payment-driver/base/src/dao/payment.rs b/core/payment-driver/base/src/dao/payment.rs new file mode 100644 index 0000000000..8006f979e5 --- /dev/null +++ b/core/payment-driver/base/src/dao/payment.rs @@ -0,0 +1,82 @@ +/* + Data access object for payment, linking `PaymentEntity` with `payment` +*/ + +// External crates +use diesel::{self, ExpressionMethods, QueryDsl, RunQueryDsl}; + +// Workspace uses +use ya_persistence::executor::{do_with_transaction, readonly_transaction, AsDao, PoolType}; + +// Local uses +use crate::{ + dao::DbResult, + db::{ + models::{PaymentEntity, PAYMENT_STATUS_NOT_YET, PAYMENT_STATUS_OK}, + schema::payment::dsl, + }, +}; + +#[allow(unused)] +pub struct PaymentDao<'c> { + pool: &'c PoolType, +} + +impl<'c> AsDao<'c> for PaymentDao<'c> { + fn as_dao(pool: &'c PoolType) -> Self { + Self { pool } + } +} + +impl<'c> PaymentDao<'c> { + pub async fn get_pending_payments(&self, address: String) -> DbResult> { + readonly_transaction(self.pool, move |conn| { + let payments: Vec = dsl::payment + .filter(dsl::sender.eq(address)) + .filter(dsl::status.eq(PAYMENT_STATUS_NOT_YET)) + .order(dsl::payment_due_date.asc()) + .load(conn)?; + Ok(payments) + }) + .await + } + + pub async fn insert(&self, payment: PaymentEntity) -> DbResult<()> { + do_with_transaction(self.pool, move |conn| { + diesel::insert_into(dsl::payment) + .values(payment) + .execute(conn)?; + Ok(()) + }) + .await + } + + pub async fn update_status(&self, order_id: String, status: i32) -> DbResult<()> { + do_with_transaction(self.pool, move |conn| { + diesel::update(dsl::payment.find(order_id)) + .set(dsl::status.eq(status)) + .execute(conn)?; + Ok(()) + }) + .await + } + + pub async fn update_tx_id(&self, order_id: String, tx_id: String) -> DbResult<()> { + do_with_transaction(self.pool, move |conn| { + diesel::update(dsl::payment.find(order_id)) + .set((dsl::tx_id.eq(tx_id), dsl::status.eq(PAYMENT_STATUS_OK))) + .execute(conn)?; + Ok(()) + }) + .await + } + + pub async fn get_by_tx_id(&self, tx_id: String) -> DbResult> { + readonly_transaction(self.pool, move |conn| { + let payments: Vec = + dsl::payment.filter(dsl::tx_id.eq(tx_id)).load(conn)?; + Ok(payments) + }) + .await + } +} diff --git a/core/payment-driver/base/src/dao/transaction.rs b/core/payment-driver/base/src/dao/transaction.rs new file mode 100644 index 0000000000..eef4839a68 --- /dev/null +++ b/core/payment-driver/base/src/dao/transaction.rs @@ -0,0 +1,99 @@ +/* + Data access object for transaction, linking `TransactionEntity` with `transaction` +*/ + +// External crates +use diesel::{self, ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl}; + +// Workspace uses +use ya_persistence::executor::{do_with_transaction, readonly_transaction, AsDao, PoolType}; + +// Local uses +use crate::{ + dao::DbResult, + db::{ + models::{TransactionEntity, TransactionStatus}, + schema::transaction::dsl, + }, +}; + +#[allow(unused)] +pub struct TransactionDao<'c> { + pool: &'c PoolType, +} + +impl<'c> AsDao<'c> for TransactionDao<'c> { + fn as_dao(pool: &'c PoolType) -> Self { + Self { pool } + } +} + +impl<'c> TransactionDao<'c> { + pub async fn get(&self, tx_id: String) -> DbResult> { + readonly_transaction(self.pool, move |conn| { + let tx: Option = + dsl::transaction.find(tx_id).first(conn).optional()?; + Ok(tx) + }) + .await + } + + pub async fn insert_transactions(&self, txs: Vec) -> DbResult<()> { + do_with_transaction(self.pool, move |conn| { + for tx in txs { + diesel::insert_into(dsl::transaction) + .values(tx) + .execute(conn)?; + } + Ok(()) + }) + .await + } + + pub async fn get_used_nonces(&self, address: String) -> DbResult> { + readonly_transaction(self.pool, move |conn| { + let nonces: Vec = dsl::transaction + .filter(dsl::sender.eq(address)) + .select(dsl::nonce) + .order(dsl::nonce.asc()) + .load(conn)?; + Ok(nonces) + }) + .await + } + + pub async fn get_unconfirmed_txs(&self) -> DbResult> { + self.get_by_status(TransactionStatus::Sent.into()).await + } + + pub async fn get_by_status(&self, status: i32) -> DbResult> { + readonly_transaction(self.pool, move |conn| { + let txs: Vec = + dsl::transaction.filter(dsl::status.eq(status)).load(conn)?; + Ok(txs) + }) + .await + } + + pub async fn update_tx_sent(&self, tx_id: String, tx_hash: String) -> DbResult<()> { + do_with_transaction(self.pool, move |conn| { + let sent_status: i32 = TransactionStatus::Sent.into(); + diesel::update(dsl::transaction.find(tx_id)) + .set((dsl::status.eq(sent_status), dsl::tx_hash.eq(tx_hash))) + .execute(conn)?; + Ok(()) + }) + .await + } + + pub async fn update_tx_status(&self, tx_id: String, status: TransactionStatus) -> DbResult<()> { + let status: i32 = status.into(); + do_with_transaction(self.pool, move |conn| { + diesel::update(dsl::transaction.find(tx_id)) + .set(dsl::status.eq(status)) + .execute(conn)?; + Ok(()) + }) + .await + } +} diff --git a/core/payment-driver/base/src/db/mod.rs b/core/payment-driver/base/src/db/mod.rs new file mode 100644 index 0000000000..d1fdd54593 --- /dev/null +++ b/core/payment-driver/base/src/db/mod.rs @@ -0,0 +1,11 @@ +/* + Raw database components. Schemas, models and migrations. +*/ + +pub mod migrations { + #[derive(diesel_migrations::EmbedMigrations)] + struct _Dummy; +} + +pub mod models; +pub mod schema; diff --git a/core/payment-driver/base/src/db/models.rs b/core/payment-driver/base/src/db/models.rs new file mode 100644 index 0000000000..c15cc51056 --- /dev/null +++ b/core/payment-driver/base/src/db/models.rs @@ -0,0 +1,86 @@ +/* + Raw database models. +*/ + +// External crates +use chrono::NaiveDateTime; +use std::convert::TryFrom; + +// Local uses +use crate::dao::{DbError, DbResult}; +use crate::db::schema::*; + +pub const TX_CREATED: i32 = 1; +pub const TX_SENT: i32 = 2; +pub const TX_CONFIRMED: i32 = 3; +pub const TX_FAILED: i32 = 0; + +pub const PAYMENT_STATUS_NOT_YET: i32 = 1; +pub const PAYMENT_STATUS_OK: i32 = 2; +pub const PAYMENT_STATUS_NOT_ENOUGH_FUNDS: i32 = 3; +pub const PAYMENT_STATUS_NOT_ENOUGH_GAS: i32 = 4; +pub const PAYMENT_STATUS_FAILED: i32 = 5; + +pub enum TransactionStatus { + Created, + Sent, + Confirmed, + Failed, +} + +impl TryFrom for TransactionStatus { + type Error = DbError; + + fn try_from(status: i32) -> DbResult { + match status { + TX_CREATED => Ok(TransactionStatus::Created), + TX_SENT => Ok(TransactionStatus::Sent), + TX_CONFIRMED => Ok(TransactionStatus::Confirmed), + TX_FAILED => Ok(TransactionStatus::Failed), + _ => Err(DbError::InvalidData(format!( + "Unknown tx status. {}", + status + ))), + } + } +} + +impl Into for TransactionStatus { + fn into(self) -> i32 { + match &self { + TransactionStatus::Created => TX_CREATED, + TransactionStatus::Sent => TX_SENT, + TransactionStatus::Confirmed => TX_CONFIRMED, + TransactionStatus::Failed => TX_FAILED, + } + } +} + +#[derive(Clone, Queryable, Debug, Identifiable, Insertable, PartialEq)] +#[primary_key(tx_hash)] +#[table_name = "transaction"] +pub struct TransactionEntity { + pub tx_id: String, + pub sender: String, + pub nonce: String, + pub timestamp: NaiveDateTime, + pub status: i32, + pub tx_type: i32, + pub encoded: String, + pub signature: String, + pub tx_hash: Option, +} + +#[derive(Queryable, Clone, Debug, Identifiable, Insertable, PartialEq)] +#[primary_key(order_id)] +#[table_name = "payment"] +pub struct PaymentEntity { + pub order_id: String, + pub amount: String, + pub gas: String, + pub sender: String, + pub recipient: String, + pub payment_due_date: NaiveDateTime, + pub status: i32, + pub tx_id: Option, +} diff --git a/core/payment-driver/base/src/db/schema.rs b/core/payment-driver/base/src/db/schema.rs new file mode 100644 index 0000000000..7aca9b56fc --- /dev/null +++ b/core/payment-driver/base/src/db/schema.rs @@ -0,0 +1,60 @@ +table! { + payment (order_id) { + order_id -> Text, + amount -> Text, + gas -> Text, + sender -> Text, + recipient -> Text, + payment_due_date -> Timestamp, + status -> Integer, + tx_id -> Nullable, + } +} + +table! { + payment_status (status_id) { + status_id -> Integer, + status -> Text, + } +} + +table! { + transaction (tx_id) { + tx_id -> Text, + sender -> Text, + nonce -> Text, + timestamp -> Timestamp, + status -> Integer, + tx_type -> Integer, + encoded -> Text, + signature -> Text, + tx_hash -> Nullable, + } +} + +table! { + transaction_status (status_id) { + status_id -> Integer, + status -> Text, + } +} + +table! { + transaction_type (type_id) { + type_id -> Integer, + tx_type -> Text, + } +} + +joinable!(payment -> payment_status (status)); +joinable!(payment -> transaction (tx_id)); +joinable!(transaction -> transaction_status (status)); +joinable!(transaction -> transaction_type (tx_type)); + +allow_tables_to_appear_in_same_query!( + payment, + payment_status, + transaction, + transaction_status, + transaction_type, +); diff --git a/core/payment-driver/base/src/driver.rs b/core/payment-driver/base/src/driver.rs index abe3b67382..6379755d0c 100644 --- a/core/payment-driver/base/src/driver.rs +++ b/core/payment-driver/base/src/driver.rs @@ -7,6 +7,7 @@ // Workspace uses // Local uses +use crate::dao::DbExecutor; use crate::model::{ Ack, GenericError, GetAccountBalance, GetTransactionBalance, Init, PaymentDetails, SchedulePayment, VerifyPayment, @@ -22,14 +23,14 @@ pub use ya_core_model::identity::{event::Event as IdentityEvent, Error as Identi pub trait PaymentDriver { async fn account_event( &self, - _db: (), + _db: DbExecutor, _caller: String, msg: IdentityEvent, ) -> Result<(), IdentityError>; async fn get_account_balance( &self, - db: (), + db: DbExecutor, caller: String, msg: GetAccountBalance, ) -> Result; @@ -40,23 +41,23 @@ pub trait PaymentDriver { async fn get_transaction_balance( &self, - db: (), + db: DbExecutor, caller: String, msg: GetTransactionBalance, ) -> Result; - async fn init(&self, db: (), caller: String, msg: Init) -> Result; + async fn init(&self, db: DbExecutor, caller: String, msg: Init) -> Result; async fn schedule_payment( &self, - db: (), + db: DbExecutor, caller: String, msg: SchedulePayment, ) -> Result; async fn verify_payment( &self, - db: (), + db: DbExecutor, caller: String, msg: VerifyPayment, ) -> Result; diff --git a/core/payment-driver/base/src/lib.rs b/core/payment-driver/base/src/lib.rs index 6ec1fc7bb9..2d3ca63591 100644 --- a/core/payment-driver/base/src/lib.rs +++ b/core/payment-driver/base/src/lib.rs @@ -4,10 +4,16 @@ Contains a trait, error stubs (TBD) and utils. */ +#[macro_use] +extern crate diesel; + extern crate log; pub mod account; pub mod bus; +pub mod cron; +pub mod dao; +pub mod db; pub mod driver; pub mod utils; diff --git a/core/payment-driver/base/src/utils.rs b/core/payment-driver/base/src/utils.rs index bb4a8fed65..a5370fcc30 100644 --- a/core/payment-driver/base/src/utils.rs +++ b/core/payment-driver/base/src/utils.rs @@ -3,12 +3,21 @@ */ // External crates +use bigdecimal::BigDecimal; use chrono::{DateTime, Utc}; +use ethereum_types::U256; +use num::bigint::ToBigInt; // Local uses +use crate::db::models::PaymentEntity; use crate::model::{PaymentDetails, SchedulePayment}; -pub fn to_payment_details(msg: SchedulePayment, date: Option>) -> PaymentDetails { +const PRECISION: u64 = 1_000_000_000_000_000_000; + +pub fn msg_to_payment_details( + msg: &SchedulePayment, + date: Option>, +) -> PaymentDetails { PaymentDetails { recipient: msg.recipient().to_string(), sender: msg.sender().to_string(), @@ -16,3 +25,39 @@ pub fn to_payment_details(msg: SchedulePayment, date: Option>) -> date, } } + +pub fn db_to_payment_details(payment: &PaymentEntity) -> PaymentDetails { + // TODO: Put date in database? + let date = Utc::now(); + let amount = u256_from_big_endian_hex(payment.amount.clone()); + let amount = u256_to_big_dec(amount); + PaymentDetails { + recipient: payment.recipient.clone(), + sender: payment.sender.clone(), + amount, + date: Some(date), + } +} + +pub fn u256_to_big_endian_hex(value: U256) -> String { + let mut bytes = [0u8; 32]; + value.to_big_endian(&mut bytes); + hex::encode(&bytes) +} + +pub fn u256_from_big_endian_hex(bytes: String) -> U256 { + let bytes = hex::decode(&bytes).unwrap(); + U256::from_big_endian(&bytes) +} + +pub fn big_dec_to_u256(v: BigDecimal) -> U256 { + let v = v * Into::::into(PRECISION); + let v = v.to_bigint().unwrap(); + let v = &v.to_string(); + U256::from_dec_str(v).unwrap() +} + +pub fn u256_to_big_dec(v: U256) -> BigDecimal { + let v: BigDecimal = v.to_string().parse().unwrap(); + v / Into::::into(PRECISION) +} diff --git a/core/payment-driver/zksync/Cargo.toml b/core/payment-driver/zksync/Cargo.toml index d7e7ed1485..44373fa456 100644 --- a/core/payment-driver/zksync/Cargo.toml +++ b/core/payment-driver/zksync/Cargo.toml @@ -8,7 +8,6 @@ edition = "2018" default = [] [dependencies] -actix = "0.9" async-trait = "0.1" anyhow = "1.0" awc = "2.0.0" @@ -27,8 +26,9 @@ zksync = { git = "https://github.com/matter-labs/zksync", branch = "master"} zksync_eth_signer = { git = "https://github.com/matter-labs/zksync", branch = "master"} ## yagna dependencies -ya-payment-driver = { version = "0.1.0" } -ya-client-model = { version = "0.2.0" } +ya-payment-driver = "0.1.0" +ya-client-model = "0.2.0" +ya-service-api-interfaces = "0.1" [dev-dependencies] dotenv = "0.15.0" diff --git a/core/payment-driver/zksync/src/dao.rs b/core/payment-driver/zksync/src/dao.rs new file mode 100644 index 0000000000..66f87dce72 --- /dev/null +++ b/core/payment-driver/zksync/src/dao.rs @@ -0,0 +1,196 @@ +/* + Database Access Object, all you need to interact with the database. +*/ + +// Extrernal crates +use chrono::{DateTime, Utc}; +use uuid::Uuid; + +// Workspace uses +use ya_payment_driver::{ + dao::{payment::PaymentDao, transaction::TransactionDao, DbExecutor}, + db::models::{ + PaymentEntity, TransactionEntity, TransactionStatus, PAYMENT_STATUS_FAILED, + PAYMENT_STATUS_NOT_YET, TX_CREATED, + }, + model::{GenericError, PaymentDetails, SchedulePayment}, + utils, +}; + +pub struct ZksyncDao { + db: DbExecutor, +} + +impl ZksyncDao { + pub fn new(db: DbExecutor) -> Self { + Self { db } + } + + fn payment(&self) -> PaymentDao { + self.db.as_dao::() + } + + fn transaction(&self) -> TransactionDao { + self.db.as_dao::() + } + + pub async fn get_pending_payments(&self, node_id: &str) -> Vec { + match self + .payment() + .get_pending_payments(node_id.to_string()) + .await + { + Ok(payments) => payments, + Err(e) => { + log::error!( + "Failed to fetch pending payments for {:?} : {:?}", + node_id, + e + ); + vec![] + } + } + } + + pub async fn insert_payment(&self, order_id: &str, msg: &SchedulePayment) { + let recipient = msg.recipient().to_owned(); + let gnt_amount = utils::big_dec_to_u256(msg.amount()); + let gas_amount = Default::default(); + + let payment = PaymentEntity { + amount: utils::u256_to_big_endian_hex(gnt_amount), + gas: utils::u256_to_big_endian_hex(gas_amount), + order_id: order_id.to_string(), + payment_due_date: msg.due_date().naive_utc(), + sender: msg.sender().clone(), + recipient: recipient.clone(), + status: PAYMENT_STATUS_NOT_YET, + tx_id: None, + }; + if let Err(e) = self.payment().insert(payment).await { + log::error!( + "Failed to store transaction for {:?} , msg={:?}, err={:?}", + order_id, + msg, + e + ) + // TO CHECK: Should it continue or stop the process... + } + } + + pub async fn insert_transaction( + &self, + details: &PaymentDetails, + date: DateTime, + ) -> String { + // TO CHECK: No difference between tx_id and tx_hash on zksync + // TODO: Implement pre-sign + let tx_id = Uuid::new_v4().to_string(); + let tx = TransactionEntity { + tx_id: tx_id.clone(), + sender: details.sender.clone(), + nonce: "".to_string(), // not used till pre-sign + status: TX_CREATED, + timestamp: date.naive_utc(), + tx_type: 0, // Zksync only knows transfers, unused field + encoded: "".to_string(), // not used till pre-sign + signature: "".to_string(), // not used till pre-sign + tx_hash: None, + }; + + if let Err(e) = self.transaction().insert_transactions(vec![tx]).await { + log::error!("Failed to store transaction for {:?} : {:?}", details, e) + // TO CHECK: Should it continue or stop the process... + } + tx_id + } + + pub async fn transaction_confirmed(&self, tx_id: &str, result: bool) -> Vec { + let status = if result { + TransactionStatus::Confirmed + } else { + TransactionStatus::Failed + }; + + if let Err(e) = self + .transaction() + .update_tx_status(tx_id.to_string(), status.into()) + .await + { + log::error!("Failed to update tx status for {:?} : {:?}", tx_id, e) + // TO CHECK: Should it continue or stop the process... + } + if result { + match self.payment().get_by_tx_id(tx_id.to_string()).await { + Ok(payments) => return payments, + Err(e) => log::error!("Failed to fetch `payments` for tx {:?} : {:?}", tx_id, e), + }; + } + vec![] + } + + pub async fn transaction_success(&self, tx_id: &str, tx_hash: &str, order_id: &str) { + if let Err(e) = self + .payment() + .update_tx_id(order_id.to_string(), tx_id.to_string()) + .await + { + log::error!("Failed to update for transaction {:?} : {:?}", tx_id, e) + // TO CHECK: Should it continue or stop the process... + } + if let Err(e) = self + .transaction() + .update_tx_sent(tx_id.to_string(), tx_hash.to_string()) + .await + { + log::error!("Failed to update for transaction {:?} : {:?}", tx_id, e) + // TO CHECK: Should it continue or stop the process... + } + } + + pub async fn transaction_failed(&self, tx_id: &str, error: &GenericError, order_id: &str) { + if let Err(e) = self + .payment() + .update_status( + order_id.to_string(), + match error { + // TODO: Handle other statusses + // GNTDriverError::InsufficientFunds => PAYMENT_STATUS_NOT_ENOUGH_FUNDS, + // GNTDriverError::InsufficientGas => PAYMENT_STATUS_NOT_ENOUGH_GAS, + _ => PAYMENT_STATUS_FAILED, + }, + ) + .await + { + log::error!( + "Failed to update transaction failed in `payment` {:?} : {:?}", + tx_id, + e + ) + // TO CHECK: Should it continue or stop the process... + } + + if let Err(e) = self + .transaction() + .update_tx_status(tx_id.to_string(), TransactionStatus::Failed.into()) + .await + { + log::error!( + "Failed to update transaction failed in `transaction` {:?} : {:?}", + tx_id, + e + ) + // TO CHECK: Should it continue or stop the process... + } + } + + pub async fn get_unconfirmed_txs(&self) -> Vec { + match self.transaction().get_unconfirmed_txs().await { + Ok(txs) => txs, + Err(e) => { + log::error!("Failed to fetch unconfirmed transactions : {:?}", e); + vec![] + } + } + } +} diff --git a/core/payment-driver/zksync/src/driver.rs b/core/payment-driver/zksync/src/driver.rs index d91b8466e9..67a735f200 100644 --- a/core/payment-driver/zksync/src/driver.rs +++ b/core/payment-driver/zksync/src/driver.rs @@ -4,7 +4,6 @@ Please limit the logic in this file, use local mods to handle the calls. */ // Extrnal crates -use actix::Arbiter; use chrono::Utc; use serde_json; use uuid::Uuid; @@ -13,34 +12,93 @@ use uuid::Uuid; use ya_payment_driver::{ account::{Accounts, AccountsRc}, bus, + cron::PaymentDriverCron, + dao::DbExecutor, + db::models::PaymentEntity, driver::{async_trait, BigDecimal, IdentityError, IdentityEvent, PaymentDriver}, model::{ Ack, GenericError, GetAccountBalance, GetTransactionBalance, Init, PaymentConfirmation, PaymentDetails, SchedulePayment, VerifyPayment, }, - utils as driver_utils, + utils, }; // Local uses -use crate::{zksync::wallet, DRIVER_NAME, PLATFORM_NAME}; +use crate::{dao::ZksyncDao, zksync::wallet, DRIVER_NAME, PLATFORM_NAME}; pub struct ZksyncDriver { active_accounts: AccountsRc, + dao: ZksyncDao, } impl ZksyncDriver { - pub fn new() -> Self { + pub fn new(db: DbExecutor) -> Self { Self { active_accounts: Accounts::new_rc(), + dao: ZksyncDao::new(db), } } + + pub async fn load_active_accounts(&self) { + log::debug!("load_active_accounts"); + let mut accounts = self.active_accounts.borrow_mut(); + let unlocked_accounts = bus::list_unlocked_identities().await.unwrap(); + for account in unlocked_accounts { + log::debug!("account={}", account); + accounts.add_account(account) + } + } + + fn is_account_active(&self, address: &str) -> bool { + self.active_accounts + .as_ref() + .borrow() + .get_node_id(address) + .is_some() + } + + async fn process_payments_for_account(&self, node_id: &str) { + log::trace!("Processing payments for node_id={}", node_id); + let payments: Vec = self.dao.get_pending_payments(node_id).await; + if !payments.is_empty() { + log::info!( + "Processing {} Payments for node_id={}", + payments.len(), + node_id + ); + log::debug!("Payments details: {:?}", payments); + } + for payment in payments { + self.handle_payment(payment).await; + } + } + + async fn handle_payment(&self, payment: PaymentEntity) { + let details = utils::db_to_payment_details(&payment); + let tx_id = self.dao.insert_transaction(&details, Utc::now()).await; + + match wallet::make_transfer(&details).await { + Ok(tx_hash) => { + self.dao + .transaction_success(&tx_id, &tx_hash, &payment.order_id) + .await; + } + Err(e) => { + self.dao + .transaction_failed(&tx_id, &e, &payment.order_id) + .await; + log::error!("NGNT transfer failed: {}", e); + //return Err(e); + } + }; + } } #[async_trait(?Send)] impl PaymentDriver for ZksyncDriver { async fn account_event( &self, - _db: (), + _db: DbExecutor, _caller: String, msg: IdentityEvent, ) -> Result<(), IdentityError> { @@ -50,7 +108,7 @@ impl PaymentDriver for ZksyncDriver { async fn get_account_balance( &self, - _db: (), + _db: DbExecutor, _caller: String, msg: GetAccountBalance, ) -> Result { @@ -72,7 +130,7 @@ impl PaymentDriver for ZksyncDriver { async fn get_transaction_balance( &self, - _db: (), + _db: DbExecutor, _caller: String, msg: GetTransactionBalance, ) -> Result { @@ -82,12 +140,17 @@ impl PaymentDriver for ZksyncDriver { Ok(BigDecimal::from(1_000_000_000_000_000_000u64)) } - async fn init(&self, _db: (), _caller: String, msg: Init) -> Result { + async fn init(&self, _db: DbExecutor, _caller: String, msg: Init) -> Result { log::debug!("init: {:?}", msg); + let address = msg.address().clone(); + + // TODO: payment_api fails to start due to provider account not unlocked + // if !self.is_account_active(&address) { + // return Err(GenericError::new("Can not init, account not active")); + // } wallet::init_wallet(&msg).await?; - let address = msg.address().clone(); let mode = msg.mode(); bus::register_account(self, &address, mode).await?; @@ -103,49 +166,77 @@ impl PaymentDriver for ZksyncDriver { async fn schedule_payment( &self, - _db: (), + _db: DbExecutor, _caller: String, msg: SchedulePayment, ) -> Result { log::debug!("schedule_payment: {:?}", msg); - let date = Utc::now(); - let details = driver_utils::to_payment_details(msg, Some(date)); - let confirmation = to_confirmation(&details)?; - // TODO: move to database / background task - wallet::make_transfer(&details).await?; - let order_id = Uuid::new_v4().to_string(); - let driver_name = self.get_name(); - // Make a clone of order_id as return values - let result = order_id.clone(); - - log::info!( - "Scheduled payment with success. order_id={}, details={:?}", - &order_id, - &details - ); + let sender = msg.sender().to_owned(); + if !self.is_account_active(&sender) { + return Err(GenericError::new( + "Can not schedule_payment, account not active", + )); + } - // Spawned because calling payment service while handling a call from payment service - // would result in a deadlock. - Arbiter::spawn(async move { - let _ = bus::notify_payment(&driver_name, &order_id, &details, confirmation) - .await - .map_err(|e| log::error!("{}", e)); - }); - Ok(result) + let order_id = Uuid::new_v4().to_string(); + self.dao.insert_payment(&order_id, &msg).await; + Ok(order_id) } async fn verify_payment( &self, - _db: (), + _db: DbExecutor, _caller: String, msg: VerifyPayment, ) -> Result { log::debug!("verify_payment: {:?}", msg); - // todo!() - // wallet::verify_transfer(msg).await? - let details = from_confirmation(msg.confirmation())?; - Ok(details) + // TODO: Get transaction details from zksync + // let tx_hash = hex::encode(msg.confirmation().confirmation); + // match wallet::check_tx(&tx_hash).await { + // Some(true) => Ok(wallet::build_payment_details(tx_hash)), + // Some(false) => Err(GenericError::new("Payment did not succeed")), + // None => Err(GenericError::new("Payment not ready to be checked")), + // } + from_confirmation(msg.confirmation()) + } +} + +#[async_trait(?Send)] +impl PaymentDriverCron for ZksyncDriver { + async fn confirm_payments(&self) { + let txs = self.dao.get_unconfirmed_txs().await; + log::trace!("confirm_payments {:?}", txs); + + for tx in txs { + log::trace!("checking tx {:?}", &tx); + let tx_hash = match &tx.tx_hash { + None => continue, + Some(a) => a, + }; + // Check_tx returns None when the result is unknown + if let Some(result) = wallet::check_tx(&tx_hash).await { + let payments = self.dao.transaction_confirmed(&tx.tx_id, result).await; + let driver_name = self.get_name(); + for pay in payments { + let details = utils::db_to_payment_details(&pay); + // TODO: Provider needs method to fetch details based on hash + // let tx_hash = hex::decode(&tx_hash).unwrap(); + let tx_hash = to_confirmation(&details).unwrap(); + if let Err(e) = + bus::notify_payment(&driver_name, &pay.order_id, &details, tx_hash).await + { + log::error!("{}", e) + }; + } + } + } + } + + async fn process_payments(&self) { + for node_id in self.active_accounts.borrow().list_accounts() { + self.process_payments_for_account(&node_id).await; + } } } diff --git a/core/payment-driver/zksync/src/lib.rs b/core/payment-driver/zksync/src/lib.rs index 518089bce5..243356f8d2 100644 --- a/core/payment-driver/zksync/src/lib.rs +++ b/core/payment-driver/zksync/src/lib.rs @@ -15,6 +15,7 @@ pub use service::ZksyncService as PaymentDriverService; #[macro_use] extern crate log; +mod dao; mod driver; mod service; mod zksync; diff --git a/core/payment-driver/zksync/src/service.rs b/core/payment-driver/zksync/src/service.rs index feeeefc40e..5bc550fb8c 100644 --- a/core/payment-driver/zksync/src/service.rs +++ b/core/payment-driver/zksync/src/service.rs @@ -2,8 +2,17 @@ The service that binds this payment driver into yagna via GSB. */ +// Extrernal crates +use std::sync::Arc; + // Workspace uses -use ya_payment_driver::bus; +use ya_payment_driver::{ + bus, + cron::Cron, + dao::{init, DbExecutor}, + model::GenericError, +}; +use ya_service_api_interfaces::Provider; // Local uses use crate::driver::ZksyncDriver; @@ -11,12 +20,27 @@ use crate::driver::ZksyncDriver; pub struct ZksyncService; impl ZksyncService { - pub async fn gsb(_context: &Context) -> anyhow::Result<()> { + pub async fn gsb>(context: &Context) -> anyhow::Result<()> { log::debug!("Connecting ZksyncService to gsb..."); - let driver = ZksyncDriver::new(); + // TODO: Read and validate env + log::debug!("Environment variables validated"); + + // Init database + let db: DbExecutor = context.component(); + init(&db).await.map_err(GenericError::new)?; + log::debug!("Database initialised"); + + // Load driver + let driver = ZksyncDriver::new(db.clone()); + driver.load_active_accounts().await; + let driver_rc = Arc::new(driver); + bus::bind_service(&db, driver_rc.clone()).await; + log::debug!("Driver loaded"); - bus::bind_service(driver).await; + // Start cron + Cron::new(driver_rc.clone()); + log::debug!("Cron started"); log::info!("Succesfully connected ZksyncService to gsb."); Ok(()) diff --git a/core/payment-driver/zksync/src/zksync/wallet.rs b/core/payment-driver/zksync/src/zksync/wallet.rs index 5525133511..79029ce294 100644 --- a/core/payment-driver/zksync/src/zksync/wallet.rs +++ b/core/payment-driver/zksync/src/zksync/wallet.rs @@ -10,7 +10,7 @@ use num::BigUint; use std::env; use std::str::FromStr; use zksync::types::BlockStatus; -use zksync::zksync_types::Address; +use zksync::zksync_types::{tx::TxHash, Address}; use zksync::{Network, Provider, Wallet, WalletCredentials}; use zksync_eth_signer::EthereumSigner; @@ -100,6 +100,29 @@ pub async fn make_transfer(details: &PaymentDetails) -> Result Option { + let provider = get_provider(); + let tx_hash = format!("sync-tx:{}", tx_hash); + let tx_hash = TxHash::from_str(&tx_hash).unwrap(); + let tx_info = provider.tx_info(tx_hash).await.unwrap(); + log::trace!("tx_info: {:?}", tx_info); + tx_info.success +} +// TODO: Get Transfer object from zksync +// pub async fn build_payment_details(tx_hash: &str) -> PaymentDetails { +// let provider = get_provider(); +// let tx_hash = format!("sync-tx:{}", tx_hash); +// let tx_hash = TxHash::from_str(&tx_hash).unwrap(); +// let tx_info = provider.tx_info(tx_hash).await.unwrap(); +// +// PaymentDetails { +// recipient: tx_info., +// sender, +// amount, +// date +// } +// } + fn get_provider() -> Provider { (*PROVIDER).clone() } diff --git a/core/payment/examples/payment_api.rs b/core/payment/examples/payment_api.rs index 4c374e1b63..aa3241ad02 100644 --- a/core/payment/examples/payment_api.rs +++ b/core/payment/examples/payment_api.rs @@ -94,12 +94,15 @@ pub async fn start_gnt_driver( Ok(()) } -pub async fn start_zksync_driver(requestor_account: Box) -> anyhow::Result<()> { +pub async fn start_zksync_driver( + db: &DbExecutor, + requestor_account: Box, +) -> anyhow::Result<()> { let requestor = NodeId::from(requestor_account.address().as_ref()); fake_list_identities(vec![requestor]); fake_subscribe_to_events(); - zksync::PaymentDriverService::gsb(&()).await?; + zksync::PaymentDriverService::gsb(db).await?; let requestor_sign_tx = get_sign_tx(requestor_account); fake_sign_tx(Box::new(requestor_sign_tx)); Ok(()) @@ -202,7 +205,7 @@ async fn main() -> anyhow::Result<()> { (gnt::DRIVER_NAME, gnt::PLATFORM_NAME) } Driver::Zksync => { - start_zksync_driver(requestor_account).await?; + start_zksync_driver(&db, requestor_account).await?; (zksync::DRIVER_NAME, zksync::PLATFORM_NAME) } }; diff --git a/core/serv/src/main.rs b/core/serv/src/main.rs index 0a151bfd6a..69119bb4c7 100644 --- a/core/serv/src/main.rs +++ b/core/serv/src/main.rs @@ -242,7 +242,8 @@ async fn start_payment_drivers(data_dir: &Path) -> anyhow::Result<()> { #[cfg(feature = "zksync-driver")] { use ya_zksync_driver::PaymentDriverService; - PaymentDriverService::gsb(&()).await?; + let db_executor = DbExecutor::from_data_dir(data_dir, "zksync-driver")?; + PaymentDriverService::gsb(&db_executor).await?; } Ok(()) }