Skip to content

Commit

Permalink
feat: add device discover response
Browse files Browse the repository at this point in the history
Signed-off-by: wei <[email protected]>
  • Loading branch information
instpe committed Dec 11, 2024
1 parent 873945b commit 2f4a6b7
Show file tree
Hide file tree
Showing 26 changed files with 400 additions and 126 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package io.github.protocol.mdtp.client;

import io.github.protocol.mdtp.common.codec.MdtpDecoder;
import io.github.protocol.mdtp.common.model.CDATHeader;
import io.github.protocol.mdtp.common.model.CDATHeaderFactory;
import io.github.protocol.mdtp.common.model.DeviceDiscoveryRequest;
import io.github.protocol.mdtp.common.model.MdtpPacket;
import io.github.protocol.mdtp.common.model.MessageBodyHeader;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
Expand Down Expand Up @@ -42,6 +44,8 @@ public void start() throws Exception {
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new MdtpDecoder());
}
});
this.channelFuture = bootstrap.connect().sync();
Expand All @@ -66,8 +70,7 @@ public void close() throws IOException {
public void sendDeviceDiscoveryRequest(int[] deviceTypes) {
log.info("start to send device discovery request.");
DeviceDiscoveryRequest request = new DeviceDiscoveryRequest();
request.setMessageBodyHeader(MessageBodyHeader.DEVICE_DISCOVERY_REQUEST);
request.setRequestId(request.generateRequestId());
request.setRequestId(request.generateId());

if (deviceTypes == null) {
request.setDeviceTypeCount((byte) 0);
Expand All @@ -79,14 +82,7 @@ public void sendDeviceDiscoveryRequest(int[] deviceTypes) {
request.setDeviceTypes(deviceTypes);
}

CDATHeader cdatHeader = new CDATHeader();
cdatHeader.setFormatType((byte) 0x02);
cdatHeader.setProtocolVersion((byte) 1);
cdatHeader.setMessageLength((short) 0);
cdatHeader.setTimestamp(System.currentTimeMillis());
cdatHeader.setFlags((byte) 0b01100000);
cdatHeader.setSequenceNumber(0);
cdatHeader.setLogicalChannelId(0);
CDATHeader cdatHeader = CDATHeaderFactory.createDeviceDiscoveryCDATHeader();

MdtpPacket packet = new MdtpPacket();
packet.setHeader(cdatHeader);
Expand All @@ -95,6 +91,6 @@ public void sendDeviceDiscoveryRequest(int[] deviceTypes) {
packet.setSignature(null);

this.channelFuture.channel().writeAndFlush(packet.toByteBuf());
log.info("send device discovery request success: " + packet);
log.info("send device discovery request success.");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.github.protocol.mdtp.common.codec;

Check warning on line 1 in mdtp-common/src/main/java/io/github/protocol/mdtp/common/codec/DeviceDiscoveryReponseDecoder.java

View workflow job for this annotation

GitHub Actions / typo check

"Reponse" should be "Response".

import io.github.protocol.mdtp.common.model.AbstractMessageBody;
import io.github.protocol.mdtp.common.model.DeviceDiscoveryResponse;
import io.netty.buffer.ByteBuf;

public class DeviceDiscoveryReponseDecoder implements MessageBodyDecoder {

Check warning on line 7 in mdtp-common/src/main/java/io/github/protocol/mdtp/common/codec/DeviceDiscoveryReponseDecoder.java

View workflow job for this annotation

GitHub Actions / typo check

"Reponse" should be "Response".
@Override
public AbstractMessageBody handle(ByteBuf in) {
return DeviceDiscoveryResponse.readFromBuffer(in);

Check warning on line 10 in mdtp-common/src/main/java/io/github/protocol/mdtp/common/codec/DeviceDiscoveryReponseDecoder.java

View check run for this annotation

Codecov / codecov/patch

mdtp-common/src/main/java/io/github/protocol/mdtp/common/codec/DeviceDiscoveryReponseDecoder.java#L10

Added line #L10 was not covered by tests
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
MessageBodyHeader messageBodyHeader = MessageBodyHeader.readByteBuf(in);
MessageBodyDecoder messageDecode = MessageDecoderFactory.getDecoder(messageBodyHeader);
AbstractMessageBody messageBody = messageDecode.handle(in);
messageBody.setMessageBodyHeader(messageBodyHeader);

mdtpPacket.setHeader(header);
mdtpPacket.setBody(messageBody);

out.add(mdtpPacket);
log.info("decode packet success: {}", mdtpPacket);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public class MessageDecoderFactory {

static {
decoders.put(MessageBodyHeader.DEVICE_DISCOVERY_REQUEST.toShort(), new DeviceDiscoveryRequestDecoder());
decoders.put(MessageBodyHeader.DEVICE_DISCOVERY_RESPONSE.toShort(), new DeviceDiscoveryReponseDecoder());

Check warning on line 14 in mdtp-common/src/main/java/io/github/protocol/mdtp/common/codec/MessageDecoderFactory.java

View workflow job for this annotation

GitHub Actions / typo check

"Reponse" should be "Response".
}

public static MessageBodyDecoder getDecoder(MessageBodyHeader header) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.github.protocol.mdtp.common.handler;

import io.github.protocol.mdtp.common.model.Attributes;
import io.github.protocol.mdtp.common.model.CDATHeader;
import io.github.protocol.mdtp.common.model.CDATHeaderFactory;
import io.github.protocol.mdtp.common.model.Device;
import io.github.protocol.mdtp.common.model.DeviceDiscoveryRequest;
import io.github.protocol.mdtp.common.model.DeviceDiscoveryResponse;
import io.github.protocol.mdtp.common.model.MdtpPacket;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class DeviceDiscoveryRequestHandler implements MessageBodyHandler {

@Override
public void handle (ChannelHandlerContext ctx, MdtpPacket requestPacket) {
log.info("start to send device discovery response.");
DeviceDiscoveryRequest deviceDiscoveryRequest = (DeviceDiscoveryRequest) requestPacket.getBody();
DeviceDiscoveryResponse deviceDiscoveryResponse = new DeviceDiscoveryResponse();
deviceDiscoveryResponse.setRequestId(deviceDiscoveryRequest.getRequestId());
deviceDiscoveryResponse.setResponseId(deviceDiscoveryResponse.generateId());
Device device = ctx.channel().attr(Attributes.DEVICE_KEY).get();
deviceDiscoveryResponse.setDevice(device);

CDATHeader cdatHeader = CDATHeaderFactory.createDeviceDiscoveryCDATHeader();

MdtpPacket packet = new MdtpPacket();
packet.setHeader(cdatHeader);
packet.setSecurityHeader(null);
packet.setBody(deviceDiscoveryResponse);
packet.setSignature(null);
ctx.channel().writeAndFlush(packet.toByteBuf());
log.info("send device discovery response success.");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.github.protocol.mdtp.common.handler;

import io.github.protocol.mdtp.common.model.MdtpPacket;
import io.netty.channel.ChannelHandlerContext;

public interface MessageBodyHandler {

void handle (ChannelHandlerContext ctx, MdtpPacket mdtpPacket);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.github.protocol.mdtp.common.handler;


import io.github.protocol.mdtp.common.model.MessageBodyHeader;

import java.util.HashMap;
import java.util.Map;

public class MessageHandlerFactory {

Check warning on line 9 in mdtp-common/src/main/java/io/github/protocol/mdtp/common/handler/MessageHandlerFactory.java

View check run for this annotation

Codecov / codecov/patch

mdtp-common/src/main/java/io/github/protocol/mdtp/common/handler/MessageHandlerFactory.java#L9

Added line #L9 was not covered by tests
private static final Map<Short, MessageBodyHandler> handlers = new HashMap<>();

static {
handlers.put(MessageBodyHeader.DEVICE_DISCOVERY_REQUEST.toShort(), new DeviceDiscoveryRequestHandler());
}

public static MessageBodyHandler getHandler(MessageBodyHeader header) {
return handlers.get(header.toShort());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,16 @@
public abstract class AbstractMessageBody {
private MessageBodyHeader messageBodyHeader;

public short generateRequestId() {
protected AbstractMessageBody () {
}

Check warning on line 13 in mdtp-common/src/main/java/io/github/protocol/mdtp/common/model/AbstractMessageBody.java

View check run for this annotation

Codecov / codecov/patch

mdtp-common/src/main/java/io/github/protocol/mdtp/common/model/AbstractMessageBody.java#L12-L13

Added lines #L12 - L13 were not covered by tests

protected AbstractMessageBody (MessageBodyHeader messageBodyHeader) {
this.messageBodyHeader = messageBodyHeader;
}

public short generateId() {
UUID uuid = UUID.randomUUID();
return (short) (uuid.getLeastSignificantBits() & 0xFFFF);
return (short) (uuid.getLeastSignificantBits() & 0x7FFF);
}

public void writeByteBuf(ByteBuf buffer) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.github.protocol.mdtp.common.model;

import io.netty.buffer.ByteBuf;

import java.net.InetAddress;
import java.net.UnknownHostException;

public class Address {
public static final byte IPV4_TYPE = 4;
public static final byte IPV6_TYPE = 6;

private final byte type;
private final byte[] value;

public Address(byte type, byte[] value) {
this.type = type;
this.value = value;
}

public String getIpString() throws UnknownHostException {
return InetAddress.getByAddress(value).getHostAddress();
}

public void writeByteBuf(ByteBuf buffer) {
buffer.writeByte(type);
buffer.writeBytes(value);
}

public static Address readByteBuf(ByteBuf buffer) {
byte type = buffer.readByte();
int length = (type == IPV4_TYPE) ? 4 : 16;
byte[] value = new byte[length];
buffer.readBytes(value);
return new Address(type, value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.github.protocol.mdtp.common.model;

import io.netty.util.AttributeKey;

public class Attributes {

Check warning on line 5 in mdtp-common/src/main/java/io/github/protocol/mdtp/common/model/Attributes.java

View check run for this annotation

Codecov / codecov/patch

mdtp-common/src/main/java/io/github/protocol/mdtp/common/model/Attributes.java#L5

Added line #L5 was not covered by tests
public static final AttributeKey<Device> DEVICE_KEY = AttributeKey.valueOf("device");
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,12 @@ public void writeByteBuf(ByteBuf buffer) {
buffer.writeShort(messageLength);
buffer.writeLong(timestamp);
buffer.writeByte(flags);
buffer.writeInt(sequenceNumber);
buffer.writeInt(logicalChannelId);
if (sequenceNumber != null) {
buffer.writeInt(sequenceNumber);
}
if (logicalChannelId != null) {
buffer.writeInt(logicalChannelId);
}
}

public static CDATHeader readByteBuf(ByteBuf buffer) {
Expand All @@ -43,8 +47,15 @@ public static CDATHeader readByteBuf(ByteBuf buffer) {
header.setMessageLength(buffer.readShort());
header.setTimestamp(buffer.readLong());
header.setFlags(buffer.readByte());
header.setSequenceNumber(buffer.readInt());
header.setLogicalChannelId(buffer.readInt());
byte formatType = header.getFormatType();

if (formatType == 0x00) {
header.setSequenceNumber(buffer.readInt());
header.setLogicalChannelId(buffer.readInt());
} else if (formatType == 0x02) {
header.setSequenceNumber(null);
header.setLogicalChannelId(null);
}

return header;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.github.protocol.mdtp.common.model;

public class CDATHeaderFactory {

Check warning on line 3 in mdtp-common/src/main/java/io/github/protocol/mdtp/common/model/CDATHeaderFactory.java

View check run for this annotation

Codecov / codecov/patch

mdtp-common/src/main/java/io/github/protocol/mdtp/common/model/CDATHeaderFactory.java#L3

Added line #L3 was not covered by tests
public static CDATHeader createMessageTransferCDATHeader() {
return initializeDefault(new CDATHeader(), (byte) 0x00);

Check warning on line 5 in mdtp-common/src/main/java/io/github/protocol/mdtp/common/model/CDATHeaderFactory.java

View check run for this annotation

Codecov / codecov/patch

mdtp-common/src/main/java/io/github/protocol/mdtp/common/model/CDATHeaderFactory.java#L5

Added line #L5 was not covered by tests
}

public static CDATHeader createDeviceDiscoveryCDATHeader() {
return initializeDefault(new CDATHeader(), (byte) 0x02);
}

private static CDATHeader initializeDefault(CDATHeader header, byte formatType) {
header.setFormatType(formatType);
header.setProtocolVersion((byte) 1);
header.setMessageLength((short) 0);
header.setTimestamp(System.currentTimeMillis());
header.setFlags((byte) 0b01100000);

if (formatType == 0x00) {
header.setSequenceNumber(0);
header.setLogicalChannelId(0);

Check warning on line 21 in mdtp-common/src/main/java/io/github/protocol/mdtp/common/model/CDATHeaderFactory.java

View check run for this annotation

Codecov / codecov/patch

mdtp-common/src/main/java/io/github/protocol/mdtp/common/model/CDATHeaderFactory.java#L20-L21

Added lines #L20 - L21 were not covered by tests
} else {
header.setSequenceNumber(null);
header.setLogicalChannelId(null);
}

return header;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package io.github.protocol.mdtp.common.model;

import io.netty.buffer.ByteBuf;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Device {
private byte mask;

private byte deviceStatus;

private byte addressCount;

private List<Address> addresses;

private short port;

private int deviceType;

private byte[] uniqueId;

private String deviceName;

public void writeByteBuf(ByteBuf buffer) {
buffer.writeByte(mask);
buffer.writeByte(deviceStatus);
buffer.writeByte(addressCount);

for (Address address : addresses) {
address.writeByteBuf(buffer);
}

buffer.writeShort(port);
buffer.writeInt(deviceType);

if (uniqueId != null) {
buffer.writeShort(uniqueId.length);
buffer.writeBytes(uniqueId);
} else {
buffer.writeShort(0);
}

if (deviceName != null) {
byte[] nameBytes = deviceName.getBytes(StandardCharsets.UTF_8);
buffer.writeShort(nameBytes.length);
buffer.writeBytes(nameBytes);
} else {
buffer.writeShort(0);
}
}

public static Device readByteBuf(ByteBuf buffer) {
Device device = new Device();

device.mask = buffer.readByte();
device.deviceStatus = buffer.readByte();
device.addressCount = buffer.readByte();

device.addresses = new ArrayList<>();
for (int i = 0; i < device.addressCount; i++) {
device.addresses.add(Address.readByteBuf(buffer));
}

device.port = buffer.readShort();
device.deviceType = buffer.readInt();

int uniqueIdLength = buffer.readShort();
if (uniqueIdLength > 0) {
device.uniqueId = new byte[uniqueIdLength];
buffer.readBytes(device.uniqueId);
} else {
device.uniqueId = null;

Check warning on line 82 in mdtp-common/src/main/java/io/github/protocol/mdtp/common/model/Device.java

View check run for this annotation

Codecov / codecov/patch

mdtp-common/src/main/java/io/github/protocol/mdtp/common/model/Device.java#L82

Added line #L82 was not covered by tests
}

int deviceNameLength = buffer.readShort();
if (deviceNameLength > 0) {
byte[] nameBytes = new byte[deviceNameLength];
buffer.readBytes(nameBytes);
device.deviceName = new String(nameBytes, StandardCharsets.UTF_8);
} else {
device.deviceName = null;

Check warning on line 91 in mdtp-common/src/main/java/io/github/protocol/mdtp/common/model/Device.java

View check run for this annotation

Codecov / codecov/patch

mdtp-common/src/main/java/io/github/protocol/mdtp/common/model/Device.java#L91

Added line #L91 was not covered by tests
}

return device;
}
}
Loading

0 comments on commit 2f4a6b7

Please sign in to comment.