Skip to content

Commit

Permalink
feat: Split full node and light client db file.
Browse files Browse the repository at this point in the history
  • Loading branch information
yanguoyu committed Jan 17, 2024
1 parent ae34b2b commit 976b132
Show file tree
Hide file tree
Showing 53 changed files with 350 additions and 97 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { In, getConnection } from 'typeorm'
import { In } from 'typeorm'
import { queue } from 'async'
import AddressMeta from '../../database/address/meta'
import IndexerTxHashCache from '../../database/chain/entities/indexer-tx-hash-cache'
import RpcService from '../../services/rpc-service'
import TransactionWithStatus from '../../models/chain/transaction-with-status'
import SyncInfoEntity from '../../database/chain/entities/sync-info'
import { getConnection } from '../../database/chain/connection'
import { TransactionCollector, CellCollector, Indexer as CkbIndexer } from '@ckb-lumos/ckb-indexer'

export default class IndexerCacheService {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ import { DepType } from '../../models/chain/cell-dep'
import { molecule } from '@ckb-lumos/codec'
import { blockchain } from '@ckb-lumos/base'
import type { Base } from '@ckb-lumos/rpc/lib/Base'
import { getConnection } from 'typeorm'
import { BI } from '@ckb-lumos/bi'
import IndexerCacheService from './indexer-cache-service'
import { ScriptType } from '@ckb-lumos/ckb-indexer/lib/type'
import { scriptToAddress } from '../../utils/scriptAndAddress'
import NetworksService from '../../services/networks'
import { getConnection } from '../../database/chain/connection'

const unpackGroup = molecule.vector(blockchain.OutPoint)

Expand Down Expand Up @@ -168,7 +168,7 @@ export default class LightSynchronizer extends Synchronizer {
updatedSyncProgress.push(currentSyncProgress)
}
})
await getConnection().manager.save(updatedSyncProgress, { chunk: 100 })
await getConnection('light').manager.save(updatedSyncProgress, { chunk: 100 })
}

private async initSyncProgress(appendScripts: AppendScript[] = []) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { getConnection } from 'typeorm'
import { scriptToAddress } from '../../utils/scriptAndAddress'
import OutputEntity from '../../database/chain/entities/output'
import NetworksService from '../../services/networks'
import Output from '../../models/chain/output'
import OutPoint from '../../models/chain/out-point'
import Transaction from '../../models/chain/transaction'
import SystemScriptInfo from '../../models/system-script-info'
import { getConnection } from '../../database/chain/connection'

