Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[disk-buffering] Add debug mode for verbose logging. #1455

Merged
merged 4 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public static LogRecordFromDiskExporter create(
.setStorageConfiguration(config)
.setDeserializer(SignalDeserializer.ofLogs())
.setExportFunction(exporter::export)
.setDebugEnabled(config.isDebugEnabled())
.build();
return new LogRecordFromDiskExporter(delegate);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public static MetricFromDiskExporter create(MetricExporter exporter, StorageConf
.setStorageConfiguration(config)
.setDeserializer(SignalDeserializer.ofMetrics())
.setExportFunction(exporter::export)
.setDebugEnabled(config.isDebugEnabled())
.build();
return new MetricFromDiskExporter(delegate);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public static SpanFromDiskExporter create(SpanExporter exporter, StorageConfigur
.setStorageConfiguration(config)
.setDeserializer(SignalDeserializer.ofSpans())
.setExportFunction(exporter::export)
.setDebugEnabled(config.isDebugEnabled())
.build();
return new SpanFromDiskExporter(delegate);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ public abstract class StorageConfiguration {
/** The root storage location for buffered telemetry. */
public abstract File getRootDir();

/** Returns true if the storage has been configured with debug verbosity enabled. */
public abstract boolean isDebugEnabled();

/** The max amount of time a file can receive new data. */
public abstract long getMaxFileAgeForWriteMillis();

Expand Down Expand Up @@ -62,6 +65,7 @@ public static Builder builder() {
.setMaxFileAgeForWriteMillis(TimeUnit.SECONDS.toMillis(30))
.setMinFileAgeForReadMillis(TimeUnit.SECONDS.toMillis(33))
.setMaxFileAgeForReadMillis(TimeUnit.HOURS.toMillis(18))
.setDebugEnabled(false)
.setTemporaryFileProvider(fileProvider);
}

Expand All @@ -81,6 +85,8 @@ public abstract static class Builder {

public abstract Builder setRootDir(File rootDir);

public abstract Builder setDebugEnabled(boolean debugEnabled);

public abstract StorageConfiguration build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ public class FromDiskExporterBuilder<T> {
private Function<Collection<T>, CompletableResultCode> exportFunction =
x -> CompletableResultCode.ofFailure();

private boolean debugEnabled = false;

@NotNull
private static <T> SignalDeserializer<T> noopDeserializer() {
return x -> emptyList();
Expand Down Expand Up @@ -63,8 +65,19 @@ public FromDiskExporterBuilder<T> setExportFunction(
return this;
}

@CanIgnoreReturnValue
public FromDiskExporterBuilder<T> enableDebug() {
return setDebugEnabled(true);
}

@CanIgnoreReturnValue
public FromDiskExporterBuilder<T> setDebugEnabled(boolean debugEnabled) {
this.debugEnabled = debugEnabled;
return this;
}

public FromDiskExporterImpl<T> build() throws IOException {
Storage storage = storageBuilder.build();
return new FromDiskExporterImpl<>(serializer, exportFunction, storage);
return new FromDiskExporterImpl<>(serializer, exportFunction, storage, debugEnabled);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.logging.Level;
Expand All @@ -21,18 +22,21 @@
* another delegated exporter.
*/
public final class FromDiskExporterImpl<EXPORT_DATA> implements FromDiskExporter {
private static final Logger logger = Logger.getLogger(FromDiskExporterImpl.class.getName());
private final Storage storage;
private final SignalDeserializer<EXPORT_DATA> deserializer;
private final Function<Collection<EXPORT_DATA>, CompletableResultCode> exportFunction;
private static final Logger logger = Logger.getLogger(FromDiskExporterImpl.class.getName());
private final boolean debugEnabled;

FromDiskExporterImpl(
SignalDeserializer<EXPORT_DATA> deserializer,
Function<Collection<EXPORT_DATA>, CompletableResultCode> exportFunction,
Storage storage) {
Storage storage,
boolean debugEnabled) {
this.deserializer = deserializer;
this.exportFunction = exportFunction;
this.storage = storage;
this.debugEnabled = debugEnabled;
}

public static <T> FromDiskExporterBuilder<T> builder() {
Expand All @@ -44,24 +48,36 @@ public static <T> FromDiskExporterBuilder<T> builder() {
*
* @param timeout The amount of time to wait for the wrapped exporter to finish.
* @param unit The unit of the time provided.
* @return true if there was data available and it was successfully exported within the timeout
* @return true if there was data available, and it was successfully exported within the timeout
* provided. false otherwise.
* @throws IOException If an unexpected error happens.
*/
@Override
public boolean exportStoredBatch(long timeout, TimeUnit unit) throws IOException {
logger.log(Level.INFO, "Attempting to export batch from disk.");
log("Attempting to export " + deserializer.signalType() + " batch from disk.");
ReadableResult result =
storage.readAndProcess(
bytes -> {
logger.log(Level.INFO, "About to export stored batch.");
CompletableResultCode join =
exportFunction.apply(deserializer.deserialize(bytes)).join(timeout, unit);
log(
"Read "
+ bytes.length
+ " "
+ deserializer.signalType()
+ " bytes from storage.");
List<EXPORT_DATA> telemetry = deserializer.deserialize(bytes);
log("Now exporting batch of " + telemetry.size() + " " + deserializer.signalType());
CompletableResultCode join = exportFunction.apply(telemetry).join(timeout, unit);
return join.isSuccess();
});
return result == ReadableResult.SUCCEEDED;
}

private void log(String msg) {
if (debugEnabled) {
logger.log(Level.INFO, msg);
}
}

@Override
public void shutdown() throws IOException {
storage.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,37 +20,52 @@ public class ToDiskExporter<EXPORT_DATA> {
private final Storage storage;
private final SignalSerializer<EXPORT_DATA> serializer;
private final Function<Collection<EXPORT_DATA>, CompletableResultCode> exportFunction;
private final boolean debugEnabled;

ToDiskExporter(
SignalSerializer<EXPORT_DATA> serializer,
Function<Collection<EXPORT_DATA>, CompletableResultCode> exportFunction,
Storage storage) {
Storage storage,
boolean debugEnabled) {
this.serializer = serializer;
this.exportFunction = exportFunction;
this.storage = storage;
this.debugEnabled = debugEnabled;
}

public static <T> ToDiskExporterBuilder<T> builder() {
return new ToDiskExporterBuilder<>();
}

public CompletableResultCode export(Collection<EXPORT_DATA> data) {
logger.log(Level.FINER, "Intercepting exporter batch.");
log("Intercepting exporter batch.", Level.FINER);
try {
if (storage.write(serializer.serialize(data))) {
return CompletableResultCode.ofSuccess();
}
logger.log(Level.INFO, "Could not store batch in disk. Exporting it right away.");
log("Could not store batch in disk. Exporting it right away.");
return exportFunction.apply(data);
} catch (IOException e) {
logger.log(
Level.WARNING,
"An unexpected error happened while attempting to write the data in disk. Exporting it right away.",
e);
if (debugEnabled) {
logger.log(
Level.WARNING,
"An unexpected error happened while attempting to write the data in disk. Exporting it right away.",
e);
}
return exportFunction.apply(data);
}
}

private void log(String msg) {
log(msg, Level.INFO);
}

private void log(String msg, Level level) {
if (debugEnabled) {
logger.log(level, msg);
}
}

public void shutdown() throws IOException {
storage.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,21 @@ public final class ToDiskExporterBuilder<T> {

private Function<Collection<T>, CompletableResultCode> exportFunction =
x -> CompletableResultCode.ofFailure();
private boolean debugEnabled = false;

ToDiskExporterBuilder() {}

@CanIgnoreReturnValue
public ToDiskExporterBuilder<T> enableDebug() {
return setDebugEnabled(true);
}

@CanIgnoreReturnValue
public ToDiskExporterBuilder<T> setDebugEnabled(boolean debugEnabled) {
this.debugEnabled = debugEnabled;
return this;
}

@CanIgnoreReturnValue
public ToDiskExporterBuilder<T> setFolderName(String folderName) {
storageBuilder.setFolderName(folderName);
Expand Down Expand Up @@ -61,7 +73,7 @@ public ToDiskExporterBuilder<T> setExportFunction(

public ToDiskExporter<T> build() throws IOException {
Storage storage = storageBuilder.build();
return new ToDiskExporter<>(serializer, exportFunction, storage);
return new ToDiskExporter<>(serializer, exportFunction, storage, debugEnabled);
}

private static void validateConfiguration(StorageConfiguration configuration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,9 @@ public List<LogRecordData> deserialize(byte[] source) {
throw new IllegalArgumentException(e);
}
}

@Override
public String signalType() {
return "logs";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,9 @@ public List<MetricData> deserialize(byte[] source) {
throw new IllegalArgumentException(e);
}
}

@Override
public String signalType() {
return "metrics";
breedx-splk marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,11 @@ static SignalDeserializer<LogRecordData> ofLogs() {
return LogRecordDataDeserializer.getInstance();
}

/** Deserializes the given byte array into a list of telemetry items. */
List<SDK_ITEM> deserialize(byte[] source);

/** Returns the name of the type of signal -- one of "metrics", "traces", or "logs". */
default String signalType() {
return "unknown";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,9 @@ public List<SpanData> deserialize(byte[] source) {
throw new IllegalArgumentException(e);
}
}

@Override
public String signalType() {
return "spans";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.io.IOException;
import java.util.Objects;
import javax.annotation.Nullable;
import org.jetbrains.annotations.NotNull;

public final class FolderManager {
private final File folder;
Expand Down Expand Up @@ -42,6 +43,7 @@ public synchronized ReadableFile getReadableFile() throws IOException {
return null;
}

@NotNull
public synchronized WritableFile createWritableFile() throws IOException {
long systemCurrentTimeMillis = nowMillis(clock);
File[] existingFiles = folder.listFiles();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

package io.opentelemetry.contrib.disk.buffering.internal.storage;

import static java.util.logging.Level.WARNING;

import io.opentelemetry.contrib.disk.buffering.internal.exporter.FromDiskExporterImpl;
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.ReadableFile;
import io.opentelemetry.contrib.disk.buffering.internal.storage.files.WritableFile;
import io.opentelemetry.contrib.disk.buffering.internal.storage.responses.ReadableResult;
Expand All @@ -13,17 +16,23 @@
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;

public final class Storage implements Closeable {
private static final int MAX_ATTEMPTS = 3;
private static final Logger logger = Logger.getLogger(FromDiskExporterImpl.class.getName());

private final FolderManager folderManager;
private final boolean debugEnabled;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
@Nullable private WritableFile writableFile;
@Nullable private ReadableFile readableFile;

public Storage(FolderManager folderManager) {
public Storage(FolderManager folderManager, boolean debugEnabled) {
this.folderManager = folderManager;
this.debugEnabled = debugEnabled;
}

public static StorageBuilder builder() {
Expand All @@ -42,13 +51,16 @@ public boolean write(byte[] item) throws IOException {

private boolean write(byte[] item, int attemptNumber) throws IOException {
if (isClosed.get()) {
log("Refusing to write to storage after being closed.");
return false;
}
if (attemptNumber > MAX_ATTEMPTS) {
log("Max number of attempts to write buffered data exceeded.", WARNING);
return false;
}
if (writableFile == null) {
writableFile = folderManager.createWritableFile();
log("Created new writableFile: " + writableFile);
}
WritableResult result = writableFile.append(item);
if (result != WritableResult.SUCCEEDED) {
Expand All @@ -72,17 +84,22 @@ public ReadableResult readAndProcess(Function<byte[], Boolean> processing) throw
private ReadableResult readAndProcess(Function<byte[], Boolean> processing, int attemptNumber)
throws IOException {
if (isClosed.get()) {
log("Refusing to read from storage after being closed.");
return ReadableResult.FAILED;
}
if (attemptNumber > MAX_ATTEMPTS) {
log("Maximum number of attempts to read and process buffered data exceeded.", WARNING);
return ReadableResult.FAILED;
}
if (readableFile == null) {
log("Obtaining a new readableFile from the folderManager.");
readableFile = folderManager.getReadableFile();
if (readableFile == null) {
log("Unable to get or create readable file.");
return ReadableResult.FAILED;
}
}
log("Attempting to read data from " + readableFile);
ReadableResult result = readableFile.readAndProcess(processing);
switch (result) {
case SUCCEEDED:
Expand All @@ -95,8 +112,19 @@ private ReadableResult readAndProcess(Function<byte[], Boolean> processing, int
}
}

private void log(String msg) {
log(msg, Level.INFO);
}

private void log(String msg, Level level) {
breedx-splk marked this conversation as resolved.
Show resolved Hide resolved
if (debugEnabled) {
logger.log(level, msg);
}
}

@Override
public void close() throws IOException {
log("Closing disk buffering storage.");
if (isClosed.compareAndSet(false, true)) {
if (writableFile != null) {
writableFile.close();
Expand Down
Loading
Loading