Skip to content

Commit

Permalink
rename defaultParseDir to be defaultParseDir_RecordStream; add defaul…
Browse files Browse the repository at this point in the history
…tParseDir_EventStream to config; #1;
  • Loading branch information
QianSwirlds committed Jul 29, 2019
1 parent 7856810 commit bebf7f1
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 351 deletions.
3 changes: 2 additions & 1 deletion config/config.json.sample
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
"secretKey": "",
"downloadPeriodSec": 120,
"downloadToDir": "/MirrorNodeData",
"defaultParseDir": "/MirrorNodeData/recordstreams/valid/",
"defaultParseDir_RecordStream": "/MirrorNodeData/recordstreams/valid/",
"defaultParseDir_EventStream": "/MirrorNodeData/eventstreams/valid/",
"proxyPort": 50777,
"nodeInfoFile": "./config/nodesInfo.json",
"addressBookFile": "./config/0.0.102",
Expand Down
24 changes: 17 additions & 7 deletions src/main/java/com/hedera/configLoader/ConfigLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,14 @@ public enum CLOUD_PROVIDER {
// download period in seconds
private static long downloadPeriodSec = 120;

// the directory where we store the RecordStream files
private static String downloadToDir = "./recordstreams";
// the directory where we store the files
private static String downloadToDir = "/MirrorNodeData";

// the default directory to be parsed
private static String defaultParseDir = "./recordstreams/valid/";
private static String defaultParseDir_RecordStream = "/MirrorNodeData/recordstreams/valid/";

// the default directory to be parsed
private static String defaultParseDir_EventStream = "/MirrorNodeData/eventstreams/valid/";

// the port of mirrorNodeProxy;
private static int proxyPort = 50777;
Expand Down Expand Up @@ -146,8 +149,11 @@ public ConfigLoader(String configPath) {
if (jsonObject.has("downloadToDir")) {
downloadToDir = jsonObject.get("downloadToDir").getAsString();
}
if (jsonObject.has("defaultParseDir")) {
defaultParseDir = jsonObject.get("defaultParseDir").getAsString();
if (jsonObject.has("defaultParseDir_RecordStream")) {
defaultParseDir_RecordStream = jsonObject.get("defaultParseDir_RecordStream").getAsString();
}
if (jsonObject.has("defaultParseDir_EventStream")) {
defaultParseDir_EventStream = jsonObject.get("defaultParseDir_EventStream").getAsString();
}
if (jsonObject.has("proxyPort")) {
proxyPort = jsonObject.get("proxyPort").getAsInt();
Expand Down Expand Up @@ -245,8 +251,12 @@ public String getDownloadToDir() {
return downloadToDir;
}

public String getDefaultParseDir() {
return defaultParseDir;
public String getDefaultParseDir_RecordStream() {
return defaultParseDir_RecordStream;
}

public String getDefaultParseDir_EventStream() {
return defaultParseDir_EventStream;
}

public int getProxyPort() {
Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/hedera/mirrorNodeProxy/Utility.java
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ public static byte[] hexToBytes(String data) throws DecoderException {
* @return converted HexString
*/
public static String bytesToHex(byte[] bytes) {
if (bytes == null || bytes.length == 0) return null;
return Hex.encodeHexString(bytes);
}

Expand Down
187 changes: 91 additions & 96 deletions src/main/java/com/hedera/parser/EventStreamFileParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,89 +55,86 @@ static public boolean loadEventStreamFile(String fileName, String previousFileHa
return false;
}

if (RecordFileLogger.initFile(fileName)) {

try {
long counter = 0;
stream = new FileInputStream(file);
DataInputStream dis = new DataInputStream(stream);

int eventStreamFileVersion = dis.readInt();
try {
long counter = 0;
stream = new FileInputStream(file);
DataInputStream dis = new DataInputStream(stream);

log.info(MARKER, "EventStream file format version " + eventStreamFileVersion);
if (eventStreamFileVersion != EVENT_STREAM_FILE_VERSION) {
log.error(MARKER, "EventStream file format version doesn't match.");
return false;
}
while (dis.available() != 0) {
try {
byte typeDelimiter = dis.readByte();
switch (typeDelimiter) {
case TYPE_PREV_HASH:
byte[] readFileHash = new byte[48];
dis.read(readFileHash);
if (previousFileHash.isEmpty()) {
log.error(MARKER, "Previous file Hash not available");
previousFileHash = Hex.encodeHexString(readFileHash);
} else {
log.info(MARKER, "Previous file Hash = " + previousFileHash);
}
newFileHash = Hex.encodeHexString(readFileHash);
int eventStreamFileVersion = dis.readInt();

if (!newFileHash.contentEquals(previousFileHash)) {
if (configLoader.getStopLoggingIfHashMismatch()) {
log.error(MARKER, "Previous file Hash Mismatch - stopping loading. Previous = {}, Current = {}", previousFileHash, newFileHash);
return false;
}
}
break;

case STREAM_EVENT_START_NO_TRANS_WITH_VERSION:
loadEvent(dis, true);
counter++;
break;
case STREAM_EVENT_START_WITH_VERSION:
loadEvent(dis, false);
counter++;
break;
default:
log.error(LOGM_EXCEPTION, "Exception Unknown record file delimiter {}", typeDelimiter);
}

} catch (Exception e) {
log.error(LOGM_EXCEPTION, "Exception ", e);
return false;
}
}
dis.close();
} catch (FileNotFoundException e) {
log.error(MARKER, "File Not Found Error");
return false;
} catch (IOException e) {
log.error(MARKER, "IOException Error");
return false;
} catch (Exception e) {
log.error(MARKER, "Parsing Error");
log.info(MARKER, "EventStream file format version " + eventStreamFileVersion);
if (eventStreamFileVersion != EVENT_STREAM_FILE_VERSION) {
log.error(MARKER, "EventStream file format version doesn't match.");
return false;
} finally {
}
while (dis.available() != 0) {
try {
if (stream != null)
stream.close();
} catch (IOException ex) {
log.error("Exception in close the stream {}", ex);
return true;
byte typeDelimiter = dis.readByte();
switch (typeDelimiter) {
case TYPE_PREV_HASH:
byte[] readFileHash = new byte[48];
dis.read(readFileHash);
if (previousFileHash.isEmpty()) {
log.error(MARKER, "Previous file Hash not available");
previousFileHash = Hex.encodeHexString(readFileHash);
} else {
log.info(MARKER, "Previous file Hash = " + previousFileHash);
}
newFileHash = Hex.encodeHexString(readFileHash);

if (!Arrays.equals(new byte[48], readFileHash) && !newFileHash.contentEquals(previousFileHash)) {
if (configLoader.getStopLoggingIfHashMismatch()) {
log.error(MARKER, "Previous file Hash Mismatch - stopping loading. fileName = {}, Previous = {}, Current = {}", fileName, previousFileHash, newFileHash);
return false;
}
}
break;

case STREAM_EVENT_START_NO_TRANS_WITH_VERSION:
loadEvent(dis, true);
counter++;
break;
case STREAM_EVENT_START_WITH_VERSION:
loadEvent(dis, false);
counter++;
break;
default:
log.error(LOGM_EXCEPTION, "Exception Unknown record file delimiter {}", typeDelimiter);
}

} catch (Exception e) {
log.error(LOGM_EXCEPTION, "Exception ", e);
return false;
}
}
return true;
} else {
dis.close();
log.info(MARKER,"Loaded {} events successfully from {}", counter, fileName);
} catch (FileNotFoundException e) {
log.error(MARKER, "File Not Found Error");
return false;
} catch (IOException e) {
log.error(MARKER, "IOException Error");
return false;
} catch (Exception e) {
log.error(MARKER, "Parsing Error");
return false;
} finally {
try {
if (stream != null)
stream.close();
} catch (IOException ex) {
log.error(MARKER, "Exception in close the stream {}", ex);
return true;
}
}

return true;
}

static void loadEvent(DataInputStream dis, boolean noTxs) throws IOException {
if (dis.readInt() != STREAM_EVENT_VERSION) {
log.error(MARKER, "EventStream format version doesn't match.");
return;
}
long creatorId = dis.readLong();
long creatorSeq = dis.readLong();
Expand All @@ -147,8 +144,9 @@ static void loadEvent(DataInputStream dis, boolean noTxs) throws IOException {
long otherParentGen = dis.readLong();
byte[] selfParentHash = readNullableByteArray(dis);
byte[] otherParentHash = readNullableByteArray(dis);
Transaction[] transactions = new Transaction[0];
if (!noTxs) {
Transaction[] transactions = Transaction.readArray(dis);
transactions = Transaction.readArray(dis);
}
Instant timeCreated = readInstant(dis);
byte[] signature = readByteArray(dis);
Expand All @@ -161,6 +159,7 @@ static void loadEvent(DataInputStream dis, boolean noTxs) throws IOException {
byte[] hash = readByteArray(dis);
Instant consensusTimeStamp = readInstant(dis);
long consensusOrder = dis.readLong();
log.info(MARKER, "Loaded Event: creatorId: {}, creatorSeq: {}, otherId: {}, otherSeq: {}, selfParentGen: {}, otherParentGen: {}, selfParentHash: {}, otherParentHash: {}, transactions: {}, timeCreated: {}, signature: {}, hash: {}, consensusTimeStamp: {}, consensusOrder: {}", creatorId, creatorSeq, otherId, otherSeq, selfParentGen, otherParentGen, Utility.bytesToHex(selfParentHash), Utility.bytesToHex(otherParentHash), transactions, timeCreated, Utility.bytesToHex(signature), Utility.bytesToHex(hash), consensusTimeStamp, consensusOrder);
}


Expand Down Expand Up @@ -226,41 +225,37 @@ public static void main(String[] args) {

configLoader = new ConfigLoader("./config/config.json");

pathName = configLoader.getDefaultParseDir();
log.info(MARKER, "EventStream files folder got from configuration file: {}", configLoader.getDefaultParseDir());
pathName = configLoader.getDefaultParseDir_EventStream();
log.info(MARKER, "EventStream files folder got from configuration file: {}", configLoader.getDefaultParseDir_EventStream());

if (pathName != null) {

if (RecordFileLogger.start()) {

File file = new File(pathName);
if (file.isFile()) {
log.info(MARKER, "Loading eventStream file {} " + pathName);
loadEventStreamFile(pathName, "");
} else if (file.isDirectory()) { //if it's a directory
File file = new File(pathName);
if (file.isFile()) {
log.info(MARKER, "Loading eventStream file {} ", pathName);
loadEventStreamFile(pathName, "");
} else if (file.isDirectory()) { //if it's a directory

String[] files = file.list(); // get all files under the directory
Arrays.sort(files); // sorted by name (timestamp)
String[] files = file.list(); // get all files under the directory
Arrays.sort(files); // sorted by name (timestamp)

// add director prefix to get full path
List<String> fullPaths = Arrays.asList(files).stream()
.filter(f -> Utility.isEventStreamFile(f))
.map(s -> file + "/" + s)
.collect(Collectors.toList());
// add director prefix to get full path
List<String> fullPaths = Arrays.asList(files).stream()
.filter(f -> Utility.isEventStreamFile(f))
.map(s -> file + "/" + s)
.collect(Collectors.toList());

log.info(MARKER, "Loading eventStream files from directory {} ", pathName);
log.info(MARKER, "Loading eventStream files from directory {} ", pathName);

if (fullPaths != null) {
log.info(MARKER, "Files are " + fullPaths);
loadRecordFiles(fullPaths);
} else {
log.info(MARKER, "No files to parse");
}
if (fullPaths != null) {
log.info(MARKER, "Files are " + fullPaths);
loadRecordFiles(fullPaths);
} else {
log.error(LOGM_EXCEPTION, "Exception file {} does not exist", pathName);

log.info(MARKER, "No files to parse");
}
RecordFileLogger.finish();
} else {
log.error(LOGM_EXCEPTION, "Exception file {} does not exist", pathName);

}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/hedera/parser/RecordFileParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,8 @@ public static void main(String[] args) {
configLoader = new ConfigLoader("./config/config.json");
loggerStatus = new LoggerStatus("./config/loggerStatus.json");

pathName = configLoader.getDefaultParseDir();
log.info(MARKER, "Record files folder got from configuration file: {}", configLoader.getDefaultParseDir());
pathName = configLoader.getDefaultParseDir_RecordStream();
log.info(MARKER, "Record files folder got from configuration file: {}", configLoader.getDefaultParseDir_RecordStream());

if (pathName != null) {

Expand Down
Loading

0 comments on commit bebf7f1

Please sign in to comment.