export interface AnyoneCanPayInfo {
tokenID: string
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { getConnection } from 'typeorm'
import { CONNECTION_NOT_FOUND_NAME } from '../database/chain/ormconfig'
import { FailedTransaction, TransactionPersistor } from '../services/tx'
import RpcService from '../services/rpc-service'
import NetworksService from '../services/networks'
import Transaction, { TransactionStatus } from '../models/chain/transaction'
import TransactionWithStatus from '../models/chain/transaction-with-status'
import logger from '../utils/logger'
import { getConnection } from '../database/chain/connection'
import { interval } from 'rxjs'

type TransactionDetail = {
Expand Down
4 changes: 4 additions & 0 deletions packages/neuron-wallet/src/controllers/app/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import SyncApiController from '../../controllers/sync-api'
import { SETTINGS_WINDOW_TITLE } from '../../utils/const'
import { stopCkbNode } from '../../services/ckb-runner'
import { CKBLightRunner } from '../../services/light-runner'
import { migrateDBFile } from '../../database/chain/ormconfig'
import { MAINNET_GENESIS_HASH, TESTNET_GENESIS_HASH } from '../../models/network'

const app = electronApp

Expand All @@ -37,6 +39,8 @@ export default class AppController {
}

public start = async () => {
migrateDBFile(TESTNET_GENESIS_HASH)
migrateDBFile(MAINNET_GENESIS_HASH)
registerListeners()

if (!env.isTestMode) {
Expand Down
17 changes: 17 additions & 0 deletions packages/neuron-wallet/src/database/chain/connection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { getConnection as originGetConnection } from 'typeorm'
import { NetworkType } from '../../models/network'
import NetworksService from '../../services/networks'

export type ConnectionName = 'light' | 'full'

export function getCurrentConnectionName(): ConnectionName {
return NetworksService.getInstance().getCurrent()?.type === NetworkType.Light ? 'light' : 'full'
}

export function getConnection(connectionName: ConnectionName = getCurrentConnectionName()) {
const connection = originGetConnection(connectionName)
if (!connection) {
throw new Error(`The connection ${connectionName} should be initialized before use`)
}
return connection
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,20 @@ export default class MultisigConfig {
return multisigConfig
}

public cloneIgnoreBlockNumber(): MultisigConfig {
const multisigConfig = new MultisigConfig()

multisigConfig.walletId = this.walletId
multisigConfig.r = this.r
multisigConfig.m = this.m
multisigConfig.n = this.n
multisigConfig.blake160s = this.blake160s
if (this.alias) {
multisigConfig.alias = this.alias
}
return multisigConfig
}

@AfterInsert()
emitInsert() {
this.changed('AfterInsert')
Expand Down
2 changes: 1 addition & 1 deletion packages/neuron-wallet/src/database/chain/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { getConnection } from 'typeorm'
import MultisigOutputChangedSubject from '../../models/subjects/multisig-output-db-changed-subject'
import SyncProgressService from '../../services/sync-progress'
import InputEntity from './entities/input'
Expand All @@ -9,6 +8,7 @@ import IndexerTxHashCache from './entities/indexer-tx-hash-cache'
import MultisigOutput from './entities/multisig-output'
import SyncProgress from './entities/sync-progress'
import TxLock from './entities/tx-lock'
import { getConnection } from '../../database/chain/connection'

/*
* Clean local sqlite storage
Expand Down
56 changes: 47 additions & 9 deletions packages/neuron-wallet/src/database/chain/ormconfig.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { createConnection, getConnectionOptions, getConnection } from 'typeorm'
import { SqliteConnectionOptions } from 'typeorm/driver/sqlite/SqliteConnectionOptions'
import path from 'path'
import fs from 'fs'

import logger from '../../utils/logger'
import env from '../../env'
Expand Down Expand Up @@ -60,23 +61,33 @@ import { IndexerTxHashCacheRemoveField1701234043431 } from './migrations/1701234
import { CreateCellLocalInfo1701234043432 } from './migrations/1701234043432-CreateCellLocalInfo'
import { RenameSyncProgress1702781527414 } from './migrations/1702781527414-RenameSyncProgress'
import { RemoveAddressInIndexerCache1704357651876 } from './migrations/1704357651876-RemoveAddressInIndexerCache'
import { ConnectionName } from './connection'
import AddressSubscribe from './subscriber/address-subscriber'
import MultisigConfigSubscribe from './subscriber/multisig-config-subscriber'
import TxDescriptionSubscribe from './subscriber/tx-description-subscriber'
import SudtTokenInfoSubscribe from './subscriber/sudt-token-info-subscriber'
import AssetAccountSubscribe from './subscriber/asset-account-subscriber'

export const CONNECTION_NOT_FOUND_NAME = 'ConnectionNotFoundError'

const dbPath = (name: string): string => {
const filename = `cell-${name}.sqlite`
const dbPath = (name: string, connectionName: string): string => {
const filename = `${connectionName}-${name}.sqlite`
return path.join(env.fileBasePath, 'cells', filename)
}

const connectOptions = async (genesisBlockHash: string): Promise<SqliteConnectionOptions> => {
const connectOptions = async (
genesisBlockHash: string,
connectionName: ConnectionName
): Promise<SqliteConnectionOptions> => {
const connectionOptions = await getConnectionOptions()
const database = env.isTestMode ? ':memory:' : dbPath(genesisBlockHash)
const database = env.isTestMode ? ':memory:' : dbPath(genesisBlockHash, connectionName)

const logging: boolean | ('query' | 'schema' | 'error' | 'warn' | 'info' | 'log' | 'migration')[] = ['warn', 'error']
// (env.isDevMode) ? ['warn', 'error', 'log', 'info', 'schema', 'migration'] : ['warn', 'error']

return {
...connectionOptions,
name: connectionName,
type: 'sqlite',
database,
entities: [
Expand Down Expand Up @@ -137,28 +148,55 @@ const connectOptions = async (genesisBlockHash: string): Promise<SqliteConnectio
RenameSyncProgress1702781527414,
RemoveAddressInIndexerCache1704357651876,
],
subscribers: [
AddressSubscribe,
AssetAccountSubscribe,
MultisigConfigSubscribe,
SudtTokenInfoSubscribe,
TxDescriptionSubscribe,
],
logger: 'simple-console',
logging,
maxQueryExecutionTime: 30,
}
}

export const initConnection = async (genesisBlockHash: string) => {
const initConnectionWithType = async (genesisBlockHash: string, connectionName: ConnectionName) => {
// try to close connection, if not exist, will throw ConnectionNotFoundError when call getConnection()
try {
await getConnection().close()
await getConnection(connectionName).close()
} catch (err) {
// do nothing
}
const connectionOptions = await connectOptions(genesisBlockHash)
const connectionOptions = await connectOptions(genesisBlockHash, connectionName)

try {
await createConnection(connectionOptions)
await getConnection().manager.query(`PRAGMA busy_timeout = 3000;`)
await getConnection().manager.query(`PRAGMA temp_store = MEMORY;`)
await getConnection(connectionName).manager.query(`PRAGMA busy_timeout = 3000;`)
await getConnection(connectionName).manager.query(`PRAGMA temp_store = MEMORY;`)
} catch (err) {
logger.error(err.message)
}
}

export async function initConnection(genesisBlockHash: string) {
await initConnectionWithType(genesisBlockHash, 'full')
await initConnectionWithType(genesisBlockHash, 'light')
}

export function migrateDBFile(genesisBlockHash: string) {
const originDBFile = dbPath(genesisBlockHash, 'cell')
const currentFullDBFile = dbPath(genesisBlockHash, 'full')
const currentLightDBFile = dbPath(genesisBlockHash, 'light')
if (fs.existsSync(originDBFile) && (!fs.existsSync(currentLightDBFile) || !fs.existsSync(currentFullDBFile))) {
if (!fs.existsSync(currentFullDBFile)) {
fs.copyFileSync(originDBFile, currentFullDBFile)
}
if (!fs.existsSync(currentLightDBFile)) {
fs.copyFileSync(originDBFile, currentLightDBFile)
}
fs.rmSync(originDBFile)
}
}

export default initConnection
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { EventSubscriber } from 'typeorm'
import UserSettingSubscriber from './user-setting-subscriber'
import AddressDescription from '../entities/address-description'

@EventSubscriber()
export default class AddressSubscribe extends UserSettingSubscriber<AddressDescription> {
listenTo() {
return AddressDescription
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { EventSubscriber, InsertEvent } from 'typeorm'
import UserSettingSubscriber from './user-setting-subscriber'
import AssetAccount from '../entities/asset-account'

@EventSubscriber()
export default class AssetAccountSubscribe extends UserSettingSubscriber<AssetAccount> {
unionKeys: string[] = ['tokenID', 'blake160']

ignoreUpdateKeys: string[] = ['sudtTokenInfo']

listenTo() {
return AssetAccount
}

async afterInsert(event: InsertEvent<AssetAccount>): Promise<AssetAccount | void> {
const repo = this.getNeedSyncConnection(event.connection.name)?.getRepository(AssetAccount)
if (repo && event.entity) {
const exist = await repo.findOne({ tokenID: event.entity.tokenID, blake160: event.entity.blake160 })
if (exist) {
await repo.upsert(AssetAccount.fromModel(event.entity.toModel()), this.unionKeys)
} else {
await repo.save(event.entity)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { EventSubscriber, InsertEvent } from 'typeorm'
import UserSettingSubscriber from './user-setting-subscriber'
import MultisigConfig from '../entities/multisig-config'

@EventSubscriber()
export default class MultisigConfigSubscribe extends UserSettingSubscriber<MultisigConfig> {
ignoreUpdateKeys = ['lastestBlockNumber']

listenTo() {
return MultisigConfig
}

async afterInsert(event: InsertEvent<MultisigConfig>): Promise<MultisigConfig | void> {
const repo = this.getNeedSyncConnection(event.connection.name)?.getRepository(MultisigConfig)
if (repo && event.entity) {
await repo.upsert(event.entity.cloneIgnoreBlockNumber(), this.unionKeys)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { EventSubscriber, InsertEvent } from 'typeorm'
import UserSettingSubscriber from './user-setting-subscriber'
import SudtTokenInfo from '../entities/sudt-token-info'

@EventSubscriber()
export default class SudtTokenInfoSubscribe extends UserSettingSubscriber<SudtTokenInfo> {
unionKeys: string[] = ['tokenID']

entityKeyName: string = 'tokenID'

ignoreUpdateKeys: string[] = ['assetAccounts']

listenTo() {
return SudtTokenInfo
}

async afterInsert(event: InsertEvent<SudtTokenInfo>): Promise<SudtTokenInfo | void> {
const repo = this.getNeedSyncConnection(event.connection.name)?.getRepository(SudtTokenInfo)
if (repo && event.entity) {
let mergeEntity: SudtTokenInfo | undefined = undefined
const existEntity = await event.connection.getRepository(SudtTokenInfo).findOne(event.entity.tokenID)
if (existEntity) {
mergeEntity = new SudtTokenInfo()
mergeEntity.tokenID = event.entity.tokenID || existEntity.tokenID
mergeEntity.symbol = event.entity.symbol || existEntity.symbol
mergeEntity.tokenName = event.entity.tokenName || existEntity.tokenName
mergeEntity.decimal = event.entity.decimal || existEntity.decimal
}
await repo.upsert(mergeEntity ?? event.entity, this.unionKeys)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { EventSubscriber } from 'typeorm'
import UserSettingSubscriber from './user-setting-subscriber'
import TxDescription from '../entities/tx-description'

@EventSubscriber()
export default class TxDescriptionSubscribe extends UserSettingSubscriber<TxDescription> {
listenTo() {
return TxDescription
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { EntitySubscriberInterface, InsertEvent, RemoveEvent, UpdateEvent } from 'typeorm'
import { ConnectionName, getConnection, getCurrentConnectionName } from '../connection'

type Constructor<T> = new (...args: unknown[]) => T

// Trigger relative updating through subscribing the changes from corresponding entities
export default abstract class UserSettingSubscriber<Entity extends object>
implements EntitySubscriberInterface<Entity>
{
abstract listenTo(): string | Constructor<Entity>

unionKeys: string[] = ['id']

entityKeyName: string = 'id'

ignoreUpdateKeys: string[] = []

getNeedSyncConnection(connectionName: string) {
const currentConnectionName = getCurrentConnectionName()
if (connectionName === currentConnectionName) {
const otherConnectionName: ConnectionName = currentConnectionName === 'full' ? 'light' : 'full'
return getConnection(otherConnectionName)
}
return
}

async afterInsert(event: InsertEvent<Entity>): Promise<Entity | void> {
const repo = this.getNeedSyncConnection(event.connection.name)?.getRepository<Entity>(this.listenTo())
if (repo && event.entity) {
await repo.upsert(event.entity, this.unionKeys)
}
}

async afterUpdate(event: UpdateEvent<Entity>): Promise<Entity | void> {
const repo = this.getNeedSyncConnection(event.connection.name)?.getRepository<Entity>(this.listenTo())
const updatedColumns = event.updatedColumns.filter(v => !this.ignoreUpdateKeys.includes(v.propertyName))
if (repo && event.entity && event.databaseEntity && updatedColumns.length) {
const updateEntity = updatedColumns.reduce(
(pre, cur) => ({
...pre,
[cur.propertyName]: event.entity![cur.propertyName],
}),
{}
)
const key = (event.databaseEntity as any)[this.entityKeyName]
if (key !== undefined && key !== null) {
await repo.update(key, updateEntity)
}
}
}

async afterRemove(event: RemoveEvent<Entity>): Promise<Entity | void> {
const repo = this.getNeedSyncConnection(event.connection.name)?.getRepository<Entity>(this.listenTo())
if (repo && event.databaseEntity) {
await repo.remove(event.databaseEntity)
}
}
}
Loading

0 comments on commit 976b132

Please sign in to comment.