Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add persistence to zksync payment driver #779

Merged
merged 10 commits into from
Nov 12, 2020
13 changes: 12 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions core/payment-driver/base/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,25 @@ edition = "2018"
default = []

[dependencies]
actix = "0.9"
anyhow = "1.0"
async-trait = "0.1"
bigdecimal = { version = "0.1.0" }
chrono = { version = "0.4", features = ["serde"] }
diesel = { version = "1.4", features = ["sqlite", "r2d2", "chrono"] }
diesel_migrations = "1.4"
ethereum-types = "0.6.0"
hex = "0.4"
log = "0.4.8"
num = { version = "0.2", features = ["serde"] }
r2d2 = "0.8"
thiserror = "1.0"
tokio = { version = "0.2", features = ["macros"] }

## yagna dependencies
ya-client-model = "0.2.0"
ya-core-model = { version = "0.2.1", features = ["driver", "identity", "payment"] }
ya-persistence = "0.2"
ya-service-bus = "0.2"

[dev-dependencies]
5 changes: 5 additions & 0 deletions core/payment-driver/base/diesel.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# For documentation on how to configure this file,
# see diesel.rs/guides/configuring-diesel-cli

[print_schema]
file = "src/db/schema.rs"
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
DROP TABLE payment;

DROP TABLE transaction;

DROP TABLE payment_status;

DROP TABLE transaction_status;

DROP TABLE transaction_type;
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
CREATE TABLE `payment_status`
(
status_id INTEGER NOT NULL PRIMARY KEY,
status VARCHAR(50) NOT NULL
);

INSERT INTO `payment_status`
(status_id, status)
VALUES(1, "REQUESTED");
INSERT INTO `payment_status`
(status_id, status)
VALUES(2, "DONE");
INSERT INTO `payment_status`
(status_id, status)
VALUES(3, "NOT_ENOUGH_FUNDS");
INSERT INTO `payment_status`
(status_id, status)
VALUES(4, "NOT_ENOUGH_GAS");
INSERT INTO `payment_status`
(status_id, status)
VALUES(5, "FAILED");


CREATE TABLE `transaction_status`
(
status_id INTEGER NOT NULL PRIMARY KEY,
status VARCHAR(50) NOT NULL
);

INSERT INTO `transaction_status`
(status_id, status)
VALUES(1, "CREATED");
INSERT INTO `transaction_status`
(status_id, status)
VALUES(2, "SENT");
INSERT INTO `transaction_status`
(status_id, status)
VALUES(3, "CONFIRMED");
INSERT INTO `transaction_status`
(status_id, status)
VALUES(0, "FAILED");


CREATE TABLE transaction_type
(
type_id INTEGER NOT NULL PRIMARY KEY,
tx_type VARCHAR(50) NOT NULL
);

INSERT INTO `transaction_type`
(type_id, tx_type)
VALUES(0, "FAUCET");
INSERT INTO `transaction_type`
(type_id, tx_type)
VALUES(1, "TRANSFER");

CREATE TABLE `transaction`
(
tx_id VARCHAR(128) NOT NULL PRIMARY KEY,
sender VARCHAR(40) NOT NULL,
-- U256 in big endian hex
nonce VARCHAR(64) NOT NULL,
timestamp DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
status INTEGER NOT NULL,
tx_type INTEGER NOT NULL,
encoded VARCHAR (8000) NOT NULL,
signature VARCHAR (130) NOT NULL,
tx_hash VARCHAR(64) NULL UNIQUE,
FOREIGN KEY(status) REFERENCES transaction_status (status_id),
FOREIGN KEY(tx_type) REFERENCES transaction_type (type_id)
);

CREATE TABLE `payment`
(
order_id VARCHAR(50) NOT NULL PRIMARY KEY,
-- U256 in big endian hex
amount VARCHAR(64) NOT NULL,
-- U256 in big endian hex
gas VARCHAR(64) NOT NULL,
sender VARCHAR(40) NOT NULL,
recipient VARCHAR(40) NOT NULL,
payment_due_date DATETIME NOT NULL,
status INTEGER NOT NULL,
tx_id VARCHAR(128),
FOREIGN KEY(tx_id) REFERENCES `transaction` (tx_id),
FOREIGN KEY(status) REFERENCES `payment_status` (status_id)
);
16 changes: 1 addition & 15 deletions core/payment-driver/base/src/account.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,5 @@
/*
Helper to store active accounts in the driver by <NodeId.to_string(), NodeId>.

To use Accounts on your driver:
- Add type AccountsRc to the struct, for example:
struct SomePaymentDriver {
active_accounts: AccountsRc,
}
- Implement get_accounts:
fn get_accounts(&self) -> AccountsRefMut {
self.active_accounts.borrow_mut()
}
- Make sure your "DriverService" subscribes to identity events
bus::subscribe_to_identity_events(driver).await;
- The PaymentDriver trait will keep the list updated

*/

