diff --git a/capture/capture.go b/capture/capture.go index d155f43c..c6dfd4f0 100644 --- a/capture/capture.go +++ b/capture/capture.go @@ -397,10 +397,12 @@ func (l *Listener) read() { return case <-timer.C: if h, ok := hndl.handler.(PcapStatProvider); ok { - s, _ := h.Stats() - stats.Add("packets_received", int64(s.PacketsReceived)) - stats.Add("packets_dropped", int64(s.PacketsDropped)) - stats.Add("packets_if_dropped", int64(s.PacketsIfDropped)) + s, err := h.Stats() + if err == nil { + stats.Add("packets_received", int64(s.PacketsReceived)) + stats.Add("packets_dropped", int64(s.PacketsDropped)) + stats.Add("packets_if_dropped", int64(s.PacketsIfDropped)) + } } default: data, ci, err := hndl.handler.ReadPacketData() @@ -520,6 +522,9 @@ func (l *Listener) activatePcapFile() (err error) { handle.Close() return fmt.Errorf("BPF filter error: %q, filter: %s", e, l.BPFFilter) } + + fmt.Println("BPF Filter:", l.BPFFilter) + l.Handles["pcap_file"] = packetHandle{ handler: handle, } diff --git a/output_file.go b/output_file.go index 5d33cb0b..c8dd5769 100644 --- a/output_file.go +++ b/output_file.go @@ -193,7 +193,6 @@ func (o *FileOutput) filename() string { if nextChunk { fileIndex++ - o.currentFileSize = 0 } } @@ -307,6 +306,8 @@ func (o *FileOutput) closeLocked() error { } o.closed = true + o.currentFileSize = 0 + return nil } diff --git a/proto/proto.go b/proto/proto.go index 5f51d1c5..6f9a11b3 100644 --- a/proto/proto.go +++ b/proto/proto.go @@ -77,9 +77,12 @@ func header(payload []byte, name []byte) (value []byte, headerStart, headerEnd, headerEnd += headerStart colonIndex = bytes.IndexByte(payload[headerStart:headerEnd], ':') if colonIndex == -1 { - break + // Malformed header, skip, most likely packet with partial headers + headerStart = headerEnd + 1 + continue } colonIndex += headerStart + if bytes.EqualFold(payload[headerStart:colonIndex], name) { valueStart = colonIndex + 1 valueEnd = headerEnd - 2 @@ -448,34 +451,36 @@ type ProtocolStateSetter interface { ProtocolState() interface{} } -type httpProto struct { - body int // body index - headerStart int - headerParsed bool // we checked necessary headers - hasFullBody bool // all chunks has been parsed - isChunked bool // Transfer-Encoding: chunked - bodyLen int // Content-Length's value - hasTrailer bool // Trailer header? +type HTTPState struct { + Body int // body index + HeaderStart int + HeaderEnd int + HeaderParsed bool // we checked necessary headers + HasFullPayload bool // all chunks has been parsed + IsChunked bool // Transfer-Encoding: chunked + BodyLen int // Content-Length's value + HasTrailer bool // Trailer header? + Continue100 bool } // HasFullPayload checks if this message has full or valid payloads and returns true. // Message param is optional but recommended on cases where 'data' is storing // partial-to-full stream of bytes(packets). func HasFullPayload(m ProtocolStateSetter, payloads ...[]byte) bool { - var state *httpProto + var state *HTTPState if m != nil { - state, _ = m.ProtocolState().(*httpProto) + state, _ = m.ProtocolState().(*HTTPState) } if state == nil { - state = new(httpProto) + state = new(HTTPState) if m != nil { m.SetProtocolState(state) } } - if state.headerStart < 1 { + if state.HeaderStart < 1 { for _, data := range payloads { - state.headerStart = MIMEHeadersStartPos(data) - if state.headerStart < 0 { + state.HeaderStart = MIMEHeadersStartPos(data) + if state.HeaderStart < 0 { return false } else { break @@ -483,7 +488,7 @@ func HasFullPayload(m ProtocolStateSetter, payloads ...[]byte) bool { } } - if state.body < 1 { + if state.Body < 1 || state.HeaderEnd < 1 { var pos int for _, data := range payloads { endPos := MIMEHeadersEndPos(data) @@ -491,32 +496,42 @@ func HasFullPayload(m ProtocolStateSetter, payloads ...[]byte) bool { pos += len(data) } else { pos += endPos + state.HeaderEnd = pos } if endPos > 0 { - state.body = pos + state.Body = pos break } } } - if !state.headerParsed { + + if state.HeaderEnd < 1 { + return false + } + + if !state.HeaderParsed { var pos int for _, data := range payloads { chunked := Header(data, []byte("Transfer-Encoding")) if len(chunked) > 0 && bytes.Index(data, []byte("chunked")) > 0 { - state.isChunked = true + state.IsChunked = true // trailers are generally not allowed in non-chunks body - state.hasTrailer = len(Header(data, []byte("Trailer"))) > 0 + state.HasTrailer = len(Header(data, []byte("Trailer"))) > 0 } else { contentLen := Header(data, []byte("Content-Length")) - state.bodyLen, _ = atoI(contentLen, 10) + state.BodyLen, _ = atoI(contentLen, 10) } pos += len(data) - if state.bodyLen > 0 || pos >= state.body { - state.headerParsed = true + if string(Header(data, []byte("Expect"))) == "100-continue" { + state.Continue100 = true + } + + if state.BodyLen > 0 || pos >= state.Body { + state.HeaderParsed = true break } } @@ -526,22 +541,22 @@ func HasFullPayload(m ProtocolStateSetter, payloads ...[]byte) bool { for _, data := range payloads { bodyLen += len(data) } - bodyLen -= state.body + bodyLen -= state.Body - if state.isChunked { + if state.IsChunked { // check chunks if bodyLen < 1 { return false } // check trailer headers - if state.hasTrailer { + if state.HasTrailer { if bytes.HasSuffix(payloads[len(payloads)-1], []byte("\r\n\r\n")) { return true } } else { if bytes.HasSuffix(payloads[len(payloads)-1], []byte("0\r\n\r\n")) { - state.hasFullBody = true + state.HasFullPayload = true return true } } @@ -550,7 +565,7 @@ func HasFullPayload(m ProtocolStateSetter, payloads ...[]byte) bool { } // check for content-length header - return state.bodyLen == bodyLen + return state.BodyLen == bodyLen } // this works with positive integers diff --git a/proto/proto_test.go b/proto/proto_test.go index 111bedcf..5466fa3f 100644 --- a/proto/proto_test.go +++ b/proto/proto_test.go @@ -56,6 +56,12 @@ func TestHeader(t *testing.T) { if val = Header(payload, []byte("host")); !bytes.Equal(val, []byte("www.w3.org")) { t.Error("Should find lower case 1 word header") } + + payload = []byte("GT\r\nContent-Length: 10\r\n\r\n") + + if val = Header(payload, []byte("Content-Length")); !bytes.Equal(val, []byte("10")) { + t.Error("Should find in partial payload") + } } func TestMIMEHeadersEndPos(t *testing.T) { diff --git a/tcp/tcp_message.go b/tcp/tcp_message.go index 614b5413..9f0b12db 100644 --- a/tcp/tcp_message.go +++ b/tcp/tcp_message.go @@ -10,6 +10,8 @@ import ( "sync" "time" "unsafe" + + "github.com/buger/goreplay/proto" ) // TCPProtocol is a number to indicate type of protocol @@ -168,6 +170,13 @@ func (m *Message) Data() []byte { tmp, _ = copySlice(tmp, len(packetData[0]), packetData[1:]...) } + // Remove Expect header, since its replay not fully supported + if state, ok := m.feedback.(*proto.HTTPState); ok { + if state.Continue100 { + tmp = proto.DeleteHeader(tmp, []byte("Expect")) + } + } + return tmp } @@ -281,7 +290,9 @@ func (parser *MessageParser) wait(index int) { func (parser *MessageParser) parsePacket(pcapPkt *PcapPacket) *Packet { pckt, err := ParsePacket(pcapPkt.Data, pcapPkt.LType, pcapPkt.LTypeLen, pcapPkt.Ci, false) if err != nil { - stats.Add("packet_error", 1) + if _, empty := err.(EmptyPacket); !empty { + stats.Add("packet_error", 1) + } return nil } @@ -373,12 +384,38 @@ func (parser *MessageParser) addPacket(m *Message, pckt *Packet) bool { if parser.End != nil { if parser.End(m) { parser.Emit(m) + return true } + + parser.Fix100Continue(m) } return true } +func (parser *MessageParser) Fix100Continue(m *Message) { + if state, ok := m.feedback.(*proto.HTTPState); ok && state.Continue100 { + delete(parser.m[m.Idx], m.packets[0].MessageID()) + + // Shift Ack by given offset + // Size of "HTTP/1.1 100 Continue\r\n\r\n" message + for _, p := range m.packets { + p.messageID = 0 + p.Ack += 25 + } + + // If next section was aready approved and received, merge messages + if next, found := parser.m[m.Idx][m.packets[0].MessageID()]; found { + for _, p := range next.packets { + parser.addPacket(m, p) + } + } + + // Re-add (or override) again with new message and ID + parser.m[m.Idx][m.packets[0].MessageID()] = m + } +} + func (parser *MessageParser) Read() *Message { m := <-parser.messages return m diff --git a/tcp/tcp_packet.go b/tcp/tcp_packet.go index c7f76416..ac833e7e 100644 --- a/tcp/tcp_packet.go +++ b/tcp/tcp_packet.go @@ -169,7 +169,16 @@ func (pckt *Packet) parse(data []byte, lType, lTypeLen int, cp *gopacket.Capture return ErrHdrLength("TCP opts") } - if !allowEmpty && len(ndata[dOf:]) == 0 { + // There are case when packet have padding but dOf shows its not + empty := true + for i := 0; i < len(ndata[dOf:]); i++ { + if ndata[dOf:][i] != 0 { + empty = false + break + } + } + + if !allowEmpty && empty { return EmptyPacket("") } diff --git a/tcp/tcp_test.go b/tcp/tcp_test.go index 8f063a5d..4dfd3d3c 100644 --- a/tcp/tcp_test.go +++ b/tcp/tcp_test.go @@ -217,7 +217,9 @@ func TestMessageTimeoutReached(t *testing.T) { packets := GetPackets(true, 1, 2, data[:]) p := NewMessageParser(nil, nil, nil, 10*time.Millisecond, true) p.processPacket(packets[0]) - time.Sleep(time.Second * 2) + + time.Sleep(time.Millisecond * 200) + p.processPacket(packets[1]) m := p.Read() if m.Length != 63<<10 {