diff --git a/bundles/org.openhab.binding.upb/src/main/java/org/openhab/binding/upb/internal/handler/SerialIoThread.java b/bundles/org.openhab.binding.upb/src/main/java/org/openhab/binding/upb/internal/handler/SerialIoThread.java index 3244ef66196b3..813f95aa5fbd4 100644 --- a/bundles/org.openhab.binding.upb/src/main/java/org/openhab/binding/upb/internal/handler/SerialIoThread.java +++ b/bundles/org.openhab.binding.upb/src/main/java/org/openhab/binding/upb/internal/handler/SerialIoThread.java @@ -32,7 +32,6 @@ import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; import org.openhab.binding.upb.internal.handler.UPBIoHandler.CmdStatus; -import org.openhab.binding.upb.internal.message.MessageBuilder; import org.openhab.binding.upb.internal.message.MessageParseException; import org.openhab.binding.upb.internal.message.UPBMessage; import org.openhab.core.common.NamedThreadFactory; @@ -177,9 +176,13 @@ private void handleMessage(final UPBMessage msg) { listener.incomingMessage(msg); } - public CompletionStage enqueue(final MessageBuilder msg) { + public CompletionStage enqueue(final String msg) { + return enqueue(msg, 1); + } + + private CompletionStage enqueue(final String msg, int numAttempts) { final CompletableFuture completion = new CompletableFuture<>(); - final Runnable task = new WriteRunnable(msg.build(), completion); + final Runnable task = new WriteRunnable(msg, completion, numAttempts); try { writeExecutor.execute(task); } catch (final RejectedExecutionException e) { @@ -232,23 +235,18 @@ private class WriteRunnable implements Runnable { private final String msg; private final CompletableFuture completion; private final CountDownLatch ackLatch = new CountDownLatch(1); + private final int numAttempts; private @Nullable Boolean ack; - public WriteRunnable(final String msg, final CompletableFuture completion) { + public WriteRunnable(final String msg, final CompletableFuture completion, int numAttempts) { this.msg = msg; this.completion = completion; + this.numAttempts = numAttempts; } // called by reader thread on ACK or NAK public void ackReceived(final boolean ack) { - if (logger.isDebugEnabled()) { - if (ack) { - logger.debug("ACK received"); - } else { - logger.debug("NAK received"); - } - } this.ack = ack; ackLatch.countDown(); } @@ -262,25 +260,32 @@ public void run() { if (out == null) { throw new IOException("serial port is not writable"); } - for (int tries = 0; tries < MAX_RETRIES && ack == null; tries++) { - out.write(0x14); - out.write(msg.getBytes(US_ASCII)); - out.write(0x0d); - out.flush(); - final boolean acked = ackLatch.await(ACK_TIMEOUT_MS, MILLISECONDS); - if (acked) { - break; + final CmdStatus res; + out.write(0x14); + out.write(msg.getBytes(US_ASCII)); + out.write(0x0d); + out.flush(); + final boolean latched = ackLatch.await(ACK_TIMEOUT_MS, MILLISECONDS); + if (latched) { + final Boolean ack = this.ack; + if (ack == null) { + logger.debug("write not acked, attempt {}", numAttempts); + res = CmdStatus.WRITE_FAILED; + } else if (ack) { + completion.complete(CmdStatus.ACK); + return; + } else { + logger.debug("NAK received, attempt {}", numAttempts); + res = CmdStatus.NAK; } - logger.debug("ack timed out, retrying ({} of {})", tries + 1, MAX_RETRIES); + } else { + logger.debug("ack timed out, attempt {}", numAttempts); + res = CmdStatus.WRITE_FAILED; } - final Boolean ack = this.ack; - if (ack == null) { - logger.debug("write not acked"); - completion.complete(CmdStatus.WRITE_FAILED); - } else if (ack) { - completion.complete(CmdStatus.ACK); + if (numAttempts < MAX_RETRIES) { + enqueue(msg, numAttempts + 1).thenAccept(completion::complete); } else { - completion.complete(CmdStatus.NAK); + completion.complete(res); } } catch (final IOException | InterruptedException e) { logger.warn("error writing message", e); diff --git a/bundles/org.openhab.binding.upb/src/main/java/org/openhab/binding/upb/internal/handler/SerialPIMHandler.java b/bundles/org.openhab.binding.upb/src/main/java/org/openhab/binding/upb/internal/handler/SerialPIMHandler.java index 0117999b7da89..a95abfb6ecdce 100644 --- a/bundles/org.openhab.binding.upb/src/main/java/org/openhab/binding/upb/internal/handler/SerialPIMHandler.java +++ b/bundles/org.openhab.binding.upb/src/main/java/org/openhab/binding/upb/internal/handler/SerialPIMHandler.java @@ -155,7 +155,7 @@ private void openSerialPort(final String portId) { public CompletionStage sendPacket(final MessageBuilder msg) { final SerialIoThread receiveThread = this.receiveThread; if (receiveThread != null) { - return receiveThread.enqueue(msg); + return receiveThread.enqueue(msg.build()); } else { return exceptionallyCompletedFuture(new IllegalStateException("I/O thread not active")); } diff --git a/bundles/org.openhab.binding.upb/src/test/java/org/openhab/binding/upb/internal/SerialIoThreadTest.java b/bundles/org.openhab.binding.upb/src/test/java/org/openhab/binding/upb/internal/SerialIoThreadTest.java new file mode 100644 index 0000000000000..ec09fe8afdf00 --- /dev/null +++ b/bundles/org.openhab.binding.upb/src/test/java/org/openhab/binding/upb/internal/SerialIoThreadTest.java @@ -0,0 +1,192 @@ +/** + * Copyright (c) 2010-2021 Contributors to the openHAB project + * + * See the NOTICE file(s) distributed with this work for additional + * information. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.openhab.binding.upb.internal; + +import static java.nio.charset.StandardCharsets.US_ASCII; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.LinkedBlockingQueue; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.openhab.binding.upb.internal.handler.MessageListener; +import org.openhab.binding.upb.internal.handler.SerialIoThread; +import org.openhab.binding.upb.internal.handler.UPBIoHandler.CmdStatus; +import org.openhab.binding.upb.internal.message.Command; +import org.openhab.binding.upb.internal.message.MessageBuilder; +import org.openhab.binding.upb.internal.message.UPBMessage; +import org.openhab.core.io.transport.serial.SerialPort; +import org.openhab.core.thing.ThingUID; + +/** + * @author Marcus Better - Initial contribution + */ +public class SerialIoThreadTest { + + private static final String ENABLE_MESSAGE_MODE_CMD = "\u001770028E\n"; + + private final ThingUID thingUID = new ThingUID("a", "b", "c"); + private final Listener msgListener = new Listener(); + private final PipedOutputStream in = new PipedOutputStream(); + private final OutputStreamWriter inbound = new OutputStreamWriter(in, US_ASCII); + private final PipedOutputStream out = new PipedOutputStream(); + + private @Mock SerialPort serialPort; + private SerialIoThread thread; + private InputStreamReader outbound; + final char[] buf = new char[256]; + + @BeforeEach + public void setup() throws IOException { + serialPort = mock(SerialPort.class); + outbound = new InputStreamReader(new PipedInputStream(out), US_ASCII); + when(serialPort.getInputStream()).thenReturn(new PipedInputStream(in)); + when(serialPort.getOutputStream()).thenReturn(out); + thread = new SerialIoThread(serialPort, msgListener, thingUID); + thread.start(); + } + + @AfterEach + public void cleanup() { + thread.terminate(); + } + + @Test + public void testName() { + assertEquals("OH-binding-a:b:c-serial-reader", thread.getName()); + assertTrue(thread.isDaemon()); + } + + @Test + public void receive() throws Exception { + writeInbound("PU8905FA011220FFFF47\r"); + final UPBMessage msg = msgListener.readInbound(); + assertEquals(Command.ACTIVATE, msg.getCommand()); + assertEquals(1, msg.getDestination()); + writeInbound("PU8905FA011221FFFF48\r"); + final UPBMessage msg2 = msgListener.readInbound(); + assertEquals(Command.DEACTIVATE, msg2.getCommand()); + verifyMessageModeCmd(); + } + + @Test + public void send() throws Exception { + final String msg = MessageBuilder.forCommand(Command.GOTO).args((byte) 10).network((byte) 2) + .destination((byte) 5).build(); + final CompletionStage fut = thread.enqueue(msg); + verifyMessageModeCmd(); + final int n = outbound.read(buf); + assertEquals("\u001408100205FF220AB6\r", new String(buf, 0, n)); + ack(); + final CmdStatus res = fut.toCompletableFuture().join(); + assertEquals(CmdStatus.ACK, res); + } + + @Test + public void resend() throws Exception { + final String msg = MessageBuilder.forCommand(Command.GOTO).args((byte) 10).network((byte) 2) + .destination((byte) 5).build(); + final CompletableFuture fut = thread.enqueue(msg).toCompletableFuture(); + verifyMessageModeCmd(); + int n = outbound.read(buf); + assertEquals("\u001408100205FF220AB6\r", new String(buf, 0, n)); + nak(); + + // should re-send + n = outbound.read(buf); + assertEquals("\u001408100205FF220AB6\r", new String(buf, 0, n)); + assertFalse(fut.isDone()); + ack(); + final CmdStatus res = fut.join(); + assertEquals(CmdStatus.ACK, res); + } + + @Test + public void resendMaxAttempts() throws Exception { + final String msg = MessageBuilder.forCommand(Command.GOTO).args((byte) 10).network((byte) 2) + .destination((byte) 5).build(); + final CompletableFuture fut = thread.enqueue(msg).toCompletableFuture(); + verifyMessageModeCmd(); + int n = outbound.read(buf); + assertEquals("\u001408100205FF220AB6\r", new String(buf, 0, n)); + nak(); + + // retry + n = outbound.read(buf); + assertEquals("\u001408100205FF220AB6\r", new String(buf, 0, n)); + assertFalse(fut.isDone()); + // no response - wait for ack timeout + + // last retry + n = outbound.read(buf); + assertEquals("\u001408100205FF220AB6\r", new String(buf, 0, n)); + assertFalse(fut.isDone()); + nak(); + final CmdStatus res = fut.join(); + assertEquals(CmdStatus.NAK, res); + } + + private void ack() throws IOException { + writeInbound("PK\r"); + } + + private void nak() throws IOException { + writeInbound("PN\r"); + } + + private void writeInbound(String s) throws IOException { + inbound.write(s); + inbound.flush(); + } + + private void verifyMessageModeCmd() throws IOException { + final int n = outbound.read(buf, 0, ENABLE_MESSAGE_MODE_CMD.length()); + assertEquals(ENABLE_MESSAGE_MODE_CMD, new String(buf, 0, n)); + } + + private static class Listener implements MessageListener { + + private final BlockingQueue messages = new LinkedBlockingQueue<>(); + + @Override + public void incomingMessage(final UPBMessage msg) { + messages.offer(msg); + } + + @Override + public void onError(final Throwable t) { + } + + public UPBMessage readInbound() { + try { + return messages.take(); + } catch (InterruptedException e) { + return null; + } + } + } +}