Skip to content

Commit

Permalink
Deserialization validation (#1571)
Browse files Browse the repository at this point in the history
  • Loading branch information
LikeTheSalad authored Nov 27, 2024
1 parent 1fa0760 commit b2f04ab
Show file tree
Hide file tree
Showing 13 changed files with 123 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@

package io.opentelemetry.contrib.disk.buffering.internal.exporter;

import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.DeserializationException;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer;
import io.opentelemetry.contrib.disk.buffering.internal.storage.Storage;
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ProcessResult;
import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult;
import io.opentelemetry.contrib.disk.buffering.internal.utils.DebugLogger;
import io.opentelemetry.sdk.common.CompletableResultCode;
Expand Down Expand Up @@ -64,11 +66,15 @@ public boolean exportStoredBatch(long timeout, TimeUnit unit) throws IOException
+ " "
+ deserializer.signalType()
+ " bytes from storage.");
List<EXPORT_DATA> telemetry = deserializer.deserialize(bytes);
logger.log(
"Now exporting batch of " + telemetry.size() + " " + deserializer.signalType());
CompletableResultCode join = exportFunction.apply(telemetry).join(timeout, unit);
return join.isSuccess();
try {
List<EXPORT_DATA> telemetry = deserializer.deserialize(bytes);
logger.log(
"Now exporting batch of " + telemetry.size() + " " + deserializer.signalType());
CompletableResultCode join = exportFunction.apply(telemetry).join(timeout, unit);
return join.isSuccess() ? ProcessResult.SUCCEEDED : ProcessResult.TRY_LATER;
} catch (DeserializationException e) {
return ProcessResult.CONTENT_INVALID;
}
});
return result == ReadableResult.SUCCEEDED;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers;

import java.io.IOException;

