Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deserialization validation #1571

Merged
merged 5 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@

package io.opentelemetry.contrib.disk.buffering.internal.exporter;

import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.DeserializationException;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer;
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ProcessResult;
import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult;
import io.opentelemetry.contrib.disk.buffering.internal.utils.DebugLogger;
import io.opentelemetry.sdk.common.CompletableResultCode;
Expand Down Expand Up @@ -64,11 +66,15 @@ public boolean exportStoredBatch(long timeout, TimeUnit unit) throws IOException
+ " "
+ deserializer.signalType()
+ " bytes from storage.");
List<EXPORT_DATA> telemetry = deserializer.deserialize(bytes);
logger.log(
"Now exporting batch of " + telemetry.size() + " " + deserializer.signalType());
CompletableResultCode join = exportFunction.apply(telemetry).join(timeout, unit);
return join.isSuccess();
try {
List<EXPORT_DATA> telemetry = deserializer.deserialize(bytes);
logger.log(
"Now exporting batch of " + telemetry.size() + " " + deserializer.signalType());
CompletableResultCode join = exportFunction.apply(telemetry).join(timeout, unit);
return join.isSuccess() ? ProcessResult.SUCCEEDED : ProcessResult.TRY_LATER;
} catch (DeserializationException e) {
return ProcessResult.CONTENT_INVALID;
}
});
return result == ReadableResult.SUCCEEDED;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers;

import java.io.IOException;

