Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow a custom ExecutorService to be set for each pool #113

Merged
merged 1 commit into from
Jul 19, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions hikaricp/src/main/java/com/zaxxer/hikari/HikariConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import javax.sql.DataSource;
Expand Down Expand Up @@ -81,6 +82,7 @@ public class HikariConfig implements HikariConfigMBean
private Properties dataSourceProperties;
private IConnectionCustomizer customizer;
private int transactionIsolation;
private ThreadFactory threadFactory;

static {
JavassistProxyFactory.initialize();
Expand Down Expand Up @@ -625,6 +627,26 @@ public void setUsername(String username)
this.username = username;
}

/**
* Get the thread factory used to create threads.
*
* @return the thread factory (may be null, in which case the default thread factory is used)
*/
public ThreadFactory getThreadFactory()
{
return threadFactory;
}

/**
* Set the thread factory to be used to create threads.
*
* @param threadFactory the thread factory (setting to null causes the default thread factory to be used)
*/
public void setThreadFactory(ThreadFactory threadFactory)
{
this.threadFactory = threadFactory;
}

public void validate()
{
Logger logger = LoggerFactory.getLogger(getClass());
Expand Down
23 changes: 12 additions & 11 deletions hikaricp/src/main/java/com/zaxxer/hikari/pool/HikariPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -48,6 +48,7 @@
import com.zaxxer.hikari.proxy.ProxyFactory;
import com.zaxxer.hikari.util.ConcurrentBag;
import com.zaxxer.hikari.util.ConcurrentBag.IBagStateListener;
import com.zaxxer.hikari.util.DefaultThreadFactory;
import com.zaxxer.hikari.util.DriverDataSource;
import com.zaxxer.hikari.util.PropertyBeanSetter;

Expand All @@ -71,7 +72,7 @@ public final class HikariPool implements HikariPoolMBean, IBagStateListener

private final AtomicReference<Throwable> lastConnectionFailure;
private final AtomicInteger totalConnections;
private final Timer houseKeepingTimer;
private final ScheduledThreadPoolExecutor houseKeepingExecutorService;

private final boolean isAutoCommit;
private final boolean isIsolateInternalQueries;
Expand Down Expand Up @@ -136,13 +137,14 @@ public HikariPool(HikariConfig configuration, String username, String password)
HikariMBeanElf.registerMBeans(configuration, this);
}

addConnectionExecutor = createThreadPoolExecutor(configuration.getMaximumPoolSize(), "HikariCP connection filler");
addConnectionExecutor = createThreadPoolExecutor(configuration.getMaximumPoolSize(), "HikariCP connection filler", configuration.getThreadFactory());

fillPool();

long delayPeriod = Long.getLong("com.zaxxer.hikari.housekeeping.periodMs", TimeUnit.SECONDS.toMillis(30L));
houseKeepingTimer = new Timer("Hikari Housekeeping Timer (pool " + configuration.getPoolName() + ")", true);
houseKeepingTimer.scheduleAtFixedRate(new HouseKeeper(), delayPeriod, delayPeriod);
houseKeepingExecutorService = new ScheduledThreadPoolExecutor(1, configuration.getThreadFactory() != null ? configuration.getThreadFactory() : new DefaultThreadFactory("Hikari Housekeeping Timer (pool " + configuration.getPoolName() + ")", true));
houseKeepingExecutorService.setRemoveOnCancelPolicy(true);
houseKeepingExecutorService.scheduleAtFixedRate(new HouseKeeper(), delayPeriod, delayPeriod, TimeUnit.MILLISECONDS);
}

