Skip to content

Commit

Permalink
[upb] Fix retry logic (openhab#11342)
Browse files Browse the repository at this point in the history
* [upb] Fix retry logic

The retry logic was broken so it never retried. This fixes it and adds
unit tests for the serial communication and retry behavior.

Signed-off-by: Marcus Better <[email protected]>

* Remove excessive log

Signed-off-by: Marcus Better <[email protected]>
  • Loading branch information
marcusb authored and frederictobiasc committed Oct 26, 2021
1 parent 6b1c705 commit 4ef4edf
Show file tree
Hide file tree
Showing 3 changed files with 225 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.upb.internal.handler.UPBIoHandler.CmdStatus;
import org.openhab.binding.upb.internal.message.MessageBuilder;
import org.openhab.binding.upb.internal.message.MessageParseException;
import org.openhab.binding.upb.internal.message.UPBMessage;
import org.openhab.core.common.NamedThreadFactory;
Expand Down Expand Up @@ -177,9 +176,13 @@ private void handleMessage(final UPBMessage msg) {
listener.incomingMessage(msg);
}

public CompletionStage<CmdStatus> enqueue(final MessageBuilder msg) {
public CompletionStage<CmdStatus> enqueue(final String msg) {
return enqueue(msg, 1);
}

private CompletionStage<CmdStatus> enqueue(final String msg, int numAttempts) {
final CompletableFuture<CmdStatus> completion = new CompletableFuture<>();
final Runnable task = new WriteRunnable(msg.build(), completion);
final Runnable task = new WriteRunnable(msg, completion, numAttempts);
try {
writeExecutor.execute(task);
} catch (final RejectedExecutionException e) {
Expand Down Expand Up @@ -232,23 +235,18 @@ private class WriteRunnable implements Runnable {
private final String msg;
private final CompletableFuture<CmdStatus> completion;
private final CountDownLatch ackLatch = new CountDownLatch(1);
private final int numAttempts;

private @Nullable Boolean ack;

public WriteRunnable(final String msg, final CompletableFuture<CmdStatus> completion) {
public WriteRunnable(final String msg, final CompletableFuture<CmdStatus> completion, int numAttempts) {
this.msg = msg;
this.completion = completion;
this.numAttempts = numAttempts;
}

// called by reader thread on ACK or NAK
public void ackReceived(final boolean ack) {
if (logger.isDebugEnabled()) {
if (ack) {
logger.debug("ACK received");
} else {
logger.debug("NAK received");
}
}
this.ack = ack;
ackLatch.countDown();
}
Expand All @@ -262,25 +260,32 @@ public void run() {
if (out == null) {
throw new IOException("serial port is not writable");
}
for (int tries = 0; tries < MAX_RETRIES && ack == null; tries++) {
out.write(0x14);
out.write(msg.getBytes(US_ASCII));
out.write(0x0d);
out.flush();
final boolean acked = ackLatch.await(ACK_TIMEOUT_MS, MILLISECONDS);
if (acked) {
break;
final CmdStatus res;
out.write(0x14);
out.write(msg.getBytes(US_ASCII));
out.write(0x0d);
out.flush();
final boolean latched = ackLatch.await(ACK_TIMEOUT_MS, MILLISECONDS);
if (latched) {
final Boolean ack = this.ack;
if (ack == null) {
logger.debug("write not acked, attempt {}", numAttempts);
res = CmdStatus.WRITE_FAILED;
} else if (ack) {
completion.complete(CmdStatus.ACK);
return;
} else {
logger.debug("NAK received, attempt {}", numAttempts);
res = CmdStatus.NAK;
}
logger.debug("ack timed out, retrying ({} of {})", tries + 1, MAX_RETRIES);
} else {
logger.debug("ack timed out, attempt {}", numAttempts);
res = CmdStatus.WRITE_FAILED;
}
final Boolean ack = this.ack;
if (ack == null) {
logger.debug("write not acked");
completion.complete(CmdStatus.WRITE_FAILED);
} else if (ack) {
completion.complete(CmdStatus.ACK);
if (numAttempts < MAX_RETRIES) {
enqueue(msg, numAttempts + 1).thenAccept(completion::complete);
} else {
completion.complete(CmdStatus.NAK);
completion.complete(res);
}
} catch (final IOException | InterruptedException e) {
logger.warn("error writing message", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ private void openSerialPort(final String portId) {
public CompletionStage<CmdStatus> sendPacket(final MessageBuilder msg) {
final SerialIoThread receiveThread = this.receiveThread;
if (receiveThread != null) {
return receiveThread.enqueue(msg);
return receiveThread.enqueue(msg.build());
} else {
return exceptionallyCompletedFuture(new IllegalStateException("I/O thread not active"));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/**
* Copyright (c) 2010-2021 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.upb.internal;

import static java.nio.charset.StandardCharsets.US_ASCII;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.LinkedBlockingQueue;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.openhab.binding.upb.internal.handler.MessageListener;
import org.openhab.binding.upb.internal.handler.SerialIoThread;
import org.openhab.binding.upb.internal.handler.UPBIoHandler.CmdStatus;
import org.openhab.binding.upb.internal.message.Command;
import org.openhab.binding.upb.internal.message.MessageBuilder;
import org.openhab.binding.upb.internal.message.UPBMessage;
import org.openhab.core.io.transport.serial.SerialPort;
import org.openhab.core.thing.ThingUID;

/**
* @author Marcus Better - Initial contribution
*/
public class SerialIoThreadTest {

private static final String ENABLE_MESSAGE_MODE_CMD = "\u001770028E\n";

private final ThingUID thingUID = new ThingUID("a", "b", "c");
private final Listener msgListener = new Listener();
private final PipedOutputStream in = new PipedOutputStream();
private final OutputStreamWriter inbound = new OutputStreamWriter(in, US_ASCII);
private final PipedOutputStream out = new PipedOutputStream();

private @Mock SerialPort serialPort;
private SerialIoThread thread;
private InputStreamReader outbound;
final char[] buf = new char[256];

@BeforeEach
public void setup() throws IOException {
serialPort = mock(SerialPort.class);
outbound = new InputStreamReader(new PipedInputStream(out), US_ASCII);
when(serialPort.getInputStream()).thenReturn(new PipedInputStream(in));
when(serialPort.getOutputStream()).thenReturn(out);
thread = new SerialIoThread(serialPort, msgListener, thingUID);
thread.start();
}

@AfterEach
public void cleanup() {
thread.terminate();
}

@Test
public void testName() {
assertEquals("OH-binding-a:b:c-serial-reader", thread.getName());
assertTrue(thread.isDaemon());
}

@Test
public void receive() throws Exception {
writeInbound("PU8905FA011220FFFF47\r");
final UPBMessage msg = msgListener.readInbound();
assertEquals(Command.ACTIVATE, msg.getCommand());
assertEquals(1, msg.getDestination());
writeInbound("PU8905FA011221FFFF48\r");
final UPBMessage msg2 = msgListener.readInbound();
assertEquals(Command.DEACTIVATE, msg2.getCommand());
verifyMessageModeCmd();
}

@Test
public void send() throws Exception {
final String msg = MessageBuilder.forCommand(Command.GOTO).args((byte) 10).network((byte) 2)
.destination((byte) 5).build();
final CompletionStage<CmdStatus> fut = thread.enqueue(msg);
verifyMessageModeCmd();
final int n = outbound.read(buf);
assertEquals("\u001408100205FF220AB6\r", new String(buf, 0, n));
ack();
final CmdStatus res = fut.toCompletableFuture().join();
assertEquals(CmdStatus.ACK, res);
}

@Test
public void resend() throws Exception {
final String msg = MessageBuilder.forCommand(Command.GOTO).args((byte) 10).network((byte) 2)
.destination((byte) 5).build();
final CompletableFuture<CmdStatus> fut = thread.enqueue(msg).toCompletableFuture();
verifyMessageModeCmd();
int n = outbound.read(buf);
assertEquals("\u001408100205FF220AB6\r", new String(buf, 0, n));
nak();

// should re-send
n = outbound.read(buf);
assertEquals("\u001408100205FF220AB6\r", new String(buf, 0, n));
assertFalse(fut.isDone());
ack();
final CmdStatus res = fut.join();
assertEquals(CmdStatus.ACK, res);
}

@Test
public void resendMaxAttempts() throws Exception {
final String msg = MessageBuilder.forCommand(Command.GOTO).args((byte) 10).network((byte) 2)
.destination((byte) 5).build();
final CompletableFuture<CmdStatus> fut = thread.enqueue(msg).toCompletableFuture();
verifyMessageModeCmd();
int n = outbound.read(buf);
assertEquals("\u001408100205FF220AB6\r", new String(buf, 0, n));
nak();

// retry
n = outbound.read(buf);
assertEquals("\u001408100205FF220AB6\r", new String(buf, 0, n));
assertFalse(fut.isDone());
// no response - wait for ack timeout

// last retry
n = outbound.read(buf);
assertEquals("\u001408100205FF220AB6\r", new String(buf, 0, n));
assertFalse(fut.isDone());
nak();
final CmdStatus res = fut.join();
assertEquals(CmdStatus.NAK, res);
}

private void ack() throws IOException {
writeInbound("PK\r");
}

private void nak() throws IOException {
writeInbound("PN\r");
}

private void writeInbound(String s) throws IOException {
inbound.write(s);
inbound.flush();
}

private void verifyMessageModeCmd() throws IOException {
final int n = outbound.read(buf, 0, ENABLE_MESSAGE_MODE_CMD.length());
assertEquals(ENABLE_MESSAGE_MODE_CMD, new String(buf, 0, n));
}

private static class Listener implements MessageListener {

private final BlockingQueue<UPBMessage> messages = new LinkedBlockingQueue<>();

@Override
public void incomingMessage(final UPBMessage msg) {
messages.offer(msg);
}

@Override
public void onError(final Throwable t) {
}

public UPBMessage readInbound() {
try {
return messages.take();
} catch (InterruptedException e) {
return null;
}
}
}
}

0 comments on commit 4ef4edf

Please sign in to comment.