getTaskEventListeners() {
+ return Collections.emptyList();
+ }
+
/**
* Provides a function to modify index template meta data on startup.
*
diff --git a/server/src/main/java/org/opensearch/threadpool/ScalingExecutorBuilder.java b/server/src/main/java/org/opensearch/threadpool/ScalingExecutorBuilder.java
index 2690a3cc30238..a2e9cc0b3bbf5 100644
--- a/server/src/main/java/org/opensearch/threadpool/ScalingExecutorBuilder.java
+++ b/server/src/main/java/org/opensearch/threadpool/ScalingExecutorBuilder.java
@@ -38,6 +38,8 @@
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.node.Node;
+import org.opensearch.tracing.opentelemetry.OpenTelemetryContextWrapper;
+import org.opensearch.tracing.opentelemetry.OpenTelemetryService;
import java.util.Arrays;
import java.util.List;
@@ -111,7 +113,7 @@ ThreadPool.ExecutorHolder build(final ScalingExecutorSettings settings, final Th
final ThreadFactory threadFactory = OpenSearchExecutors.daemonThreadFactory(
OpenSearchExecutors.threadName(settings.nodeName, name())
);
- final ExecutorService executor = OpenSearchExecutors.newScaling(
+ ExecutorService executor = OpenSearchExecutors.newScaling(
settings.nodeName + "/" + name(),
core,
max,
@@ -120,6 +122,9 @@ ThreadPool.ExecutorHolder build(final ScalingExecutorSettings settings, final Th
threadFactory,
threadContext
);
+ if (OpenTelemetryService.isThreadPoolAllowed(name())) {
+ executor = OpenTelemetryContextWrapper.wrapTask(executor);
+ }
return new ThreadPool.ExecutorHolder(executor, info);
}
diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java
index 2c7a4db5b8679..931ddcca5ec98 100644
--- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java
+++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java
@@ -45,13 +45,14 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException;
-import org.opensearch.common.util.concurrent.OpenSearchThreadPoolExecutor;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.util.concurrent.XRejectedExecutionHandler;
import org.opensearch.common.xcontent.ToXContentFragment;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.node.Node;
import org.opensearch.node.ReportingService;
+import org.opensearch.tracing.opentelemetry.OpenTelemetryContextWrapper;
+import org.opensearch.tracing.opentelemetry.OpenTelemetryService;
import java.io.IOException;
import java.util.ArrayList;
@@ -215,7 +216,6 @@ public ThreadPool(
final ExecutorBuilder>... customBuilders
) {
assert Node.NODE_NAME_SETTING.exists(settings);
-
final Map builders = new HashMap<>();
final int allocatedProcessors = OpenSearchExecutors.allocatedProcessors(settings);
final int halfProcMaxAt5 = halfAllocatedProcessorsMaxFive(allocatedProcessors);
@@ -407,9 +407,14 @@ public ExecutorService executor(String name) {
if (holder == null) {
throw new IllegalArgumentException("no executor service found for [" + name + "]");
}
+ // Decide per requested pool: checking the constant Names.GENERIC made the guard always true,
+ // so executors of every pool were wrapped on every call.
+ // NOTE(review): ScalingExecutorBuilder#build already wraps allowed pools; confirm wrapping again here is intended.
+ if (OpenTelemetryService.isThreadPoolAllowed(name)) {
+ return OpenTelemetryContextWrapper.wrapTask(holder.executor());
+ }
return holder.executor();
}
+
+
/**
* Schedules a one-shot command to run after a given delay. The command is run in the context of the calling thread.
*
@@ -687,7 +692,7 @@ static class ExecutorHolder {
public final Info info;
ExecutorHolder(ExecutorService executor, Info info) {
- assert executor instanceof OpenSearchThreadPoolExecutor || executor == DIRECT_EXECUTOR;
+ // Tracing-enabled pools hand in a wrapped executor; accept the wrapper type instead of disabling the invariant.
+ // Fully-qualified names are used because the OpenSearchThreadPoolExecutor import is removed by this change.
+ assert executor instanceof org.opensearch.common.util.concurrent.OpenSearchThreadPoolExecutor
+ || executor == DIRECT_EXECUTOR
+ || executor instanceof org.opensearch.tracing.opentelemetry.OpenSearchConcurrentExecutorService;
this.executor = executor;
this.info = info;
}
diff --git a/server/src/main/java/org/opensearch/tracing/TaskEventListener.java b/server/src/main/java/org/opensearch/tracing/TaskEventListener.java
new file mode 100644
index 0000000000000..e060a361247e5
--- /dev/null
+++ b/server/src/main/java/org/opensearch/tracing/TaskEventListener.java
@@ -0,0 +1,50 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.tracing;
+
+import io.opentelemetry.context.Context;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * SPI for listeners that are notified when a thread picks up a task submitted through a context-aware
+ * {@link ExecutorService}.
+ */
+public interface TaskEventListener {
+
+    /**
+     * Invoked when a thread starts working on a task submitted through a context-aware ExecutorService
+     * (see {@link org.opensearch.tracing.opentelemetry.OpenTelemetryContextWrapper#wrapTask(ExecutorService)}).
+     * When the OpenTelemetry implementation is chosen, Context and Span ({@link io.opentelemetry.api.trace.Span})
+     * information can be derived by calling {@link Context#current()}, as the current thread will have the
+     * context propagated to it. Span events can be added as necessary.
+     * This can be used to start metering resource consumption by a thread.
+     *
+     * Note: the current thread will have the Context set.
+     *
+     * @param operationName name of the operation associated with a trace
+     * @param eventName name of the event being reported
+     * @param t thread that starts working on the task
+     */
+    void onStart(String operationName, String eventName, Thread t);
+
+    /**
+     * Invoked when a thread completes a task submitted through a context-aware ExecutorService
+     * (see {@link org.opensearch.tracing.opentelemetry.OpenTelemetryContextWrapper#wrapTask(ExecutorService)}),
+     * for both success and failure scenarios. When the OpenTelemetry implementation is chosen, Context and Span
+     * ({@link io.opentelemetry.api.trace.Span}) information can be derived by calling {@link Context#current()},
+     * as the current thread will have the context propagated to it. Span events can be added as necessary.
+     *
+     * This can be used to stop metering resource consumption by a thread.
+     *
+     * @param operationName name of the operation associated with a trace
+     * @param eventName name of the event being reported
+     * @param t thread which completed the task
+     */
+    void onEnd(String operationName, String eventName, Thread t);
+
+    /**
+     * Checks whether this TaskEventListener should be called for the provided operation and/or event.
+     * The contract is that each service provider knows which operations need to be onboarded to its
+     * TaskEventListener; it does not make sense to call every available TaskEventListener for every operation
+     * using the current executor service.
+     *
+     * @param operationName name of the operation associated with a trace
+     * @param eventName name of the event being reported
+     * @return {@code true} if this listener should be called for the given operation and event
+     */
+    boolean isApplicable(String operationName, String eventName);
+}
diff --git a/server/src/main/java/org/opensearch/tracing/opentelemetry/OTelContextPreservingActionListener.java b/server/src/main/java/org/opensearch/tracing/opentelemetry/OTelContextPreservingActionListener.java
new file mode 100644
index 0000000000000..a97d1053e0c4f
--- /dev/null
+++ b/server/src/main/java/org/opensearch/tracing/opentelemetry/OTelContextPreservingActionListener.java
@@ -0,0 +1,85 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.tracing.opentelemetry;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+import org.opensearch.action.ActionListener;
+
+import java.util.Objects;
+
+import static io.opentelemetry.api.common.AttributeKey.longKey;
+import static io.opentelemetry.api.common.AttributeKey.stringKey;
+
+/**
+ * {@link ActionListener} decorator that keeps the OpenTelemetry context consistent when a response or failure
+ * is delivered. On completion it:
+ * <ol>
+ *   <li>makes the context captured at construction time ({@link Context#current()}) current and tags its span with
+ *       the name and id of the finishing thread;</li>
+ *   <li>ends that span, but only if a span id was supplied at construction time;</li>
+ *   <li>makes {@code beforeAttachContext} current and invokes the delegate under it.</li>
+ * </ol>
+ * When no span was started and nothing needs to be ended, use
+ * {@link OTelContextPreservingActionListener#OTelContextPreservingActionListener(ActionListener, Context)}
+ * with {@link Context#current()} as {@code beforeAttachContext}.
+ *
+ * @param <Response> response object type
+ */
+public final class OTelContextPreservingActionListener<Response> implements ActionListener<Response> {
+    private final ActionListener<Response> delegate;
+    private final Context beforeAttachContext;
+    private final Context afterAttachContext;
+    private final String spanID;
+
+    /**
+     * @param delegate listener that receives the response or failure
+     * @param beforeAttachContext context that is made current while the delegate runs
+     * @param spanID id of the span to end on completion, or {@code null} if no span must be ended
+     * @throws NullPointerException if {@code delegate} or {@code beforeAttachContext} is {@code null}
+     */
+    public OTelContextPreservingActionListener(ActionListener<Response> delegate, Context beforeAttachContext, String spanID) {
+        // Fail fast: a null detected only inside onResponse/onFailure would surface after the span has
+        // already been ended and would prevent the delegate from ever being notified.
+        this.delegate = Objects.requireNonNull(delegate, "delegate");
+        this.beforeAttachContext = Objects.requireNonNull(beforeAttachContext, "beforeAttachContext");
+        this.afterAttachContext = Context.current();
+        this.spanID = spanID;
+    }
+
+    /**
+     * Creates a listener that only propagates context and never ends a span.
+     *
+     * @param delegate listener that receives the response or failure
+     * @param beforeAttachContext context that is made current while the delegate runs
+     */
+    public OTelContextPreservingActionListener(ActionListener<Response> delegate, Context beforeAttachContext) {
+        this(delegate, beforeAttachContext, null);
+    }
+
+    @Override
+    public void onResponse(Response r) {
+        try (Scope ignored = afterAttachContext.makeCurrent()) {
+            finishSpan(Span.current());
+        }
+        try (Scope ignored = beforeAttachContext.makeCurrent()) {
+            delegate.onResponse(r);
+        }
+    }
+
+    @Override
+    public void onFailure(Exception e) {
+        try (Scope ignored = afterAttachContext.makeCurrent()) {
+            Span span = Span.current();
+            span.setStatus(StatusCode.ERROR);
+            finishSpan(span);
+        }
+        try (Scope ignored = beforeAttachContext.makeCurrent()) {
+            delegate.onFailure(e);
+        }
+    }
+
+    /**
+     * Tags the span with the finishing thread and ends it when this listener was given a span id.
+     *
+     * @param span span of the context captured at construction time
+     */
+    private void finishSpan(Span span) {
+        assert spanID == null || span.getSpanContext().getSpanId().equals(spanID);
+        final Thread finishingThread = Thread.currentThread();
+        span.setAttribute(stringKey("finish-thread-name"), finishingThread.getName());
+        span.setAttribute(longKey("finish-thread-id"), finishingThread.getId());
+        if (spanID != null) {
+            span.end();
+        }
+    }
+
+    @Override
+    public String toString() {
+        return getClass().getName() + "/" + delegate.toString();
+    }
+}
diff --git a/server/src/main/java/org/opensearch/tracing/opentelemetry/OpenSearchConcurrentExecutorService.java b/server/src/main/java/org/opensearch/tracing/opentelemetry/OpenSearchConcurrentExecutorService.java
new file mode 100644
index 0000000000000..69381d9661d53
--- /dev/null
+++ b/server/src/main/java/org/opensearch/tracing/opentelemetry/OpenSearchConcurrentExecutorService.java
@@ -0,0 +1,119 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.tracing.opentelemetry;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+import org.opensearch.tracing.TaskEventListener;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * {@link ExecutorService} decorator that runs every submitted task under the OpenTelemetry {@link Context} of the
+ * submitting thread and reports task start/end through
+ * {@code OpenTelemetryService.callTaskEventListeners} on the thread that executes the task.
+ */
+final public class OpenSearchConcurrentExecutorService extends OpenSearchForwardingExecutorService {
+    private final List<TaskEventListener> taskEventListeners;
+
+    OpenSearchConcurrentExecutorService(ExecutorService delegate) {
+        this(delegate, OpenTelemetryService.TaskEventListeners.getInstance(null));
+    }
+
+    OpenSearchConcurrentExecutorService(ExecutorService delegate, List<TaskEventListener> taskEventListeners) {
+        super(delegate);
+        this.taskEventListeners = taskEventListeners;
+    }
+
+    @Override
+    public <T> Future<T> submit(Callable<T> task) {
+        return delegate().submit(wrapTask(task, taskEventListeners));
+    }
+
+    @Override
+    public <T> Future<T> submit(Runnable task, T result) {
+        return delegate().submit(wrapTask(task, taskEventListeners), result);
+    }
+
+    @Override
+    public Future<?> submit(Runnable task) {
+        return delegate().submit(wrapTask(task, taskEventListeners));
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+        return delegate().invokeAll(wrap(tasks, taskEventListeners));
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+        throws InterruptedException {
+        return delegate().invokeAll(wrap(tasks, taskEventListeners), timeout, unit);
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+        return delegate().invokeAny(wrap(tasks, taskEventListeners));
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+        throws InterruptedException, ExecutionException, TimeoutException {
+        return delegate().invokeAny(wrap(tasks, taskEventListeners), timeout, unit);
+    }
+
+    @Override
+    public void execute(Runnable command) {
+        delegate().execute(wrapTask(command, taskEventListeners));
+    }
+
+    /** Wraps every task of the collection, preserving iteration order. */
+    private static <T> Collection<? extends Callable<T>> wrap(
+        Collection<? extends Callable<T>> tasks,
+        List<TaskEventListener> taskEventListeners
+    ) {
+        List<Callable<T>> wrapped = new ArrayList<>();
+        for (Callable<T> task : tasks) {
+            wrapped.add(wrapTask(task, taskEventListeners));
+        }
+        return wrapped;
+    }
+
+    /**
+     * Returns a callable that runs {@code callable} under the context that is current when this method is called.
+     *
+     * @param callable task to decorate
+     * @param taskEventListeners listeners handed to {@code OpenTelemetryService.callTaskEventListeners}
+     */
+    private static <T> Callable<T> wrapTask(Callable<T> callable, List<TaskEventListener> taskEventListeners) {
+        // Capture on the submitting thread; inside the lambda Context.current() would be the worker's own context.
+        final Context submitterContext = Context.current();
+        return () -> {
+            try (Scope ignored = submitterContext.makeCurrent()) {
+                // Inner try/finally keeps the end notification inside the scope: a finally attached to the
+                // try-with-resources itself only runs after the scope has been closed.
+                try {
+                    fireTaskEvent(true, taskEventListeners);
+                    return callable.call();
+                } finally {
+                    fireTaskEvent(false, taskEventListeners);
+                }
+            }
+        };
+    }
+
+    /**
+     * Returns a runnable that runs {@code runnable} under the context that is current when this method is called.
+     *
+     * @param runnable task to decorate
+     * @param taskEventListeners listeners handed to {@code OpenTelemetryService.callTaskEventListeners}
+     */
+    static Runnable wrapTask(Runnable runnable, List<TaskEventListener> taskEventListeners) {
+        // Capture on the submitting thread; inside the lambda Context.current() would be the worker's own context.
+        final Context submitterContext = Context.current();
+        return () -> {
+            try (Scope ignored = submitterContext.makeCurrent()) {
+                // See the Callable variant: the end notification must happen before the scope is closed.
+                try {
+                    fireTaskEvent(true, taskEventListeners);
+                    runnable.run();
+                } finally {
+                    fireTaskEvent(false, taskEventListeners);
+                }
+            }
+        };
+    }
+
+    /**
+     * Reports a start or end event for the current thread. The event name is
+     * {@code <span id>-<thread name>-Start} or {@code <span id>-<thread name>-End}.
+     */
+    private static void fireTaskEvent(boolean start, List<TaskEventListener> taskEventListeners) {
+        final Thread worker = Thread.currentThread();
+        final String eventName = Span.current().getSpanContext().getSpanId() + "-" + worker.getName() + (start ? "-Start" : "-End");
+        OpenTelemetryService.callTaskEventListeners(start, "", eventName, worker, taskEventListeners);
+    }
+}
diff --git a/server/src/main/java/org/opensearch/tracing/opentelemetry/OpenSearchForwardingExecutorService.java b/server/src/main/java/org/opensearch/tracing/opentelemetry/OpenSearchForwardingExecutorService.java
new file mode 100644
index 0000000000000..63435677f31c3
--- /dev/null
+++ b/server/src/main/java/org/opensearch/tracing/opentelemetry/OpenSearchForwardingExecutorService.java
@@ -0,0 +1,51 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.tracing.opentelemetry;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Base class for {@link ExecutorService} decorators: lifecycle methods are forwarded unchanged to the wrapped
+ * executor, while task submission methods are left for subclasses to implement.
+ */
+abstract class OpenSearchForwardingExecutorService implements ExecutorService {
+
+    private final ExecutorService delegate;
+
+    /**
+     * @param delegate executor service that receives all forwarded calls
+     */
+    protected OpenSearchForwardingExecutorService(ExecutorService delegate) {
+        this.delegate = delegate;
+    }
+
+    /**
+     * @return the wrapped executor service
+     */
+    ExecutorService delegate() {
+        return delegate;
+    }
+
+    @Override
+    public final void shutdown() {
+        delegate.shutdown();
+    }
+
+    @Override
+    public final List<Runnable> shutdownNow() {
+        return delegate.shutdownNow();
+    }
+
+    @Override
+    public final boolean isShutdown() {
+        return delegate.isShutdown();
+    }
+
+    @Override
+    public final boolean isTerminated() {
+        return delegate.isTerminated();
+    }
+
+    @Override
+    public final boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+        return delegate.awaitTermination(timeout, unit);
+    }
+}
diff --git a/server/src/main/java/org/opensearch/tracing/opentelemetry/OpenTelemetryContextWrapper.java b/server/src/main/java/org/opensearch/tracing/opentelemetry/OpenTelemetryContextWrapper.java
new file mode 100644
index 0000000000000..370fce1f7094a
--- /dev/null
+++ b/server/src/main/java/org/opensearch/tracing/opentelemetry/OpenTelemetryContextWrapper.java
@@ -0,0 +1,31 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.tracing.opentelemetry;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Factory methods that decorate executors so that tasks handed to them are passed through
+ * {@link OpenSearchConcurrentExecutorService}'s task wrapping.
+ */
+public class OpenTelemetryContextWrapper {
+
+    /**
+     * Wraps the ExecutorService for context propagation across threads in the ThreadPool.
+     *
+     * @param executorService executor service to be wrapped
+     * @return a new {@link OpenSearchConcurrentExecutorService} delegating to {@code executorService}
+     */
+    public static ExecutorService wrapTask(ExecutorService executorService) {
+        return new OpenSearchConcurrentExecutorService(executorService);
+    }
+
+    /**
+     * Passes the context to the provided delegate executor: every command is wrapped with
+     * {@code OpenSearchConcurrentExecutorService.wrapTask} on the calling thread before it is handed over.
+     *
+     * @param executor executor to be wrapped with.
+     * @return an executor that forwards wrapped commands to {@code executor}; never {@code null}
+     * @throws NullPointerException if {@code executor} is {@code null}
+     */
+    public static Executor wrapTask(Executor executor) {
+        // Previously this returned null, so any caller using the result failed with a NullPointerException.
+        if (executor == null) {
+            throw new NullPointerException("executor");
+        }
+        return command -> executor.execute(
+            OpenSearchConcurrentExecutorService.wrapTask(command, OpenTelemetryService.TaskEventListeners.getInstance(null))
+        );
+    }
+}
diff --git a/server/src/main/java/org/opensearch/tracing/opentelemetry/OpenTelemetryService.java b/server/src/main/java/org/opensearch/tracing/opentelemetry/OpenTelemetryService.java
new file mode 100644
index 0000000000000..61f971aea7025
--- /dev/null
+++ b/server/src/main/java/org/opensearch/tracing/opentelemetry/OpenTelemetryService.java
@@ -0,0 +1,182 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.tracing.opentelemetry;
+
+import com.sun.management.ThreadMXBean;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+import io.opentelemetry.context.propagation.ContextPropagators;
+import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter;
+import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
+import io.opentelemetry.sdk.resources.Resource;
+import io.opentelemetry.sdk.trace.SdkTracerProvider;
+import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
+import io.opentelemetry.semconv.resource.attributes.ResourceAttributes;
+import org.opensearch.action.ActionListener;
+import org.opensearch.threadpool.ThreadPool;
+import org.opensearch.tracing.TaskEventListener;
+
+import java.lang.management.ManagementFactory;
+import java.util.List;
+import java.util.function.BiFunction;
+
+import static io.opentelemetry.api.common.AttributeKey.longKey;
+import static io.opentelemetry.api.common.AttributeKey.stringKey;
+
+
+public class OpenTelemetryService {
+ public static Resource resource;
+ public static SdkTracerProvider sdkTracerProvider;
+ public static SdkMeterProvider sdkMeterProvider;
+ public static OpenTelemetry openTelemetry;
+ private static final List DEFAULT_TASK_EVENT_LISTENERS;
+ private static final List allowedThreadPools = List.of(ThreadPool.Names.GENERIC);
+ private static final ThreadMXBean threadMXBean = (ThreadMXBean) ManagementFactory.getThreadMXBean();
+
+ static {
+ resource = Resource.getDefault()
+ .merge(Resource.create(Attributes.of(ResourceAttributes.SERVICE_NAME, "opensearch-tasks")));
+
+ sdkTracerProvider = SdkTracerProvider.builder()
+ .addSpanProcessor(SimpleSpanProcessor.create(OtlpHttpSpanExporter.builder().build()))
+ .setResource(resource)
+ .build();
+
+ sdkMeterProvider = SdkMeterProvider.builder()
+ .registerMetricReader(PeriodicMetricReader.builder(OtlpGrpcMetricExporter.builder().build()).build())
+ .setResource(resource)
+ .build();
+
+ openTelemetry = OpenTelemetrySdk.builder()
+ .setTracerProvider(sdkTracerProvider)
+ .setMeterProvider(sdkMeterProvider)
+ .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
+ .buildAndRegisterGlobal();
+
+ DEFAULT_TASK_EVENT_LISTENERS = List.of(new TaskEventListener() {
+ @Override
+ public void onStart(String operationName, String eventName, Thread t) {
+ Span span = Span.current();
+ if (span != Span.getInvalid()) {
+ span.addEvent(eventName,
+ Attributes.of(
+ AttributeKey.longKey("ThreadID"), t.getId(),
+ AttributeKey.stringKey("ThreadName"), t.getName(),
+ AttributeKey.longKey("CPUUsage"), threadMXBean.getThreadCpuTime(t.getId()),
+ AttributeKey.longKey("MemoryUsage"), threadMXBean.getThreadAllocatedBytes(t.getId()),
+ AttributeKey.longKey("ContentionTime"), threadMXBean.getThreadInfo(t.getId()).getBlockedTime()
+ )
+ );
+ }
+ }
+
+ @Override
+ public void onEnd(String operationName, String eventName, Thread t) {
+ Span span = Span.current();
+ if (span != Span.getInvalid()) {
+ span.addEvent(eventName,
+ Attributes.of(
+ AttributeKey.longKey("ThreadID"), t.getId(),
+ AttributeKey.stringKey("ThreadName"), t.getName(),
+ AttributeKey.longKey("CPUUsage"), threadMXBean.getThreadCpuTime(t.getId()),
+ AttributeKey.longKey("MemoryUsage"), threadMXBean.getThreadAllocatedBytes(t.getId()),
+ AttributeKey.longKey("ContentionTime"), threadMXBean.getThreadInfo(t.getId()).getBlockedTime()
+ )
+ );
+ }
+ }
+
+ @Override
+ public boolean isApplicable(String operationName, String eventName) {
+ return true;
+ }
+ });
+ }
+
+ public static boolean isThreadPoolAllowed(String threadPoolName) {
+ return allowedThreadPools.contains(threadPoolName);
+ }
+
+ /**
+ * starts the span and invokes the function under the scope of new span, closes the scope when function is invoked.
+ * Wraps the ActionListener with {@link OTelContextPreservingActionListener} for context propagation and ends the span
+ * on response/failure of action listener.
+ */
+ public static void callFunctionAndStartSpan(String spanName, BiFunction