From b2f04ab9ed5c84799017802a963a2fb9d3ae05e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9sar?= <56847527+LikeTheSalad@users.noreply.github.com> Date: Wed, 27 Nov 2024 19:10:59 +0100 Subject: [PATCH] Deserialization validation (#1571) --- .../exporter/FromDiskExporterImpl.java | 16 ++++-- .../DeserializationException.java | 15 ++++++ .../LogRecordDataDeserializer.java | 4 +- .../deserializers/MetricDataDeserializer.java | 4 +- .../deserializers/SignalDeserializer.java | 2 +- .../deserializers/SpanDataDeserializer.java | 4 +- .../buffering/internal/storage/Storage.java | 10 ++-- .../internal/storage/files/ReadableFile.java | 32 +++++++----- .../storage/files/reader/ProcessResult.java | 13 +++++ .../storage/responses/ReadableResult.java | 2 +- .../buffering/FromDiskExporterImplTest.java | 11 +++- .../internal/storage/StorageTest.java | 11 ++-- .../storage/files/ReadableFileTest.java | 50 +++++++++++++------ 13 files changed, 123 insertions(+), 51 deletions(-) create mode 100644 disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/DeserializationException.java create mode 100644 disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/reader/ProcessResult.java diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterImpl.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterImpl.java index 873ce20c0..19ef6fe2c 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterImpl.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterImpl.java @@ -5,8 +5,10 @@ package io.opentelemetry.contrib.disk.buffering.internal.exporter; +import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.DeserializationException; import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; +import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ProcessResult; import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult; import io.opentelemetry.contrib.disk.buffering.internal.utils.DebugLogger; import io.opentelemetry.sdk.common.CompletableResultCode; @@ -64,11 +66,15 @@ public boolean exportStoredBatch(long timeout, TimeUnit unit) throws IOException + " " + deserializer.signalType() + " bytes from storage."); - List telemetry = deserializer.deserialize(bytes); - logger.log( - "Now exporting batch of " + telemetry.size() + " " + deserializer.signalType()); - CompletableResultCode join = exportFunction.apply(telemetry).join(timeout, unit); - return join.isSuccess(); + try { + List telemetry = deserializer.deserialize(bytes); + logger.log( + "Now exporting batch of " + telemetry.size() + " " + deserializer.signalType()); + CompletableResultCode join = exportFunction.apply(telemetry).join(timeout, unit); + return join.isSuccess() ? ProcessResult.SUCCEEDED : ProcessResult.TRY_LATER; + } catch (DeserializationException e) { + return ProcessResult.CONTENT_INVALID; + } }); return result == ReadableResult.SUCCEEDED; } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/DeserializationException.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/DeserializationException.java new file mode 100644 index 000000000..f8e1d9729 --- /dev/null +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/DeserializationException.java @@ -0,0 +1,15 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers; + +import java.io.IOException; + +@SuppressWarnings("serial") +public class DeserializationException extends IOException { + public DeserializationException(Throwable cause) { + super(cause); + } +} diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/LogRecordDataDeserializer.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/LogRecordDataDeserializer.java index 22e9d12c3..5ac0007d9 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/LogRecordDataDeserializer.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/LogRecordDataDeserializer.java @@ -22,11 +22,11 @@ static LogRecordDataDeserializer getInstance() { } @Override - public List deserialize(byte[] source) { + public List deserialize(byte[] source) throws DeserializationException { try { return ProtoLogsDataMapper.getInstance().fromProto(LogsData.ADAPTER.decode(source)); } catch (IOException e) { - throw new IllegalArgumentException(e); + throw new DeserializationException(e); } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/MetricDataDeserializer.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/MetricDataDeserializer.java index 16c672cbc..34e88b3ef 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/MetricDataDeserializer.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/MetricDataDeserializer.java @@ -22,11 +22,11 @@ static MetricDataDeserializer getInstance() { } @Override - public List deserialize(byte[] source) { + public List deserialize(byte[] source) throws DeserializationException { try { return ProtoMetricsDataMapper.getInstance().fromProto(MetricsData.ADAPTER.decode(source)); } catch (IOException e) { - throw new IllegalArgumentException(e); + throw new DeserializationException(e); } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/SignalDeserializer.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/SignalDeserializer.java index 435c14088..dd56e356e 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/SignalDeserializer.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/SignalDeserializer.java @@ -25,7 +25,7 @@ static SignalDeserializer ofLogs() { } /** Deserializes the given byte array into a list of telemetry items. */ - List deserialize(byte[] source); + List deserialize(byte[] source) throws DeserializationException; /** Returns the name of the stored type of signal -- one of "metrics", "spans", or "logs". */ default String signalType() { diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/SpanDataDeserializer.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/SpanDataDeserializer.java index 03737f8a7..457d5f268 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/SpanDataDeserializer.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/SpanDataDeserializer.java @@ -22,11 +22,11 @@ static SpanDataDeserializer getInstance() { } @Override - public List deserialize(byte[] source) { + public List deserialize(byte[] source) throws DeserializationException { try { return ProtoSpansDataMapper.getInstance().fromProto(TracesData.ADAPTER.decode(source)); } catch (IOException e) { - throw new IllegalArgumentException(e); + throw new DeserializationException(e); } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java index f0d074f4e..4ff60cbdc 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java @@ -10,6 +10,7 @@ import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.ReadableFile; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.WritableFile; +import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ProcessResult; import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult; import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.WritableResult; import io.opentelemetry.contrib.disk.buffering.internal.utils.DebugLogger; @@ -77,12 +78,13 @@ private boolean write(byte[] item, int attemptNumber) throws IOException { * @param processing Is passed over to {@link ReadableFile#readAndProcess(Function)}. * @throws IOException If an unexpected error happens. */ - public ReadableResult readAndProcess(Function processing) throws IOException { + public ReadableResult readAndProcess(Function processing) + throws IOException { return readAndProcess(processing, 1); } - private ReadableResult readAndProcess(Function processing, int attemptNumber) - throws IOException { + private ReadableResult readAndProcess( + Function processing, int attemptNumber) throws IOException { if (isClosed.get()) { logger.log("Refusing to read from storage after being closed."); return ReadableResult.FAILED; @@ -103,7 +105,7 @@ private ReadableResult readAndProcess(Function processing, int ReadableResult result = readableFile.readAndProcess(processing); switch (result) { case SUCCEEDED: - case PROCESSING_FAILED: + case TRY_LATER: return result; default: // Retry with new file diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFile.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFile.java index e88cb0955..c1b92f5ed 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFile.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFile.java @@ -9,6 +9,7 @@ import io.opentelemetry.contrib.disk.buffering.StorageConfiguration; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.DelimitedProtoStreamReader; +import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ProcessResult; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ReadResult; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.StreamReader; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.utils.FileTransferUtil; @@ -83,7 +84,7 @@ public ReadableFile( * If the processing function returns TRUE, then the provided line will be deleted from the * source file. If the function returns FALSE, no changes will be applied to the source file. */ - public synchronized ReadableResult readAndProcess(Function processing) + public synchronized ReadableResult readAndProcess(Function processing) throws IOException { if (isClosed.get()) { return ReadableResult.FAILED; @@ -97,20 +98,25 @@ public synchronized ReadableResult readAndProcess(Function proc cleanUp(); return ReadableResult.FAILED; } - if (processing.apply(read.content)) { - unconsumedResult = null; - readBytes += read.totalReadLength; - int amountOfBytesToTransfer = originalFileSize - readBytes; - if (amountOfBytesToTransfer > 0) { - fileTransferUtil.transferBytes(readBytes, amountOfBytesToTransfer); - } else { + switch (processing.apply(read.content)) { + case SUCCEEDED: + unconsumedResult = null; + readBytes += read.totalReadLength; + int amountOfBytesToTransfer = originalFileSize - readBytes; + if (amountOfBytesToTransfer > 0) { + fileTransferUtil.transferBytes(readBytes, amountOfBytesToTransfer); + } else { + cleanUp(); + } + return ReadableResult.SUCCEEDED; + case TRY_LATER: + unconsumedResult = read; + return ReadableResult.TRY_LATER; + case CONTENT_INVALID: cleanUp(); - } - return ReadableResult.SUCCEEDED; - } else { - unconsumedResult = read; - return ReadableResult.PROCESSING_FAILED; + return ReadableResult.FAILED; } + return ReadableResult.FAILED; } @Nullable diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/reader/ProcessResult.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/reader/ProcessResult.java new file mode 100644 index 000000000..696d98d2c --- /dev/null +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/reader/ProcessResult.java @@ -0,0 +1,13 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader; + +/** Result of processing the contents of a file. */ +public enum ProcessResult { + SUCCEEDED, + TRY_LATER, + CONTENT_INVALID +} diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/responses/ReadableResult.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/responses/ReadableResult.java index 8448d2a15..295bc2289 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/responses/ReadableResult.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/responses/ReadableResult.java @@ -8,5 +8,5 @@ public enum ReadableResult { SUCCEEDED, FAILED, - PROCESSING_FAILED + TRY_LATER } diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporterImplTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporterImplTest.java index 1fdeca78b..b2955630e 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporterImplTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporterImplTest.java @@ -10,10 +10,12 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl; +import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.DeserializationException; import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.TestData; import io.opentelemetry.sdk.common.Clock; @@ -88,12 +90,19 @@ void verifyStorageFolderIsCreated() { assertThat(new File(rootDir, STORAGE_FOLDER_NAME).exists()).isTrue(); } + @Test + void whenDeserializationFails_returnFalse() throws IOException { + doThrow(DeserializationException.class).when(deserializer).deserialize(any()); + + assertThat(exporter.exportStoredBatch(1, TimeUnit.SECONDS)).isFalse(); + } + private void createDummyFile() throws IOException { File file = new File(rootDir, STORAGE_FOLDER_NAME + "/" + 1000L); Files.write(file.toPath(), singletonList("First line")); } - private void setUpSerializer() { + private void setUpSerializer() throws DeserializationException { deserializer = mock(); when(deserializer.deserialize(any())).thenReturn(deserializedData); } diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java index d3ed0667a..d59c3464f 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java @@ -5,7 +5,7 @@ package io.opentelemetry.contrib.disk.buffering.internal.storage; -import static io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult.PROCESSING_FAILED; +import static io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult.TRY_LATER; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.mockito.ArgumentMatchers.any; @@ -17,6 +17,7 @@ import io.opentelemetry.contrib.disk.buffering.internal.storage.files.ReadableFile; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.WritableFile; +import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ProcessResult; import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult; import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.WritableResult; import java.io.IOException; @@ -28,7 +29,7 @@ class StorageTest { private FolderManager folderManager; private Storage storage; - private Function processing; + private Function processing; private ReadableFile readableFile; private WritableFile writableFile; @@ -52,11 +53,11 @@ void whenReadingAndProcessingSuccessfully_returnSuccess() throws IOException { } @Test - void whenReadableFileProcessingFails_returnFailed() throws IOException { + void whenReadableFileProcessingFails_returnTryLater() throws IOException { when(folderManager.getReadableFile()).thenReturn(readableFile); - when(readableFile.readAndProcess(processing)).thenReturn(PROCESSING_FAILED); + when(readableFile.readAndProcess(processing)).thenReturn(TRY_LATER); - assertEquals(PROCESSING_FAILED, storage.readAndProcess(processing)); + assertEquals(TRY_LATER, storage.readAndProcess(processing)); verify(readableFile).readAndProcess(processing); } diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFileTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFileTest.java index 134aff051..a7c127edb 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFileTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFileTest.java @@ -19,9 +19,11 @@ import io.opentelemetry.api.common.Value; import io.opentelemetry.api.logs.Severity; import io.opentelemetry.contrib.disk.buffering.internal.files.TemporaryFileProvider; +import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.DeserializationException; import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.logs.models.LogRecordDataImpl; import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; +import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ProcessResult; import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult; import io.opentelemetry.contrib.disk.buffering.testutils.TestData; import io.opentelemetry.sdk.common.Clock; @@ -120,7 +122,7 @@ void readSingleItemAndRemoveIt() throws IOException { readableFile.readAndProcess( bytes -> { assertEquals(FIRST_LOG_RECORD, deserialize(bytes)); - return true; + return ProcessResult.SUCCEEDED; }); List logs = getRemainingDataAndClose(readableFile); @@ -132,17 +134,19 @@ void readSingleItemAndRemoveIt() throws IOException { @Test void whenProcessingSucceeds_returnSuccessStatus() throws IOException { - assertEquals(ReadableResult.SUCCEEDED, readableFile.readAndProcess(bytes -> true)); + assertEquals( + ReadableResult.SUCCEEDED, readableFile.readAndProcess(bytes -> ProcessResult.SUCCEEDED)); } @Test - void whenProcessingFails_returnProcessFailedStatus() throws IOException { - assertEquals(ReadableResult.PROCESSING_FAILED, readableFile.readAndProcess(bytes -> false)); + void whenProcessingFails_returnTryLaterStatus() throws IOException { + assertEquals( + ReadableResult.TRY_LATER, readableFile.readAndProcess(bytes -> ProcessResult.TRY_LATER)); } @Test void deleteTemporaryFileWhenClosing() throws IOException { - readableFile.readAndProcess(bytes -> true); + readableFile.readAndProcess(bytes -> ProcessResult.SUCCEEDED); readableFile.close(); assertFalse(temporaryFile.exists()); @@ -150,8 +154,8 @@ void deleteTemporaryFileWhenClosing() throws IOException { @Test void readMultipleLinesAndRemoveThem() throws IOException { - readableFile.readAndProcess(bytes -> true); - readableFile.readAndProcess(bytes -> true); + readableFile.readAndProcess(bytes -> ProcessResult.SUCCEEDED); + readableFile.readAndProcess(bytes -> ProcessResult.SUCCEEDED); List logs = getRemainingDataAndClose(readableFile); @@ -161,7 +165,7 @@ void readMultipleLinesAndRemoveThem() throws IOException { @Test void whenConsumerReturnsFalse_doNotRemoveLineFromSource() throws IOException { - readableFile.readAndProcess(bytes -> false); + readableFile.readAndProcess(bytes -> ProcessResult.TRY_LATER); List logs = getRemainingDataAndClose(readableFile); @@ -176,6 +180,15 @@ void whenReadingLastLine_deleteOriginalFile_and_close() throws IOException { assertTrue(readableFile.isClosed()); } + @Test + void whenTheFileContentIsInvalid_deleteOriginalFile_and_close() throws IOException { + assertEquals( + ReadableResult.FAILED, readableFile.readAndProcess(bytes -> ProcessResult.CONTENT_INVALID)); + + assertFalse(source.exists()); + assertTrue(readableFile.isClosed()); + } + @Test void whenNoMoreLinesAvailableToRead_deleteOriginalFile_close_and_returnNoContentStatus() throws IOException { @@ -188,7 +201,8 @@ void whenNoMoreLinesAvailableToRead_deleteOriginalFile_close_and_returnNoContent new ReadableFile( emptyFile, CREATED_TIME_MILLIS, clock, getConfiguration(temporaryFileProvider, dir)); - assertEquals(ReadableResult.FAILED, emptyReadableFile.readAndProcess(bytes -> true)); + assertEquals( + ReadableResult.FAILED, emptyReadableFile.readAndProcess(bytes -> ProcessResult.SUCCEEDED)); assertTrue(emptyReadableFile.isClosed()); assertFalse(emptyFile.exists()); @@ -198,21 +212,23 @@ void whenNoMoreLinesAvailableToRead_deleteOriginalFile_close_and_returnNoContent void whenReadingAfterTheConfiguredReadingTimeExpired_deleteOriginalFile_close_and_returnFileExpiredException() throws IOException { - readableFile.readAndProcess(bytes -> true); + readableFile.readAndProcess(bytes -> ProcessResult.SUCCEEDED); when(clock.now()) .thenReturn(MILLISECONDS.toNanos(CREATED_TIME_MILLIS + MAX_FILE_AGE_FOR_READ_MILLIS)); - assertEquals(ReadableResult.FAILED, readableFile.readAndProcess(bytes -> true)); + assertEquals( + ReadableResult.FAILED, readableFile.readAndProcess(bytes -> ProcessResult.SUCCEEDED)); assertTrue(readableFile.isClosed()); } @Test void whenReadingAfterClosed_returnFailedStatus() throws IOException { - readableFile.readAndProcess(bytes -> true); + readableFile.readAndProcess(bytes -> ProcessResult.SUCCEEDED); readableFile.close(); - assertEquals(ReadableResult.FAILED, readableFile.readAndProcess(bytes -> true)); + assertEquals( + ReadableResult.FAILED, readableFile.readAndProcess(bytes -> ProcessResult.SUCCEEDED)); } private static List getRemainingDataAndClose(ReadableFile readableFile) @@ -224,7 +240,7 @@ private static List getRemainingDataAndClose(ReadableFile readabl readableFile.readAndProcess( bytes -> { result.add(deserialize(bytes)); - return true; + return ProcessResult.SUCCEEDED; }); } @@ -234,6 +250,10 @@ private static List getRemainingDataAndClose(ReadableFile readabl } private static LogRecordData deserialize(byte[] data) { - return DESERIALIZER.deserialize(data).get(0); + try { + return DESERIALIZER.deserialize(data).get(0); + } catch (DeserializationException e) { + throw new RuntimeException(e); + } } }