// External crates
Expand Down Expand Up @@ -65,7 +51,7 @@ impl Accounts {
}
}

fn add_account(&mut self, account: NodeId) {
pub fn add_account(&mut self, account: NodeId) {
self.accounts.insert(account.to_string(), account);
log::info!("Account: {:?} is unlocked", account.to_string());
}
Expand Down
26 changes: 23 additions & 3 deletions core/payment-driver/base/src/bus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ use ya_service_bus::{
};

// Local uses
use crate::dao::DbExecutor;
use crate::driver::PaymentDriver;

pub async fn bind_service<D: PaymentDriver + 'static>(driver: D) {
pub async fn bind_service<Driver: PaymentDriver + 'static>(db: &DbExecutor, driver: Arc<Driver>) {
log::debug!("Binding payment driver service to service bus");
let driver = Arc::new(driver);
let bus_id = driver_bus_id(driver.get_name());

/* Short variable names explained:
Expand All @@ -34,7 +34,7 @@ pub async fn bind_service<D: PaymentDriver + 'static>(driver: D) {
m = message
*/
#[rustfmt::skip] // Keep move's neatly alligned
ServiceBinder::new(&bus_id, &(), driver)
ServiceBinder::new(&bus_id, db, driver)
.bind_with_processor(
move |db, dr, c, m| async move { dr.init(db, c, m).await }
)
Expand Down Expand Up @@ -64,6 +64,26 @@ pub async fn bind_service<D: PaymentDriver + 'static>(driver: D) {
}
}

pub async fn list_unlocked_identities() -> Result<Vec<NodeId>, GenericError> {
log::debug!("list_unlocked_identities");
let message = identity::List {};
let result = service(identity::BUS_ID)
.send(message)
.await
.map_err(GenericError::new)?
.map_err(GenericError::new)?;
let unlocked_list = result
.iter()
.filter(|n| !n.is_locked)
.map(|n| n.node_id)
.collect();
log::debug!(
"list_unlocked_identities completed. result={:?}",
unlocked_list
);
Ok(unlocked_list)
}

pub async fn register_account(
driver: &(dyn PaymentDriver),
address: &str,
Expand Down
64 changes: 64 additions & 0 deletions core/payment-driver/base/src/cron.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
Manage PaymentDriver tasks to be ran on set intervals.
*/

// Extrernal crates
use actix::Arbiter;
use actix::AsyncContext;
use actix::{
prelude::{Addr, Context},
Actor,
};
use std::sync::Arc;
use std::time::Duration;

pub use async_trait::async_trait;

#[async_trait(?Send)]
pub trait PaymentDriverCron {
async fn confirm_payments(&self);
async fn process_payments(&self);
}

pub struct Cron<D: PaymentDriverCron> {
driver: Arc<D>,
}

impl<D: PaymentDriverCron + 'static> Cron<D> {
pub fn new(driver: Arc<D>) -> Addr<Self> {
log::trace!("Creating Cron for PaymentDriver.");
let me = Self { driver };
me.start()
}

fn start_confirmation_job(&mut self, ctx: &mut Context<Self>) {
let _ = ctx.run_interval(Duration::from_secs(10), |act, _ctx| {
log::trace!("Spawning confirmation job.");
let driver = act.driver.clone();
Arbiter::spawn(async move {
driver.confirm_payments().await;
log::trace!("Confirmation job finished.");
});
});
}

fn start_payment_job(&mut self, ctx: &mut Context<Self>) {
let _ = ctx.run_interval(Duration::from_secs(30), |act, _ctx| {
log::trace!("Spawning payment job.");
let driver = act.driver.clone();
Arbiter::spawn(async move {
driver.process_payments().await;
log::trace!("Payment job finished.");
});
});
}
}

impl<D: PaymentDriverCron + 'static> Actor for Cron<D> {
type Context = Context<Self>;

fn started(&mut self, ctx: &mut Self::Context) {
self.start_confirmation_job(ctx);
self.start_payment_job(ctx);
}
}
15 changes: 15 additions & 0 deletions core/payment-driver/base/src/dao/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
Database errors.
*/

#[derive(thiserror::Error, Debug)]
pub enum DbError {
#[error("Database connection error: {0}")]
Connection(#[from] r2d2::Error),
#[error("Database query error: {0}")]
Query(#[from] diesel::result::Error),
#[error("Runtime error: {0}")]
Runtime(#[from] tokio::task::JoinError),
#[error("{0}")]
InvalidData(String),
}
20 changes: 20 additions & 0 deletions core/payment-driver/base/src/dao/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
Shared Data Access Object tools.

Contains: Errors, Dao's, Migration tools.
*/

// Private
mod error;

pub use error::DbError;
pub mod payment;
pub mod transaction;

pub use ya_persistence::executor::DbExecutor;

pub type DbResult<T> = Result<T, DbError>;

pub async fn init(db: &DbExecutor) -> anyhow::Result<()> {
db.apply_migration(crate::db::migrations::run_with_output)
}
Loading