Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tcp streams with hektor-fsm #148

Open
wants to merge 20 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added pkts-examples/my_tcp_traffic.pcap
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package io.pkts.examples.streams;

import io.pkts.Pcap;
import io.pkts.framer.FramingException;
import io.pkts.packet.TCPPacket;
import io.pkts.streams.Stream;
import io.pkts.streams.StreamId;
import io.pkts.streams.StreamListener;
import io.pkts.streams.TcpStream;
import io.pkts.streams.impl.TcpStreamHandler;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;

/**
* Simple example showing how to use streams.
*
* The core pcap support provided by pkts.io is only focusing on each individual
* packet but quite often your application may be interested in a stream of
* packets. A stream can mean different things for different protocols. E.g. for
* UDP, a stream in this context could be all packets sent and received from the
* same local and remote port-pair (which is how the stream support in pkts.io
* has defined a UDP stream).
*
* For other protocols, there may be other identifiers within the protocol that
* defines what a stream is. As an example, TCP is a connection oriented protocol that
* uses signaling packets (SYN, SYN-ACK, RST, FIN) to establish and end connections. A TCP
* stream is therefore equivalent to the exchange of packets within a TCP connection.
*
* This particular example shows how to setup pkts.io and its stream support to
* consume {@link TcpStream}s.
*
* @author [email protected]
*/
public class StreamsExample002 {

public static void main(final String[] args) throws IOException, FramingException {

// Step 1 - Open the pcap containing our traffic.
final Pcap pcap = Pcap.openStream("my_tcp_traffic.pcap");
// Step 2 - Instead of implementing our own PacketHandler we will be
// using a TcpStreamHandler provided for us by the io.pkts.streams
// library. It has a StreamHandler (which obviously
// implements the FrameHandler) that will detect new tcp streams
// and call a StreamListener when appropriate.
final TcpStreamHandler streamHandler = new TcpStreamHandler();

// Step 3 - In this simple example we will just supply a very basic
// StreamListener for TCP. All we will do is
// print to std out when a new event occurs for a stream.
streamHandler.addStreamListener(new StreamListener<TCPPacket>() {

@Override
public void startStream(final Stream<TCPPacket> stream, final TCPPacket packet) {

TcpStream tcpStream = (TcpStream) stream;

System.out.println("New TCP stream detected. Stream n°" + tcpStream.getUuid() + "\n" +
" Stream id: " + stream.getStreamIdentifier());
System.out.println("First packet seq num was: " + packet.getSequenceNumber());
}

@Override
public void packetReceived(final Stream<TCPPacket> stream, final TCPPacket packet) {
TcpStream tcpStream = (TcpStream) stream;
System.out.println("Received a new TCP packet for stream: " + tcpStream.getUuid());
}

@Override
public void endStream(final Stream<TCPPacket> stream) {
TcpStream tcpStream = (TcpStream) stream;
System.out.println("The stream ended. Stream n°" + tcpStream.getUuid());
}
});

// Step 4 - Call the loop function as usual but pass in the TcpStreamHandler
// instead of your own "raw" FrameHandler.
pcap.loop(streamHandler);

// Step 5 - Do whatever with the streams and packets inside
Map<StreamId, TcpStream> allStreams = streamHandler.getStreams();

ArrayList<TcpStream> streams = new ArrayList<TcpStream>(allStreams.values());

for(TcpStream stream : streams) {
stream.getPackets().forEach(packet -> {
System.out.println("Packet seq num: " + packet.getSequenceNumber());
});
}
}
}
22 changes: 20 additions & 2 deletions pkts-streams/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,19 @@
</parent>

<name>Pkts Streams</name>
<groupId>io.pkts</groupId>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>16</source>
<target>16</target>
</configuration>
</plugin>
</plugins>
</build>
<groupId>io.pkts</groupId>
<artifactId>pkts-streams</artifactId>
<packaging>jar</packaging>
<dependencies>
Expand All @@ -24,7 +36,13 @@
<artifactId>pkts-sdp</artifactId>
</dependency>

<dependency>
<dependency>
<groupId>io.hektor</groupId>
<artifactId>hektor-fsm</artifactId>
<version>0.0.5</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
Expand Down
33 changes: 33 additions & 0 deletions pkts-streams/src/main/java/io/pkts/streams/TcpStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.pkts.streams;

import io.pkts.packet.TCPPacket;
import io.pkts.streams.impl.tcpFSM.TcpStreamFSM.TcpState;