@SuppressWarnings("serial")
public class DeserializationException extends IOException {
public DeserializationException(Throwable cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ static LogRecordDataDeserializer getInstance() {
}

@Override
public List<LogRecordData> deserialize(byte[] source) {
public List<LogRecordData> deserialize(byte[] source) throws DeserializationException {
try {
return ProtoLogsDataMapper.getInstance().fromProto(LogsData.ADAPTER.decode(source));
} catch (IOException e) {
throw new IllegalArgumentException(e);
throw new DeserializationException(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ static MetricDataDeserializer getInstance() {
}

@Override
public List<MetricData> deserialize(byte[] source) {
public List<MetricData> deserialize(byte[] source) throws DeserializationException {
try {
return ProtoMetricsDataMapper.getInstance().fromProto(MetricsData.ADAPTER.decode(source));
} catch (IOException e) {
throw new IllegalArgumentException(e);
throw new DeserializationException(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ static SignalDeserializer<LogRecordData> ofLogs() {
}

/** Deserializes the given byte array into a list of telemetry items. */
List<SDK_ITEM> deserialize(byte[] source);
List<SDK_ITEM> deserialize(byte[] source) throws DeserializationException;

/** Returns the name of the stored type of signal -- one of "metrics", "spans", or "logs". */
default String signalType() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ static SpanDataDeserializer getInstance() {
}

@Override
public List<SpanData> deserialize(byte[] source) {
public List<SpanData> deserialize(byte[] source) throws DeserializationException {
try {
return ProtoSpansDataMapper.getInstance().fromProto(TracesData.ADAPTER.decode(source));
} catch (IOException e) {
throw new IllegalArgumentException(e);
throw new DeserializationException(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl;
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.ReadableFile;
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.WritableFile;
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ProcessResult;
import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult;
import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.WritableResult;
import io.opentelemetry.contrib.disk.buffering.internal.utils.DebugLogger;
Expand Down Expand Up @@ -77,12 +78,13 @@ private boolean write(byte[] item, int attemptNumber) throws IOException {
* @param processing Is passed over to {@link ReadableFile#readAndProcess(Function)}.
* @throws IOException If an unexpected error happens.
*/
public ReadableResult readAndProcess(Function<byte[], Boolean> processing) throws IOException {
public ReadableResult readAndProcess(Function<byte[], ProcessResult> processing)
throws IOException {
return readAndProcess(processing, 1);
}

private ReadableResult readAndProcess(Function<byte[], Boolean> processing, int attemptNumber)
throws IOException {
private ReadableResult readAndProcess(
Function<byte[], ProcessResult> processing, int attemptNumber) throws IOException {
if (isClosed.get()) {
logger.log("Refusing to read from storage after being closed.");
return ReadableResult.FAILED;
Expand All @@ -103,7 +105,7 @@ private ReadableResult readAndProcess(Function<byte[], Boolean> processing, int
ReadableResult result = readableFile.readAndProcess(processing);
switch (result) {
case SUCCEEDED:
case PROCESSING_FAILED:
case TRY_LATER:
return result;
default:
// Retry with new file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import io.opentelemetry.contrib.disk.buffering.StorageConfiguration;
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.DelimitedProtoStreamReader;
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ProcessResult;
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ReadResult;
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.StreamReader;
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.utils.FileTransferUtil;
Expand Down Expand Up @@ -83,7 +84,7 @@ public ReadableFile(
* If the processing function returns TRUE, then the provided line will be deleted from the
* source file. If the function returns FALSE, no changes will be applied to the source file.
*/
public synchronized ReadableResult readAndProcess(Function<byte[], Boolean> processing)
public synchronized ReadableResult readAndProcess(Function<byte[], ProcessResult> processing)
throws IOException {
if (isClosed.get()) {
return ReadableResult.FAILED;
Expand All @@ -97,20 +98,25 @@ public synchronized ReadableResult readAndProcess(Function<byte[], Boolean> proc
cleanUp();
return ReadableResult.FAILED;
}
if (processing.apply(read.content)) {
unconsumedResult = null;
readBytes += read.totalReadLength;
int amountOfBytesToTransfer = originalFileSize - readBytes;
if (amountOfBytesToTransfer > 0) {
fileTransferUtil.transferBytes(readBytes, amountOfBytesToTransfer);
} else {
switch (processing.apply(read.content)) {
case SUCCEEDED:
unconsumedResult = null;
readBytes += read.totalReadLength;
int amountOfBytesToTransfer = originalFileSize - readBytes;
if (amountOfBytesToTransfer > 0) {
fileTransferUtil.transferBytes(readBytes, amountOfBytesToTransfer);
} else {
cleanUp();
}
return ReadableResult.SUCCEEDED;
case TRY_LATER:
unconsumedResult = read;
return ReadableResult.TRY_LATER;
case CONTENT_INVALID:
cleanUp();
}
return ReadableResult.SUCCEEDED;
} else {
unconsumedResult = read;
return ReadableResult.PROCESSING_FAILED;
return ReadableResult.FAILED;
}
return ReadableResult.FAILED;
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader;

/** Result of processing the contents of a file. */
public enum ProcessResult {
SUCCEEDED,
TRY_LATER,
CONTENT_INVALID
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@
public enum ReadableResult {
SUCCEEDED,
FAILED,
PROCESSING_FAILED
TRY_LATER
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.DeserializationException;
import io.opentelemetry.contrib.disk.buffering.internal.serialization.deserializers.SignalDeserializer;
import io.opentelemetry.contrib.disk.buffering.internal.storage.TestData;
import io.opentelemetry.sdk.common.Clock;
Expand Down Expand Up @@ -88,12 +90,19 @@ void verifyStorageFolderIsCreated() {
assertThat(new File(rootDir, STORAGE_FOLDER_NAME).exists()).isTrue();
}

@Test
void whenDeserializationFails_returnFalse() throws IOException {
doThrow(DeserializationException.class).when(deserializer).deserialize(any());

assertThat(exporter.exportStoredBatch(1, TimeUnit.SECONDS)).isFalse();
}

private void createDummyFile() throws IOException {
File file = new File(rootDir, STORAGE_FOLDER_NAME + "/" + 1000L);
Files.write(file.toPath(), singletonList("First line"));
}

private void setUpSerializer() {
private void setUpSerializer() throws DeserializationException {
deserializer = mock();
when(deserializer.deserialize(any())).thenReturn(deserializedData);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package io.opentelemetry.contrib.disk.buffering.internal.storage;

import static io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult.PROCESSING_FAILED;
import static io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult.TRY_LATER;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.mockito.ArgumentMatchers.any;
Expand All @@ -17,6 +17,7 @@

import io.opentelemetry.contrib.disk.buffering.internal.storage.files.ReadableFile;
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.WritableFile;
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.reader.ProcessResult;
import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult;
import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.WritableResult;
import java.io.IOException;
Expand All @@ -28,7 +29,7 @@
class StorageTest {
private FolderManager folderManager;
private Storage storage;
private Function<byte[], Boolean> processing;
private Function<byte[], ProcessResult> processing;
private ReadableFile readableFile;
private WritableFile writableFile;

Expand All @@ -52,11 +53,11 @@ void whenReadingAndProcessingSuccessfully_returnSuccess() throws IOException {
}

@Test
void whenReadableFileProcessingFails_returnFailed() throws IOException {
void whenReadableFileProcessingFails_returnTryLater() throws IOException {
when(folderManager.getReadableFile()).thenReturn(readableFile);
when(readableFile.readAndProcess(processing)).thenReturn(PROCESSING_FAILED);
when(readableFile.readAndProcess(processing)).thenReturn(TRY_LATER);

assertEquals(PROCESSING_FAILED, storage.readAndProcess(processing));
assertEquals(TRY_LATER, storage.readAndProcess(processing));

verify(readableFile).readAndProcess(processing);
}
Expand Down
Loading

0 comments on commit b2f04ab

Please sign in to comment.