Skip to content

Commit

Permalink
+ last 1h invoice stats
Browse files Browse the repository at this point in the history
  • Loading branch information
prekucki committed Oct 16, 2020
1 parent d3f8f08 commit ef2de64
Show file tree
Hide file tree
Showing 9 changed files with 197 additions and 8 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

51 changes: 51 additions & 0 deletions core/model/src/payment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,13 @@ pub mod local {
}
}

impl std::ops::AddAssign for StatValue {
fn add_assign(&mut self, rhs: Self) {
self.agreements_count += rhs.agreements_count;
self.total_amount += rhs.total_amount;
}
}

#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct StatusNotes {
pub requested: StatValue,
Expand Down Expand Up @@ -272,6 +279,50 @@ pub mod local {
type Item = Vec<Account>;
type Error = GenericError;
}

#[derive(Clone, Debug, Serialize, Deserialize)]
#[non_exhaustive]
pub struct GetInvoiceStats {
pub node_id: NodeId,
pub requestor: bool,
pub provider: bool,
pub since: DateTime<Utc>,
}

impl GetInvoiceStats {
pub fn new(node_id: NodeId, since: DateTime<Utc>) -> Self {
Self {
node_id,
requestor: true,
provider: true,
since,
}
}
}

impl RpcMessage for GetInvoiceStats {
const ID: &'static str = "GetInvoiceStats";
type Item = InvoiceStats;
type Error = GenericError;
}

#[derive(Clone, Debug, Serialize, Deserialize, Default)]
#[non_exhaustive]
pub struct InvoiceStatusNotes {
pub issued: StatValue,
pub received: StatValue,
pub accepted: StatValue,
pub rejected: StatValue,
pub failed: StatValue,
pub settled: StatValue,
pub cancelled: StatValue,
}

#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct InvoiceStats {
pub requestor: InvoiceStatusNotes,
pub provider: InvoiceStatusNotes,
}
}

pub mod public {
Expand Down
1 change: 1 addition & 0 deletions core/payment/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ thiserror = "1.0"
tokio = { version = "0.2", features = ["fs"] }
uint = "0.7"
uuid = { version = "0.8", features = ["v4"] }
humantime="2.0.1"

[dev-dependencies]
ya-client = "0.3"
Expand Down
29 changes: 29 additions & 0 deletions core/payment/src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::accounts::{init_account, Account};
use crate::{DEFAULT_PAYMENT_DRIVER, DEFAULT_PAYMENT_PLATFORM};
use chrono::Utc;
use structopt::*;
use ya_core_model::{identity as id_api, payment::local as pay};
use ya_service_api::{CliCtx, CommandOutput, ResponseTable};
Expand All @@ -23,6 +24,19 @@ pub enum PaymentCli {
platform: Option<String>,
},
Accounts,
Invoice {
address: Option<String>,
#[structopt(subcommand)]
command: InvoiceCommand,
},
}

#[derive(StructOpt, Debug)]
pub enum InvoiceCommand {
Status {
#[structopt(long)]
last: Option<humantime::Duration>,
},
}

