diff --git a/analyzer/consensus/consensus.go b/analyzer/consensus/consensus.go index 5a7145dd4..9141de17d 100644 --- a/analyzer/consensus/consensus.go +++ b/analyzer/consensus/consensus.go @@ -253,7 +253,7 @@ func (m *processor) debugDumpGenesisJSON(genesisDoc *genesis.Document, heightOrN func (m *processor) processGenesis(ctx context.Context, genesisDoc *genesis.Document, nodesOverride []nodeapi.Node) error { m.logger.Info("processing genesis document") gen := NewGenesisProcessor(m.logger.With("height", "genesis")) - queries, err := gen.Process(genesisDoc, nodesOverride) + batch, err := gen.Process(genesisDoc, nodesOverride) if err != nil { return err } @@ -261,18 +261,17 @@ func (m *processor) processGenesis(ctx context.Context, genesisDoc *genesis.Docu // Debug: log the SQL into a file if requested. debugPath := os.Getenv("NEXUS_DUMP_GENESIS_SQL") if debugPath != "" { - sql := strings.Join(queries, "\n") - if err := os.WriteFile(debugPath, []byte(sql), 0o600 /* Permissions: rw------- */); err != nil { - gen.logger.Error("failed to write genesis sql to file", "err", err) + queries, err := json.Marshal(batch.Queries()) + if err != nil { + return err + } + if err := os.WriteFile(debugPath, queries, 0o600 /* Permissions: rw------- */); err != nil { + gen.logger.Error("failed to write genesis queries to file", "err", err) } else { - gen.logger.Info("wrote genesis sql to file", "path", debugPath) + gen.logger.Info("wrote genesis queries to file", "path", debugPath) } } - batch := &storage.QueryBatch{} - for _, query := range queries { - batch.Queue(query) - } if err := m.target.SendBatch(ctx, batch); err != nil { return err } @@ -1095,6 +1094,7 @@ func (m *processor) queueSubmissions(batch *storage.QueryBatch, data *governance submission.Content.Upgrade.Epoch, submission.CreatedAt, submission.ClosesAt, + 0, ) case submission.Content.CancelUpgrade != nil: batch.Queue(queries.ConsensusProposalSubmissionCancelInsert, @@ -1105,6 +1105,7 @@ func (m *processor) queueSubmissions(batch *storage.QueryBatch, data *governance submission.Content.CancelUpgrade.ProposalID, submission.CreatedAt, submission.ClosesAt, + 0, ) case submission.Content.ChangeParameters != nil: batch.Queue(queries.ConsensusProposalSubmissionChangeParametersInsert, @@ -1116,6 +1117,7 @@ func (m *processor) queueSubmissions(batch *storage.QueryBatch, data *governance []byte(submission.Content.ChangeParameters.Changes), submission.CreatedAt, submission.ClosesAt, + 0, ) default: m.logger.Warn("unknown proposal content type", "proposal_id", submission.ID, "content", submission.Content) diff --git a/analyzer/consensus/genesis.go b/analyzer/consensus/genesis.go index 8bc2a573a..082cb1202 100644 --- a/analyzer/consensus/genesis.go +++ b/analyzer/consensus/genesis.go @@ -5,24 +5,23 @@ package consensus import ( "encoding/hex" "encoding/json" - "fmt" "sort" - "strings" "github.com/oasisprotocol/oasis-core/go/common/cbor" "github.com/oasisprotocol/oasis-core/go/common/entity" + "github.com/oasisprotocol/nexus/analyzer/queries" + "github.com/oasisprotocol/nexus/common" beacon "github.com/oasisprotocol/nexus/coreapi/v22.2.11/beacon/api" genesis "github.com/oasisprotocol/nexus/coreapi/v22.2.11/genesis/api" registry "github.com/oasisprotocol/nexus/coreapi/v22.2.11/registry/api" staking "github.com/oasisprotocol/nexus/coreapi/v22.2.11/staking/api" + "github.com/oasisprotocol/nexus/storage" "github.com/oasisprotocol/nexus/log" "github.com/oasisprotocol/nexus/storage/oasis/nodeapi" ) -const bulkInsertBatchSize = 1000 - // GenesisProcessor generates sql statements for indexing the genesis state. type GenesisProcessor struct { logger *log.Logger @@ -35,124 +34,62 @@ func NewGenesisProcessor(logger *log.Logger) *GenesisProcessor { // Process generates SQL statements for indexing the genesis state. // `nodesOverride` can be nil; if non-nil, the behavior is as if the genesis document contained that set of nodes instead of whatever it contains. -func (mg *GenesisProcessor) Process(document *genesis.Document, nodesOverride []nodeapi.Node) ([]string, error) { - var queries []string +func (mg *GenesisProcessor) Process(document *genesis.Document, nodesOverride []nodeapi.Node) (*storage.QueryBatch, error) { + batch := &storage.QueryBatch{} - qs, err := mg.addRegistryBackendMigrations(document, nodesOverride) - if err != nil { + if err := mg.addRegistryBackendMigrations(batch, document, nodesOverride); err != nil { return nil, err } - queries = append(queries, qs...) - qs, err = mg.addStakingBackendMigrations(document) - if err != nil { + if err := mg.addStakingBackendMigrations(batch, document); err != nil { return nil, err } - queries = append(queries, qs...) - qs = mg.addGovernanceBackendMigrations(document) - queries = append(queries, qs...) + mg.addGovernanceBackendMigrations(batch, document) - mg.logger.Info("generated genesis queries", "count", len(queries)) + mg.logger.Info("generated genesis queries", "count", batch.Len()) - return queries, nil + return batch, nil } -func (mg *GenesisProcessor) addRegistryBackendMigrations(document *genesis.Document, nodesOverride []nodeapi.Node) (queries []string, err error) { +func (mg *GenesisProcessor) addRegistryBackendMigrations(batch *storage.QueryBatch, document *genesis.Document, nodesOverride []nodeapi.Node) error { // Populate entities. - query := `INSERT INTO chain.entities (id, address) -VALUES -` - for i, signedEntity := range document.Registry.Entities { + for _, signedEntity := range document.Registry.Entities { var entity entity.Entity if err := signedEntity.Open(registry.RegisterEntitySignatureContext, &entity); err != nil { - return nil, err + return err } - query += fmt.Sprintf( - "\t('%s', '%s')", + batch.Queue(queries.ConsensusEntityUpsert, entity.ID.String(), staking.NewAddress(entity.ID).String(), ) - if i != len(document.Registry.Entities)-1 { - query += ",\n" - } } - query += ` -ON CONFLICT (id) DO UPDATE SET address = EXCLUDED.address;` - queries = append(queries, query) // Populate runtimes. - if len(document.Registry.Runtimes) > 0 { - query = `INSERT INTO chain.runtimes (id, suspended, kind, tee_hardware, key_manager) -VALUES -` - for i, runtime := range document.Registry.Runtimes { - keyManager := "NULL" - if runtime.KeyManager != nil { - keyManager = fmt.Sprintf("'%s'", runtime.KeyManager.String()) - } - query += fmt.Sprintf( - "\t('%s', %t, '%s', '%s', %s)", - runtime.ID.String(), - false, - runtime.Kind.String(), - runtime.TEEHardware.String(), - keyManager, - ) - - if i != len(document.Registry.Runtimes)-1 { - query += ",\n" - } - } - query += ` -ON CONFLICT (id) DO UPDATE SET - suspended = EXCLUDED.suspended, - kind = EXCLUDED.kind, - tee_hardware = EXCLUDED.tee_hardware, - key_manager = EXCLUDED.key_manager;` - queries = append(queries, query) + for _, runtime := range document.Registry.Runtimes { + batch.Queue(queries.ConsensusRuntimeUpsert, + runtime.ID.String(), + false, + runtime.Kind.String(), + runtime.TEEHardware.String(), + common.StringOrNil(runtime.KeyManager), + ) } - if len(document.Registry.SuspendedRuntimes) > 0 { - query = `INSERT INTO chain.runtimes (id, suspended, kind, tee_hardware, key_manager) -VALUES -` - - for i, runtime := range document.Registry.SuspendedRuntimes { - keyManager := "NULL" - if runtime.KeyManager != nil { - keyManager = fmt.Sprintf("'%s'", runtime.KeyManager.String()) - } - query += fmt.Sprintf( - "\t('%s', %t, '%s', '%s', %s)", - runtime.ID.String(), - true, - runtime.Kind.String(), - runtime.TEEHardware.String(), - keyManager, - ) - - if i != len(document.Registry.SuspendedRuntimes)-1 { - query += ",\n" - } - } - query += ` -ON CONFLICT (id) DO UPDATE SET - suspended = EXCLUDED.suspended, - kind = EXCLUDED.kind, - tee_hardware = EXCLUDED.tee_hardware, - key_manager = EXCLUDED.key_manager;` - queries = append(queries, query) + for _, runtime := range document.Registry.SuspendedRuntimes { + batch.Queue(queries.ConsensusRuntimeUpsert, + runtime.ID.String(), + true, + runtime.Kind.String(), + runtime.TEEHardware.String(), + common.StringOrNil(runtime.KeyManager), + ) } // Populate nodes. - queries = append(queries, `DELETE FROM chain.nodes;`) - queries = append(queries, `DELETE FROM chain.runtime_nodes;`) - query = `INSERT INTO chain.nodes (id, entity_id, expiration, tls_pubkey, tls_next_pubkey, p2p_pubkey, consensus_pubkey, roles) -VALUES -` - queryRt := "" // Query for populating the chain.runtime_nodes table. + batch.Queue(`DELETE FROM chain.nodes`) + batch.Queue(`DELETE FROM chain.runtime_nodes`) var nodes []nodeapi.Node // What we'll work with; either `overrideNodes` or the nodes from the genesis document. if nodesOverride != nil { @@ -163,7 +100,7 @@ VALUES if err := cbor.Unmarshal(signedNode.Blob, &node); err != nil { // ^ We do not verify the signatures on the Blob; we trust the node that provided the genesis document. // Also, nexus performs internal lossy data conversions where signatures are lost. - return nil, err + return err } if beacon.EpochTime(node.Expiration) < document.Beacon.Base { // Node expired before the genesis epoch, skip. @@ -173,58 +110,39 @@ VALUES } } - for i, node := range nodes { - query += fmt.Sprintf( - "\t('%s', '%s', %d, '%s', '%s', '%s', '%s', '%s')", + for _, node := range nodes { + batch.Queue(queries.ConsensusNodeUpsert, node.ID.String(), node.EntityID.String(), node.Expiration, node.TLS.PubKey.String(), node.TLS.NextPubKey.String(), + nil, node.P2P.ID.String(), + nil, node.Consensus.ID.String(), + nil, + nil, node.Roles.String(), + nil, + nil, ) - if i != len(nodes)-1 { - query += ",\n" - } for _, runtime := range node.Runtimes { - if queryRt != "" { - // There's already a values tuple in the query. - queryRt += ",\n" - } - queryRt += fmt.Sprintf( - "\t('%s', '%s')", + batch.Queue(queries.ConsensusRuntimeNodesUpsert, runtime.ID.String(), node.ID.String(), ) } } - query += ";" - queries = append(queries, query) - - // There might be no runtime_nodes to insert; create a query only if there are. - if queryRt != "" { - queryRt = `INSERT INTO chain.runtime_nodes(runtime_id, node_id) -VALUES -` + queryRt + ";" - queries = append(queries, queryRt) - } - return queries, nil + return nil } -//nolint:gocyclo -func (mg *GenesisProcessor) addStakingBackendMigrations(document *genesis.Document) (queries []string, err error) { +func (mg *GenesisProcessor) addStakingBackendMigrations(batch *storage.QueryBatch, document *genesis.Document) error { // Populate accounts. // Populate special accounts with reserved addresses. - query := `-- Reserved addresses -INSERT INTO chain.accounts (address, general_balance, nonce, escrow_balance_active, escrow_total_shares_active, escrow_balance_debonding, escrow_total_shares_debonding) -VALUES -` - reservedAccounts := make(map[staking.Address]*staking.Account) commonPoolAccount := staking.Account{ @@ -247,10 +165,9 @@ VALUES reservedAccounts[staking.FeeAccumulatorAddress] = &feeAccumulatorAccount reservedAccounts[staking.GovernanceDepositsAddress] = &governanceDepositsAccount - for i, address := range sortedAddressKeys(reservedAccounts) { + for _, address := range sortedAddressKeys(reservedAccounts) { account := reservedAccounts[address] - query += fmt.Sprintf( - "\t('%s', %d, %d, %d, %d, %d, %d)", + batch.Queue(queries.ConsensusAccountUpsert, address.String(), account.General.Balance.ToBigInt(), account.General.Nonce, @@ -259,29 +176,11 @@ VALUES account.Escrow.Debonding.Balance.ToBigInt(), account.Escrow.Debonding.TotalShares.ToBigInt(), ) - - if i != len(reservedAccounts)-1 { - query += ",\n" - } } - query += ` -ON CONFLICT (address) DO UPDATE SET - general_balance = EXCLUDED.general_balance, - nonce = EXCLUDED.nonce, - escrow_balance_active = EXCLUDED.escrow_balance_active, - escrow_total_shares_active = EXCLUDED.escrow_total_shares_active, - escrow_balance_debonding = EXCLUDED.escrow_balance_debonding, - escrow_total_shares_debonding = EXCLUDED.escrow_total_shares_debonding;` - queries = append(queries, query) - - query = `INSERT INTO chain.accounts (address, general_balance, nonce, escrow_balance_active, escrow_total_shares_active, escrow_balance_debonding, escrow_total_shares_debonding) -VALUES -` - - for i, address := range sortedAddressKeys(document.Staking.Ledger) { + + for _, address := range sortedAddressKeys(document.Staking.Ledger) { account := document.Staking.Ledger[address] - query += fmt.Sprintf( - "\t('%s', %d, %d, %d, %d, %d, %d)", + batch.Queue(queries.ConsensusAccountUpsert, address.String(), account.General.Balance.ToBigInt(), account.General.Nonce, @@ -290,299 +189,134 @@ VALUES account.Escrow.Debonding.Balance.ToBigInt(), account.Escrow.Debonding.TotalShares.ToBigInt(), ) - - if (i+1)%bulkInsertBatchSize == 0 { - query += ` -ON CONFLICT (address) DO UPDATE SET - general_balance = EXCLUDED.general_balance, - nonce = EXCLUDED.nonce, - escrow_balance_active = EXCLUDED.escrow_balance_active, - escrow_total_shares_active = EXCLUDED.escrow_total_shares_active, - escrow_balance_debonding = EXCLUDED.escrow_balance_debonding, - escrow_total_shares_debonding = EXCLUDED.escrow_total_shares_debonding; -` - queries = append(queries, query) - query = `INSERT INTO chain.accounts (address, general_balance, nonce, escrow_balance_active, escrow_total_shares_active, escrow_balance_debonding, escrow_total_shares_debonding) -VALUES -` - } else if i != len(document.Staking.Ledger)-1 { - query += ",\n" - } - } - query += ` -ON CONFLICT (address) DO UPDATE SET - general_balance = EXCLUDED.general_balance, - nonce = EXCLUDED.nonce, - escrow_balance_active = EXCLUDED.escrow_balance_active, - escrow_total_shares_active = EXCLUDED.escrow_total_shares_active, - escrow_balance_debonding = EXCLUDED.escrow_balance_debonding, - escrow_total_shares_debonding = EXCLUDED.escrow_total_shares_debonding;` - if len(document.Staking.Ledger) > 0 { - queries = append(queries, query) } // Populate commissions. - // This likely won't overflow batch limit. - query = `INSERT INTO chain.commissions (address, schedule) VALUES -` - - commissions := make([]string, 0) - for _, address := range sortedAddressKeys(document.Staking.Ledger) { account := document.Staking.Ledger[address] if len(account.Escrow.CommissionSchedule.Rates) > 0 || len(account.Escrow.CommissionSchedule.Bounds) > 0 { schedule, err := json.Marshal(account.Escrow.CommissionSchedule) if err != nil { - return nil, err + return err } - commissions = append(commissions, fmt.Sprintf( - "\t('%s', '%s')", + batch.Queue(queries.ConsensusCommissionsUpsert, address.String(), string(schedule), - )) - } - } - - for index, commission := range commissions { - query += commission - - if index != len(commissions)-1 { - query += ",\n" - } else { - query += ` -ON CONFLICT (address) DO UPDATE SET - schedule = EXCLUDED.schedule;` + ) } } - if len(commissions) > 0 { - queries = append(queries, query) - } // Populate allowances. - queries = append(queries, `DELETE FROM chain.allowances;`) - foundAllowances := false // in case allowances are empty + batch.Queue(`DELETE FROM chain.allowances`) - query = "" for _, owner := range sortedAddressKeys(document.Staking.Ledger) { account := document.Staking.Ledger[owner] - if len(account.General.Allowances) > 0 && foundAllowances { - query += ",\n" - } - ownerAllowances := make([]string, len(account.General.Allowances)) - for j, beneficiary := range sortedAddressKeys(account.General.Allowances) { + for _, beneficiary := range sortedAddressKeys(account.General.Allowances) { allowance := account.General.Allowances[beneficiary] - ownerAllowances[j] = fmt.Sprintf( - "\t('%s', '%s', %d)", + batch.Queue(queries.ConsensusAllowanceChangeUpdate, owner.String(), beneficiary.String(), allowance.ToBigInt(), ) } - if len(account.General.Allowances) > 0 && !foundAllowances { - query += `INSERT INTO chain.allowances (owner, beneficiary, allowance) -VALUES -` - foundAllowances = true - } - - query += strings.Join(ownerAllowances, ",\n") - } - if foundAllowances { - query += ";" - queries = append(queries, query) } // Populate delegations. - query = `INSERT INTO chain.delegations (delegatee, delegator, shares) -VALUES -` - i := 0 - for j, delegatee := range sortedAddressKeys(document.Staking.Delegations) { + + for _, delegatee := range sortedAddressKeys(document.Staking.Delegations) { escrows := document.Staking.Delegations[delegatee] - for k, delegator := range sortedAddressKeys(escrows) { + for _, delegator := range sortedAddressKeys(escrows) { delegation := escrows[delegator] - query += fmt.Sprintf( - "\t('%s', '%s', %d)", + batch.Queue(queries.ConsensusAddDelegationsUpsert, delegatee.String(), delegator.String(), delegation.Shares.ToBigInt(), ) - i++ - - if i%bulkInsertBatchSize == 0 { - query += ` -ON CONFLICT (delegatee, delegator) DO UPDATE SET - shares = EXCLUDED.shares;` - queries = append(queries, query) - query = `INSERT INTO chain.delegations (delegatee, delegator, shares) -VALUES -` - } else if !(k == len(escrows)-1 && j == len(document.Staking.Delegations)-1) { - query += ",\n" - } } } - query += ` -ON CONFLICT (delegatee, delegator) DO UPDATE SET - shares = EXCLUDED.shares;` - queries = append(queries, query) // Populate debonding delegations. - queries = append(queries, `DELETE FROM chain.debonding_delegations;`) - query = `INSERT INTO chain.debonding_delegations (delegatee, delegator, shares, debond_end) -VALUES -` - for i, delegatee := range sortedAddressKeys(document.Staking.DebondingDelegations) { + batch.Queue(`DELETE FROM chain.debonding_delegations`) + for _, delegatee := range sortedAddressKeys(document.Staking.DebondingDelegations) { escrows := document.Staking.DebondingDelegations[delegatee] - delegateeDebondingDelegations := make([]string, 0) for _, delegator := range sortedAddressKeys(escrows) { debondingDelegations := escrows[delegator] - delegatorDebondingDelegations := make([]string, len(debondingDelegations)) - for k, debondingDelegation := range debondingDelegations { - delegatorDebondingDelegations[k] = fmt.Sprintf( - "\t('%s', '%s', %d, %d)", + for _, debondingDelegation := range debondingDelegations { + batch.Queue(queries.ConsensusDebondingStartDebondingDelegationsInsert, delegatee.String(), delegator.String(), debondingDelegation.Shares.ToBigInt(), debondingDelegation.DebondEndTime, ) } - delegateeDebondingDelegations = append(delegateeDebondingDelegations, delegatorDebondingDelegations...) - } - query += strings.Join(delegateeDebondingDelegations, ",\n") - - if i != len(document.Staking.DebondingDelegations)-1 && len(escrows) > 0 { - query += ",\n" } } - query += ";\n" - if len(document.Staking.DebondingDelegations) > 0 { - queries = append(queries, query) - } - return queries, nil + return nil } -func (mg *GenesisProcessor) addGovernanceBackendMigrations(document *genesis.Document) (queries []string) { +func (mg *GenesisProcessor) addGovernanceBackendMigrations(batch *storage.QueryBatch, document *genesis.Document) { // Populate proposals. - if len(document.Governance.Proposals) > 0 { - // TODO: Extract `executed` for proposal. - query := `INSERT INTO chain.proposals (id, submitter, state, deposit, handler, cp_target_version, rhp_target_version, rcp_target_version, upgrade_epoch, cancels, parameters_change_module, parameters_change, created_at, closes_at, invalid_votes) -VALUES -` - - for i, proposal := range document.Governance.Proposals { - switch { - case proposal.Content.Upgrade != nil: - query += fmt.Sprintf( - "\t(%d, '%s', '%s', %d, '%s', '%s', '%s', '%s', %d, NULL, NULL, NULL, %d, %d, %d)", - proposal.ID, - proposal.Submitter.String(), - proposal.State.String(), - proposal.Deposit.ToBigInt(), - proposal.Content.Upgrade.Handler, - proposal.Content.Upgrade.Target.ConsensusProtocol.String(), - proposal.Content.Upgrade.Target.RuntimeHostProtocol.String(), - proposal.Content.Upgrade.Target.RuntimeCommitteeProtocol.String(), - proposal.Content.Upgrade.Epoch, - // 1 hardcoded NULL for the proposal.Content.CancelUpgrade.ProposalID field. - // 2 hardcoded NULLs for the proposal.Content.ChangeParameters fields. - proposal.CreatedAt, - proposal.ClosesAt, - proposal.InvalidVotes, - ) - case proposal.Content.CancelUpgrade != nil: - query += fmt.Sprintf( - "\t(%d, '%s', '%s', %d, NULL, NULL, NULL, NULL, NULL, %d, NULL, NULL, %d, %d, %d)", - proposal.ID, - proposal.Submitter.String(), - proposal.State.String(), - proposal.Deposit.ToBigInt(), - // 5 hardcoded NULLs for the proposal.Content.Upgrade fields. - proposal.Content.CancelUpgrade.ProposalID, - // 2 hardcoded NULLs for the proposal.Content.ChangeParameters fields. - proposal.CreatedAt, - proposal.ClosesAt, - proposal.InvalidVotes, - ) - case proposal.Content.ChangeParameters != nil: - query += fmt.Sprintf( - "\t(%d, '%s', '%s', %d, NULL, NULL, NULL, NULL, NULL, NULL, '%s', decode('%s', 'hex'), %d, %d, %d)", - proposal.ID, - proposal.Submitter.String(), - proposal.State.String(), - proposal.Deposit.ToBigInt(), - // 5 hardcoded NULLs for the proposal.Content.Upgrade fields. - // 1 hardocded NULL for the proposal.Content.CancelUpgrade.ProposalID field. - proposal.Content.ChangeParameters.Module, - hex.EncodeToString(proposal.Content.ChangeParameters.Changes), - proposal.CreatedAt, - proposal.ClosesAt, - proposal.InvalidVotes, - ) - default: - mg.logger.Warn("unknown proposal content type", "proposal_id", proposal.ID, "content", proposal.Content) - } - - if i != len(document.Governance.Proposals)-1 { - query += ",\n" - } + // TODO: Extract `executed` for proposal. + for _, proposal := range document.Governance.Proposals { + switch { + case proposal.Content.Upgrade != nil: + batch.Queue(queries.ConsensusProposalSubmissionInsert, + proposal.ID, + proposal.Submitter.String(), + proposal.State.String(), + proposal.Deposit.ToBigInt(), + proposal.Content.Upgrade.Handler, + proposal.Content.Upgrade.Target.ConsensusProtocol.String(), + proposal.Content.Upgrade.Target.RuntimeHostProtocol.String(), + proposal.Content.Upgrade.Target.RuntimeCommitteeProtocol.String(), + proposal.Content.Upgrade.Epoch, + proposal.CreatedAt, + proposal.ClosesAt, + proposal.InvalidVotes, + ) + case proposal.Content.CancelUpgrade != nil: + batch.Queue(queries.ConsensusProposalSubmissionCancelInsert, + proposal.ID, + proposal.Submitter.String(), + proposal.State.String(), + proposal.Deposit.ToBigInt(), + proposal.Content.CancelUpgrade.ProposalID, + proposal.CreatedAt, + proposal.ClosesAt, + proposal.InvalidVotes, + ) + case proposal.Content.ChangeParameters != nil: + batch.Queue(queries.ConsensusProposalSubmissionChangeParametersInsert, + proposal.ID, + proposal.Submitter.String(), + proposal.State.String(), + proposal.Deposit.ToBigInt(), + proposal.Content.ChangeParameters.Module, + hex.EncodeToString(proposal.Content.ChangeParameters.Changes), + proposal.CreatedAt, + proposal.ClosesAt, + proposal.InvalidVotes, + ) + default: + mg.logger.Warn("unknown proposal content type", "proposal_id", proposal.ID, "content", proposal.Content) } - query += ` -ON CONFLICT (id) DO UPDATE SET - submitter = EXCLUDED.submitter, - state = EXCLUDED.state, - deposit = EXCLUDED.deposit, - handler = EXCLUDED.handler, - cp_target_version = EXCLUDED.cp_target_version, - rhp_target_version = EXCLUDED.rhp_target_version, - rcp_target_version = EXCLUDED.rcp_target_version, - upgrade_epoch = EXCLUDED.upgrade_epoch, - cancels = EXCLUDED.cancels, - created_at = EXCLUDED.created_at, - closes_at = EXCLUDED.closes_at, - invalid_votes = EXCLUDED.invalid_votes;` - queries = append(queries, query) } // Populate votes. - foundVotes := false // in case votes are empty - - var query string - for i, proposalID := range sortedIntKeys(document.Governance.VoteEntries) { + for _, proposalID := range sortedIntKeys(document.Governance.VoteEntries) { voteEntries := document.Governance.VoteEntries[proposalID] - if len(voteEntries) > 0 && !foundVotes { - query = `INSERT INTO chain.votes (proposal, voter, vote) -VALUES -` - foundVotes = true - } - votes := make([]string, len(voteEntries)) - for j, voteEntry := range voteEntries { - votes[j] = fmt.Sprintf( - "\t(%d, '%s', '%s')", + for _, voteEntry := range voteEntries { + batch.Queue(queries.ConsensusVoteUpsert, proposalID, voteEntry.Voter.String(), voteEntry.Vote.String(), ) } - query += strings.Join(votes, ",\n") - - if i != len(document.Governance.VoteEntries)-1 && len(voteEntries) > 0 { - query += ",\n" - } - } - if foundVotes { - query += ` -ON CONFLICT (proposal, voter) DO UPDATE SET - vote = EXCLUDED.vote;` - queries = append(queries, query) } - - return queries } func sortedIntKeys[V any](m map[uint64]V) []uint64 { diff --git a/analyzer/queries/queries.go b/analyzer/queries/queries.go index 200b31a3e..81a0939fd 100644 --- a/analyzer/queries/queries.go +++ b/analyzer/queries/queries.go @@ -197,6 +197,20 @@ var ( INSERT INTO chain.transactions (block, tx_hash, tx_index, nonce, fee_amount, max_gas, method, sender, body, module, code, message) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)` + ConsensusAccountUpsert = ` + INSERT INTO chain.accounts + (address, general_balance, nonce, escrow_balance_active, escrow_total_shares_active, escrow_balance_debonding, escrow_total_shares_debonding) + VALUES + ($1, $2, $3, $4, $5, $6, $7) + ON CONFLICT (address) DO UPDATE + SET + general_balance = excluded.general_balance, + nonce = excluded.nonce, + escrow_balance_active = excluded.escrow_balance_active, + escrow_total_shares_active = excluded.escrow_total_shares_active, + escrow_balance_debonding = excluded.escrow_balance_debonding, + escrow_total_shares_debonding = excluded.escrow_total_shares_debonding` + ConsensusAccountNonceUpsert = ` INSERT INTO chain.accounts(address, nonce) VALUES ($1, $2) @@ -324,6 +338,15 @@ var ( escrow_balance_active = chain.accounts.escrow_balance_active + $2, escrow_total_shares_active = chain.accounts.escrow_total_shares_active + $3` + ConsensusAddDelegationsUpsertReplace = ` + INSERT INTO chain.delegations + (delegatee, delegator, shares) + VALUES + ($1, $2, $3) + ON CONFLICT (delegatee, delegator) DO UPDATE + SET + shares = excluded.shares` + ConsensusAddDelegationsUpsert = ` INSERT INTO chain.delegations (delegatee, delegator, shares) VALUES ($1, $2, $3) @@ -410,16 +433,16 @@ var ( TRUNCATE chain.committee_members` ConsensusProposalSubmissionInsert = ` - INSERT INTO chain.proposals (id, submitter, state, deposit, handler, cp_target_version, rhp_target_version, rcp_target_version, upgrade_epoch, created_at, closes_at) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)` + INSERT INTO chain.proposals (id, submitter, state, deposit, handler, cp_target_version, rhp_target_version, rcp_target_version, upgrade_epoch, created_at, closes_at, invalid_votes) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)` ConsensusProposalSubmissionCancelInsert = ` - INSERT INTO chain.proposals (id, submitter, state, deposit, cancels, created_at, closes_at) - VALUES ($1, $2, $3, $4, $5, $6, $7)` + INSERT INTO chain.proposals (id, submitter, state, deposit, cancels, created_at, closes_at, invalid_votes) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8)` ConsensusProposalSubmissionChangeParametersInsert = ` - INSERT INTO chain.proposals (id, submitter, state, deposit, parameters_change_module, parameters_change, created_at, closes_at) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8)` + INSERT INTO chain.proposals (id, submitter, state, deposit, parameters_change_module, parameters_change, created_at, closes_at, invalid_votes) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)` ConsensusProposalExecutionsUpdate = ` UPDATE chain.proposals