@SuppressWarnings("serial")
public class DeserializationException extends IOException {
public DeserializationException(Throwable cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ static LogRecordDataDeserializer getInstance() {
}

@Override
public List<LogRecordData> deserialize(byte[] source) {
public List<LogRecordData> deserialize(byte[] source) throws DeserializationException {
try {
return ProtoLogsDataMapper.getInstance().fromProto(LogsData.ADAPTER.decode(source));
} catch (IOException e) {
throw new IllegalArgumentException(e);
throw new DeserializationException(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ static MetricDataDeserializer getInstance() {
}

@Override
public List<MetricData> deserialize(byte[] source) {
public List<MetricData> deserialize(byte[] source) throws DeserializationException {
try {
return ProtoMetricsDataMapper.getInstance().fromProto(MetricsData.ADAPTER.decode(source));
} catch (IOException e) {
throw new IllegalArgumentException(e);
throw new DeserializationException(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ static SignalDeserializer<LogRecordData> ofLogs() {
}

/** Deserializes the given byte array into a list of telemetry items. */
List<SDK_ITEM> deserialize(byte[] source);
List<SDK_ITEM> deserialize(byte[] source) throws DeserializationException;

/** Returns the name of the stored type of signal -- one of "metrics", "spans", or "logs". */
default String signalType() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ static SpanDataDeserializer getInstance() {
}

@Override
public List<SpanData> deserialize(byte[] source) {
public List<SpanData> deserialize(byte[] source) throws DeserializationException {
try {
return ProtoSpansDataMapper.getInstance().fromProto(TracesData.ADAPTER.decode(source));
} catch (IOException e) {
throw new IllegalArgumentException(e);
throw new DeserializationException(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl;
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.ReadableFile;
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.WritableFile;
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ProcessResult;
import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult;
import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.WritableResult;
import io.opentelemetry.contrib.disk.buffering.internal.utils.DebugLogger;
Expand Down Expand Up @@ -77,12 +78,13 @@ private boolean write(byte[] item, int attemptNumber) throws IOException {
* @param processing Is passed over to {@link ReadableFile#readAndProcess(Function)}.
* @throws IOException If an unexpected error happens.
*/
public ReadableResult readAndProcess(Function<byte[], Boolean> processing) throws IOException {
public ReadableResult readAndProcess(Function<byte[], ProcessResult> processing)
throws IOException {
return readAndProcess(processing, 1);
}

private ReadableResult readAndProcess(Function<byte[], Boolean> processing, int attemptNumber)
throws IOException {
private ReadableResult readAndProcess(
Function<byte[], ProcessResult> processing, int attemptNumber) throws IOException {
if (isClosed.get()) {
logger.log("Refusing to read from storage after being closed.");
return ReadableResult.FAILED;
Expand All @@ -103,7 +105,7 @@ private ReadableResult readAndProcess(Function<byte[], Boolean> processing, int
ReadableResult result = readableFile.readAndProcess(processing);
switch (result) {
case SUCCEEDED:
case PROCESSING_FAILED:
case TRY_LATER:
return result;
default:
// Retry with new file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import io.opentelemetry.contrib.disk.buffering.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.DelimitedProtoStreamReader;
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ProcessResult;
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ReadResult;
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.StreamReader;
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.utils.FileTransferUtil;
Expand Down Expand Up @@ -83,7 +84,7 @@ public ReadableFile(
* If the processing function returns TRUE, then the provided line will be deleted from the
* source file. If the function returns FALSE, no changes will be applied to the source file.
*/
public synchronized ReadableResult readAndProcess(Function<byte[], Boolean> processing)
public synchronized ReadableResult readAndProcess(Function<byte[], ProcessResult> processing)
throws IOException {
if (isClosed.get()) {
return ReadableResult.FAILED;
Expand All @@ -97,20 +98,25 @@ public synchronized ReadableResult readAndProcess(Function<byte[], Boolean> proc
cleanUp();
return ReadableResult.FAILED;
}
if (processing.apply(read.content)) {
unconsumedResult = null;
readBytes += read.totalReadLength;
int amountOfBytesToTransfer = originalFileSize - readBytes;
if (amountOfBytesToTransfer > 0) {
fileTransferUtil.transferBytes(readBytes, amountOfBytesToTransfer);
} else {
switch (processing.apply(read.content)) {
case SUCCEEDED:
unconsumedResult = null;
readBytes += read.totalReadLength;
int amountOfBytesToTransfer = originalFileSize - readBytes;
if (amountOfBytesToTransfer > 0) {
fileTransferUtil.transferBytes(readBytes, amountOfBytesToTransfer);
} else {
cleanUp();
}
return ReadableResult.SUCCEEDED;
case TRY_LATER:
unconsumedResult = read;
return ReadableResult.TRY_LATER;
case CONTENT_INVALID:
cleanUp();
}
return ReadableResult.SUCCEEDED;
} else {
unconsumedResult = read;
return ReadableResult.PROCESSING_FAILED;
return ReadableResult.FAILED;
}
return ReadableResult.FAILED;
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader;

/** Result of processing the contents of a file. */
public enum ProcessResult {
SUCCEEDED,
TRY_LATER,
CONTENT_INVALID
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@
public enum ReadableResult {
SUCCEEDED,
FAILED,
PROCESSING_FAILED
TRY_LATER
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import static org.mockito.Mockito.when;

import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.DeserializationException;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer;
import io.opentelemetry.contrib.disk.buffering.internal.storage.TestData;
import io.opentelemetry.sdk.common.Clock;
Expand Down Expand Up @@ -88,12 +89,23 @@ void verifyStorageFolderIsCreated() {
assertThat(new File(rootDir, STORAGE_FOLDER_NAME).exists()).isTrue();
}

@Test
void whenDeserializationFails_returnFalse() throws IOException {
when(deserializer.deserialize(any()))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: doThrow is simpler

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I added the changes.

.thenAnswer(
invocation -> {
throw new DeserializationException(new IOException("Some exception"));
});

assertThat(exporter.exportStoredBatch(1, TimeUnit.SECONDS)).isFalse();
}

private void createDummyFile() throws IOException {
File file = new File(rootDir, STORAGE_FOLDER_NAME + "/" + 1000L);
Files.write(file.toPath(), singletonList("First line"));
}

private void setUpSerializer() {
private void setUpSerializer() throws DeserializationException {
deserializer = mock();
when(deserializer.deserialize(any())).thenReturn(deserializedData);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package io.opentelemetry.contrib.disk.buffering.internal.storage;

import static io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult.PROCESSING_FAILED;
import static io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult.TRY_LATER;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.mockito.ArgumentMatchers.any;
Expand All @@ -17,6 +17,7 @@

import io.opentelemetry.contrib.disk.buffering.internal.storage.files.ReadableFile;
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.WritableFile;
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ProcessResult;
import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult;
import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.WritableResult;
import java.io.IOException;
Expand All @@ -28,7 +29,7 @@
class StorageTest {
private FolderManager folderManager;
private Storage storage;
private Function<byte[], Boolean> processing;
private Function<byte[], ProcessResult> processing;
private ReadableFile readableFile;
private WritableFile writableFile;

Expand All @@ -52,11 +53,11 @@ void whenReadingAndProcessingSuccessfully_returnSuccess() throws IOException {
}

@Test
void whenReadableFileProcessingFails_returnFailed() throws IOException {
void whenReadableFileProcessingFails_returnTryLater() throws IOException {
when(folderManager.getReadableFile()).thenReturn(readableFile);
when(readableFile.readAndProcess(processing)).thenReturn(PROCESSING_FAILED);
when(readableFile.readAndProcess(processing)).thenReturn(TRY_LATER);

assertEquals(PROCESSING_FAILED, storage.readAndProcess(processing));
assertEquals(TRY_LATER, storage.readAndProcess(processing));

verify(readableFile).readAndProcess(processing);
}
Expand Down
Loading
Loading