Skip to content

Commit

Permalink
Postgres Writer #3 : Move transaction and batching (#578)
Browse files Browse the repository at this point in the history
Postgres Writer #3 : Move transaction and batching

Followups from previous reviews:
- Add NotImplementedException to methods missing logic
- share code between two insertFile*() functions

Signed-off-by: Apekshit Sharma <[email protected]>
  • Loading branch information
apeksharma authored Mar 2, 2020
1 parent 1e04dbf commit 740b6f5
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,16 @@
import javax.persistence.JoinColumn;
import javax.persistence.ManyToOne;
import javax.persistence.Table;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.springframework.data.domain.Persistable;

@Data
@Entity
@NoArgsConstructor
@AllArgsConstructor
@Table(name = "t_transactions")
@ToString(exclude = {"memo", "transactionHash", "transactionBytes"})
public class Transaction implements Persistable<Long> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.hedera.mirror.importer.parser.record;

import lombok.Data;
import javax.inject.Named;

@Data
@Named
public class PostgresWriterProperties {
/**
* PreparedStatement.executeBatch() is called after every batchSize number of transactions from record stream file.
*/
private int batchSize = 100;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.sql.PreparedStatement;
import java.sql.SQLException;
import javax.inject.Named;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;

import com.hedera.mirror.importer.domain.ContractResult;
Expand All @@ -36,18 +37,31 @@
import com.hedera.mirror.importer.exception.ImporterException;
import com.hedera.mirror.importer.exception.ParserSQLException;

import org.apache.commons.lang3.NotImplementedException;

@Log4j2
@Named
@RequiredArgsConstructor
public class PostgresWritingRecordParsedItemHandler implements RecordParsedItemHandler {
private long batch_count = 0;
private PreparedStatement sqlInsertTransaction;
private PreparedStatement sqlInsertTransferList;
private PreparedStatement sqlInsertNonFeeTransfers;
private PreparedStatement sqlInsertFileData;
private PreparedStatement sqlInsertContractResult;
private PreparedStatement sqlInsertLiveHashes;
private PreparedStatement sqlInsertTopicMessage;
private final PostgresWriterProperties properties;

void initSqlStatements(Connection connection) throws ParserSQLException {
try {
sqlInsertTransaction = connection.prepareStatement("INSERT INTO t_transactions"
+ " (fk_node_acc_id, memo, valid_start_ns, type, fk_payer_acc_id"
+ ", result, consensus_ns, fk_cud_entity_id, charged_tx_fee"
+ ", initial_balance, fk_rec_file_id, valid_duration_seconds, max_fee"
+ ", transaction_hash, transaction_bytes)"
+ " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");

sqlInsertTransferList = connection.prepareStatement("INSERT INTO t_cryptotransferlists"
+ " (consensus_timestamp, amount, realm_num, entity_num)"
+ " VALUES (?, ?, ?, ?)");
Expand Down Expand Up @@ -87,6 +101,7 @@ public void onFileComplete() {

private void closeStatements() {
try {
sqlInsertTransaction.close();
sqlInsertTransferList.close();
sqlInsertNonFeeTransfers.close();
sqlInsertFileData.close();
Expand All @@ -100,25 +115,60 @@ private void closeStatements() {

void executeBatches() {
try {
int[] transactions = sqlInsertTransaction.executeBatch();
int[] transferLists = sqlInsertTransferList.executeBatch();
int[] nonFeeTransfers = sqlInsertNonFeeTransfers.executeBatch();
int[] fileData = sqlInsertFileData.executeBatch();
int[] contractResult = sqlInsertContractResult.executeBatch();
int[] liveHashes = sqlInsertLiveHashes.executeBatch();
int[] topicMessages = sqlInsertTopicMessage.executeBatch();
log.info("Inserted {} transfer lists, {} files, {} contracts, {} claims, {} topic messages, " +
log.info("Inserted {} transactions, {} transfer lists, {} files, {} contracts, {} claims, {} topic " +
"messages, " +
"{} non-fee transfers",
transferLists.length, fileData.length, contractResult.length, liveHashes.length,
topicMessages.length, nonFeeTransfers.length);
transactions.length, transferLists.length, fileData.length, contractResult.length,
liveHashes.length, topicMessages.length, nonFeeTransfers.length);
} catch (SQLException e) {
log.error("Error committing sql insert batch ", e);
throw new ParserSQLException(e);
}
batch_count = 0;
}

@Override
public void onTransaction(Transaction transaction) throws ImporterException {
// to be implemented in followup change
try {
// Temporary until we convert SQL statements to repository invocations
if (transaction.getEntity() != null) {
sqlInsertTransaction.setLong(F_TRANSACTION.CUD_ENTITY_ID.ordinal(), transaction.getEntity().getId());
} else {
sqlInsertTransaction.setObject(F_TRANSACTION.CUD_ENTITY_ID.ordinal(), null);
}
sqlInsertTransaction.setLong(F_TRANSACTION.FK_REC_FILE_ID.ordinal(), transaction.getRecordFileId());
sqlInsertTransaction.setLong(F_TRANSACTION.FK_NODE_ACCOUNT_ID.ordinal(), transaction.getNodeAccountId());
sqlInsertTransaction.setBytes(F_TRANSACTION.MEMO.ordinal(), transaction.getMemo());
sqlInsertTransaction.setLong(F_TRANSACTION.VALID_START_NS.ordinal(), transaction.getValidStartNs());
sqlInsertTransaction.setInt(F_TRANSACTION.TYPE.ordinal(), transaction.getType());
sqlInsertTransaction.setLong(F_TRANSACTION.VALID_DURATION_SECONDS.ordinal(),
transaction.getValidDurationSeconds());
sqlInsertTransaction.setLong(F_TRANSACTION.FK_PAYER_ACCOUNT_ID.ordinal(), transaction.getPayerAccountId());
sqlInsertTransaction.setLong(F_TRANSACTION.RESULT.ordinal(), transaction.getResult());
sqlInsertTransaction.setLong(F_TRANSACTION.CONSENSUS_NS.ordinal(), transaction.getConsensusNs());
sqlInsertTransaction.setLong(F_TRANSACTION.CHARGED_TX_FEE.ordinal(), transaction.getChargedTxFee());
sqlInsertTransaction.setLong(F_TRANSACTION.MAX_FEE.ordinal(), transaction.getMaxFee());
sqlInsertTransaction.setBytes(F_TRANSACTION.TRANSACTION_HASH.ordinal(), transaction.getTransactionHash());
sqlInsertTransaction.setBytes(F_TRANSACTION.TRANSACTION_BYTES.ordinal(), transaction.getTransactionBytes());
sqlInsertTransaction.setLong(F_TRANSACTION.INITIAL_BALANCE.ordinal(), transaction.getInitialBalance());
sqlInsertTransaction.addBatch();

if (batch_count == properties.getBatchSize() - 1) {
// execute any remaining batches
executeBatches();
} else {
batch_count += 1;
}
} catch (SQLException e) {
throw new ParserSQLException(e);
}
}

@Override
Expand Down Expand Up @@ -206,7 +256,14 @@ public void onLiveHash(LiveHash liveHash) throws ImporterException {

@Override
public void onError(Throwable e) {
// to be implemented in followup change
throw new NotImplementedException("onError not implemented");
}

enum F_TRANSACTION {
ZERO // column indices start at 1, this creates the necessary offset
, FK_NODE_ACCOUNT_ID, MEMO, VALID_START_NS, TYPE, FK_PAYER_ACCOUNT_ID, RESULT, CONSENSUS_NS,
CUD_ENTITY_ID, CHARGED_TX_FEE, INITIAL_BALANCE, FK_REC_FILE_ID, VALID_DURATION_SECONDS, MAX_FEE,
TRANSACTION_HASH, TRANSACTION_BYTES
}

enum F_TRANSFERLIST {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import java.io.IOException;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.Set;
Expand All @@ -62,6 +61,7 @@
import com.hedera.mirror.importer.domain.LiveHash;
import com.hedera.mirror.importer.domain.NonFeeTransfer;
import com.hedera.mirror.importer.domain.TopicMessage;
import com.hedera.mirror.importer.exception.ParserSQLException;
import com.hedera.mirror.importer.parser.CommonParserProperties;
import com.hedera.mirror.importer.repository.EntityRepository;
import com.hedera.mirror.importer.repository.EntityTypeRepository;
Expand All @@ -81,10 +81,6 @@ public class RecordFileLogger {
private static PostgresWritingRecordParsedItemHandler postgresWriter;

private static long fileId = 0;
private static long BATCH_SIZE = 100;
private static long batch_count = 0;

private static PreparedStatement sqlInsertTransaction;

public RecordFileLogger(CommonParserProperties commonParserProperties, RecordParserProperties parserProperties,
NetworkAddressBook networkAddressBook, EntityRepository entityRepository,
Expand All @@ -104,13 +100,7 @@ static long getFileId() {
return fileId;
}

static void setBatchSize(long batchSize) {
BATCH_SIZE = batchSize;
}

public static boolean start() {
batch_count = 0;

connect = DatabaseUtilities.openDatabase(connect);

if (connect == null) {
Expand All @@ -124,29 +114,18 @@ public static boolean start() {
log.error("Unable to set connection to not auto commit", e);
return false;
}

try {
sqlInsertTransaction = connect.prepareStatement("INSERT INTO t_transactions"
+ " (fk_node_acc_id, memo, valid_start_ns, type, fk_payer_acc_id"
+ ", result, consensus_ns, fk_cud_entity_id, charged_tx_fee"
+ ", initial_balance, fk_rec_file_id, valid_duration_seconds, max_fee"
+ ", transaction_hash, transaction_bytes)"
+ " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");

postgresWriter.initSqlStatements(connect);
} catch (SQLException e) {
} catch (ParserSQLException e) {
log.error("Unable to prepare SQL statements", e);
return false;
}

return true;
}

public static boolean finish() {
try {
sqlInsertTransaction.close();
postgresWriter.finish();

connect = DatabaseUtilities.closeDatabase(connect);
return false;
} catch (SQLException e) {
Expand Down Expand Up @@ -181,8 +160,6 @@ public static INIT_RESULT initFile(String fileName) {

public static void completeFile(String fileHash, String previousHash) throws SQLException {
try (CallableStatement fileClose = connect.prepareCall("{call f_file_complete( ?, ?, ? ) }")) {
// execute any remaining batches
executeBatches();
postgresWriter.onFileComplete();

// update the file to processed
Expand Down Expand Up @@ -468,36 +445,17 @@ public static void storeRecord(Transaction transaction, TransactionRecord txReco
}
entity.setAutoRenewAccount(createEntity(entity.getAutoRenewAccount()));
entity = entityRepository.save(entity);
sqlInsertTransaction.setLong(F_TRANSACTION.CUD_ENTITY_ID.ordinal(), entity.getId());
} else if (entityId != null) {
sqlInsertTransaction.setObject(F_TRANSACTION.CUD_ENTITY_ID.ordinal(), entityId.getId());
} else {
sqlInsertTransaction.setObject(F_TRANSACTION.CUD_ENTITY_ID.ordinal(), null);
entity = new Entities();
entity.setId(entityId.getId());
}
tx.setEntity(entity);

EntityId payerEntityId = getEntityId(payerAccountId);
EntityId nodeEntityId = getEntityId(body.getNodeAccountID());
tx.setNodeAccountId(nodeEntityId.getId());
tx.setPayerAccountId(payerEntityId.getId());

// Temporary until we convert SQL statements to repository invocations
sqlInsertTransaction.setLong(F_TRANSACTION.FK_NODE_ACCOUNT_ID.ordinal(), tx.getNodeAccountId());
sqlInsertTransaction.setBytes(F_TRANSACTION.MEMO.ordinal(), tx.getMemo());
sqlInsertTransaction.setLong(F_TRANSACTION.VALID_START_NS.ordinal(), tx.getValidStartNs());
sqlInsertTransaction.setInt(F_TRANSACTION.TYPE.ordinal(), tx.getType());
sqlInsertTransaction.setLong(F_TRANSACTION.FK_REC_FILE_ID.ordinal(), tx.getRecordFileId());
sqlInsertTransaction.setLong(F_TRANSACTION.VALID_DURATION_SECONDS.ordinal(), tx.getValidDurationSeconds());
sqlInsertTransaction.setLong(F_TRANSACTION.FK_PAYER_ACCOUNT_ID.ordinal(), tx.getPayerAccountId());
sqlInsertTransaction.setLong(F_TRANSACTION.RESULT.ordinal(), tx.getResult());
sqlInsertTransaction.setLong(F_TRANSACTION.CONSENSUS_NS.ordinal(), tx.getConsensusNs());
sqlInsertTransaction.setLong(F_TRANSACTION.CHARGED_TX_FEE.ordinal(), tx.getChargedTxFee());
sqlInsertTransaction.setLong(F_TRANSACTION.MAX_FEE.ordinal(), tx.getMaxFee());
sqlInsertTransaction.setBytes(F_TRANSACTION.TRANSACTION_HASH.ordinal(), tx.getTransactionHash());
sqlInsertTransaction.setBytes(F_TRANSACTION.TRANSACTION_BYTES.ordinal(), tx.getTransactionBytes());
sqlInsertTransaction.setLong(F_TRANSACTION.INITIAL_BALANCE.ordinal(), tx.getInitialBalance());
sqlInsertTransaction.addBatch();
log.debug("Storing transaction: {}", tx);

if ((txRecord.hasTransferList()) && parserProperties.isPersistCryptoTransferAmounts()) {
processNonFeeTransfers(consensusNs, payerAccountId, body, txRecord);
if (body.hasCryptoCreateAccount() && isSuccessful(txRecord)) {
Expand All @@ -509,7 +467,6 @@ public static void storeRecord(Transaction transaction, TransactionRecord txReco
}

// TransactionBody-specific handlers.
// If so-configured, each will update the SQL prepared statements via addBatch().
if (body.hasContractCall()) {
insertContractCall(consensusNs, body.getContractCall(), txRecord);
} else if (body.hasContractCreateInstance()) {
Expand All @@ -523,19 +480,15 @@ public static void storeRecord(Transaction transaction, TransactionRecord txReco
} else if (body.hasFileAppend()) {
insertFileAppend(consensusNs, body.getFileAppend());
} else if (body.hasFileCreate()) {
insertFileCreate(consensusNs, body.getFileCreate(), txRecord);
insertFileData(consensusNs, body.getFileCreate().getContents().toByteArray(),
txRecord.getReceipt().getFileID());
} else if (body.hasFileUpdate()) {
insertFileUpdate(consensusNs, body.getFileUpdate());
}
}

if (batch_count == BATCH_SIZE - 1) {
// execute any remaining batches
executeBatches();
batch_count = 0;
} else {
batch_count += 1;
}
postgresWriter.onTransaction(tx);
log.debug("Storing transaction: {}", tx);
}

/**
Expand Down Expand Up @@ -740,28 +693,21 @@ private static void insertConsensusTopicMessage(ConsensusSubmitMessageTransactio
postgresWriter.onTopicMessage(topicMessage);
}

private static void insertFileCreate(long consensusTimestamp, FileCreateTransactionBody transactionBody,
TransactionRecord transactionRecord) {
private static void insertFileData(long consensusTimestamp, byte[] contents, FileID fileID) {
if (parserProperties.isPersistFiles() ||
(parserProperties.isPersistSystemFiles() && transactionRecord.getReceipt().getFileID()
.getFileNum() < 1000)) {
byte[] contents = transactionBody.getContents().toByteArray();
(parserProperties.isPersistSystemFiles() && fileID.getFileNum() < 1000)) {
postgresWriter.onFileData(new FileData(consensusTimestamp, contents));
}
}

private static void insertFileAppend(long consensusTimestamp, FileAppendTransactionBody transactionBody)
throws IOException {
if (parserProperties.isPersistFiles() ||
(parserProperties.isPersistSystemFiles() && transactionBody.getFileID().getFileNum() < 1000)) {
byte[] contents = transactionBody.getContents().toByteArray();
postgresWriter.onFileData(new FileData(consensusTimestamp, contents));

// update the local address book
if (isFileAddressBook(transactionBody.getFileID())) {
// we have an address book update, refresh the local file
networkAddressBook.append(contents);
}
byte[] contents = transactionBody.getContents().toByteArray();
insertFileData(consensusTimestamp, contents, transactionBody.getFileID());
// update the local address book
if (isFileAddressBook(transactionBody.getFileID())) {
// we have an address book update, refresh the local file
networkAddressBook.append(contents);
}
}

Expand Down Expand Up @@ -911,12 +857,6 @@ private static void insertContractResults(
new ContractResult(consensusTimestamp, functionParams, gasSupplied, callResult, gasUsed));
}

private static void executeBatches() throws SQLException {
int[] transactions = sqlInsertTransaction.executeBatch();
postgresWriter.executeBatches();
log.info("Inserted {} transactions", transactions.length);
}

public static Entities getEntity(AccountID accountID) {
return getEntity(accountID.getShardNum(), accountID.getRealmNum(), accountID.getAccountNum(), "account");
}
Expand Down Expand Up @@ -989,23 +929,4 @@ private static Entities createEntity(Entities entity) {
public enum INIT_RESULT {
OK, FAIL, SKIP
}

enum F_TRANSACTION {
ZERO // column indices start at 1, this creates the necessary offset
, FK_NODE_ACCOUNT_ID, MEMO, VALID_START_NS, TYPE, FK_PAYER_ACCOUNT_ID, RESULT, CONSENSUS_NS,
CUD_ENTITY_ID, CHARGED_TX_FEE, INITIAL_BALANCE, FK_REC_FILE_ID, VALID_DURATION_SECONDS, MAX_FEE,
TRANSACTION_HASH, TRANSACTION_BYTES
}

enum F_FILE_DATA {
ZERO, CONSENSUS_TIMESTAMP, FILE_DATA
}

enum F_CONTRACT_CALL {
ZERO, CONSENSUS_TIMESTAMP, FUNCTION_PARAMS, GAS_SUPPLIED, CALL_RESULT, GAS_USED
}

enum F_LIVEHASH_DATA {
ZERO, CONSENSUS_TIMESTAMP, LIVEHASH
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ public class RecordParserProperties implements ParserProperties {
// bytes on the t_transaction table
private boolean persistTransactionBytes = false;

private final PostgresWriterProperties postgresWriter;

@Override
public Path getStreamPath() {
return mirrorProperties.getDataPath().resolve(getStreamType().getPath());
Expand Down
Loading

0 comments on commit 740b6f5

Please sign in to comment.