Skip to content

Commit

Permalink
Fix query transition to RUNNING state with task-level retries
Browse files Browse the repository at this point in the history
  • Loading branch information
losipiuk committed Mar 2, 2022
1 parent 102ab2a commit 91cca41
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1924,6 +1924,8 @@ public void schedule()
return;
}

stateMachine.transitionToRunning();

try (SetThreadName ignored = new SetThreadName("Query-%s", queryStateMachine.getQueryId())) {
List<ListenableFuture<Void>> blockedStages = new ArrayList<>();
while (!isFinishingOrDone(queryStateMachine) && !stateMachine.getState().isDone()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,49 @@
*/
package io.trino.testing;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.trino.Session;
import io.trino.execution.QueryManager;
import io.trino.server.BasicQueryInfo;
import org.intellij.lang.annotations.Language;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.time.ZonedDateTime;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.regex.Pattern;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.FeaturesConfig.JoinDistributionType.BROADCAST;
import static io.trino.SystemSessionProperties.ENABLE_DYNAMIC_FILTERING;
import static io.trino.execution.QueryState.RUNNING;
import static io.trino.testing.assertions.Assert.assertEventually;
import static io.trino.testing.sql.TestTable.randomTableSuffix;
import static java.lang.String.format;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public abstract class AbstractDistributedEngineOnlyQueries
extends AbstractTestEngineOnlyQueries
{
private ExecutorService executorService;

@BeforeClass
public void setUp()
{
executorService = newCachedThreadPool();
}

@AfterClass(alwaysRun = true)
public void shutdown()
{
executorService.shutdownNow();
}

/**
* Ensure the tests are run with {@link io.trino.testing.DistributedQueryRunner}. E.g. {@link io.trino.testing.LocalQueryRunner} takes some
* shortcuts, not exercising certain aspects.
Expand Down Expand Up @@ -294,4 +320,30 @@ public void testImplicitCastToRowWithFieldsRequiringDelimitation()
// run INSERT to verify that field names in generated CAST expressions are properly delimited
assertUpdate("INSERT INTO target_table SELECT * from source_table", 0);
}

@Test(timeOut = 10_000)
public void testQueryTransitionsToRunningState()
{
String query = format(
// use random marker in query for unique matching below
"SELECT count(*) c_%s FROM lineitem CROSS JOIN lineitem CROSS JOIN lineitem",
randomTableSuffix());
DistributedQueryRunner queryRunner = getDistributedQueryRunner();
ListenableFuture<?> queryFuture = Futures.submit(
() -> queryRunner.execute(getSession(), query), executorService);

QueryManager queryManager = queryRunner.getCoordinator().getQueryManager();
assertEventually(() -> {
List<BasicQueryInfo> queryInfos = queryManager.getQueries().stream()
.filter(q -> q.getQuery().equals(query))
.collect(toImmutableList());

assertThat(queryInfos).hasSize(1);
assertThat(queryInfos.get(0).getState()).isEqualTo(RUNNING);
// we are good. Let's kill the query
queryManager.cancelQuery(queryInfos.get(0).getQueryId());
});

assertThatThrownBy(queryFuture::get).hasMessageContaining("Query was canceled");
}
}

0 comments on commit 91cca41

Please sign in to comment.