diff --git a/core/trino-main/src/main/java/io/trino/execution/executor/dedicated/SplitProcessor.java b/core/trino-main/src/main/java/io/trino/execution/executor/dedicated/SplitProcessor.java index f2721e77494..85846b4e428 100644 --- a/core/trino-main/src/main/java/io/trino/execution/executor/dedicated/SplitProcessor.java +++ b/core/trino-main/src/main/java/io/trino/execution/executor/dedicated/SplitProcessor.java @@ -73,35 +73,37 @@ public void run(SchedulerContext context) long previousScheduledNanos = 0; try (SetThreadName ignored = new SetThreadName("SplitRunner-%s-%s", taskId, splitId)) { while (!split.isFinished()) { - ListenableFuture blocked = split.processFor(SPLIT_RUN_QUANTA); - CpuTimer.CpuDuration elapsed = timer.elapsedTime(); - - long scheduledNanos = elapsed.getWall().roundTo(NANOSECONDS); - processSpan.setAttribute(TrinoAttributes.SPLIT_SCHEDULED_TIME_NANOS, scheduledNanos - previousScheduledNanos); - previousScheduledNanos = scheduledNanos; - - long cpuNanos = elapsed.getCpu().roundTo(NANOSECONDS); - processSpan.setAttribute(TrinoAttributes.SPLIT_CPU_TIME_NANOS, cpuNanos - previousCpuNanos); - previousCpuNanos = cpuNanos; - - if (!split.isFinished()) { - if (blocked.isDone()) { - processSpan.addEvent("yield"); - processSpan.end(); - if (!context.maybeYield()) { - processSpan = null; - return; + try (var ignored2 = processSpan.makeCurrent()) { + ListenableFuture blocked = split.processFor(SPLIT_RUN_QUANTA); + CpuTimer.CpuDuration elapsed = timer.elapsedTime(); + + long scheduledNanos = elapsed.getWall().roundTo(NANOSECONDS); + processSpan.setAttribute(TrinoAttributes.SPLIT_SCHEDULED_TIME_NANOS, scheduledNanos - previousScheduledNanos); + previousScheduledNanos = scheduledNanos; + + long cpuNanos = elapsed.getCpu().roundTo(NANOSECONDS); + processSpan.setAttribute(TrinoAttributes.SPLIT_CPU_TIME_NANOS, cpuNanos - previousCpuNanos); + previousCpuNanos = cpuNanos; + + if (!split.isFinished()) { + if (blocked.isDone()) { + processSpan.addEvent("yield"); + processSpan.end(); + if (!context.maybeYield()) { + processSpan = null; + return; + } } - } - else { - processSpan.addEvent("blocked"); - processSpan.end(); - if (!context.block(blocked)) { - processSpan = null; - return; + else { + processSpan.addEvent("blocked"); + processSpan.end(); + if (!context.block(blocked)) { + processSpan = null; + return; + } } + processSpan = newSpan(splitSpan, processSpan); } - processSpan = newSpan(splitSpan, processSpan); } } } diff --git a/core/trino-main/src/test/java/io/trino/execution/TestConnectorTracingContextPropagation.java b/core/trino-main/src/test/java/io/trino/execution/TestConnectorTracingContextPropagation.java new file mode 100644 index 00000000000..5f5c1478a15 --- /dev/null +++ b/core/trino-main/src/test/java/io/trino/execution/TestConnectorTracingContextPropagation.java @@ -0,0 +1,60 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.execution; + +import com.google.common.collect.ImmutableMap; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Context; +import io.trino.connector.MockConnectorFactory; +import io.trino.connector.MockConnectorPlugin; +import io.trino.testing.QueryRunner; +import io.trino.testing.StandaloneQueryRunner; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; + +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +import static io.trino.testing.TestingSession.testSessionBuilder; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; + +@TestInstance(PER_CLASS) +public class TestConnectorTracingContextPropagation +{ + private static final String CATALOG_NAME = "test_catalog"; + private static final String CONNECTOR_NAME = "test_connector"; + + @Test + public void testTracingContextCapture() + { + AtomicReference capturedContext = new AtomicReference<>(); + + try (QueryRunner queryRunner = new StandaloneQueryRunner(testSessionBuilder().build())) { + queryRunner.installPlugin(new MockConnectorPlugin(MockConnectorFactory.builder() + .withName(CONNECTOR_NAME) + .withData(table -> { // invoked in ConnectorPageSourceProvider + capturedContext.set(Context.current()); + return List.of(List.of()); + }) + .build())); + queryRunner.createCatalog(CATALOG_NAME, CONNECTOR_NAME, ImmutableMap.of()); + + queryRunner.execute("SELECT COUNT(*) FROM %s.test.test".formatted(CATALOG_NAME)); + } + + assertThat(capturedContext.get()) + .matches(ctx -> Span.fromContext(ctx).getSpanContext().isValid(), "valid tracing context"); + } +}