-
Notifications
You must be signed in to change notification settings - Fork 39
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fixing performance issues and out-of-order packets #916
Conversation
Since there are a lot of mutex usage now, maybe think about using https://github.com/cornelk/hashmap instead. See the difference:
|
The main thing to ensure here is that packet "capture" happens in one thread, and packet "processing" in another. |
00a86fd
to
6ea7244
Compare
} | ||
} | ||
|
||
func (parser *MessageParser) Close() error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exported method MessageParser.Close should have comment or be unexported
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this naming makes more sense, thanks!
Too smart, and can cause to unexpected errors.
// Message is an interface used to provide feedback or store dummy data for future use | ||
type Message interface { | ||
// Message is an interface used to provide protocol state or store dummy data for future use | ||
type ProtocolStateSetter interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this interface has both setter(SetProtocolState
) and getter(ProtocolState
), it may be called ProtocolState
Should have way better performance, and can fix a few bugs
proto/proto.go
Outdated
@@ -19,6 +19,7 @@ package proto | |||
import ( | |||
"bufio" | |||
"bytes" | |||
_ "fmt" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a blank import should be only in a main or test package, or have a comment justifying it
@@ -129,3 +143,7 @@ func (o *TCPOutput) connect(address string) (conn net.Conn, err error) { | |||
func (o *TCPOutput) String() string { | |||
return fmt.Sprintf("TCP output %s, limit: %d", o.address, o.limit) | |||
} | |||
|
|||
func (o *TCPOutput) Close() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exported method TCPOutput.Close should have comment or be unexported
state.headerStart = MIMEHeadersStartPos(data) | ||
if state.headerStart < 0 { | ||
return false | ||
} else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if block ends with a return statement, so drop this else and outdent its block
@@ -129,3 +143,7 @@ func (o *TCPOutput) connect(address string) (conn net.Conn, err error) { | |||
func (o *TCPOutput) String() string { | |||
return fmt.Sprintf("TCP output %s, limit: %d", o.address, o.limit) | |||
} | |||
|
|||
func (o *TCPOutput) Close() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exported method TCPOutput.Close should have comment or be unexported
state.headerStart = MIMEHeadersStartPos(data) | ||
if state.headerStart < 0 { | ||
return false | ||
} else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if block ends with a return statement, so drop this else and outdent its block
parser.Emit(m) | ||
} | ||
|
||
func (parser *MessageParser) Emit(m *Message) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exported method MessageParser.Emit should have comment or be unexported
@@ -117,171 +157,176 @@ func (m *Message) Sort() { | |||
sort.SliceStable(m.packets, func(i, j int) bool { return m.packets[i].Seq < m.packets[j].Seq }) | |||
} | |||
|
|||
// Handler message handler | |||
type Handler func(*Message) | |||
func (m *Message) Finalize() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exported method Message.Finalize should have comment or be unexported
return false | ||
} | ||
|
||
func (m *Message) PacketData() [][]byte { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exported method Message.PacketData should have comment or be unexported
} | ||
|
||
// Packets returns packets of the message | ||
func (m *Message) Packets() []*Packet { | ||
return m.packets | ||
} | ||
|
||
func (m *Message) MissingChunk() bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exported method Message.MissingChunk should have comment or be unexported
return | ||
} | ||
|
||
func (pckt *Packet) MessageID() uint64 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exported method Packet.MessageID should have comment or be unexported
### Reducing CPU context switching and number of goroutines. Packet capture and packet processing now use only two goroutines which helps to minimize CPU context switches. Spawning too many goroutines is harmful here. ### Optimized packet capture - allocated memory only when required, and only for data which is used Using ZeroCopy methods from libpcap library to avoid unnecessary allocations. Now memory gets allocated ONLY for the valid packets, and only for the packets which have the data. E.g. no SYN/FIN packets are used now. Additionally we now use `sync.Pool` for re-using packet objects, which helps to re-use already allocated memory. ### Simplification and optimization of request/response detection There is no SYN/FIN packets anymore etc. Now only packet payload is used to detect start and end of the packet. More over payload detection now does not require generating a total “message” buffer, and works with individual packet payloads. Message payloads now concatenated from packets only in the end when message is dispatched. Also, before checking if message is complete, added additional check if all received packets in the valid order, e.g. if their SEQ is valid, and no packets are missing. Reworked chunked encoding validation, and now it does not need expensive operation of re-calculating all the chunks. Now it “trust” that client gives valid chunk body, check if packets are in the right order (e.g. SEQ match), and checks if message ends with the right suffix. All is done with 0 allocations. Parsing all Headers using `proto.GetHeaders` was proved to be very slow. Now we only parse the headers we need(and do it only once). Packets gets matched together using ACK, which on high RPS removed chances of duplicating IDs. Additionally, even if packets are received out of order, now it will properly sort them, before dispatching the message. ### Changes in ID generation algorithm Message ID generation and relations between request and response IDs is fully rewritten. Responses now do not have to lookup for request data in order to get the same ID. ID no rely on the fact that SEQ of the first packet of the response should be the same as ACK of the request. If previously Message ID contained random values, like current timestamp, now it has a consistent algorithm which is based on TCP stream id (SrcPort + DstPort + SrcIP/DstIP) and current ACK/SEQ number (to distinguish multiple messages within the same stream). ### BPF filter optimizations When tracking response it now uses a more accurate BPF rule to filter only needed traffic. ### Misc The packet code is now fully moved to tcp/Packet, so packet processing done only once in one place. TCP output now has a 5 second timeout, and has a proper Close method. Fully switching to go modules and removing vendoring.
Reducing CPU context switching and number of goroutines.
Packet capture and packet processing now use only two goroutines which helps to minimize CPU context switches. Spawning too many goroutines is harmful here.
Optimized packet capture - allocated memory only when required, and only for data which is used
Using ZeroCopy methods from libpcap library to avoid unnecessary allocations. Now memory gets allocated ONLY for the valid packets, and only for the packets which have the data. E.g. no SYN/FIN packets are used now. Additionally we now use
sync.Pool
for re-using packet objects, which helps to re-use already allocated memory.Simplification and optimization of request/response detection
There is no SYN/FIN packets anymore etc. Now only packet payload is used to detect start and end of the packet. More over payload detection now does not require generating a total “message” buffer, and works with individual packet payloads.
Message payloads now concatenated from packets only in the end when message is dispatched. Also, before checking if message is complete, added additional check if all received packets in the valid order, e.g. if their SEQ is valid, and no packets are missing.
Reworked chunked encoding validation, and now it does not need expensive operation of re-calculating all the chunks. Now it “trust” that client gives valid chunk body, check if packets are in the right order (e.g. SEQ match), and checks if message ends with the right suffix. All is done with 0 allocations.
Parsing all Headers using
proto.GetHeaders
was proved to be very slow. Now we only parse the headers we need(and do it only once).Packets gets matched together using ACK, which on high RPS removed chances of duplicating IDs. Additionally, even if packets are received out of order, now it will properly sort them, before dispatching the message.
Changes in ID generation algorithm
Message ID generation and relations between request and response IDs is fully rewritten. Responses now do not have to lookup for request data in order to get the same ID. ID no rely on the fact that SEQ of the first packet of the response should be the same as ACK of the request. If previously Message ID contained random values, like current timestamp, now it has a consistent algorithm which is based on TCP stream id (SrcPort + DstPort + SrcIP/DstIP) and current ACK/SEQ number (to distinguish multiple messages within the same stream).
BPF filter optimizations
When tracking response it now uses a more accurate BPF rule to filter only needed traffic.
Misc
The packet code is now fully moved to tcp/Packet, so packet processing done only once in one place.
TCP output now has a 5 second timeout, and has a proper Close method.
Fully switching to go modules and removing vendoring.