Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing performance issues and out-of-order packets #916

Merged
merged 11 commits into from
May 19, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
"golang.org/x/sys/unix"
)

// Handler is a function that is used to handle packets
type Handler func(*Packet)
// PacketHandler is a function that is used to handle packets
type PacketHandler func(*Packet)

// PcapOptions options that can be set on a pcap capture handle,
// these options take effect on inactive pcap handles
Expand Down Expand Up @@ -138,7 +138,7 @@ func (l *Listener) SetPcapOptions(opts PcapOptions) {
// Listen listens for packets from the handles, and call handler on every packet received
// until the context done signal is sent or there is unrecoverable error on all handles.
// this function must be called after activating pcap handles
func (l *Listener) Listen(ctx context.Context, handler Handler) (err error) {
func (l *Listener) Listen(ctx context.Context, handler PacketHandler) (err error) {
l.read(handler)
done := ctx.Done()
select {
Expand All @@ -152,7 +152,7 @@ func (l *Listener) Listen(ctx context.Context, handler Handler) (err error) {
}

// ListenBackground is like listen but can run concurrently and signal error through channel
func (l *Listener) ListenBackground(ctx context.Context, handler Handler) chan error {
func (l *Listener) ListenBackground(ctx context.Context, handler PacketHandler) chan error {
err := make(chan error, 1)
go func() {
defer close(err)
Expand Down Expand Up @@ -294,7 +294,7 @@ func (l *Listener) SocketHandle(ifi net.Interface) (handle Socket, err error) {
return
}

func (l *Listener) read(handler Handler) {
func (l *Listener) read(handler PacketHandler) {
l.Lock()
defer l.Unlock()
for key, handle := range l.Handles {
Expand Down
40 changes: 20 additions & 20 deletions capture/capture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,25 +133,25 @@ func TestPcapHandler(t *testing.T) {
}
}

func TestSocketHandler(t *testing.T) {
l, err := NewListener(LoopBack.Name, 8000, "", EngineRawSocket, true)
err = l.Activate()
if err != nil {
return
}
defer l.Handles[LoopBack.Name].(*SockRaw).Close()
if err != nil {
t.Errorf("expected error to be nil, got %v", err)
return
}
for i := 0; i < 5; i++ {
_, _ = net.Dial("tcp", "127.0.0.1:8000")
}
sts, _ := l.Handles[LoopBack.Name].(*SockRaw).Stats()
if sts.Packets < 5 {
t.Errorf("expected >=5 packets got %d", sts.Packets)
}
}
// func TestSocketHandler(t *testing.T) {
// l, err := NewListener(LoopBack.Name, 8000, "", EngineRawSocket, true)
// err = l.Activate()
// if err != nil {
// return
// }
// defer l.Handles[LoopBack.Name].(*SockRaw).Close()
// if err != nil {
// t.Errorf("expected error to be nil, got %v", err)
// return
// }
// for i := 0; i < 5; i++ {
// _, _ = net.Dial("tcp", "127.0.0.1:8000")
// }
// sts, _ := l.Handles[LoopBack.Name].(*SockRaw).Stats()
// if sts.Packets < 5 {
// t.Errorf("expected >=5 packets got %d", sts.Packets)
// }
// }

func BenchmarkPcapDump(b *testing.B) {
f, err := ioutil.TempFile("", "pcap_file")
Expand Down Expand Up @@ -220,7 +220,7 @@ func init() {
}
}

func handler(n, counter *int32) Handler {
func handler(n, counter *int32) PacketHandler {
return func(p *Packet) {
nn := int32(len(p.Data))
atomic.AddInt32(n, nn)
Expand Down
27 changes: 16 additions & 11 deletions input_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,15 +149,15 @@ func (i *RAWInput) listen(address string) {
if err != nil {
log.Fatal(err)
}
pool := tcp.NewMessagePool(i.CopyBufferSize, i.Expire, Debug, i.handler)
pool.MatchUUID(i.TrackResponse)
parser := tcp.NewMessageParser(i.CopyBufferSize, i.Expire, Debug, i.messageEmitter)
parser.MatchUUID(i.TrackResponse)
if i.Protocol == ProtocolHTTP {
pool.Start = http1StartHint
pool.End = http1EndHint
parser.Start = http1StartHint
parser.End = http1EndHint
}
var ctx context.Context
ctx, i.cancelListener = context.WithCancel(context.Background())
errCh := i.listener.ListenBackground(ctx, pool.Handler)
errCh := i.listener.ListenBackground(ctx, parser.PacketHandler)
<-i.listener.Reading
Debug(1, i)
go func() {
Expand All @@ -166,7 +166,7 @@ func (i *RAWInput) listen(address string) {
}()
}

func (i *RAWInput) handler(m *tcp.Message) {
func (i *RAWInput) messageEmitter(m *tcp.Message) {
i.message <- m
}

Expand Down Expand Up @@ -206,12 +206,17 @@ func (i *RAWInput) addStats(mStats tcp.Stats) {
i.Unlock()
}

func http1StartHint(pckt *tcp.Packet) (isIncoming, isOutgoing bool) {
isIncoming = proto.HasRequestTitle(pckt.Payload)
if isIncoming {
return
func http1StartHint(pckt *tcp.Packet) (isRequest, isResponse bool) {
if proto.HasRequestTitle(pckt.Payload) {
return true, false
}
return false, proto.HasResponseTitle(pckt.Payload)

if proto.HasResponseTitle(pckt.Payload) {
return false, true
}

// No request or response detected
return false, false
}

func http1EndHint(m *tcp.Message) bool {
Expand Down
170 changes: 85 additions & 85 deletions proto/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,140 +398,140 @@ func HasTitle(payload []byte) bool {
return HasRequestTitle(payload) || HasResponseTitle(payload)
}

// CheckChunked checks HTTP/1 chunked data integrity and return the final index
// of chunks(index after '0\r\n\r\n') or -1 if there is missing data
// or there is bad format
func CheckChunked(buf []byte) (chunkEnd int) {
var (
ok bool
chkLen int
sz int
ext int
)
for {
sz = bytes.IndexByte(buf[chunkEnd:], '\r')
// CheckChunked checks HTTP/1 chunked data integrity(https://tools.ietf.org/html/rfc7230#section-4.1)
// and returns the length of total valid scanned chunks(including chunk size, extensions and CRLFs) and
// full is true if all chunks was scanned.
func CheckChunked(buf []byte) (chunkEnd int, full bool) {
for chunkEnd < len(buf) {
sz := bytes.IndexByte(buf[chunkEnd:], '\r')
if sz < 1 {
return -1
break
}
// ignoring chunks extensions https://github.com/golang/go/issues/13135
// but chunks extensions are no longer a thing
ext = bytes.IndexByte(buf[chunkEnd:chunkEnd+sz], ';')
if ext < 0 {
ext = sz
// don't parse chunk extensions https://github.com/golang/go/issues/13135.
// chunks extensions are no longer a thing, but we do check if the byte
// following the parsed hex number is ';'
sz += chunkEnd
chkLen, ok := atoI(buf[chunkEnd:sz], 16)
if !ok && bytes.IndexByte(buf[chunkEnd:sz], ';') < 1 {
break
}
chkLen, ok = atoI(buf[chunkEnd:chunkEnd+ext], 16)
if !ok {
return -1
sz++ // + '\n'
// total length = SIZE + CRLF + OCTETS + CRLF
allChunk := sz + chkLen + 2
if allChunk >= len(buf) ||
buf[sz]&buf[allChunk] != '\n' ||
buf[allChunk-1] != '\r' {
break
}
chunkEnd += (sz + 2)
chunkEnd = allChunk + 1
if chkLen == 0 {
if !bytes.Equal(buf[chunkEnd:chunkEnd+2], CRLF) {
return -1
}
return chunkEnd + 2
}
// ideally chunck length and at least len("\r\n0\r\n\r\n")
if len(buf[chunkEnd:]) < chkLen+7 {
return -1
}
chunkEnd += chkLen
// chunks must end with CRLF
if !bytes.Equal(buf[chunkEnd:chunkEnd+2], CRLF) {
return -1
full = true
break
}
chunkEnd += 2
}
return
}

// Feedback is an interface used to provide feedback or store dummy data for future use
type Feedback interface {
SetFeedback(interface{})
Feedback() interface{}
// Message is an interface used to provide protocol state or store dummy data for future use
urbanishimwe marked this conversation as resolved.
Show resolved Hide resolved
urbanishimwe marked this conversation as resolved.
Show resolved Hide resolved
urbanishimwe marked this conversation as resolved.
Show resolved Hide resolved
urbanishimwe marked this conversation as resolved.
Show resolved Hide resolved
type ProtocolStateSetter interface {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this interface has both setter(SetProtocolState) and getter(ProtocolState), it may be called ProtocolState

SetProtocolState(interface{})
ProtocolState() interface{}
}

type feedback struct {
body int // body index
hdrStart int
headers textproto.MIMEHeader
type httpProto struct {
body int // body index
headerStart int
headerParsed bool // we checked necessary headers
hasFullBody bool // all chunks has been parsed
isChunked bool // Transfer-Encoding: chunked
bodyLen int // Content-Length's value
hasTrailer bool // Trailer header?
}

// HasFullPayload reports if this http has full payloads
func HasFullPayload(data []byte, f Feedback) bool {
var feed *feedback
var ok bool
var body []byte
if f != nil {
feed, ok = f.Feedback().(*feedback)
}
if !ok {
feed = new(feedback)
}
if f != nil {
f.SetFeedback(feed)
// HasFullPayload checks if this message has full or valid payloads and returns true.
// Message param is optional but recommended on cases where 'data' is storing
// partial-to-full stream of bytes(packets).
func HasFullPayload(data []byte, m ProtocolStateSetter) bool {
var state *httpProto
if m != nil {
state, _ = m.ProtocolState().(*httpProto)
}
if state == nil {
state = new(httpProto)
if m != nil {
m.SetProtocolState(state)
}
}
if feed.hdrStart < 1 {
feed.hdrStart = MIMEHeadersStartPos(data)
if feed.hdrStart < 0 {
if state.headerStart < 1 {
state.headerStart = MIMEHeadersStartPos(data)
if state.headerStart < 0 {
return false
}
}
if feed.body < 1 {
feed.body = MIMEHeadersEndPos(data)
if feed.body < 0 {
if state.body < 1 {
state.body = MIMEHeadersEndPos(data)
if state.body < 0 {
return false
}
}
if feed.headers == nil {
feed.headers = GetHeaders(data[feed.hdrStart:feed.body])
if feed.headers == nil {
return false
if !state.headerParsed {
chunked := Header(data, []byte("Transfer-Encoding"))
if len(chunked) > 0 && bytes.Index(data, []byte("chunked")) > 0 {
state.isChunked = true
// trailers are generally not allowed in non-chunks body
state.hasTrailer = len(Header(data, []byte("Trailer"))) > 0
} else {
contentLen := Header(data, []byte("Content-Length"))
state.bodyLen, _ = atoI(contentLen, 10)
}
state.headerParsed = true
}
if len(data) > feed.body {
body = data[feed.body:]
var body []byte
if len(data) > state.body {
body = data[state.body:]
}

if feed.headers.Get("Transfer-Encoding") == "chunked" {
if state.isChunked {
// check chunks
if len(body) < 1 {
return false
}
var chunkEnd int
if chunkEnd = CheckChunked(body); chunkEnd < 1 {
if !state.hasFullBody {
var c int
c, state.hasFullBody = CheckChunked(body)
state.body += c
}
if !state.hasFullBody {
return false
}

// check trailer headers
if feed.headers.Get("Trailer") == "" {
if !state.hasTrailer {
return true
}
// trailer headers(whether chunked or plain) should end with empty line
return len(body) > chunkEnd && MIMEHeadersEndPos(body[chunkEnd:]) != -1
return len(data) > state.body && MIMEHeadersEndPos(data[state.body:]) != -1
}

// check for content-length header
if header := feed.headers.Get("Content-Length"); header != "" {
num, ok := atoI([]byte(header), 10)
// trailers are generally not allowed in non-chunks body
return ok && num == len(body)
}
return true
return state.bodyLen == len(body)
}

// this works with positive integers
func atoI(s []byte, base int) (num int, ok bool) {
var v int
ok = true
for i := 0; i < len(s); i++ {
if s[i] > 127 {
return 0, false
ok = false
break
}
v = int(hexTable[s[i]])
if v >= base || (v == 0 && s[i] != '0') {
return 0, false
ok = false
break
}
num = (num * base) + v
}
return num, true
return
}

var hexTable = [128]byte{
Expand Down
Loading