Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add error handlers #3

Merged
merged 7 commits into from
Dec 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
root = true

[*]
indent_style = space
indent_size = 4
continuation_indent_size = 8
end_of_line = lf
charset = utf-8
trim_trailing_whitespace = true
insert_final_newline = true

[*.bat]
end_of_line = crlf

[build.gradle]
continuation_indent_size = 4

[.idea/codeStyles/*.xml]
indent_size = 2
insert_final_newline = false
2 changes: 1 addition & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>network.bisq.btcd-cli4j</groupId>
<artifactId>btcd-cli4j-parent</artifactId>
<version>0.5.8.3</version>
<version>0.5.8.4</version>
</parent>
<artifactId>btcd-cli4j-core</artifactId>
<packaging>jar</packaging>
Expand Down
22 changes: 22 additions & 0 deletions core/src/main/java/com/neemre/btcdcli4j/core/BtcdCli4jVersion.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/

package com.neemre.btcdcli4j.core;

public class BtcdCli4jVersion {
public static final String VERSION = "0.5.8.4";
}
12 changes: 11 additions & 1 deletion daemon/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>network.bisq.btcd-cli4j</groupId>
<artifactId>btcd-cli4j-parent</artifactId>
<version>0.5.8.3</version>
<version>0.5.8.4</version>
</parent>
<artifactId>btcd-cli4j-daemon</artifactId>
<packaging>jar</packaging>
Expand All @@ -25,5 +25,15 @@
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>20.0</version>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<version>3.0.2</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.neemre.btcdcli4j.daemon;

import com.google.common.util.concurrent.*;
import com.neemre.btcdcli4j.core.BitcoindException;
import com.neemre.btcdcli4j.core.CommunicationException;
import com.neemre.btcdcli4j.core.NodeProperties;
Expand All @@ -11,12 +12,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

public class BtcdDaemonImpl implements BtcdDaemon {

Expand All @@ -25,16 +27,25 @@ public class BtcdDaemonImpl implements BtcdDaemon {
private DaemonConfigurator configurator;
private Map<Notifications, NotificationMonitor> monitors;
private Map<Notifications, Future<Void>> futures;
private ExecutorService monitorPool;
private ListeningExecutorService monitorPool;

private BtcdClient client;

@Nullable
private Consumer<Throwable> errorHandler;


public BtcdDaemonImpl() {
this(new Properties());
}

public BtcdDaemonImpl(BtcdClient btcdProvider) throws BitcoindException, CommunicationException {
this(btcdProvider, null);
}

public BtcdDaemonImpl(BtcdClient btcdProvider, @Nullable Consumer<Throwable> errorHandler)
throws BitcoindException, CommunicationException {
this.errorHandler = errorHandler;
initialize();
this.client = configurator.checkBtcdProvider(btcdProvider);
buildMonitors(configurator.checkNodeConfig(client.getNodeConfig()));
Expand Down Expand Up @@ -166,24 +177,48 @@ private void initialize() {

private void buildMonitors(Properties nodeConfig) {
int alertPort = Integer.parseInt(nodeConfig.getProperty(NodeProperties.ALERT_PORT.getKey()));
NotificationMonitor alertNotificationMonitor = new NotificationMonitor(Notifications.ALERT, alertPort, null);
NotificationMonitor alertNotificationMonitor = new NotificationMonitor(Notifications.ALERT, alertPort, null,
throwable -> {
if (errorHandler != null)
errorHandler.accept(throwable);
});
monitors.put(Notifications.ALERT, alertNotificationMonitor);

int blockPort = Integer.parseInt(nodeConfig.getProperty(NodeProperties.BLOCK_PORT.getKey()));
NotificationMonitor blockNotificationMonitor = new NotificationMonitor(Notifications.BLOCK, blockPort, client);
NotificationMonitor blockNotificationMonitor = new NotificationMonitor(Notifications.BLOCK, blockPort, client,
throwable -> {
if (errorHandler != null)
errorHandler.accept(throwable);
});
monitors.put(Notifications.BLOCK, blockNotificationMonitor);

int walletPort = Integer.parseInt(nodeConfig.getProperty(NodeProperties.WALLET_PORT.getKey()));
NotificationMonitor walletNotificationMonitor = new NotificationMonitor(Notifications.WALLET, walletPort, client);
NotificationMonitor walletNotificationMonitor = new NotificationMonitor(Notifications.WALLET, walletPort, client,
throwable -> {
if (errorHandler != null)
errorHandler.accept(throwable);
});
monitors.put(Notifications.WALLET, walletNotificationMonitor);
monitorPool = Executors.newFixedThreadPool(monitors.size());

monitorPool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(monitors.size()));
}

private void startMonitors() {
for (Notifications notificationType : monitors.keySet()) {
NotificationMonitor monitor = monitors.get(notificationType);
futures.put(notificationType, monitorPool.submit(monitor, (Void) null));

ListenableFuture<Void> future = monitorPool.submit(monitor);
futures.put(notificationType, future);

Futures.addCallback(future, new FutureCallback<Void>() {
public void onSuccess(Void ignore) {
}

public void onFailure(Throwable throwable) {
if (errorHandler != null)
errorHandler.accept(throwable);
}
});
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,25 @@
@Data
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = false)
public class NotificationHandlerException extends RuntimeException {
public class NotificationHandlerException extends Exception {

private static final long serialVersionUID = 1L;

private final Errors error;
private int code;


public NotificationHandlerException(Errors error) {
this(error, Constants.STRING_EMPTY);
}

public NotificationHandlerException(Errors error, String additionalMsg) {
super(error.getDescription() + additionalMsg);
this.error = error;
code = error.getCode();
}

public NotificationHandlerException(Errors error, Exception cause) {
public NotificationHandlerException(Errors error, Throwable cause) {
super(error.getDescription(), cause);
this.error = error;
code = error.getCode();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.neemre.btcdcli4j.daemon.notification;

import com.google.common.util.concurrent.*;
import com.neemre.btcdcli4j.core.client.BtcdClient;
import com.neemre.btcdcli4j.core.common.Constants;
import com.neemre.btcdcli4j.core.common.Errors;
Expand All @@ -11,17 +12,20 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class NotificationMonitor extends Observable implements Observer, Runnable {
public class NotificationMonitor extends Observable implements Observer, Callable<Void> {

private static final Logger LOG = LoggerFactory.getLogger(NotificationMonitor.class);
private static final int WORKER_MIN_COUNT = 1;
Expand All @@ -34,45 +38,73 @@ public class NotificationMonitor extends Observable implements Observer, Runnabl
private int serverPort;
private ServerSocket serverSocket;
private volatile boolean isActive;
private ThreadPoolExecutor workerPool;

@Nullable
private BtcdClient client;
@Nullable
private Consumer<Throwable> errorHandler;

private ThreadPoolExecutor executor;
private ListeningExecutorService workerPool;

public NotificationMonitor(Notifications type, int serverPort, BtcdClient client) {
public NotificationMonitor(Notifications type, int serverPort, @Nullable BtcdClient client) {
this(type, serverPort, client, null);
}

public NotificationMonitor(Notifications type, int serverPort, @Nullable BtcdClient client,
@Nullable Consumer<Throwable> errorHandler) {
LOG.info("** NotificationMonitor(): launching new '{}' notification monitor (port: '{}', "
+ "RPC-capable: '{}')", type.name(), serverPort, ((client == null) ? "no" : "yes"));
this.errorHandler = errorHandler;
this.type = type;
this.serverPort = serverPort;
this.client = client;
}

@Override
public void run() {
public Void call() throws NotificationHandlerException {
activate();

LOG.info("-- run(..): started listening for '{}' notifications on port '{}'", type.name(),
serverSocket.getLocalPort());
while (isActive) {
try {
Socket socket = serverSocket.accept();
NotificationWorker worker = NotificationWorkerFactory.createWorker(type, socket, client);
worker.addObserver(this);
workerPool.submit(worker);

ListenableFuture<Void> future = workerPool.submit(worker);

Futures.addCallback(future, new FutureCallback<Void>() {
public void onSuccess(Void ignore) {
}

public void onFailure(Throwable throwable) {
if (errorHandler != null)
errorHandler.accept(throwable);
}
});

LOG.trace("-- run(..): total no. of '{}' notifications received: '{}', task queue "
+ "occupancy: '{}/{}'", type.name(), workerPool.getTaskCount(),
workerPool.getQueue().size(), TASK_QUEUE_LENGTH);
+ "occupancy: '{}/{}'", type.name(), executor.getTaskCount(),
executor.getQueue().size(), TASK_QUEUE_LENGTH);

} catch (SocketTimeoutException e) {
LOG.trace("-- run(..): polling '{}' notification monitor for interrupts (socket idle "
+ "for {}ms)", type.name(), IDLE_SOCKET_TIMEOUT);
} catch (IOException e) {
Thread.currentThread().interrupt();
throw new NotificationHandlerException(Errors.IO_SOCKET_UNINITIALIZED, e);
} catch (Throwable e) {
Thread.currentThread().interrupt();
throw new NotificationHandlerException(Errors.IO_SOCKET_UNINITIALIZED, e);
} finally {
if (Thread.interrupted()) {
deactivate();
}
}
}
return null;
}

@Override
Expand All @@ -88,7 +120,7 @@ public boolean isActive() {
return isActive;
}

private void activate() {
private void activate() throws NotificationHandlerException {
Thread.currentThread().setName(getUniqueName());
isActive = true;
try {
Expand All @@ -105,8 +137,12 @@ private void activate() {
throw new NotificationHandlerException(Errors.IO_SERVERSOCKET_UNINITIALIZED, e);
}
}
workerPool = new ThreadPoolExecutor(WORKER_MIN_COUNT, WORKER_MAX_COUNT, IDLE_WORKER_TIMEOUT,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(TASK_QUEUE_LENGTH));

executor = new ThreadPoolExecutor(WORKER_MIN_COUNT, WORKER_MAX_COUNT, IDLE_WORKER_TIMEOUT,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(TASK_QUEUE_LENGTH));
executor.allowCoreThreadTimeOut(true);
executor.setRejectedExecutionHandler((r, e) -> LOG.error("RejectedExecutionHandler called"));
workerPool = MoreExecutors.listeningDecorator(executor);
}

private void deactivate() {
Expand All @@ -127,4 +163,4 @@ private String getUniqueName() {
return "NotificationMonitor[" + StringUtils.capitalize(type.name().toLowerCase()) + "]-"
+ StringUtils.random(4, Constants.DECIMAL_DIGITS);
}
}
}
Loading