diff --git a/packages/indexer-common/src/allocations/escrow-accounts.ts b/packages/indexer-common/src/allocations/escrow-accounts.ts
new file mode 100644
index 000000000..44d6060c2
--- /dev/null
+++ b/packages/indexer-common/src/allocations/escrow-accounts.ts
@@ -0,0 +1,64 @@
+import { Address, toAddress } from '@graphprotocol/common-ts'
+import { TAPSubgraph } from '../tap-subgraph'
+import gql from 'graphql-tag'
+
+type U256 = bigint
+
+type EscrowAccountResponse = {
+ escrowAccounts: {
+ balance: string
+ sender: {
+ id: string
+ }
+ }[]
+}
+
+export class EscrowAccounts {
+ constructor(private sendersBalances: Map
) {}
+
+ getBalanceForSender(sender: Address): U256 {
+ const balance = this.sendersBalances.get(sender)
+ if (balance === undefined) {
+ throw new Error(`No balance found for sender: ${sender}`)
+ }
+ return balance
+ }
+
+ subtractSenderBalance(sender: Address, ravValue: U256) {
+ const balance = this.getBalanceForSender(sender)
+ const newBalance = balance - ravValue
+ this.sendersBalances.set(sender, newBalance)
+ }
+
+ static fromResponse(response: EscrowAccountResponse): EscrowAccounts {
+ const sendersBalances = new Map()
+ response.escrowAccounts.forEach((account) => {
+ sendersBalances.set(toAddress(account.sender.id), BigInt(account.balance))
+ })
+
+ return new EscrowAccounts(sendersBalances)
+ }
+}
+
+export const getEscrowAccounts = async (
+ tapSubgraph: TAPSubgraph,
+ indexer: Address,
+): Promise => {
+ const result = await tapSubgraph.query(
+ gql`
+ query EscrowAccountQuery($indexer: ID!) {
+ escrowAccounts(where: { receiver_: { id: $indexer } }) {
+ balance
+ sender {
+ id
+ }
+ }
+ }
+ `,
+ { indexer },
+ )
+ if (!result.data) {
+ throw `There was an error while querying Tap Subgraph. Errors: ${result.error}`
+ }
+ return EscrowAccounts.fromResponse(result.data)
+}
diff --git a/packages/indexer-common/src/allocations/tap-collector.ts b/packages/indexer-common/src/allocations/tap-collector.ts
index 95a51564e..67b683ad5 100644
--- a/packages/indexer-common/src/allocations/tap-collector.ts
+++ b/packages/indexer-common/src/allocations/tap-collector.ts
@@ -29,6 +29,7 @@ import pReduce from 'p-reduce'
import { TAPSubgraph } from '../tap-subgraph'
import { NetworkSubgraph } from '../network-subgraph'
import gql from 'graphql-tag'
+import { getEscrowAccounts } from './escrow-accounts'
const RAV_CHECK_INTERVAL_MS = 30_000
@@ -90,6 +91,7 @@ export class TapCollector {
declare tapSubgraph: TAPSubgraph
declare networkSubgraph: NetworkSubgraph
declare finalityTime: number
+ declare indexerAddress: Address
// eslint-disable-next-line @typescript-eslint/no-empty-function -- Private constructor to prevent direct instantiation
private constructor() {}
@@ -119,10 +121,11 @@ export class TapCollector {
collector.tapSubgraph = tapSubgraph
collector.networkSubgraph = networkSubgraph
- const { voucherRedemptionThreshold, finalityTime } =
+ const { voucherRedemptionThreshold, finalityTime, address } =
networkSpecification.indexerOptions
collector.ravRedemptionThreshold = voucherRedemptionThreshold
collector.finalityTime = finalityTime
+ collector.indexerAddress = address
collector.logger.info(`RAV processing is initiated`)
collector.startRAVProcessing()
@@ -452,10 +455,28 @@ export class TapCollector {
logger.info(`Redeem last RAVs on chain individually`, {
signedRavs,
})
+ const escrowAccounts = await getEscrowAccounts(this.tapSubgraph, this.indexerAddress)
// Redeem RAV one-by-one as no plual version available
for (const { rav: signedRav, allocation, sender } of signedRavs) {
const { rav } = signedRav
+
+ // verify escrow balances
+ const ravValue = BigInt(rav.valueAggregate.toString())
+ const senderBalance = escrowAccounts.getBalanceForSender(sender)
+ if (senderBalance < ravValue) {
+ this.logger.warn(
+ 'RAV was not sent to the blockchain \
+ because its value aggregate is lower than escrow balance.',
+ {
+ rav,
+ sender,
+ senderBalance,
+ },
+ )
+ continue
+ }
+
const stopTimer = this.metrics.ravsRedeemDuration.startTimer({
allocation: rav.allocationId,
})
@@ -486,6 +507,10 @@ export class TapCollector {
this.metrics.ravRedeemsInvalid.inc({ allocation: rav.allocationId })
return
}
+ // subtract from the escrow account
+ // THIS IS A MUT OPERATION
+ escrowAccounts.subtractSenderBalance(sender, ravValue)
+
this.metrics.ravCollectedFees.set(
{ allocation: rav.allocationId },
parseFloat(rav.valueAggregate.toString()),