From 0274095defad72f17993d052b3505808b342b152 Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Wed, 30 Aug 2023 16:22:38 -0400 Subject: [PATCH] Allow MockTracingTelemetry to await for asynchronous tasks termination before validating spans (#9561) * Allow MockTracingTelemetry to await for asynchronous tasks termination before validating spans Signed-off-by: Andriy Redko * Address code review comments Signed-off-by: Andriy Redko --------- Signed-off-by: Andriy Redko --- .../test/telemetry/MockTelemetry.java | 29 ++++++++++++--- .../test/telemetry/MockTelemetryPlugin.java | 36 ++++++++++++++++++- .../tracing/MockTracingTelemetry.java | 13 +++++++ 3 files changed, 73 insertions(+), 5 deletions(-) diff --git a/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetry.java b/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetry.java index de24ea0de77bb..894e8a67cea1f 100644 --- a/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetry.java +++ b/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetry.java @@ -13,25 +13,46 @@ import org.opensearch.telemetry.metrics.MetricsTelemetry; import org.opensearch.telemetry.tracing.TracingTelemetry; import org.opensearch.test.telemetry.tracing.MockTracingTelemetry; +import org.opensearch.threadpool.ThreadPool; + +import java.util.concurrent.TimeUnit; /** * Mock {@link Telemetry} implementation for testing. */ public class MockTelemetry implements Telemetry { - - private final TelemetrySettings settings; + private final ThreadPool threadPool; /** * Constructor with settings. * @param settings telemetry settings. */ public MockTelemetry(TelemetrySettings settings) { - this.settings = settings; + this(settings, null); + } + + /** + * Constructor with settings. + * @param settings telemetry settings. + * @param threadPool thread pool to watch for termination + */ + public MockTelemetry(TelemetrySettings settings, ThreadPool threadPool) { + this.threadPool = threadPool; } @Override public TracingTelemetry getTracingTelemetry() { - return new MockTracingTelemetry(); + return new MockTracingTelemetry(() -> { + // There could be some asynchronous tasks running that we should await for before the closing + // up the tracer instance. + if (threadPool != null) { + try { + threadPool.awaitTermination(10, TimeUnit.SECONDS); + } catch (final InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + }); } @Override diff --git a/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetryPlugin.java b/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetryPlugin.java index 4f483098caf82..ebba9857aa8f1 100644 --- a/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetryPlugin.java +++ b/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetryPlugin.java @@ -8,18 +8,34 @@ package org.opensearch.test.telemetry; +import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.SetOnce; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.env.Environment; +import org.opensearch.env.NodeEnvironment; import org.opensearch.plugins.Plugin; import org.opensearch.plugins.TelemetryPlugin; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.script.ScriptService; import org.opensearch.telemetry.Telemetry; import org.opensearch.telemetry.TelemetrySettings; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.watcher.ResourceWatcherService; +import java.util.Collection; +import java.util.Collections; import java.util.Optional; +import java.util.function.Supplier; /** * Mock {@link TelemetryPlugin} implementation for testing. */ public class MockTelemetryPlugin extends Plugin implements TelemetryPlugin { private static final String MOCK_TRACER_NAME = "mock"; + private final SetOnce threadPool = new SetOnce<>(); /** * Base constructor. @@ -28,9 +44,27 @@ public MockTelemetryPlugin() { } + @Override + public Collection createComponents( + Client client, + ClusterService clusterService, + ThreadPool threadPool, + ResourceWatcherService resourceWatcherService, + ScriptService scriptService, + NamedXContentRegistry xContentRegistry, + Environment environment, + NodeEnvironment nodeEnvironment, + NamedWriteableRegistry namedWriteableRegistry, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier repositoriesServiceSupplier + ) { + this.threadPool.set(threadPool); + return Collections.emptyList(); + } + @Override public Optional getTelemetry(TelemetrySettings settings) { - return Optional.of(new MockTelemetry(settings)); + return Optional.of(new MockTelemetry(settings, threadPool.get())); } @Override diff --git a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockTracingTelemetry.java b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockTracingTelemetry.java index 9b958bbb40f84..a5e51dd27541b 100644 --- a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockTracingTelemetry.java +++ b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockTracingTelemetry.java @@ -24,12 +24,22 @@ public class MockTracingTelemetry implements TracingTelemetry { private final SpanProcessor spanProcessor = new StrictCheckSpanProcessor(); + private final Runnable onClose; /** * Base constructor. */ public MockTracingTelemetry() { + this(() -> {}); + } + /** + * Base constructor. + * + * @param onClose on close hook + */ + public MockTracingTelemetry(final Runnable onClose) { + this.onClose = onClose; } @Override @@ -46,6 +56,9 @@ public TracingContextPropagator getContextPropagator() { @Override public void close() { + // Run onClose hook + onClose.run(); + List spanData = ((StrictCheckSpanProcessor) spanProcessor).getFinishedSpanItems(); if (spanData.size() != 0) { TelemetryValidators validators = new TelemetryValidators(