From 6c6d373a7a3368d3e2dc3a57f623cf6dbe135bdb Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Mon, 30 Aug 2021 18:22:49 +0300 Subject: [PATCH 1/5] Missing messages fixes This PR contains multiple fixes: - Handle TCP padding (zeroes at the end of TCP payload), and do not treat it as a body - Handle requests with "Expect: 100-Continue" - the ones which require confirmation from the server, before sending the body - Fix muti-packet headers parsing, if "truncated" header starts with malformed header format - Fix replay of pcap files (Ignore Stats method since it is not supported) --- capture/capture.go | 13 ++++++--- proto/proto.go | 71 +++++++++++++++++++++++++++------------------ proto/proto_test.go | 6 ++++ tcp/tcp_message.go | 31 +++++++++++++++++++- tcp/tcp_packet.go | 11 ++++++- 5 files changed, 98 insertions(+), 34 deletions(-) diff --git a/capture/capture.go b/capture/capture.go index d155f43c..c6dfd4f0 100644 --- a/capture/capture.go +++ b/capture/capture.go @@ -397,10 +397,12 @@ func (l *Listener) read() { return case <-timer.C: if h, ok := hndl.handler.(PcapStatProvider); ok { - s, _ := h.Stats() - stats.Add("packets_received", int64(s.PacketsReceived)) - stats.Add("packets_dropped", int64(s.PacketsDropped)) - stats.Add("packets_if_dropped", int64(s.PacketsIfDropped)) + s, err := h.Stats() + if err == nil { + stats.Add("packets_received", int64(s.PacketsReceived)) + stats.Add("packets_dropped", int64(s.PacketsDropped)) + stats.Add("packets_if_dropped", int64(s.PacketsIfDropped)) + } } default: data, ci, err := hndl.handler.ReadPacketData() @@ -520,6 +522,9 @@ func (l *Listener) activatePcapFile() (err error) { handle.Close() return fmt.Errorf("BPF filter error: %q, filter: %s", e, l.BPFFilter) } + + fmt.Println("BPF Filter:", l.BPFFilter) + l.Handles["pcap_file"] = packetHandle{ handler: handle, } diff --git a/proto/proto.go b/proto/proto.go index 5f51d1c5..6f9a11b3 100644 --- a/proto/proto.go +++ b/proto/proto.go @@ -77,9 +77,12 @@ func header(payload []byte, name []byte) (value []byte, headerStart, headerEnd, headerEnd += headerStart colonIndex = bytes.IndexByte(payload[headerStart:headerEnd], ':') if colonIndex == -1 { - break + // Malformed header, skip, most likely packet with partial headers + headerStart = headerEnd + 1 + continue } colonIndex += headerStart + if bytes.EqualFold(payload[headerStart:colonIndex], name) { valueStart = colonIndex + 1 valueEnd = headerEnd - 2 @@ -448,34 +451,36 @@ type ProtocolStateSetter interface { ProtocolState() interface{} } -type httpProto struct { - body int // body index - headerStart int - headerParsed bool // we checked necessary headers - hasFullBody bool // all chunks has been parsed - isChunked bool // Transfer-Encoding: chunked - bodyLen int // Content-Length's value - hasTrailer bool // Trailer header? +type HTTPState struct { + Body int // body index + HeaderStart int + HeaderEnd int + HeaderParsed bool // we checked necessary headers + HasFullPayload bool // all chunks has been parsed + IsChunked bool // Transfer-Encoding: chunked + BodyLen int // Content-Length's value + HasTrailer bool // Trailer header? + Continue100 bool } // HasFullPayload checks if this message has full or valid payloads and returns true. // Message param is optional but recommended on cases where 'data' is storing // partial-to-full stream of bytes(packets). func HasFullPayload(m ProtocolStateSetter, payloads ...[]byte) bool { - var state *httpProto + var state *HTTPState if m != nil { - state, _ = m.ProtocolState().(*httpProto) + state, _ = m.ProtocolState().(*HTTPState) } if state == nil { - state = new(httpProto) + state = new(HTTPState) if m != nil { m.SetProtocolState(state) } } - if state.headerStart < 1 { + if state.HeaderStart < 1 { for _, data := range payloads { - state.headerStart = MIMEHeadersStartPos(data) - if state.headerStart < 0 { + state.HeaderStart = MIMEHeadersStartPos(data) + if state.HeaderStart < 0 { return false } else { break @@ -483,7 +488,7 @@ func HasFullPayload(m ProtocolStateSetter, payloads ...[]byte) bool { } } - if state.body < 1 { + if state.Body < 1 || state.HeaderEnd < 1 { var pos int for _, data := range payloads { endPos := MIMEHeadersEndPos(data) @@ -491,32 +496,42 @@ func HasFullPayload(m ProtocolStateSetter, payloads ...[]byte) bool { pos += len(data) } else { pos += endPos + state.HeaderEnd = pos } if endPos > 0 { - state.body = pos + state.Body = pos break } } } - if !state.headerParsed { + + if state.HeaderEnd < 1 { + return false + } + + if !state.HeaderParsed { var pos int for _, data := range payloads { chunked := Header(data, []byte("Transfer-Encoding")) if len(chunked) > 0 && bytes.Index(data, []byte("chunked")) > 0 { - state.isChunked = true + state.IsChunked = true // trailers are generally not allowed in non-chunks body - state.hasTrailer = len(Header(data, []byte("Trailer"))) > 0 + state.HasTrailer = len(Header(data, []byte("Trailer"))) > 0 } else { contentLen := Header(data, []byte("Content-Length")) - state.bodyLen, _ = atoI(contentLen, 10) + state.BodyLen, _ = atoI(contentLen, 10) } pos += len(data) - if state.bodyLen > 0 || pos >= state.body { - state.headerParsed = true + if string(Header(data, []byte("Expect"))) == "100-continue" { + state.Continue100 = true + } + + if state.BodyLen > 0 || pos >= state.Body { + state.HeaderParsed = true break } } @@ -526,22 +541,22 @@ func HasFullPayload(m ProtocolStateSetter, payloads ...[]byte) bool { for _, data := range payloads { bodyLen += len(data) } - bodyLen -= state.body + bodyLen -= state.Body - if state.isChunked { + if state.IsChunked { // check chunks if bodyLen < 1 { return false } // check trailer headers - if state.hasTrailer { + if state.HasTrailer { if bytes.HasSuffix(payloads[len(payloads)-1], []byte("\r\n\r\n")) { return true } } else { if bytes.HasSuffix(payloads[len(payloads)-1], []byte("0\r\n\r\n")) { - state.hasFullBody = true + state.HasFullPayload = true return true } } @@ -550,7 +565,7 @@ func HasFullPayload(m ProtocolStateSetter, payloads ...[]byte) bool { } // check for content-length header - return state.bodyLen == bodyLen + return state.BodyLen == bodyLen } // this works with positive integers diff --git a/proto/proto_test.go b/proto/proto_test.go index 111bedcf..5466fa3f 100644 --- a/proto/proto_test.go +++ b/proto/proto_test.go @@ -56,6 +56,12 @@ func TestHeader(t *testing.T) { if val = Header(payload, []byte("host")); !bytes.Equal(val, []byte("www.w3.org")) { t.Error("Should find lower case 1 word header") } + + payload = []byte("GT\r\nContent-Length: 10\r\n\r\n") + + if val = Header(payload, []byte("Content-Length")); !bytes.Equal(val, []byte("10")) { + t.Error("Should find in partial payload") + } } func TestMIMEHeadersEndPos(t *testing.T) { diff --git a/tcp/tcp_message.go b/tcp/tcp_message.go index 5232e56e..ec6c5889 100644 --- a/tcp/tcp_message.go +++ b/tcp/tcp_message.go @@ -9,6 +9,8 @@ import ( "sort" "time" "unsafe" + + "github.com/buger/goreplay/proto" ) // TCPProtocol is a number to indicate type of protocol @@ -272,7 +274,9 @@ func (parser *MessageParser) wait() { func (parser *MessageParser) parsePacket(pcapPkt *PcapPacket) *Packet { pckt, err := ParsePacket(pcapPkt.Data, pcapPkt.LType, pcapPkt.LTypeLen, pcapPkt.Ci, false) if err != nil { - stats.Add("packet_error", 1) + if _, empty := err.(EmptyPacket); !empty { + stats.Add("packet_error", 1) + } return nil } @@ -342,6 +346,30 @@ func (parser *MessageParser) addPacket(m *Message, pckt *Packet) bool { if parser.End != nil { if parser.End(m) { parser.Emit(m) + } else { + // Expect: 100-continue handling + if state, ok := m.feedback.(*proto.HTTPState); ok { + if state.Continue100 { + delete(parser.m, m.packets[0].MessageID()) + + // Shift Ack by given offset + // Size of "HTTP/1.1 100 Continue\r\n\r\n" message + for _, p := range m.packets { + p.messageID = 0 + p.Ack += 25 + } + + // If next section was aready approved and received, merge messages + if next, found := parser.m[m.packets[0].MessageID()]; found { + for _, p := range next.packets { + parser.addPacket(m, p) + } + } + + // Re-add (or override) again with new message and ID + parser.m[m.packets[0].MessageID()] = m + } + } } } @@ -378,6 +406,7 @@ func (parser *MessageParser) timer(now time.Time) { m.TimedOut = true stats.Add("message_timeout_count", 1) failMsg++ + if parser.End == nil || parser.allowIncompete { parser.Emit(m) } diff --git a/tcp/tcp_packet.go b/tcp/tcp_packet.go index c7f76416..ac833e7e 100644 --- a/tcp/tcp_packet.go +++ b/tcp/tcp_packet.go @@ -169,7 +169,16 @@ func (pckt *Packet) parse(data []byte, lType, lTypeLen int, cp *gopacket.Capture return ErrHdrLength("TCP opts") } - if !allowEmpty && len(ndata[dOf:]) == 0 { + // There are case when packet have padding but dOf shows its not + empty := true + for i := 0; i < len(ndata[dOf:]); i++ { + if ndata[dOf:][i] != 0 { + empty = false + break + } + } + + if !allowEmpty && empty { return EmptyPacket("") } From fcb215bcc454c9444e3a38988eb0c7ae93eef8ba Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Mon, 30 Aug 2021 18:50:05 +0300 Subject: [PATCH 2/5] Ensure that Except header removed in recordeed payload Otherwise replay will fail. At the moment it is not fully replay `Expect: 100 Continue` scheme, but possible to record and make it look like standard POST request. --- tcp/tcp_message.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tcp/tcp_message.go b/tcp/tcp_message.go index ec6c5889..b360f58e 100644 --- a/tcp/tcp_message.go +++ b/tcp/tcp_message.go @@ -168,6 +168,13 @@ func (m *Message) Data() []byte { tmp, _ = copySlice(tmp, len(packetData[0]), packetData[1:]...) } + // Remove Expect header, since its replay not fully supported + if state, ok := m.feedback.(*proto.HTTPState); ok { + if state.Continue100 { + tmp = proto.DeleteHeader(tmp, []byte("Expect")) + } + } + return tmp } @@ -406,7 +413,6 @@ func (parser *MessageParser) timer(now time.Time) { m.TimedOut = true stats.Add("message_timeout_count", 1) failMsg++ - if parser.End == nil || parser.allowIncompete { parser.Emit(m) } From ad65c854b7bf0e19ee81523c6d1f4d2778dff87c Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Mon, 30 Aug 2021 19:11:50 +0300 Subject: [PATCH 3/5] Fix output file chunks sizes Should reset file chunk size only when we doing final file close --- output_file.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/output_file.go b/output_file.go index 6be27e9d..10af12b9 100644 --- a/output_file.go +++ b/output_file.go @@ -195,7 +195,6 @@ func (o *FileOutput) filename() string { if nextChunk { fileIndex++ - o.currentFileSize = 0 } } @@ -309,6 +308,8 @@ func (o *FileOutput) closeLocked() error { } o.closed = true + o.currentFileSize = 0 + return nil } From 333293611e7d4170f95322e921e7c12f366a6620 Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Mon, 30 Aug 2021 19:20:35 +0300 Subject: [PATCH 4/5] Fix message timeout test --- tcp/tcp_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tcp/tcp_test.go b/tcp/tcp_test.go index e82b4150..c570f52a 100644 --- a/tcp/tcp_test.go +++ b/tcp/tcp_test.go @@ -217,7 +217,7 @@ func TestMessageTimeoutReached(t *testing.T) { packets := GetPackets(true, 1, 2, data[:]) p := NewMessageParser(nil, nil, nil, 10*time.Millisecond, true) p.processPacket(packets[0]) - time.Sleep(time.Millisecond * 100) + time.Sleep(time.Millisecond * 200) p.processPacket(packets[1]) m := p.Read() if m.Length != 63<<10 { From 256d7f8216872c0ae9abf02ade742bed96710a97 Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Mon, 30 Aug 2021 19:28:53 +0300 Subject: [PATCH 5/5] Reduce cognitive complexity --- tcp/tcp_message.go | 50 ++++++++++++++++++++++++---------------------- 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/tcp/tcp_message.go b/tcp/tcp_message.go index b360f58e..cf3a1f6d 100644 --- a/tcp/tcp_message.go +++ b/tcp/tcp_message.go @@ -353,36 +353,38 @@ func (parser *MessageParser) addPacket(m *Message, pckt *Packet) bool { if parser.End != nil { if parser.End(m) { parser.Emit(m) - } else { - // Expect: 100-continue handling - if state, ok := m.feedback.(*proto.HTTPState); ok { - if state.Continue100 { - delete(parser.m, m.packets[0].MessageID()) - - // Shift Ack by given offset - // Size of "HTTP/1.1 100 Continue\r\n\r\n" message - for _, p := range m.packets { - p.messageID = 0 - p.Ack += 25 - } - - // If next section was aready approved and received, merge messages - if next, found := parser.m[m.packets[0].MessageID()]; found { - for _, p := range next.packets { - parser.addPacket(m, p) - } - } - - // Re-add (or override) again with new message and ID - parser.m[m.packets[0].MessageID()] = m - } - } + return true } + + parser.Fix100Continue(m) } return true } +func (parser *MessageParser) Fix100Continue(m *Message) { + if state, ok := m.feedback.(*proto.HTTPState); ok && state.Continue100 { + delete(parser.m, m.packets[0].MessageID()) + + // Shift Ack by given offset + // Size of "HTTP/1.1 100 Continue\r\n\r\n" message + for _, p := range m.packets { + p.messageID = 0 + p.Ack += 25 + } + + // If next section was aready approved and received, merge messages + if next, found := parser.m[m.packets[0].MessageID()]; found { + for _, p := range next.packets { + parser.addPacket(m, p) + } + } + + // Re-add (or override) again with new message and ID + parser.m[m.packets[0].MessageID()] = m + } +} + func (parser *MessageParser) Read() *Message { m := <-parser.messages return m