/**
* An {@link TcpStream} represents a stream of {@link TCPPacket}s belonging to the same connection.
* A TCP conversation is identified it's 5-tuple (src-ip, src-port, dest-ip, dest-port, protocol=TCP), and
* should follow a valid state progression for the lifetime of the connection (see RFC 793). This means that for
* the same 5-tuple, if an event occurs that indicates the connection is no longer valid (such as a new SYN exchange
* or data exchange when the connection should be closed), the stream should be ended and a new stream should be
* potentially created for the new connection.
*
* @author [email protected]
*/
public interface TcpStream extends Stream<TCPPacket> {

String getSrcAddr();

String getDestAddr();

int getSrcPort();

int getDestPort();

void addPacket(TCPPacket packet);

TcpState getState();

long getUuid();

boolean Ended();
}
135 changes: 135 additions & 0 deletions pkts-streams/src/main/java/io/pkts/streams/impl/DefaultTcpStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package io.pkts.streams.impl;

import io.hektor.fsm.FSM;
import io.hektor.fsm.TransitionListener;
import io.pkts.frame.PcapGlobalHeader;
import io.pkts.packet.TCPPacket;
import io.pkts.streams.StreamId;
import io.pkts.streams.TcpStream;
import io.pkts.streams.impl.tcpFSM.TcpStreamContext;
import io.pkts.streams.impl.tcpFSM.TcpStreamData;
import io.pkts.streams.impl.tcpFSM.TcpStreamFSM;
import io.pkts.streams.impl.tcpFSM.TcpStreamFSM.TcpState;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/**
* @author [email protected]
*/
public class DefaultTcpStream implements TcpStream {

private final PcapGlobalHeader globalHeader;

private final TransportStreamId id;
private final long uuid;

private final PriorityQueue<TCPPacket> packets; // packets are ordered by arrival time and can have the same arrival time
private final FSM fsm;

public DefaultTcpStream(PcapGlobalHeader globalHeader, TransportStreamId id, long uuid, TransitionListener<TcpState> synListener){
this.globalHeader = globalHeader;
this.id = id;
this.uuid = uuid;
this.packets = new PriorityQueue<TCPPacket>(new PacketComparator());
this.fsm = TcpStreamFSM.definition.newInstance(uuid, new TcpStreamContext(), new TcpStreamData(), null, synListener);
fsm.start();
}
@Override
public List<TCPPacket> getPackets() {
return new ArrayList<TCPPacket>(this.packets);
}

@Override
public long getDuration() {
return this.getTimeOfLastPacket() - this.getTimeOfFirstPacket();
}

@Override
public long getTimeOfFirstPacket() {
if (this.packets.isEmpty()) {
return -1;
}

return packets.peek().getArrivalTime();
}

@Override
public long getTimeOfLastPacket() {
if (this.packets.isEmpty()) {
return -1;
}

TCPPacket last = null;
for (TCPPacket packet : packets){
last = packet;
}

return last.getArrivalTime();
}

@Override
public StreamId getStreamIdentifier() {
return this.id;
}

@Override
public void write(OutputStream out) throws IOException {
throw new UnsupportedOperationException("Writing out a DefaultTCPStream is Unsupported");
}

@Override
public String getSrcAddr() {
return id.getSourceAddress();
}

@Override
public String getDestAddr() {
return id.getDestinationAddress();
}

@Override
public int getSrcPort() {
return id.getSourcePort();
}

@Override
public int getDestPort() {
return id.getDestinationPort();
}

@Override
public void addPacket(TCPPacket packet){
fsm.onEvent(packet);
// if new syn exchange, a new stream will be started by the synListener
// in that case, no need to add the new syn packet to the stream
if (fsm.getState() != TcpState.CLOSED_PORTS_REUSED){
packets.add(packet);
}
}

@Override
public TcpState getState(){
return (TcpState) fsm.getState();
}

@Override
public long getUuid(){
return this.uuid;
}

/**
* A TCP stream is ended when the connection ends, but even in this state
* a stream can receive new arriving packets. So, the stream is ended in the
* closed state, but also when the ports are reused which is the terminal state
* of the {@link TcpStreamFSM}.
*/
@Override
public boolean Ended() {
return fsm.getState() == TcpState.CLOSED || fsm.getState() == TcpState.CLOSED_PORTS_REUSED;
}

}
Loading