Skip to content

Commit

Permalink
Fixed tracing context propagation in connector
Browse files Browse the repository at this point in the history
  • Loading branch information
alekkol authored and martint committed Apr 29, 2024
1 parent 32efe57 commit a7b2e3b
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,35 +73,37 @@ public void run(SchedulerContext context)
long previousScheduledNanos = 0;
try (SetThreadName ignored = new SetThreadName("SplitRunner-%s-%s", taskId, splitId)) {
while (!split.isFinished()) {
ListenableFuture<Void> blocked = split.processFor(SPLIT_RUN_QUANTA);
CpuTimer.CpuDuration elapsed = timer.elapsedTime();

long scheduledNanos = elapsed.getWall().roundTo(NANOSECONDS);
processSpan.setAttribute(TrinoAttributes.SPLIT_SCHEDULED_TIME_NANOS, scheduledNanos - previousScheduledNanos);
previousScheduledNanos = scheduledNanos;

long cpuNanos = elapsed.getCpu().roundTo(NANOSECONDS);
processSpan.setAttribute(TrinoAttributes.SPLIT_CPU_TIME_NANOS, cpuNanos - previousCpuNanos);
previousCpuNanos = cpuNanos;

if (!split.isFinished()) {
if (blocked.isDone()) {
processSpan.addEvent("yield");
processSpan.end();
if (!context.maybeYield()) {
processSpan = null;
return;
try (var ignored2 = processSpan.makeCurrent()) {
ListenableFuture<Void> blocked = split.processFor(SPLIT_RUN_QUANTA);
CpuTimer.CpuDuration elapsed = timer.elapsedTime();

long scheduledNanos = elapsed.getWall().roundTo(NANOSECONDS);
processSpan.setAttribute(TrinoAttributes.SPLIT_SCHEDULED_TIME_NANOS, scheduledNanos - previousScheduledNanos);
previousScheduledNanos = scheduledNanos;

long cpuNanos = elapsed.getCpu().roundTo(NANOSECONDS);
processSpan.setAttribute(TrinoAttributes.SPLIT_CPU_TIME_NANOS, cpuNanos - previousCpuNanos);
previousCpuNanos = cpuNanos;

if (!split.isFinished()) {
if (blocked.isDone()) {
processSpan.addEvent("yield");
processSpan.end();
if (!context.maybeYield()) {
processSpan = null;
return;
}
}
}
else {
processSpan.addEvent("blocked");
processSpan.end();
if (!context.block(blocked)) {
processSpan = null;
return;
else {
processSpan.addEvent("blocked");
processSpan.end();
if (!context.block(blocked)) {
processSpan = null;
return;
}
}
processSpan = newSpan(splitSpan, processSpan);
}
processSpan = newSpan(splitSpan, processSpan);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.execution;

import com.google.common.collect.ImmutableMap;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.trino.connector.MockConnectorFactory;
import io.trino.connector.MockConnectorPlugin;
import io.trino.testing.QueryRunner;
import io.trino.testing.StandaloneQueryRunner;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import static io.trino.testing.TestingSession.testSessionBuilder;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;

@TestInstance(PER_CLASS)
public class TestConnectorTracingContextPropagation
{
private static final String CATALOG_NAME = "test_catalog";
private static final String CONNECTOR_NAME = "test_connector";

@Test
public void testTracingContextCapture()
{
AtomicReference<Context> capturedContext = new AtomicReference<>();

try (QueryRunner queryRunner = new StandaloneQueryRunner(testSessionBuilder().build())) {
queryRunner.installPlugin(new MockConnectorPlugin(MockConnectorFactory.builder()
.withName(CONNECTOR_NAME)
.withData(table -> { // invoked in ConnectorPageSourceProvider
capturedContext.set(Context.current());
return List.of(List.of());
})
.build()));
queryRunner.createCatalog(CATALOG_NAME, CONNECTOR_NAME, ImmutableMap.of());

queryRunner.execute("SELECT COUNT(*) FROM %s.test.test".formatted(CATALOG_NAME));
}

assertThat(capturedContext.get())
.matches(ctx -> Span.fromContext(ctx).getSpanContext().isValid(), "valid tracing context");
}
}

0 comments on commit a7b2e3b

Please sign in to comment.