Skip to content

Commit

Permalink
Missing messages fixes (#1007)
Browse files Browse the repository at this point in the history
This PR contains multiple fixes:
- Handle TCP padding (zeroes at the end of TCP payload), and do not treat it as a body
- Handle requests with "Expect: 100-Continue" - the ones which require confirmation from the server, before sending the body
- Fix muti-packet headers parsing, if "truncated" header starts with malformed header format
- Fix replay of pcap files (Ignore Stats method since it is not supported)
- Fix output file chunk size detection
  • Loading branch information
buger committed Aug 30, 2021
1 parent 553e901 commit bb1bca6
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 36 deletions.
13 changes: 9 additions & 4 deletions capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,10 +397,12 @@ func (l *Listener) read() {
return
case <-timer.C:
if h, ok := hndl.handler.(PcapStatProvider); ok {
s, _ := h.Stats()
stats.Add("packets_received", int64(s.PacketsReceived))
stats.Add("packets_dropped", int64(s.PacketsDropped))
stats.Add("packets_if_dropped", int64(s.PacketsIfDropped))
s, err := h.Stats()
if err == nil {
stats.Add("packets_received", int64(s.PacketsReceived))
stats.Add("packets_dropped", int64(s.PacketsDropped))
stats.Add("packets_if_dropped", int64(s.PacketsIfDropped))
}
}
default:
data, ci, err := hndl.handler.ReadPacketData()
Expand Down Expand Up @@ -520,6 +522,9 @@ func (l *Listener) activatePcapFile() (err error) {
handle.Close()
return fmt.Errorf("BPF filter error: %q, filter: %s", e, l.BPFFilter)
}

fmt.Println("BPF Filter:", l.BPFFilter)

l.Handles["pcap_file"] = packetHandle{
handler: handle,
}
Expand Down
3 changes: 2 additions & 1 deletion output_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,6 @@ func (o *FileOutput) filename() string {

if nextChunk {
fileIndex++
o.currentFileSize = 0
}
}

Expand Down Expand Up @@ -307,6 +306,8 @@ func (o *FileOutput) closeLocked() error {
}

o.closed = true
o.currentFileSize = 0

return nil
}

Expand Down
71 changes: 43 additions & 28 deletions proto/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,12 @@ func header(payload []byte, name []byte) (value []byte, headerStart, headerEnd,
headerEnd += headerStart
colonIndex = bytes.IndexByte(payload[headerStart:headerEnd], ':')
if colonIndex == -1 {
break
// Malformed header, skip, most likely packet with partial headers
headerStart = headerEnd + 1
continue
}
colonIndex += headerStart

if bytes.EqualFold(payload[headerStart:colonIndex], name) {
valueStart = colonIndex + 1
valueEnd = headerEnd - 2
Expand Down Expand Up @@ -448,75 +451,87 @@ type ProtocolStateSetter interface {
ProtocolState() interface{}
}

type httpProto struct {
body int // body index
headerStart int
headerParsed bool // we checked necessary headers
hasFullBody bool // all chunks has been parsed
isChunked bool // Transfer-Encoding: chunked
bodyLen int // Content-Length's value
hasTrailer bool // Trailer header?
type HTTPState struct {
Body int // body index
HeaderStart int
HeaderEnd int
HeaderParsed bool // we checked necessary headers
HasFullPayload bool // all chunks has been parsed
IsChunked bool // Transfer-Encoding: chunked
BodyLen int // Content-Length's value
HasTrailer bool // Trailer header?
Continue100 bool
}

// HasFullPayload checks if this message has full or valid payloads and returns true.
// Message param is optional but recommended on cases where 'data' is storing
// partial-to-full stream of bytes(packets).
func HasFullPayload(m ProtocolStateSetter, payloads ...[]byte) bool {
var state *httpProto
var state *HTTPState
if m != nil {
state, _ = m.ProtocolState().(*httpProto)
state, _ = m.ProtocolState().(*HTTPState)
}
if state == nil {
state = new(httpProto)
state = new(HTTPState)
if m != nil {
m.SetProtocolState(state)
}
}
if state.headerStart < 1 {
if state.HeaderStart < 1 {
for _, data := range payloads {
state.headerStart = MIMEHeadersStartPos(data)
if state.headerStart < 0 {
state.HeaderStart = MIMEHeadersStartPos(data)
if state.HeaderStart < 0 {
return false
} else {
break
}
}
}

if state.body < 1 {
if state.Body < 1 || state.HeaderEnd < 1 {
var pos int
for _, data := range payloads {
endPos := MIMEHeadersEndPos(data)
if endPos < 0 {
pos += len(data)
} else {
pos += endPos
state.HeaderEnd = pos
}

if endPos > 0 {
state.body = pos
state.Body = pos
break
}
}
}
if !state.headerParsed {

if state.HeaderEnd < 1 {
return false
}

if !state.HeaderParsed {
var pos int
for _, data := range payloads {
chunked := Header(data, []byte("Transfer-Encoding"))

if len(chunked) > 0 && bytes.Index(data, []byte("chunked")) > 0 {
state.isChunked = true
state.IsChunked = true
// trailers are generally not allowed in non-chunks body
state.hasTrailer = len(Header(data, []byte("Trailer"))) > 0
state.HasTrailer = len(Header(data, []byte("Trailer"))) > 0
} else {
contentLen := Header(data, []byte("Content-Length"))
state.bodyLen, _ = atoI(contentLen, 10)
state.BodyLen, _ = atoI(contentLen, 10)
}

pos += len(data)

if state.bodyLen > 0 || pos >= state.body {
state.headerParsed = true
if string(Header(data, []byte("Expect"))) == "100-continue" {
state.Continue100 = true
}

if state.BodyLen > 0 || pos >= state.Body {
state.HeaderParsed = true
break
}
}
Expand All @@ -526,22 +541,22 @@ func HasFullPayload(m ProtocolStateSetter, payloads ...[]byte) bool {
for _, data := range payloads {
bodyLen += len(data)
}
bodyLen -= state.body
bodyLen -= state.Body

if state.isChunked {
if state.IsChunked {
// check chunks
if bodyLen < 1 {
return false
}

// check trailer headers
if state.hasTrailer {
if state.HasTrailer {
if bytes.HasSuffix(payloads[len(payloads)-1], []byte("\r\n\r\n")) {
return true
}
} else {
if bytes.HasSuffix(payloads[len(payloads)-1], []byte("0\r\n\r\n")) {
state.hasFullBody = true
state.HasFullPayload = true
return true
}
}
Expand All @@ -550,7 +565,7 @@ func HasFullPayload(m ProtocolStateSetter, payloads ...[]byte) bool {
}

// check for content-length header
return state.bodyLen == bodyLen
return state.BodyLen == bodyLen
}

// this works with positive integers
Expand Down
6 changes: 6 additions & 0 deletions proto/proto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ func TestHeader(t *testing.T) {
if val = Header(payload, []byte("host")); !bytes.Equal(val, []byte("www.w3.org")) {
t.Error("Should find lower case 1 word header")
}

payload = []byte("GT\r\nContent-Length: 10\r\n\r\n")

if val = Header(payload, []byte("Content-Length")); !bytes.Equal(val, []byte("10")) {
t.Error("Should find in partial payload")
}
}

func TestMIMEHeadersEndPos(t *testing.T) {
Expand Down
39 changes: 38 additions & 1 deletion tcp/tcp_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"sync"
"time"
"unsafe"

"github.com/buger/goreplay/proto"
)

// TCPProtocol is a number to indicate type of protocol
Expand Down Expand Up @@ -168,6 +170,13 @@ func (m *Message) Data() []byte {
tmp, _ = copySlice(tmp, len(packetData[0]), packetData[1:]...)
}

// Remove Expect header, since its replay not fully supported
if state, ok := m.feedback.(*proto.HTTPState); ok {
if state.Continue100 {
tmp = proto.DeleteHeader(tmp, []byte("Expect"))
}
}

return tmp
}

Expand Down Expand Up @@ -281,7 +290,9 @@ func (parser *MessageParser) wait(index int) {
func (parser *MessageParser) parsePacket(pcapPkt *PcapPacket) *Packet {
pckt, err := ParsePacket(pcapPkt.Data, pcapPkt.LType, pcapPkt.LTypeLen, pcapPkt.Ci, false)
if err != nil {
stats.Add("packet_error", 1)
if _, empty := err.(EmptyPacket); !empty {
stats.Add("packet_error", 1)
}
return nil
}

Expand Down Expand Up @@ -373,12 +384,38 @@ func (parser *MessageParser) addPacket(m *Message, pckt *Packet) bool {
if parser.End != nil {
if parser.End(m) {
parser.Emit(m)
return true
}

parser.Fix100Continue(m)
}

return true
}

func (parser *MessageParser) Fix100Continue(m *Message) {
if state, ok := m.feedback.(*proto.HTTPState); ok && state.Continue100 {
delete(parser.m[m.Idx], m.packets[0].MessageID())

// Shift Ack by given offset
// Size of "HTTP/1.1 100 Continue\r\n\r\n" message
for _, p := range m.packets {
p.messageID = 0
p.Ack += 25
}

// If next section was aready approved and received, merge messages
if next, found := parser.m[m.Idx][m.packets[0].MessageID()]; found {
for _, p := range next.packets {
parser.addPacket(m, p)
}
}

// Re-add (or override) again with new message and ID
parser.m[m.Idx][m.packets[0].MessageID()] = m
}
}

func (parser *MessageParser) Read() *Message {
m := <-parser.messages
return m
Expand Down
11 changes: 10 additions & 1 deletion tcp/tcp_packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,16 @@ func (pckt *Packet) parse(data []byte, lType, lTypeLen int, cp *gopacket.Capture
return ErrHdrLength("TCP opts")
}

if !allowEmpty && len(ndata[dOf:]) == 0 {
// There are case when packet have padding but dOf shows its not
empty := true
for i := 0; i < len(ndata[dOf:]); i++ {
if ndata[dOf:][i] != 0 {
empty = false
break
}
}

if !allowEmpty && empty {
return EmptyPacket("")
}

Expand Down
4 changes: 3 additions & 1 deletion tcp/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,9 @@ func TestMessageTimeoutReached(t *testing.T) {
packets := GetPackets(true, 1, 2, data[:])
p := NewMessageParser(nil, nil, nil, 10*time.Millisecond, true)
p.processPacket(packets[0])
time.Sleep(time.Second * 2)

time.Sleep(time.Millisecond * 200)

p.processPacket(packets[1])
m := p.Read()
if m.Length != 63<<10 {
Expand Down

0 comments on commit bb1bca6

Please sign in to comment.