diff --git a/bundles/org.openhab.binding.velux/README.md b/bundles/org.openhab.binding.velux/README.md index ec0ced2768172..f1bd775a49766 100644 --- a/bundles/org.openhab.binding.velux/README.md +++ b/bundles/org.openhab.binding.velux/README.md @@ -52,7 +52,7 @@ In addition there are some optional Configuration Parameters. |-------------------------|------------------|:--------:|--------------------------------------------------------------| | ipAddress | | Yes | Hostname or address for accessing the Velux Bridge. | | password | velux123 | Yes | Password for authentication against the Velux Bridge.(\*\*) | -| timeoutMsecs | 2000 | No | Communication timeout in milliseconds. | +| timeoutMsecs | 3000 | No | Communication timeout in milliseconds. | | protocol | slip | No | Underlying communication protocol (http/https/slip). | | tcpPort | 51200 | No | TCP port (80 or 51200) for accessing the Velux Bridge. | | retries | 5 | No | Number of retries during I/O. | diff --git a/bundles/org.openhab.binding.velux/src/main/java/org/openhab/binding/velux/internal/bridge/slip/io/DataInputStreamWithTimeout.java b/bundles/org.openhab.binding.velux/src/main/java/org/openhab/binding/velux/internal/bridge/slip/io/DataInputStreamWithTimeout.java index cbb8c3cbc980e..230c917062938 100644 --- a/bundles/org.openhab.binding.velux/src/main/java/org/openhab/binding/velux/internal/bridge/slip/io/DataInputStreamWithTimeout.java +++ b/bundles/org.openhab.binding.velux/src/main/java/org/openhab/binding/velux/internal/bridge/slip/io/DataInputStreamWithTimeout.java @@ -21,8 +21,10 @@ import java.util.Queue; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; @@ -49,6 +51,7 @@ class DataInputStreamWithTimeout implements Closeable { // special character that marks the first and last byte of a slip message private static final byte SLIP_MARK = (byte) 0xc0; + private static final byte SLIP_PROT = 0; private final Logger logger = LoggerFactory.getLogger(DataInputStreamWithTimeout.class); @@ -63,9 +66,19 @@ class DataInputStreamWithTimeout implements Closeable { private class Poller implements Callable { private boolean interrupted = false; + private Future pollerFinished; + + public Poller(ExecutorService executor) { + logger.trace("Poller: created"); + pollerFinished = executor.submit(this); + } public void interrupt() { interrupted = true; + try { + pollerFinished.get(); + } catch (InterruptedException | ExecutionException e) { + } } /** @@ -75,34 +88,47 @@ public void interrupt() { */ @Override public Boolean call() throws Exception { + logger.trace("Poller.call(): started"); byte[] buf = new byte[BUFFER_SIZE]; - byte byt; + int byt; int i = 0; // clean start, no exception, empty queue pollException = null; slipMessageQueue.clear(); - // loop forever or until internally or externally interrupted - while ((!interrupted) && (!Thread.interrupted())) { + // loop forever or until externally interrupted + while (!Thread.interrupted()) { try { - buf[i] = byt = (byte) inputStream.read(); - if (byt == SLIP_MARK) { - if (i > 0) { - // the minimal slip message is 7 bytes [MM PP LL CC CC KK MM] - if ((i > 5) && (buf[0] == SLIP_MARK)) { - slipMessageQueue.offer(Arrays.copyOfRange(buf, 0, i + 1)); - if (slipMessageQueue.size() > QUEUE_SIZE) { - logger.warn("pollRunner() => slip message queue overflow => PLEASE REPORT !!"); - slipMessageQueue.poll(); - } + if (interrupted) { + // fully flush the input buffer + inputStream.readAllBytes(); + break; + } + byt = inputStream.read(); + if (byt < 0) { + // end of stream is OK => keep on polling + continue; + } + buf[i] = (byte) byt; + if ((i > 0) && (buf[i] == SLIP_MARK)) { + // the minimal slip message is 7 bytes [MM PP LL CC CC KK MM] + if ((i > 5) && (buf[0] == SLIP_MARK) && (buf[1] == SLIP_PROT)) { + slipMessageQueue.offer(Arrays.copyOfRange(buf, 0, i + 1)); + if (slipMessageQueue.size() > QUEUE_SIZE) { + logger.warn("Poller.call(): slip message queue overflow => PLEASE REPORT !!"); + slipMessageQueue.poll(); } i = 0; + } else { + logger.warn("Poller.call(): non slip messsage discarded => PLEASE REPORT !!"); buf[0] = SLIP_MARK; - continue; + i = 1; } + continue; } if (++i >= BUFFER_SIZE) { + logger.warn("Poller.call(): input buffer overrun => PLEASE REPORT !!"); i = 0; } } catch (SocketTimeoutException e) { @@ -112,11 +138,12 @@ public Boolean call() throws Exception { // any other exception => stop polling String msg = e.getMessage(); pollException = msg != null ? msg : "Generic IOException"; - logger.debug("pollRunner() stopping '{}'", pollException); + logger.debug("Poller.call(): stopping '{}'", pollException); break; } } + logger.trace("Poller.call(): ended"); // we only get here if shutdown or an error occurs so free ourself so we can be recreated again pollRunner = null; return true; @@ -210,11 +237,9 @@ public void flush() { * Start the polling task */ private void startPolling() { - Poller pollRunner = this.pollRunner; if (pollRunner == null) { logger.trace("startPolling()"); - pollRunner = this.pollRunner = new Poller(); - executor.submit(pollRunner); + pollRunner = new Poller(executor); } } @@ -226,7 +251,6 @@ private void stopPolling() { if (pollRunner != null) { logger.trace("stopPolling()"); pollRunner.interrupt(); - this.pollRunner = null; } executor.shutdown(); } diff --git a/bundles/org.openhab.binding.velux/src/main/resources/OH-INF/config/config.xml b/bundles/org.openhab.binding.velux/src/main/resources/OH-INF/config/config.xml index a9c2684bd327a..0f8567a9869ac 100644 --- a/bundles/org.openhab.binding.velux/src/main/resources/OH-INF/config/config.xml +++ b/bundles/org.openhab.binding.velux/src/main/resources/OH-INF/config/config.xml @@ -39,10 +39,11 @@ velux123 - + @text/config.velux.bridge.timeoutMsecs.description - 2000 + false + 3000 true