impl PaymentCli {
Expand Down Expand Up @@ -83,6 +97,21 @@ impl PaymentCli {
}
.into())
}
PaymentCli::Invoice {
address,
command: InvoiceCommand::Status { last },
} => {
let seconds = last.map(|d| d.as_secs() as i64).unwrap_or(3600);
let address = resolve_address(address).await?;
CommandOutput::object(
bus::service(pay::BUS_ID)
.call(pay::GetInvoiceStats::new(
address.parse()?,
Utc::now() + chrono::Duration::seconds(-seconds),
))
.await??,
)
}
}
}
}
Expand Down
34 changes: 32 additions & 2 deletions core/payment/src/dao/invoice.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
use crate::dao::{agreement, invoice_event};
use crate::error::DbResult;
use crate::error::{DbError, DbResult};
use crate::models::invoice::{InvoiceXActivity, ReadObj, WriteObj};
use crate::schema::pay_agreement::dsl as agreement_dsl;
use crate::schema::pay_invoice::dsl;
use crate::schema::pay_invoice_x_activity::dsl as activity_dsl;
use bigdecimal::BigDecimal;
use chrono::{DateTime, Utc};
use diesel::{
BoolExpressionMethods, ExpressionMethods, JoinOnDsl, OptionalExtension, QueryDsl, RunQueryDsl,
};
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use std::convert::TryFrom;
use ya_client_model::payment::{DocumentStatus, EventType, Invoice, NewInvoice};
use ya_client_model::NodeId;
use ya_core_model::payment::local::StatValue;
use ya_persistence::executor::{
do_with_transaction, readonly_transaction, AsDao, ConnType, PoolType,
};
Expand Down Expand Up @@ -176,6 +179,33 @@ impl<'c> InvoiceDao<'c> {
.await
}

pub async fn last_invoice_stats(
&self,
node_id: NodeId,
since: DateTime<Utc>,
) -> DbResult<BTreeMap<(Role, DocumentStatus), StatValue>> {
let results = readonly_transaction(self.pool, move |conn| {
let invoices: Vec<ReadObj> = query!()
.filter(dsl::owner_id.eq(node_id))
.filter(dsl::timestamp.gt(since.naive_utc()))
.load(conn)?;
Ok::<_, DbError>(invoices)
})
.await?;
let mut stats = BTreeMap::<(Role, DocumentStatus), StatValue>::new();
for invoice in results {
let key = (invoice.role, DocumentStatus::try_from(invoice.status)?);
let entry = stats.entry(key).or_default();
let total_amount = invoice.amount.0;
*entry = entry.clone()
+ StatValue {
total_amount,
agreements_count: 1,
};
}
Ok(stats)
}

pub async fn get_for_provider(&self, node_id: NodeId) -> DbResult<Vec<Invoice>> {
self.get_for_role(node_id, Role::Provider).await
}
Expand Down
56 changes: 56 additions & 0 deletions core/payment/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ pub fn bind_service(db: &DbExecutor, processor: PaymentProcessor) {
mod local {
use super::*;
use crate::dao::*;
use std::collections::BTreeMap;
use ya_client_model::payment::DocumentStatus;
use ya_core_model::payment::local::*;
use ya_persistence::types::Role;

pub fn bind_service(db: &DbExecutor, processor: PaymentProcessor) {
log::debug!("Binding payment local service to service bus");
Expand All @@ -27,6 +30,7 @@ mod local {
.bind_with_processor(unregister_account)
.bind_with_processor(notify_payment)
.bind_with_processor(get_status)
.bind_with_processor(get_invoice_stats)
.bind_with_processor(get_accounts);

// Initialize counters to 0 value. Otherwise they won't appear on metrics endpoint
Expand Down Expand Up @@ -143,6 +147,58 @@ mod local {
incoming,
})
}

async fn get_invoice_stats(
db: DbExecutor,
processor: PaymentProcessor,
_caller: String,
msg: GetInvoiceStats,
) -> Result<InvoiceStats, GenericError> {
let stats: BTreeMap<(Role, DocumentStatus), StatValue> = async {
db.as_dao::<InvoiceDao>()
.last_invoice_stats(msg.node_id, msg.since)
.await
}
.map_err(GenericError::new)
.await?;
let mut output_stats = InvoiceStats::default();

fn aggregate(
iter: impl Iterator<Item = (DocumentStatus, StatValue)>,
) -> InvoiceStatusNotes {
let mut notes = InvoiceStatusNotes::default();
for (status, value) in iter {
match status {
DocumentStatus::Issued => notes.issued += value,
DocumentStatus::Received => notes.received += value,
DocumentStatus::Accepted => notes.accepted += value,
DocumentStatus::Rejected => notes.rejected += value,
DocumentStatus::Failed => notes.failed += value,
DocumentStatus::Settled => notes.settled += value,
DocumentStatus::Cancelled => notes.cancelled += value,
}
}
notes
}

if msg.provider {
output_stats.provider = aggregate(
stats
.iter()
.filter(|((role, _), _)| matches!(role, Role::Provider))
.map(|((_, status), value)| (*status, value.clone())),
);
}
if msg.requestor {
output_stats.requestor = aggregate(
stats
.iter()
.filter(|((role, _), _)| matches!(role, Role::Requestor))
.map(|((_, status), value)| (*status, value.clone())),
);
}
Ok(output_stats)
}
}

mod public {
Expand Down
2 changes: 1 addition & 1 deletion core/persistence/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ where
}
}

