Skip to content

Commit

Permalink
Consider expiration return addresses for ledger updates (#1314)
Browse files Browse the repository at this point in the history
* Consider expiration return addresses for ledger updates

* fix import

* update spent outputs in migration

* Improve balance endpoint

* fmt

* optimized migration

* DRY

* update analytics

* improve index usage for balance

* fix balance calculation

* invert expiration checks

* more fixes for balance calculation

* nor does not exist for aggregation 🤷‍♀️

* refine query more

* update deps

* update api spec
  • Loading branch information
DaughterOfMars authored Jan 24, 2024
1 parent 3d96ce4 commit ca605a7
Show file tree
Hide file tree
Showing 16 changed files with 297 additions and 86 deletions.
12 changes: 6 additions & 6 deletions documentation/api/api-explorer.yml
Original file line number Diff line number Diff line change
Expand Up @@ -275,13 +275,13 @@ components:
totalBalance:
type: string
description: >-
The total value held in unspent outputs owned by the given address
(includes funds held in storage deposit).
sigLockedBalance:
The total value held in unspent outputs that is unlockable by the given address or currently timelocked.
Does not include funds held in storage deposit.
availableBalance:
type: string
description: >-
The sum of value held in unspent outputs owned by the given address
that are signature locked ("trivially unlockable").
The total value held in unspent outputs that is immediately unlockable at ledgerIndex by the given address.
Does not include funds held in storage deposit.
ledgerIndex:
type: integer
description: The ledger index for which the balance calculation was performed.
Expand Down Expand Up @@ -585,7 +585,7 @@ components:
balance-example:
value:
totalBalance: 100000
sigLockedBalance: 99900
availableBalance: 99900
ledgerIndex: 500000
ledger-updates-address-example:
value:
Expand Down
4 changes: 2 additions & 2 deletions src/analytics/ledger/active_addresses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,15 @@ impl IntervalAnalytics for AddressActivityMeasurement {
impl Analytics for AddressActivityAnalytics {
type Measurement = AddressActivityMeasurement;

fn handle_transaction(&mut self, consumed: &[LedgerSpent], created: &[LedgerOutput], _ctx: &dyn AnalyticsContext) {
fn handle_transaction(&mut self, consumed: &[LedgerSpent], created: &[LedgerOutput], ctx: &dyn AnalyticsContext) {
for output in consumed {
if let Some(a) = output.owning_address() {
self.addresses.insert(*a);
}
}

for output in created {
if let Some(a) = output.owning_address() {
if let Some(a) = output.output.owning_address(ctx.at().milestone_timestamp) {
self.addresses.insert(*a);
}
}
Expand Down
16 changes: 11 additions & 5 deletions src/analytics/ledger/address_balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
use std::collections::HashMap;

use super::*;
use crate::model::utxo::{Address, TokenAmount};
use crate::model::{
payload::milestone::MilestoneTimestamp,
utxo::{Address, TokenAmount},
};

#[derive(Debug)]
pub(crate) struct AddressBalanceMeasurement {
Expand All @@ -29,10 +32,13 @@ pub(crate) struct AddressBalancesAnalytics {

impl AddressBalancesAnalytics {
/// Initialize the analytics by reading the current ledger state.
pub(crate) fn init<'a>(unspent_outputs: impl IntoIterator<Item = &'a LedgerOutput>) -> Self {
pub(crate) fn init<'a>(
unspent_outputs: impl IntoIterator<Item = &'a LedgerOutput>,
milestone_timestamp: MilestoneTimestamp,
) -> Self {
let mut balances = HashMap::new();
for output in unspent_outputs {
if let Some(&a) = output.owning_address() {
if let Some(&a) = output.output.owning_address(milestone_timestamp) {
*balances.entry(a).or_default() += output.amount();
}
}
Expand All @@ -43,7 +49,7 @@ impl AddressBalancesAnalytics {
impl Analytics for AddressBalancesAnalytics {
type Measurement = AddressBalanceMeasurement;

fn handle_transaction(&mut self, consumed: &[LedgerSpent], created: &[LedgerOutput], _ctx: &dyn AnalyticsContext) {
fn handle_transaction(&mut self, consumed: &[LedgerSpent], created: &[LedgerOutput], ctx: &dyn AnalyticsContext) {
for output in consumed {
if let Some(a) = output.owning_address() {
// All inputs should be present in `addresses`. If not, we skip it's value.
Expand All @@ -57,7 +63,7 @@ impl Analytics for AddressBalancesAnalytics {
}

for output in created {
if let Some(&a) = output.owning_address() {
if let Some(&a) = output.output.owning_address(ctx.at().milestone_timestamp) {
// All inputs should be present in `addresses`. If not, we skip it's value.
*self.balances.entry(a).or_default() += output.amount();
}
Expand Down
4 changes: 2 additions & 2 deletions src/analytics/ledger/base_token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ pub(crate) struct BaseTokenActivityMeasurement {
impl Analytics for BaseTokenActivityMeasurement {
type Measurement = Self;

fn handle_transaction(&mut self, consumed: &[LedgerSpent], created: &[LedgerOutput], _ctx: &dyn AnalyticsContext) {
fn handle_transaction(&mut self, consumed: &[LedgerSpent], created: &[LedgerOutput], ctx: &dyn AnalyticsContext) {
// The idea behind the following code is that we keep track of the deltas that are applied to each account that
// is represented by an address.
let mut balance_deltas: HashMap<&Address, i128> = HashMap::new();

// We first gather all tokens that have been moved to an individual address.
for output in created {
if let Some(address) = output.owning_address() {
if let Some(address) = output.output.owning_address(ctx.at().milestone_timestamp) {
*balance_deltas.entry(address).or_default() += output.amount().0 as i128;
}
}
Expand Down
12 changes: 8 additions & 4 deletions src/analytics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{
model::{
ledger::{LedgerOutput, LedgerSpent},
metadata::LedgerInclusionState,
payload::{Payload, TransactionEssence},
payload::{milestone::MilestoneTimestamp, Payload, TransactionEssence},
protocol::ProtocolParameters,
tangle::{MilestoneIndex, MilestoneIndexTimestamp},
utxo::Input,
Expand Down Expand Up @@ -152,9 +152,12 @@ impl Analytic {
choice: &AnalyticsChoice,
protocol_params: &ProtocolParameters,
unspent_outputs: impl IntoIterator<Item = &'a LedgerOutput>,
milestone_timestamp: MilestoneTimestamp,
) -> Self {
Self(match choice {
AnalyticsChoice::AddressBalance => Box::new(AddressBalancesAnalytics::init(unspent_outputs)) as _,
AnalyticsChoice::AddressBalance => {
Box::new(AddressBalancesAnalytics::init(unspent_outputs, milestone_timestamp)) as _
}
AnalyticsChoice::BaseTokenActivity => Box::<BaseTokenActivityMeasurement>::default() as _,
AnalyticsChoice::BlockActivity => Box::<BlockActivityMeasurement>::default() as _,
AnalyticsChoice::ActiveAddresses => Box::<AddressActivityAnalytics>::default() as _,
Expand Down Expand Up @@ -396,7 +399,7 @@ mod test {
ledger::{LedgerOutput, LedgerSpent},
metadata::BlockMetadata,
node::NodeConfiguration,
payload::{MilestoneId, MilestonePayload},
payload::{milestone::MilestoneTimestamp, MilestoneId, MilestonePayload},
protocol::ProtocolParameters,
tangle::{MilestoneIndex, MilestoneIndexTimestamp},
},
Expand Down Expand Up @@ -444,10 +447,11 @@ mod test {
fn init<'a>(
protocol_params: ProtocolParameters,
unspent_outputs: impl IntoIterator<Item = &'a LedgerOutput> + Copy,
milestone_timestamp: MilestoneTimestamp,
) -> Self {
Self {
active_addresses: Default::default(),
address_balance: AddressBalancesAnalytics::init(unspent_outputs),
address_balance: AddressBalancesAnalytics::init(unspent_outputs, milestone_timestamp),
base_tokens: Default::default(),
ledger_outputs: LedgerOutputMeasurement::init(unspent_outputs),
ledger_size: LedgerSizeAnalytics::init(protocol_params, unspent_outputs),
Expand Down
2 changes: 1 addition & 1 deletion src/bin/inx-chronicle/api/explorer/responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl From<LedgerUpdateByMilestoneRecord> for LedgerUpdateByMilestoneDto {
#[serde(rename_all = "camelCase")]
pub struct BalanceResponse {
pub total_balance: String,
pub sig_locked_balance: String,
pub available_balance: String,
pub ledger_index: MilestoneIndex,
}

Expand Down
10 changes: 5 additions & 5 deletions src/bin/inx-chronicle/api/explorer/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,22 +155,22 @@ async fn ledger_updates_by_milestone(
}

async fn balance(database: Extension<MongoDb>, Path(address): Path<String>) -> ApiResult<BalanceResponse> {
let ledger_index = database
let ledger_ms = database
.collection::<MilestoneCollection>()
.get_ledger_index()
.get_newest_milestone()
.await?
.ok_or(MissingError::NoResults)?;
let address = Address::from_str(&address).map_err(RequestError::from)?;
let res = database
.collection::<OutputCollection>()
.get_address_balance(address, ledger_index)
.get_address_balance(address, ledger_ms)
.await?
.ok_or(MissingError::NoResults)?;

Ok(BalanceResponse {
total_balance: res.total_balance,
sig_locked_balance: res.sig_locked_balance,
ledger_index,
available_balance: res.available_balance,
ledger_index: ledger_ms.milestone_index,
})
}

Expand Down
9 changes: 8 additions & 1 deletion src/bin/inx-chronicle/cli/analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,14 @@ pub async fn fill_analytics<I: 'static + InputSource + Clone>(

let analytics = analytics_choices
.iter()
.map(|choice| Analytic::init(choice, &milestone.protocol_params, &ledger_state))
.map(|choice| {
Analytic::init(
choice,
&milestone.protocol_params,
&ledger_state,
milestone.at.milestone_timestamp,
)
})
.collect::<Vec<_>>();
state = Some(AnalyticsState {
analytics,
Expand Down
9 changes: 8 additions & 1 deletion src/bin/inx-chronicle/inx/influx/analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,14 @@ impl InxWorker {

let analytics = analytics_choices
.iter()
.map(|choice| Analytic::init(choice, &milestone.protocol_params, &ledger_state))
.map(|choice| {
Analytic::init(
choice,
&milestone.protocol_params,
&ledger_state,
milestone.at.milestone_timestamp,
)
})
.collect::<Vec<_>>();
*state = Some(AnalyticsState {
analytics,
Expand Down
122 changes: 122 additions & 0 deletions src/bin/inx-chronicle/migrations/migrate_2.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright 2023 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use async_trait::async_trait;
use chronicle::{
db::{
mongodb::collections::{LedgerUpdateCollection, OutputCollection},
MongoDb, MongoDbCollection, MongoDbCollectionExt,
},
model::{
ledger::{LedgerOutput, LedgerSpent, RentStructureBytes},
metadata::OutputMetadata,
utxo::{Output, OutputId},
},
};
use futures::{prelude::stream::TryStreamExt, StreamExt};
use mongodb::bson::doc;
use serde::Deserialize;
use tokio::{task::JoinSet, try_join};

use super::Migration;

const INSERT_BATCH_SIZE: usize = 1000;

pub struct Migrate;

#[async_trait]
impl Migration for Migrate {
const ID: usize = 2;
const APP_VERSION: &'static str = "1.0.0-rc.3";
const DATE: time::Date = time::macros::date!(2024 - 01 - 12);

async fn migrate(db: &MongoDb) -> eyre::Result<()> {
db.collection::<LedgerUpdateCollection>()
.collection()
.drop(None)
.await?;

let outputs_stream = db
.collection::<OutputCollection>()
.find::<OutputDocument>(doc! {}, None)
.await?;
let mut batched_stream = outputs_stream.try_chunks(INSERT_BATCH_SIZE);

let mut tasks = JoinSet::new();

while let Some(batch) = batched_stream.next().await {
let batch = batch?;
while tasks.len() >= 100 {
if let Some(res) = tasks.join_next().await {
res??;
}
}
let db = db.clone();
tasks.spawn(async move {
let consumed = batch.iter().filter_map(Option::<LedgerSpent>::from).collect::<Vec<_>>();
let created = batch.into_iter().map(LedgerOutput::from).collect::<Vec<_>>();
try_join! {
async {
db.collection::<LedgerUpdateCollection>()
.insert_unspent_ledger_updates(&created)
.await
},
async {
db.collection::<OutputCollection>().update_spent_outputs(&consumed).await
},
async {
db.collection::<LedgerUpdateCollection>().insert_spent_ledger_updates(&consumed).await
}
}
.and(Ok(()))
});
}

while let Some(res) = tasks.join_next().await {
res??;
}

Ok(())
}
}

#[derive(Deserialize)]
pub struct OutputDocument {
#[serde(rename = "_id")]
output_id: OutputId,
output: Output,
metadata: OutputMetadata,
details: OutputDetails,
}

#[derive(Deserialize)]
struct OutputDetails {
rent_structure: RentStructureBytes,
}

impl From<OutputDocument> for LedgerOutput {
fn from(value: OutputDocument) -> Self {
Self {
output_id: value.output_id,
block_id: value.metadata.block_id,
booked: value.metadata.booked,
output: value.output,
rent_structure: value.details.rent_structure,
}
}
}

impl From<&OutputDocument> for Option<LedgerSpent> {
fn from(value: &OutputDocument) -> Self {
value.metadata.spent_metadata.map(|spent_metadata| LedgerSpent {
spent_metadata,
output: LedgerOutput {
output_id: value.output_id,
block_id: value.metadata.block_id,
booked: value.metadata.booked,
output: value.output.clone(),
rent_structure: value.details.rent_structure,
},
})
}
}
4 changes: 3 additions & 1 deletion src/bin/inx-chronicle/migrations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,17 @@ use eyre::bail;

pub mod migrate_0;
pub mod migrate_1;
pub mod migrate_2;

pub type LatestMigration = migrate_1::Migrate;
pub type LatestMigration = migrate_2::Migrate;

/// The list of migrations, in order.
const MIGRATIONS: &[&'static dyn DynMigration] = &[
// In order to add a new migration, change the `LatestMigration` type above and add an entry at the bottom of this
// list.
&migrate_0::Migrate,
&migrate_1::Migrate,
&migrate_2::Migrate,
];

fn build_migrations(migrations: &[&'static dyn DynMigration]) -> HashMap<Option<usize>, &'static dyn DynMigration> {
Expand Down
Loading

0 comments on commit ca605a7

Please sign in to comment.