Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Missing messages fixes #1007

Merged
merged 5 commits into from
Aug 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,10 +397,12 @@ func (l *Listener) read() {
return
case <-timer.C:
if h, ok := hndl.handler.(PcapStatProvider); ok {
s, _ := h.Stats()
stats.Add("packets_received", int64(s.PacketsReceived))
stats.Add("packets_dropped", int64(s.PacketsDropped))
stats.Add("packets_if_dropped", int64(s.PacketsIfDropped))
s, err := h.Stats()
if err == nil {
stats.Add("packets_received", int64(s.PacketsReceived))
stats.Add("packets_dropped", int64(s.PacketsDropped))
stats.Add("packets_if_dropped", int64(s.PacketsIfDropped))
}
}
default:
data, ci, err := hndl.handler.ReadPacketData()
Expand Down Expand Up @@ -520,6 +522,9 @@ func (l *Listener) activatePcapFile() (err error) {
handle.Close()
return fmt.Errorf("BPF filter error: %q, filter: %s", e, l.BPFFilter)
}

fmt.Println("BPF Filter:", l.BPFFilter)

l.Handles["pcap_file"] = packetHandle{
handler: handle,
}
Expand Down
3 changes: 2 additions & 1 deletion output_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,6 @@ func (o *FileOutput) filename() string {

if nextChunk {
fileIndex++
o.currentFileSize = 0
}
}

Expand Down Expand Up @@ -309,6 +308,8 @@ func (o *FileOutput) closeLocked() error {
}

o.closed = true
o.currentFileSize = 0

return nil
}

