Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Kafka Protocol and refactor socket logic #158

Merged
merged 25 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
40300ce
parse kafka headers on kernel
kenanfarukcakir Jun 12, 2024
b163148
go mod
kenanfarukcakir Jun 14, 2024
b103054
parse kafka on kernel
kenanfarukcakir Jun 14, 2024
2ab0a60
decode kafka produceRequests and fetchResponses
kenanfarukcakir Jun 14, 2024
e826cf1
add kafka decoder
kenanfarukcakir Jun 21, 2024
6e756cd
remove unnecessary code for kafka decoder
kenanfarukcakir Jun 21, 2024
6fbb359
refactor aggregator
kenanfarukcakir Jun 21, 2024
015d34e
refactor aggregator
kenanfarukcakir Jul 1, 2024
71da0e3
go mod
kenanfarukcakir Jul 1, 2024
119d24a
add log hook
kenanfarukcakir Jul 1, 2024
6e8e51e
persist kafka events
kenanfarukcakir Jul 1, 2024
2f23949
refactor socket map
kenanfarukcakir Jul 2, 2024
d74a5c5
upgrade go version
kenanfarukcakir Jul 2, 2024
bed2d60
refactor socket creation logic
kenanfarukcakir Jul 3, 2024
7de89e7
increase chan sizes
kenanfarukcakir Jul 3, 2024
a327f44
put failed events back to queue
kenanfarukcakir Jul 3, 2024
c55307d
send connections to backend
kenanfarukcakir Jul 3, 2024
48026dd
only create socketmaps for container procs
kenanfarukcakir Jul 4, 2024
d370692
get container pids from cri
kenanfarukcakir Jul 5, 2024
f140049
refactor sock finding logic
kenanfarukcakir Jul 5, 2024
ebf8de1
send egress traffic for parsed requests
kenanfarukcakir Jul 5, 2024
969ea5d
add periodic check for short lived procs
kenanfarukcakir Jul 5, 2024
ed117e5
update debian version
kenanfarukcakir Jul 8, 2024
291079b
pg query len check
kenanfarukcakir Jul 9, 2024
51535fd
Merge pull request #157 from getanteon/feat/kafka
fatihbaltaci Jul 10, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.22.1-bullseye as builder
FROM golang:1.22.4-bullseye AS builder
WORKDIR /app
COPY . ./
RUN apt update
Expand Down
4 changes: 2 additions & 2 deletions Dockerfile.default
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.22.1-bullseye as builder
FROM golang:1.22.4-bullseye AS builder
WORKDIR /app
COPY . ./
RUN apt update
Expand All @@ -8,7 +8,7 @@ ENV GOCACHE=/root/.cache/go-build
RUN go mod tidy -v
RUN --mount=type=cache,target="/root/.cache/go-build" GOOS=linux go build -ldflags="-X 'github.com/ddosify/alaz/datastore.tag=$VERSION'" -o alaz

FROM debian:12.5-slim
FROM debian:12.6-slim
RUN apt-get update && apt-get install -y procps ca-certificates && rm -rf /var/lib/apt/lists/*

COPY --chown=0:0 --from=builder /app/alaz ./bin/
Expand Down
110 changes: 110 additions & 0 deletions aggregator/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package aggregator

import (
"context"
"fmt"
"sync"
"sync/atomic"

"github.com/ddosify/alaz/log"
"k8s.io/apimachinery/pkg/types"
)

type ClusterInfo struct {
k8smu sync.RWMutex
PodIPToPodUid map[string]types.UID `json:"podIPToPodUid"`
ServiceIPToServiceUid map[string]types.UID `json:"serviceIPToServiceUid"`

// Pid -> SocketMap
// pid -> fd -> {saddr, sport, daddr, dport}
SocketMaps []*SocketMap // index symbolizes pid
socketMapsmu sync.Mutex

// Below mutexes guard socketMaps, set to mu inside SocketMap struct
// Used to find the correct mutex for the process, some pids can share the same mutex
muIndex atomic.Uint64
muArray []*sync.RWMutex

signalChan chan uint32 // pids are signaled on this channel to notify clusterInfo struct to initialize a SocketMap
}

func newClusterInfo(liveProcCount int) *ClusterInfo {
ci := &ClusterInfo{
PodIPToPodUid: map[string]types.UID{},
ServiceIPToServiceUid: map[string]types.UID{},
}
ci.signalChan = make(chan uint32)
sockMaps := make([]*SocketMap, maxPid+1) // index=pid
ci.SocketMaps = sockMaps
ci.muIndex = atomic.Uint64{}

// initialize mutex array

// normally, mutex per pid is straightforward solution
// on regular systems, maxPid is around 32768
// so, we allocate 32768 mutexes, which is 32768 * 24 bytes = 786KB
// but on 64-bit systems, maxPid can be 4194304
// and we don't want to allocate 4194304 mutexes, it adds up to 4194304 * 24 bytes = 100MB
// So, some process will have to share the mutex

// assume liveprocesses can increase up to 100 times of current count
// if processes exceeds the count of mutex, they will share the mutex
countMuArray := liveProcCount * 100
if countMuArray > maxPid {
countMuArray = maxPid
}
// for 2k processes, 200k mutex => 200k * 24 bytes = 4.80MB
// in case of maxPid is 32678, 32678 * 24 bytes = 784KB, pick the smaller one
ci.muArray = make([]*sync.RWMutex, countMuArray)
go ci.handleSocketMapCreation()
return ci
}

func (ci *ClusterInfo) SignalSocketMapCreation(pid uint32) {
ci.signalChan <- pid
}

// events will be processed sequentially here in one goroutine.
// in order to prevent race.
func (ci *ClusterInfo) handleSocketMapCreation() {
for pid := range ci.signalChan {
if ci.SocketMaps[pid] != nil {
continue
}

ctxPid := context.WithValue(context.Background(), log.LOG_CONTEXT, fmt.Sprint(pid))

sockMap := &SocketMap{
mu: nil, // set below
pid: pid,
M: map[uint64]*SocketLine{},
waitingFds: make(chan uint64, 1000),
processedFds: map[uint64]struct{}{},
processedFdsmu: sync.RWMutex{},
closeCh: make(chan struct{}, 1),
ctx: ctxPid,
}

ci.muIndex.Add(1)
i := (ci.muIndex.Load()) % uint64(len(ci.muArray))
ci.muArray[i] = &sync.RWMutex{}
sockMap.mu = ci.muArray[i]
ci.SocketMaps[pid] = sockMap
go sockMap.ProcessSocketLineCreationRequests()
}
}

func (ci *ClusterInfo) clearProc(pid uint32) {
sm := ci.SocketMaps[pid]
if sm == nil {
return
}

// stop waiting for socketline creation requests
sm.mu.Lock()
sm.closeCh <- struct{}{}
sm.mu.Unlock()

// reset
ci.SocketMaps[pid] = nil
}
Loading