Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

common, agent: switch most timer calls to sequential impl #1065

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 69 additions & 65 deletions packages/indexer-agent/src/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import {
networkIsL1,
DeploymentManagementMode,
SubgraphStatus,
sequentialTimerMap,
} from '@graphprotocol/indexer-common'

import PQueue from 'p-queue'
Expand Down Expand Up @@ -253,40 +254,41 @@ export class Agent {
const requestIntervalSmall = this.pollingInterval
const requestIntervalLarge = this.pollingInterval * 5
const logger = this.logger.child({ component: 'ReconciliationLoop' })
const currentEpochNumber: Eventual<NetworkMapped<number>> = timer(
requestIntervalLarge,
).tryMap(
async () =>
await this.multiNetworks.map(({ network }) => {
logger.trace('Fetching current epoch number', {
protocolNetwork: network.specification.networkIdentifier,
})
return network.networkMonitor.currentEpochNumber()
}),
{
onError: error =>
logger.warn(`Failed to fetch current epoch`, { error }),
},
)
const currentEpochNumber: Eventual<NetworkMapped<number>> =
sequentialTimerMap(
{ logger, milliseconds: requestIntervalLarge },
async () =>
await this.multiNetworks.map(({ network }) => {
logger.trace('Fetching current epoch number', {
protocolNetwork: network.specification.networkIdentifier,
})
return network.networkMonitor.currentEpochNumber()
}),
{
onError: error =>
logger.warn(`Failed to fetch current epoch`, { error }),
},
)

const maxAllocationEpochs: Eventual<NetworkMapped<number>> = timer(
requestIntervalLarge,
).tryMap(
() =>
this.multiNetworks.map(({ network }) => {
logger.trace('Fetching max allocation epochs', {
protocolNetwork: network.specification.networkIdentifier,
})
return network.contracts.staking.maxAllocationEpochs()
}),
{
onError: error =>
logger.warn(`Failed to fetch max allocation epochs`, { error }),
},
)
const maxAllocationEpochs: Eventual<NetworkMapped<number>> =
sequentialTimerMap(
{ logger, milliseconds: requestIntervalLarge },
() =>
this.multiNetworks.map(({ network }) => {
logger.trace('Fetching max allocation epochs', {
protocolNetwork: network.specification.networkIdentifier,
})
return network.contracts.staking.maxAllocationEpochs()
}),
{
onError: error =>
logger.warn(`Failed to fetch max allocation epochs`, { error }),
},
)

const indexingRules: Eventual<NetworkMapped<IndexingRuleAttributes[]>> =
timer(requestIntervalSmall).tryMap(
sequentialTimerMap(
{ logger, milliseconds: requestIntervalSmall },
async () => {
return this.multiNetworks.map(async ({ network, operator }) => {
logger.trace('Fetching indexing rules', {
Expand Down Expand Up @@ -322,24 +324,25 @@ export class Agent {
},
)

const activeDeployments: Eventual<SubgraphDeploymentID[]> = timer(
requestIntervalSmall,
).tryMap(
() => {
logger.trace('Fetching active deployments')
return this.graphNode.subgraphDeployments()
},
{
onError: error =>
logger.warn(
`Failed to obtain active deployments, trying again later`,
{ error },
),
},
)
const activeDeployments: Eventual<SubgraphDeploymentID[]> =
sequentialTimerMap(
{ logger, milliseconds: requestIntervalSmall },
() => {
logger.trace('Fetching active deployments')
return this.graphNode.subgraphDeployments()
},
{
onError: error =>
logger.warn(
`Failed to obtain active deployments, trying again later`,
{ error },
),
},
)

const networkDeployments: Eventual<NetworkMapped<SubgraphDeployment[]>> =
timer(requestIntervalSmall).tryMap(
sequentialTimerMap(
{ logger, milliseconds: requestIntervalSmall },
async () =>
await this.multiNetworks.map(({ network }) => {
logger.trace('Fetching network deployments', {
Expand All @@ -358,7 +361,8 @@ export class Agent {

const eligibleTransferDeployments: Eventual<
NetworkMapped<TransferredSubgraphDeployment[]>
> = timer(requestIntervalLarge).tryMap(
> = sequentialTimerMap(
{ logger, milliseconds: requestIntervalLarge },
async () => {
// Return early if the auto migration feature is disabled.
if (!this.autoMigrationSupport) {
Expand Down Expand Up @@ -558,23 +562,23 @@ export class Agent {
},
)

const activeAllocations: Eventual<NetworkMapped<Allocation[]>> = timer(
requestIntervalSmall,
).tryMap(
() =>
this.multiNetworks.map(({ network }) => {
logger.trace('Fetching active allocations', {
protocolNetwork: network.specification.networkIdentifier,
})
return network.networkMonitor.allocations(AllocationStatus.ACTIVE)
}),
{
onError: () =>
logger.warn(
`Failed to obtain active allocations, trying again later`,
),
},
)
const activeAllocations: Eventual<NetworkMapped<Allocation[]>> =
sequentialTimerMap(
{ logger, milliseconds: requestIntervalSmall },
() =>
this.multiNetworks.map(({ network }) => {
logger.trace('Fetching active allocations', {
protocolNetwork: network.specification.networkIdentifier,
})
return network.networkMonitor.allocations(AllocationStatus.ACTIVE)
}),
{
onError: () =>
logger.warn(
`Failed to obtain active allocations, trying again later`,
),
},
)

// `activeAllocations` is used to trigger this Eventual, but not really needed
// inside.
Expand Down
12 changes: 10 additions & 2 deletions packages/indexer-common/src/allocations/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ import {
indexerError,
IndexerErrorCode,
parseGraphQLAllocation,
sequentialTimerReduce,
} from '@graphprotocol/indexer-common'
import { Allocation, MonitorEligibleAllocationsOptions } from './types'

import gql from 'graphql-tag'

import { Eventual, timer } from '@graphprotocol/common-ts'
import { Eventual } from '@graphprotocol/common-ts'

export const monitorEligibleAllocations = ({
indexer,
Expand Down Expand Up @@ -168,7 +169,14 @@ export const monitorEligibleAllocations = ({
}
}

const allocations = timer(interval).reduce(refreshAllocations, [])
const allocations = sequentialTimerReduce(
{
logger,
milliseconds: interval,
},
refreshAllocations,
[],
)

allocations.pipe((allocations) => {
logger.info(`Eligible allocations`, {
Expand Down
6 changes: 3 additions & 3 deletions packages/indexer-common/src/allocations/query-fees.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { Counter, Gauge, Histogram } from 'prom-client'
import axios from 'axios'
import {
Logger,
timer,
BytesWriter,
toAddress,
formatGRT,
Expand All @@ -20,6 +19,7 @@ import {
ensureAllocationSummary,
TransactionManager,
specification as spec,
sequentialTimerMap,
} from '..'
import { DHeap } from '@thi.ng/heaps'
import { BigNumber, BigNumberish, Contract } from 'ethers'
Expand Down Expand Up @@ -264,7 +264,7 @@ export class AllocationReceiptCollector implements ReceiptCollector {
}

// Check if there's another batch of receipts to collect every 10s
timer(10_000).pipe(async () => {
sequentialTimerMap({ logger: this.logger, milliseconds: 10_000 }, async () => {
while (hasReceiptsReadyForCollecting()) {
// Remove the batch from the processing queue
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
Expand All @@ -283,7 +283,7 @@ export class AllocationReceiptCollector implements ReceiptCollector {
}

private startVoucherProcessing() {
timer(30_000).pipe(async () => {
sequentialTimerMap({ logger: this.logger, milliseconds: 30_000 }, async () => {
let pendingVouchers: Voucher[] = []
try {
pendingVouchers = await this.pendingVouchers() // Ordered by value
Expand Down
11 changes: 6 additions & 5 deletions packages/indexer-common/src/allocations/tap-collector.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import { Counter, Gauge, Histogram } from 'prom-client'
import {
Logger,
timer,
toAddress,
formatGRT,
Address,
Metrics,
Eventual,
join as joinEventual,
} from '@graphprotocol/common-ts'
import { NetworkContracts as TapContracts } from '@semiotic-labs/tap-contracts-bindings'
import {
Expand All @@ -23,6 +21,7 @@ import {
allocationSigner,
tapAllocationIdProof,
parseGraphQLAllocation,
sequentialTimerMap,
} from '..'
import { BigNumber } from 'ethers'
import pReduce from 'p-reduce'
Expand Down Expand Up @@ -184,9 +183,11 @@ export class TapCollector {
}

private getPendingRAVs(): Eventual<RavWithAllocation[]> {
return joinEventual({
timer: timer(RAV_CHECK_INTERVAL_MS),
}).tryMap(
return sequentialTimerMap(
{
logger: this.logger,
milliseconds: RAV_CHECK_INTERVAL_MS,
},
async () => {
let ravs = await this.pendingRAVs()
if (ravs.length === 0) {
Expand Down
1 change: 1 addition & 0 deletions packages/indexer-common/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ export * from './utils'
export * from './parsers'
export * as specification from './network-specification'
export * from './multi-networks'
export * from './sequential-timer'
9 changes: 7 additions & 2 deletions packages/indexer-common/src/indexer-management/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ import {
Network,
OrderDirection,
GraphNode,
sequentialTimerMap,
} from '@graphprotocol/indexer-common'

import { Order, Transaction } from 'sequelize'
import { Eventual, join, Logger, timer } from '@graphprotocol/common-ts'
import { Eventual, join, Logger } from '@graphprotocol/common-ts'
import groupBy from 'lodash.groupby'

export class ActionManager {
Expand Down Expand Up @@ -116,7 +117,11 @@ export class ActionManager {

async monitorQueue(): Promise<void> {
const logger = this.logger.child({ component: 'QueueMonitor' })
const approvedActions: Eventual<Action[]> = timer(30_000).tryMap(
const approvedActions: Eventual<Action[]> = sequentialTimerMap(
{
logger,
milliseconds: 30_000,
},
async () => {
logger.trace('Fetching approved actions')
let actions: Action[] = []
Expand Down
Loading
Loading