From 3aebaa9bf028fddbff58e11fa1063c35aab3fe44 Mon Sep 17 00:00:00 2001 From: Cesar Munoz <56847527+LikeTheSalad@users.noreply.github.com> Date: Tue, 26 Nov 2024 14:11:36 +0100 Subject: [PATCH 1/5] Using enum to denote more than two outcomes from reading a signal file --- .../exporter/FromDiskExporterImpl.java | 15 ++++++--- .../buffering/internal/storage/Storage.java | 5 +-- .../internal/storage/files/ReadableFile.java | 32 ++++++++++-------- .../storage/files/reader/ProcessResult.java | 8 +++++ .../internal/storage/StorageTest.java | 3 +- .../storage/files/ReadableFileTest.java | 33 +++++++++++-------- 6 files changed, 62 insertions(+), 34 deletions(-) create mode 100644 disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/reader/ProcessResult.java diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterImpl.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterImpl.java index 873ce20c0..202eee91b 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterImpl.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterImpl.java @@ -7,6 +7,7 @@ import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; +import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ProcessResult; import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult; import io.opentelemetry.contrib.disk.buffering.internal.utils.DebugLogger; import io.opentelemetry.sdk.common.CompletableResultCode; @@ -64,11 +65,15 @@ public boolean exportStoredBatch(long timeout, TimeUnit unit) throws IOException + " " + deserializer.signalType() + " bytes from storage."); - List telemetry = deserializer.deserialize(bytes); - logger.log( - "Now exporting batch of " + telemetry.size() + " " + deserializer.signalType()); - CompletableResultCode join = exportFunction.apply(telemetry).join(timeout, unit); - return join.isSuccess(); + try { + List telemetry = deserializer.deserialize(bytes); + logger.log( + "Now exporting batch of " + telemetry.size() + " " + deserializer.signalType()); + CompletableResultCode join = exportFunction.apply(telemetry).join(timeout, unit); + return join.isSuccess() ? ProcessResult.SUCCEEDED : ProcessResult.TRY_LATER; + } catch (IllegalArgumentException e) { + return ProcessResult.CONTENT_INVALID; + } }); return result == ReadableResult.SUCCEEDED; } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java index f0d074f4e..3a2430bd7 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java @@ -10,6 +10,7 @@ import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.ReadableFile; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.WritableFile; +import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ProcessResult; import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult; import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.WritableResult; import io.opentelemetry.contrib.disk.buffering.internal.utils.DebugLogger; @@ -77,11 +78,11 @@ private boolean write(byte[] item, int attemptNumber) throws IOException { * @param processing Is passed over to {@link ReadableFile#readAndProcess(Function)}. * @throws IOException If an unexpected error happens. */ - public ReadableResult readAndProcess(Function processing) throws IOException { + public ReadableResult readAndProcess(Function processing) throws IOException { return readAndProcess(processing, 1); } - private ReadableResult readAndProcess(Function processing, int attemptNumber) + private ReadableResult readAndProcess(Function processing, int attemptNumber) throws IOException { if (isClosed.get()) { logger.log("Refusing to read from storage after being closed."); diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFile.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFile.java index e88cb0955..934b9ac0e 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFile.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFile.java @@ -9,6 +9,7 @@ import io.opentelemetry.contrib.disk.buffering.StorageConfiguration; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.DelimitedProtoStreamReader; +import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ProcessResult; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ReadResult; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.StreamReader; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.utils.FileTransferUtil; @@ -83,7 +84,7 @@ public ReadableFile( * If the processing function returns TRUE, then the provided line will be deleted from the * source file. If the function returns FALSE, no changes will be applied to the source file. */ - public synchronized ReadableResult readAndProcess(Function processing) + public synchronized ReadableResult readAndProcess(Function processing) throws IOException { if (isClosed.get()) { return ReadableResult.FAILED; @@ -97,20 +98,25 @@ public synchronized ReadableResult readAndProcess(Function proc cleanUp(); return ReadableResult.FAILED; } - if (processing.apply(read.content)) { - unconsumedResult = null; - readBytes += read.totalReadLength; - int amountOfBytesToTransfer = originalFileSize - readBytes; - if (amountOfBytesToTransfer > 0) { - fileTransferUtil.transferBytes(readBytes, amountOfBytesToTransfer); - } else { + switch (processing.apply(read.content)) { + case SUCCEEDED: + unconsumedResult = null; + readBytes += read.totalReadLength; + int amountOfBytesToTransfer = originalFileSize - readBytes; + if (amountOfBytesToTransfer > 0) { + fileTransferUtil.transferBytes(readBytes, amountOfBytesToTransfer); + } else { + cleanUp(); + } + return ReadableResult.SUCCEEDED; + case TRY_LATER: + unconsumedResult = read; + return ReadableResult.PROCESSING_FAILED; + case CONTENT_INVALID: cleanUp(); - } - return ReadableResult.SUCCEEDED; - } else { - unconsumedResult = read; - return ReadableResult.PROCESSING_FAILED; + return ReadableResult.PROCESSING_FAILED; } + return ReadableResult.FAILED; } @Nullable diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/reader/ProcessResult.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/reader/ProcessResult.java new file mode 100644 index 000000000..bcd43760e --- /dev/null +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/reader/ProcessResult.java @@ -0,0 +1,8 @@ +package io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader; + +/** Result of processing the contents of a file. */ +public enum ProcessResult { + SUCCEEDED, + TRY_LATER, + CONTENT_INVALID +} diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java index d3ed0667a..dd5e2ce13 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/StorageTest.java @@ -17,6 +17,7 @@ import io.opentelemetry.contrib.disk.buffering.internal.storage.files.ReadableFile; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.WritableFile; +import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ProcessResult; import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult; import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.WritableResult; import java.io.IOException; @@ -28,7 +29,7 @@ class StorageTest { private FolderManager folderManager; private Storage storage; - private Function processing; + private Function processing; private ReadableFile readableFile; private WritableFile writableFile; diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFileTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFileTest.java index 134aff051..71687871a 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFileTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFileTest.java @@ -22,6 +22,7 @@ import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.logs.models.LogRecordDataImpl; import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; +import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ProcessResult; import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult; import io.opentelemetry.contrib.disk.buffering.testutils.TestData; import io.opentelemetry.sdk.common.Clock; @@ -120,7 +121,7 @@ void readSingleItemAndRemoveIt() throws IOException { readableFile.readAndProcess( bytes -> { assertEquals(FIRST_LOG_RECORD, deserialize(bytes)); - return true; + return ProcessResult.SUCCEEDED; }); List logs = getRemainingDataAndClose(readableFile); @@ -132,17 +133,20 @@ void readSingleItemAndRemoveIt() throws IOException { @Test void whenProcessingSucceeds_returnSuccessStatus() throws IOException { - assertEquals(ReadableResult.SUCCEEDED, readableFile.readAndProcess(bytes -> true)); + assertEquals( + ReadableResult.SUCCEEDED, readableFile.readAndProcess(bytes -> ProcessResult.SUCCEEDED)); } @Test void whenProcessingFails_returnProcessFailedStatus() throws IOException { - assertEquals(ReadableResult.PROCESSING_FAILED, readableFile.readAndProcess(bytes -> false)); + assertEquals( + ReadableResult.PROCESSING_FAILED, + readableFile.readAndProcess(bytes -> ProcessResult.TRY_LATER)); } @Test void deleteTemporaryFileWhenClosing() throws IOException { - readableFile.readAndProcess(bytes -> true); + readableFile.readAndProcess(bytes -> ProcessResult.SUCCEEDED); readableFile.close(); assertFalse(temporaryFile.exists()); @@ -150,8 +154,8 @@ void deleteTemporaryFileWhenClosing() throws IOException { @Test void readMultipleLinesAndRemoveThem() throws IOException { - readableFile.readAndProcess(bytes -> true); - readableFile.readAndProcess(bytes -> true); + readableFile.readAndProcess(bytes -> ProcessResult.SUCCEEDED); + readableFile.readAndProcess(bytes -> ProcessResult.SUCCEEDED); List logs = getRemainingDataAndClose(readableFile); @@ -161,7 +165,7 @@ void readMultipleLinesAndRemoveThem() throws IOException { @Test void whenConsumerReturnsFalse_doNotRemoveLineFromSource() throws IOException { - readableFile.readAndProcess(bytes -> false); + readableFile.readAndProcess(bytes -> ProcessResult.TRY_LATER); List logs = getRemainingDataAndClose(readableFile); @@ -188,7 +192,8 @@ void whenNoMoreLinesAvailableToRead_deleteOriginalFile_close_and_returnNoContent new ReadableFile( emptyFile, CREATED_TIME_MILLIS, clock, getConfiguration(temporaryFileProvider, dir)); - assertEquals(ReadableResult.FAILED, emptyReadableFile.readAndProcess(bytes -> true)); + assertEquals( + ReadableResult.FAILED, emptyReadableFile.readAndProcess(bytes -> ProcessResult.SUCCEEDED)); assertTrue(emptyReadableFile.isClosed()); assertFalse(emptyFile.exists()); @@ -198,21 +203,23 @@ void whenNoMoreLinesAvailableToRead_deleteOriginalFile_close_and_returnNoContent void whenReadingAfterTheConfiguredReadingTimeExpired_deleteOriginalFile_close_and_returnFileExpiredException() throws IOException { - readableFile.readAndProcess(bytes -> true); + readableFile.readAndProcess(bytes -> ProcessResult.SUCCEEDED); when(clock.now()) .thenReturn(MILLISECONDS.toNanos(CREATED_TIME_MILLIS + MAX_FILE_AGE_FOR_READ_MILLIS)); - assertEquals(ReadableResult.FAILED, readableFile.readAndProcess(bytes -> true)); + assertEquals( + ReadableResult.FAILED, readableFile.readAndProcess(bytes -> ProcessResult.SUCCEEDED)); assertTrue(readableFile.isClosed()); } @Test void whenReadingAfterClosed_returnFailedStatus() throws IOException { - readableFile.readAndProcess(bytes -> true); + readableFile.readAndProcess(bytes -> ProcessResult.SUCCEEDED); readableFile.close(); - assertEquals(ReadableResult.FAILED, readableFile.readAndProcess(bytes -> true)); + assertEquals( + ReadableResult.FAILED, readableFile.readAndProcess(bytes -> ProcessResult.SUCCEEDED)); } private static List getRemainingDataAndClose(ReadableFile readableFile) @@ -224,7 +231,7 @@ private static List getRemainingDataAndClose(ReadableFile readabl readableFile.readAndProcess( bytes -> { result.add(deserialize(bytes)); - return true; + return ProcessResult.SUCCEEDED; }); } From 4b10c44e835b36c24d52cc702ea4d4009b3815c7 Mon Sep 17 00:00:00 2001 From: Cesar Munoz <56847527+LikeTheSalad@users.noreply.github.com> Date: Tue, 26 Nov 2024 14:22:55 +0100 Subject: [PATCH 2/5] Renaming enum cases --- .../disk/buffering/internal/storage/Storage.java | 2 +- .../internal/storage/files/ReadableFile.java | 4 ++-- .../internal/storage/responses/ReadableResult.java | 2 +- .../buffering/internal/storage/StorageTest.java | 8 ++++---- .../internal/storage/files/ReadableFileTest.java | 13 +++++++++++-- 5 files changed, 19 insertions(+), 10 deletions(-) diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java index 3a2430bd7..c64f7d0d3 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java @@ -104,7 +104,7 @@ private ReadableResult readAndProcess(Function processing ReadableResult result = readableFile.readAndProcess(processing); switch (result) { case SUCCEEDED: - case PROCESSING_FAILED: + case TRY_LATER: return result; default: // Retry with new file diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFile.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFile.java index 934b9ac0e..c1b92f5ed 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFile.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFile.java @@ -111,10 +111,10 @@ public synchronized ReadableResult readAndProcess(Function ProcessResult.TRY_LATER)); } @@ -180,6 +180,15 @@ void whenReadingLastLine_deleteOriginalFile_and_close() throws IOException { assertTrue(readableFile.isClosed()); } + @Test + void whenTheFileContentIsInvalid_deleteOriginalFile_and_close() throws IOException { + assertEquals( + ReadableResult.FAILED, readableFile.readAndProcess(bytes -> ProcessResult.CONTENT_INVALID)); + + assertFalse(source.exists()); + assertTrue(readableFile.isClosed()); + } + @Test void whenNoMoreLinesAvailableToRead_deleteOriginalFile_close_and_returnNoContentStatus() throws IOException { From 7dcfac339a07fa57f5c483c2719eb2aa441cd17e Mon Sep 17 00:00:00 2001 From: Cesar Munoz <56847527+LikeTheSalad@users.noreply.github.com> Date: Tue, 26 Nov 2024 14:43:33 +0100 Subject: [PATCH 3/5] Creating DeserializationException --- .../internal/exporter/FromDiskExporterImpl.java | 3 ++- .../deserializers/DeserializationException.java | 10 ++++++++++ .../deserializers/LogRecordDataDeserializer.java | 4 ++-- .../deserializers/MetricDataDeserializer.java | 4 ++-- .../deserializers/SignalDeserializer.java | 2 +- .../deserializers/SpanDataDeserializer.java | 4 ++-- .../disk/buffering/FromDiskExporterImplTest.java | 14 +++++++++++++- .../internal/storage/files/ReadableFileTest.java | 10 +++++++--- 8 files changed, 39 insertions(+), 12 deletions(-) create mode 100644 disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/DeserializationException.java diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterImpl.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterImpl.java index 202eee91b..19ef6fe2c 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterImpl.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/exporter/FromDiskExporterImpl.java @@ -5,6 +5,7 @@ package io.opentelemetry.contrib.disk.buffering.internal.exporter; +import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.DeserializationException; import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage; import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ProcessResult; @@ -71,7 +72,7 @@ public boolean exportStoredBatch(long timeout, TimeUnit unit) throws IOException "Now exporting batch of " + telemetry.size() + " " + deserializer.signalType()); CompletableResultCode join = exportFunction.apply(telemetry).join(timeout, unit); return join.isSuccess() ? ProcessResult.SUCCEEDED : ProcessResult.TRY_LATER; - } catch (IllegalArgumentException e) { + } catch (DeserializationException e) { return ProcessResult.CONTENT_INVALID; } }); diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/DeserializationException.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/DeserializationException.java new file mode 100644 index 000000000..192ae71bd --- /dev/null +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/DeserializationException.java @@ -0,0 +1,10 @@ +package io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers; + +import java.io.IOException; + +@SuppressWarnings("serial") +public class DeserializationException extends IOException { + public DeserializationException(Throwable cause) { + super(cause); + } +} diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/LogRecordDataDeserializer.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/LogRecordDataDeserializer.java index 22e9d12c3..5ac0007d9 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/LogRecordDataDeserializer.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/LogRecordDataDeserializer.java @@ -22,11 +22,11 @@ static LogRecordDataDeserializer getInstance() { } @Override - public List deserialize(byte[] source) { + public List deserialize(byte[] source) throws DeserializationException { try { return ProtoLogsDataMapper.getInstance().fromProto(LogsData.ADAPTER.decode(source)); } catch (IOException e) { - throw new IllegalArgumentException(e); + throw new DeserializationException(e); } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/MetricDataDeserializer.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/MetricDataDeserializer.java index 16c672cbc..34e88b3ef 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/MetricDataDeserializer.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/MetricDataDeserializer.java @@ -22,11 +22,11 @@ static MetricDataDeserializer getInstance() { } @Override - public List deserialize(byte[] source) { + public List deserialize(byte[] source) throws DeserializationException { try { return ProtoMetricsDataMapper.getInstance().fromProto(MetricsData.ADAPTER.decode(source)); } catch (IOException e) { - throw new IllegalArgumentException(e); + throw new DeserializationException(e); } } diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/SignalDeserializer.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/SignalDeserializer.java index 435c14088..dd56e356e 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/SignalDeserializer.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/SignalDeserializer.java @@ -25,7 +25,7 @@ static SignalDeserializer ofLogs() { } /** Deserializes the given byte array into a list of telemetry items. */ - List deserialize(byte[] source); + List deserialize(byte[] source) throws DeserializationException; /** Returns the name of the stored type of signal -- one of "metrics", "spans", or "logs". */ default String signalType() { diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/SpanDataDeserializer.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/SpanDataDeserializer.java index 03737f8a7..457d5f268 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/SpanDataDeserializer.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/SpanDataDeserializer.java @@ -22,11 +22,11 @@ static SpanDataDeserializer getInstance() { } @Override - public List deserialize(byte[] source) { + public List deserialize(byte[] source) throws DeserializationException { try { return ProtoSpansDataMapper.getInstance().fromProto(TracesData.ADAPTER.decode(source)); } catch (IOException e) { - throw new IllegalArgumentException(e); + throw new DeserializationException(e); } } diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporterImplTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporterImplTest.java index 1fdeca78b..aea99bbf5 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporterImplTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporterImplTest.java @@ -14,6 +14,7 @@ import static org.mockito.Mockito.when; import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl; +import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.DeserializationException; import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; import io.opentelemetry.contrib.disk.buffering.internal.storage.TestData; import io.opentelemetry.sdk.common.Clock; @@ -88,12 +89,23 @@ void verifyStorageFolderIsCreated() { assertThat(new File(rootDir, STORAGE_FOLDER_NAME).exists()).isTrue(); } + @Test + void whenDeserializationFails_returnFalse() throws IOException { + when(deserializer.deserialize(any())) + .thenAnswer( + invocation -> { + throw new DeserializationException(new IOException("Some exception")); + }); + + assertThat(exporter.exportStoredBatch(1, TimeUnit.SECONDS)).isFalse(); + } + private void createDummyFile() throws IOException { File file = new File(rootDir, STORAGE_FOLDER_NAME + "/" + 1000L); Files.write(file.toPath(), singletonList("First line")); } - private void setUpSerializer() { + private void setUpSerializer() throws DeserializationException { deserializer = mock(); when(deserializer.deserialize(any())).thenReturn(deserializedData); } diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFileTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFileTest.java index 2dde9610a..a7c127edb 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFileTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/ReadableFileTest.java @@ -19,6 +19,7 @@ import io.opentelemetry.api.common.Value; import io.opentelemetry.api.logs.Severity; import io.opentelemetry.contrib.disk.buffering.internal.files.TemporaryFileProvider; +import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.DeserializationException; import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer; import io.opentelemetry.contrib.disk.buffering.internal.serialization.mapping.logs.models.LogRecordDataImpl; import io.opentelemetry.contrib.disk.buffering.internal.serialization.serializers.SignalSerializer; @@ -140,8 +141,7 @@ void whenProcessingSucceeds_returnSuccessStatus() throws IOException { @Test void whenProcessingFails_returnTryLaterStatus() throws IOException { assertEquals( - ReadableResult.TRY_LATER, - readableFile.readAndProcess(bytes -> ProcessResult.TRY_LATER)); + ReadableResult.TRY_LATER, readableFile.readAndProcess(bytes -> ProcessResult.TRY_LATER)); } @Test @@ -250,6 +250,10 @@ private static List getRemainingDataAndClose(ReadableFile readabl } private static LogRecordData deserialize(byte[] data) { - return DESERIALIZER.deserialize(data).get(0); + try { + return DESERIALIZER.deserialize(data).get(0); + } catch (DeserializationException e) { + throw new RuntimeException(e); + } } } From d1d1d14277b9d1b982b46bc8947c3c313e5bc0aa Mon Sep 17 00:00:00 2001 From: Cesar Munoz <56847527+LikeTheSalad@users.noreply.github.com> Date: Tue, 26 Nov 2024 14:44:03 +0100 Subject: [PATCH 4/5] Spotless --- .../deserializers/DeserializationException.java | 5 +++++ .../contrib/disk/buffering/internal/storage/Storage.java | 7 ++++--- .../internal/storage/files/reader/ProcessResult.java | 5 +++++ 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/DeserializationException.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/DeserializationException.java index 192ae71bd..f8e1d9729 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/DeserializationException.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/serialization/deserializers/DeserializationException.java @@ -1,3 +1,8 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + package io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers; import java.io.IOException; diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java index c64f7d0d3..4ff60cbdc 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/Storage.java @@ -78,12 +78,13 @@ private boolean write(byte[] item, int attemptNumber) throws IOException { * @param processing Is passed over to {@link ReadableFile#readAndProcess(Function)}. * @throws IOException If an unexpected error happens. */ - public ReadableResult readAndProcess(Function processing) throws IOException { + public ReadableResult readAndProcess(Function processing) + throws IOException { return readAndProcess(processing, 1); } - private ReadableResult readAndProcess(Function processing, int attemptNumber) - throws IOException { + private ReadableResult readAndProcess( + Function processing, int attemptNumber) throws IOException { if (isClosed.get()) { logger.log("Refusing to read from storage after being closed."); return ReadableResult.FAILED; diff --git a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/reader/ProcessResult.java b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/reader/ProcessResult.java index bcd43760e..696d98d2c 100644 --- a/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/reader/ProcessResult.java +++ b/disk-buffering/src/main/java/io/opentelemetry/contrib/disk/buffering/internal/storage/files/reader/ProcessResult.java @@ -1,3 +1,8 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + package io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader; /** Result of processing the contents of a file. */ From eef82fa0017792256dd4bab2b4563a70a0dc067e Mon Sep 17 00:00:00 2001 From: Cesar Munoz <56847527+LikeTheSalad@users.noreply.github.com> Date: Wed, 27 Nov 2024 13:44:58 +0100 Subject: [PATCH 5/5] PR suggestion for test --- .../contrib/disk/buffering/FromDiskExporterImplTest.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporterImplTest.java b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporterImplTest.java index aea99bbf5..b2955630e 100644 --- a/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporterImplTest.java +++ b/disk-buffering/src/test/java/io/opentelemetry/contrib/disk/buffering/FromDiskExporterImplTest.java @@ -10,6 +10,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -91,11 +92,7 @@ void verifyStorageFolderIsCreated() { @Test void whenDeserializationFails_returnFalse() throws IOException { - when(deserializer.deserialize(any())) - .thenAnswer( - invocation -> { - throw new DeserializationException(new IOException("Some exception")); - }); + doThrow(DeserializationException.class).when(deserializer).deserialize(any()); assertThat(exporter.exportStoredBatch(1, TimeUnit.SECONDS)).isFalse(); }