Skip to content

Commit

Permalink
indexer-agent: Improve efficiency of batch action preparation
Browse files Browse the repository at this point in the history
- Fetch 'currentEpoch' and 'activeAllocations' ony once; instead of
for each action being prepared
- Check indexing status before allocating; instead of calling ensure()
  • Loading branch information
fordN committed Nov 13, 2023
1 parent 3743257 commit 203cfd8
Showing 1 changed file with 79 additions and 37 deletions.
116 changes: 79 additions & 37 deletions packages/indexer-common/src/indexer-management/allocations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
Action,
ActionFailure,
ActionType,
Allocation,
allocationIdProof,
AllocationResult,
AllocationStatus,
Expand All @@ -22,6 +23,7 @@ import {
IndexerManagementModels,
IndexingDecisionBasis,
IndexingRuleAttributes,
IndexingStatus,
isActionFailure,
isDeploymentWorthAllocatingTowards,
Network,
Expand All @@ -42,6 +44,12 @@ import {
import { BytesLike } from '@ethersproject/bytes'
import pMap from 'p-map'

export interface TransactionPreparationContext {
activeAllocations: Allocation[]
currentEpoch: BigNumber
indexingStatuses: IndexingStatus[]
}

export interface AllocateTransactionParams {
indexer: string
subgraphDeploymentID: BytesLike
Expand Down Expand Up @@ -228,15 +236,26 @@ export class AllocationManager {
}

async prepareTransactions(actions: Action[]): Promise<PopulateTransactionResult[]> {
const context: TransactionPreparationContext = {
activeAllocations: await this.network.networkMonitor.allocations(
AllocationStatus.ACTIVE,
),
currentEpoch: await this.network.contracts.epochManager.currentEpoch(),
indexingStatuses: await this.graphNode.indexingStatus([]),
}
return await pMap(
actions,
async (action: Action) => await this.prepareTransaction(action),
async (action: Action) => await this.prepareTransaction(action, context),
{
stopOnError: false,
},
)
}
async prepareTransaction(action: Action): Promise<PopulateTransactionResult> {

async prepareTransaction(
action: Action,
context: TransactionPreparationContext,
): Promise<PopulateTransactionResult> {
const logger = this.logger.child({ action: action.id })
logger.trace('Preparing transaction', {
action,
Expand All @@ -246,6 +265,7 @@ export class AllocationManager {
case ActionType.ALLOCATE:
return await this.prepareAllocate(
logger,
context,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
new SubgraphDeploymentID(action.deploymentID!),
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
Expand All @@ -254,6 +274,7 @@ export class AllocationManager {
case ActionType.UNALLOCATE:
return await this.prepareUnallocate(
logger,
context,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
action.allocationID!,
action.poi === null ? undefined : action.poi,
Expand All @@ -262,6 +283,7 @@ export class AllocationManager {
case ActionType.REALLOCATE:
return await this.prepareReallocate(
logger,
context,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
action.allocationID!,
action.poi === null ? undefined : action.poi,
Expand All @@ -287,6 +309,7 @@ export class AllocationManager {

async prepareAllocateParams(
logger: Logger,
context: TransactionPreparationContext,
deployment: SubgraphDeploymentID,
amount: BigNumber,
): Promise<AllocateTransactionParams> {
Expand All @@ -295,10 +318,7 @@ export class AllocationManager {
amount: amount.toString(),
})

const activeAllocations = await this.network.networkMonitor.allocations(
AllocationStatus.ACTIVE,
)
const allocation = activeAllocations.find(
const allocation = context.activeAllocations.find(
(allocation) =>
allocation.subgraphDeployment.id.toString() === deployment.toString(),
)
Expand All @@ -323,22 +343,32 @@ export class AllocationManager {
)
}

const currentEpoch = await this.network.contracts.epochManager.currentEpoch()

// Ensure subgraph is deployed before allocating
await this.graphNode.ensure(
`indexer-agent/${deployment.ipfsHash.slice(-10)}`,
deployment,
// Check that the subgraph is syncing and healthy before allocating
// Throw error if:
// - subgraph deployment is not syncing,
// - subgraph deployment is failed
const status = context.indexingStatuses.find(
(status) => status.subgraphDeployment.ipfsHash == deployment.ipfsHash,
)
if (!status) {
throw indexerError(
IndexerErrorCode.IE020,
`Subgraph deployment, '${deployment.ipfsHash}', is not syncing`,
)
}
if (status && status.health == 'failed') {
throw indexerError(
IndexerErrorCode.IE020,
`Subgraph deployment, '${deployment.ipfsHash}', failed during syncing`,
)
}

logger.debug('Obtain a unique Allocation ID')

// Obtain a unique allocation ID
const { allocationSigner, allocationId } = uniqueAllocationID(
this.network.transactionManager.wallet.mnemonic.phrase,
currentEpoch.toNumber(),
context.currentEpoch.toNumber(),
deployment,
activeAllocations.map((allocation) => allocation.id),
context.activeAllocations.map((allocation) => allocation.id),
)

// Double-check whether the allocationID already exists on chain, to
Expand Down Expand Up @@ -460,10 +490,11 @@ export class AllocationManager {

async prepareAllocate(
logger: Logger,
context: TransactionPreparationContext,
deployment: SubgraphDeploymentID,
amount: BigNumber,
): Promise<PopulatedTransaction> {
const params = await this.prepareAllocateParams(logger, deployment, amount)
const params = await this.prepareAllocateParams(logger, context, deployment, amount)
logger.debug(`Populating allocateFrom transaction`, {
indexer: params.indexer,
subgraphDeployment: params.subgraphDeploymentID,
Expand All @@ -483,6 +514,7 @@ export class AllocationManager {

async prepareUnallocateParams(
logger: Logger,
context: TransactionPreparationContext,
allocationID: string,
poi: string | undefined,
force: boolean,
Expand All @@ -492,12 +524,14 @@ export class AllocationManager {
poi: poi || 'none provided',
})
const allocation = await this.network.networkMonitor.allocation(allocationID)

// Ensure allocation is old enough to close
const currentEpoch = await this.network.contracts.epochManager.currentEpoch()
if (BigNumber.from(allocation.createdAtEpoch).eq(currentEpoch)) {
if (BigNumber.from(allocation.createdAtEpoch).eq(context.currentEpoch)) {
throw indexerError(
IndexerErrorCode.IE064,
`Allocation '${allocation.id}' cannot be closed until epoch ${currentEpoch.add(
`Allocation '${
allocation.id
}' cannot be closed until epoch ${context.currentEpoch.add(
1,
)}. (Allocations cannot be closed in the same epoch they were created)`,
)
Expand Down Expand Up @@ -635,16 +669,24 @@ export class AllocationManager {

async prepareUnallocate(
logger: Logger,
context: TransactionPreparationContext,
allocationID: string,
poi: string | undefined,
force: boolean,
): Promise<PopulatedTransaction> {
const params = await this.prepareUnallocateParams(logger, allocationID, poi, force)
const params = await this.prepareUnallocateParams(
logger,
context,
allocationID,
poi,
force,
)
return await this.populateUnallocateTransaction(logger, params)
}

async prepareReallocateParams(
logger: Logger,
context: TransactionPreparationContext,
allocationID: string,
poi: string | undefined,
amount: BigNumber,
Expand All @@ -657,14 +699,9 @@ export class AllocationManager {
force,
})

/* Fetch all active allocations and search for our input parameter `allocationID`.
* We don't call `fetchAllocations` here because all allocations will be required
* later when generating a new `uniqueAllocationID`. */
const activeAllocations = await this.network.networkMonitor.allocations(
AllocationStatus.ACTIVE,
)
// Validate that the allocation exists and is old enough to close
const allocationAddress = toAddress(allocationID)
const allocation = activeAllocations.find((allocation) => {
const allocation = context.activeAllocations.find((allocation) => {
return allocation.id === allocationAddress
})
if (!allocation) {
Expand All @@ -674,25 +711,28 @@ export class AllocationManager {
`Reallocation failed: No active allocation with id '${allocationID}' found`,
)
}

// Ensure allocation is old enough to close
const currentEpoch = await this.network.contracts.epochManager.currentEpoch()
if (BigNumber.from(allocation.createdAtEpoch).eq(currentEpoch)) {
if (BigNumber.from(allocation.createdAtEpoch).eq(context.currentEpoch)) {
throw indexerError(
IndexerErrorCode.IE064,
`Allocation '${allocation.id}' cannot be closed until epoch ${currentEpoch.add(
`Allocation '${
allocation.id
}' cannot be closed until epoch ${context.currentEpoch.add(
1,
)}. (Allocations cannot be closed in the same epoch they were created)`,
)
}

logger.debug('Resolving POI')
logger.debug('Resolving POI', {
allocation: allocationID,
deployment: allocation.subgraphDeployment.id.ipfsHash,
})
const allocationPOI = await this.network.networkMonitor.resolvePOI(
allocation,
poi,
force,
)
logger.debug('POI resolved', {
deployment: allocation.subgraphDeployment.id.ipfsHash,
userProvidedPOI: poi,
poi: allocationPOI,
})
Expand Down Expand Up @@ -723,9 +763,9 @@ export class AllocationManager {
logger.debug('Generating a new unique Allocation ID')
const { allocationSigner, allocationId: newAllocationId } = uniqueAllocationID(
this.network.transactionManager.wallet.mnemonic.phrase,
currentEpoch.toNumber(),
context.currentEpoch.toNumber(),
allocation.subgraphDeployment.id,
activeAllocations.map((allocation) => allocation.id),
context.activeAllocations.map((allocation) => allocation.id),
)

logger.debug('New unique Allocation ID generated', {
Expand Down Expand Up @@ -774,7 +814,7 @@ export class AllocationManager {
deployment: allocation.subgraphDeployment.id.toString(),
poi: allocationPOI,
proof,
epoch: currentEpoch.toString(),
epoch: context.currentEpoch.toString(),
})

return {
Expand Down Expand Up @@ -921,13 +961,15 @@ export class AllocationManager {

async prepareReallocate(
logger: Logger,
context: TransactionPreparationContext,
allocationID: string,
poi: string | undefined,
amount: BigNumber,
force: boolean,
): Promise<PopulatedTransaction[]> {
const params = await this.prepareReallocateParams(
logger,
context,
allocationID,
poi,
amount,
Expand Down

0 comments on commit 203cfd8

Please sign in to comment.