Skip to content

Commit

Permalink
[issue #86] Improve logging for http messaging and remoting
Browse files Browse the repository at this point in the history
[issue #64] Inexpressive thread names
[issue #85] Reduce number of threads in ui layer
fixes issue #86
fixes issue #64
minor changes for issue #85
  • Loading branch information
Michael Grossmann committed Apr 14, 2018
1 parent c979d0c commit 8e0cd80
Show file tree
Hide file tree
Showing 30 changed files with 246 additions and 51 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
eclipse.preferences.version=1
encoding//src/main/java=UTF-8
encoding//src/main/resources=UTF-8
encoding/<project>=UTF-8
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import java.util.Collection;

import org.jowidgets.service.api.IServicesDecoratorProvider;
import org.jowidgets.util.concurrent.IThreadInterruptObservable;
import org.jowidgets.util.concurrent.ThreadInterruptObserver;

public interface ICancelServicesDecoratorProviderBuilder {

Expand Down Expand Up @@ -105,6 +107,21 @@ public interface ICancelServicesDecoratorProviderBuilder {
*/
ICancelServicesDecoratorProviderBuilder setOrder(int order);

/**
* Sets a thread interrupt observable that will be used to observe the executing thread for interrupts.
*
* If not set, the {@link ThreadInterruptObserver} will be used with a default delay of 1000 milliseconds and the
* observer will be started when executing {@link #build()} method.
*
* If set manually with this method, the invoker is responsible to start the observer.
*
* @param threadInterruptObservable The observable to set, must not be null
*
* @return This builder
*/
ICancelServicesDecoratorProviderBuilder setThreadInterruptObservable(
final IThreadInterruptObservable threadInterruptObservable);

/**
* @return A new {@link IServicesDecoratorProvider}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,28 +42,30 @@
import org.jowidgets.cap.service.hibernate.api.ICancelServicesDecoratorProviderBuilder;
import org.jowidgets.service.api.IServicesDecoratorProvider;
import org.jowidgets.util.Assert;
import org.jowidgets.util.IProvider;
import org.jowidgets.util.concurrent.IThreadInterruptObservable;

final class CancelServicesDecoratorProviderBuilder implements ICancelServicesDecoratorProviderBuilder {

private final String persistenceUnitName;
private final IThreadInterruptObservable threadInterruptObservable;
private final IProvider<IThreadInterruptObservable> threadInterruptObservableProvider;

private final Set<Class<?>> services;

private IThreadInterruptObservable threadInterruptObservable;
private Long killAfterMillis = Long.valueOf(60000);
private Long minQueryRuntimeMillis = Long.valueOf(25);

private int order;

CancelServicesDecoratorProviderBuilder(
final String persistenceUnitName,
final IThreadInterruptObservable threadInterruptObservable) {

final IProvider<IThreadInterruptObservable> threadInterruptObservableProvider) {
Assert.paramNotEmpty(persistenceUnitName, "persistenceUnitName");
Assert.paramNotNull(threadInterruptObservable, "threadInterruptObservable");
Assert.paramNotNull(threadInterruptObservableProvider, "");

this.persistenceUnitName = persistenceUnitName;
this.threadInterruptObservable = threadInterruptObservable;
this.threadInterruptObservableProvider = threadInterruptObservableProvider;
this.order = ICancelServicesDecoratorProviderBuilder.DEFAULT_ORDER;
this.services = new HashSet<Class<?>>();

Expand Down Expand Up @@ -109,11 +111,26 @@ public ICancelServicesDecoratorProviderBuilder setOrder(final int order) {
return this;
}

@Override
public ICancelServicesDecoratorProviderBuilder setThreadInterruptObservable(
final IThreadInterruptObservable threadInterruptObservable) {
Assert.paramNotNull(threadInterruptObservableProvider, "threadInterruptObservable");
this.threadInterruptObservable = threadInterruptObservable;
return this;
}

private IThreadInterruptObservable getOrCreateThreadInterruptObservable() {
if (threadInterruptObservable == null) {
threadInterruptObservable = threadInterruptObservableProvider.get();
}
return threadInterruptObservable;
}

@Override
public IServicesDecoratorProvider build() {
return new CancelServicesDecoratorProviderImpl(
persistenceUnitName,
threadInterruptObservable,
getOrCreateThreadInterruptObservable(),
services,
minQueryRuntimeMillis,
killAfterMillis,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
import java.sql.Connection;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
Expand Down Expand Up @@ -68,7 +68,7 @@ final class CancelableInvoker {
private final CountDownLatch queryFinishedLatch;
private final CountDownLatch queryStartedLatch;
private final AtomicLong queryStartedTimestamp;
private final AtomicBoolean terminateInvokedByCancel;
private final AtomicReference<Long> terminateInvokedByCancelTimestamp;

private final String clientIdentifier;
private final IKillSessionSupport killSessionSupport;
Expand Down Expand Up @@ -131,7 +131,7 @@ final class CancelableInvoker {
this.queryFinishedLatch = new CountDownLatch(1);
this.queryStartedLatch = new CountDownLatch(1);
this.queryStartedTimestamp = new AtomicLong(0);
this.terminateInvokedByCancel = new AtomicBoolean(false);
this.terminateInvokedByCancelTimestamp = new AtomicReference<Long>();

this.clientIdentifier = UUID.randomUUID().toString();
this.killSessionSupport = killSessionSupport;
Expand Down Expand Up @@ -226,16 +226,20 @@ private class ThreadInterruptListener implements IThreadInterruptListener {
@Override
public void interrupted(final Thread thread) {
//avoid that terminate will be invoked twice on one interrupt listener interval
if (!terminateInvokedByCancel.get()) {
final Long terminateInvokedByCancel = terminateInvokedByCancelTimestamp.get();
if (terminateInvokedByCancel == null
|| getDelay(terminateInvokedByCancel.longValue()) > threadInterruptObservable.getDelayMillis()) {
queryTerminator.terminateQuery();
terminateInvokedByCancelTimestamp.set(null);
}
else {
//do invoke terminate next time in case termination fails
terminateInvokedByCancel.set(false);
}

}
}

private long getDelay(final long timestampMillis) {
return systemTimeProvider.currentTimeMillis() - timestampMillis;
}

private class CancelListener implements IExecutionCallbackListener {

private final Thread executingThread;
Expand All @@ -247,7 +251,7 @@ private class CancelListener implements IExecutionCallbackListener {
@Override
public void canceled() {
//avoid that terminate will be invoked twice on one interrupt listener interval
terminateInvokedByCancel.set(true);
terminateInvokedByCancelTimestamp.set(Long.valueOf(systemTimeProvider.currentTimeMillis()));

//interrupt the executing thread, necessary e.g. if connection pool waits for connection
executingThread.interrupt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,26 +28,37 @@

package org.jowidgets.cap.service.hibernate.impl;

import java.util.concurrent.ThreadFactory;

import org.jowidgets.cap.service.hibernate.api.ICancelServicesDecoratorProviderBuilder;
import org.jowidgets.cap.service.hibernate.api.IHibernateServiceToolkit;
import org.jowidgets.logging.api.ILogger;
import org.jowidgets.logging.api.LoggerProvider;
import org.jowidgets.util.DefaultProvider;
import org.jowidgets.util.IDecorator;
import org.jowidgets.util.IExceptionHandler;
import org.jowidgets.util.IFactory;
import org.jowidgets.util.IProvider;
import org.jowidgets.util.concurrent.DaemonThreadFactory;
import org.jowidgets.util.concurrent.IThreadInterruptObservable;
import org.jowidgets.util.concurrent.ThreadInterruptObserver;

public final class HibernateServiceToolkitImpl implements IHibernateServiceToolkit {

private static final ILogger LOGGER = LoggerProvider.get(HibernateServiceToolkitImpl.class);
private static final IExceptionHandler THREAD_INTERRUPT_OBSERVER_EXCEPTION_HANDLER = new LoggingExceptionHandler(
LoggerProvider.get(CancelServicesDecoratorProviderImpl.class, LoggingExceptionHandler.class));

private static final ThreadFactory THREAD_INTERRUPT_OBSERVER_THREAD_FACTORY = DaemonThreadFactory.create(
HibernateServiceToolkitImpl.class.getName() + ".QueryThreadInterruptObserver");

private static final long THREAD_INTERRUPT_OBSERVER_DEFAULT_PERIOD = 1000;

private final IDecorator<Throwable> exceptionDecorator;
private final ThreadInterruptObserver threadInterruptObserver;
private final IProvider<IThreadInterruptObservable> threadInterruptObservableProvider;

public HibernateServiceToolkitImpl() {
this.exceptionDecorator = new HibernateExceptionDecoratorImpl();
this.threadInterruptObserver = new ThreadInterruptObserver(
HibernateServiceToolkitImpl.class.getName() + "-CancelQueryThreadInterruptWatchdog-",
new LoggingExceptionHandler());
this.threadInterruptObservableProvider = new DefaultProvider<IThreadInterruptObservable>(
new DefaultThreadInterruptObserverFactory());
}

@Override
Expand All @@ -57,16 +68,18 @@ public IDecorator<Throwable> exceptionDecorator() {

@Override
public ICancelServicesDecoratorProviderBuilder cancelServiceDecoratorProviderBuilder(final String persistenceUnitName) {
if (!threadInterruptObserver.isRunning()) {
threadInterruptObserver.start();
}
return new CancelServicesDecoratorProviderBuilder(persistenceUnitName, threadInterruptObserver);
return new CancelServicesDecoratorProviderBuilder(persistenceUnitName, threadInterruptObservableProvider);
}

private class LoggingExceptionHandler implements IExceptionHandler {
private class DefaultThreadInterruptObserverFactory implements IFactory<IThreadInterruptObservable> {
@Override
public void handleException(final Throwable exception) {
LOGGER.error(exception);
public IThreadInterruptObservable create() {
final ThreadInterruptObserver result = new ThreadInterruptObserver(
THREAD_INTERRUPT_OBSERVER_THREAD_FACTORY,
THREAD_INTERRUPT_OBSERVER_EXCEPTION_HANDLER,
THREAD_INTERRUPT_OBSERVER_DEFAULT_PERIOD);
result.start();
return result;
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2018, grossmann
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of the jo-widgets.org nor the
* names of its contributors may be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL jo-widgets.org BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
* DAMAGE.
*/

package org.jowidgets.cap.service.hibernate.impl;

import org.jowidgets.logging.api.ILogger;
import org.jowidgets.util.IExceptionHandler;

final class LoggingExceptionHandler implements IExceptionHandler {

private final ILogger logger;

LoggingExceptionHandler(final ILogger logger) {
this.logger = logger;
}

@Override
public void handleException(final Throwable exception) {
logger.error(exception);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ private boolean cancelQueryOfSession() {
}

private boolean killSession() {
LOGGER.info("Try to kill session with client identifier: '" + clientIdentifier + "'.");
LOGGER.warn("Try to kill session with client identifier: '" + clientIdentifier + "'.");

EntityManager entityManager = null;
Connection connection = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.util.EntityUtils;
import org.jowidgets.classloading.tools.SharedClassLoadingObjectInputStream;
import org.jowidgets.logging.api.ILogger;
import org.jowidgets.logging.api.LoggerProvider;
import org.jowidgets.message.api.IExceptionCallback;
import org.jowidgets.message.api.IMessageChannel;
import org.jowidgets.message.api.IMessageReceiver;
Expand All @@ -60,6 +62,8 @@

final class MessageBroker implements IMessageBroker, IMessageChannel {

private static final ILogger LOGGER = LoggerProvider.get(MessageBroker.class);

private final Object brokerId;
private final String url;
private final IHttpRequestInitializer httpRequestInitializer;
Expand Down Expand Up @@ -104,13 +108,13 @@ final class MessageBroker implements IMessageBroker, IMessageChannel {
}

private Thread createSenderThread() {
final Thread result = new Thread(new MessageSenderLoop(), MessageBroker.class.getName() + "-messageSender");
final Thread result = new Thread(new MessageSenderLoop(), MessageBroker.class.getName() + ".MessageSender");
result.setDaemon(true);
return result;
}

private Thread createReceiverThread() {
final Thread result = new Thread(new MessageReceiverLoop(), MessageBroker.class.getName() + "-messageReceiver");
final Thread result = new Thread(new MessageReceiverLoop(), MessageBroker.class.getName() + ".MessageReceiver");
result.setDaemon(true);
return result;
}
Expand Down Expand Up @@ -201,14 +205,21 @@ IExceptionCallback getExceptionCallback() {
return exceptionCallback;
}

@Override
public String toString() {
return "DeferredMessage [message=" + message + "]";
}

}

private class MessageSenderLoop implements Runnable {

@Override
public void run() {
try {
LOGGER.debug("Wait sending messages until session is initialized...");
sessionInitialized.await();
LOGGER.debug("...session is initialized");
while (!Thread.interrupted()) {
trySendMessage();
}
Expand Down Expand Up @@ -237,7 +248,9 @@ private void sendMessage(final DeferredMessage message) throws IOException, Inte
HttpResponse response = null;
try {
request = createHttpRequest(message);
LOGGER.debug("Before send message to server: " + message);
response = httpClient.execute(request);
LOGGER.debug("After send message to server: " + message);
checkStatusLine(response);
}
catch (final ConnectException e) {
Expand Down Expand Up @@ -361,6 +374,7 @@ private void executeMessagesFromStream(final InputStream inputStream) throws IOE
final ObjectInputStream objectInputStream = new SharedClassLoadingObjectInputStream(inputStream);
try {
final int objectCount = objectInputStream.readInt();
LOGGER.debug("Received " + objectCount + " messages from server");
for (int i = 0; i < objectCount; i++) {
try {
executeMessage(objectInputStream.readObject());
Expand All @@ -376,12 +390,15 @@ private void executeMessagesFromStream(final InputStream inputStream) throws IOE
}

private void executeMessage(final Object message) {
LOGGER.debug("Message received from server: " + message);
incommingMessageExecutor.execute(new Runnable() {
@Override
public void run() {
final IMessageReceiver currentReceiver = receiver;
if (currentReceiver != null) {
LOGGER.debug("Before handle incoming message: " + message);
currentReceiver.onMessage(message, MessageBroker.this);
LOGGER.debug("After handle incoming message: " + message);
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ private ExecutorService getOrCreateExecutorService() {
}
else {
return Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors() * 2,
new DaemonThreadFactory(MessageBroker.class.getName() + "-incommingMessageExecutor-"));
8,
DaemonThreadFactory.multi(MessageBroker.class.getName() + ".IncommingMessageExecutor"));
}
}

Expand Down
Loading

0 comments on commit 8e0cd80

Please sign in to comment.