Postgres Writer #3: Move transaction and batching #578

Merged · 3 commits · Mar 2, 2020
Transaction.java
@@ -27,12 +27,16 @@
import javax.persistence.JoinColumn;
import javax.persistence.ManyToOne;
import javax.persistence.Table;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.springframework.data.domain.Persistable;

@Data
@Entity
@NoArgsConstructor
@AllArgsConstructor
@Table(name = "t_transactions")
@ToString(exclude = {"memo", "transactionHash", "transactionBytes"})
public class Transaction implements Persistable<Long> {
PostgresWriterProperties.java (new file)
@@ -0,0 +1,13 @@
package com.hedera.mirror.importer.parser.record;

import lombok.Data;
import javax.inject.Named;

@Data
@Named
public class PostgresWriterProperties {
/**
* PreparedStatement.executeBatch() is called after every batchSize transactions from the record stream file.
*/
private int batchSize = 100;
}
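For readers unfamiliar with JDBC batching, here is a minimal, self-contained sketch of the cadence this property controls. The connection URL, table, and values are hypothetical, not part of this PR; the importer's real flush logic lives in PostgresWritingRecordParsedItemHandler.onTransaction() below.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class BatchSizeDemo {
    public static void main(String[] args) throws SQLException {
        int batchSize = 100; // same default as PostgresWriterProperties
        // Hypothetical connection URL and table, for illustration only
        try (Connection conn = DriverManager.getConnection("jdbc:postgresql://localhost/demo");
             PreparedStatement ps = conn.prepareStatement("INSERT INTO demo (id) VALUES (?)")) {
            conn.setAutoCommit(false);
            int pending = 0;
            for (long id = 1; id <= 1_000; id++) {
                ps.setLong(1, id);
                ps.addBatch();               // queue the row client-side
                if (++pending == batchSize) {
                    ps.executeBatch();       // one round trip per batchSize rows
                    pending = 0;
                }
            }
            ps.executeBatch();               // flush the remainder
            conn.commit();                   // commit once, mirroring one-transaction-per-file
        }
    }
}
```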
PostgresWritingRecordParsedItemHandler.java
@@ -24,6 +24,7 @@
import java.sql.PreparedStatement;
import java.sql.SQLException;
import javax.inject.Named;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;

import com.hedera.mirror.importer.domain.ContractResult;
@@ -36,18 +37,31 @@
import com.hedera.mirror.importer.exception.ImporterException;
import com.hedera.mirror.importer.exception.ParserSQLException;

import org.apache.commons.lang3.NotImplementedException;

@Log4j2
@Named
@RequiredArgsConstructor
public class PostgresWritingRecordParsedItemHandler implements RecordParsedItemHandler {
private long batch_count = 0;
private PreparedStatement sqlInsertTransaction;
private PreparedStatement sqlInsertTransferList;
private PreparedStatement sqlInsertNonFeeTransfers;
private PreparedStatement sqlInsertFileData;
private PreparedStatement sqlInsertContractResult;
private PreparedStatement sqlInsertLiveHashes;
private PreparedStatement sqlInsertTopicMessage;
private final PostgresWriterProperties properties;

void initSqlStatements(Connection connection) throws ParserSQLException {
try {
sqlInsertTransaction = connection.prepareStatement("INSERT INTO t_transactions"
+ " (fk_node_acc_id, memo, valid_start_ns, type, fk_payer_acc_id"
+ ", result, consensus_ns, fk_cud_entity_id, charged_tx_fee"
+ ", initial_balance, fk_rec_file_id, valid_duration_seconds, max_fee"
+ ", transaction_hash, transaction_bytes)"
+ " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");

sqlInsertTransferList = connection.prepareStatement("INSERT INTO t_cryptotransferlists"
+ " (consensus_timestamp, amount, realm_num, entity_num)"
+ " VALUES (?, ?, ?, ?)");
@@ -87,6 +101,7 @@ public void onFileComplete() {

private void closeStatements() {
try {
sqlInsertTransaction.close();
sqlInsertTransferList.close();
sqlInsertNonFeeTransfers.close();
sqlInsertFileData.close();
@@ -100,25 +115,60 @@ private void closeStatements() {

void executeBatches() {
try {
+int[] transactions = sqlInsertTransaction.executeBatch();
int[] transferLists = sqlInsertTransferList.executeBatch();
int[] nonFeeTransfers = sqlInsertNonFeeTransfers.executeBatch();
int[] fileData = sqlInsertFileData.executeBatch();
int[] contractResult = sqlInsertContractResult.executeBatch();
int[] liveHashes = sqlInsertLiveHashes.executeBatch();
int[] topicMessages = sqlInsertTopicMessage.executeBatch();
log.info("Inserted {} transfer lists, {} files, {} contracts, {} claims, {} topic messages, " +
log.info("Inserted {} transactions, {} transfer lists, {} files, {} contracts, {} claims, {} topic " +
"messages, " +
"{} non-fee transfers",
transferLists.length, fileData.length, contractResult.length, liveHashes.length,
topicMessages.length, nonFeeTransfers.length);
transactions.length, transferLists.length, fileData.length, contractResult.length,
liveHashes.length, topicMessages.length, nonFeeTransfers.length);
} catch (SQLException e) {
log.error("Error committing sql insert batch ", e);
throw new ParserSQLException(e);
}
batch_count = 0;
}
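A note on the counts logged above: executeBatch() returns one update count per queued command, so for these single-row INSERTs the array length equals the number of rows in the batch, which is why the log line reports .length values rather than sums. A hedged sketch, with a helper name of our own:

```java
static int flushAndCount(java.sql.PreparedStatement ps) throws java.sql.SQLException {
    int[] counts = ps.executeBatch();   // one element per addBatch() call
    // Some drivers return Statement.SUCCESS_NO_INFO (-2) per element instead of
    // exact row counts, so the array length is the reliable "rows sent" figure.
    return counts.length;
}
```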

@Override
public void onTransaction(Transaction transaction) throws ImporterException {
-// to be implemented in followup change
try {
// Temporary until we convert SQL statements to repository invocations
if (transaction.getEntity() != null) {
sqlInsertTransaction.setLong(F_TRANSACTION.CUD_ENTITY_ID.ordinal(), transaction.getEntity().getId());
} else {
sqlInsertTransaction.setObject(F_TRANSACTION.CUD_ENTITY_ID.ordinal(), null);
}
sqlInsertTransaction.setLong(F_TRANSACTION.FK_REC_FILE_ID.ordinal(), transaction.getRecordFileId());
sqlInsertTransaction.setLong(F_TRANSACTION.FK_NODE_ACCOUNT_ID.ordinal(), transaction.getNodeAccountId());
sqlInsertTransaction.setBytes(F_TRANSACTION.MEMO.ordinal(), transaction.getMemo());
sqlInsertTransaction.setLong(F_TRANSACTION.VALID_START_NS.ordinal(), transaction.getValidStartNs());
sqlInsertTransaction.setInt(F_TRANSACTION.TYPE.ordinal(), transaction.getType());
sqlInsertTransaction.setLong(F_TRANSACTION.VALID_DURATION_SECONDS.ordinal(),
transaction.getValidDurationSeconds());
sqlInsertTransaction.setLong(F_TRANSACTION.FK_PAYER_ACCOUNT_ID.ordinal(), transaction.getPayerAccountId());
sqlInsertTransaction.setLong(F_TRANSACTION.RESULT.ordinal(), transaction.getResult());
sqlInsertTransaction.setLong(F_TRANSACTION.CONSENSUS_NS.ordinal(), transaction.getConsensusNs());
sqlInsertTransaction.setLong(F_TRANSACTION.CHARGED_TX_FEE.ordinal(), transaction.getChargedTxFee());
sqlInsertTransaction.setLong(F_TRANSACTION.MAX_FEE.ordinal(), transaction.getMaxFee());
sqlInsertTransaction.setBytes(F_TRANSACTION.TRANSACTION_HASH.ordinal(), transaction.getTransactionHash());
sqlInsertTransaction.setBytes(F_TRANSACTION.TRANSACTION_BYTES.ordinal(), transaction.getTransactionBytes());
sqlInsertTransaction.setLong(F_TRANSACTION.INITIAL_BALANCE.ordinal(), transaction.getInitialBalance());
sqlInsertTransaction.addBatch();

if (batch_count == properties.getBatchSize() - 1) {
// batch size reached; flush inserts to the database (executeBatches() resets the counter)
executeBatches();
} else {
batch_count += 1;
}
} catch (SQLException e) {
throw new ParserSQLException(e);
}
}
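One aside on the null branch above: setObject(index, null) leaves SQL type inference to the driver, which the PostgreSQL driver tolerates. The stricter JDBC spelling pins the type with setNull. A minimal sketch, assuming the column is a bigint and using the java.sql imports already in this file (the helper name is hypothetical, not part of this PR):

```java
static void setNullableLong(PreparedStatement ps, int index, Long value) throws SQLException {
    if (value != null) {
        ps.setLong(index, value);
    } else {
        ps.setNull(index, java.sql.Types.BIGINT); // assumes fk_cud_entity_id is a bigint column
    }
}
```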

@Override
@@ -206,7 +256,14 @@ public void onLiveHash(LiveHash liveHash) throws ImporterException {

@Override
public void onError(Throwable e) {
-// to be implemented in followup change
+throw new NotImplementedException("onError not implemented");
}

enum F_TRANSACTION {
ZERO // column indices start at 1, this creates the necessary offset
, FK_NODE_ACCOUNT_ID, MEMO, VALID_START_NS, TYPE, FK_PAYER_ACCOUNT_ID, RESULT, CONSENSUS_NS,
CUD_ENTITY_ID, CHARGED_TX_FEE, INITIAL_BALANCE, FK_REC_FILE_ID, VALID_DURATION_SECONDS, MAX_FEE,
TRANSACTION_HASH, TRANSACTION_BYTES
}
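The leading ZERO constant compensates for Enum.ordinal() being 0-based while JDBC parameter indices are 1-based; each field's ordinal then matches its ? placeholder, provided the constants stay in the same order as the column list in initSqlStatements(). A tiny illustration (names are ours):

```java
enum F_DEMO { ZERO, FIRST, SECOND }

static void check() {
    assert F_DEMO.FIRST.ordinal() == 1;  // index of the first '?' placeholder
    assert F_DEMO.SECOND.ordinal() == 2; // index of the second '?' placeholder
}
```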

enum F_TRANSFERLIST {
RecordFileLogger.java
@@ -44,7 +44,6 @@
import java.io.IOException;
import java.sql.CallableStatement;
import java.sql.Connection;
-import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.Set;
@@ -62,6 +61,7 @@
import com.hedera.mirror.importer.domain.LiveHash;
import com.hedera.mirror.importer.domain.NonFeeTransfer;
import com.hedera.mirror.importer.domain.TopicMessage;
import com.hedera.mirror.importer.exception.ParserSQLException;
import com.hedera.mirror.importer.parser.CommonParserProperties;
import com.hedera.mirror.importer.repository.EntityRepository;
import com.hedera.mirror.importer.repository.EntityTypeRepository;
@@ -81,10 +81,6 @@ public class RecordFileLogger {
private static PostgresWritingRecordParsedItemHandler postgresWriter;

private static long fileId = 0;
-private static long BATCH_SIZE = 100;
-private static long batch_count = 0;
-
-private static PreparedStatement sqlInsertTransaction;

public RecordFileLogger(CommonParserProperties commonParserProperties, RecordParserProperties parserProperties,
NetworkAddressBook networkAddressBook, EntityRepository entityRepository,
@@ -104,13 +100,7 @@ static long getFileId() {
return fileId;
}

-static void setBatchSize(long batchSize) {
-BATCH_SIZE = batchSize;
-}

public static boolean start() {
-batch_count = 0;

connect = DatabaseUtilities.openDatabase(connect);

if (connect == null) {
@@ -124,29 +114,18 @@ public static boolean start() {
log.error("Unable to set connection to not auto commit", e);
return false;
}

try {
-sqlInsertTransaction = connect.prepareStatement("INSERT INTO t_transactions"
-+ " (fk_node_acc_id, memo, valid_start_ns, type, fk_payer_acc_id"
-+ ", result, consensus_ns, fk_cud_entity_id, charged_tx_fee"
-+ ", initial_balance, fk_rec_file_id, valid_duration_seconds, max_fee"
-+ ", transaction_hash, transaction_bytes)"
-+ " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
-
postgresWriter.initSqlStatements(connect);
-} catch (SQLException e) {
+} catch (ParserSQLException e) {
log.error("Unable to prepare SQL statements", e);
return false;
}

return true;
}

public static boolean finish() {
try {
-sqlInsertTransaction.close();
postgresWriter.finish();

connect = DatabaseUtilities.closeDatabase(connect);
return false;
} catch (SQLException e) {
@@ -181,8 +160,6 @@ public static INIT_RESULT initFile(String fileName) {

public static void completeFile(String fileHash, String previousHash) throws SQLException {
try (CallableStatement fileClose = connect.prepareCall("{call f_file_complete( ?, ?, ? ) }")) {
-// execute any remaining batches
-executeBatches();
postgresWriter.onFileComplete();

// update the file to processed
@@ -468,36 +445,17 @@ public static void storeRecord(Transaction transaction, TransactionRecord txReco
}
entity.setAutoRenewAccount(createEntity(entity.getAutoRenewAccount()));
entity = entityRepository.save(entity);
-sqlInsertTransaction.setLong(F_TRANSACTION.CUD_ENTITY_ID.ordinal(), transaction.getEntity().getId());
} else if (entityId != null) {
-sqlInsertTransaction.setObject(F_TRANSACTION.CUD_ENTITY_ID.ordinal(), entityId.getId());
-} else {
-sqlInsertTransaction.setObject(F_TRANSACTION.CUD_ENTITY_ID.ordinal(), null);
+entity = new Entities();
+entity.setId(entityId.getId());
}
+tx.setEntity(entity);

EntityId payerEntityId = getEntityId(payerAccountId);
EntityId nodeEntityId = getEntityId(body.getNodeAccountID());
tx.setNodeAccountId(nodeEntityId.getId());
tx.setPayerAccountId(payerEntityId.getId());

-// Temporary until we convert SQL statements to repository invocations
-sqlInsertTransaction.setLong(F_TRANSACTION.FK_NODE_ACCOUNT_ID.ordinal(), tx.getNodeAccountId());
-sqlInsertTransaction.setBytes(F_TRANSACTION.MEMO.ordinal(), tx.getMemo());
-sqlInsertTransaction.setLong(F_TRANSACTION.VALID_START_NS.ordinal(), tx.getValidStartNs());
-sqlInsertTransaction.setInt(F_TRANSACTION.TYPE.ordinal(), tx.getType());
-sqlInsertTransaction.setLong(F_TRANSACTION.FK_REC_FILE_ID.ordinal(), tx.getRecordFileId());
-sqlInsertTransaction.setLong(F_TRANSACTION.VALID_DURATION_SECONDS.ordinal(), tx.getValidDurationSeconds());
-sqlInsertTransaction.setLong(F_TRANSACTION.FK_PAYER_ACCOUNT_ID.ordinal(), tx.getPayerAccountId());
-sqlInsertTransaction.setLong(F_TRANSACTION.RESULT.ordinal(), tx.getResult());
-sqlInsertTransaction.setLong(F_TRANSACTION.CONSENSUS_NS.ordinal(), tx.getConsensusNs());
-sqlInsertTransaction.setLong(F_TRANSACTION.CHARGED_TX_FEE.ordinal(), tx.getChargedTxFee());
-sqlInsertTransaction.setLong(F_TRANSACTION.MAX_FEE.ordinal(), tx.getMaxFee());
-sqlInsertTransaction.setBytes(F_TRANSACTION.TRANSACTION_HASH.ordinal(), tx.getTransactionHash());
-sqlInsertTransaction.setBytes(F_TRANSACTION.TRANSACTION_BYTES.ordinal(), tx.getTransactionBytes());
-sqlInsertTransaction.setLong(F_TRANSACTION.INITIAL_BALANCE.ordinal(), tx.getInitialBalance());
-sqlInsertTransaction.addBatch();
log.debug("Storing transaction: {}", tx);

if ((txRecord.hasTransferList()) && parserProperties.isPersistCryptoTransferAmounts()) {
processNonFeeTransfers(consensusNs, payerAccountId, body, txRecord);
if (body.hasCryptoCreateAccount() && isSuccessful(txRecord)) {
@@ -509,7 +467,6 @@
}

// TransactionBody-specific handlers.
-// If so-configured, each will update the SQL prepared statements via addBatch().
if (body.hasContractCall()) {
insertContractCall(consensusNs, body.getContractCall(), txRecord);
} else if (body.hasContractCreateInstance()) {
Expand All @@ -523,19 +480,15 @@ public static void storeRecord(Transaction transaction, TransactionRecord txReco
} else if (body.hasFileAppend()) {
insertFileAppend(consensusNs, body.getFileAppend());
} else if (body.hasFileCreate()) {
-insertFileCreate(consensusNs, body.getFileCreate(), txRecord);
+insertFileData(consensusNs, body.getFileCreate().getContents().toByteArray(),
+txRecord.getReceipt().getFileID());
} else if (body.hasFileUpdate()) {
insertFileUpdate(consensusNs, body.getFileUpdate());
}
}

-if (batch_count == BATCH_SIZE - 1) {
-// execute any remaining batches
-executeBatches();
-batch_count = 0;
-} else {
-batch_count += 1;
-}
+postgresWriter.onTransaction(tx);
+log.debug("Storing transaction: {}", tx);
}
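The net effect of this hunk is that RecordFileLogger no longer issues transaction SQL itself: it assembles the Transaction domain object and delegates to the handler, which owns the prepared statement and the batching. A condensed sketch of that seam, with the interface shape inferred from the callbacks used in this diff (real signatures may differ):

```java
// Inferred from usage in this diff; not the verbatim interface.
interface RecordParsedItemHandler {
    void onTransaction(Transaction transaction) throws ImporterException;
    void onFileData(FileData fileData) throws ImporterException;
    void onTopicMessage(TopicMessage topicMessage) throws ImporterException;
    void onLiveHash(LiveHash liveHash) throws ImporterException;
    void onFileComplete();
    void onError(Throwable e);
}
```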

/**
@@ -740,28 +693,21 @@ private static void insertConsensusTopicMessage(ConsensusSubmitMessageTransactio
postgresWriter.onTopicMessage(topicMessage);
}

-private static void insertFileCreate(long consensusTimestamp, FileCreateTransactionBody transactionBody,
-TransactionRecord transactionRecord) {
+private static void insertFileData(long consensusTimestamp, byte[] contents, FileID fileID) {
if (parserProperties.isPersistFiles() ||
-(parserProperties.isPersistSystemFiles() && transactionRecord.getReceipt().getFileID()
-.getFileNum() < 1000)) {
-byte[] contents = transactionBody.getContents().toByteArray();
+(parserProperties.isPersistSystemFiles() && fileID.getFileNum() < 1000)) {
postgresWriter.onFileData(new FileData(consensusTimestamp, contents));
}
}

private static void insertFileAppend(long consensusTimestamp, FileAppendTransactionBody transactionBody)
throws IOException {
-if (parserProperties.isPersistFiles() ||
-(parserProperties.isPersistSystemFiles() && transactionBody.getFileID().getFileNum() < 1000)) {
-byte[] contents = transactionBody.getContents().toByteArray();
-postgresWriter.onFileData(new FileData(consensusTimestamp, contents));
-
-// update the local address book
-if (isFileAddressBook(transactionBody.getFileID())) {
-// we have an address book update, refresh the local file
-networkAddressBook.append(contents);
-}
+byte[] contents = transactionBody.getContents().toByteArray();
+insertFileData(consensusTimestamp, contents, transactionBody.getFileID());
+// update the local address book
+if (isFileAddressBook(transactionBody.getFileID())) {
+// we have an address book update, refresh the local file
+networkAddressBook.append(contents);
}
}

@@ -911,12 +857,6 @@ private static void insertContractResults(
new ContractResult(consensusTimestamp, functionParams, gasSupplied, callResult, gasUsed));
}

-private static void executeBatches() throws SQLException {
-int[] transactions = sqlInsertTransaction.executeBatch();
-postgresWriter.executeBatches();
-log.info("Inserted {} transactions", transactions.length);
-}

public static Entities getEntity(AccountID accountID) {
return getEntity(accountID.getShardNum(), accountID.getRealmNum(), accountID.getAccountNum(), "account");
}
@@ -989,23 +929,4 @@ private static Entities createEntity(Entities entity) {
public enum INIT_RESULT {
OK, FAIL, SKIP
}

-enum F_TRANSACTION {
-ZERO // column indices start at 1, this creates the necessary offset
-, FK_NODE_ACCOUNT_ID, MEMO, VALID_START_NS, TYPE, FK_PAYER_ACCOUNT_ID, RESULT, CONSENSUS_NS,
-CUD_ENTITY_ID, CHARGED_TX_FEE, INITIAL_BALANCE, FK_REC_FILE_ID, VALID_DURATION_SECONDS, MAX_FEE,
-TRANSACTION_HASH, TRANSACTION_BYTES
-}
-
-enum F_FILE_DATA {
-ZERO, CONSENSUS_TIMESTAMP, FILE_DATA
-}
-
-enum F_CONTRACT_CALL {
-ZERO, CONSENSUS_TIMESTAMP, FUNCTION_PARAMS, GAS_SUPPLIED, CALL_RESULT, GAS_USED
-}
-
-enum F_LIVEHASH_DATA {
-ZERO, CONSENSUS_TIMESTAMP, LIVEHASH
-}
}
RecordParserProperties.java
@@ -60,6 +60,8 @@ public class RecordParserProperties implements ParserProperties {
// bytes on the t_transaction table
private boolean persistTransactionBytes = false;

private final PostgresWriterProperties postgresWriter;

@Override
public Path getStreamPath() {
return mirrorProperties.getDataPath().resolve(getStreamType().getPath());