Skip to content

Commit

Permalink
#204, #205: fixed bug where executor could be used after shutting dow…
Browse files Browse the repository at this point in the history
…n. Threads are now named. Removed redundant warning suppressions
  • Loading branch information
bbottema committed Apr 25, 2019
1 parent 91b427f commit a1f621f
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

import static java.lang.String.format;
import static org.simplejavamail.converter.EmailConverter.mimeMessageToEML;
Expand Down Expand Up @@ -72,7 +70,7 @@ public class MailSender {
* Only set when {@link ProxyConfig} is provided with authentication details.
*/
@Nullable
private AnonymousSocks5Server proxyServer = null;
private AnonymousSocks5Server proxyServer;

/**
* Allows us to manage how many thread we run at the same time using a thread pool.
Expand Down Expand Up @@ -194,21 +192,9 @@ the proxy bridge server (or connection pool in async mode) while a non-async ema
smtpRequestsPhaser.register();
if (async) {
// start up thread pool if necessary
if (executor == null) {
executor = Executors.newFixedThreadPool(operationalConfig.getThreadPoolSize(), new ThreadFactory() {
final AtomicInteger threadCounter = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "Simple Java Mail async mail sender #" + threadCounter.getAndIncrement());
if (!thread.isDaemon()) {
thread.setDaemon(true);
}
if (thread.getPriority() != Thread.NORM_PRIORITY) {
thread.setPriority(Thread.NORM_PRIORITY);
}
return thread;
}
});
if (executor == null || executor.isShutdown()) {
executor = Executors.newFixedThreadPool(operationalConfig.getThreadPoolSize(),
new NamedThreadFactory("Simple Java Mail async mail sender"));
}
configureSessionWithTimeout(session, operationalConfig.getSessionTimeout());
executor.execute(new Runnable() {
Expand Down Expand Up @@ -269,7 +255,6 @@ private void sendMailClosure(@Nonnull final Session session, @Nonnull final Emai

try {
synchronized (this) {
//noinspection ConstantConditions
if (needsAuthenticatedProxy() && !proxyServer.isRunning()) {
LOGGER.trace("starting proxy bridge");
proxyServer.start();
Expand Down Expand Up @@ -319,19 +304,23 @@ private void configureBounceToAddress(final Session session, final Email email)
}

/**
* We need to keep a count of running threads in case a proxyserver is running
* We need to keep a count of running threads in case a proxyserver is running or a connection pool needs to be shut down.
*/
private synchronized void checkShutDownRunningProcesses() {
smtpRequestsPhaser.arriveAndDeregister();
LOGGER.trace("SMTP request threads left: {}", smtpRequestsPhaser.getUnarrivedParties());
// if this thread is the last one finishing
if (smtpRequestsPhaser.getUnarrivedParties() == 0) {
LOGGER.trace("all threads have finished processing");
//noinspection ConstantConditions
if (needsAuthenticatedProxy() && proxyServer.isRunning() && !proxyServer.isStopping()) {
LOGGER.trace("stopping proxy bridge...");
proxyServer.stop();
}
// shutdown the threadpool, or else the Mailer will keep any JVM alive forever
// executor is only available in async mode
if (executor != null) {
executor.shutdown();
}
}
}

Expand Down Expand Up @@ -389,7 +378,6 @@ public synchronized void testConnection() {
logSession(session, "connection test");

try (Transport transport = session.getTransport()) {
//noinspection ConstantConditions
if (needsAuthenticatedProxy() && !proxyServer.isRunning()) {
LOGGER.trace("starting proxy bridge for testing connection");
proxyServer.start();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.simplejavamail.mailer.internal.mailsender;

import javax.annotation.Nonnull;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

import static java.lang.String.format;
import static java.lang.Thread.currentThread;

class NamedThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final String threadName;

NamedThreadFactory(@Nonnull final String threadName) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : currentThread().getThreadGroup();
this.threadName = threadName;
}

@Nonnull
public Thread newThread(@Nonnull Runnable r) {
Thread t = new Thread(group, r, format("%s %d", threadName, threadNumber.getAndIncrement()));
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}

0 comments on commit a1f621f

Please sign in to comment.