Skip to content

Commit

Permalink
fix: batch asset synchronising operations
Browse files Browse the repository at this point in the history
- Splits the asset sync operation into batches of 500
- Batches the metadata fetch operation when loading from the DB
- Reduces the initial metadata fetch routine to 3 attempts
  • Loading branch information
rhyslbw committed Mar 23, 2021
1 parent 87568d5 commit 637d607
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 44 deletions.
120 changes: 93 additions & 27 deletions packages/api-cardano-db-hasura/src/DataSyncController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,23 +74,64 @@ export class DataSyncController {
initial: new DataFetcher<number>(
'MetadataSynchronizer',
async () => {
const assets = await this.hasuraClient.getAssetsWithoutMetadata({ _lte: 5 })
if (assets.length > 0) {
await this.fetchAndApplyMetadata(assets, 'Assets missing metadata')
const batchSize = 2500
const assetsWithoutMetadataCount = await this.hasuraClient.assetsWithoutMetadataCount({ _lte: 2 })
this.logger.debug(
'new assets without metadata count',
{ module: 'DataSyncController', value: assetsWithoutMetadataCount }
)
const batchQty = Math.ceil(assetsWithoutMetadataCount / batchSize)
let totalCount = 0
for (const i of Array(batchQty).keys()) {
const assetsInBatch = await this.hasuraClient.getAssetsWithoutMetadata(
{ _lte: 3 }, { limit: batchSize, offset: batchSize * i }
)
this.logger.debug(
'assets without metadata in batch',
{
module: 'DataSyncController',
value: { batch: i, qty: assetsInBatch.length }
}
)
if (assetsInBatch.length > 0) {
await this.fetchAndApplyMetadata(assetsInBatch, 'Assets missing metadata')
}
totalCount = totalCount + assetsInBatch.length
}
return assets.length
return totalCount
},
pollingInterval.initial,
this.logger
),
ongoing: new DataFetcher<number>(
'MetadataSynchronizerRefresh',
async () => {
const assets = await this.hasuraClient.getAssetsIncMetadata({ _gt: 5 })
if (assets.length > 0) {
await this.fetchAndApplyMetadata(assets, 'All assets')
const batchSize = 2500
const assetsEligibleForMetadataRefreshCount =
await this.hasuraClient.assetsEligibleForMetadataRefreshCount({ _gt: 2 })
this.logger.debug(
'assets eligible for metadata refresh count',
{ module: 'DataSyncController', value: assetsEligibleForMetadataRefreshCount }
)
const batchQty = Math.ceil(assetsEligibleForMetadataRefreshCount / batchSize)
let totalCount = 0
for (const i of Array(batchQty).keys()) {
const assetsInBatch = await this.hasuraClient.getAssetsIncMetadata(
{ _gt: 2 }, { limit: batchSize, offset: batchSize * i }
)
this.logger.debug(
'assets without metadata in batch',
{
module: 'DataSyncController',
value: { batch: i, qty: assetsInBatch.length }
}
)
if (assetsInBatch.length > 0) {
await this.fetchAndApplyMetadata(assetsInBatch, 'All assets')
}
totalCount = totalCount + assetsInBatch.length
}
return assets.length
return totalCount
},
pollingInterval.ongoing,
this.logger
Expand All @@ -100,26 +141,48 @@ export class DataSyncController {
this.assetSynchronizer = new DataFetcher<number>(
'AssetTableSynchronizer',
async () => {
const distinctAssetsInTokens = await this.hasuraClient.getDistinctAssetsInTokens()
this.logger.debug('distinct asset IDs from tokens', { module: 'DataSyncController', value: distinctAssetsInTokens.length })
const assetIds = await this.hasuraClient.getAssetIds()
this.logger.debug('fetched asset IDs', { module: 'DataSyncController', value: assetIds.length })
const diff = distinctAssetsInTokens
.filter(asset => !assetIds.includes(asset.assetId))
.map(asset => ({
assetId: asset.assetId,
assetName: asset.assetName,
fingerprint: assetFingerprint(asset),
policyId: asset.policyId,
metadataFetchAttempts: 0
})
const batchSize = 500
const distinctAssetsInTokensCount = await this.hasuraClient.distinctAssetsInTokensCount()
this.logger.debug(
'distinct assets in tokens count',
{ module: 'DataSyncController', value: distinctAssetsInTokensCount }
)
const batchQty = Math.ceil(distinctAssetsInTokensCount / batchSize)
let totalCount = 0
for (const i of Array(batchQty).keys()) {
const assetsInBatch = await this.hasuraClient.getDistinctAssetsInTokens(
{ limit: batchSize, offset: batchSize * i }
)
const assetsAlreadyInDb =
await this.hasuraClient.getAssetsById(assetsInBatch.map(asset => asset.assetId))
this.logger.debug(
'asset IDs from tokens',
{
module: 'DataSyncController',
value: { batch: i, qty: assetsInBatch.length, existing: assetsAlreadyInDb.length }
}
)
this.logger.debug('asset IDs diff', { module: 'DataSyncController', value: diff.length })
if (diff.length > 0) {
await this.hasuraClient.insertAssets(diff)
this.logger.debug('synchronised assets table from tokens', { module: 'DataSyncController', value: diff.length })
const newAssets = assetsInBatch
.filter(asset => assetsAlreadyInDb.find(existingAsset =>
existingAsset.assetId === asset.assetId) === undefined)
.map(asset => ({
assetId: asset.assetId,
assetName: asset.assetName,
fingerprint: assetFingerprint(asset),
policyId: asset.policyId,
metadataFetchAttempts: 0
}))
this.logger.debug('asset IDs diff', { module: 'DataSyncController', value: newAssets.length })
if (newAssets.length > 0) {
totalCount = totalCount + newAssets.length
await this.hasuraClient.insertAssets(newAssets)
this.logger.debug(
'synchronised assets table from tokens',
{ module: 'DataSyncController', value: { batch: i, qty: newAssets.length } }
)
}
}
return diff.length
return totalCount
},
60 * 1000,
this.logger
Expand All @@ -138,7 +201,10 @@ export class DataSyncController {
asset: assets.find(asset => asset.assetId === metadata.subject)
}))
.filter(({ asset, metadataHash }) => metadataHash !== asset.metadataHash)
this.logger.debug('Metadata with updates to apply', { module: 'DataSyncController', value: metadataWithAssetAndHash.length })
this.logger.debug(
'Metadata with updates to apply',
{ module: 'DataSyncController', value: metadataWithAssetAndHash.length }
)
if (metadataWithAssetAndHash.length > 0) {
for (const { metadata, metadataHash } of metadataWithAssetAndHash) {
await this.hasuraClient.addMetadata(metadata, metadataHash)
Expand Down
133 changes: 118 additions & 15 deletions packages/api-cardano-db-hasura/src/HasuraClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ export class HasuraClient {
}

public async initialize () {
this.logger.info('Initializing Hasura', { module: 'HasuraClient' })
this.logger.info('Initializing', { module: 'HasuraClient' })
await this.applySchemaAndMetadata()
await pRetry(async () => {
this.schema = await this.buildHasuraSchema()
Expand All @@ -146,7 +146,7 @@ export class HasuraClient {
this.logger
)
})
this.logger.info('Hasura initialized', { module: 'HasuraClient' })
this.logger.info('Initialized', { module: 'HasuraClient' })
await this.currentProtocolVersionFetcher.initialize()
await this.adaCirculatingSupplyFetcher.initialize()
}
Expand Down Expand Up @@ -350,36 +350,80 @@ export class HasuraClient {
}
}

public async getDistinctAssetsInTokens (): Promise<Asset[]> {
public async getDistinctAssetsInTokens (options?: { limit: number, offset: number }): Promise<Asset[]> {
const result = await this.client.query({
query: gql`query {
tokens (distinct_on: assetId) {
query: gql`query DistinctAssetsInTokens (
$limit: Int
$offset: Int
) {
tokens (
distinct_on: assetId
limit: $limit
order_by: { assetId: asc }
offset: $offset
) {
assetId
assetName
policyId
}
}`
}`,
variables: {
limit: options?.limit,
offset: options?.offset
}
})
return result.data.tokens as Asset[]
}

public async getAssetIds (): Promise<Asset['assetId'][]> {
public async distinctAssetsInTokensCount (): Promise<number> {
const result = await this.client.query({
query: gql`query {
assets {
assetId
tokens_aggregate (distinct_on: assetId) {
aggregate {
count
}
}
}`
})
return result.data.assets.map((asset: Asset) => asset.assetId)
return result.data.tokens_aggregate.aggregate.count
}

/**
 * Counts the assets whose metadataFetchAttempts match the supplied comparison
 * expression, i.e. those still eligible for a metadata refresh pass.
 * Query failures are logged before being rethrown to the caller.
 */
public async assetsEligibleForMetadataRefreshCount (metadataFetchAttempts: IntComparisonExp): Promise<number> {
  try {
    const queryResult = await this.client.query({
      query: gql`query AssetsEligibleForMetadataRefreshCount (
          $metadataFetchAttempts: Int_comparison_exp!
      ) {
          assets_aggregate (
              where: {
                  metadataFetchAttempts: $metadataFetchAttempts
              }) {
              aggregate {
                  count
              }
          }
      }`,
      variables: { metadataFetchAttempts }
    })
    const { aggregate } = queryResult.data.assets_aggregate
    return aggregate.count
  } catch (error) {
    // Surface the failure in the log, then let the caller decide how to react.
    this.logger.error(error)
    throw error
  }
}

public async getAssetsIncMetadata (metadataFetchAttempts?: IntComparisonExp): Promise<Asset[]> {
public async getAssetsIncMetadata (metadataFetchAttempts: IntComparisonExp, options: { limit: number, offset: number }): Promise<Asset[]> {
const result = await this.client.query({
query: gql`query AssetsIncMetadata (
$metadataFetchAttempts: Int_comparison_exp
$limit: Int
$offset: Int
){
assets (
limit: $limit
offset: $offset
where: {
metadataFetchAttempts: $metadataFetchAttempts
}
Expand All @@ -392,7 +436,9 @@ export class HasuraClient {
}
}`,
variables: {
metadataFetchAttempts
metadataFetchAttempts,
limit: options.limit,
offset: options.offset
}
})
return result.data.assets
Expand All @@ -417,6 +463,25 @@ export class HasuraClient {
return new BigNumber(result.data.assets_aggregate.aggregate.count).isGreaterThan(0)
}

/**
 * Fetches the assets already present in the DB for the given IDs.
 * Only the assetId field is selected, so the result is suitable for
 * membership checks (e.g. diffing token-derived assets against the table)
 * rather than for reading full asset records.
 * @param assetIds IDs to look up; an empty array yields an empty result.
 */
public async getAssetsById (assetIds: Asset['assetId'][]): Promise<Asset[]> {
  const result = await this.client.query({
    // Operation name fixed: this query selects assets by ID; the previous
    // name (IdsOfAssetsWithoutMetadata) was a copy-paste from another query
    // and would mislabel this operation in server logs and metrics.
    query: gql`query AssetsById (
        $assetIds: [String!]!
    ){
        assets (
            where: {
                assetId: { _in: $assetIds }
            }) {
            assetId
        }
    }`,
    variables: {
      assetIds
    }
  })
  return result.data.assets
}

public async getAssetsWithoutFingerprint (limit?: number): Promise<Pick<Asset, 'assetId' | 'assetName' | 'policyId'>[]> {
const result = await this.client.query({
query: gql`query AssetsWithoutFingerprint (
Expand All @@ -442,12 +507,49 @@ export class HasuraClient {
}))
}

public async getAssetsWithoutMetadata (metadataFetchAttempts: IntComparisonExp): Promise<Asset[]> {
/**
 * Returns how many assets both match the metadataFetchAttempts comparison
 * expression and have no metadataHash recorded yet (i.e. metadata was never
 * successfully applied). Failures are logged and rethrown.
 */
public async assetsWithoutMetadataCount (metadataFetchAttempts: IntComparisonExp): Promise<number> {
  // Hoisted so the query document is built once, before the network call.
  const countQuery = gql`query AssetsWithoutMetadataCount (
      $metadataFetchAttempts: Int_comparison_exp!
  ) {
      assets_aggregate (
          where: {
              _and: [
                  { metadataFetchAttempts: $metadataFetchAttempts },
                  { metadataHash: { _is_null: true }}
              ]
          }) {
          aggregate {
              count
          }
      }
  }`
  try {
    const response = await this.client.query({
      query: countQuery,
      variables: { metadataFetchAttempts }
    })
    return response.data.assets_aggregate.aggregate.count
  } catch (error) {
    // Log then rethrow so callers can apply their own retry/abort policy.
    this.logger.error(error)
    throw error
  }
}

public async getAssetsWithoutMetadata (
metadataFetchAttempts: IntComparisonExp,
options?: { limit: number, offset: number }
): Promise<Asset[]> {
const result = await this.client.query({
query: gql`query IdsOfAssetsWithoutMetadata (
$limit: Int
$metadataFetchAttempts: Int_comparison_exp!
$offset: Int
){
assets (
limit: $limit
order_by: { assetId: asc }
offset: $offset
where: {
_and: [
{ metadataFetchAttempts: $metadataFetchAttempts },
Expand All @@ -459,7 +561,9 @@ export class HasuraClient {
}
}`,
variables: {
metadataFetchAttempts
metadataFetchAttempts,
limit: options?.limit,
offset: options?.offset
}
})
return result.data.assets
Expand Down Expand Up @@ -549,7 +653,6 @@ export class HasuraClient {
}

public incrementMetadataFetchAttempts (assetId: Asset['assetId']) {
this.logger.debug('incrementing asset metadata fetch attempt', { module: 'HasuraClient', value: assetId })
return this.client.mutate({
mutation: gql`mutation IncrementAssetMetadataFetchAttempt(
$assetId: String!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ describe('transactions', () => {
})
})


describe('transactions with tokens', () => {
it('shows the tokens minted and output', async () => {
const result = await client.query({
Expand Down
2 changes: 1 addition & 1 deletion packages/server/src/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ export class Server {
return
}
if (result.data.cardanoDbMeta.initialized) {
this.logger.info('CardanoDB is initialized')
this.logger.info('DB ready')
await clearIntervalAsync(this.syncProgress)
} else {
this.logger.info(`DB sync progress: ${result.data.cardanoDbMeta.syncPercentage} %`)
Expand Down

0 comments on commit 637d607

Please sign in to comment.