diff --git a/aggregator/data.go b/aggregator/data.go index 51b758f..371298c 100644 --- a/aggregator/data.go +++ b/aggregator/data.go @@ -1421,7 +1421,7 @@ func (a *Aggregator) sendOpenConnection(sl *SocketLine) { } func (a *Aggregator) clearSocketLines(ctx context.Context) { - ticker := time.NewTicker(30 * time.Second) + ticker := time.NewTicker(120 * time.Second) skLineCh := make(chan *SocketLine, 1000) go func() { diff --git a/aggregator/sock_num_line.go b/aggregator/sock_num_line.go index 561d8a2..abc495d 100644 --- a/aggregator/sock_num_line.go +++ b/aggregator/sock_num_line.go @@ -13,6 +13,7 @@ import ( "sync" "time" + "github.com/ddosify/alaz/log" "inet.af/netaddr" ) @@ -88,10 +89,26 @@ func (nl *SocketLine) DeleteUnused() { nl.mu.Lock() defer nl.mu.Unlock() - if len(nl.Values) == 0 { + if len(nl.Values) <= 1 { return } + // if two open sockets are alined, delete the first one + // in case first ones close event did not arrive + result := make([]TimestampedSocket, 0) + i := 0 + for i < len(nl.Values)-1 { + if nl.Values[i].SockInfo != nil && nl.Values[i+1].SockInfo != nil { + result = append(result, nl.Values[i+1]) + log.Logger.Debug().Msgf("deleting socket line %v", nl.Values[i]) + i = i + 2 + } else { + result = append(result, nl.Values[i]) + i++ + } + } + nl.Values = result + var lastMatchedReqTime uint64 = 0 // traverse the slice backwards @@ -102,7 +119,7 @@ func (nl *SocketLine) DeleteUnused() { } // assumedInterval is inversely proportional to the number of requests being discarded - assumedInterval := uint64(1 * time.Minute) + assumedInterval := uint64(5 * time.Minute) // delete all values that // closed and its LastMatch + assumedInterval < lastMatchedReqTime diff --git a/ebpf/tcp_state/tcp.go b/ebpf/tcp_state/tcp.go index ea608aa..8aaf8c7 100644 --- a/ebpf/tcp_state/tcp.go +++ b/ebpf/tcp_state/tcp.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "os" + "time" "unsafe" "github.com/ddosify/alaz/ebpf/c" @@ -206,6 +207,7 @@ func findEndIndex(b [100]uint8) (endIndex int) { // returns when program is detached func (tsp *TcpStateProg) Consume(ctx context.Context, ch chan interface{}) { + printTs := true for { read := func() { record, err := tsp.tcpConnectEvents.Read() @@ -223,9 +225,9 @@ func (tsp *TcpStateProg) Consume(ctx context.Context, ch chan interface{}) { bpfEvent := (*BpfTcpEvent)(unsafe.Pointer(&record.RawSample[0])) - if bpfEvent.Pid == 3738744 { - log.Logger.Debug().Uint64("ts", bpfEvent.Timestamp). - Str("type", TcpStateConversion(bpfEvent.Type).String()).Uint64("fd", bpfEvent.Fd).Msg("tcp event of pid 3738744") + if printTs { + log.Logger.Info().Uint64("now", uint64(time.Now().UnixNano())).Msgf("first-bpf-timestamp: %d", bpfEvent.Timestamp) + printTs = false } go func() {