Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

emit events on reindex tx and chain #418

Merged
merged 11 commits into from
May 23, 2024
11 changes: 9 additions & 2 deletions src/components/Indexer/crawlerThread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,12 @@ export async function processNetworkData(
await sleep(interval)
// reindex chain command called
if (REINDEX_BLOCK && !lockProccessing) {
await reindexChain(currentBlock)
const result = await reindexChain(currentBlock)
// either "true" for success or "false" on failure
parentPort.postMessage({
method: INDEXER_CRAWLING_EVENTS.REINDEX_CHAIN,
data: { result }
})
}

if (stopCrawling) {
Expand All @@ -196,19 +201,21 @@ export async function processNetworkData(
}
}

async function reindexChain(currentBlock: number): Promise<void> {
async function reindexChain(currentBlock: number): Promise<boolean> {
const block = await updateLastIndexedBlockNumber(REINDEX_BLOCK)
if (block !== -1) {
REINDEX_BLOCK = null
const res = await deleteAllAssetsFromChain()
if (res === -1) {
await updateLastIndexedBlockNumber(currentBlock)
}
return true
paulo-ocean marked this conversation as resolved.
Show resolved Hide resolved
} else {
// Set the reindex block to null -> force the admin to trigger the command again until
// we have a notification from the worker thread to the parent thread #414.
INDEXER_LOGGER.error(`Block could not be reset. Continue indexing normally...`)
REINDEX_BLOCK = null
return false
}
}

Expand Down
13 changes: 12 additions & 1 deletion src/components/Indexer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,22 @@ export class OceanIndexer {
INDEXER_DDO_EVENT_EMITTER.emit(event.method, event.data.id)
// remove from indexing list
} else if (event.method === INDEXER_CRAWLING_EVENTS.REINDEX_QUEUE_POP) {
// remove this one from the queue
// remove this one from the queue (means we processed the reindex for this tx)
INDEXING_QUEUE = INDEXING_QUEUE.filter(
(task) =>
task.txId !== event.data.txId && task.chainId !== event.data.chainId
)
// reindex tx successfully done
INDEXER_CRAWLING_EVENT_EMITTER.emit(
INDEXER_CRAWLING_EVENTS.REINDEX_TX, // explicitly set constant value for readability
event.data
)
} else if (event.method === INDEXER_CRAWLING_EVENTS.REINDEX_CHAIN) {
// we should listen to this on the dashboard for instance
INDEXER_CRAWLING_EVENT_EMITTER.emit(
INDEXER_CRAWLING_EVENTS.REINDEX_CHAIN,
event.data
)
} else if (event.method === INDEXER_CRAWLING_EVENTS.CRAWLING_STARTED) {
INDEXER_CRAWLING_EVENT_EMITTER.emit(event.method, event.data)
}
Expand Down
51 changes: 47 additions & 4 deletions src/test/integration/operationsDashboard.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import {
ENVIRONMENT_VARIABLES,
PROTOCOL_COMMANDS,
getConfiguration,
EVENTS
EVENTS,
INDEXER_CRAWLING_EVENTS
} from '../../utils/index.js'
import { OceanNodeConfig } from '../../@types/OceanNode.js'

Expand All @@ -34,9 +35,14 @@ import { StopNodeHandler } from '../../components/core/admin/stopNodeHandler.js'
import { ReindexTxHandler } from '../../components/core/admin/reindexTxHandler.js'
import { ReindexChainHandler } from '../../components/core/admin/reindexChainHandler.js'
import { FindDdoHandler } from '../../components/core/handler/ddoHandler.js'
import { streamToObject } from '../../utils/util.js'
import { sleep, streamToObject } from '../../utils/util.js'
import { expectedTimeoutFailure, waitToIndex } from './testUtils.js'
import { OceanIndexer } from '../../components/Indexer/index.js'
import {
INDEXER_CRAWLING_EVENT_EMITTER,
OceanIndexer
} from '../../components/Indexer/index.js'
import { getCrawlingInterval } from '../../components/Indexer/utils.js'
import { ReindexTask } from '../../components/Indexer/crawlerThread.js'

describe('Should test admin operations', () => {
let config: OceanNodeConfig
Expand Down Expand Up @@ -123,7 +129,8 @@ describe('Should test admin operations', () => {
}
})

it('should pass for reindex tx command', async () => {
it('should pass for reindex tx command', async function () {
this.timeout(DEFAULT_TEST_TIMEOUT * 2)
await waitToIndex(publishedDataset.ddo.did, EVENTS.METADATA_CREATED)
const signature = await getSignature(expiryTimestamp.toString())

Expand All @@ -140,19 +147,39 @@ describe('Should test admin operations', () => {
assert(validationResponse, 'invalid reindex tx validation response')
assert(validationResponse.valid === true, 'validation for reindex tx command failed')

let reindexResult: any = null
INDEXER_CRAWLING_EVENT_EMITTER.addListener(
INDEXER_CRAWLING_EVENTS.REINDEX_QUEUE_POP, // triggered when the tx completes and is removed from the queue
(data) => {
// {ReindexTask}
reindexResult = data.result as ReindexTask
expect(reindexResult.txId).to.be.equal(publishedDataset.trxReceipt.hash)
expect(reindexResult.chainId).to.be.equal(DEVELOPMENT_CHAIN_ID)
}
)

const handlerResponse = await reindexTxHandler.handle(reindexTxCommand)
assert(handlerResponse, 'handler resp does not exist')
assert(handlerResponse.status.httpStatus === 200, 'incorrect http status')
const findDDOTask = {
command: PROTOCOL_COMMANDS.FIND_DDO,
id: publishedDataset.ddo.id
}

// wait a bit
await sleep(getCrawlingInterval() * 2)
if (reindexResult !== null) {
assert('chainId' in reindexResult, 'expected a chainId')
assert('txId' in reindexResult, 'expected a txId')
}

const response = await new FindDdoHandler(oceanNode).handle(findDDOTask)
const actualDDO = await streamToObject(response.stream as Readable)
assert(actualDDO[0].id === publishedDataset.ddo.id, 'DDO id not matching')
})

it('should pass for reindex chain command', async function () {
this.timeout(DEFAULT_TEST_TIMEOUT * 2)
const signature = await getSignature(expiryTimestamp.toString())
this.timeout(DEFAULT_TEST_TIMEOUT * 2)
const { ddo, wasTimeout } = await waitToIndex(
Expand All @@ -178,14 +205,30 @@ describe('Should test admin operations', () => {
'validation for reindex chain command failed'
)

let reindexResult: any = null
INDEXER_CRAWLING_EVENT_EMITTER.addListener(
INDEXER_CRAWLING_EVENTS.REINDEX_CHAIN,
(data) => {
// {result: true/false}
assert(typeof data.result === 'boolean', 'expected a boolean value')
reindexResult = data.result as boolean
}
)
const handlerResponse = await reindexChainHandler.handle(reindexChainCommand)
assert(handlerResponse, 'handler resp does not exist')
assert(handlerResponse.status.httpStatus === 200, 'incorrect http status')

// give it a little time to respond with the event
await sleep(getCrawlingInterval() * 2)
if (reindexResult !== null) {
assert(typeof reindexResult === 'boolean', 'expected a boolean value')
}
}
})

after(async () => {
await tearDownEnvironment(previousConfiguration)
INDEXER_CRAWLING_EVENT_EMITTER.removeAllListeners()
indexer.stopAllThreads()
})
})
6 changes: 4 additions & 2 deletions src/utils/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,10 @@ export const EVENTS = {

export const INDEXER_CRAWLING_EVENTS = {
CRAWLING_STARTED: 'crawlingStarted',
REINDEX_QUEUE_POP: 'popFromQueue'
// TODO REINDEX_CHAIN
REINDEX_QUEUE_POP: 'popFromQueue', // this is for reindex tx, not chain
// use same names as the corresponding commands for these events
REINDEX_CHAIN: PROTOCOL_COMMANDS.REINDEX_CHAIN,
REINDEX_TX: PROTOCOL_COMMANDS.REINDEX_TX
}

export const EVENT_HASHES: Hashes = {
Expand Down
Loading