persist kafka events

kenanfarukcakir committed Jul 1, 2024
1 parent 119d24a commit 6e8e51e

Showing 5 changed files with 244 additions and 18 deletions.
5 changes: 3 additions & 2 deletions config/db.go
@@ -14,6 +14,7 @@ type BackendDSConfig struct {
     GpuMetricsExport      bool
     MetricsExportInterval int // in seconds
 
-    ReqBufferSize  int
-    ConnBufferSize int
+    ReqBufferSize        int
+    ConnBufferSize       int
+    KafkaEventBufferSize int
 }
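One note on this field: in the datastore/backend.go hunk below, the Kafka event channel is actually sized with conf.ReqBufferSize rather than the new KafkaEventBufferSize. A one-line sketch of how the new field would plug into the constructor (illustrative only, not part of the commit):

    // sketch, not in the commit: sizing the event channel from the new field
    kafkaChanBuffer: make(chan *KafkaEventInfo, conf.KafkaEventBufferSize),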
129 changes: 113 additions & 16 deletions datastore/backend.go
@@ -140,10 +140,13 @@ type BackendDS struct {
     c         *http.Client
     batchSize uint64
 
-    reqChanBuffer  chan *ReqInfo
-    connChanBuffer chan *ConnInfo
-    reqInfoPool    *poolutil.Pool[*ReqInfo]
-    aliveConnPool  *poolutil.Pool[*ConnInfo]
+    reqChanBuffer   chan *ReqInfo
+    connChanBuffer  chan *ConnInfo
+    kafkaChanBuffer chan *KafkaEventInfo
+
+    reqInfoPool        *poolutil.Pool[*ReqInfo]
+    aliveConnPool      *poolutil.Pool[*ConnInfo]
+    kafkaEventInfoPool *poolutil.Pool[*KafkaEventInfo]
 
     traceEventQueue *list.List
     traceEventMu    sync.RWMutex
@@ -169,16 +172,17 @@ type BackendDS struct {
 }
 
 const (
-    podEndpoint       = "/pod/"
-    svcEndpoint       = "/svc/"
-    rsEndpoint        = "/replicaset/"
-    depEndpoint       = "/deployment/"
-    epEndpoint        = "/endpoint/"
-    containerEndpoint = "/container/"
-    dsEndpoint        = "/daemonset/"
-    ssEndpoint        = "/statefulset/"
-    reqEndpoint       = "/requests/"
-    connEndpoint      = "/connections/"
+    podEndpoint        = "/pod/"
+    svcEndpoint        = "/svc/"
+    rsEndpoint         = "/replicaset/"
+    depEndpoint        = "/deployment/"
+    epEndpoint         = "/endpoint/"
+    containerEndpoint  = "/container/"
+    dsEndpoint         = "/daemonset/"
+    ssEndpoint         = "/statefulset/"
+    reqEndpoint        = "/requests/"
+    connEndpoint       = "/connections/"
+    kafkaEventEndpoint = "/events/kafka/"
 
     traceEventEndpoint = "/dist_tracing/traffic/"
 
@@ -291,9 +295,11 @@ func NewBackendDS(parentCtx context.Context, conf config.BackendDSConfig) *Backe
         batchSize:          bs,
         reqInfoPool:        newReqInfoPool(func() *ReqInfo { return &ReqInfo{} }, func(r *ReqInfo) {}),
         aliveConnPool:      newAliveConnPool(func() *ConnInfo { return &ConnInfo{} }, func(r *ConnInfo) {}),
+        kafkaEventInfoPool: newKafkaEventPool(func() *KafkaEventInfo { return &KafkaEventInfo{} }, func(r *KafkaEventInfo) {}),
         traceInfoPool:      newTraceInfoPool(func() *TraceInfo { return &TraceInfo{} }, func(r *TraceInfo) {}),
         reqChanBuffer:      make(chan *ReqInfo, conf.ReqBufferSize),
         connChanBuffer:     make(chan *ConnInfo, conf.ConnBufferSize),
+        kafkaChanBuffer:    make(chan *KafkaEventInfo, conf.ReqBufferSize),
         podEventChan:       make(chan interface{}, 5*resourceChanSize),
         svcEventChan:       make(chan interface{}, 2*resourceChanSize),
         rsEventChan:        make(chan interface{}, 2*resourceChanSize),
@@ -313,7 +319,8 @@ func NewBackendDS(parentCtx context.Context, conf config.BackendDSConfig) *Backe
 
 func (ds *BackendDS) Start() {
     go ds.sendReqsInBatch(ds.batchSize)
-    go ds.sendConnsInBatch(ds.batchSize)
+    go ds.sendConnsInBatch(ds.batchSize / 2)
+    go ds.sendKafkaEventsInBatch(ds.batchSize / 2)
     go ds.sendTraceEventsInBatch(10 * ds.batchSize)
 
     // events are resynced every 60 seconds on k8s informers
@@ -458,6 +465,18 @@ func convertReqsToPayload(batch []*ReqInfo) RequestsPayload {
     }
 }
 
+func convertKafkaEventsToPayload(batch []*KafkaEventInfo) KafkaEventInfoPayload {
+    return KafkaEventInfoPayload{
+        Metadata: Metadata{
+            MonitoringID:   MonitoringID,
+            IdempotencyKey: string(uuid.NewUUID()),
+            NodeID:         NodeID,
+            AlazVersion:    tag,
+        },
+        KafkaEvents: batch,
+    }
+}
+
 func convertConnsToPayload(batch []*ConnInfo) ConnInfoPayload {
     return ConnInfoPayload{
         Metadata: Metadata{
@@ -528,7 +547,7 @@ func (b *BackendDS) sendToBackend(method string, payload interface{}, endpoint s
         return
     }
 
-    log.Logger.Debug().Str("endpoint", endpoint).Any("payload", payload).Msg("sending batch to backend")
+    // log.Logger.Debug().Str("endpoint", endpoint).Any("payload", payload).Msg("sending batch to backend")
     err = b.DoRequest(httpReq)
     if err != nil {
         log.Logger.Error().Msgf("backend persist error at ep %s : %v", endpoint, err)
@@ -609,6 +628,47 @@ func (b *BackendDS) sendReqsInBatch(batchSize uint64) {
 
 }
 
+func (b *BackendDS) sendKafkaEventsInBatch(batchSize uint64) {
+    t := time.NewTicker(5 * time.Second)
+    defer t.Stop()
+
+    send := func() {
+        batch := make([]*KafkaEventInfo, 0, batchSize)
+        loop := true
+
+        for i := 0; (i < int(batchSize)) && loop; i++ {
+            select {
+            case req := <-b.kafkaChanBuffer:
+                batch = append(batch, req)
+            case <-time.After(50 * time.Millisecond):
+                loop = false
+            }
+        }
+
+        if len(batch) == 0 {
+            return
+        }
+
+        kEventsPayload := convertKafkaEventsToPayload(batch)
+        go b.sendToBackend(http.MethodPost, kEventsPayload, kafkaEventEndpoint)
+
+        for _, req := range batch {
+            b.kafkaEventInfoPool.Put(req)
+        }
+    }
+
+    for {
+        select {
+        case <-b.ctx.Done():
+            log.Logger.Info().Msg("stopping sending kafka events to backend")
+            return
+        case <-t.C:
+            send()
+        }
+    }
+
+}
+
 func (b *BackendDS) sendConnsInBatch(batchSize uint64) {
     t := time.NewTicker(30 * time.Second)
     defer t.Stop()
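sendKafkaEventsInBatch reuses the batching pattern of sendReqsInBatch: each 5-second tick drains up to batchSize events from the channel, treats 50 ms of silence as the end of the batch, ships the batch in a detached goroutine, and returns the events to the pool. The drain step, restated as a generic helper (a sketch; drainBatch is our name, not a repo function):

    // drainBatch collects up to max items from ch, stopping early once the
    // channel has been idle for the given duration, exactly like the
    // select loop in send() above. Illustrative helper only.
    func drainBatch[T any](ch <-chan T, max int, idle time.Duration) []T {
        batch := make([]T, 0, max)
        for i := 0; i < max; i++ {
            select {
            case item := <-ch:
                batch = append(batch, item)
            case <-time.After(idle):
                return batch // channel idle: flush what we have
            }
        }
        return batch
    }

With it, send() reduces to batch := drainBatch(b.kafkaChanBuffer, int(batchSize), 50*time.Millisecond) followed by the payload conversion and the pool returns.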
@@ -724,6 +784,14 @@ func newTraceInfoPool(factory func() *TraceInfo, close func(*TraceInfo)) *poolut
     }
 }
 
+func newKafkaEventPool(factory func() *KafkaEventInfo, close func(*KafkaEventInfo)) *poolutil.Pool[*KafkaEventInfo] {
+    return &poolutil.Pool[*KafkaEventInfo]{
+        Items:   make(chan *KafkaEventInfo, 1000),
+        Factory: factory,
+        Close:   close,
+    }
+}
+
 func (b *BackendDS) PersistAliveConnection(aliveConn *AliveConnection) error {
     // get a connInfo from the pool
     oc := b.aliveConnPool.Get()
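newKafkaEventPool mirrors the other pool constructors: a free list of up to 1000 pooled event arrays. The poolutil.Pool internals are not part of this diff; the usual shape of such a channel-backed pool is sketched below as an assumption, not a quote of the repo's code:

    // Assumed shape of poolutil.Pool (simplified; not the repo's code).
    type Pool[T any] struct {
        Items   chan T   // free list
        Factory func() T // used when the free list is empty
        Close   func(T)  // used when the free list is full
    }

    func (p *Pool[T]) Get() T {
        select {
        case item := <-p.Items:
            return item
        default:
            return p.Factory()
        }
    }

    func (p *Pool[T]) Put(item T) {
        select {
        case p.Items <- item:
        default:
            p.Close(item)
        }
    }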
@@ -773,6 +841,35 @@ func (b *BackendDS) PersistRequest(request *Request) error {
     return nil
 }
 
+func (b *BackendDS) PersistKafkaEvent(ke *KafkaEvent) error {
+    // get a kafkaEventInfo from the pool
+    kafkaInfo := b.kafkaEventInfoPool.Get()
+
+    // overwrite the kafkaEventInfo; all fields must be set in order to
+    // avoid conflict with values left over from its previous use
+    kafkaInfo[0] = ke.StartTime
+    kafkaInfo[1] = ke.Latency
+    kafkaInfo[2] = ke.FromIP
+    kafkaInfo[3] = ke.FromType
+    kafkaInfo[4] = ke.FromUID
+    kafkaInfo[5] = ke.FromPort
+    kafkaInfo[6] = ke.ToIP
+    kafkaInfo[7] = ke.ToType
+    kafkaInfo[8] = ke.ToUID
+    kafkaInfo[9] = ke.ToPort
+    kafkaInfo[10] = ke.Topic
+    kafkaInfo[11] = ke.Partition
+    kafkaInfo[12] = ke.Key
+    kafkaInfo[13] = ke.Value
+    kafkaInfo[14] = ke.Type
+    kafkaInfo[15] = ke.Tls
+    kafkaInfo[16] = ke.Seq
+    kafkaInfo[17] = ke.Tid
+
+    b.kafkaChanBuffer <- kafkaInfo
+
+    return nil
+}
+
 func (b *BackendDS) PersistTraceEvent(trace *l7_req.TraceEvent) error {
     if trace == nil {
         return fmt.Errorf("trace event is nil")
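End to end, the path added in this file is: PersistKafkaEvent copies a KafkaEvent into a pooled 18-slot array and queues it on kafkaChanBuffer; every 5 seconds sendKafkaEventsInBatch drains up to batchSize/2 arrays into a single POST to /events/kafka/. A caller-side sketch (field values are invented; in Alaz they come from the eBPF L7 collector):

    // sketch: feeding one event into the new pipeline
    ke := &KafkaEvent{
        StartTime: time.Now().UnixNano(),
        Latency:   1_500_000, // ns, invented example
        FromIP:    "10.0.0.12", FromType: "pod", FromUID: "uid-a", FromPort: 51234,
        ToIP:      "10.0.0.40", ToType: "service", ToUID: "uid-b", ToPort: 9092,
        Topic:     "orders", Partition: 3, Key: "order-42", Value: "...",
        Type:      "PUBLISH", // or CONSUME
        Tls:       false, Tid: 7, Seq: 1,
    }
    if err := ds.PersistKafkaEvent(ke); err != nil { // ds is a *BackendDS
        log.Logger.Error().Err(err).Msg("persist kafka event failed")
    }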
2 changes: 2 additions & 0 deletions datastore/datastore.go
@@ -16,6 +16,8 @@ type DataStore interface {
 
     PersistRequest(request *Request) error
 
+    PersistKafkaEvent(request *KafkaEvent) error
+
     PersistTraceEvent(trace *l7_req.TraceEvent) error
 
     PersistAliveConnection(trace *AliveConnection) error
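Since DataStore is widened here, every implementation must now provide the method. A minimal no-op stub for a test double might look like this (a sketch; the other Persist* methods would be needed alongside it to satisfy the full interface):

    // sketch: satisfying the new interface method in a test double
    type noopDS struct{}

    func (noopDS) PersistKafkaEvent(*KafkaEvent) error { return nil }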
101 changes: 101 additions & 0 deletions datastore/dto.go
@@ -103,6 +103,74 @@ type AliveConnection struct {
     ToPort uint16
 }
 
+type DirectionalEvent interface {
+    SetFromUID(string)
+    SetFromIP(string)
+    SetFromType(string)
+    SetFromPort(uint16)
+
+    SetToUID(string)
+    SetToIP(string)
+    SetToType(string)
+    SetToPort(uint16)
+
+    ReverseDirection()
+}
+
+type KafkaEvent struct {
+    StartTime int64
+    Latency   uint64 // in ns
+    FromIP    string
+    FromType  string
+    FromUID   string
+    FromPort  uint16
+    ToIP      string
+    ToType    string
+    ToUID     string
+    ToPort    uint16
+    Topic     string
+    Partition uint32
+    Key       string
+    Value     string
+    Type      string // PUBLISH or CONSUME
+    Tls       bool
+    Tid       uint32
+    Seq       uint32
+}
+
+func (ke *KafkaEvent) SetFromUID(uid string) {
+    ke.FromUID = uid
+}
+func (ke *KafkaEvent) SetFromIP(ip string) {
+    ke.FromIP = ip
+}
+func (ke *KafkaEvent) SetFromType(typ string) {
+    ke.FromType = typ
+}
+func (ke *KafkaEvent) SetFromPort(port uint16) {
+    ke.FromPort = port
+}
+
+func (ke *KafkaEvent) SetToUID(uid string) {
+    ke.ToUID = uid
+}
+func (ke *KafkaEvent) SetToIP(ip string) {
+    ke.ToIP = ip
+}
+func (ke *KafkaEvent) SetToType(typ string) {
+    ke.ToType = typ
+}
+func (ke *KafkaEvent) SetToPort(port uint16) {
+    ke.ToPort = port
+}
+
+func (ke *KafkaEvent) ReverseDirection() {
+    ke.FromIP, ke.ToIP = ke.ToIP, ke.FromIP
+    ke.FromPort, ke.ToPort = ke.ToPort, ke.FromPort
+    ke.FromUID, ke.ToUID = ke.ToUID, ke.FromUID
+    ke.FromType, ke.ToType = ke.ToType, ke.FromType
+}
+
 type Request struct {
     StartTime int64
     Latency   uint64 // in ns
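Both KafkaEvent (above) and Request (next hunk) satisfy the new DirectionalEvent interface, so direction fix-ups can now be written once for all event kinds. A sketch of the kind of helper the interface enables (hypothetical; the commit itself only introduces the interface):

    // sketch: one normalization helper for any DirectionalEvent
    func normalizeDirection(ev DirectionalEvent, capturedOnServerSide bool) {
        // keep "from" as the client side regardless of capture point
        if capturedOnServerSide {
            ev.ReverseDirection()
        }
    }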
@@ -125,6 +193,39 @@ type Request struct {
     Seq uint32
 }
 
+func (r *Request) SetFromUID(uid string) {
+    r.FromUID = uid
+}
+func (r *Request) SetFromIP(ip string) {
+    r.FromIP = ip
+}
+func (r *Request) SetFromType(typ string) {
+    r.FromType = typ
+}
+func (r *Request) SetFromPort(port uint16) {
+    r.FromPort = port
+}
+
+func (r *Request) SetToUID(uid string) {
+    r.ToUID = uid
+}
+func (r *Request) SetToIP(ip string) {
+    r.ToIP = ip
+}
+func (r *Request) SetToType(typ string) {
+    r.ToType = typ
+}
+func (r *Request) SetToPort(port uint16) {
+    r.ToPort = port
+}
+
+func (r *Request) ReverseDirection() {
+    r.FromIP, r.ToIP = r.ToIP, r.FromIP
+    r.FromPort, r.ToPort = r.ToPort, r.FromPort
+    r.FromUID, r.ToUID = r.ToUID, r.FromUID
+    r.FromType, r.ToType = r.ToType, r.FromType
+}
+
 type BackendResponse struct {
     Msg    string `json:"msg"`
     Errors []struct {
25 changes: 25 additions & 0 deletions datastore/payload.go
@@ -157,6 +157,31 @@ type TracePayload struct {
     Traces []*TraceInfo `json:"traffic"`
 }
 
+// 0) StartTime
+// 1) Latency
+// 2) Source IP
+// 3) Source Type
+// 4) Source ID
+// 5) Source Port
+// 6) Destination IP
+// 7) Destination Type
+// 8) Destination ID
+// 9) Destination Port
+// 10) Topic
+// 11) Partition
+// 12) Key
+// 13) Value
+// 14) Type
+// 15) Encrypted (bool)
+// 16) Seq
+// 17) Tid
+type KafkaEventInfo [18]interface{}
+
+type KafkaEventInfoPayload struct {
+    Metadata    Metadata          `json:"metadata"`
+    KafkaEvents []*KafkaEventInfo `json:"kafka_events"`
+}
+
 func convertPodToPodEvent(pod Pod, eventType string) PodEvent {
     return PodEvent{
         UID: pod.UID,
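On the wire, each KafkaEventInfo is a mixed-type JSON array whose positions follow the index comment above. A small runnable check of that shape (values invented):

    // sketch: the serialized form of one KafkaEventInfo
    package main

    import (
        "encoding/json"
        "fmt"
    )

    type KafkaEventInfo [18]interface{}

    func main() {
        var k KafkaEventInfo
        k[0], k[1] = int64(1719792000000000000), uint64(1500000) // StartTime, Latency
        k[2], k[3], k[4], k[5] = "10.0.0.12", "pod", "uid-a", uint16(51234)
        k[6], k[7], k[8], k[9] = "10.0.0.40", "service", "uid-b", uint16(9092)
        k[10], k[11] = "orders", uint32(3)  // Topic, Partition
        k[12], k[13] = "order-42", "..."    // Key, Value
        k[14], k[15] = "PUBLISH", false     // Type, Encrypted
        k[16], k[17] = uint32(1), uint32(7) // Seq, Tid
        b, _ := json.Marshal(k)
        fmt.Println(string(b)) // [1719792000000000000,1500000,"10.0.0.12","pod",...]
    }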
