testing changes #718

Merged
merged 1 commit into from
Oct 14, 2024
95 changes: 54 additions & 41 deletions src/components/Indexer/crawlerThread.ts
@@ -285,48 +285,60 @@ export function checkNewlyIndexedAssets(events: BlocksEvents): void {
})
}

async function retryCrawlerWithDelay(
blockchain: Blockchain,
interval: number = 5000 // in milliseconds, default 5 secs
): Promise<boolean> {
try {
const retryInterval = Math.max(blockchain.getKnownRPCs().length * 3000, interval) // give 3 secs per each RPC
// try
const result = await startCrawler(blockchain)
if (result) {
INDEXER_LOGGER.info('Blockchain connection successfully established!')
processNetworkData(blockchain.getProvider(), blockchain.getSigner())
return true
} else {
INDEXER_LOGGER.warn(
`Blockchain connection is not established, retrying in ${
retryInterval / 1000
} secs....`
)
// delay the next call
await sleep(retryInterval)
// recursively call the same func
return retryCrawlerWithDelay(blockchain, retryInterval)
}
} catch (err) {
INDEXER_LOGGER.error(`Error starting crawler: ${err.message}`)
return false
}
}
// async function retryCrawlerWithDelay(
// blockchain: Blockchain,
// interval: number = 5000 // in milliseconds, default 5 secs
// ): Promise<boolean> {
// try {
// const retryInterval = Math.max(blockchain.getKnownRPCs().length * 3000, interval) // give 3 secs per each RPC
// // try
// const result = await startCrawler(blockchain)
// const dbActive = await getDatabase(true)
// if (!dbActive || !(await isReachableConnection(dbActive.getConfig().url))) {
// console.log('Crawler cannot start because of DB connection')
// return false
// } else console.log('Crawler OK')
// if (result) {
// INDEXER_LOGGER.info('Blockchain connection successfully established!')
// processNetworkData(blockchain.getProvider(), blockchain.getSigner())
// return true
// } else {
// INDEXER_LOGGER.warn(
// `Blockchain connection is not established, retrying in ${
// retryInterval / 1000
// } secs....`
// )
// numCrawlAttempts++
// console.log('NUM CRAWL ATTEMPTS: ' + numCrawlAttempts)
// if (numCrawlAttempts <= MAX_CRAWL_RETRIES) {
// // delay the next call
// await sleep(retryInterval)
// // recursively call the same func
// return retryCrawlerWithDelay(blockchain, retryInterval)
// } else {
// console.log('GIVING UP AFTER MAX RETRIES')
// return false
// }
// }
// } catch (err) {
// INDEXER_LOGGER.error(`Error starting crawler: ${err.message}`)
// return false
// }
// }

// it does not start crawling until the network connection is ready
async function startCrawler(blockchain: Blockchain): Promise<boolean> {
if ((await blockchain.isNetworkReady()).ready) {
return true
} else {
// try other RPCs if any are available (otherwise will just retry the same RPC)
const connectionStatus = await blockchain.tryFallbackRPCs()
if (connectionStatus.ready || (await blockchain.isNetworkReady()).ready) {
return true
}
}
return false
}
// async function startCrawler(blockchain: Blockchain): Promise<boolean> {
// if ((await blockchain.isNetworkReady()).ready) {
// return true
// } else {
// // try other RPCs if any are available (otherwise will just retry the same RPC)
// const connectionStatus = await blockchain.tryFallbackRPCs()
// if (connectionStatus.ready || (await blockchain.isNetworkReady()).ready) {
// return true
// }
// }
// return false
// }

parentPort.on('message', (message) => {
if (message.method === INDEXER_MESSAGES.START_CRAWLING) {
@@ -337,7 +349,8 @@
rpcDetails.chainId,
rpcDetails.fallbackRPCs
)
return retryCrawlerWithDelay(blockchain)
// return retryCrawlerWithDelay(blockchain)
processNetworkData(blockchain.getProvider(), blockchain.getSigner())
} else if (message.method === INDEXER_MESSAGES.REINDEX_TX) {
// reindex a specific transaction

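Note on the retry timing used in the (now commented-out) worker-side logic above: the interval scales with the number of known RPCs, giving each one roughly three seconds before the next attempt. A minimal sketch of that arithmetic; the RPC list here is hypothetical, for illustration only:

// Hypothetical RPC list, for illustration only
const knownRPCs = ['https://rpc-a', 'https://rpc-b', 'https://rpc-c', 'https://rpc-d']
const interval = 5000 // default retry interval: 5 seconds

// With 4 known RPCs: Math.max(4 * 3000, 5000) = 12000 ms between attempts
const retryInterval = Math.max(knownRPCs.length * 3000, interval)
console.log(`next retry in ${retryInterval / 1000} secs`) // "next retry in 12 secs"
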
79 changes: 76 additions & 3 deletions src/components/Indexer/index.ts
@@ -6,6 +6,7 @@ import { ReindexTask } from './crawlerThread.js'
import { LOG_LEVELS_STR } from '../../utils/logging/Logger.js'
import { INDEXER_LOGGER } from '../../utils/logging/common.js'
import {
Blockchain,
EVENTS,
INDEXER_CRAWLING_EVENTS,
INDEXER_MESSAGES,
@@ -14,6 +15,8 @@ import {
import { CommandStatus, JobStatus } from '../../@types/commands.js'
import { buildJobIdentifier } from './utils.js'
import { create256Hash } from '../../utils/crypt.js'
import { isReachableConnection } from '../../utils/database.js'
import { sleep } from '../../utils/util.js'

// emit events for node
export const INDEXER_DDO_EVENT_EMITTER = new EventEmitter()
@@ -23,6 +26,8 @@ let INDEXING_QUEUE: ReindexTask[] = []
// job queue for admin commands or other commands not immediately available
const JOBS_QUEUE: JobStatus[] = []

const MAX_CRAWL_RETRIES = 10
let numCrawlAttempts = 0
export class OceanIndexer {
private db: Database
private networks: RPCS
@@ -85,15 +90,83 @@ export class OceanIndexer {
return false
}

// it does not start crawling until the network connection is ready
async startCrawler(blockchain: Blockchain): Promise<boolean> {
if ((await blockchain.isNetworkReady()).ready) {
return true
} else {
// try other RPCs if any are available (otherwise will just retry the same RPC)
const connectionStatus = await blockchain.tryFallbackRPCs()
if (connectionStatus.ready || (await blockchain.isNetworkReady()).ready) {
return true
}
}
return false
}

async retryCrawlerWithDelay(
blockchain: Blockchain,
interval: number = 5000 // in milliseconds, default 5 secs
): Promise<boolean> {
try {
const retryInterval = Math.max(blockchain.getKnownRPCs().length * 3000, interval) // give 3 secs per each RPC
// try
const result = await this.startCrawler(blockchain)
const dbActive = this.getDatabase()
if (!dbActive || !(await isReachableConnection(dbActive.getConfig().url))) {
INDEXER_LOGGER.error(`Giving up on starting the crawler: DB is not online!`)
return false
}
if (result) {
INDEXER_LOGGER.info('Blockchain connection successfully established!')
// processNetworkData(blockchain.getProvider(), blockchain.getSigner())
return true
} else {
INDEXER_LOGGER.warn(
`Blockchain connection is not established, retrying in ${
retryInterval / 1000
} secs....`
)
numCrawlAttempts++
if (numCrawlAttempts <= MAX_CRAWL_RETRIES) {
// delay the next call
await sleep(retryInterval)
// recursively call the same func
return this.retryCrawlerWithDelay(blockchain, retryInterval)
} else {
INDEXER_LOGGER.error(
`Giving up on starting the crawler after ${MAX_CRAWL_RETRIES} retries.`
)
return false
}
}
} catch (err) {
INDEXER_LOGGER.error(`Error starting crawler: ${err.message}`)
return false
}
}
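
Note: `retryCrawlerWithDelay` relies on `sleep` from `../../utils/util.js` and `isReachableConnection` from `../../utils/database.js`, neither of which appears in this diff. The sketches below are assumptions inferred only from the call sites above; the real implementations may differ:

// Assumed shape of sleep(): resolve after `ms` milliseconds
export function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms))
}

// Assumed shape of isReachableConnection(): probe the DB URL and report
// whether it responds at all; the actual check may be more involved
export async function isReachableConnection(url: string): Promise<boolean> {
  try {
    const response = await fetch(url)
    return response.status < 500
  } catch {
    return false
  }
}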

// starts crawling for a specific chain
public startThread(chainID: number): Worker | null {
public async startThread(chainID: number): Promise<Worker | null> {
const rpcDetails: SupportedNetwork = this.getSupportedNetwork(chainID)
if (!rpcDetails) {
INDEXER_LOGGER.error(
'Unable to start a worker thread (unsupported network) for chain: ' + chainID
)
return null
}

const blockchain = new Blockchain(
rpcDetails.rpc,
rpcDetails.network,
rpcDetails.chainId,
rpcDetails.fallbackRPCs
)
const canStartWorker = await this.retryCrawlerWithDelay(blockchain)
if (!canStartWorker) {
INDEXER_LOGGER.error(`Cannot start worker thread. Check DB and RPC connections!`)
return null
}
const workerData = { rpcDetails }
// see if it exists already, otherwise create a new one
let worker = this.workers[chainID]
@@ -115,11 +188,11 @@
}

// eslint-disable-next-line require-await
public startThreads(): boolean {
public async startThreads(): Promise<boolean> {
let count = 0
for (const network of this.supportedChains) {
const chainId = parseInt(network)
const worker = this.startThread(chainId)
const worker = await this.startThread(chainId)
if (worker) {
// track if we were able to start them all
count++
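Since `startThread` and `startThreads` are now async, callers have to await them. A minimal usage sketch under that assumption; `db` and `supportedNetworks` are placeholders, and the failure check reflects the new unit test below, which expects `startThreads()` to return false when no DB connection is available:

// Hypothetical caller; db and supportedNetworks stand in for real config
const indexer = new OceanIndexer(db, supportedNetworks)
const allStarted = await indexer.startThreads()
if (!allStarted) {
  INDEXER_LOGGER.error('Not all indexer worker threads could be started')
}
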
74 changes: 19 additions & 55 deletions src/test/unit/indexer/indexer.test.ts
@@ -1,64 +1,28 @@
import { expect } from 'chai'
import { stub } from 'sinon'
import { assert, expect } from 'chai'
import { describe, it } from 'mocha'
import { OceanIndexer } from '../../../components/Indexer/index.js'
import { RPCS } from '../../../@types/blockchain.js'

class MockDatabase {
indexer = {
retrieve: stub(),
update: stub()
}
}

const mockSupportedNetworks: RPCS = {
'1': { chainId: 1, network: 'mainnet', rpc: 'https://mainnet.rpc', chunkSize: 1000 },
'2': { chainId: 2, network: 'testnet', rpc: 'https://testnet.rpc', chunkSize: 500 }
}
import { getConfiguration } from '../../../utils/index.js'
import { Database } from '../../../components/database/index.js'
import { OceanNodeConfig } from '../../../@types/OceanNode.js'

describe('OceanIndexer', () => {
let oceanIndexer: OceanIndexer
it('should start threads and handle worker events', () => {
const mockDatabase = new MockDatabase()
oceanIndexer = new OceanIndexer(mockDatabase as any, mockSupportedNetworks)

const mockWorker = {
on: stub(),
postMessage: stub()
}

stub(oceanIndexer as any, 'startThreads').callsFake(() => {
oceanIndexer.startThreads = (): boolean => {
try {
const network = '1'

mockWorker.on
.withArgs('message')
.callArgWith(1, { method: 'store-last-indexed-block', network, data: 42 })

mockWorker.on.withArgs('error').callArgWith(1, new Error('Worker error'))

mockWorker.on.withArgs('exit').callArgWith(1, 0)

return oceanIndexer.startThreads()
} catch (error) {
console.error(error)
}
}
})

// stub(oceanIndexer as any, 'createWorker').returns(mockWorker)

oceanIndexer.startThreads()

// eslint-disable-next-line no-unused-expressions
expect(mockWorker.postMessage.calledOnceWith({ method: 'start-crawling' })).to.be
.false
// eslint-disable-next-line no-unused-expressions
expect(mockWorker.on.calledThrice).to.be.false
let mockDatabase: Database
let config: OceanNodeConfig
before(async () => {
config = await getConfiguration(true)
mockDatabase = await new Database(config.dbConfig)
})

after(() => {
oceanIndexer.stopAllThreads()
it('should start threads and handle worker events', async () => {
oceanIndexer = new OceanIndexer(mockDatabase, config.supportedNetworks)
assert(oceanIndexer, 'indexer should not be null')
expect(oceanIndexer.getDatabase().getConfig()).to.be.equal(mockDatabase.getConfig())
expect(oceanIndexer.getIndexingQueue().length).to.be.equal(0)
expect(oceanIndexer.getJobsPool().length).to.be.equal(0)
// cannot start threads without DB connection
expect(await oceanIndexer.startThreads()).to.be.equal(false)
// there are no worker threads available
expect(oceanIndexer.stopAllThreads()).to.be.equal(false)
})
})
2 changes: 1 addition & 1 deletion src/test/unit/ocean.test.ts
@@ -45,7 +45,7 @@ describe('Status command tests', async () => {
after(async () => {
// Restore original local setup / env variables after test
await tearDownEnvironment(envOverrides)
oceanIndexer.stopAllThreads()
await oceanIndexer.stopAllThreads()
})

it('Ocean Node instance', () => {
4 changes: 2 additions & 2 deletions src/utils/database.ts
@@ -9,8 +9,8 @@ let dbConnection: Database = null

// lazy load env configuration and then db configuration
// we should be able to use this everywhere without dep cycle issues
export async function getDatabase(): Promise<Database> {
if (!dbConnection) {
export async function getDatabase(forceReload: boolean = false): Promise<Database> {
if (!dbConnection || forceReload) {
const { dbConfig } = await getConfiguration(true)
if (dbConfig && dbConfig.url) {
dbConnection = await new Database(dbConfig)
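The new `forceReload` flag lets callers bypass the cached singleton, which the commented-out crawler code above already exercises via `getDatabase(true)`. A short usage sketch; the reconnect scenario is illustrative, not from this PR:

// Normal call: reuse the cached connection once one exists
const db = await getDatabase()

// After a suspected connection drop: force a fresh Database instance
const freshDb = await getDatabase(true)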