diff --git a/pkts-examples/my_tcp_traffic.pcap b/pkts-examples/my_tcp_traffic.pcap new file mode 100644 index 00000000..e83f6f29 Binary files /dev/null and b/pkts-examples/my_tcp_traffic.pcap differ diff --git a/pkts-examples/src/main/java/io/pkts/examples/streams/StreamsExample002.java b/pkts-examples/src/main/java/io/pkts/examples/streams/StreamsExample002.java new file mode 100644 index 00000000..5d3c8559 --- /dev/null +++ b/pkts-examples/src/main/java/io/pkts/examples/streams/StreamsExample002.java @@ -0,0 +1,92 @@ +package io.pkts.examples.streams; + +import io.pkts.Pcap; +import io.pkts.framer.FramingException; +import io.pkts.packet.TCPPacket; +import io.pkts.streams.Stream; +import io.pkts.streams.StreamId; +import io.pkts.streams.StreamListener; +import io.pkts.streams.TcpStream; +import io.pkts.streams.impl.TcpStreamHandler; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Map; + +/** + * Simple example showing how to use streams. + * + * The core pcap support provided by pkts.io is only focusing on each individual + * packet but quite often your application may be interested in a stream of + * packets. A stream can mean different things for different protocols. E.g. for + * UDP, a stream in this context could be all packets sent and received from the + * same local and remote port-pair (which is how the stream support in pkts.io + * has defined a UDP stream). + * + * For other protocols, there may be other identifiers within the protocol that + * defines what a stream is. As an example, TCP is a connection oriented protocol that + * uses signaling packets (SYN, SYN-ACK, RST, FIN) to establish and end connections. A TCP + * stream is therefore equivalent to the exchange of packets within a TCP connection. + * + * This particular example shows how to setup pkts.io and its stream support to + * consume {@link TcpStream}s. + * + * @author sebastien.amelinckx@gmail.com + */ +public class StreamsExample002 { + + public static void main(final String[] args) throws IOException, FramingException { + + // Step 1 - Open the pcap containing our traffic. + final Pcap pcap = Pcap.openStream("my_tcp_traffic.pcap"); + // Step 2 - Instead of implementing our own PacketHandler we will be + // using a TcpStreamHandler provided for us by the io.pkts.streams + // library. It has a StreamHandler (which obviously + // implements the FrameHandler) that will detect new tcp streams + // and call a StreamListener when appropriate. + final TcpStreamHandler streamHandler = new TcpStreamHandler(); + + // Step 3 - In this simple example we will just supply a very basic + // StreamListener for TCP. All we will do is + // print to std out when a new event occurs for a stream. + streamHandler.addStreamListener(new StreamListener() { + + @Override + public void startStream(final Stream stream, final TCPPacket packet) { + + TcpStream tcpStream = (TcpStream) stream; + + System.out.println("New TCP stream detected. Stream n°" + tcpStream.getUuid() + "\n" + + " Stream id: " + stream.getStreamIdentifier()); + System.out.println("First packet seq num was: " + packet.getSequenceNumber()); + } + + @Override + public void packetReceived(final Stream stream, final TCPPacket packet) { + TcpStream tcpStream = (TcpStream) stream; + System.out.println("Received a new TCP packet for stream: " + tcpStream.getUuid()); + } + + @Override + public void endStream(final Stream stream) { + TcpStream tcpStream = (TcpStream) stream; + System.out.println("The stream ended. Stream n°" + tcpStream.getUuid()); + } + }); + + // Step 4 - Call the loop function as usual but pass in the TcpStreamHandler + // instead of your own "raw" FrameHandler. + pcap.loop(streamHandler); + + // Step 5 - Do whatever with the streams and packets inside + Map allStreams = streamHandler.getStreams(); + + ArrayList streams = new ArrayList(allStreams.values()); + + for(TcpStream stream : streams) { + stream.getPackets().forEach(packet -> { + System.out.println("Packet seq num: " + packet.getSequenceNumber()); + }); + } + } +} diff --git a/pkts-streams/pom.xml b/pkts-streams/pom.xml index 5aa06fd8..63be9069 100644 --- a/pkts-streams/pom.xml +++ b/pkts-streams/pom.xml @@ -8,7 +8,19 @@ Pkts Streams - io.pkts + + + + org.apache.maven.plugins + maven-compiler-plugin + + 16 + 16 + + + + + io.pkts pkts-streams jar @@ -24,7 +36,13 @@ pkts-sdp - + + io.hektor + hektor-fsm + 0.0.5 + + + org.slf4j slf4j-api diff --git a/pkts-streams/src/main/java/io/pkts/streams/TcpStream.java b/pkts-streams/src/main/java/io/pkts/streams/TcpStream.java new file mode 100644 index 00000000..cb0673f3 --- /dev/null +++ b/pkts-streams/src/main/java/io/pkts/streams/TcpStream.java @@ -0,0 +1,33 @@ +package io.pkts.streams; + +import io.pkts.packet.TCPPacket; +import io.pkts.streams.impl.tcpFSM.TcpStreamFSM.TcpState; + +/** + * An {@link TcpStream} represents a stream of {@link TCPPacket}s belonging to the same connection. + * A TCP conversation is identified it's 5-tuple (src-ip, src-port, dest-ip, dest-port, protocol=TCP), and + * should follow a valid state progression for the lifetime of the connection (see RFC 793). This means that for + * the same 5-tuple, if an event occurs that indicates the connection is no longer valid (such as a new SYN exchange + * or data exchange when the connection should be closed), the stream should be ended and a new stream should be + * potentially created for the new connection. + * + * @author sebastien.amelinckx@gmail.com + */ +public interface TcpStream extends Stream { + + String getSrcAddr(); + + String getDestAddr(); + + int getSrcPort(); + + int getDestPort(); + + void addPacket(TCPPacket packet); + + TcpState getState(); + + long getUuid(); + + boolean Ended(); +} diff --git a/pkts-streams/src/main/java/io/pkts/streams/impl/DefaultTcpStream.java b/pkts-streams/src/main/java/io/pkts/streams/impl/DefaultTcpStream.java new file mode 100644 index 00000000..d90c8406 --- /dev/null +++ b/pkts-streams/src/main/java/io/pkts/streams/impl/DefaultTcpStream.java @@ -0,0 +1,135 @@ +package io.pkts.streams.impl; + +import io.hektor.fsm.FSM; +import io.hektor.fsm.TransitionListener; +import io.pkts.frame.PcapGlobalHeader; +import io.pkts.packet.TCPPacket; +import io.pkts.streams.StreamId; +import io.pkts.streams.TcpStream; +import io.pkts.streams.impl.tcpFSM.TcpStreamContext; +import io.pkts.streams.impl.tcpFSM.TcpStreamData; +import io.pkts.streams.impl.tcpFSM.TcpStreamFSM; +import io.pkts.streams.impl.tcpFSM.TcpStreamFSM.TcpState; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.PriorityQueue; + +/** + * @author sebastien.amelinckx@gmail.com + */ +public class DefaultTcpStream implements TcpStream { + + private final PcapGlobalHeader globalHeader; + + private final TransportStreamId id; + private final long uuid; + + private final PriorityQueue packets; // packets are ordered by arrival time and can have the same arrival time + private final FSM fsm; + + public DefaultTcpStream(PcapGlobalHeader globalHeader, TransportStreamId id, long uuid, TransitionListener synListener){ + this.globalHeader = globalHeader; + this.id = id; + this.uuid = uuid; + this.packets = new PriorityQueue(new PacketComparator()); + this.fsm = TcpStreamFSM.definition.newInstance(uuid, new TcpStreamContext(), new TcpStreamData(), null, synListener); + fsm.start(); + } + @Override + public List getPackets() { + return new ArrayList(this.packets); + } + + @Override + public long getDuration() { + return this.getTimeOfLastPacket() - this.getTimeOfFirstPacket(); + } + + @Override + public long getTimeOfFirstPacket() { + if (this.packets.isEmpty()) { + return -1; + } + + return packets.peek().getArrivalTime(); + } + + @Override + public long getTimeOfLastPacket() { + if (this.packets.isEmpty()) { + return -1; + } + + TCPPacket last = null; + for (TCPPacket packet : packets){ + last = packet; + } + + return last.getArrivalTime(); + } + + @Override + public StreamId getStreamIdentifier() { + return this.id; + } + + @Override + public void write(OutputStream out) throws IOException { + throw new UnsupportedOperationException("Writing out a DefaultTCPStream is Unsupported"); + } + + @Override + public String getSrcAddr() { + return id.getSourceAddress(); + } + + @Override + public String getDestAddr() { + return id.getDestinationAddress(); + } + + @Override + public int getSrcPort() { + return id.getSourcePort(); + } + + @Override + public int getDestPort() { + return id.getDestinationPort(); + } + + @Override + public void addPacket(TCPPacket packet){ + fsm.onEvent(packet); + // if new syn exchange, a new stream will be started by the synListener + // in that case, no need to add the new syn packet to the stream + if (fsm.getState() != TcpState.CLOSED_PORTS_REUSED){ + packets.add(packet); + } + } + + @Override + public TcpState getState(){ + return (TcpState) fsm.getState(); + } + + @Override + public long getUuid(){ + return this.uuid; + } + + /** + * A TCP stream is ended when the connection ends, but even in this state + * a stream can receive new arriving packets. So, the stream is ended in the + * closed state, but also when the ports are reused which is the terminal state + * of the {@link TcpStreamFSM}. + */ + @Override + public boolean Ended() { + return fsm.getState() == TcpState.CLOSED || fsm.getState() == TcpState.CLOSED_PORTS_REUSED; + } + +} diff --git a/pkts-streams/src/main/java/io/pkts/streams/impl/TcpStreamHandler.java b/pkts-streams/src/main/java/io/pkts/streams/impl/TcpStreamHandler.java new file mode 100644 index 00000000..8c467e3b --- /dev/null +++ b/pkts-streams/src/main/java/io/pkts/streams/impl/TcpStreamHandler.java @@ -0,0 +1,258 @@ +package io.pkts.streams.impl; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.LinkedHashMap; +import java.util.HashMap; +import java.util.Map; + +import io.hektor.fsm.TransitionListener; + +import io.pkts.frame.PcapGlobalHeader; +import io.pkts.frame.Frame; +import io.pkts.framer.FramerManager; + +import io.pkts.packet.Packet; +import io.pkts.packet.IPPacket; +import io.pkts.packet.TCPPacket; +import io.pkts.packet.IPv4Packet; +import io.pkts.packet.IPv6Packet; +import io.pkts.packet.PacketParseException; + +import io.pkts.protocol.Protocol; + +import io.pkts.streams.FragmentListener; +import io.pkts.streams.StreamHandler; +import io.pkts.streams.StreamListener; +import io.pkts.streams.SipStatistics; +import io.pkts.streams.Stream; +import io.pkts.streams.StreamId; +import io.pkts.streams.TcpStream; +import io.pkts.streams.impl.tcpFSM.TcpStreamFSM.TcpState; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link StreamHandler} for TCP conversations. + * The handler will figure out if the received {@link Frame} contains a TCP packet and if so, + * will parse the {@link Frame} in a {@link TCPPacket} and add it to the corresponding Stream. + * A {@link TcpStream} is identified by a 5-tuple (src addr, dest addr, src port, dest port, TCP protocol), + * with the additional catch that a stream can see it's 5-tuple réused for another connection. + * A {@link TcpStream} CAN be started with a SYN 3-way handshake, but in case a new 5-tuple is observed + * with no 3-way handshake it will be assumed that a new stream has started. + * IP fragmentation is not handled by this class, but adding a {@link FragmentListener} is supported. + * + * @author sebastien.amelinckx@gmail.com + */ +public class TcpStreamHandler implements StreamHandler { + + /** + * Class implementing a {@link TransitionListener} to check if a stream closes due to a new SYN packet + * in which case a new stream has to be started from that same closing SYN packet. + * */ + private class SynListener implements TransitionListener{ + @Override + public void onTransition(TcpState currentState, TcpState toState, Object o) { + if (!(o instanceof TCPPacket)){ + throw new RuntimeException("Object passed to TcpFSM is of Class: " + o.getClass()); + } else if (toState == TcpState.CLOSED_PORTS_REUSED && ((TCPPacket) o).isSYN()) { + startNewStream((TCPPacket) o); + } + } + } + + + private final static Logger logger = LoggerFactory.getLogger(TcpStreamHandler.class); + + private final FramerManager framerManager; + + private StreamListener tcpListener; + + private FragmentListener fragmentListener; + + // This map is for looking up what streams are accessible and attribute packets to them + private final Map activeTcpStreams = new HashMap(); + + // This map is for the user to recover all streams + private final Map streams = new LinkedHashMap(); + private int uuid_counter = 0; + + public TcpStreamHandler() { + this.framerManager = FramerManager.getInstance(); + } + + + @Override + public void addStreamListener(StreamListener listener) throws IllegalArgumentException { + try { + final Method method = listener.getClass().getMethod("endStream", Stream.class); + final ParameterizedType parameterizedType = (ParameterizedType) method.getGenericParameterTypes()[0]; + final Type[] parameterArgTypes = parameterizedType.getActualTypeArguments(); + + final Type parameterArgType = parameterArgTypes[0]; + final Class parameterArgClass = (Class) parameterArgType; + if (parameterArgClass.equals(TCPPacket.class)) { + this.tcpListener = (StreamListener) listener; + } else { + throw new ClassCastException(); + } + + } catch (final ArrayIndexOutOfBoundsException e) { + throw new RuntimeException("Unable to figure out the paramterized type", e); + } catch (final SecurityException e) { + throw new RuntimeException("Unable to access method information due to security constraints", e); + } catch (final NoSuchMethodException e) { + throw new RuntimeException("The startStream method doesn't exist. Signature changed?", e); + } catch (final ClassCastException e) { + // means that the user had not parameterized the StreamListener + // interface, which means that we cannot actually detect streams. + throw new IllegalArgumentException("The supplied listener has not been correctly parameterized"); + } + } + + @Override + public void setFragmentListener(FragmentListener fragmentListener) { + this.fragmentListener = fragmentListener; + } + + @Override + public SipStatistics getSipStatistics() { + throw new UnsupportedOperationException("Getting Sip Statistics from a TCPStreamHandler is Unsupported"); + } + + @Override + public Map getStreams() { + return this.streams; + } + + @Override + public boolean nextPacket(Packet packet){ + try { + if (packet.hasProtocol(Protocol.IPv4)) { // handle IPv4 fragmentation notification + final IPPacket ip = (IPPacket) packet.getPacket(Protocol.IPv4); + if (ip.isFragmented()) { + packet = handleFragmentation(ip); + if (packet == null) { + return true; + } + } + } else if (packet.hasProtocol(Protocol.IPv6)){ // handle IPv6 fragmentation notification + final IPPacket ip = (IPPacket) packet.getPacket(Protocol.IPv6); + if (ip.isFragmented()) { + packet = handleFragmentation(ip); + if (packet == null) { + return true; + } + } + } + + if (packet.hasProtocol(Protocol.TCP) && + (packet.hasProtocol(Protocol.IPv4) || packet.hasProtocol(Protocol.IPv6))) { + this.processFrame(packet); + } + + } catch (final IOException | PacketParseException e) { + e.printStackTrace(); + } + + return true; + } + + public void processFrame(final Packet frame) throws PacketParseException { + try { + final IPPacket ipPacket = + frame.hasProtocol(Protocol.IPv4) ? + (IPv4Packet) frame.getPacket(Protocol.IPv4) + : (frame.hasProtocol(Protocol.IPv6) ? (IPv6Packet) frame.getPacket(Protocol.IPv6) : null); + + final TCPPacket tcpPacket = (TCPPacket) frame.getPacket(Protocol.TCP); + + if (ipPacket == null || tcpPacket == null){ + throw new NullPointerException("tcp or ip packet was null when processed"); + } + + final TransportStreamId pktStreamId = new TransportStreamId(tcpPacket); + + TcpStream stream = activeTcpStreams.get(pktStreamId); + stream = (stream == null) ? activeTcpStreams.get(pktStreamId.oppositeFlowDirection()) : stream; + + if (stream == null) { + startNewStream(tcpPacket); + } else { + final boolean isAlreadyClosed = stream.Ended(); + stream.addPacket(tcpPacket); + this.notifyPacketReceived(stream, tcpPacket); + if (!isAlreadyClosed && stream.Ended()){ // call endStream only once when the last packet closed it + this.notifyEndStream(stream); + } + } + + } catch (Exception e){ + e.printStackTrace(); + } + } + + private IPPacket handleFragmentation(final IPPacket ipPacket) { + if (this.fragmentListener == null) { + return null; + } + try { + return this.fragmentListener.handleFragment(ipPacket); + } catch (final Throwable t) { + logger.warn("Exception thrown by FragmentListener when processing the IP frame", t); + } + return null; + } + + private void notifyStartStream(final TcpStream stream, final TCPPacket pkt) { + if (this.tcpListener != null) { + this.tcpListener.startStream(stream, pkt); + } + } + + private void notifyPacketReceived(final TcpStream stream, final TCPPacket pkt) { + if (this.tcpListener != null) { + this.tcpListener.packetReceived(stream, pkt); + } + } + + private void notifyEndStream(final TcpStream stream) { + if (this.tcpListener != null) { + this.tcpListener.endStream(stream); + } + } + + private PcapGlobalHeader assignGlobalHeader(Packet frame) throws PacketParseException{ + PcapGlobalHeader header = null; + try { + if (frame.hasProtocol(Protocol.SLL)) { + header = PcapGlobalHeader.createDefaultHeader(Protocol.SLL); + } else if (frame.hasProtocol(Protocol.ETHERNET_II)) { + header = PcapGlobalHeader.createDefaultHeader(Protocol.ETHERNET_II); + } else { + throw new PacketParseException(0, "Unable to create the PcapGlobalHeader because the " + + "link type isn't recognized. Currently only Ethernet II " + + "and Linux SLL (linux cooked capture) are implemented"); + } + + } catch (IOException e){ + e.printStackTrace(); + } + return header; + } + + private void startNewStream(TCPPacket packet){ + TransportStreamId pktStreamId = new TransportStreamId(packet); + + PcapGlobalHeader header = assignGlobalHeader(packet.getParentPacket().getParentPacket()); + TcpStream stream = new DefaultTcpStream(header, pktStreamId, uuid_counter++, new SynListener()); + + this.activeTcpStreams.put(pktStreamId, stream); // stream replaced if 5-tuple already present + this.streams.put(new LongStreamId(stream.getUuid()), stream); + stream.addPacket(packet); + this.notifyStartStream(stream, packet); + } +} diff --git a/pkts-streams/src/main/java/io/pkts/streams/impl/TransportStreamId.java b/pkts-streams/src/main/java/io/pkts/streams/impl/TransportStreamId.java new file mode 100644 index 00000000..420f98e9 --- /dev/null +++ b/pkts-streams/src/main/java/io/pkts/streams/impl/TransportStreamId.java @@ -0,0 +1,98 @@ +package io.pkts.streams.impl; + +import io.pkts.packet.TCPPacket; +import io.pkts.protocol.Protocol; +import io.pkts.streams.StreamId; + +import java.util.Objects; + +/** + * Represents the 5-tuple identifier for a transport layer stream. This 5-tuple consists + * of the source and destination IP addresses, source and destination port numbers, + * and the transport layer protocol (TCP/UDP/...). It's important to note that a same 5-tuple may be used + * multiple times in a packet capture, but might belong to different streams. + * + * @author sebastien.amelinckx@gmail.com + */ +public class TransportStreamId implements StreamId { + + private final String sourceAddress; + private final String destinationAddress; + private final int sourcePort; + private final int destinationPort; + + private final Protocol protocol; + + public TransportStreamId(String sourceAddress, String destinationAddress, int sourcePort, int destinationPort, Protocol protocol){ + this.sourceAddress = sourceAddress; + this.destinationAddress = destinationAddress; + this.sourcePort = sourcePort; + this.destinationPort = destinationPort; + this.protocol = protocol; + } + + public TransportStreamId(TCPPacket packet){ + this.sourceAddress = packet.getParentPacket().getSourceIP(); + this.destinationAddress = packet.getParentPacket().getDestinationIP(); + this.sourcePort = packet.getSourcePort(); + this.destinationPort = packet.getDestinationPort(); + this.protocol = packet.getProtocol(); + } + + public TransportStreamId oppositeFlowDirection(){ + return new TransportStreamId(this.destinationAddress, this.sourceAddress, this.destinationPort, this.sourcePort, this.protocol); + } + + public String getSourceAddress() { + return sourceAddress; + } + + public String getDestinationAddress() { + return destinationAddress; + } + + public int getSourcePort() { + return sourcePort; + } + + public int getDestinationPort() { + return destinationPort; + } + + public Protocol getProtocol() { + return protocol; + } + + @Override + public int hashCode(){ + return Objects.hash(sourceAddress, destinationAddress, sourcePort, destinationPort, protocol); + } + + @Override + public String asString() { + return "Source Address: "+ sourceAddress + "\n" + + "Destination Address: " + destinationAddress + "\n" + + "Source Port: " + sourcePort + "\n" + + "Destination Port: " + destinationPort + "\n" + + "Transport Layer Protocol: " + protocol + "\n"; + } + + @Override + public String toString() { + return asString(); + } + + @Override + public boolean equals(final Object obj){ + if (obj instanceof TransportStreamId){ + TransportStreamId other = (TransportStreamId) obj; + return (other.getSourceAddress().equals(this.sourceAddress) + && other.getDestinationAddress().equals(this.destinationAddress) + && other.getSourcePort() == this.sourcePort + && other.getDestinationPort() == this.destinationPort + && other.getProtocol() == this.protocol); + } else { + return false; + } + } +} diff --git a/pkts-streams/src/main/java/io/pkts/streams/impl/tcpFSM/TcpStreamContext.java b/pkts-streams/src/main/java/io/pkts/streams/impl/tcpFSM/TcpStreamContext.java new file mode 100644 index 00000000..ef6f204d --- /dev/null +++ b/pkts-streams/src/main/java/io/pkts/streams/impl/tcpFSM/TcpStreamContext.java @@ -0,0 +1,10 @@ +package io.pkts.streams.impl.tcpFSM; + +import io.hektor.fsm.Context; + +/** + * Necessary class for the {@link TcpStreamFSM} using Hektor. + * + * @author sebastien.amelinckx@gmail.com + */ +public class TcpStreamContext implements Context {} diff --git a/pkts-streams/src/main/java/io/pkts/streams/impl/tcpFSM/TcpStreamData.java b/pkts-streams/src/main/java/io/pkts/streams/impl/tcpFSM/TcpStreamData.java new file mode 100644 index 00000000..59d619b6 --- /dev/null +++ b/pkts-streams/src/main/java/io/pkts/streams/impl/tcpFSM/TcpStreamData.java @@ -0,0 +1,97 @@ +package io.pkts.streams.impl.tcpFSM; + +import io.hektor.fsm.Data; +import io.pkts.packet.TCPPacket; +import io.pkts.streams.impl.TransportStreamId; + +/** + * Necessary class for the {@link TcpStreamFSM} using Hektor. + * Stores sequence numbers of SYN and FIN packets and their corresponding stream IDs to determine + * if SYN packets are retransmissions, making sure the state does not change in that case, + * and to determine if FIN packets are acked, closing their side of the connection in that case. + * + * @author sebastien.amelinckx@gmail.com + */ +public class TcpStreamData implements Data { + private long syn1Seq = -1; // in most cases base sequence number + private long syn2Seq = -1; // in case of two way SYN + private long fin1Seq; + private long fin2Seq; + + private TransportStreamId syn1Id; + private TransportStreamId syn2Id; + private TransportStreamId fin1Id; + private TransportStreamId fin2Id; + private boolean isFin1Terminated = false; + private boolean isFin2Terminated = false; + + public TcpStreamData(){} + + public void setFin1Seq(TCPPacket packet) { + this.fin1Seq = packet.getSequenceNumber(); + this.fin1Id = new TransportStreamId(packet); + } + + public void setFin2Seq(TCPPacket packet) { + this.fin2Seq = packet.getSequenceNumber(); + this.fin2Id = new TransportStreamId(packet); + } + + public void setSyn1Seq(TCPPacket packet){ + this.syn1Seq = packet.getSequenceNumber(); + this.syn1Id = new TransportStreamId(packet); + } + + public void setSyn2Seq(TCPPacket packet){ + this.syn2Seq = packet.getSequenceNumber(); + this.syn2Id = new TransportStreamId(packet); + } + + public long getSyn1Seq() { + return syn1Seq; + } + + public long getSyn2Seq() { + return syn2Seq; + } + + public TransportStreamId getSyn1Id() { + return syn1Id; + } + + public TransportStreamId getSyn2Id() { + return syn2Id; + } + + public long getFin1Seq() { + return fin1Seq; + } + + public long getFin2Seq() { + return fin2Seq; + } + + public TransportStreamId getFin1Id() { + return fin1Id; + } + + public TransportStreamId getFin2Id() { + return fin2Id; + } + + public boolean isFin1Terminated() { + return isFin1Terminated; + } + + public boolean isFin2Terminated() { + return isFin2Terminated; + } + + public void terminateFin1() { + this.isFin1Terminated = true; + } + + public void terminateFin2() { + this.isFin2Terminated = true; + } +} diff --git a/pkts-streams/src/main/java/io/pkts/streams/impl/tcpFSM/TcpStreamFSM.java b/pkts-streams/src/main/java/io/pkts/streams/impl/tcpFSM/TcpStreamFSM.java new file mode 100644 index 00000000..a83f9421 --- /dev/null +++ b/pkts-streams/src/main/java/io/pkts/streams/impl/tcpFSM/TcpStreamFSM.java @@ -0,0 +1,176 @@ +package io.pkts.streams.impl.tcpFSM; + +import io.hektor.fsm.Definition; +import io.hektor.fsm.FSM; +import io.hektor.fsm.builder.FSMBuilder; +import io.hektor.fsm.builder.StateBuilder; +import io.pkts.packet.TCPPacket; +import io.pkts.streams.impl.TransportStreamId; + +import static io.pkts.streams.impl.tcpFSM.TcpStreamFSM.TcpState.*; + +/** + * FSM using {@link io.hektor.fsm.FSM} to model the TCP connection. This class handles the state transitions for an + * ongoing TCP connection. The states are defined in {@link TcpState}. This class uses {@link TcpStreamData} to store + * the necessary data for the FSM to work. It has no use for the {@link TcpStreamContext} class, but it is there to + * make the library work. + * + * @author sebastien.amelinckx@gmail.com + */ +public class TcpStreamFSM{ + public final static Definition definition; + public enum TcpState{ + INIT, HANDSHAKE, ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, + CLOSING_1_CLOSING_2, CLOSED_1_CLOSING_2, CLOSING_1_CLOSED_2, CLOSED, CLOSED_PORTS_REUSED + } + + static { + final FSMBuilder builder= + FSM.of(TcpState.class).ofContextType(TcpStreamContext.class).withDataType(TcpStreamData.class); + + + // define all states for the builder + final StateBuilder init = builder.withInitialState(INIT); + final StateBuilder handshake = builder.withState(HANDSHAKE); + final StateBuilder established = builder.withState(ESTABLISHED); + final StateBuilder finWait1 = builder.withState(FIN_WAIT_1); + final StateBuilder finWait2 = builder.withState(FIN_WAIT_2); + final StateBuilder closing1Closing2 = builder.withState(CLOSING_1_CLOSING_2); + final StateBuilder closed1Closing2 = builder.withState(CLOSED_1_CLOSING_2); + final StateBuilder closing1Closed2 = builder.withState(CLOSING_1_CLOSED_2); + final StateBuilder closed = builder.withState(CLOSED); + final StateBuilder closedPortsReused = builder.withFinalState(CLOSED_PORTS_REUSED); + + // define all transitions + init.transitionTo(HANDSHAKE).onEvent(TCPPacket.class).withGuard(TcpStreamFSM::isSynPacket).withAction(TcpStreamFSM::setSyn1); + init.transitionTo(FIN_WAIT_1).onEvent(TCPPacket.class).withGuard(TcpStreamFSM::isFinPacket).withAction(TcpStreamFSM::setFin1);; + init.transitionTo(CLOSED).onEvent(TCPPacket.class).withGuard(TcpStreamFSM::isRstPacket); + init.transitionTo(ESTABLISHED).onEvent(TCPPacket.class); + + handshake.transitionTo(CLOSED).onEvent(TCPPacket.class).withGuard(TcpStreamFSM::isRstPacket); + handshake.transitionToSelf().onEvent(TCPPacket.class).withGuard(TcpStreamFSM::isSynPacket).withAction(TcpStreamFSM::setSyn2); + handshake.transitionTo(FIN_WAIT_1).onEvent(TCPPacket.class).withGuard(TcpStreamFSM::isFinPacket).withAction(TcpStreamFSM::setFin1); + handshake.transitionTo(ESTABLISHED).onEvent(TCPPacket.class); + + established.transitionTo(CLOSED).onEvent(TCPPacket.class).withGuard(TcpStreamFSM::isRstPacket); + established.transitionTo(CLOSED_PORTS_REUSED).onEvent(TCPPacket.class).withGuard(TcpStreamFSM::isNewSynPacket); // skipped the end of stream, New stream noticed + established.transitionTo(FIN_WAIT_1).onEvent(TCPPacket.class).withGuard(TcpStreamFSM::isFinPacket).withAction(TcpStreamFSM::setFin1); + established.transitionToSelf().onEvent(TCPPacket.class); + + finWait1.transitionTo(CLOSED).onEvent(TCPPacket.class).withGuard(TcpStreamFSM::isRstPacket); + finWait1.transitionTo(CLOSED_PORTS_REUSED).onEvent(TCPPacket.class).withGuard(TcpStreamFSM::isNewSynPacket); + finWait1.transitionTo(CLOSED_1_CLOSING_2).onEvent(TCPPacket.class).withGuard(TcpStreamFSM::ackOfFin1AndFin2).withAction(TcpStreamFSM::closeFin1SetFin2); // special case FIN + ACKOfFin1 packet + finWait1.transitionTo(FIN_WAIT_2).onEvent(TCPPacket.class).withGuard(TcpStreamFSM::isAckOfFin1).withAction(TcpStreamFSM::closeFin1); // if first fin has been acked + finWait1.transitionTo(CLOSING_1_CLOSING_2).onEvent(TCPPacket.class).withGuard(TcpStreamFSM::isSecondFinPacket).withAction(TcpStreamFSM::setFin2); + finWait1.transitionToSelf().onEvent(TCPPacket.class); + + finWait2.transitionTo(CLOSED).onEvent(TCPPacket.class).withGuard(TcpStreamFSM::isRstPacket); + finWait2.transitionTo(CLOSED_PORTS_REUSED).onEvent(TCPPacket.class).withGuard(TcpStreamFSM::isNewSynPacket); + finWait2.transitionTo(CLOSED_1_CLOSING_2).onEvent(TCPPacket.class).withGuard(TcpStreamFSM::isSecondFinPacket).withAction(TcpStreamFSM::setFin2); // 2nd fin observed + finWait2.transitionToSelf().onEvent(TCPPacket.class); + + closing1Closing2.transitionTo(CLOSED).onEvent(TCPPacket.class).withGuard(TcpStreamFSM::isRstPacket); + closing1Closing2.transitionTo(CLOSED_PORTS_REUSED).onEvent(TCPPacket.class).withGuard(TcpStreamFSM::isNewSynPacket); + closing1Closing2.transitionTo(CLOSED_1_CLOSING_2).onEvent(TCPPacket.class).withGuard(TcpStreamFSM::isAckOfFin1).withAction(TcpStreamFSM::closeFin1); + closing1Closing2.transitionTo(CLOSING_1_CLOSED_2).onEvent(TCPPacket.class).withGuard(TcpStreamFSM::isAckOfFin2).withAction(TcpStreamFSM::closeFin2); + closing1Closing2.transitionToSelf().onEvent(TCPPacket.class); + + closed1Closing2.transitionTo(CLOSED).onEvent(TCPPacket.class).withGuard(TcpStreamFSM::isRstPacket); + closed1Closing2.transitionTo(CLOSED_PORTS_REUSED).onEvent(TCPPacket.class).withGuard(TcpStreamFSM::isNewSynPacket); + closed1Closing2.transitionTo(CLOSED).onEvent(TCPPacket.class).withGuard(TcpStreamFSM::isAckOfFin2).withAction(TcpStreamFSM::closeFin2); + closed1Closing2.transitionToSelf().onEvent(TCPPacket.class); + + closing1Closed2.transitionTo(CLOSED).onEvent(TCPPacket.class).withGuard(TcpStreamFSM::isRstPacket); + closing1Closed2.transitionTo(CLOSED_PORTS_REUSED).onEvent(TCPPacket.class).withGuard(TcpStreamFSM::isNewSynPacket); + closing1Closed2.transitionTo(CLOSED).onEvent(TCPPacket.class).withGuard(TcpStreamFSM::isAckOfFin1).withAction(TcpStreamFSM::closeFin1); + closing1Closed2.transitionToSelf().onEvent(TCPPacket.class); + + /* + * When a stream is in a closed state (gracefully or abruptly), it can still receive packets such + * as retransmissions, keep-alive, new RST packets, data, etc. The only case were we should never add a + * new packet to the stream is when a new SYN packet is received, which means a new stream is starting. + * Explaining why here, the terminal state for the FSM is when we observe that ports are reused. + */ + closed.transitionTo(CLOSED_PORTS_REUSED).onEvent(TCPPacket.class).withGuard(TcpStreamFSM::isNewSynPacket); + closed.transitionToSelf().onEvent(TCPPacket.class); + + definition = builder.build(); + } + + private static boolean isSynPacket(TCPPacket packet){ + return packet.isSYN(); + } + + private static boolean isFinPacket(TCPPacket packet){ + return packet.isFIN(); + } + + private static boolean isSecondFinPacket(TCPPacket packet, TcpStreamContext ctx, TcpStreamData data){ // check if FIN segment comes from the second party + return packet.isFIN() && (!data.getFin1Id().equals(new TransportStreamId(packet))); // is it a FIN segment AND not coming from the same direction as FIN 1 + } + + private static boolean isRstPacket(TCPPacket packet){ + return packet.isRST(); + } + + private static void setFin1(TCPPacket packet, TcpStreamContext ctx, TcpStreamData data){ + data.setFin1Seq(packet); + } + + private static void setFin2(TCPPacket packet, TcpStreamContext ctx, TcpStreamData data){ + data.setFin2Seq(packet); + } + + private static void closeFin1(TCPPacket packet, TcpStreamContext ctx, TcpStreamData data){ + data.terminateFin1(); + } + + private static void closeFin2(TCPPacket packet, TcpStreamContext ctx, TcpStreamData data){ + data.terminateFin2(); + } + + private static boolean isAckOfFin1(TCPPacket packet, TcpStreamContext ctx, TcpStreamData data){ + return packet.isACK() && !data.isFin1Terminated() && + data.getFin1Id().equals(new TransportStreamId(packet).oppositeFlowDirection()) && + data.getFin1Seq() < packet.getAcknowledgementNumber(); + } + + private static boolean isAckOfFin2(TCPPacket packet, TcpStreamContext ctx, TcpStreamData data){ + return packet.isACK() && !data.isFin2Terminated() && + data.getFin2Id().equals(new TransportStreamId(packet).oppositeFlowDirection()) && + data.getFin2Seq() < packet.getAcknowledgementNumber(); + } + + private static boolean ackOfFin1AndFin2(TCPPacket packet, TcpStreamContext ctx, TcpStreamData data){ + return isAckOfFin1(packet, ctx, data) && isSecondFinPacket(packet, ctx, data); + } + + private static void closeFin1SetFin2(TCPPacket packet, TcpStreamContext ctx, TcpStreamData data){ + closeFin1(packet, ctx, data); + setFin2(packet, ctx, data); + } + + private static boolean isNewSynPacket(TCPPacket packet, TcpStreamContext ctx, TcpStreamData data){ + return isSynPacket(packet) && !isSynDuplicate(packet, ctx, data); + } + + // special case where a SYN packet is a retransmitted packet, which means we need to ignore it + // instead of closing the stream + private static boolean isSynDuplicate(TCPPacket packet, TcpStreamContext ctx, TcpStreamData data){ + long seqSynNum = packet.getSequenceNumber(); + TransportStreamId packetStreamId = new TransportStreamId(packet); + + return (seqSynNum == data.getSyn1Seq() && packetStreamId.equals(data.getSyn1Id())) + || (seqSynNum == data.getSyn2Seq() && packetStreamId.equals(data.getSyn2Id())); + } + + private static void setSyn1(TCPPacket packet, TcpStreamContext ctx, TcpStreamData data){ + data.setSyn1Seq(packet); + } + + private static void setSyn2(TCPPacket packet, TcpStreamContext ctx, TcpStreamData data){ + if (!isSynDuplicate(packet, ctx, data)){ + data.setSyn2Seq(packet); + } + } +} diff --git a/pkts-streams/src/test/java/io/pkts/streams/TcpStreamTest.java b/pkts-streams/src/test/java/io/pkts/streams/TcpStreamTest.java new file mode 100644 index 00000000..c91c8458 --- /dev/null +++ b/pkts-streams/src/test/java/io/pkts/streams/TcpStreamTest.java @@ -0,0 +1,149 @@ +package io.pkts.streams; + +import io.pkts.Pcap; +import io.pkts.frame.PcapGlobalHeader; +import io.pkts.framer.FramingException; +import io.pkts.packet.Packet; +import io.pkts.packet.PacketParseException; +import io.pkts.packet.TCPPacket; +import io.pkts.protocol.Protocol; +import io.pkts.streams.impl.TcpStreamHandler; +import io.pkts.streams.impl.tcpFSM.TcpStreamFSM; +import org.junit.After; +import org.junit.Test; +import io.pkts.streams.impl.TransportStreamId; +import io.pkts.streams.impl.DefaultTcpStream; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Map; + +import static org.junit.Assert.*; + +/** + * Simple unit tests for objects used for implementing tcp streams and the example file. + * + * @author sebastien.amelinckx@gmail.com + */ +public class TcpStreamTest { + TransportStreamId id; + TcpStream stream; + + @After + public void tearDown(){ + id = null; + stream = null; + } + + @Test + public void basicTcpStreamTest() { + try { + Pcap pcap = Pcap.openStream(TcpStreamTest.class.getResourceAsStream("tcp-fsm/few_established_only.pcap")); + pcap.loop(packet -> { + if (packet.hasProtocol(Protocol.TCP)){ + TCPPacket TcpPacket = (TCPPacket) packet.getPacket(Protocol.TCP); + + if(id == null){ + id = new TransportStreamId(TcpPacket); + stream = new DefaultTcpStream(assignGlobalHeader(TcpPacket.getParentPacket().getParentPacket()), id, 1, null); + } + stream.addPacket(TcpPacket); + } + return true; + }); + } catch (IOException e) { + e.printStackTrace(); + fail("Failed to open pcap file"); + } + + assertEquals(stream.getStreamIdentifier(), id); + assertEquals(stream.getState(), TcpStreamFSM.TcpState.ESTABLISHED); + assertFalse(stream.Ended()); + + assertEquals(stream.getSrcAddr(), "172.16.100.13"); + assertEquals(stream.getDestAddr(), "172.16.100.10"); + assertEquals(stream.getSrcPort(), 2436); + assertEquals(stream.getDestPort(), 389); + assertEquals(stream.getUuid(), 1); + + } + + /** + * StreamExample002 as a JUnit test to verify the output. + * + * @author sebastien.amelinckx@gmail.com + */ + @Test + public void StreamExample002Test() throws IOException, FramingException { + + // Step 1 - Open the pcap containing our traffic. + final Pcap pcap = Pcap.openStream(TcpStreamTest.class.getResourceAsStream("tcp-streams/example_tcp_traffic.pcap")); + // Step 2 - Instead of implementing our own PacketHandler we will be + // using a TcpStreamHandler provided for us by the io.pkts.streams + // library. It has a StreamHandler (which obviously + // implements the FrameHandler) that will detect new tcp streams + // and call a StreamListener when appropriate. + final TcpStreamHandler streamHandler = new TcpStreamHandler(); + + // Step 3 - In this simple example we will just supply a very basic + // StreamListener for TCP. All we will do is + // print to std out when a new event occurs for a stream. + streamHandler.addStreamListener(new StreamListener() { + + @Override + public void startStream(final Stream stream, final TCPPacket packet) { + + TcpStream tcpStream = (TcpStream) stream; + + System.out.println("New TCP stream detected. Stream n°" + tcpStream.getUuid() + "\n" + + " Stream id: " + stream.getStreamIdentifier()); + System.out.println("First packet seq num was: " + packet.getSequenceNumber()); + } + + @Override + public void packetReceived(final Stream stream, final TCPPacket packet) { + TcpStream tcpStream = (TcpStream) stream; + System.out.println("Received a new TCP packet for stream: " + tcpStream.getUuid()); + } + + @Override + public void endStream(final Stream stream) { + TcpStream tcpStream = (TcpStream) stream; + System.out.println("The stream ended. Stream n°" + tcpStream.getUuid()); + } + }); + + // Step 4 - Call the loop function as usual but pass in the TcpStreamHandler + // instead of your own "raw" FrameHandler. + pcap.loop(streamHandler); + + // Step 5 - Do whatever with the streams and packets inside + Map allStreams = streamHandler.getStreams(); + + ArrayList streams = new ArrayList(allStreams.values()); + + for(TcpStream stream : streams) { + System.out.println("Stream " + stream.getUuid() + " has " + stream.getPackets().size() + " packets."); + } + } + + + private static PcapGlobalHeader assignGlobalHeader(Packet frame) throws PacketParseException { + PcapGlobalHeader header = null; + try { + if (frame.hasProtocol(Protocol.SLL)) { + header = PcapGlobalHeader.createDefaultHeader(Protocol.SLL); + } else if (frame.hasProtocol(Protocol.ETHERNET_II)) { + header = PcapGlobalHeader.createDefaultHeader(Protocol.ETHERNET_II); + } else { + throw new PacketParseException(0, "Unable to create the PcapGlobalHeader because the " + + "link type isn't recognized. Currently only Ethernet II " + + "and Linux SLL (linux cooked capture) are implemented"); + } + + } catch (IOException e){ + e.printStackTrace(); + } + return header; + } +} diff --git a/pkts-streams/src/test/java/io/pkts/streams/impl/TcpStreamFSMTest.java b/pkts-streams/src/test/java/io/pkts/streams/impl/TcpStreamFSMTest.java new file mode 100644 index 00000000..e7af7264 --- /dev/null +++ b/pkts-streams/src/test/java/io/pkts/streams/impl/TcpStreamFSMTest.java @@ -0,0 +1,374 @@ +package io.pkts.streams.impl; + +import java.util.ArrayList; + +import io.hektor.fsm.FSM; +import io.pkts.Pcap; +import io.pkts.packet.TCPPacket; +import io.pkts.protocol.Protocol; +import io.pkts.streams.StreamsTestBase; +import io.pkts.streams.impl.tcpFSM.TcpStreamContext; +import io.pkts.streams.impl.tcpFSM.TcpStreamData; +import io.pkts.streams.impl.tcpFSM.TcpStreamFSM; +import static org.junit.Assert.assertEquals; + +import org.junit.Before; +import org.junit.Test; + +/** + * Unit tests for the {@link TcpStreamFSM}. + * + * @author sebastien.amelinckx@gmail.com + */ +public class TcpStreamFSMTest { + + FSM stream; + TcpStreamContext ctx; + TcpStreamData data; + ArrayList packets; + + @Before + public void setUp(){ + ctx = new TcpStreamContext(); + data = new TcpStreamData(); + stream = TcpStreamFSM.definition.newInstance("uuid-123",ctx, data); + stream.start(); + } + + @Test + public void testFewEstablishedOnly() { + packets = retrievePackets("tcp-fsm/few_established_only.pcap"); + assertEquals(TcpStreamFSM.TcpState.INIT, stream.getState()); + for (TCPPacket packet : packets) { + stream.onEvent(packet); + assertEquals(TcpStreamFSM.TcpState.ESTABLISHED, stream.getState()); + } + } + + @Test + public void testEstablishedOnly() { + packets = retrievePackets("tcp-fsm/established_only.pcap"); + assertEquals(TcpStreamFSM.TcpState.INIT, stream.getState()); + for (TCPPacket packet : packets) { + stream.onEvent(packet); + assertEquals(TcpStreamFSM.TcpState.ESTABLISHED, stream.getState()); + } + } + + @Test + public void testStartFin() { + packets = retrievePackets("tcp-fsm/start_fin.pcap"); + + assertEquals(TcpStreamFSM.TcpState.INIT, stream.getState()); + stream.onEvent(packets.get(0)); + assertEquals(TcpStreamFSM.TcpState.FIN_WAIT_1, stream.getState()); + stream.onEvent(packets.get(1)); + assertEquals(TcpStreamFSM.TcpState.FIN_WAIT_2, stream.getState()); + stream.onEvent(packets.get(2)); + assertEquals(TcpStreamFSM.TcpState.CLOSED_1_CLOSING_2, stream.getState()); + stream.onEvent(packets.get(3)); + assertEquals(TcpStreamFSM.TcpState.CLOSED, stream.getState()); + } + + @Test + public void testGracefulFin1Fin2() { + packets = retrievePackets("tcp-fsm/graceful_fin1_fin2.pcap"); + + // syn exchange + stream.onEvent(packets.get(0)); + assertEquals(TcpStreamFSM.TcpState.HANDSHAKE, stream.getState()); + stream.onEvent(packets.get(1)); + assertEquals(TcpStreamFSM.TcpState.HANDSHAKE, stream.getState()); + packets.remove(0); + packets.remove(0); + + // process established connection until start of graceful end + int count = 0; + for (TCPPacket packet : packets) { + count++; + stream.onEvent(packet); + if (stream.getState() != TcpStreamFSM.TcpState.ESTABLISHED){ + assertEquals(TcpStreamFSM.TcpState.FIN_WAIT_1, stream.getState()); + break; + } + } + + stream.onEvent(packets.get(count)); + assertEquals(TcpStreamFSM.TcpState.FIN_WAIT_2, stream.getState()); + + stream.onEvent(packets.get(++count)); + assertEquals(TcpStreamFSM.TcpState.CLOSED_1_CLOSING_2, stream.getState()); + + stream.onEvent(packets.get(++count)); + assertEquals(TcpStreamFSM.TcpState.CLOSED, stream.getState()); + + } + + @Test + public void testGracefulFin1Fin2PlusAck() { + packets = retrievePackets("tcp-fsm/graceful_fin1_fin2_+_ack.pcap"); + + // syn exchange + stream.onEvent(packets.get(0)); + assertEquals(TcpStreamFSM.TcpState.HANDSHAKE, stream.getState()); + stream.onEvent(packets.get(1)); + assertEquals(TcpStreamFSM.TcpState.HANDSHAKE, stream.getState()); + packets.remove(0); + packets.remove(0); + + // process established connection until start of gracefull end + int count = 0; + for (TCPPacket packet : packets) { + count++; + stream.onEvent(packet); + if (stream.getState() != TcpStreamFSM.TcpState.ESTABLISHED){ + assertEquals(TcpStreamFSM.TcpState.FIN_WAIT_1, stream.getState()); + break; + } + } + + stream.onEvent(packets.get(count)); // case FIN + ACK of first FIN + assertEquals(TcpStreamFSM.TcpState.CLOSED_1_CLOSING_2, stream.getState()); + + stream.onEvent(packets.get(++count)); + assertEquals(TcpStreamFSM.TcpState.CLOSED, stream.getState()); + } + + @Test + public void testGracefulSimultaneous() { + packets = retrievePackets("tcp-fsm/graceful_simultaneous.pcap"); + + // syn exchange + stream.onEvent(packets.get(0)); + assertEquals(TcpStreamFSM.TcpState.HANDSHAKE, stream.getState()); + stream.onEvent(packets.get(1)); + assertEquals(TcpStreamFSM.TcpState.HANDSHAKE, stream.getState()); + packets.remove(0); + packets.remove(0); + + // process established connection until graceful end + int count = 0; + for (TCPPacket packet : packets) { + count++; + stream.onEvent(packet); + if (stream.getState() != TcpStreamFSM.TcpState.ESTABLISHED){ + assertEquals(TcpStreamFSM.TcpState.FIN_WAIT_1, stream.getState()); + break; + } + } + + stream.onEvent(packets.get(count)); + assertEquals(TcpStreamFSM.TcpState.CLOSING_1_CLOSING_2, stream.getState()); + + stream.onEvent(packets.get(++count)); + assertEquals(TcpStreamFSM.TcpState.CLOSING_1_CLOSED_2, stream.getState()); + + stream.onEvent(packets.get(++count)); + assertEquals(TcpStreamFSM.TcpState.CLOSED, stream.getState()); + } + + @Test + public void testAbruptInit() { + packets = retrievePackets("tcp-fsm/abrupt_init.pcap"); + + assertEquals(TcpStreamFSM.TcpState.INIT, stream.getState()); + stream.onEvent(packets.get(0)); + assertEquals(TcpStreamFSM.TcpState.CLOSED, stream.getState()); + } + + @Test + public void testAbruptHandshake() { + packets = retrievePackets("tcp-fsm/abrupt_handshake.pcap"); + + stream.onEvent(packets.get(0)); + assertEquals(TcpStreamFSM.TcpState.HANDSHAKE, stream.getState()); + stream.onEvent(packets.get(1)); + assertEquals(TcpStreamFSM.TcpState.HANDSHAKE, stream.getState()); + stream.onEvent(packets.get(2)); + assertEquals(TcpStreamFSM.TcpState.CLOSED, stream.getState()); + } + + @Test + public void testAbruptEstablished() { + packets = retrievePackets("tcp-fsm/abrupt_established.pcap"); + + // syn exchange + stream.onEvent(packets.get(0)); + assertEquals(TcpStreamFSM.TcpState.HANDSHAKE, stream.getState()); + stream.onEvent(packets.get(1)); + assertEquals(TcpStreamFSM.TcpState.HANDSHAKE, stream.getState()); + packets.remove(0); + packets.remove(0); + + // process established connection until abrupt end + for (TCPPacket packet : packets) { + stream.onEvent(packet); + if (stream.getState() != TcpStreamFSM.TcpState.ESTABLISHED){ + assertEquals(TcpStreamFSM.TcpState.CLOSED, stream.getState()); + break; + } + } + } + + @Test + public void testAbruptFin1() { + packets = retrievePackets("tcp-fsm/abrupt_fin1.pcap"); + + // syn exchange + stream.onEvent(packets.get(0)); + assertEquals(TcpStreamFSM.TcpState.HANDSHAKE, stream.getState()); + stream.onEvent(packets.get(1)); + assertEquals(TcpStreamFSM.TcpState.HANDSHAKE, stream.getState()); + packets.remove(0); + packets.remove(0); + + // process established connection until graceful end + int count = 0; + for (TCPPacket packet : packets) { + count++; + stream.onEvent(packet); + if (stream.getState() != TcpStreamFSM.TcpState.ESTABLISHED){ + assertEquals(TcpStreamFSM.TcpState.FIN_WAIT_1, stream.getState()); + break; + } + } + // abrupt end in FIN_WAIT_1 + stream.onEvent(packets.get(count)); + assertEquals(TcpStreamFSM.TcpState.CLOSED, stream.getState()); + + } + + @Test + public void testAbruptFin2() { + packets = retrievePackets("tcp-fsm/abrupt_fin2.pcap"); + + // syn exchange + stream.onEvent(packets.get(0)); + assertEquals(TcpStreamFSM.TcpState.HANDSHAKE, stream.getState()); + stream.onEvent(packets.get(1)); + assertEquals(TcpStreamFSM.TcpState.HANDSHAKE, stream.getState()); + packets.remove(0); + packets.remove(0); + + // process established connection until graceful end + int count = 0; + for (TCPPacket packet : packets) { + count++; + stream.onEvent(packet); + if (stream.getState() != TcpStreamFSM.TcpState.ESTABLISHED){ + assertEquals(TcpStreamFSM.TcpState.FIN_WAIT_1, stream.getState()); + break; + } + } + + stream.onEvent(packets.get(count)); + assertEquals(TcpStreamFSM.TcpState.FIN_WAIT_2, stream.getState()); + + // abrupt end in FIN_WAIT_2 + stream.onEvent(packets.get(++count)); + assertEquals(TcpStreamFSM.TcpState.CLOSED, stream.getState()); + + } + + @Test + public void testSynEndEstablished() { + packets = retrievePackets("tcp-fsm/established_syn_duplicate.pcap"); + + // syn exchange + stream.onEvent(packets.get(0)); + assertEquals(TcpStreamFSM.TcpState.HANDSHAKE, stream.getState()); + stream.onEvent(packets.get(1)); + assertEquals(TcpStreamFSM.TcpState.HANDSHAKE, stream.getState()); + stream.onEvent(packets.get(2)); + assertEquals(TcpStreamFSM.TcpState.ESTABLISHED, stream.getState()); + + // syn packet but it is a duplicate, should have no effect on the FSM state + stream.onEvent(packets.get(3)); + assertEquals(TcpStreamFSM.TcpState.ESTABLISHED, stream.getState()); + + } + + @Test + public void testFin1SynDuplicate() { + packets = retrievePackets("tcp-fsm/fin1_syn_duplicate.pcap"); + + // syn exchange + stream.onEvent(packets.get(0)); + assertEquals(TcpStreamFSM.TcpState.HANDSHAKE, stream.getState()); + stream.onEvent(packets.get(1)); + assertEquals(TcpStreamFSM.TcpState.HANDSHAKE, stream.getState()); + stream.onEvent(packets.get(2)); + assertEquals(TcpStreamFSM.TcpState.ESTABLISHED, stream.getState()); + stream.onEvent(packets.get(3)); + assertEquals(TcpStreamFSM.TcpState.FIN_WAIT_1, stream.getState()); + + // syn packet but it is a duplicate, should have no effect on the FSM state + stream.onEvent(packets.get(4)); + assertEquals(TcpStreamFSM.TcpState.FIN_WAIT_1, stream.getState()); + + } + + @Test + public void testEstablishedSynPortsReused() { + packets = retrievePackets("tcp-fsm/established_syn_ports_reused.pcap"); + + // syn exchange + stream.onEvent(packets.get(0)); + assertEquals(TcpStreamFSM.TcpState.HANDSHAKE, stream.getState()); + stream.onEvent(packets.get(1)); + assertEquals(TcpStreamFSM.TcpState.HANDSHAKE, stream.getState()); + stream.onEvent(packets.get(2)); + assertEquals(TcpStreamFSM.TcpState.ESTABLISHED, stream.getState()); + + // 3 data packets + stream.onEvent(packets.get(3)); + assertEquals(TcpStreamFSM.TcpState.ESTABLISHED, stream.getState()); + stream.onEvent(packets.get(4)); + assertEquals(TcpStreamFSM.TcpState.ESTABLISHED, stream.getState()); + stream.onEvent(packets.get(5)); + assertEquals(TcpStreamFSM.TcpState.ESTABLISHED, stream.getState()); + + // new syn reusing ports (marked in wireshark), FSM should consider the connection closed and ports reused + stream.onEvent(packets.get(6)); + assertEquals(TcpStreamFSM.TcpState.CLOSED_PORTS_REUSED, stream.getState()); + + } + + @Test + public void testFin1SynPortsReused() { + packets = retrievePackets("tcp-fsm/fin1_syn_ports_reused.pcap"); + + // syn exchange + stream.onEvent(packets.get(0)); + assertEquals(TcpStreamFSM.TcpState.HANDSHAKE, stream.getState()); + stream.onEvent(packets.get(1)); + assertEquals(TcpStreamFSM.TcpState.HANDSHAKE, stream.getState()); + stream.onEvent(packets.get(2)); + assertEquals(TcpStreamFSM.TcpState.ESTABLISHED, stream.getState()); + stream.onEvent(packets.get(3)); + assertEquals(TcpStreamFSM.TcpState.FIN_WAIT_1, stream.getState()); + + // new syn reusing ports (marked in wireshark), FSM should consider the connection closed and ports reused + stream.onEvent(packets.get(4)); + assertEquals(TcpStreamFSM.TcpState.CLOSED_PORTS_REUSED, stream.getState()); + + } + + private static ArrayList retrievePackets(String filename){ + ArrayList packets = new ArrayList<>(); + try { + Pcap pcap = Pcap.openStream(StreamsTestBase.class.getResourceAsStream(filename)); + pcap.loop(packet -> { + if (packet.hasProtocol(Protocol.TCP)) { + packets.add((TCPPacket) packet.getPacket(Protocol.TCP)); + } + return true; + }); + + } catch (Exception e) { + e.printStackTrace(); + } + return packets; + } + +} diff --git a/pkts-streams/src/test/java/io/pkts/streams/impl/TcpStreamHandlerTest.java b/pkts-streams/src/test/java/io/pkts/streams/impl/TcpStreamHandlerTest.java new file mode 100644 index 00000000..bd98d1fa --- /dev/null +++ b/pkts-streams/src/test/java/io/pkts/streams/impl/TcpStreamHandlerTest.java @@ -0,0 +1,273 @@ +package io.pkts.streams.impl; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.ArrayList; + +import io.pkts.Pcap; +import io.pkts.packet.TCPPacket; +import io.pkts.streams.*; + +import org.junit.Before; +import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * + * End-to-end tests for the {@link TcpStreamHandler} class. + * The methodology here is to look if the handler identifies the same number + * of streams as would wireshark for some captured or generated traffic. + * + * @author sebastien.amelinclx@gmail.com + */ +public class TcpStreamHandlerTest { + + TcpStreamHandler streamHandler; + + @Before + public void setUp(){ + streamHandler = new TcpStreamHandler(); + streamHandler.addStreamListener(new StreamListener() { + @Override + public void startStream(Stream stream, TCPPacket packet) { + TcpStream tcpStream = (TcpStream) stream; + //System.out.println("New stream n°"+tcpStream.getUuid()+ " has started"); + } + + @Override + public void packetReceived(Stream stream, TCPPacket packet) { + TcpStream tcpStream = (TcpStream) stream; + //System.out.println("New packet for stream n°"+tcpStream.getUuid()); + } + + @Override + public void endStream(Stream stream) { + TcpStream tcpStream = (TcpStream) stream; + //System.out.println("Stream n°"+tcpStream.getUuid()+ " has ended"); + } + }); + } + + /* + * + * General case tests with captured samples of traffic. + * + */ + @Test + public void testBaseUsage() { + try { + Pcap pcap = Pcap.openStream(StreamsTestBase.class.getResourceAsStream("tcp-streams/base_usage_3_streams.pcap")); + pcap.loop(streamHandler); + + Map all_streams = streamHandler.getStreams(); + + Set keys = all_streams.keySet(); + assertEquals(3, keys.size()); + + Collection streams = all_streams.values(); + + assertEquals(380, streams.stream().toList().get(0).getPackets().size()); + assertEquals(11, streams.stream().toList().get(1).getPackets().size()); + assertEquals(9, streams.stream().toList().get(2).getPackets().size()); + + } catch (Exception e){ + e.printStackTrace(); + fail(); + } + + } + + /** + * Test on captured web traffic containing 273 streams. + */ + @Test + public void testUserTraffic() { + try { + Pcap pcap = Pcap.openStream(StreamsTestBase.class.getResourceAsStream("tcp-streams/user_traffic.pcap")); + pcap.loop(streamHandler); + + Map all_streams = streamHandler.getStreams(); + + Set keys = all_streams.keySet(); + assertEquals(273, keys.size()); + + } catch (Exception e){ + e.printStackTrace(); + } + + } + + // single stream that after closing with an RST packet receives a FIN packet previously unseen + @Test + public void testFinAfterRst() { + try { + Pcap pcap = Pcap.openStream(StreamsTestBase.class.getResourceAsStream("tcp-streams/fin_after-rst.pcap")); + pcap.loop(streamHandler); + + Map all_streams = streamHandler.getStreams(); + + assertEquals(1, all_streams.size()); + + } catch (Exception e){ + e.printStackTrace(); + fail(); + } + } + + // single stream that exchanges keep-alive packets after closing. + @Test + public void testKeepAlive() { + try { + Pcap pcap = Pcap.openStream(StreamsTestBase.class.getResourceAsStream("tcp-streams/keep_alive_after_closed.pcap")); + pcap.loop(streamHandler); + + Map all_streams = streamHandler.getStreams(); + + assertEquals(1, all_streams.size()); + + } catch (Exception e){ + e.printStackTrace(); + } + + } + + // single stream that after closing receives an out-of-order packet. + @Test + public void testOutOfOrder() { + try { + Pcap pcap = Pcap.openStream(StreamsTestBase.class.getResourceAsStream("tcp-streams/out_of_order.pcap")); + pcap.loop(streamHandler); + + Map all_streams = streamHandler.getStreams(); + + assertEquals(1, all_streams.size()); + + } catch (Exception e){ + e.printStackTrace(); + } + + } + + /* + * + * Test on corner cases with synthetic traffic. + * + */ + + /** + * Test on a capture with two streams, each with a full handshake and 20 data packets, + * with the second stream reusing the same ports as the first one. + */ + @Test + public void testReusingPorts(){ + try { + Pcap pcap = Pcap.openStream(StreamsTestBase.class.getResourceAsStream("tcp-streams/ports_reused.pcap")); + pcap.loop(streamHandler); + + Map all_streams = streamHandler.getStreams(); + + assertEquals(2, all_streams.size()); + + ArrayList streams_tcp = new ArrayList(all_streams.values()); + + assertEquals(23, streams_tcp.get(0).getPackets().size()); + assertEquals(23, streams_tcp.get(1).getPackets().size()); + } catch (Exception e){ + e.printStackTrace(); + fail(); + } + } + + /** + * Test on a capture with a duplicate syn packet. Should not split the stream because the second syn + * is only a retransmission of the first one. Wireshark marks it as a 'reused port' packet but also as + * a 'out-of-order' packet, but not as a 'retransmission' oddly enough. Despite this, it considers it part + * of the same stream on it's first pass. + * We expect the same behavior from the handler. + */ + @Test + public void testSynDuplicateAfterClosed(){ + try { + Pcap pcap = Pcap.openStream(StreamsTestBase.class.getResourceAsStream("tcp-streams/syn_duplicate_after_closed.pcap")); + pcap.loop(streamHandler); + + Map all_streams = streamHandler.getStreams(); + + assertEquals(1, all_streams.size()); + + } catch (Exception e){ + e.printStackTrace(); + fail(); + } + } + + /** + * Test on a capture with two streams, each with a full handshake and port reuse. + * The second stream has a data packet that could be a retransmission of the first stream. + * But, because it happens after a port reused, it is considered by wireshark as belonging + * to the second stream on it's first pass and marked as a 'spurious retransmission' on the second pass. + * We expect the same behavior from the handler. + */ + @Test + public void testSpuriousRetransmission(){ + try { + Pcap pcap = Pcap.openStream(StreamsTestBase.class.getResourceAsStream("tcp-streams/spurious_retransmit.pcap")); + pcap.loop(streamHandler); + + Map all_streams = streamHandler.getStreams(); + + assertEquals(2, all_streams.size()); + + ArrayList streams_tcp = new ArrayList(all_streams.values()); + + assertEquals(9, streams_tcp.get(0).getPackets().size()); + assertEquals(4, streams_tcp.get(1).getPackets().size()); + } catch (Exception e){ + e.printStackTrace(); + fail(); + } + } + + /** + * Test on a stream that closes with a RST packet and then receives 6 packets after 10 years of inactivity. + * Despite the obvious timeout, wireshark still considers the packets as part of the same stream. + * We expect the same behavior from the handler. + */ + @Test + public void testPacketBeyondTimeout(){ + try { + Pcap pcap = Pcap.openStream(StreamsTestBase.class.getResourceAsStream("tcp-streams/passed_timeout.pcap")); + pcap.loop(streamHandler); + + Map all_streams = streamHandler.getStreams(); + + assertEquals(1, all_streams.size()); + } catch (Exception e){ + e.printStackTrace(); + fail(); + } + } + + /** + * Test on a stream that gets a packet with a lower sequence number than the base sequence number of + * the stream. Despite this, wireshark still considers the packet as part of the same stream. + * We expect the same behavior from the handler. + */ + @Test + public void testLowerSeq(){ + try { + Pcap pcap = Pcap.openStream(StreamsTestBase.class.getResourceAsStream("tcp-streams/lower_seq.pcap")); + pcap.loop(streamHandler); + + Map all_streams = streamHandler.getStreams(); + + assertEquals(1, all_streams.size()); + } catch (Exception e){ + e.printStackTrace(); + fail(); + } + } + +} diff --git a/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/abrupt_established.pcap b/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/abrupt_established.pcap new file mode 100644 index 00000000..c714585a Binary files /dev/null and b/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/abrupt_established.pcap differ diff --git a/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/abrupt_fin1.pcap b/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/abrupt_fin1.pcap new file mode 100644 index 00000000..ebb4aeee Binary files /dev/null and b/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/abrupt_fin1.pcap differ diff --git a/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/abrupt_fin2.pcap b/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/abrupt_fin2.pcap new file mode 100644 index 00000000..7ace673b Binary files /dev/null and b/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/abrupt_fin2.pcap differ diff --git a/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/abrupt_handshake.pcap b/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/abrupt_handshake.pcap new file mode 100644 index 00000000..eb1d4ea9 Binary files /dev/null and b/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/abrupt_handshake.pcap differ diff --git a/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/abrupt_init.pcap b/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/abrupt_init.pcap new file mode 100644 index 00000000..15faa665 Binary files /dev/null and b/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/abrupt_init.pcap differ diff --git a/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/established_only.pcap b/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/established_only.pcap new file mode 100644 index 00000000..296e6cb8 Binary files /dev/null and b/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/established_only.pcap differ diff --git a/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/established_syn_duplicate.pcap b/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/established_syn_duplicate.pcap new file mode 100644 index 00000000..3d43d39e Binary files /dev/null and b/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/established_syn_duplicate.pcap differ diff --git a/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/established_syn_ports_reused.pcap b/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/established_syn_ports_reused.pcap new file mode 100644 index 00000000..a1311a7e Binary files /dev/null and b/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/established_syn_ports_reused.pcap differ diff --git a/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/few_established_only.pcap b/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/few_established_only.pcap new file mode 100644 index 00000000..3f30a64f Binary files /dev/null and b/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/few_established_only.pcap differ diff --git a/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/fin1_syn_duplicate.pcap b/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/fin1_syn_duplicate.pcap new file mode 100644 index 00000000..82d11aa7 Binary files /dev/null and b/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/fin1_syn_duplicate.pcap differ diff --git a/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/fin1_syn_ports_reused.pcap b/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/fin1_syn_ports_reused.pcap new file mode 100644 index 00000000..a505bbc9 Binary files /dev/null and b/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/fin1_syn_ports_reused.pcap differ diff --git a/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/graceful_fin1_fin2.pcap b/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/graceful_fin1_fin2.pcap new file mode 100644 index 00000000..b3752ea0 Binary files /dev/null and b/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/graceful_fin1_fin2.pcap differ diff --git a/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/graceful_fin1_fin2_+_ack.pcap b/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/graceful_fin1_fin2_+_ack.pcap new file mode 100644 index 00000000..d46ac0b5 Binary files /dev/null and b/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/graceful_fin1_fin2_+_ack.pcap differ diff --git a/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/graceful_simultaneous.pcap b/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/graceful_simultaneous.pcap new file mode 100644 index 00000000..ac9047be Binary files /dev/null and b/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/graceful_simultaneous.pcap differ diff --git a/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/start_fin.pcap b/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/start_fin.pcap new file mode 100644 index 00000000..f477bbd8 Binary files /dev/null and b/pkts-streams/src/test/resources/io/pkts/streams/tcp-fsm/start_fin.pcap differ diff --git a/pkts-streams/src/test/resources/io/pkts/streams/tcp-streams/base_usage_3_streams.pcap b/pkts-streams/src/test/resources/io/pkts/streams/tcp-streams/base_usage_3_streams.pcap new file mode 100644 index 00000000..e673361a Binary files /dev/null and b/pkts-streams/src/test/resources/io/pkts/streams/tcp-streams/base_usage_3_streams.pcap differ diff --git a/pkts-streams/src/test/resources/io/pkts/streams/tcp-streams/example_tcp_traffic.pcap b/pkts-streams/src/test/resources/io/pkts/streams/tcp-streams/example_tcp_traffic.pcap new file mode 100644 index 00000000..e83f6f29 Binary files /dev/null and b/pkts-streams/src/test/resources/io/pkts/streams/tcp-streams/example_tcp_traffic.pcap differ diff --git a/pkts-streams/src/test/resources/io/pkts/streams/tcp-streams/fin_after-rst.pcap b/pkts-streams/src/test/resources/io/pkts/streams/tcp-streams/fin_after-rst.pcap new file mode 100644 index 00000000..c77d1995 Binary files /dev/null and b/pkts-streams/src/test/resources/io/pkts/streams/tcp-streams/fin_after-rst.pcap differ diff --git a/pkts-streams/src/test/resources/io/pkts/streams/tcp-streams/keep_alive_after_closed.pcap b/pkts-streams/src/test/resources/io/pkts/streams/tcp-streams/keep_alive_after_closed.pcap new file mode 100644 index 00000000..5aa96155 Binary files /dev/null and b/pkts-streams/src/test/resources/io/pkts/streams/tcp-streams/keep_alive_after_closed.pcap differ diff --git a/pkts-streams/src/test/resources/io/pkts/streams/tcp-streams/lower_seq.pcap b/pkts-streams/src/test/resources/io/pkts/streams/tcp-streams/lower_seq.pcap new file mode 100644 index 00000000..476cf167 Binary files /dev/null and b/pkts-streams/src/test/resources/io/pkts/streams/tcp-streams/lower_seq.pcap differ diff --git a/pkts-streams/src/test/resources/io/pkts/streams/tcp-streams/out_of_order.pcap b/pkts-streams/src/test/resources/io/pkts/streams/tcp-streams/out_of_order.pcap new file mode 100644 index 00000000..2ebda4bc Binary files /dev/null and b/pkts-streams/src/test/resources/io/pkts/streams/tcp-streams/out_of_order.pcap differ diff --git a/pkts-streams/src/test/resources/io/pkts/streams/tcp-streams/passed_timeout.pcap b/pkts-streams/src/test/resources/io/pkts/streams/tcp-streams/passed_timeout.pcap new file mode 100644 index 00000000..d25c5559 Binary files /dev/null and b/pkts-streams/src/test/resources/io/pkts/streams/tcp-streams/passed_timeout.pcap differ diff --git a/pkts-streams/src/test/resources/io/pkts/streams/tcp-streams/ports_reused.pcap b/pkts-streams/src/test/resources/io/pkts/streams/tcp-streams/ports_reused.pcap new file mode 100644 index 00000000..297b010f Binary files /dev/null and b/pkts-streams/src/test/resources/io/pkts/streams/tcp-streams/ports_reused.pcap differ diff --git a/pkts-streams/src/test/resources/io/pkts/streams/tcp-streams/spurious_retransmit.pcap b/pkts-streams/src/test/resources/io/pkts/streams/tcp-streams/spurious_retransmit.pcap new file mode 100644 index 00000000..521c5a84 Binary files /dev/null and b/pkts-streams/src/test/resources/io/pkts/streams/tcp-streams/spurious_retransmit.pcap differ diff --git a/pkts-streams/src/test/resources/io/pkts/streams/tcp-streams/syn_duplicate_after_closed.pcap b/pkts-streams/src/test/resources/io/pkts/streams/tcp-streams/syn_duplicate_after_closed.pcap new file mode 100644 index 00000000..82d11aa7 Binary files /dev/null and b/pkts-streams/src/test/resources/io/pkts/streams/tcp-streams/syn_duplicate_after_closed.pcap differ diff --git a/pkts-streams/src/test/resources/io/pkts/streams/tcp-streams/user_traffic.pcap b/pkts-streams/src/test/resources/io/pkts/streams/tcp-streams/user_traffic.pcap new file mode 100644 index 00000000..bc7a13e9 Binary files /dev/null and b/pkts-streams/src/test/resources/io/pkts/streams/tcp-streams/user_traffic.pcap differ