Expand Down
71 changes: 43 additions & 28 deletions proto/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,12 @@ func header(payload []byte, name []byte) (value []byte, headerStart, headerEnd,
headerEnd += headerStart
colonIndex = bytes.IndexByte(payload[headerStart:headerEnd], ':')
if colonIndex == -1 {
break
// Malformed header, skip, most likely packet with partial headers
headerStart = headerEnd + 1
continue
}
colonIndex += headerStart

if bytes.EqualFold(payload[headerStart:colonIndex], name) {
valueStart = colonIndex + 1
valueEnd = headerEnd - 2
Expand Down Expand Up @@ -448,75 +451,87 @@ type ProtocolStateSetter interface {
ProtocolState() interface{}
}

type httpProto struct {
body int // body index
headerStart int
headerParsed bool // we checked necessary headers
hasFullBody bool // all chunks has been parsed
isChunked bool // Transfer-Encoding: chunked
bodyLen int // Content-Length's value
hasTrailer bool // Trailer header?
type HTTPState struct {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported type HTTPState should have comment or be unexported

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported type HTTPState should have comment or be unexported

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported type HTTPState should have comment or be unexported

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported type HTTPState should have comment or be unexported

Body int // body index
HeaderStart int
HeaderEnd int
HeaderParsed bool // we checked necessary headers
HasFullPayload bool // all chunks has been parsed
IsChunked bool // Transfer-Encoding: chunked
BodyLen int // Content-Length's value
HasTrailer bool // Trailer header?
Continue100 bool
}

// HasFullPayload checks if this message has full or valid payloads and returns true.
// Message param is optional but recommended on cases where 'data' is storing
// partial-to-full stream of bytes(packets).
func HasFullPayload(m ProtocolStateSetter, payloads ...[]byte) bool {
var state *httpProto
var state *HTTPState
if m != nil {
state, _ = m.ProtocolState().(*httpProto)
state, _ = m.ProtocolState().(*HTTPState)
}
if state == nil {
state = new(httpProto)
state = new(HTTPState)
if m != nil {
m.SetProtocolState(state)
}
}
if state.headerStart < 1 {
if state.HeaderStart < 1 {
for _, data := range payloads {
state.headerStart = MIMEHeadersStartPos(data)
if state.headerStart < 0 {
state.HeaderStart = MIMEHeadersStartPos(data)
if state.HeaderStart < 0 {
return false
} else {
break
}
}
}

if state.body < 1 {
if state.Body < 1 || state.HeaderEnd < 1 {
var pos int
for _, data := range payloads {
endPos := MIMEHeadersEndPos(data)
if endPos < 0 {
pos += len(data)
} else {
pos += endPos
state.HeaderEnd = pos
}

if endPos > 0 {
state.body = pos
state.Body = pos
break
}
}
}
if !state.headerParsed {

if state.HeaderEnd < 1 {
return false
}

if !state.HeaderParsed {
var pos int
for _, data := range payloads {
chunked := Header(data, []byte("Transfer-Encoding"))

if len(chunked) > 0 && bytes.Index(data, []byte("chunked")) > 0 {
state.isChunked = true
state.IsChunked = true
// trailers are generally not allowed in non-chunks body
state.hasTrailer = len(Header(data, []byte("Trailer"))) > 0
state.HasTrailer = len(Header(data, []byte("Trailer"))) > 0
} else {
contentLen := Header(data, []byte("Content-Length"))
state.bodyLen, _ = atoI(contentLen, 10)
state.BodyLen, _ = atoI(contentLen, 10)
}

pos += len(data)

if state.bodyLen > 0 || pos >= state.body {
state.headerParsed = true
if string(Header(data, []byte("Expect"))) == "100-continue" {
state.Continue100 = true
}

if state.BodyLen > 0 || pos >= state.Body {
state.HeaderParsed = true
break
}
}
Expand All @@ -526,22 +541,22 @@ func HasFullPayload(m ProtocolStateSetter, payloads ...[]byte) bool {
for _, data := range payloads {
bodyLen += len(data)
}
bodyLen -= state.body
bodyLen -= state.Body

if state.isChunked {
if state.IsChunked {
// check chunks
if bodyLen < 1 {
return false
}

// check trailer headers
if state.hasTrailer {
if state.HasTrailer {
if bytes.HasSuffix(payloads[len(payloads)-1], []byte("\r\n\r\n")) {
return true
}
} else {
if bytes.HasSuffix(payloads[len(payloads)-1], []byte("0\r\n\r\n")) {
state.hasFullBody = true
state.HasFullPayload = true
return true
}
}
Expand All @@ -550,7 +565,7 @@ func HasFullPayload(m ProtocolStateSetter, payloads ...[]byte) bool {
}

// check for content-length header
return state.bodyLen == bodyLen
return state.BodyLen == bodyLen
}

// this works with positive integers
Expand Down
6 changes: 6 additions & 0 deletions proto/proto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ func TestHeader(t *testing.T) {
if val = Header(payload, []byte("host")); !bytes.Equal(val, []byte("www.w3.org")) {
t.Error("Should find lower case 1 word header")
}

payload = []byte("GT\r\nContent-Length: 10\r\n\r\n")

if val = Header(payload, []byte("Content-Length")); !bytes.Equal(val, []byte("10")) {
t.Error("Should find in partial payload")
}
}

func TestMIMEHeadersEndPos(t *testing.T) {
Expand Down
39 changes: 38 additions & 1 deletion tcp/tcp_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"sort"
"time"
"unsafe"

"github.com/buger/goreplay/proto"
)

// TCPProtocol is a number to indicate type of protocol
Expand Down Expand Up @@ -166,6 +168,13 @@ func (m *Message) Data() []byte {
tmp, _ = copySlice(tmp, len(packetData[0]), packetData[1:]...)
}

// Remove Expect header, since its replay not fully supported
if state, ok := m.feedback.(*proto.HTTPState); ok {
if state.Continue100 {
tmp = proto.DeleteHeader(tmp, []byte("Expect"))
}
}

return tmp
}

Expand Down Expand Up @@ -272,7 +281,9 @@ func (parser *MessageParser) wait() {
func (parser *MessageParser) parsePacket(pcapPkt *PcapPacket) *Packet {
pckt, err := ParsePacket(pcapPkt.Data, pcapPkt.LType, pcapPkt.LTypeLen, pcapPkt.Ci, false)
if err != nil {
stats.Add("packet_error", 1)
if _, empty := err.(EmptyPacket); !empty {
stats.Add("packet_error", 1)
}
return nil
}

Expand Down Expand Up @@ -342,12 +353,38 @@ func (parser *MessageParser) addPacket(m *Message, pckt *Packet) bool {
if parser.End != nil {
if parser.End(m) {
parser.Emit(m)
return true
}

parser.Fix100Continue(m)
}

return true
}

func (parser *MessageParser) Fix100Continue(m *Message) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported method MessageParser.Fix100Continue should have comment or be unexported

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported method MessageParser.Fix100Continue should have comment or be unexported

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported method MessageParser.Fix100Continue should have comment or be unexported

if state, ok := m.feedback.(*proto.HTTPState); ok && state.Continue100 {
delete(parser.m, m.packets[0].MessageID())

// Shift Ack by given offset
// Size of "HTTP/1.1 100 Continue\r\n\r\n" message
for _, p := range m.packets {
p.messageID = 0
p.Ack += 25
}

// If next section was aready approved and received, merge messages
if next, found := parser.m[m.packets[0].MessageID()]; found {
for _, p := range next.packets {
parser.addPacket(m, p)
}
}

// Re-add (or override) again with new message and ID
parser.m[m.packets[0].MessageID()] = m
}
}

func (parser *MessageParser) Read() *Message {
m := <-parser.messages
return m
Expand Down
11 changes: 10 additions & 1 deletion tcp/tcp_packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,16 @@ func (pckt *Packet) parse(data []byte, lType, lTypeLen int, cp *gopacket.Capture
return ErrHdrLength("TCP opts")
}

if !allowEmpty && len(ndata[dOf:]) == 0 {
// There are case when packet have padding but dOf shows its not
empty := true
for i := 0; i < len(ndata[dOf:]); i++ {
if ndata[dOf:][i] != 0 {
empty = false
break
}
}

if !allowEmpty && empty {
return EmptyPacket("")
}

Expand Down
2 changes: 1 addition & 1 deletion tcp/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func TestMessageTimeoutReached(t *testing.T) {
packets := GetPackets(true, 1, 2, data[:])
p := NewMessageParser(nil, nil, nil, 10*time.Millisecond, true)
p.processPacket(packets[0])
time.Sleep(time.Millisecond * 100)
time.Sleep(time.Millisecond * 200)
p.processPacket(packets[1])
m := p.Read()
if m.Length != 63<<10 {
Expand Down