Skip to content

Commit

Permalink
Test no thread leaks in engine and connectors
Browse files Browse the repository at this point in the history
  • Loading branch information
findepi committed May 9, 2024
1 parent 0f097b7 commit 2477e6b
Show file tree
Hide file tree
Showing 7 changed files with 315 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.jdbc;

import org.junit.jupiter.api.Test;

import static io.trino.plugin.jdbc.H2QueryRunner.createH2QueryRunner;
import static io.trino.testing.ThreadAssertions.assertNoThreadLeakedInQueryRunner;
import static io.trino.tpch.TpchTable.REGION;

public class TestJdbcQueryRunner
{
@Test
public void testNoThreadLeaked()
throws Exception
{
assertNoThreadLeakedInQueryRunner(
() -> createH2QueryRunner(REGION),
queryRunner -> queryRunner.execute("TABLE region"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake;

import org.junit.jupiter.api.Test;

import java.util.List;

import static io.trino.testing.ThreadAssertions.assertNoThreadLeakedInQueryRunner;
import static io.trino.tpch.TpchTable.REGION;

public class TestDeltaLakeQueryRunner
{
@Test
public void testNoThreadLeaked()
throws Exception
{
assertNoThreadLeakedInQueryRunner(
() -> DeltaLakeQueryRunner.builder()
.setInitialTables(List.of(REGION))
.build(),
queryRunner -> queryRunner.execute("TABLE region"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hive;

import org.junit.jupiter.api.Test;

import java.util.List;

import static io.trino.testing.ThreadAssertions.assertNoThreadLeakedInQueryRunner;
import static io.trino.tpch.TpchTable.REGION;

public class TestHiveQueryRunner
{
@Test
public void testNoThreadLeaked()
throws Exception
{
// create one table to exercise the plugin code
assertNoThreadLeakedInQueryRunner(
() -> HiveQueryRunner.builder()
// create one table to exercise the plugin code
.setInitialTables(List.of(REGION))
.build(),
queryRunner -> queryRunner.execute("TABLE region"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hudi;

import io.trino.plugin.hudi.testing.TpchHudiTablesInitializer;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.Map;

import static io.trino.plugin.hudi.HudiQueryRunner.createHudiQueryRunner;
import static io.trino.testing.ThreadAssertions.assertNoThreadLeakedInQueryRunner;
import static io.trino.tpch.TpchTable.REGION;

public class TestHudiQueryRunner
{
@Test
public void testNoThreadLeaked()
throws Exception
{
assertNoThreadLeakedInQueryRunner(
() -> createHudiQueryRunner(
Map.of(),
// create one table to exercise the plugin code
new TpchHudiTablesInitializer(List.of(REGION))),
queryRunner -> queryRunner.execute("TABLE region"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg;

import org.junit.jupiter.api.Test;

import static io.trino.plugin.iceberg.IcebergQueryRunner.createIcebergQueryRunner;
import static io.trino.testing.ThreadAssertions.assertNoThreadLeakedInQueryRunner;
import static io.trino.tpch.TpchTable.REGION;

public class TestIcebergQueryRunner
{
@Test
public void testNoThreadLeaked()
throws Exception
{
// create one table to exercise the plugin code
assertNoThreadLeakedInQueryRunner(
() -> createIcebergQueryRunner(REGION),
queryRunner -> {
// Create a table with a bunch of manifests so that Iceberg library may process it in parallel
for (int i = 0; i < 3; i++) {
queryRunner.execute("INSERT INTO region SELECT * FROM tpch.tiny.region WHERE regionkey = 0");
}
queryRunner.execute("TABLE region");
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.testing;

import io.trino.jvm.Threads;

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.concurrent.MoreFutures.getFutureValue;
import static java.util.stream.Collectors.joining;

public final class ThreadAssertions
{
private ThreadAssertions() {}

private static final AtomicInteger sequence = new AtomicInteger();

public static void assertNoThreadLeakedInPlanTester(ThrowingSupplier<PlanTester> resourceCreator, Consumer<? super PlanTester> exerciseResource)
throws Exception
{
assertNoThreadLeaked(resourceCreator, exerciseResource);
}

public static <T extends QueryRunner> void assertNoThreadLeakedInQueryRunner(ThrowingSupplier<T> resourceCreator, Consumer<? super T> exerciseResource)
throws Exception
{
assertNoThreadLeaked(resourceCreator, exerciseResource);
}

private static <T extends AutoCloseable> void assertNoThreadLeaked(
ThrowingSupplier<T> resourceCreator,
Consumer<? super T> exerciseResource)
throws Exception
{
// warm up all statically initialized threads
try (T resource = resourceCreator.get()) {
exerciseResource.accept(resource);
}

ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
CompletableFuture<Void> testFuture = new CompletableFuture<>();
ThreadGroup threadGroup = new ThreadGroup("test-group-" + sequence.incrementAndGet());
Thread.ofPlatform().group(threadGroup).start(() -> {
testFuture.completeAsync(() -> {
try (T resource = resourceCreator.get()) {
exerciseResource.accept(resource);
}
catch (Exception e) {
throw new RuntimeException(e);
}
Thread[] threads = new Thread[256];
// TODO detect leaked virtual threads -- enumerate does not return them and they live in their own thread group
int count = threadGroup.enumerate(threads, true);
String stackTraces = Arrays.stream(threads, 0, count)
.filter(thread -> thread != Thread.currentThread())
// Common ForkJoinPool threads are statically managed, not considered a leak
.filter(thread -> !thread.getName().startsWith("ForkJoinPool.commonPool-worker-"))
// OkHttp TaskRunner is statically managed (okhttp3.internal.concurrent.TaskRunner), not considered a leak
.filter(thread -> !thread.getName().equals("OkHttp TaskRunner"))
.map(thread -> threadMXBean.getThreadInfo(thread.threadId(), Integer.MAX_VALUE))
.filter(Objects::nonNull) // could be virtual, or exit concurrently
.map(Threads::fullToString)
.collect(joining("\n"));

if (!stackTraces.isEmpty()) {
throw new AssertionError("Threads leaked:\n" + stackTraces);
}

return null;
}, directExecutor());
});
getFutureValue(testFuture);
}

public interface ThrowingSupplier<T>
{
T get()
throws Exception;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.tests;

import io.trino.testing.PlanTester;
import org.junit.jupiter.api.Test;

import static io.trino.testing.TestingSession.testSession;
import static io.trino.testing.ThreadAssertions.assertNoThreadLeakedInPlanTester;

public class TestPlanTester
{
@Test
public void testNoThreadLeaked()
throws Exception
{
assertNoThreadLeakedInPlanTester(
() -> PlanTester.create(testSession()),
planTester -> planTester.inTransaction(transactionSession -> {
planTester.createPlan(transactionSession, "SELECT 1");
return null;
}));
}
}

0 comments on commit 2477e6b

Please sign in to comment.