Skip to content

Commit

Permalink
fix: PcesReadWriteTests.corruptedEventTest (#17312)
Browse files Browse the repository at this point in the history
Signed-off-by: Lazar Petrovic <[email protected]>
  • Loading branch information
lpetrovic05 authored Jan 13, 2025
1 parent f7b6f69 commit 04f8074
Show file tree
Hide file tree
Showing 5 changed files with 228 additions and 209 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2016-2024 Hedera Hashgraph, LLC
* Copyright (C) 2016-2025 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,8 +18,6 @@

import com.hedera.hapi.platform.event.GossipEvent;
import com.swirlds.common.io.IOIterator;
import com.swirlds.common.io.extendable.ExtendableInputStream;
import com.swirlds.common.io.extendable.extensions.CountingStreamExtension;
import com.swirlds.common.io.streams.SerializableDataInputStream;
import com.swirlds.platform.event.AncientMode;
import com.swirlds.platform.event.EventSerializationUtils;
Expand All @@ -41,7 +39,6 @@ public class PcesFileIterator implements IOIterator<PlatformEvent> {
private final AncientMode fileType;
private final SerializableDataInputStream stream;
private boolean hasPartialEvent = false;
private final CountingStreamExtension counter;
private PlatformEvent next;
private boolean streamClosed = false;
private PcesFileVersion fileVersion;
Expand All @@ -60,11 +57,8 @@ public PcesFileIterator(

this.lowerBound = lowerBound;
this.fileType = Objects.requireNonNull(fileType);
counter = new CountingStreamExtension();
stream = new SerializableDataInputStream(new ExtendableInputStream(
new BufferedInputStream(
new FileInputStream(fileDescriptor.getPath().toFile())),
counter));
stream = new SerializableDataInputStream(new BufferedInputStream(
new FileInputStream(fileDescriptor.getPath().toFile())));

try {
final int fileVersionNumber = stream.readInt();
Expand All @@ -74,8 +68,7 @@ public PcesFileIterator(
}
} catch (final EOFException e) {
// Empty file. Possible if the node crashed right after it created this file.
stream.close();
streamClosed = true;
closeFile();
}
}

Expand All @@ -84,37 +77,40 @@ public PcesFileIterator(
*/
private void findNext() throws IOException {
while (next == null && !streamClosed) {

final long initialCount = counter.getCount();
if (stream.available() == 0) {
closeFile();
return;
}

try {
final PlatformEvent candidate =
switch (fileVersion) {
case ORIGINAL -> EventSerializationUtils.deserializePlatformEvent(stream, true);
case PROTOBUF_EVENTS -> {
final GossipEvent gossipEvent = stream.readPbjRecord(GossipEvent.PROTOBUF);
try {
yield new PlatformEvent(gossipEvent);
} catch (final NullPointerException e) {
throw new IOException("GossipEvent read from the file is malformed", e);
}
}
case PROTOBUF_EVENTS -> new PlatformEvent(stream.readPbjRecord(GossipEvent.PROTOBUF));
};
if (candidate.getAncientIndicator(fileType) >= lowerBound) {
next = candidate;
}
} catch (final EOFException e) {
if (counter.getCount() > initialCount) {
// We started parsing an event but couldn't find enough bytes to finish it.
// This is possible (if not likely) when a node is shut down abruptly.
hasPartialEvent = true;
}
stream.close();
streamClosed = true;
} catch (final IOException e) {
// We started parsing an event but couldn't find enough bytes to finish it.
// This is possible (if not likely) when a node is shut down abruptly.
hasPartialEvent = true;
closeFile();
throw e;
} catch (final NullPointerException e) {
// The PlatformEvent constructor can throw this if the event is malformed.
hasPartialEvent = true;
closeFile();
throw new IOException("GossipEvent read from the file is malformed", e);
}
}
}

private void closeFile() throws IOException {
stream.close();
streamClosed = true;
}

/**
* If true then this file contained a partial event. If false then the last event in the file was fully written when
* the file was closed.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2016-2024 Hedera Hashgraph, LLC
* Copyright (C) 2016-2025 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -59,20 +59,37 @@ public PcesMultiFileIterator(
* Find the next event that should be returned.
*/
private void findNext() throws IOException {
if (currentIterator == null) { // on first call
if (fileIterator.hasNext()) {
currentIterator = new PcesFileIterator(fileIterator.next(), lowerBound, fileType);
} else {
return;
}
}

while (next == null) {
if (currentIterator == null || !currentIterator.hasNext()) {
if (currentIterator != null && currentIterator.hasPartialEvent()) {
truncatedFileCount++;
}

if (!fileIterator.hasNext()) {
break;
}
boolean hasNextEvent = false;
try {
hasNextEvent = currentIterator.hasNext();
} catch (final IOException ignored) {
// ignore the exception and move on to the next file if there is one
}

currentIterator = new PcesFileIterator(fileIterator.next(), lowerBound, fileType);
} else {
if (hasNextEvent) {
next = currentIterator.next();
return;
}

if (currentIterator.hasPartialEvent()) {
truncatedFileCount++;
}

if (!fileIterator.hasNext()) {
return;
}

currentIterator = new PcesFileIterator(fileIterator.next(), lowerBound, fileType);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
* Copyright (C) 2024-2025 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -125,8 +125,7 @@ public static void verifyStream(
assertTrue(eventsIterator.hasNext());
assertEquals(event, eventsIterator.next());
}

assertFalse(eventsIterator.hasNext());
assertFalse(eventsIterator.hasNext(), "There should be no more events");
assertEquals(truncatedFileCount, eventsIterator.getTruncatedFileCount());

// Make sure things look good when iterating starting in the middle of the stream that was written
Expand Down Expand Up @@ -169,11 +168,14 @@ public static void verifyStream(
assertTrue(file.getUpperBound() >= previousMaximum);
previousMaximum = file.getUpperBound();

final IOIterator<PlatformEvent> fileEvents = file.iterator(0);
while (fileEvents.hasNext()) {
final PlatformEvent event = fileEvents.next();
assertTrue(event.getAncientIndicator(ancientMode) >= file.getLowerBound());
assertTrue(event.getAncientIndicator(ancientMode) <= file.getUpperBound());
try (final IOIterator<PlatformEvent> fileEvents = file.iterator(0)) {
while (fileEvents.hasNext()) {
final PlatformEvent event = fileEvents.next();
assertTrue(event.getAncientIndicator(ancientMode) >= file.getLowerBound());
assertTrue(event.getAncientIndicator(ancientMode) <= file.getUpperBound());
}
} catch (final IOException ignored) {
// hasNext() can throw an IOException if the file is truncated, in this case there is nothing to do
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2016-2024 Hedera Hashgraph, LLC
* Copyright (C) 2016-2025 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -49,7 +49,6 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
Expand Down Expand Up @@ -100,7 +99,7 @@ protected static Stream<Arguments> ancientModeArguments() {
return Stream.of(Arguments.of(GENERATION_THRESHOLD), Arguments.of(BIRTH_ROUND_THRESHOLD));
}

protected static Stream<Arguments> ancientAndWriterTypeArguments() {
protected static Stream<Arguments> ancientAndBoolanArguments() {
return Stream.of(
Arguments.of(GENERATION_THRESHOLD, false),
Arguments.of(BIRTH_ROUND_THRESHOLD, false),
Expand All @@ -109,7 +108,7 @@ protected static Stream<Arguments> ancientAndWriterTypeArguments() {
}

@ParameterizedTest
@MethodSource("ancientAndWriterTypeArguments")
@MethodSource("ancientAndBoolanArguments")
@DisplayName("Write Then Read Test")
void writeThenReadTest(@NonNull final AncientMode ancientMode, final boolean useFileChannelWriter)
throws IOException {
Expand Down Expand Up @@ -213,13 +212,7 @@ void readFilesAfterMinimumTest(@NonNull final AncientMode ancientMode) throws IO
iterator.forEachRemaining(deserializedEvents::add);

// We don't want any events with an ancient indicator less than the middle
final Iterator<PlatformEvent> it = events.iterator();
while (it.hasNext()) {
final PlatformEvent event = it.next();
if (event.getAncientIndicator(ancientMode) < middle) {
it.remove();
}
}
events.removeIf(event -> event.getAncientIndicator(ancientMode) < middle);

assertEquals(events.size(), deserializedEvents.size());
for (int i = 0; i < events.size(); i++) {
Expand Down Expand Up @@ -250,72 +243,79 @@ void readEmptyFileTest(@NonNull final AncientMode ancientMode) throws IOExceptio
}

@ParameterizedTest
@MethodSource("ancientModeArguments")
@MethodSource("ancientAndBoolanArguments")
@DisplayName("Truncated Event Test")
void truncatedEventTest(@NonNull final AncientMode ancientMode) throws IOException {
for (final boolean truncateOnBoundary : List.of(true, false)) {
final Random random = RandomUtils.getRandomPrintSeed();

final int numEvents = 100;

final StandardGraphGenerator generator = new StandardGraphGenerator(
ancientMode == GENERATION_THRESHOLD ? DEFAULT_PLATFORM_CONTEXT : BIRTH_ROUND_PLATFORM_CONTEXT,
random.nextLong(),
new StandardEventSource(),
new StandardEventSource(),
new StandardEventSource(),
new StandardEventSource());

final List<PlatformEvent> events = new ArrayList<>();
for (int i = 0; i < numEvents; i++) {
events.add(generator.generateEvent().getBaseEvent());
}
void truncatedEventTest(@NonNull final AncientMode ancientMode, final boolean truncateOnBoundary)
throws IOException {
final Random random = RandomUtils.getRandomPrintSeed();

long upperBound = Long.MIN_VALUE;
for (final PlatformEvent event : events) {
upperBound = Math.max(upperBound, event.getAncientIndicator(ancientMode));
}
final int numEvents = 100;

upperBound += random.nextInt(0, 10);
final StandardGraphGenerator generator = new StandardGraphGenerator(
ancientMode == GENERATION_THRESHOLD ? DEFAULT_PLATFORM_CONTEXT : BIRTH_ROUND_PLATFORM_CONTEXT,
random.nextLong(),
new StandardEventSource(),
new StandardEventSource(),
new StandardEventSource(),
new StandardEventSource());

final PcesFile file = PcesFile.of(
ancientMode,
RandomUtils.randomInstant(random),
random.nextInt(0, 100),
0,
upperBound,
upperBound,
testDirectory);
final List<PlatformEvent> events = new ArrayList<>();
for (int i = 0; i < numEvents; i++) {
events.add(generator.generateEvent().getBaseEvent());
}

final Map<Integer /* event index */, Integer /* last byte position */> byteBoundaries = new HashMap<>();
long upperBound = Long.MIN_VALUE;
for (final PlatformEvent event : events) {
upperBound = Math.max(upperBound, event.getAncientIndicator(ancientMode));
}

final PcesMutableFile mutableFile = file.getMutableFile();
for (int i = 0; i < events.size(); i++) {
final PlatformEvent event = events.get(i);
mutableFile.writeEvent(event);
byteBoundaries.put(i, (int) mutableFile.fileSize());
}
upperBound += random.nextInt(0, 10);

mutableFile.close();
final PcesFile file = PcesFile.of(
ancientMode,
RandomUtils.randomInstant(random),
random.nextInt(0, 100),
0,
upperBound,
upperBound,
testDirectory);

final int lastEventIndex =
random.nextInt(0, events.size() - 2 /* make sure we always truncate at least one event */);
final Map<Integer /* event index */, Integer /* last byte position */> byteBoundaries = new HashMap<>();

final int truncationPosition = byteBoundaries.get(lastEventIndex) + (truncateOnBoundary ? 0 : 1);
final PcesMutableFile mutableFile = file.getMutableFile();
for (int i = 0; i < events.size(); i++) {
final PlatformEvent event = events.get(i);
mutableFile.writeEvent(event);
byteBoundaries.put(i, (int) mutableFile.fileSize());
}

truncateFile(file.getPath(), truncationPosition);
mutableFile.close();

final PcesFileIterator iterator = file.iterator(Long.MIN_VALUE);
final List<PlatformEvent> deserializedEvents = new ArrayList<>();
final int lastEventIndex =
random.nextInt(0, events.size() - 2 /* make sure we always truncate at least one event */);

final int truncationPosition = byteBoundaries.get(lastEventIndex) + (truncateOnBoundary ? 0 : 1);

truncateFile(file.getPath(), truncationPosition);

final PcesFileIterator iterator = file.iterator(Long.MIN_VALUE);
final List<PlatformEvent> deserializedEvents = new ArrayList<>();

if (truncateOnBoundary) {
iterator.forEachRemaining(deserializedEvents::add);
} else {
assertThrows(
IOException.class,
() -> iterator.forEachRemaining(deserializedEvents::add),
"A partial event should have been detected and an IOException should have been thrown");
}

assertEquals(truncateOnBoundary, !iterator.hasPartialEvent());
assertEquals(truncateOnBoundary, !iterator.hasPartialEvent());

assertEquals(lastEventIndex + 1, deserializedEvents.size());
assertEquals(lastEventIndex + 1, deserializedEvents.size());

for (int i = 0; i < deserializedEvents.size(); i++) {
assertEquals(events.get(i), deserializedEvents.get(i));
}
for (int i = 0; i < deserializedEvents.size(); i++) {
assertEquals(events.get(i), deserializedEvents.get(i));
}
}

Expand Down Expand Up @@ -593,8 +593,9 @@ void emptyFileTest(@NonNull final AncientMode ancientMode) throws IOException {
assertTrue(path.toFile().createNewFile());
assertTrue(Files.exists(path));

final PcesFileIterator iterator = file.iterator(Long.MIN_VALUE);
assertFalse(iterator.hasNext());
assertThrows(NoSuchElementException.class, iterator::next);
try (final PcesFileIterator iterator = file.iterator(Long.MIN_VALUE)) {
assertFalse(iterator.hasNext());
assertThrows(NoSuchElementException.class, iterator::next);
}
}
}
Loading

0 comments on commit 04f8074

Please sign in to comment.