Skip to content

Commit

Permalink
Python debugging support (#3075)
Browse files Browse the repository at this point in the history
This commit adds a configuration option to customize engine-created
threads, and provides a default implementation that will register those
threads for debugging with pydevd if python is enabled.

As of this commit, pydevd debugging seems to work correctly, but
VSCode's debugging doesn't work for all threads yet.

Partial #2997
  • Loading branch information
niloc132 authored Feb 3, 2023
1 parent bb0e2bc commit 46ef898
Show file tree
Hide file tree
Showing 13 changed files with 188 additions and 181 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,42 @@

public class NamingThreadFactory implements ThreadFactory {
private final AtomicInteger threadCounter = new AtomicInteger(0);
public final String name;
private final Class clazz;
private boolean daemon;
private final String name;
private final Class<?> clazz;
private final boolean daemon;
private final ThreadGroup threadGroup;

public NamingThreadFactory(final Class clazz, final String name) {
this(clazz, name, false);
/**
* Creates a thread factory using the provided class and name as part of the thread name. All created threads will
* be daemon threads.
*
* @param clazz a class to use when naming each thread
* @param name a name component to add after the class name when naming each thread
*/
public NamingThreadFactory(final Class<?> clazz, final String name) {
this(clazz, name, true);
}

public NamingThreadFactory(final Class clazz, final String name, boolean daemon) {
/**
* Creates a thread factory using the provided class and name as part of the thread name.
*
* @param clazz a class to use when naming each thread
* @param name a name component to add after the class name when naming each thread
* @param daemon true to make each thread a daemon thread
*/
public NamingThreadFactory(final Class<?> clazz, final String name, boolean daemon) {
this(null, clazz, name, daemon);
}

public NamingThreadFactory(ThreadGroup threadGroup, final Class clazz, final String name, boolean daemon) {
/**
* Creates a thread factory using the provided class and name as part of the thread name.
*
* @param threadGroup a thread group to add each thread to
* @param clazz a class to use when naming each thread
* @param name a name component to add after the class name when naming each thread
* @param daemon true to make each thread a daemon thread
*/
public NamingThreadFactory(ThreadGroup threadGroup, final Class<?> clazz, final String name, boolean daemon) {
this.threadGroup = threadGroup;
this.clazz = clazz;
this.name = name;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package io.deephaven.util.thread;

import io.deephaven.configuration.Configuration;

import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/**
* Extension point to allow threads that will run user code from within the platform to be controlled by configuration.
*/
public interface ThreadInitializationFactory {
/* private */ String[] CONFIGURED_INITIALIZATION_TYPES =
Configuration.getInstance().getStringArrayFromProperty("thread.initialization");
/* private */ List<ThreadInitializationFactory> INITIALIZERS = Arrays.stream(CONFIGURED_INITIALIZATION_TYPES)
.filter(str -> !str.isBlank())
.map(type -> {
try {
// noinspection unchecked
Class<? extends ThreadInitializationFactory> clazz =
(Class<? extends ThreadInitializationFactory>) Class.forName(type);
return clazz.getDeclaredConstructor().newInstance();
} catch (ClassNotFoundException | NoSuchMethodException | InvocationTargetException
| InstantiationException | IllegalAccessException e) {
throw new IllegalArgumentException(
"Error instantiating initializer " + type + ", please check configuration");
}
})
.collect(Collectors.toUnmodifiableList());

/**
* Chains configured initializers to run before/around any given runnable, returning a runnable intended to be run
* by a new thread.
*/
static Runnable wrapRunnable(Runnable runnable) {
Runnable acc = runnable;
for (ThreadInitializationFactory INITIALIZER : INITIALIZERS) {
acc = INITIALIZER.createInitializer(acc);
}
return acc;
}

Runnable createInitializer(Runnable runnable);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.deephaven.chunk.util.pools.MultiChunkPool;
import io.deephaven.configuration.Configuration;
import io.deephaven.util.thread.NamingThreadFactory;
import io.deephaven.util.thread.ThreadInitializationFactory;
import org.jetbrains.annotations.NotNull;

import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -39,11 +40,11 @@ public static boolean isInitializationThread() {
true) {
@Override
public Thread newThread(@NotNull Runnable r) {
return super.newThread(() -> {
return super.newThread(ThreadInitializationFactory.wrapRunnable(() -> {
isInitializationThread.set(true);
MultiChunkPool.enableDedicatedPoolForThisThread();
r.run();
});
}));
}
};
executorService = Executors.newFixedThreadPool(NUM_THREADS, threadFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class SparseSelect {

private final static ExecutorService executor = SPARSE_SELECT_THREADS == 1 ? null
: Executors.newFixedThreadPool(SPARSE_SELECT_THREADS,
new NamingThreadFactory(SparseSelect.class, "copyThread", true));
new NamingThreadFactory(SparseSelect.class, "copyThread"));

private SparseSelect() {} // static use only

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.deephaven.engine.table.impl.locations.impl.AbstractTableLocationProvider;
import io.deephaven.engine.table.impl.locations.impl.SubscriptionAggregator;
import io.deephaven.engine.table.impl.locations.TableDataException;
import io.deephaven.util.thread.NamingThreadFactory;
import org.jetbrains.annotations.NotNull;

import java.util.concurrent.Future;
Expand All @@ -31,8 +32,6 @@ public class ExecutorTableDataRefreshService implements TableDataRefreshService
private final long tableLocationProviderRefreshIntervalMillis;
private final long tableLocationRefreshIntervalMillis;

private final AtomicInteger threadCount = new AtomicInteger(0);

private final ScheduledThreadPoolExecutor scheduler;

private final Value providerSubscriptions;
Expand All @@ -50,8 +49,9 @@ public ExecutorTableDataRefreshService(@NotNull final String name,
this.tableLocationRefreshIntervalMillis =
Require.gtZero(tableLocationRefreshIntervalMillis, "tableLocationRefreshIntervalMillis");

NamingThreadFactory threadFactory = new NamingThreadFactory(TableDataRefreshService.class, "refreshThread");
scheduler =
new ScheduledThreadPoolExecutor(threadPoolSize, this::makeThread, new ThreadPoolExecutor.AbortPolicy());
new ScheduledThreadPoolExecutor(threadPoolSize, threadFactory, new ThreadPoolExecutor.AbortPolicy());
scheduler.setRemoveOnCancelPolicy(true);

providerSubscriptions = Stats.makeItem(NAME_PREFIX + name, "providerSubscriptions", Counter.FACTORY).getValue();
Expand All @@ -62,13 +62,6 @@ public ExecutorTableDataRefreshService(@NotNull final String name,
.makeItem(NAME_PREFIX + name, "locationSubscriptionRefreshDurationNanos", State.FACTORY).getValue();
}

private Thread makeThread(final Runnable runnable) {
final Thread thread =
new Thread(runnable, NAME_PREFIX + name + "-refreshThread-" + threadCount.incrementAndGet());
thread.setDaemon(true);
return thread;
}

@Override
public void submitOneTimeAsyncTask(@NotNull final Runnable task) {
scheduler.submit(task);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
import io.deephaven.util.locks.AwareFunctionalLock;
import io.deephaven.util.process.ProcessEnvironment;
import io.deephaven.util.thread.NamingThreadFactory;
import io.deephaven.util.thread.ThreadDump;
import io.deephaven.util.thread.ThreadInitializationFactory;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -262,17 +265,14 @@ public synchronized void take(final AccumulatedCycleStats out) {
notificationProcessor = makeNotificationProcessor();
jvmIntrospectionContext = new JvmIntrospectionContext();

refreshThread = new Thread("UpdateGraphProcessor." + name() + ".refreshThread") {
@Override
public void run() {
configureRefreshThread();
// noinspection InfiniteLoopStatement
while (true) {
Assert.eqFalse(ALLOW_UNIT_TEST_MODE, "ALLOW_UNIT_TEST_MODE");
refreshTablesAndFlushNotifications();
}
refreshThread = new Thread(ThreadInitializationFactory.wrapRunnable(() -> {
configureRefreshThread();
// noinspection InfiniteLoopStatement
while (true) {
Assert.eqFalse(ALLOW_UNIT_TEST_MODE, "ALLOW_UNIT_TEST_MODE");
refreshTablesAndFlushNotifications();
}
};
}), "UpdateGraphProcessor." + name() + ".refreshThread");
refreshThread.setDaemon(true);

final int updateThreads =
Expand Down Expand Up @@ -1800,10 +1800,10 @@ private UpdateGraphProcessorThreadFactory(@NotNull final ThreadGroup threadGroup

@Override
public Thread newThread(@NotNull final Runnable r) {
return super.newThread(() -> {
return super.newThread(ThreadInitializationFactory.wrapRunnable(() -> {
configureRefreshThread();
r.run();
});
}));
}
}

Expand Down Expand Up @@ -1835,7 +1835,7 @@ private ExecutorService makeUnitTestRefreshExecutor() {
private class UnitTestRefreshThreadFactory extends NamingThreadFactory {

private UnitTestRefreshThreadFactory() {
super(UpdateGraphProcessor.class, "unitTestRefresh", true);
super(UpdateGraphProcessor.class, "unitTestRefresh");
}

@Override
Expand Down
6 changes: 6 additions & 0 deletions props/configs/src/main/resources/dh-defaults.prop
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ default.processEnvironmentFactory=io.deephaven.util.process.DefaultProcessEnviro

OperationInitializationThreadPool.threads=1

deephaven.console.type=python

# Default session duration is 5 minutes
http.session.durationMs=300000

Expand All @@ -63,3 +65,7 @@ client.configuration.list=java.version,deephaven.version,barrage.version,http.se
# jar, and a class that is found in that jar. Any such keys will be made available to the client.configuration.list
# as <key>.version.
client.version.list=deephaven=io.deephaven.engine.table.Table,barrage=io.deephaven.barrage.flatbuf.BarrageMessageWrapper


# Specifies additional setup to run on threads that can perform table operations with user code. Comma-separated list, instances must be of type io.deephaven.util.thread.ThreadInitializationFactory
thread.initialization=io.deephaven.server.console.python.DebuggingInitializer
4 changes: 4 additions & 0 deletions props/test-configs/src/main/resources/dh-tests.prop
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,8 @@ http.session.durationMs=300000
AuthHandlers=io.deephaven.auth.AnonymousAuthenticationHandler
authentication.client.configuration.list=
client.version.list=

authentication.anonymous.warn=false

deephaven.console.type=none
thread.initialization=
Loading

0 comments on commit 46ef898

Please sign in to comment.