diff --git a/engine/engine.go b/engine/engine.go index c838e0d..56f5ed3 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -102,9 +102,11 @@ func (e *engine) dispatch(p io.Packet) bool { _ = e.io.SetVerdict(p, io.VerdictAcceptStream, nil) return true } + // Convert to gopacket.Packet + packet := gopacket.NewPacket(data, layerType, gopacket.DecodeOptions{Lazy: true, NoCopy: true}) + packet.Metadata().Timestamp = p.Timestamp() // Load balance by stream ID index := p.StreamID() % uint32(len(e.workers)) - packet := gopacket.NewPacket(data, layerType, gopacket.DecodeOptions{Lazy: true, NoCopy: true}) e.workers[index].Feed(&workerPacket{ StreamID: p.StreamID(), Packet: packet, diff --git a/io/interface.go b/io/interface.go index 35aa886..af7e1e7 100644 --- a/io/interface.go +++ b/io/interface.go @@ -3,6 +3,7 @@ package io import ( "context" "net" + "time" ) type Verdict int @@ -24,6 +25,8 @@ const ( type Packet interface { // StreamID is the ID of the stream the packet belongs to. StreamID() uint32 + // Timestamp is the time the packet was received. + Timestamp() time.Time // Data is the raw packet data, starting with the IP header. Data() []byte } diff --git a/io/nfqueue.go b/io/nfqueue.go index 543f247..e84a0bb 100644 --- a/io/nfqueue.go +++ b/io/nfqueue.go @@ -10,6 +10,7 @@ import ( "strconv" "strings" "syscall" + "time" "github.com/coreos/go-iptables/iptables" "github.com/florianl/go-nfqueue" @@ -189,6 +190,12 @@ func (n *nfqueuePacketIO) Register(ctx context.Context, cb PacketCallback) error streamID: ctIDFromCtBytes(*a.Ct), data: *a.Payload, } + // Use timestamp from attribute if available, otherwise use current time as fallback + if a.Timestamp != nil { + p.timestamp = *a.Timestamp + } else { + p.timestamp = time.Now() + } return okBoolToInt(cb(p, nil)) }, func(e error) int { @@ -312,15 +319,20 @@ func (n *nfqueuePacketIO) setupIpt(local, rst, remove bool) error { var _ Packet = (*nfqueuePacket)(nil) type nfqueuePacket struct { - id uint32 - streamID uint32 - data []byte + id uint32 + streamID uint32 + timestamp time.Time + data []byte } func (p *nfqueuePacket) StreamID() uint32 { return p.streamID } +func (p *nfqueuePacket) Timestamp() time.Time { + return p.timestamp +} + func (p *nfqueuePacket) Data() []byte { return p.data }