/**
Expand Down Expand Up @@ -181,7 +183,7 @@ public Connection getConnection() throws SQLException
}

if (leakDetectionThreshold != 0) {
connection.captureStack(leakDetectionThreshold, houseKeepingTimer);
connection.captureStack(leakDetectionThreshold, houseKeepingExecutorService);
}

return connection;
Expand Down Expand Up @@ -234,7 +236,7 @@ public void shutdown() throws InterruptedException
LOGGER.info("HikariCP pool {} is shutting down.", configuration.getPoolName());

logPoolState("Before shutdown ");
houseKeepingTimer.cancel();
houseKeepingExecutorService.shutdownNow();
addConnectionExecutor.shutdownNow();

final long start = System.currentTimeMillis();
Expand Down Expand Up @@ -465,7 +467,7 @@ else if (configuration.getMinimumIdle() > 0) {
*/
private void abortActiveConnections() throws InterruptedException
{
ThreadPoolExecutor assassinExecutor = createThreadPoolExecutor(configuration.getMaximumPoolSize(), "HikariCP connection assassin");
ExecutorService assassinExecutor = createThreadPoolExecutor(configuration.getMaximumPoolSize(), "HikariCP connection assassin", configuration.getThreadFactory());
connectionBag.values(ConcurrentBag.STATE_IN_USE).parallelStream().forEach(connectionProxy -> {
try {
connectionProxy.abort(assassinExecutor);
Expand Down Expand Up @@ -528,15 +530,14 @@ private void logPoolState(String... prefix)
/**
* The house keeping task to retire idle and maxAge connections.
*/
private class HouseKeeper extends TimerTask
private class HouseKeeper implements Runnable
{
@Override
public void run()
{
logPoolState("Before cleanup ");

connectionTimeout = configuration.getConnectionTimeout(); // refresh member in case it changed
houseKeepingTimer.purge(); // purge cancelled timers

final long now = System.currentTimeMillis();
final long idleTimeout = configuration.getIdleTimeout();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import java.sql.Statement;
import java.util.HashSet;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
Expand Down Expand Up @@ -64,7 +64,7 @@ public abstract class ConnectionProxy implements IHikariConnectionProxy
private volatile long lastAccess;
private long uncloseTime;

private TimerTask leakTask;
private LeakTask leakTask;

private final int hashCode;

Expand Down Expand Up @@ -151,14 +151,14 @@ public String toString()

/** {@inheritDoc} */
@Override
public final void captureStack(long leakDetectionThreshold, Timer scheduler)
public final void captureStack(long leakDetectionThreshold, ScheduledExecutorService executorService)
{
StackTraceElement[] trace = Thread.currentThread().getStackTrace();
StackTraceElement[] leakTrace = new StackTraceElement[trace.length - 4];
System.arraycopy(trace, 4, leakTrace, 0, leakTrace.length);

leakTask = new LeakTask(leakTrace, leakDetectionThreshold);
scheduler.schedule(leakTask, leakDetectionThreshold);
executorService.schedule(leakTask, leakDetectionThreshold, TimeUnit.MILLISECONDS);
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Timer;
import java.util.concurrent.ScheduledExecutorService;

import com.zaxxer.hikari.util.ConcurrentBag.IBagManagable;

Expand All @@ -35,9 +35,9 @@ public interface IHikariConnectionProxy extends Connection, IBagManagable
* Catpure the stack and start leak detection.
*
* @param leakThreshold the number of milliseconds before a leak is reported
* @param houseKeepingTimer the timer to run the leak detection task with
* @param houseKeepingExecutorService the executor service to run the leak detection task with
*/
void captureStack(long leakThreshold, Timer houseKeepingTimer);
void captureStack(long leakThreshold, ScheduledExecutorService houseKeepingExecutorService);

/**
* Check if the provided SQLException contains a SQLSTATE that indicates
Expand Down
13 changes: 3 additions & 10 deletions hikaricp/src/main/java/com/zaxxer/hikari/proxy/LeakTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@

package com.zaxxer.hikari.proxy;

import java.util.TimerTask;

import org.slf4j.LoggerFactory;

/**
* @author Brett Wooldridge
*/
class LeakTask extends TimerTask
class LeakTask implements Runnable
{
private final long leakTime;
private StackTraceElement[] stackTrace;
Expand All @@ -46,13 +44,8 @@ public void run()
}
}

@Override
public boolean cancel()
public void cancel()
{
boolean cancelled = super.cancel();
if (cancelled) {
stackTrace = null;
}
return cancelled;
stackTrace = null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.zaxxer.hikari.util;

import java.util.concurrent.ThreadFactory;

public class DefaultThreadFactory implements ThreadFactory {

private String threadName;
private boolean daemon;

public DefaultThreadFactory(String threadName, boolean daemon){
this.threadName = threadName;
this.daemon = daemon;
}

@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, threadName);
thread.setDaemon(daemon);
return thread;
}
}
13 changes: 4 additions & 9 deletions hikaricp/src/main/java/com/zaxxer/hikari/util/PoolUtilities.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,11 @@ public static <T> T createInstance(String className, Class<T> clazz, Object... a
}
}

public static ThreadPoolExecutor createThreadPoolExecutor(final int queueSize, final String threadName)
public static ThreadPoolExecutor createThreadPoolExecutor(final int queueSize, final String threadName, ThreadFactory threadFactory)
{
ThreadFactory threadFactory = new ThreadFactory() {
public Thread newThread(Runnable r)
{
Thread t = new Thread(r, threadName);
t.setDaemon(true);
return t;
}
};
if (threadFactory == null) {
threadFactory = new DefaultThreadFactory(threadName, true);
}

int processors = Math.max(1, Runtime.getRuntime().availableProcessors() / 2);
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(queueSize);
Expand Down