#[derive(Debug, Clone, AsExpression, FromSqlRow)]
#[derive(Debug, Clone, Ord, Eq, PartialOrd, PartialEq, AsExpression, FromSqlRow)]
#[sql_type = "Text"]
pub enum Role {
Provider,
Expand Down
19 changes: 18 additions & 1 deletion golem_cli/src/command/yagna.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::collections::HashMap;
use std::process::Stdio;

use tokio::process::{Child, Command};
use ya_core_model::payment::local::{StatusNotes, StatusResult};
use ya_core_model::payment::local::{InvoiceStats, InvoiceStatusNotes, StatusNotes, StatusResult};

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
Expand Down Expand Up @@ -64,6 +64,18 @@ impl PaymentSummary for StatusNotes {
}
}

impl PaymentSummary for InvoiceStatusNotes {
fn total_pending(&self) -> (BigDecimal, u64) {
let value = self.accepted.clone();
(value.total_amount, value.agreements_count)
}

fn unconfirmed(&self) -> (BigDecimal, u64) {
let value = self.issued.clone() + self.received.clone();
(value.total_amount.clone(), value.agreements_count)
}
}

pub struct YagnaCommand {
pub(super) cmd: Command,
}
Expand Down Expand Up @@ -98,6 +110,11 @@ impl YagnaCommand {
self.run().await
}

pub async fn invoice_status(mut self) -> anyhow::Result<InvoiceStats> {
self.cmd.args(&["--json", "payment", "invoice", "status"]);
self.run().await
}

pub async fn activity_status(mut self) -> anyhow::Result<ActivityStatus> {
self.cmd.args(&["--json", "activity", "status"]);
self.run().await
Expand Down
12 changes: 8 additions & 4 deletions golem_cli/src/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,12 @@ pub async fn run() -> Result</*exit code*/ i32> {

if is_running {
let payments = {
let (id, payment_status) =
future::try_join(cmd.yagna()?.default_id(), cmd.yagna()?.payment_status()).await?;
let (id, payment_status, invoice_status) = future::try_join3(
cmd.yagna()?.default_id(),
cmd.yagna()?.payment_status(),
cmd.yagna()?.invoice_status(),
)
.await?;

let mut table = Table::new();
let format = format::FormatBuilder::new().padding(1, 1).build();
Expand All @@ -72,13 +76,13 @@ pub async fn run() -> Result</*exit code*/ i32> {
table.add_row(row!["amount", format!("{} NGNT", &payment_status.amount)]);
table.add_empty_row();
{
let (pending, pending_cnt) = payment_status.incoming.total_pending();
let (pending, pending_cnt) = invoice_status.provider.total_pending();
table.add_row(row![
"pending",
format!("{} NGNT ({})", pending, pending_cnt)
]);
}
let (unconfirmed, unconfirmed_cnt) = payment_status.incoming.unconfirmed();
let (unconfirmed, unconfirmed_cnt) = invoice_status.provider.unconfirmed();
table.add_row(row![
"issued",
format!("{} NGNT ({})", unconfirmed, unconfirmed_cnt)
Expand Down

0 comments on commit ef2de64

Please sign in to comment.