Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
common, agent: switch most timer calls to sequential impl
Browse files Browse the repository at this point in the history
dwerner committed Dec 18, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent d85d2cf commit ae00a9e
Showing 12 changed files with 413 additions and 209 deletions.
134 changes: 69 additions & 65 deletions packages/indexer-agent/src/agent.ts
Original file line number Diff line number Diff line change
@@ -37,6 +37,7 @@ import {
networkIsL1,
DeploymentManagementMode,
SubgraphStatus,
sequentialTimerMap,
} from '@graphprotocol/indexer-common'

import PQueue from 'p-queue'
@@ -253,40 +254,41 @@ export class Agent {
const requestIntervalSmall = this.pollingInterval
const requestIntervalLarge = this.pollingInterval * 5
const logger = this.logger.child({ component: 'ReconciliationLoop' })
const currentEpochNumber: Eventual<NetworkMapped<number>> = timer(
requestIntervalLarge,
).tryMap(
async () =>
await this.multiNetworks.map(({ network }) => {
logger.trace('Fetching current epoch number', {
protocolNetwork: network.specification.networkIdentifier,
})
return network.networkMonitor.currentEpochNumber()
}),
{
onError: error =>
logger.warn(`Failed to fetch current epoch`, { error }),
},
)
const currentEpochNumber: Eventual<NetworkMapped<number>> =
sequentialTimerMap(
{ logger, milliseconds: requestIntervalLarge },
async () =>
await this.multiNetworks.map(({ network }) => {
logger.trace('Fetching current epoch number', {
protocolNetwork: network.specification.networkIdentifier,
})
return network.networkMonitor.currentEpochNumber()
}),
{
onError: error =>
logger.warn(`Failed to fetch current epoch`, { error }),
},
)

const maxAllocationEpochs: Eventual<NetworkMapped<number>> = timer(
requestIntervalLarge,
).tryMap(
() =>
this.multiNetworks.map(({ network }) => {
logger.trace('Fetching max allocation epochs', {
protocolNetwork: network.specification.networkIdentifier,
})
return network.contracts.staking.maxAllocationEpochs()
}),
{
onError: error =>
logger.warn(`Failed to fetch max allocation epochs`, { error }),
},
)
const maxAllocationEpochs: Eventual<NetworkMapped<number>> =
sequentialTimerMap(
{ logger, milliseconds: requestIntervalLarge },
() =>
this.multiNetworks.map(({ network }) => {
logger.trace('Fetching max allocation epochs', {
protocolNetwork: network.specification.networkIdentifier,
})
return network.contracts.staking.maxAllocationEpochs()
}),
{
onError: error =>
logger.warn(`Failed to fetch max allocation epochs`, { error }),
},
)

const indexingRules: Eventual<NetworkMapped<IndexingRuleAttributes[]>> =
timer(requestIntervalSmall).tryMap(
sequentialTimerMap(
{ logger, milliseconds: requestIntervalSmall },
async () => {
return this.multiNetworks.map(async ({ network, operator }) => {
logger.trace('Fetching indexing rules', {
@@ -322,24 +324,25 @@ export class Agent {
},
)

const activeDeployments: Eventual<SubgraphDeploymentID[]> = timer(
requestIntervalSmall,
).tryMap(
() => {
logger.trace('Fetching active deployments')
return this.graphNode.subgraphDeployments()
},
{
onError: error =>
logger.warn(
`Failed to obtain active deployments, trying again later`,
{ error },
),
},
)
const activeDeployments: Eventual<SubgraphDeploymentID[]> =
sequentialTimerMap(
{ logger, milliseconds: requestIntervalSmall },
() => {
logger.trace('Fetching active deployments')
return this.graphNode.subgraphDeployments()
},
{
onError: error =>
logger.warn(
`Failed to obtain active deployments, trying again later`,
{ error },
),
},
)

const networkDeployments: Eventual<NetworkMapped<SubgraphDeployment[]>> =
timer(requestIntervalSmall).tryMap(
sequentialTimerMap(
{ logger, milliseconds: requestIntervalSmall },
async () =>
await this.multiNetworks.map(({ network }) => {
logger.trace('Fetching network deployments', {
@@ -358,7 +361,8 @@ export class Agent {

const eligibleTransferDeployments: Eventual<
NetworkMapped<TransferredSubgraphDeployment[]>
> = timer(requestIntervalLarge).tryMap(
> = sequentialTimerMap(
{ logger, milliseconds: requestIntervalLarge },
async () => {
// Return early if the auto migration feature is disabled.
if (!this.autoMigrationSupport) {
@@ -558,23 +562,23 @@ export class Agent {
},
)

const activeAllocations: Eventual<NetworkMapped<Allocation[]>> = timer(
requestIntervalSmall,
).tryMap(
() =>
this.multiNetworks.map(({ network }) => {
logger.trace('Fetching active allocations', {
protocolNetwork: network.specification.networkIdentifier,
})
return network.networkMonitor.allocations(AllocationStatus.ACTIVE)
}),
{
onError: () =>
logger.warn(
`Failed to obtain active allocations, trying again later`,
),
},
)
const activeAllocations: Eventual<NetworkMapped<Allocation[]>> =
sequentialTimerMap(
{ logger, milliseconds: requestIntervalSmall },
() =>
this.multiNetworks.map(({ network }) => {
logger.trace('Fetching active allocations', {
protocolNetwork: network.specification.networkIdentifier,
})
return network.networkMonitor.allocations(AllocationStatus.ACTIVE)
}),
{
onError: () =>
logger.warn(
`Failed to obtain active allocations, trying again later`,
),
},
)

// `activeAllocations` is used to trigger this Eventual, but not really needed
// inside.
12 changes: 10 additions & 2 deletions packages/indexer-common/src/allocations/monitor.ts
Original file line number Diff line number Diff line change
@@ -2,12 +2,13 @@ import {
indexerError,
IndexerErrorCode,
parseGraphQLAllocation,
sequentialTimerReduce,
} from '@graphprotocol/indexer-common'
import { Allocation, MonitorEligibleAllocationsOptions } from './types'

import gql from 'graphql-tag'

import { Eventual, timer } from '@graphprotocol/common-ts'
import { Eventual } from '@graphprotocol/common-ts'

export const monitorEligibleAllocations = ({
indexer,
@@ -168,7 +169,14 @@ export const monitorEligibleAllocations = ({
}
}

const allocations = timer(interval).reduce(refreshAllocations, [])
const allocations = sequentialTimerReduce(
{
logger,
milliseconds: interval,
},
refreshAllocations,
[],
)

allocations.pipe((allocations) => {
logger.info(`Eligible allocations`, {
6 changes: 3 additions & 3 deletions packages/indexer-common/src/allocations/query-fees.ts
Original file line number Diff line number Diff line change
@@ -2,7 +2,6 @@ import { Counter, Gauge, Histogram } from 'prom-client'
import axios from 'axios'
import {
Logger,
timer,
BytesWriter,
toAddress,
formatGRT,
@@ -20,6 +19,7 @@ import {
ensureAllocationSummary,
TransactionManager,
specification as spec,
sequentialTimerMap,
} from '..'
import { DHeap } from '@thi.ng/heaps'
import { BigNumber, BigNumberish, Contract } from 'ethers'
@@ -264,7 +264,7 @@ export class AllocationReceiptCollector implements ReceiptCollector {
}

// Check if there's another batch of receipts to collect every 10s
timer(10_000).pipe(async () => {
sequentialTimerMap({ logger: this.logger, milliseconds: 10_000 }, async () => {
while (hasReceiptsReadyForCollecting()) {
// Remove the batch from the processing queue
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
@@ -283,7 +283,7 @@ export class AllocationReceiptCollector implements ReceiptCollector {
}

private startVoucherProcessing() {
timer(30_000).pipe(async () => {
sequentialTimerMap({ logger: this.logger, milliseconds: 30_000 }, async () => {
let pendingVouchers: Voucher[] = []
try {
pendingVouchers = await this.pendingVouchers() // Ordered by value
11 changes: 6 additions & 5 deletions packages/indexer-common/src/allocations/tap-collector.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import { Counter, Gauge, Histogram } from 'prom-client'
import {
Logger,
timer,
toAddress,
formatGRT,
Address,
Metrics,
Eventual,
join as joinEventual,
} from '@graphprotocol/common-ts'
import { NetworkContracts as TapContracts } from '@semiotic-labs/tap-contracts-bindings'
import {
@@ -23,6 +21,7 @@ import {
allocationSigner,
tapAllocationIdProof,
parseGraphQLAllocation,
sequentialTimerMap,
} from '..'
import { BigNumber } from 'ethers'
import pReduce from 'p-reduce'
@@ -184,9 +183,11 @@ export class TapCollector {
}

private getPendingRAVs(): Eventual<RavWithAllocation[]> {
return joinEventual({
timer: timer(RAV_CHECK_INTERVAL_MS),
}).tryMap(
return sequentialTimerMap(
{
logger: this.logger,
milliseconds: RAV_CHECK_INTERVAL_MS,
},
async () => {
let ravs = await this.pendingRAVs()
if (ravs.length === 0) {
1 change: 1 addition & 0 deletions packages/indexer-common/src/index.ts
Original file line number Diff line number Diff line change
@@ -17,3 +17,4 @@ export * from './utils'
export * from './parsers'
export * as specification from './network-specification'
export * from './multi-networks'
export * from './sequential-timer'
9 changes: 7 additions & 2 deletions packages/indexer-common/src/indexer-management/actions.ts
Original file line number Diff line number Diff line change
@@ -18,10 +18,11 @@ import {
Network,
OrderDirection,
GraphNode,
sequentialTimerMap,
} from '@graphprotocol/indexer-common'

import { Order, Transaction } from 'sequelize'
import { Eventual, join, Logger, timer } from '@graphprotocol/common-ts'
import { Eventual, join, Logger } from '@graphprotocol/common-ts'
import groupBy from 'lodash.groupby'

export class ActionManager {
@@ -116,7 +117,11 @@ export class ActionManager {

async monitorQueue(): Promise<void> {
const logger = this.logger.child({ component: 'QueueMonitor' })
const approvedActions: Eventual<Action[]> = timer(30_000).tryMap(
const approvedActions: Eventual<Action[]> = sequentialTimerMap(
{
logger,
milliseconds: 30_000,
},
async () => {
logger.trace('Fetching approved actions')
let actions: Action[] = []
Loading

0 comments on commit ae00a9e

Please sign in to comment.