Skip to content

Commit

Permalink
[subgraph] feat: use balance deltas over RPC calls (#1948)
Browse files Browse the repository at this point in the history
* feat: use balance deltas over rpc calls

* feat: track when balance was last updated from rpc

- be smart when to query from rpc again
- avoid double rpc calling in the same block

* use more balance deltas

* use balance delta only for accounts without any cfa, gda, ida income

* add TODO

* add logging when balance from delta doesnt match

* dont log when it could be a failure anyway with high likelihood

* account for gda pool admins for isLiquidationEstimateOptimistic

* prefer looking at stream inflow and outflow separately

* fix missing mapping

* dont always log

* handle PR comments

* prepare 2.2.0 release
  • Loading branch information
kasparkallas authored Sep 19, 2024
1 parent 7f72da9 commit 2898aaa
Show file tree
Hide file tree
Showing 10 changed files with 134 additions and 60 deletions.
9 changes: 9 additions & 0 deletions packages/subgraph/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## [Unreleased]

## [2.2.0]

- Fix missing Token name/symbol in some cases
- Fix Pool's `totalAmountDistributed` being wrong in some cases
- Add `adminOfPoolCount` to AccountTokenSnapshot to count how many pools the account is admin of
- Add `balanceLastUpdatedFromRpcBlocknumber` to AccountTokenSnapshot to track when last RPC call was made
- Don't always make an RPC call when updating the balance of an AccountTokenSnapshot, use the balance delta for simple cases
- Account for GDA's adjustment flow when setting the `isLiquidationEstimateOptimistic` flag on AccountTokenSnapshot

## [2.1.0]

## Breaking
Expand Down
2 changes: 1 addition & 1 deletion packages/subgraph/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@superfluid-finance/subgraph",
"description": "Subgraph for the Superfluid Ethereum contracts.",
"version": "2.1.0",
"version": "2.2.0",
"dependencies": {
"@graphprotocol/graph-cli": "0.80.1",
"@graphprotocol/graph-ts": "0.35.1",
Expand Down
14 changes: 12 additions & 2 deletions packages/subgraph/schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -2328,20 +2328,30 @@ type AccountTokenSnapshot @entity {
totalApprovedSubscriptions: Int!

"""
The current (as of updatedAt) number of membership with units allocated to them tied to this `account`.
The current (as of updatedAt) number of membership with units allocated to them tied to this `account`. (both IDA and GDA)
"""
totalMembershipsWithUnits: Int!

"""
Counts all currently (as of updatedAt) approved membership whether or not they have units.
Counts all currently (as of updatedAt) approved membership whether or not they have units. (both IDA and GDA)
"""
totalConnectedMemberships: Int!

"""
Counts how many pools the account is a pool admin of. The pool admin can be set arbitrarily when creating a GDA pool. The pool admin might receive an "adjustment flow" if the pool has a flow distribution.
"""
adminOfPoolCount: Int!

"""
Balance of `account` as of `updatedAtTimestamp`/`updatedAtBlock`.
"""
balanceUntilUpdatedAt: BigInt!

"""
The last block the balance was queried from an RPC (the most accurate source for balance data).
"""
balanceLastUpdatedFromRpcBlocknumber: BigInt

"""
The total deposit this account has held by all flow agreements for `account` active streams.
"""
Expand Down
82 changes: 61 additions & 21 deletions packages/subgraph/src/mappingHelpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,7 @@ if (accountTokenSnapshot == null) {
accountTokenSnapshot.totalApprovedSubscriptions = 0;
accountTokenSnapshot.totalMembershipsWithUnits = 0;
accountTokenSnapshot.totalConnectedMemberships = 0;
accountTokenSnapshot.adminOfPoolCount = 0;
accountTokenSnapshot.balanceUntilUpdatedAt = BIG_INT_ZERO;
accountTokenSnapshot.totalNetFlowRate = BIG_INT_ZERO;
accountTokenSnapshot.totalCFANetFlowRate = BIG_INT_ZERO;
Expand Down Expand Up @@ -938,7 +939,9 @@ export function updateAggregateDistributionAgreementData(

accountTokenSnapshot.isLiquidationEstimateOptimistic =
accountTokenSnapshot.totalSubscriptionsWithUnits > 0 ||
accountTokenSnapshot.totalMembershipsWithUnits > 0;
accountTokenSnapshot.totalMembershipsWithUnits > 0 ||
accountTokenSnapshot.adminOfPoolCount > 0;

accountTokenSnapshot.updatedAtTimestamp = block.timestamp;
accountTokenSnapshot.updatedAtBlockNumber = block.number;
accountTokenSnapshot.save();
Expand Down Expand Up @@ -981,23 +984,64 @@ function updateATSBalanceAndUpdatedAt(
Address.fromString(accountTokenSnapshot.token)
);

if (balanceDelta && accountTokenSnapshot.totalSubscriptionsWithUnits == 0) {
accountTokenSnapshot.balanceUntilUpdatedAt =
accountTokenSnapshot.balanceUntilUpdatedAt.plus(
balanceDelta as BigInt
// If the balance has been updated from RPC in this block then no need to update it again.
// The RPC call gets the final balance from that block.
if (accountTokenSnapshot.balanceLastUpdatedFromRpcBlocknumber !== block.number) {

// "Unpredictable" would mean an account receiving GDA distributions, IDA distributions or GDA adjustment flow.
// The reason they are "unpredictable" is that it would be unscalable to perfectly keep track of them with subgraph.
// What makes it unscalable is that the distribution events happen on the side of the distributor, not the receiver.
// We can't iterate all the receivers when a distribution is made.
const isAccountWithOnlyVeryPredictableBalanceSources =
!accountTokenSnapshot.isLiquidationEstimateOptimistic // Covers GDA and IDA
&& accountTokenSnapshot.activeGDAOutgoingStreamCount === 0
&& accountTokenSnapshot.totalInflowRate === BIG_INT_ZERO
&& accountTokenSnapshot.totalOutflowRate === BIG_INT_ZERO;

// If the balance has been updated in this block without an RPC, it's better to be safe than sorry and just get the final accurate state from the RPC.
const hasBalanceBeenUpdatedInThisBlock = accountTokenSnapshot.updatedAtBlockNumber === block.number;

if (balanceDelta && isAccountWithOnlyVeryPredictableBalanceSources && !hasBalanceBeenUpdatedInThisBlock) {
accountTokenSnapshot.balanceUntilUpdatedAt = accountTokenSnapshot.balanceUntilUpdatedAt.plus(balanceDelta);
} else {
// if the account has any subscriptions with units we assume that
// the balance data requires a RPC call for balance because we did not
// have claim events there and we do not count distributions
// for subscribers
const newBalanceResult = superTokenContract.try_realtimeBalanceOf(
Address.fromString(accountTokenSnapshot.account),
block.timestamp
);
} else {
// if the account has any subscriptions with units we assume that
// the balance data requires a RPC call for balance because we did not
// have claim events there and we do not count distributions
// for subscribers
const newBalanceResult = superTokenContract.try_realtimeBalanceOf(
Address.fromString(accountTokenSnapshot.account),
block.timestamp
);
if (!newBalanceResult.reverted) {
accountTokenSnapshot.balanceUntilUpdatedAt =
newBalanceResult.value.value0;

const balanceBeforeUpdate = accountTokenSnapshot.balanceUntilUpdatedAt;
let balanceFromRpc = balanceBeforeUpdate;
if (!newBalanceResult.reverted) {
balanceFromRpc = newBalanceResult.value.value0;
accountTokenSnapshot.balanceUntilUpdatedAt = balanceFromRpc;
accountTokenSnapshot.balanceLastUpdatedFromRpcBlocknumber = block.number;
} else {
log.warning("Fetching balance from RPC failed.", []);
}

if (balanceDelta && !accountTokenSnapshot.isLiquidationEstimateOptimistic) {
const balanceFromDelta = balanceBeforeUpdate.plus(balanceDelta);
if (!balanceFromRpc.equals(balanceFromDelta)) {
log.debug(
"Balance would have been different if we used balance delta over an RPC call. Block: {}, Timestamp: {}, Account: {}, Token: {}, Balance from RPC: {}, Balance from delta: {}, Balance delta: {}, Outgoing stream count: {}, Incoming stream count: {}",
[
block.number.toString(),
block.timestamp.toString(),
accountTokenSnapshot.account.toString(),
accountTokenSnapshot.token.toString(),
balanceFromRpc.toString(),
balanceFromDelta.toString(),
balanceDelta.toString(),
accountTokenSnapshot.activeOutgoingStreamCount.toString(),
accountTokenSnapshot.activeIncomingStreamCount.toString()
]
);
}
}
}
}

Expand Down Expand Up @@ -1026,10 +1070,6 @@ export function updateATSStreamedAndBalanceUntilUpdatedAt(
accountAddress: Address,
tokenAddress: Address,
block: ethereum.Block,

// TODO: we are currently always passing null here
// remove null one at a time and use validation script
// to compare v1 to feature
balanceDelta: BigInt | null
): void {
let accountTokenSnapshot = getOrInitAccountTokenSnapshot(
Expand Down
5 changes: 2 additions & 3 deletions packages/subgraph/src/mappings/cfav1.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,13 @@ export function handleFlowUpdated(event: FlowUpdated): void {
senderAddress,
tokenAddress,
event.block,
// @note when deleting, we do RPC call (prevents double accounting post-liquidation)
null
depositDelta
);
updateATSStreamedAndBalanceUntilUpdatedAt(
receiverAddress,
tokenAddress,
event.block,
null
BigInt.fromI32(0)
);

// update stream counter data for sender and receiver ATS
Expand Down
45 changes: 27 additions & 18 deletions packages/subgraph/src/mappings/gdav1.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import {
updateSenderATSStreamData,
updateTokenStatisticStreamData,
updateTokenStatsStreamedUntilUpdatedAt,
getOrInitAccountTokenSnapshot,
} from "../mappingHelpers";
import {
BIG_INT_ZERO,
Expand Down Expand Up @@ -59,15 +60,22 @@ export function handlePoolCreated(event: PoolCreated): void {
event.block
);
tokenStatistic.totalNumberOfPools = tokenStatistic.totalNumberOfPools + 1;

tokenStatistic.save();

updateATSStreamedAndBalanceUntilUpdatedAt(
event.params.admin,
event.params.token,
event.block,
null
BigInt.fromI32(0)
);

const accountTokenSnapshot = getOrInitAccountTokenSnapshot(
event.params.admin,
event.params.token,
event.block,
);
accountTokenSnapshot.adminOfPoolCount = accountTokenSnapshot.adminOfPoolCount + 1;
accountTokenSnapshot.save();

_createAccountTokenSnapshotLogEntity(
event,
Expand Down Expand Up @@ -130,19 +138,19 @@ export function handlePoolConnectionUpdated(
}
}

pool.save();
poolMember.save();

// Update Token Stats Streamed Until Updated At
updateTokenStatsStreamedUntilUpdatedAt(event.params.token, event.block);
// Update ATS Balance and Streamed Until Updated At
updateATSStreamedAndBalanceUntilUpdatedAt(
event.params.account,
event.params.token,
event.block,
null
BigInt.fromI32(0)
);

pool.save();
poolMember.save();

const isConnecting = event.params.connected;

// there is no concept of revoking in GDA, but in the subgraph
Expand Down Expand Up @@ -262,18 +270,6 @@ export function handleFlowDistributionUpdated(
);
_createTokenStatisticLogEntity(event, event.params.token, eventName);

// Update ATS
updateSenderATSStreamData(
event.params.distributor,
event.params.token,
event.params.newDistributorToPoolFlowRate,
flowRateDelta,
BIG_INT_ZERO,
isCreate,
isDelete,
false,
event.block
);
updateATSStreamedAndBalanceUntilUpdatedAt(
event.params.distributor,
event.params.token,
Expand All @@ -287,6 +283,19 @@ export function handleFlowDistributionUpdated(
eventName
);

// Update ATS
updateSenderATSStreamData(
event.params.distributor,
event.params.token,
event.params.newDistributorToPoolFlowRate,
flowRateDelta,
BIG_INT_ZERO,
isCreate,
isDelete,
false,
event.block
);

// Create Event Entity
_createFlowDistributionUpdatedEntity(event, poolDistributor.id, pool.totalUnits);
}
Expand Down
18 changes: 9 additions & 9 deletions packages/subgraph/src/mappings/idav1.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ export function handleIndexCreated(event: IndexCreated): void {
event.params.publisher,
event.params.token,
event.block,
null // will do RPC if any units exist anyways (balance isn't impacted by index creation)
BigInt.fromI32(0) // will do RPC if any units exist anyways (balance isn't impacted by index creation)
);

_createAccountTokenSnapshotLogEntity(
Expand Down Expand Up @@ -262,7 +262,7 @@ export function handleSubscriptionApproved(event: SubscriptionApproved): void {
event.params.subscriber,
event.params.token,
event.block,
null // will do RPC if any units exist anyways
balanceDelta
);

if (hasSubscriptionWithUnits) {
Expand All @@ -280,7 +280,7 @@ export function handleSubscriptionApproved(event: SubscriptionApproved): void {
event.params.publisher,
event.params.token,
event.block,
null // will do RPC if any units exist anyways
BigInt.fromI32(0)
);
_createAccountTokenSnapshotLogEntity(
event,
Expand Down Expand Up @@ -365,13 +365,13 @@ export function handleSubscriptionDistributionClaimed(
event.params.publisher,
event.params.token,
event.block,
null // will do RPC call if they have sub w/ units
BigInt.fromI32(0)
);
updateATSStreamedAndBalanceUntilUpdatedAt(
event.params.subscriber,
event.params.token,
event.block,
null // will do RPC call if they have sub w/ units
event.params.amount
);
_createAccountTokenSnapshotLogEntity(
event,
Expand Down Expand Up @@ -445,7 +445,7 @@ export function handleSubscriptionRevoked(event: SubscriptionRevoked): void {
event.params.subscriber,
event.params.token,
event.block,
null // will do RPC call if they have sub w/ units
balanceDelta
);

updateTokenStatsStreamedUntilUpdatedAt(event.params.token, event.block);
Expand All @@ -467,7 +467,7 @@ export function handleSubscriptionRevoked(event: SubscriptionRevoked): void {
event.params.publisher,
event.params.token,
event.block,
null // will do RPC call if they have sub w/ units
BigInt.fromI32(0)
);

// occurs on revoke or delete
Expand Down Expand Up @@ -563,13 +563,13 @@ export function handleSubscriptionUnitsUpdated(
event.params.publisher,
event.params.token,
event.block,
null // will do RPC call if they have sub w/ units
BigInt.fromI32(0)
);
updateATSStreamedAndBalanceUntilUpdatedAt(
event.params.subscriber,
event.params.token,
event.block,
null // will do RPC call if they have sub w/ units
balanceDelta
);

updateTokenStatsStreamedUntilUpdatedAt(event.params.token, event.block);
Expand Down
8 changes: 4 additions & 4 deletions packages/subgraph/src/mappings/superToken.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ export function handleTokenUpgraded(event: TokenUpgraded): void {
event.params.account,
event.address,
event.block,
null // will always do final RPC - override accounting done in handleTransfer
event.params.amount
);
updateTokenStatsStreamedUntilUpdatedAt(event.address, event.block);
_createAccountTokenSnapshotLogEntity(
Expand Down Expand Up @@ -132,7 +132,7 @@ export function handleTokenDowngraded(event: TokenDowngraded): void {
event.params.account,
event.address,
event.block,
null // will always do final RPC - override accounting done in handleTransfer
event.params.amount.times(BigInt.fromI32(-1))
);
updateTokenStatsStreamedUntilUpdatedAt(event.address, event.block);
_createAccountTokenSnapshotLogEntity(
Expand Down Expand Up @@ -161,13 +161,13 @@ export function handleTransfer(event: Transfer): void {
event.params.to,
event.address,
event.block,
null // manual accounting (overridden in upgrade/downgrade)
event.params.value
);
updateATSStreamedAndBalanceUntilUpdatedAt(
event.params.from,
event.address,
event.block,
null // manual accounting (overridden in upgrade/downgrade)
event.params.value.times(BigInt.fromI32(-1))
);
updateTokenStatsStreamedUntilUpdatedAt(tokenId, event.block);

Expand Down
Loading

0 comments on commit 2898aaa

Please sign in to comment.