Skip to content

Commit

Permalink
Allow MockTracingTelemetry to await for asynchronous tasks terminatio…
Browse files Browse the repository at this point in the history
…n before validating spans (#9561) (#9646)

* Allow MockTracingTelemetry to await for asynchronous tasks termination before validating spans



* Address code review comments



---------


(cherry picked from commit 0274095)

Signed-off-by: Andriy Redko <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 12299c6 commit 21da767
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,46 @@
import org.opensearch.telemetry.metrics.MetricsTelemetry;
import org.opensearch.telemetry.tracing.TracingTelemetry;
import org.opensearch.test.telemetry.tracing.MockTracingTelemetry;
import org.opensearch.threadpool.ThreadPool;

import java.util.concurrent.TimeUnit;

/**
* Mock {@link Telemetry} implementation for testing.
*/
public class MockTelemetry implements Telemetry {

private final TelemetrySettings settings;
private final ThreadPool threadPool;

/**
* Constructor with settings.
* @param settings telemetry settings.
*/
public MockTelemetry(TelemetrySettings settings) {
this.settings = settings;
this(settings, null);
}

/**
* Constructor with settings.
* @param settings telemetry settings.
* @param threadPool thread pool to watch for termination
*/
public MockTelemetry(TelemetrySettings settings, ThreadPool threadPool) {
this.threadPool = threadPool;
}

@Override
public TracingTelemetry getTracingTelemetry() {
return new MockTracingTelemetry();
return new MockTracingTelemetry(() -> {
// There could be some asynchronous tasks running that we should await for before the closing
// up the tracer instance.
if (threadPool != null) {
try {
threadPool.awaitTermination(10, TimeUnit.SECONDS);
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,34 @@

package org.opensearch.test.telemetry;

import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.SetOnce;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.TelemetryPlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
import org.opensearch.telemetry.Telemetry;
import org.opensearch.telemetry.TelemetrySettings;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.function.Supplier;

/**
* Mock {@link TelemetryPlugin} implementation for testing.
*/
public class MockTelemetryPlugin extends Plugin implements TelemetryPlugin {
private static final String MOCK_TRACER_NAME = "mock";
private final SetOnce<ThreadPool> threadPool = new SetOnce<>();

/**
* Base constructor.
Expand All @@ -28,9 +44,27 @@ public MockTelemetryPlugin() {

}

@Override
public Collection<Object> createComponents(
Client client,
ClusterService clusterService,
ThreadPool threadPool,
ResourceWatcherService resourceWatcherService,
ScriptService scriptService,
NamedXContentRegistry xContentRegistry,
Environment environment,
NodeEnvironment nodeEnvironment,
NamedWriteableRegistry namedWriteableRegistry,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
this.threadPool.set(threadPool);
return Collections.emptyList();
}

@Override
public Optional<Telemetry> getTelemetry(TelemetrySettings settings) {
return Optional.of(new MockTelemetry(settings));
return Optional.of(new MockTelemetry(settings, threadPool.get()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,22 @@
public class MockTracingTelemetry implements TracingTelemetry {

private final SpanProcessor spanProcessor = new StrictCheckSpanProcessor();
private final Runnable onClose;

/**
* Base constructor.
*/
public MockTracingTelemetry() {
this(() -> {});
}

/**
* Base constructor.
*
* @param onClose on close hook
*/
public MockTracingTelemetry(final Runnable onClose) {
this.onClose = onClose;
}

@Override
Expand All @@ -46,6 +56,9 @@ public TracingContextPropagator getContextPropagator() {

@Override
public void close() {
// Run onClose hook
onClose.run();

List<MockSpanData> spanData = ((StrictCheckSpanProcessor) spanProcessor).getFinishedSpanItems();
if (spanData.size() != 0) {
TelemetryValidators validators = new TelemetryValidators(
Expand Down

0 comments on commit 21da767

Please sign in to comment.