Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow MockTracingTelemetry to await for asynchronous tasks termination before validating spans #9561

Merged
merged 2 commits into from
Aug 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,46 @@
import org.opensearch.telemetry.metrics.MetricsTelemetry;
import org.opensearch.telemetry.tracing.TracingTelemetry;
import org.opensearch.test.telemetry.tracing.MockTracingTelemetry;
import org.opensearch.threadpool.ThreadPool;

import java.util.concurrent.TimeUnit;

/**
* Mock {@link Telemetry} implementation for testing.
*/
public class MockTelemetry implements Telemetry {

private final TelemetrySettings settings;
private final ThreadPool threadPool;

/**
* Constructor with settings.
* @param settings telemetry settings.
*/
public MockTelemetry(TelemetrySettings settings) {
this.settings = settings;
this(settings, null);
}

/**
* Constructor with settings.
* @param settings telemetry settings.
* @param threadPool thread pool to watch for termination
*/
public MockTelemetry(TelemetrySettings settings, ThreadPool threadPool) {
this.threadPool = threadPool;
}

@Override
public TracingTelemetry getTracingTelemetry() {
return new MockTracingTelemetry();
return new MockTracingTelemetry(() -> {
// There could be some asynchronous tasks running that we should await for before the closing
// up the tracer instance.
if (threadPool != null) {
try {
threadPool.awaitTermination(10, TimeUnit.SECONDS);
andrross marked this conversation as resolved.
Show resolved Hide resolved
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,34 @@

package org.opensearch.test.telemetry;

import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.SetOnce;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.TelemetryPlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
import org.opensearch.telemetry.Telemetry;
import org.opensearch.telemetry.TelemetrySettings;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.function.Supplier;

/**
* Mock {@link TelemetryPlugin} implementation for testing.
*/
public class MockTelemetryPlugin extends Plugin implements TelemetryPlugin {
private static final String MOCK_TRACER_NAME = "mock";
private final SetOnce<ThreadPool> threadPool = new SetOnce<>();

/**
* Base constructor.
Expand All @@ -28,9 +44,27 @@ public MockTelemetryPlugin() {

}

@Override
public Collection<Object> createComponents(
Client client,
ClusterService clusterService,
ThreadPool threadPool,
ResourceWatcherService resourceWatcherService,
ScriptService scriptService,
NamedXContentRegistry xContentRegistry,
Environment environment,
NodeEnvironment nodeEnvironment,
NamedWriteableRegistry namedWriteableRegistry,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
this.threadPool.set(threadPool);
return Collections.emptyList();
}

@Override
public Optional<Telemetry> getTelemetry(TelemetrySettings settings) {
return Optional.of(new MockTelemetry(settings));
return Optional.of(new MockTelemetry(settings, threadPool.get()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,22 @@
public class MockTracingTelemetry implements TracingTelemetry {

private final SpanProcessor spanProcessor = new StrictCheckSpanProcessor();
private final Runnable onClose;

/**
* Base constructor.
*/
public MockTracingTelemetry() {
this(() -> {});
}

/**
* Base constructor.
*
* @param onClose on close hook
*/
public MockTracingTelemetry(final Runnable onClose) {
this.onClose = onClose;
}

@Override
Expand All @@ -46,6 +56,9 @@ public TracingContextPropagator getContextPropagator() {

@Override
public void close() {
// Run onClose hook
onClose.run();

List<MockSpanData> spanData = ((StrictCheckSpanProcessor) spanProcessor).getFinishedSpanItems();
if (spanData.size() != 0) {
TelemetryValidators validators = new TelemetryValidators(
Expand Down