Move file handling logic from RecordItemParser to RecordFileParser #585

Merged · 5 commits · Mar 7, 2020 · Changes from all commits
RecordFile.java
@@ -27,10 +27,14 @@
import javax.persistence.Id;
import javax.persistence.Table;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Entity
@AllArgsConstructor
@NoArgsConstructor
@Table(name = "t_record_files")
public class RecordFile {

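For readers unfamiliar with Lombok: the two added annotations generate both constructors at compile time. A rough hand-written equivalent, assuming the id and name fields that later code in this PR sets on RecordFile (the full field list is truncated in this diff):

// Approximate hand-written equivalent of the Lombok annotations on RecordFile.
// The field list here is illustrative; only id and name are visible in this PR.
public class RecordFile {
    private Long id;
    private String name;

    // @NoArgsConstructor
    public RecordFile() {
    }

    // @AllArgsConstructor
    public RecordFile(Long id, String name) {
        this.id = id;
        this.name = name;
    }

    // @Data additionally generates getters, setters, equals(), hashCode() and toString().
}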
Transaction.java
@@ -64,7 +64,9 @@ public class Transaction implements Persistable<Long> {
    @ManyToOne(cascade = CascadeType.PERSIST)
    private Entities entity;

    // Deprecated, value set to 0 until removed.
    @Column(name = "fk_rec_file_id")
    @Deprecated(forRemoval = true, since = "v0.7.0")
    private Long recordFileId;

    private Long validStartNs;
ParsedItemHandler.java
@@ -20,22 +20,9 @@
* ‍
*/

-import com.hedera.mirror.importer.exception.ImporterException;
-
/**
 * As items are parsed during processing of streams, the on*() function corresponding to that item (in this interface or
- * sub-interface) will be called. Invocation pattern: [...sub-interfaces calls...] [onBatchComplete | onError]
+ * sub-interface) will be called.
 */
public interface ParsedItemHandler {
-    /**
-     * Called after successful parsing of stream file.
-     *
-     * @throws ImporterException
-     */
-    void onFileComplete() throws ImporterException;
-
-    /**
-     * Called if an error is encountered during processing of stream file.
-     */
-    void onError(Throwable e);
}
RecordStreamFileListener.java (new file)
@@ -0,0 +1,15 @@
package com.hedera.mirror.importer.parser;

import java.util.Optional;

import com.hedera.mirror.importer.domain.RecordFile;
import com.hedera.mirror.importer.exception.ImporterException;
import com.hedera.mirror.importer.parser.domain.StreamFileData;

public interface RecordStreamFileListener extends StreamFileListener<RecordFile> {
    Optional<RecordFile> onStart(StreamFileData streamFileData) throws ImporterException;

    void onEnd(RecordFile recordFile) throws ImporterException;

    void onError();
}
StreamFileListener.java (new file)
@@ -0,0 +1,23 @@
package com.hedera.mirror.importer.parser;

import java.util.Optional;

import com.hedera.mirror.importer.exception.ImporterException;
import com.hedera.mirror.importer.parser.domain.StreamFileData;

public interface StreamFileListener<T> {
    /**
     * Called when starting to process a new stream file.
     *
     * @return non-empty <T> if the file processing should continue; empty to skip the file.
     * @throws ImporterException
     */
    Optional<T> onStart(StreamFileData streamFileData) throws ImporterException;

    void onEnd(T recordFile) throws ImporterException;

    /**
     * Called if an error is encountered during processing of stream file.
     */
    void onError();
}
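The calling convention implied by the javadoc (onStart, then per-item callbacks, then onEnd or onError) could be driven roughly as follows; this driver is hypothetical and not part of the diff:

// Hypothetical driver illustrating the listener lifecycle.
static <T> void process(StreamFileListener<T> listener, StreamFileData data) {
    Optional<T> context = listener.onStart(data);
    if (!context.isPresent()) {
        return; // listener chose to skip this file, e.g. it was already ingested
    }
    try {
        // ... parse the stream and dispatch the per-item on*() callbacks here ...
        listener.onEnd(context.get());
    } catch (ImporterException e) {
        listener.onError();
    }
}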
PostgresWriterProperties.java
@@ -1,13 +1,17 @@
package com.hedera.mirror.importer.parser.record;

import lombok.Data;
import javax.inject.Named;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated;
import javax.validation.constraints.Min;

@Data
@Named
@Validated
@ConfigurationProperties("hedera.mirror.parser.record.postgresql")
public class PostgresWriterProperties {
    /**
     * PreparedStatement.executeBatch() is called after every batchSize number of transactions from record stream file.
     */
    @Min(1)
    private int batchSize = 100;
}
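The intent is that the writer counts queued rows and flushes once batchSize is reached. A sketch under that assumption (maybeFlush() is hypothetical; getBatchSize() is the Lombok-generated getter, and batch_count and executeBatches() appear in the handler below):

// Sketch: flush the PreparedStatement batches every batchSize transactions.
private void maybeFlush() {
    batch_count++;
    if (batch_count >= properties.getBatchSize()) {
        executeBatches(); // calls PreparedStatement.executeBatch() on each statement
        batch_count = 0;
    }
}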
PostgresWritingRecordParsedItemHandler.java
@@ -20,10 +20,14 @@
* ‍
*/

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.Optional;
import javax.inject.Named;
import javax.sql.DataSource;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;

@@ -32,17 +36,20 @@
import com.hedera.mirror.importer.domain.FileData;
import com.hedera.mirror.importer.domain.LiveHash;
import com.hedera.mirror.importer.domain.NonFeeTransfer;
import com.hedera.mirror.importer.domain.RecordFile;
import com.hedera.mirror.importer.domain.TopicMessage;
import com.hedera.mirror.importer.domain.Transaction;
import com.hedera.mirror.importer.exception.ImporterException;
import com.hedera.mirror.importer.exception.ParserException;
import com.hedera.mirror.importer.exception.ParserSQLException;

-import org.apache.commons.lang3.NotImplementedException;
import com.hedera.mirror.importer.parser.RecordStreamFileListener;
import com.hedera.mirror.importer.parser.domain.StreamFileData;
import com.hedera.mirror.importer.util.Utility;

@Log4j2
@Named
@RequiredArgsConstructor
-public class PostgresWritingRecordParsedItemHandler implements RecordParsedItemHandler {
+public class PostgresWritingRecordParsedItemHandler implements RecordParsedItemHandler, RecordStreamFileListener {
    private long batch_count = 0;
    private PreparedStatement sqlInsertTransaction;
    private PreparedStatement sqlInsertTransferList;
@@ -52,8 +59,88 @@ public class PostgresWritingRecordParsedItemHandler implements RecordParsedItemH
    private PreparedStatement sqlInsertLiveHashes;
    private PreparedStatement sqlInsertTopicMessage;
    private final PostgresWriterProperties properties;
    private final DataSource dataSource;
    private Connection connection;

-    void initSqlStatements(Connection connection) throws ParserSQLException {
    @Override
    public Optional<RecordFile> onStart(StreamFileData streamFileData) {
        String fileName = streamFileData.getFilename();
        try {
            initConnectionAndStatements();
            long fileId;

            try (CallableStatement fileCreate = connection.prepareCall("{? = call f_file_create( ? ) }")) {
                fileCreate.registerOutParameter(1, Types.BIGINT);
                fileCreate.setString(2, fileName);
                fileCreate.execute();
                fileId = fileCreate.getLong(1);
            }

            if (fileId == 0) {
                log.trace("File {} already exists in the database.", fileName);
                closeConnectionAndStatements();
                return Optional.empty();
            } else {
                log.trace("Added file {} to the database.", fileName);
            }
            RecordFile recordFile = new RecordFile();
            recordFile.setId(fileId);
            recordFile.setName(fileName);
            return Optional.of(recordFile);
        } catch (Exception e) {
            closeConnectionAndStatements();
            throw new ParserException("Error saving file in database: " + fileName, e);
        }
    }

    @Override
    public void onEnd(RecordFile recordFile) {
        try (CallableStatement fileClose = connection.prepareCall("{call f_file_complete( ?, ?, ? ) }")) {
            executeBatches();

            // update the file to processed

            fileClose.setLong(1, recordFile.getId());

            if (Utility.hashIsEmpty(recordFile.getFileHash())) {
                fileClose.setObject(2, null);
            } else {
                fileClose.setString(2, recordFile.getFileHash());
            }

            if (Utility.hashIsEmpty(recordFile.getPreviousHash())) {
                fileClose.setObject(3, null);
            } else {
                fileClose.setString(3, recordFile.getPreviousHash());
            }

            fileClose.execute();
            // commit the changes to the database
            connection.commit();
            closeConnectionAndStatements();
        } catch (SQLException e) {
            throw new ParserSQLException(e);
        }
    }

    @Override
    public void onError() {
        try {
            connection.rollback();
            closeConnectionAndStatements();
        } catch (SQLException e) {
            log.error("Exception while rolling transaction back", e);
        }
    }

    private void initConnectionAndStatements() throws ParserSQLException {
        try {
            connection = dataSource.getConnection();
            connection.setAutoCommit(false); // do not auto-commit
            connection.setClientInfo("ApplicationName", getClass().getName());
        } catch (SQLException e) {
            throw new ParserSQLException("Error setting up connection to database", e);
        }
        try {
            sqlInsertTransaction = connection.prepareStatement("INSERT INTO t_transactions"
                    + " (fk_node_acc_id, memo, valid_start_ns, type, fk_payer_acc_id"
@@ -90,16 +177,7 @@ void initSqlStatements(Connection connection) throws ParserSQLException {
        }
    }

-    public void finish() {
-        closeStatements();
-    }
-
-    @Override
-    public void onFileComplete() {
-        executeBatches();
-    }
-
-    private void closeStatements() {
+    private void closeConnectionAndStatements() {
        try {
            sqlInsertTransaction.close();
            sqlInsertTransferList.close();
@@ -108,6 +186,8 @@ private void closeStatements() {
            sqlInsertContractResult.close();
            sqlInsertLiveHashes.close();
            sqlInsertTopicMessage.close();

+            connection.close();
        } catch (SQLException e) {
            throw new ParserSQLException("Error closing connection", e);
        }
@@ -143,7 +223,8 @@ public void onTransaction(Transaction transaction) throws ImporterException {
        } else {
            sqlInsertTransaction.setObject(F_TRANSACTION.CUD_ENTITY_ID.ordinal(), null);
        }
-        sqlInsertTransaction.setLong(F_TRANSACTION.FK_REC_FILE_ID.ordinal(), transaction.getRecordFileId());
+        sqlInsertTransaction.setLong(
+                F_TRANSACTION.FK_REC_FILE_ID.ordinal(), 0); // deprecated. set to 0 until removed.
        sqlInsertTransaction.setLong(F_TRANSACTION.FK_NODE_ACCOUNT_ID.ordinal(), transaction.getNodeAccountId());
        sqlInsertTransaction.setBytes(F_TRANSACTION.MEMO.ordinal(), transaction.getMemo());
        sqlInsertTransaction.setLong(F_TRANSACTION.VALID_START_NS.ordinal(), transaction.getValidStartNs());
@@ -254,11 +335,6 @@ public void onLiveHash(LiveHash liveHash) throws ImporterException {
        }
    }

-    @Override
-    public void onError(Throwable e) {
-        throw new NotImplementedException("onError not implemented");
-    }
-
    enum F_TRANSACTION {
        ZERO // column indices start at 1, this creates the necessary offset
        , FK_NODE_ACCOUNT_ID, MEMO, VALID_START_NS, TYPE, FK_PAYER_ACCOUNT_ID, RESULT, CONSENSUS_NS,
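The ZERO element in F_TRANSACTION exists because Enum.ordinal() is 0-based while JDBC parameter indices are 1-based; padding the enum shifts every ordinal up by one so it can be passed straight to the PreparedStatement setters. A small illustration:

// With the padding element, ordinals line up with 1-based JDBC parameter indices.
enum F_TRANSACTION {
    ZERO, FK_NODE_ACCOUNT_ID, MEMO
}

// F_TRANSACTION.FK_NODE_ACCOUNT_ID.ordinal() == 1
// F_TRANSACTION.MEMO.ordinal() == 2
// sqlInsertTransaction.setLong(F_TRANSACTION.FK_NODE_ACCOUNT_ID.ordinal(), nodeAccountId);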