Add Kafka Protocol and refactor socket logic #158

Merged Jul 12, 2024 · 25 commits

Changes from 1 commit. Commits in this pull request:
40300ce  parse kafka headers on kernel (kenanfarukcakir, Jun 12, 2024)
b163148  go mod (kenanfarukcakir, Jun 14, 2024)
b103054  parse kafka on kernel (kenanfarukcakir, Jun 14, 2024)
2ab0a60  decode kafka produceRequests and fetchResponses (kenanfarukcakir, Jun 14, 2024)
e826cf1  add kafka decoder (kenanfarukcakir, Jun 21, 2024)
6e756cd  remove unnecessary code for kafka decoder (kenanfarukcakir, Jun 21, 2024)
6fbb359  refactor aggregator (kenanfarukcakir, Jun 21, 2024)
015d34e  refactor aggregator (kenanfarukcakir, Jul 1, 2024)
71da0e3  go mod (kenanfarukcakir, Jul 1, 2024)
119d24a  add log hook (kenanfarukcakir, Jul 1, 2024)
6e8e51e  persist kafka events (kenanfarukcakir, Jul 1, 2024)
2f23949  refactor socket map (kenanfarukcakir, Jul 2, 2024)
d74a5c5  upgrade go version (kenanfarukcakir, Jul 2, 2024)
bed2d60  refactor socket creation logic (kenanfarukcakir, Jul 3, 2024)
7de89e7  increase chan sizes (kenanfarukcakir, Jul 3, 2024)
a327f44  put failed events back to queue (kenanfarukcakir, Jul 3, 2024)
c55307d  send connections to backend (kenanfarukcakir, Jul 3, 2024)
48026dd  only create socketmaps for container procs (kenanfarukcakir, Jul 4, 2024)
d370692  get container pids from cri (kenanfarukcakir, Jul 5, 2024)
f140049  refactor sock finding logic (kenanfarukcakir, Jul 5, 2024)
ebf8de1  send egress traffic for parsed requests (kenanfarukcakir, Jul 5, 2024)
969ea5d  add periodic check for short lived procs (kenanfarukcakir, Jul 5, 2024)
ed117e5  update debian version (kenanfarukcakir, Jul 8, 2024)
291079b  pg query len check (kenanfarukcakir, Jul 9, 2024)
51535fd  Merge pull request #157 from getanteon/feat/kafka (fatihbaltaci, Jul 10, 2024)
refactor socket creation logic
kenanfarukcakir committed Jul 3, 2024 · commit bed2d6006fe3d6d33cfea305b39d9a9a01441675
53 changes: 27 additions & 26 deletions aggregator/cluster.go
@@ -17,7 +17,8 @@ type ClusterInfo struct {

// Pid -> SocketMap
// pid -> fd -> {saddr, sport, daddr, dport}
SocketMaps []*SocketMap // index symbolizes pid
SocketMaps []*SocketMap // index symbolizes pid
socketMapsmu sync.Mutex

// Below mutexes guard socketMaps, set to mu inside SocketMap struct
// Used to find the correct mutex for the process, some pids can share the same mutex
@@ -34,13 +35,6 @@ func newClusterInfo(liveProcCount int) *ClusterInfo {
}
ci.signalChan = make(chan uint32)
sockMaps := make([]*SocketMap, maxPid+1) // index=pid
// initialize sockMaps
for i := range sockMaps {
sockMaps[i] = &SocketMap{
mu: nil,
M: nil,
}
}
ci.SocketMaps = sockMaps
ci.muIndex = atomic.Uint64{}

@@ -75,30 +69,34 @@ func (ci *ClusterInfo) SignalSocketMapCreation(pid uint32) {
func (ci *ClusterInfo) handleSocketMapCreation() {
for pid := range ci.signalChan {
ctxPid := context.WithValue(context.Background(), log.LOG_CONTEXT, fmt.Sprint(pid))
log.Logger.Debug().
Ctx(ctxPid).
Str("func", "handleSocketMapCreation").
Uint32("pid", pid).
Msg("")
if ci.SocketMaps[pid].mu == nil {
ci.muIndex.Add(1)
i := (ci.muIndex.Load()) % uint64(len(ci.muArray))
ci.muArray[i] = &sync.RWMutex{}
ci.SocketMaps[pid].mu = ci.muArray[i]
ci.SocketMaps[pid].pid = pid
ci.SocketMaps[pid].M = make(map[uint64]*SocketLine)
ci.SocketMaps[pid].waitingFds = make(chan uint64, 1000)
ci.SocketMaps[pid].processedFds = make(map[uint64]struct{})
ci.SocketMaps[pid].closeCh = make(chan struct{}, 1)
ci.SocketMaps[pid].ctx = ctxPid
go ci.SocketMaps[pid].ProcessSocketLineCreationRequests()

if ci.SocketMaps[pid] != nil {
continue
}

sockMap := &SocketMap{
mu: nil, // set below
pid: pid,
M: map[uint64]*SocketLine{},
waitingFds: make(chan uint64, 1000),
processedFds: map[uint64]struct{}{},
processedFdsmu: sync.RWMutex{},
closeCh: make(chan struct{}, 1),
ctx: ctxPid,
}

ci.muIndex.Add(1)
i := (ci.muIndex.Load()) % uint64(len(ci.muArray))
ci.muArray[i] = &sync.RWMutex{}
sockMap.mu = ci.muArray[i]
ci.SocketMaps[pid] = sockMap
go sockMap.ProcessSocketLineCreationRequests()
}
}

func (ci *ClusterInfo) clearProc(pid uint32) {
sm := ci.SocketMaps[pid]
if sm.mu == nil {
if sm == nil {
return
}

@@ -107,4 +105,7 @@ func (ci *ClusterInfo) clearProc(pid uint32) {
sm.closeCh <- struct{}{}
sm.M = nil
sm.mu.Unlock()

// reset
ci.SocketMaps[pid] = nil
}
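Taken together, these cluster.go hunks replace the eagerly pre-allocated SocketMap stubs (every pid slot got a struct with nil mu and M) with lazy creation: a slot in SocketMaps now stays nil until the pid's first creation signal, handleSocketMapCreation builds the whole struct before publishing it, and clearProc resets the slot to nil. A minimal sketch of that scheme, with simplified types (the real code uses atomic.Uint64 for muIndex and a signal channel, omitted here):

```go
// Minimal sketch of the lazy-creation scheme this commit moves to.
// Field names follow the diff; everything else is illustrative.
package main

import "sync"

type SocketLine struct{}

type SocketMap struct {
	mu  *sync.RWMutex
	pid uint32
	M   map[uint64]*SocketLine
}

type ClusterInfo struct {
	SocketMaps []*SocketMap // index = pid; nil means "not created yet"
	muArray    []*sync.RWMutex
	muIndex    uint64
}

// Mirrors handleSocketMapCreation: skip pids that already have a map,
// otherwise build the whole struct before publishing it into the slice.
func (ci *ClusterInfo) create(pid uint32) {
	if ci.SocketMaps[pid] != nil {
		return // already created, nothing to do
	}
	sm := &SocketMap{
		pid: pid,
		M:   map[uint64]*SocketLine{},
	}
	// reuse a bounded pool of mutexes, as in the diff's muArray/muIndex
	ci.muIndex++
	slot := ci.muIndex % uint64(len(ci.muArray))
	ci.muArray[slot] = &sync.RWMutex{}
	sm.mu = ci.muArray[slot]
	ci.SocketMaps[pid] = sm // publish only once fully initialized
}

// Mirrors clearProc: a nil slot now means "gone", so readers need a
// single nil check instead of probing mu and M separately.
func (ci *ClusterInfo) clear(pid uint32) {
	if ci.SocketMaps[pid] == nil {
		return
	}
	ci.SocketMaps[pid] = nil
}

func main() {
	ci := &ClusterInfo{
		SocketMaps: make([]*SocketMap, 1<<16),
		muArray:    make([]*sync.RWMutex, 8),
	}
	ci.create(1234)
	ci.clear(1234)
}
```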
187 changes: 69 additions & 118 deletions aggregator/data.go
@@ -62,10 +62,10 @@ type Aggregator struct {
ctxForKafka context.Context

// listen to events from different sources
k8sChan <-chan interface{}
ebpfChan <-chan interface{}
ebpfProcChan <-chan interface{}
ebpfTcpChan <-chan interface{}
k8sChan chan interface{}
ebpfChan chan interface{}
ebpfProcChan chan interface{}
ebpfTcpChan chan interface{}
tlsAttachSignalChan chan uint32

// store the service map
@@ -136,7 +136,7 @@ func init() {
}
}

func NewAggregator(parentCtx context.Context, k8sChan <-chan interface{},
func NewAggregator(parentCtx context.Context, k8sChan chan interface{},
events chan interface{},
procEvents chan interface{},
tcpEvents chan interface{},
@@ -455,28 +455,29 @@ func (a *Aggregator) processTcpConnect(ctx context.Context, d *tcp_state.TcpConn
var ok bool

sockMap = a.clusterInfo.SocketMaps[d.Pid]
var skLine *SocketLine

if sockMap.mu == nil || sockMap.M == nil {
if sockMap == nil {
// signal socket map creation and requeue event
log.Logger.Warn().Ctx(ctx).
Uint32("pid", d.Pid).Str("func", "processTcpConnect").Str("event", "ESTABLISHED").Msg("socket map not initialized")

go a.clusterInfo.SignalSocketMapCreation(d.Pid)
a.ebpfTcpChan <- d
return
}

var skLine *SocketLine
// if sockMap.mu == nil || sockMap.M == nil {
// return
// }

sockMap.mu.RLock()
skLine, ok = sockMap.M[d.Fd]
sockMap.mu.RUnlock()
if !ok {
go sockMap.SignalSocketLine(ctx, d.Fd) // signal for creation
for {
sockMap.mu.RLock()
skLine, ok = sockMap.M[d.Fd]
sockMap.mu.RUnlock()

if !ok {
time.Sleep(100 * time.Millisecond)
}
}
// requeue connect event
a.ebpfTcpChan <- d
return
}

skLine.AddValue(
@@ -490,59 +491,59 @@ func (a *Aggregator) processTcpConnect(ctx context.Context, d *tcp_state.TcpConn
Dport: d.DPort,
},
)
}
// } else if d.Type_ == tcp_state.EVENT_TCP_CLOSED {
// var sockMap *SocketMap
// var ok bool

} else if d.Type_ == tcp_state.EVENT_TCP_CLOSED {
var sockMap *SocketMap
var ok bool

// filter out localhost connections
if d.SAddr == "127.0.0.1" || d.DAddr == "127.0.0.1" {
return
}
// // filter out localhost connections
// if d.SAddr == "127.0.0.1" || d.DAddr == "127.0.0.1" {
// return
// }

sockMap = a.clusterInfo.SocketMaps[d.Pid]
// sockMap = a.clusterInfo.SocketMaps[d.Pid]

var skLine *SocketLine
// var skLine *SocketLine

if sockMap.mu == nil || sockMap.M == nil {
log.Logger.Warn().Ctx(ctx).
Uint32("pid", d.Pid).Str("func", "processTcpConnect").Str("event", "CLOSED").Msg("socket map not initialized")
return
}
// if sockMap.mu == nil || sockMap.M == nil {
// log.Logger.Warn().Ctx(ctx).
// Uint32("pid", d.Pid).Str("func", "processTcpConnect").Str("event", "CLOSED").Msg("socket map not initialized")
// return
// }

skLine, ok = sockMap.M[d.Fd]
if !ok {
return
}
// skLine, ok = sockMap.M[d.Fd]
// if !ok {
// return
// }

// If connection is established before, add the close event
skLine.AddValue(
d.Timestamp, // get connection close timestamp from ebpf
nil, // closed
)
// // If connection is established before, add the close event
// skLine.AddValue(
// d.Timestamp, // get connection close timestamp from ebpf
// nil, // closed
// )

connKey := a.getConnKey(d.Pid, d.Fd)
// connKey := a.getConnKey(d.Pid, d.Fd)

// remove h2Parser if exists
a.h2ParserMu.Lock()
h2Parser, ok := a.h2Parsers[connKey]
if ok {
h2Parser.clientHpackDecoder.Close()
h2Parser.serverHpackDecoder.Close()
}
delete(a.h2Parsers, connKey)
a.h2ParserMu.Unlock()

// remove pgStmt if exists
a.pgStmtsMu.Lock()
for key, _ := range a.pgStmts {
if strings.HasPrefix(key, connKey) {
delete(a.pgStmts, key)
}
}
a.pgStmtsMu.Unlock()
// // remove h2Parser if exists
// a.h2ParserMu.Lock()
// h2Parser, ok := a.h2Parsers[connKey]
// if ok {
// h2Parser.clientHpackDecoder.Close()
// h2Parser.serverHpackDecoder.Close()
// }
// delete(a.h2Parsers, connKey)
// a.h2ParserMu.Unlock()

// // remove pgStmt if exists
// a.pgStmtsMu.Lock()
// for key, _ := range a.pgStmts {
// if strings.HasPrefix(key, connKey) {
// delete(a.pgStmts, key)
// }
// }
// a.pgStmtsMu.Unlock()

}
// }
}

func parseHttpPayload(request string) (method string, path string, httpVersion string, hostHeader string) {
@@ -1319,15 +1320,6 @@ func (a *Aggregator) processPostgresEvent(ctx context.Context, d *l7_req.L7Event
}

func (a *Aggregator) processL7(ctx context.Context, d *l7_req.L7Event) {
// // other protocols events come as whole, but http2 events come as frames
// // we need to aggregate frames to get the whole request
// defer func() {
// if r := recover(); r != nil {
// // TODO: we need to fix this properly
// log.Logger.Debug().Msgf("probably a http2 frame sent on a closed chan: %v", r)
// }
// }()

switch d.Protocol {
case l7_req.L7_PROTOCOL_HTTP2:
a.processHttp2Event(d)
@@ -1397,16 +1389,6 @@ func (a *Aggregator) fetchSkInfo(ctx context.Context, skLine *SocketLine, d *l7_
return skInfo
}

// func (a *Aggregator) removeFromClusterInfo(pid uint32) {
// sockMap := a.clusterInfo.SocketMaps[pid]
// if sockMap.mu == nil {
// return
// }
// sockMap.mu.Lock()
// sockMap.M = nil
// sockMap.mu.Unlock()
// }

// This is a mitigation for the case a tcp event is missed
// func (a *Aggregator) updateSocketMap(ctx context.Context) {
// ticker := time.NewTicker(3 * time.Minute)
@@ -1438,56 +1420,23 @@ func (a *Aggregator) fetchSkInfo(ctx context.Context, skLine *SocketLine, d *l7_
// }
// }

// func (a *Aggregator) fetchSocketOnNotFound(ctx context.Context, d *l7_req.L7Event) bool {
// a.liveProcessesMu.Lock()

// a.liveProcesses[d.Pid] = struct{}{}
// sockMap := a.clusterInfo.SocketMaps[d.Pid]
// // pid does not exists
// // acquire sockMap lock

// // in case of reference to mu is nil, pid exec event did not come yet
// // create a new mutex for the pid
// // to avoid race around the mutex, we need to lock the liveProcessesMu
// if sockMap.mu == nil {
// log.Logger.Debug().Uint32("pid", d.Pid).Uint64("fd", d.Fd).Msg("fetchSocketOnNotFound: pid not found")

// a.muIndex.Add(1)
// a.muArray[(a.muIndex.Load())%uint64(len(a.muArray))] = &sync.RWMutex{}
// a.clusterInfo.SocketMaps[d.Pid].mu = a.muArray[(a.muIndex.Load())%uint64(len(a.muArray))]
// }
// // a.liveProcessesMu.Unlock()

// if a.clusterInfo.SocketMaps[pid].mu.

// // go try reading from kernel files
// err := sockMap.M[d.Fd].getConnectionInfo()
// if err != nil {
// log.Logger.Debug().Uint32("pid", d.Pid).Uint64("fd", d.Fd).Err(err).Msg("fetchSocketOnNotFound: failed to get connection info")
// return false
// } else {
// log.Logger.Debug().Uint32("pid", d.Pid).Uint64("fd", d.Fd).Msg("fetchSocketOnNotFound: connection info found")
// return true
// }

// }

func (a *Aggregator) findRelatedSocket(ctx context.Context, d *l7_req.L7Event) *SockInfo {
sockMap := a.clusterInfo.SocketMaps[d.Pid]
// acquire sockMap lock

if sockMap.mu == nil || sockMap.M == nil {
if sockMap == nil {
go a.clusterInfo.SignalSocketMapCreation(d.Pid)
log.Logger.Warn().Ctx(ctx).
Int("pid", int(d.Pid)).Str("func", "findRelatedSocket").Msg("socket map not initialized")
return nil
}

sockMap.mu.RLock()
skLine, ok := sockMap.M[d.Fd]
sockMap.mu.RUnlock()
if !ok {
// start new socket line, find already established connections
go sockMap.SignalSocketLine(ctx, d.Fd)
// log.Logger.Warn().Ctx(ctx).Uint32("pid", d.Pid).Uint64("fd", d.Fd).Str("func", "findRelatedSocket").Msg("signal socket line creation called")
return nil // TODO: a retry queue for this event ?
return nil
}

return a.fetchSkInfo(ctx, skLine, d)
@@ -1635,6 +1584,7 @@ func (a *Aggregator) sendOpenConnection(sl *SocketLine) {
}
}

// TODO: connection send is made here, sendOpenConnection must be called, refactor this func and its calling place
func (a *Aggregator) clearSocketLines(ctx context.Context) {
ticker := time.NewTicker(120 * time.Second)
skLineCh := make(chan *SocketLine, 1000)
@@ -1655,6 +1605,7 @@ func (a *Aggregator) clearSocketLines(ctx context.Context) {

for range ticker.C {
for _, sockMap := range a.clusterInfo.SocketMaps {
// TODO: check here
if sockMap.mu == nil {
continue
}
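Elsewhere in data.go, processTcpConnect swaps a busy-wait for a requeue: the old code spun in a loop sleeping 100 ms until the SocketLine appeared, while the new code signals creation and puts the event back onto ebpfTcpChan, which is also why the aggregator's channel fields changed from receive-only (<-chan interface{}) to bidirectional (chan interface{}). A sketch of the pattern under hypothetical stand-in types:

```go
// Sketch of requeue-instead-of-busy-wait; Event, ready and signal are
// hypothetical stand-ins for the diff's TcpConnectEvent / SocketMap logic.
package main

import "fmt"

type Event struct {
	Pid uint32
	Fd  uint64
}

type Processor struct {
	tcpChan chan interface{}                 // bidirectional so handlers can requeue
	ready   func(pid uint32, fd uint64) bool // does per-pid state exist yet?
	signal  func(pid uint32, fd uint64)      // ask a worker to create it
}

func (p *Processor) handle(ev *Event) {
	if !p.ready(ev.Pid, ev.Fd) {
		// State is not there yet: request creation asynchronously and
		// put the event back on the queue instead of sleeping in a loop.
		go p.signal(ev.Pid, ev.Fd)
		p.tcpChan <- ev
		return
	}
	fmt.Println("processing", ev.Pid, ev.Fd) // real code records the connection
}

func main() {
	p := &Processor{
		tcpChan: make(chan interface{}, 16),
		ready:   func(uint32, uint64) bool { return true },
		signal:  func(uint32, uint64) {},
	}
	p.handle(&Event{Pid: 42, Fd: 7})
}
```

The trade-off is that an event can cycle through the channel until creation succeeds; the later commit 7de89e7 ("increase chan sizes") plausibly gives that loop more headroom.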
59 changes: 29 additions & 30 deletions aggregator/sock_num_line.go
@@ -50,7 +50,6 @@ func NewSocketLine(ctx context.Context, pid uint32, fd uint64, fetch bool) *Sock
log.Logger.Error().Ctx(ctx).Err(err).Msg("getConnectionInfo failed")
}
}
log.Logger.Debug().Ctx(ctx).Msg("returning from NewSocketLine")
return skLine
}

@@ -61,48 +60,48 @@ func (nl *SocketLine) ClearAll() {
}

func (nl *SocketLine) AddValue(timestamp uint64, sockInfo *SockInfo) {
log.Logger.Debug().Ctx(nl.ctx).
Any("pid", sockInfo.Pid).
Any("fd", sockInfo.Fd).
Any("ts", timestamp).
Msg("AddValue-start")
nl.mu.Lock()
defer nl.mu.Unlock()

log.Logger.Debug().Ctx(nl.ctx).
Any("pid", sockInfo.Pid).
Any("fd", sockInfo.Fd).
Any("ts", timestamp).
Msg("AddValue-start1")

// ignore close events
if sockInfo == nil {
log.Logger.Debug().Ctx(nl.ctx).
Any("pid", sockInfo.Pid).
Any("fd", sockInfo.Fd).
Any("ts", timestamp).
Msg("AddValue-1")
// log.Logger.Debug().Ctx(nl.ctx).
// Any("pid", nl.pid).
// Any("fd", nl.fd).
// Any("ts", timestamp).
// Msg("AddValue-1")
return
}

// log.Logger.Debug().Ctx(nl.ctx).
// Any("pid", sockInfo.Pid).
// Any("fd", sockInfo.Fd).
// Any("ts", timestamp).
// Msg("AddValue-start")
nl.mu.Lock()
defer nl.mu.Unlock()

// log.Logger.Debug().Ctx(nl.ctx).
// Any("pid", sockInfo.Pid).
// Any("fd", sockInfo.Fd).
// Any("ts", timestamp).
// Msg("AddValue-start1")

// if last element is equal to the current element, ignore
if len(nl.Values) > 0 {
last := nl.Values[len(nl.Values)-1].SockInfo
if last != nil && last.Saddr == sockInfo.Saddr && last.Sport == sockInfo.Sport && last.Daddr == sockInfo.Daddr && last.Dport == sockInfo.Dport {
log.Logger.Debug().Ctx(nl.ctx).
Any("pid", sockInfo.Pid).
Any("fd", sockInfo.Fd).
Any("ts", timestamp).
Msg("AddValue-2")
// log.Logger.Debug().Ctx(nl.ctx).
// Any("pid", sockInfo.Pid).
// Any("fd", sockInfo.Fd).
// Any("ts", timestamp).
// Msg("AddValue-2")
return
}
}

log.Logger.Debug().Ctx(nl.ctx).
Any("pid", sockInfo.Pid).
Any("fd", sockInfo.Fd).
Any("ts", timestamp).
Msg("AddValue-end")
// log.Logger.Debug().Ctx(nl.ctx).
// Any("pid", sockInfo.Pid).
// Any("fd", sockInfo.Fd).
// Any("ts", timestamp).
// Msg("AddValue-end")
nl.Values = insertIntoSortedSlice(nl.Values, &TimestampedSocket{Timestamp: timestamp, SockInfo: sockInfo})
}
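The AddValue rewrite above is a bug fix as much as log cleanup: the old version dereferenced sockInfo (Any("pid", sockInfo.Pid), Any("fd", sockInfo.Fd)) before the if sockInfo == nil guard, so any close event, which carries a nil SockInfo by design, would panic inside the debug log. The new ordering checks nil first and only then takes the lock. A compact, simplified sketch of the corrected ordering:

```go
package main

import "sync"

type SockInfo struct {
	Pid   uint32
	Saddr string
}

type SocketLine struct {
	mu     sync.Mutex
	Values []*SockInfo
}

// Guard before any dereference: the old code logged sockInfo.Pid above
// this check, which panics when a close event passes sockInfo == nil.
func (nl *SocketLine) AddValue(ts uint64, sockInfo *SockInfo) {
	if sockInfo == nil { // close events carry no SockInfo; ignore them
		return
	}
	nl.mu.Lock()
	defer nl.mu.Unlock()
	// real code dedupes against the last value and inserts sorted by ts
	nl.Values = append(nl.Values, sockInfo)
}

func main() {
	nl := &SocketLine{}
	nl.AddValue(1, &SockInfo{Pid: 1, Saddr: "10.0.0.1"})
	nl.AddValue(2, nil) // safe now; would have panicked before this commit
}
```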

137 changes: 17 additions & 120 deletions aggregator/socket.go
@@ -26,9 +26,10 @@ type SocketMap struct {
M map[uint64]*SocketLine `json:"fdToSockLine"` // fd -> SockLine
waitingFds chan uint64

processedFds map[uint64]struct{}
closeCh chan struct{}
ctx context.Context
processedFds map[uint64]struct{}
processedFdsmu sync.RWMutex
closeCh chan struct{}
ctx context.Context
}

// only one worker can create socket lines for a particular process(socketmap)
@@ -38,11 +39,9 @@ func (sm *SocketMap) ProcessSocketLineCreationRequests() {
case <-sm.closeCh:
return
case fd := <-sm.waitingFds:
log.Logger.Debug().Ctx(sm.ctx).
Msgf("pid=%d,fd=%d came for socket line creation", sm.pid, fd)
if _, ok := sm.M[fd]; !ok {
sm.createSocketLine(fd)
log.Logger.Info().Ctx(sm.ctx).
sm.createSocketLine(fd, true)
log.Logger.Debug().Ctx(sm.ctx).
Uint32("pid", sm.pid).
Uint64("fd", fd).
Msgf("created socket line for fd:%d", fd)
@@ -52,126 +51,24 @@ func (sm *SocketMap) ProcessSocketLineCreationRequests() {
}

func (sm *SocketMap) SignalSocketLine(ctx context.Context, fd uint64) {
sm.processedFdsmu.RLock()
if _, ok := sm.processedFds[fd]; ok {
sm.processedFdsmu.RUnlock()
return
} else {
sm.processedFdsmu.RUnlock()

sm.processedFdsmu.Lock()
sm.processedFds[fd] = struct{}{}
sm.processedFdsmu.Unlock()
}
log.Logger.Debug().Ctx(ctx).Uint32("pid", sm.pid).Uint64("fd", fd).Msg("signaling socket creation..")
sm.processedFds[fd] = struct{}{}

sm.waitingFds <- fd
}

func (sm *SocketMap) createSocketLine(fd uint64) {
// TODO: get fetch boolean
log.Logger.Debug().Ctx(sm.ctx).
Uint32("pid", sm.pid).
Uint64("fd", fd).
Msg("createSocketLine called..")
skLine := NewSocketLine(sm.ctx, sm.pid, fd, true)
log.Logger.Debug().Ctx(sm.ctx).
Uint32("pid", sm.pid).
Uint64("fd", fd).
Msg("createSocketLine acquiring lock..")
func (sm *SocketMap) createSocketLine(fd uint64, fetch bool) {
skLine := NewSocketLine(sm.ctx, sm.pid, fd, fetch)
sm.mu.Lock()
log.Logger.Debug().Ctx(sm.ctx).
Uint32("pid", sm.pid).
Uint64("fd", fd).
Msg("createSocketLine inside lock..")
sm.M[fd] = skLine
sm.mu.Unlock()
log.Logger.Debug().Ctx(sm.ctx).
Uint32("pid", sm.pid).
Uint64("fd", fd).
Msg("createSocketLine ended..")
}

// get all tcp sockets for the pid
// iterate through all sockets
// create a new socket line for each socket
// add it to the socket map
// func (sm *SocketMap) fetchExistingSockets() {
// socks := map[string]sock{}

// // Get the sockets for the process.
// var err error
// for _, f := range []string{"tcp", "tcp6"} {
// sockPath := strings.Join([]string{"/proc", fmt.Sprint(sm.pid), "net", f}, "/")

// ss, err := readSockets(sockPath)
// if err != nil {
// continue
// }

// for _, s := range ss {
// socks[s.Inode] = sock{TcpSocket: s}
// }
// }

// // Get the file descriptors for the process.
// fdDir := strings.Join([]string{"/proc", fmt.Sprint(sm.pid), "fd"}, "/")
// fdEntries, err := os.ReadDir(fdDir)
// if err != nil {
// return
// }

// fds := make([]Fd, 0, len(fdEntries))
// for _, entry := range fdEntries {
// fd, err := strconv.ParseUint(entry.Name(), 10, 64)
// if err != nil {
// continue
// }
// dest, err := os.Readlink(path.Join(fdDir, entry.Name()))
// if err != nil {
// continue
// }
// var socketInode string
// if strings.HasPrefix(dest, "socket:[") && strings.HasSuffix(dest, "]") {
// socketInode = dest[len("socket:[") : len(dest)-1]
// }
// fds = append(fds, Fd{Fd: fd, Dest: dest, SocketInode: socketInode})
// }

// // Match the sockets to the file descriptors.
// for _, fd := range fds {
// if fd.SocketInode != "" {
// // add to values
// s := socks[fd.SocketInode].TcpSocket
// sockInfo := &SockInfo{
// Pid: sm.pid,
// Fd: fd.Fd,
// Saddr: s.SAddr.IP().String(),
// Sport: s.SAddr.Port(),
// Daddr: s.DAddr.IP().String(),
// Dport: s.DAddr.Port(),
// }

// if sockInfo.Saddr == "zero IP" || sockInfo.Daddr == "zero IP" || sockInfo.Sport == 0 || sockInfo.Dport == 0 {
// continue
// }

// skLine := NewSocketLine(sm.pid, fd.Fd)
// skLine.AddValue(0, sockInfo)

// if sm.mu == nil {
// return
// }

// sm.mu.Lock()
// if sm.M == nil {
// sm.M = make(map[uint64]*SocketLine)
// }
// sm.M[fd.Fd] = skLine
// sm.mu.Unlock()
// }
// }
// }

// func (sm *SocketMap) retrieveSocket(fd uint64) {
// sm.mu.Lock()
// if sl, ok := sm.M[fd]; ok {
// sl.getConnectionInfo()
// } else {
// sm.M[fd] = NewSocketLine(sm.pid, fd)
// sl.getConnectionInfo()
// }
// sm.mu.Unlock()
// }
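SignalSocketLine above now guards processedFds with the new processedFdsmu RWMutex: a cheap RLock for the common already-seen case and a full Lock only for first-seen fds. Two goroutines can still both pass the read check before either takes the write lock and enqueue the same fd twice, but the worker in ProcessSocketLineCreationRequests re-checks sm.M[fd] before creating, so the duplicate is harmless. A standalone sketch of the guard:

```go
package main

import "sync"

type SocketMap struct {
	processedFds   map[uint64]struct{}
	processedFdsmu sync.RWMutex
	waitingFds     chan uint64
}

// Read-mostly guard around processedFds. Two callers can race between
// RUnlock and Lock and both enqueue fd; the creation worker re-checks
// the map before building a SocketLine, so duplicates are benign.
func (sm *SocketMap) SignalSocketLine(fd uint64) {
	sm.processedFdsmu.RLock()
	_, seen := sm.processedFds[fd]
	sm.processedFdsmu.RUnlock()
	if seen {
		return
	}

	sm.processedFdsmu.Lock()
	sm.processedFds[fd] = struct{}{}
	sm.processedFdsmu.Unlock()

	sm.waitingFds <- fd
}

func main() {
	sm := &SocketMap{
		processedFds: map[uint64]struct{}{},
		waitingFds:   make(chan uint64, 1000),
	}
	sm.SignalSocketLine(3)
	sm.SignalSocketLine(3) // second call returns early
}
```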
4 changes: 3 additions & 1 deletion datastore/backend.go
@@ -547,7 +547,9 @@ func (b *BackendDS) sendToBackend(method string, payload interface{}, endpoint s
return
}

// log.Logger.Debug().Str("endpoint", endpoint).Any("payload", payload).Msg("sending batch to backend")
// if endpoint == reqEndpoint {
// log.Logger.Debug().Str("endpoint", endpoint).Any("payload", payload).Msg("sending batch to backend")
// }
err = b.DoRequest(httpReq)
if err != nil {
log.Logger.Error().Msgf("backend persist error at ep %s : %v", endpoint, err)