Skip to content

Commit

Permalink
Postgres Writer #2 (#576)
Browse files Browse the repository at this point in the history
- Move topic messages, contract result, file data, and live hashes from
  RecordFileLogger to PostgresWritingRecordParsedItemHandler
- Logic for Transaction and batching will be moved together in followup

Partially fixes #566 

Signed-off-by: Apekshit Sharma <[email protected]>
  • Loading branch information
apeksharma authored Mar 2, 2020
1 parent 91f5590 commit 1e04dbf
Show file tree
Hide file tree
Showing 7 changed files with 217 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Entity
@NoArgsConstructor
@AllArgsConstructor
@Table(name = "t_contract_result")
public class ContractResult {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Entity
@NoArgsConstructor
@AllArgsConstructor
@Table(name = "t_file_data")
public class FileData {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Entity
@NoArgsConstructor
@AllArgsConstructor
@Table(name = "t_livehashes")
public class LiveHash {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@

import javax.persistence.Entity;
import javax.persistence.Id;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Entity
@NoArgsConstructor
@AllArgsConstructor
public class TopicMessage {

@Id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@
public class PostgresWritingRecordParsedItemHandler implements RecordParsedItemHandler {
private PreparedStatement sqlInsertTransferList;
private PreparedStatement sqlInsertNonFeeTransfers;
private PreparedStatement sqlInsertFileData;
private PreparedStatement sqlInsertContractResult;
private PreparedStatement sqlInsertLiveHashes;
private PreparedStatement sqlInsertTopicMessage;

void initSqlStatements(Connection connection) throws ParserSQLException {
try {
Expand All @@ -51,6 +55,22 @@ void initSqlStatements(Connection connection) throws ParserSQLException {
sqlInsertNonFeeTransfers = connection.prepareStatement("insert into non_fee_transfers"
+ " (consensus_timestamp, amount, realm_num, entity_num)"
+ " values (?, ?, ?, ?)");

sqlInsertFileData = connection.prepareStatement("INSERT INTO t_file_data"
+ " (consensus_timestamp, file_data)"
+ " VALUES (?, ?)");

sqlInsertContractResult = connection.prepareStatement("INSERT INTO t_contract_result"
+ " (consensus_timestamp, function_params, gas_supplied, call_result, gas_used)"
+ " VALUES (?, ?, ?, ?, ?)");

sqlInsertLiveHashes = connection.prepareStatement("INSERT INTO t_livehashes"
+ " (consensus_timestamp, livehash)"
+ " VALUES (?, ?)");

sqlInsertTopicMessage = connection.prepareStatement("insert into topic_message"
+ " (consensus_timestamp, realm_num, topic_num, message, running_hash, sequence_number)"
+ " values (?, ?, ?, ?, ?, ?)");
} catch (SQLException e) {
throw new ParserSQLException("Unable to prepare SQL statements", e);
}
Expand All @@ -69,6 +89,10 @@ private void closeStatements() {
try {
sqlInsertTransferList.close();
sqlInsertNonFeeTransfers.close();
sqlInsertFileData.close();
sqlInsertContractResult.close();
sqlInsertLiveHashes.close();
sqlInsertTopicMessage.close();
} catch (SQLException e) {
throw new ParserSQLException("Error closing connection", e);
}
Expand All @@ -78,7 +102,14 @@ void executeBatches() {
try {
int[] transferLists = sqlInsertTransferList.executeBatch();
int[] nonFeeTransfers = sqlInsertNonFeeTransfers.executeBatch();
log.info("Inserted {} transfer lists, {} non-fee transfers", transferLists.length, nonFeeTransfers.length);
int[] fileData = sqlInsertFileData.executeBatch();
int[] contractResult = sqlInsertContractResult.executeBatch();
int[] liveHashes = sqlInsertLiveHashes.executeBatch();
int[] topicMessages = sqlInsertTopicMessage.executeBatch();
log.info("Inserted {} transfer lists, {} files, {} contracts, {} claims, {} topic messages, " +
"{} non-fee transfers",
transferLists.length, fileData.length, contractResult.length, liveHashes.length,
topicMessages.length, nonFeeTransfers.length);
} catch (SQLException e) {
log.error("Error committing sql insert batch ", e);
throw new ParserSQLException(e);
Expand Down Expand Up @@ -120,22 +151,57 @@ public void onNonFeeTransfer(NonFeeTransfer nonFeeTransfer) throws ImporterExcep

@Override
public void onTopicMessage(TopicMessage topicMessage) throws ImporterException {
// to be implemented in followup change
try {
sqlInsertTopicMessage.setLong(F_TOPICMESSAGE.CONSENSUS_TIMESTAMP.ordinal(),
topicMessage.getConsensusTimestamp());
sqlInsertTopicMessage.setShort(F_TOPICMESSAGE.REALM_NUM.ordinal(), (short) topicMessage.getRealmNum());
sqlInsertTopicMessage.setInt(F_TOPICMESSAGE.TOPIC_NUM.ordinal(), topicMessage.getTopicNum());
sqlInsertTopicMessage.setBytes(F_TOPICMESSAGE.MESSAGE.ordinal(), topicMessage.getMessage());
sqlInsertTopicMessage.setBytes(F_TOPICMESSAGE.RUNNING_HASH.ordinal(), topicMessage.getRunningHash());
sqlInsertTopicMessage.setLong(F_TOPICMESSAGE.SEQUENCE_NUMBER.ordinal(), topicMessage.getSequenceNumber());
sqlInsertTopicMessage.addBatch();
} catch (SQLException e) {
throw new ParserSQLException(e);
}
}

@Override
public void onContractResult(ContractResult contractResult) throws ImporterException {
// to be implemented in followup change
try {
sqlInsertContractResult.setLong(F_CONTRACT_RESULT.CONSENSUS_TIMESTAMP.ordinal(),
contractResult.getConsensusTimestamp());
sqlInsertContractResult
.setBytes(F_CONTRACT_RESULT.FUNCTION_PARAMS.ordinal(), contractResult.getFunctionParameters());
sqlInsertContractResult.setLong(F_CONTRACT_RESULT.GAS_SUPPLIED.ordinal(), contractResult.getGasSupplied());
sqlInsertContractResult.setBytes(F_CONTRACT_RESULT.CALL_RESULT.ordinal(), contractResult.getCallResult());
sqlInsertContractResult.setLong(F_CONTRACT_RESULT.GAS_USED.ordinal(), contractResult.getGasUsed());
sqlInsertContractResult.addBatch();
} catch (SQLException e) {
throw new ParserSQLException(e);
}
}

@Override
public void onFileData(FileData fileData) throws ImporterException {
// to be implemented in followup change
try {
sqlInsertFileData.setLong(F_FILE_DATA.CONSENSUS_TIMESTAMP.ordinal(), fileData.getConsensusTimestamp());
sqlInsertFileData.setBytes(F_FILE_DATA.FILE_DATA.ordinal(), fileData.getFileData());
sqlInsertFileData.addBatch();
} catch (SQLException e) {
throw new ParserSQLException(e);
}
}

@Override
public void onLiveHash(LiveHash liveHash) throws ImporterException {
// to be implemented in followup change
try {
sqlInsertLiveHashes
.setLong(F_LIVEHASHES.CONSENSUS_TIMESTAMP.ordinal(), liveHash.getConsensusTimestamp());
sqlInsertLiveHashes.setBytes(F_LIVEHASHES.LIVEHASH.ordinal(), liveHash.getLivehash());
sqlInsertLiveHashes.addBatch();
} catch (SQLException e) {
throw new ParserSQLException(e);
}
}

@Override
Expand All @@ -152,4 +218,21 @@ enum F_NONFEETRANSFER {
ZERO // column indices start at 1, this creates the necessary offset
, CONSENSUS_TIMESTAMP, AMOUNT, REALM_NUM, ENTITY_NUM
}

enum F_TOPICMESSAGE {
ZERO // column indices start at 1, this creates the necessary offset
, CONSENSUS_TIMESTAMP, REALM_NUM, TOPIC_NUM, MESSAGE, RUNNING_HASH, SEQUENCE_NUMBER
}

enum F_FILE_DATA {
ZERO, CONSENSUS_TIMESTAMP, FILE_DATA
}

enum F_CONTRACT_RESULT {
ZERO, CONSENSUS_TIMESTAMP, FUNCTION_PARAMS, GAS_SUPPLIED, CALL_RESULT, GAS_USED
}

enum F_LIVEHASHES {
ZERO, CONSENSUS_TIMESTAMP, LIVEHASH
}
}
Loading

0 comments on commit 1e04dbf

Please sign in to comment.