Skip to content

Commit

Permalink
refactor: Optimize input and output storage
Browse files Browse the repository at this point in the history
1. Storage others' cells tx and locks into tx-lock for search others' addresses.
2. Get transaction detail by rpc.
  • Loading branch information
yanguoyu committed Jun 19, 2023
1 parent 861c272 commit cf25d37
Show file tree
Hide file tree
Showing 14 changed files with 376 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import AddressMeta from '../../database/address/meta'
import IndexerTxHashCache from '../../database/chain/entities/indexer-tx-hash-cache'
import RpcService from '../../services/rpc-service'
import TransactionWithStatus from '../../models/chain/transaction-with-status'
import SyncInfoEntity from '../../database/chain/entities/sync-info'
import { TransactionCollector, CellCollector, CkbIndexer } from '@nervina-labs/ckb-indexer'

export default class IndexerCacheService {
private addressMetas: AddressMeta[]
private rpcService: RpcService
private walletId: string
private indexer: CkbIndexer
#cacheBlockNumberEntity?: SyncInfoEntity

constructor(walletId: string, addressMetas: AddressMeta[], rpcService: RpcService, indexer: CkbIndexer) {
for (const addressMeta of addressMetas) {
Expand All @@ -25,16 +27,6 @@ export default class IndexerCacheService {
this.indexer = indexer
}

private async countTxHashes(): Promise<number> {
return getConnection()
.getRepository(IndexerTxHashCache)
.createQueryBuilder()
.where({
walletId: this.walletId,
})
.getCount()
}

private async getTxHashes(): Promise<IndexerTxHashCache[]> {
return getConnection()
.getRepository(IndexerTxHashCache)
Expand Down Expand Up @@ -79,6 +71,8 @@ export default class IndexerCacheService {
}

private async fetchTxMapping(): Promise<Map<string, Array<{ address: string; lockHash: string }>>> {
const lastCacheBlockNumber = await this.getCachedBlockNumber()
const currentHeaderBlockNumber = await this.rpcService.getTipBlockNumber()
const mappingsByTxHash = new Map()
for (const addressMeta of this.addressMetas) {
const lockScripts = [
Expand All @@ -94,8 +88,10 @@ export default class IndexerCacheService {
lock: {
code_hash: lockScript.codeHash,
hash_type: lockScript.hashType,
args: lockScript.args,
args: lockScript.args
},
fromBlock: lastCacheBlockNumber.value,
toBlock: currentHeaderBlockNumber,
},
this.indexer.ckbRpcUrl,
{
Expand Down Expand Up @@ -137,6 +133,8 @@ export default class IndexerCacheService {
args: lockScript.args.slice(0, 42),
},
argsLen,
fromBlock: lastCacheBlockNumber.value,
toBlock: currentHeaderBlockNumber,
})

for await (const cell of cellCollector.collect()) {
Expand All @@ -153,17 +151,33 @@ export default class IndexerCacheService {
return mappingsByTxHash
}

private async getCachedBlockNumber() {
if (!this.#cacheBlockNumberEntity) {
this.#cacheBlockNumberEntity = (await getConnection().getRepository(SyncInfoEntity).findOne({ name: SyncInfoEntity.getLastCachedKey(this.walletId) })) ??
SyncInfoEntity.fromObject({
name: SyncInfoEntity.getLastCachedKey(this.walletId),
value: '0x0'
})
}

return this.#cacheBlockNumberEntity
}

private async saveCacheBlockNumber(cacheBlockNumber: string) {
let cacheBlockNumberEntity = await this.getCachedBlockNumber()
cacheBlockNumberEntity.value = cacheBlockNumber
await getConnection().manager.save(cacheBlockNumberEntity)
}

public async upsertTxHashes(): Promise<string[]> {
const tipBlockNumber = await this.rpcService.getTipBlockNumber()
const mappingsByTxHash = await this.fetchTxMapping()

const fetchedTxHashes = [...mappingsByTxHash.keys()]
const fetchedTxHashCount = fetchedTxHashes.reduce((sum, txHash) => sum + mappingsByTxHash.get(txHash)!.length, 0)

const txCount = await this.countTxHashes()
if (fetchedTxHashCount === txCount) {
if (!fetchedTxHashes.length) {
await this.saveCacheBlockNumber(tipBlockNumber)
return []
}

const txMetasCaches = await this.getTxHashes()
const cachedTxHashes = txMetasCaches.map(meta => meta.txHash.toString())

Expand All @@ -172,6 +186,7 @@ export default class IndexerCacheService {
const newTxHashes = fetchedTxHashes.filter(hash => !cachedTxHashesSet.has(hash))

if (!newTxHashes.length) {
await this.saveCacheBlockNumber(tipBlockNumber)
return []
}

Expand Down Expand Up @@ -219,6 +234,7 @@ export default class IndexerCacheService {
}
}

await this.saveCacheBlockNumber(tipBlockNumber)
return newTxHashes
}

Expand Down
8 changes: 7 additions & 1 deletion packages/neuron-wallet/src/block-sync-renderer/sync/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export default class Queue {
#rpcService: RpcService
#indexerConnector: Connector | undefined
#checkAndSaveQueue: QueueObject<{ txHashes: CKBComponents.Hash[], params: unknown }> | undefined
#lockArgsSet: Set<string> = new Set()

#multiSignBlake160s: string[]
#anyoneCanPayLockHashes: string[]
Expand All @@ -44,6 +45,11 @@ export default class Queue {
this.#lockHashes = AddressParser.batchToLockHash(this.#addresses.map(meta => meta.address))

const blake160s = this.#addresses.map(meta => meta.blake160)
this.#lockArgsSet = new Set(blake160s.map(blake160 => [
blake160,
Multisig.hash([blake160]),
SystemScriptInfo.generateSecpScript(blake160).computeHash().slice(0, 42)
]).flat())
this.#multiSignBlake160s = blake160s.map(blake160 => Multisig.hash([blake160]))
this.#anyoneCanPayLockHashes = blake160s.map(b =>
this.#assetAccountInfo.generateAnyoneCanPayScript(b).computeHash()
Expand Down Expand Up @@ -215,7 +221,7 @@ export default class Queue {
}
}
}
await TransactionPersistor.saveFetchTx(tx)
await TransactionPersistor.saveFetchTx(tx, this.#lockArgsSet)
for (const info of anyoneCanPayInfos) {
await AssetAccountService.checkAndSaveAssetAccountWhenSync(info.tokenID, info.blake160)
}
Expand Down
14 changes: 14 additions & 0 deletions packages/neuron-wallet/src/database/chain/entities/sync-info.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,18 @@ export default class SyncInfo {
type: 'varchar',
})
value!: string

static fromObject(params: {
name: string,
value: string
}) {
const res = new SyncInfo()
res.name = params.name
res.value = params.value
return res
}

static getLastCachedKey(walletId: string) {
return `lastCachedBlockNumber_${walletId}`
}
}
24 changes: 24 additions & 0 deletions packages/neuron-wallet/src/database/chain/entities/tx-lock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { BaseEntity, Entity, PrimaryColumn } from 'typeorm'

@Entity()
export default class TxLock extends BaseEntity {
@PrimaryColumn({
type: 'varchar'
})
transactionHash!: string

@PrimaryColumn({
type: 'varchar',
})
lockHash!: string

static fromObject(obj: {
txHash: string
lockHash: string
}) {
const res = new TxLock()
res.transactionHash = obj.txHash
res.lockHash = obj.lockHash
return res
}
}
3 changes: 2 additions & 1 deletion packages/neuron-wallet/src/database/chain/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ import SyncInfoEntity from './entities/sync-info'
import IndexerTxHashCache from './entities/indexer-tx-hash-cache'
import MultisigOutput from './entities/multisig-output'
import SyncProgress from './entities/sync-progress'
import TxLock from './entities/tx-lock'

/*
* Clean local sqlite storage
*/
export const clean = async (clearAllLightClientData?: boolean) => {
await Promise.all([
...[InputEntity, OutputEntity, TransactionEntity, IndexerTxHashCache, MultisigOutput].map(entity => {
...[InputEntity, OutputEntity, TransactionEntity, IndexerTxHashCache, MultisigOutput, TxLock].map(entity => {
return getConnection()
.getRepository(entity)
.clear()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import {MigrationInterface, QueryRunner, TableIndex} from "typeorm";

export class TxLock1684488676083 implements MigrationInterface {

public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`CREATE TABLE "tx_lock" ("lockHash" varchar NOT NULL, "transactionHash" varchar NOT NULL, PRIMARY KEY ("transactionHash", "lockHash"))`)
await queryRunner.createIndex("tx_lock", new TableIndex({ columnNames: ['lockHash'] }))
await queryRunner.createIndex("tx_lock", new TableIndex({ columnNames: ['transactionHash'] }))
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`DROP TABLE "tx_lock"`)
}

}
8 changes: 6 additions & 2 deletions packages/neuron-wallet/src/database/chain/ormconfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import AddressDescription from './entities/address-description'
import MultisigConfig from './entities/multisig-config'
import MultisigOuput from './entities/multisig-output'
import SyncProgress from './entities/sync-progress'
import TxLock from './entities/tx-lock'

import { InitMigration1566959757554 } from './migrations/1566959757554-InitMigration'
import { AddTypeAndHasData1567144517514 } from './migrations/1567144517514-AddTypeAndHasData'
Expand Down Expand Up @@ -51,6 +52,7 @@ import { UpdateOutputChequeLockHash1652945662504 } from './migrations/1652945662
import { RemoveAddressesMultisigConfig1651820157100 } from './migrations/1651820157100-RemoveAddressesMultisigConfig'
import { AddSyncProgress1676441837373 } from './migrations/1676441837373-AddSyncProgress'
import { AddTypeSyncProgress1681360188494 } from './migrations/1681360188494-AddTypeSyncProgress'
import { TxLock1684488676083 } from './migrations/1684488676083-TxLock'

export const CONNECTION_NOT_FOUND_NAME = 'ConnectionNotFoundError'

Expand Down Expand Up @@ -83,7 +85,8 @@ const connectOptions = async (genesisBlockHash: string): Promise<SqliteConnectio
AddressDescription,
MultisigConfig,
MultisigOuput,
SyncProgress
SyncProgress,
TxLock,
],
migrations: [
InitMigration1566959757554,
Expand Down Expand Up @@ -117,7 +120,8 @@ const connectOptions = async (genesisBlockHash: string): Promise<SqliteConnectio
UpdateOutputChequeLockHash1652945662504,
RemoveAddressesMultisigConfig1651820157100,
AddSyncProgress1676441837373,
AddTypeSyncProgress1681360188494
AddTypeSyncProgress1681360188494,
TxLock1684488676083,
],
logger: 'simple-console',
logging,
Expand Down
61 changes: 43 additions & 18 deletions packages/neuron-wallet/src/services/tx/transaction-persistor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import OutPoint from '../../models/chain/out-point'
import Output, { OutputStatus } from '../../models/chain/output'
import Transaction, { TransactionStatus } from '../../models/chain/transaction'
import Input from '../../models/chain/input'
import TxLockEntity from '../../database/chain/entities/tx-lock'
import { CHEQUE_ARGS_LENGTH, DEFAULT_ARGS_LENGTH } from '../../utils/const'

export enum TxSaveType {
Sent = 'sent',
Expand Down Expand Up @@ -42,7 +44,7 @@ export class TransactionPersistor {
// After the tx is fetched:
// 1. If the tx is not persisted before fetching, output = live, input = dead
// 2. If the tx is already persisted before fetching, output = live, input = dead
private static saveWithFetch = async (transaction: Transaction): Promise<TransactionEntity> => {
private static saveWithFetch = async (transaction: Transaction, lockArgsSet?: Set<string> ): Promise<TransactionEntity> => {
const connection = getConnection()
const txEntity: TransactionEntity | undefined = await connection
.getRepository(TransactionEntity)
Expand Down Expand Up @@ -190,14 +192,15 @@ export class TransactionPersistor {
return txEntity
}

return TransactionPersistor.create(transaction, OutputStatus.Live, OutputStatus.Dead)
return TransactionPersistor.create(transaction, OutputStatus.Live, OutputStatus.Dead, lockArgsSet)
}

// only create, check exist before this
public static create = async (
transaction: Transaction,
outputStatus: OutputStatus,
inputStatus: OutputStatus
inputStatus: OutputStatus,
lockArgsSet?: Set<string>,
): Promise<TransactionEntity> => {
const connection = getConnection()
const tx = new TransactionEntity()
Expand Down Expand Up @@ -297,22 +300,32 @@ export class TransactionPersistor {
}
return output
})

const sliceSize = 100
let currentWalletInputs: InputEntity[] = inputs
let currentWalletOutputs: OutputEntity[] = outputs
let txLocks: TxLockEntity[] = []
if (lockArgsSet?.size) {
currentWalletInputs = inputs.filter(v => this.isCurrentWalletCell(v, lockArgsSet))
currentWalletOutputs = outputs.filter(v => this.isCurrentWalletCell(v, lockArgsSet))
txLocks = [...new Set(
[
...inputs.filter(v => v.lockHash && !this.isCurrentWalletCell(v, lockArgsSet))
.map(v => v.lockHash!),
...outputs.filter(v => !this.isCurrentWalletCell(v, lockArgsSet))
.map(v => v.lockHash),
]
)].map(v => TxLockEntity.fromObject({ txHash: tx.hash, lockHash: v}))
}

const chunk = 100
const queryRunner = connection.createQueryRunner()
await TransactionPersistor.waitUntilTransactionFinished(queryRunner)
await queryRunner.startTransaction()
try {
await queryRunner.manager.save(tx)
for (const slice of ArrayUtils.eachSlice(inputs, sliceSize)) {
await queryRunner.manager.save(slice)
}
for (const slice of ArrayUtils.eachSlice(previousOutputs, sliceSize)) {
await queryRunner.manager.save(slice)
}
for (const slice of ArrayUtils.eachSlice(outputs, sliceSize)) {
await queryRunner.manager.save(slice)
}
await queryRunner.manager.save(currentWalletInputs, { chunk })
await queryRunner.manager.save(previousOutputs, { chunk })
await queryRunner.manager.save(currentWalletOutputs, { chunk })
await queryRunner.manager.save(txLocks, { chunk })
await queryRunner.commitTransaction()
} catch (err) {
logger.error('Database:\tcreate transaction error:', err)
Expand Down Expand Up @@ -342,25 +355,27 @@ export class TransactionPersistor {
// when fetch a transaction, use TxSaveType.Fetch
public static convertTransactionAndSave = async (
transaction: Transaction,
saveType: TxSaveType
saveType: TxSaveType,
lockArgsSet?: Set<string>,
): Promise<TransactionEntity> => {
const tx: Transaction = transaction

let txEntity: TransactionEntity
if (saveType === TxSaveType.Sent) {
txEntity = await TransactionPersistor.saveWithSent(tx)
} else if (saveType === TxSaveType.Fetch) {
txEntity = await TransactionPersistor.saveWithFetch(tx)
txEntity = await TransactionPersistor.saveWithFetch(tx, lockArgsSet)
} else {
throw new Error('Error TxSaveType!')
}
return txEntity
}

public static saveFetchTx = async (transaction: Transaction): Promise<TransactionEntity> => {
public static saveFetchTx = async (transaction: Transaction, lockArgsSet?: Set<string>): Promise<TransactionEntity> => {
const txEntity: TransactionEntity = await TransactionPersistor.convertTransactionAndSave(
transaction,
TxSaveType.Fetch
TxSaveType.Fetch,
lockArgsSet
)
return txEntity
}
Expand All @@ -373,6 +388,16 @@ export class TransactionPersistor {
const txEntity: TransactionEntity = await TransactionPersistor.convertTransactionAndSave(tx, TxSaveType.Sent)
return txEntity
}

private static isCurrentWalletCell(cell: InputEntity | OutputEntity, lockArgsSet: Set<string>) {
return (cell.lockArgs && (
lockArgsSet.has(cell.lockArgs)
|| (
cell.lockArgs.length === CHEQUE_ARGS_LENGTH
&& [cell.lockArgs.slice(0, DEFAULT_ARGS_LENGTH), `0x${cell.lockArgs.slice(DEFAULT_ARGS_LENGTH)}`].some(v => lockArgsSet.has(v))
)
))
}
}

export default TransactionPersistor
Loading

0 comments on commit cf25d37

Please sign in to comment.