Skip to content

Commit

Permalink
TCP proxy: bytes counters
Browse files Browse the repository at this point in the history
  • Loading branch information
setaou committed Sep 5, 2023
1 parent 1f008fe commit 8269d68
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 3 deletions.
32 changes: 32 additions & 0 deletions metrics/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,36 @@ var (
},
[]string{"address", "proxy"},
)

FeBytesIn = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "mlb_frontend_bytes_in",
Help: "The number of inwards bytes processed by frontend",
},
[]string{"address", "proxy"},
)

FeBytesOut = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "mlb_frontend_bytes_out",
Help: "The number of outwards bytes processed by frontend",
},
[]string{"address", "proxy"},
)

BeBytesIn = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "mlb_backend_bytes_in",
Help: "The number of inwards bytes processed by backend",
},
[]string{"address", "proxy"},
)

BeBytesOut = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "mlb_backend_bytes_out",
Help: "The number of outwards bytes processed by backend",
},
[]string{"address", "proxy"},
)
)
13 changes: 10 additions & 3 deletions proxy/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (p *ProxyTCP) listen(address string, wg *sync.WaitGroup, ctx context.Contex
}()
}

func (p *ProxyTCP) pipe(input net.Conn, output net.Conn, done chan bool, input_timeout time.Duration, output_timeout time.Duration) {
func (p *ProxyTCP) pipe(input net.Conn, output net.Conn, done chan bool, input_timeout time.Duration, output_timeout time.Duration, counter *int) {
// Error handler
defer func() {
if r := recover(); r != nil {
Expand All @@ -173,6 +173,7 @@ func (p *ProxyTCP) pipe(input net.Conn, output net.Conn, done chan bool, input_t
input.SetReadDeadline(time.Now().Add(input_timeout))
}
nbytes, err := input.Read(buffer)
*counter += nbytes
if err == io.EOF || errors.Is(err, net.ErrClosed) {
return
}
Expand Down Expand Up @@ -270,9 +271,10 @@ func (p *ProxyTCP) handle_connection(conn_front net.Conn) {
// Pipe the connections both ways
done_front_back := make(chan bool)
done_back_front := make(chan bool)
var bytes_in, bytes_out int

go p.pipe(conn_front, conn_back, done_front_back, p.client_timeout, p.server_timeout)
go p.pipe(conn_back, conn_front, done_back_front, p.server_timeout, p.client_timeout)
go p.pipe(conn_front, conn_back, done_front_back, p.client_timeout, p.server_timeout, &bytes_in)
go p.pipe(conn_back, conn_front, done_back_front, p.server_timeout, p.client_timeout, &bytes_out)

// Wait for one pipe to end or the context to be cancelled
select {
Expand All @@ -281,6 +283,11 @@ func (p *ProxyTCP) handle_connection(conn_front net.Conn) {
case <-ctx.Done():
}

metrics.FeBytesIn.WithLabelValues(frontend_address, p.id).Add(float64(bytes_in))
metrics.FeBytesOut.WithLabelValues(frontend_address, p.id).Add(float64(bytes_out))
metrics.BeBytesIn.WithLabelValues(backend_address, p.id).Add(float64(bytes_in))
metrics.BeBytesOut.WithLabelValues(backend_address, p.id).Add(float64(bytes_out))

// Ensure both ends are closed so both pipes will exit
conn_front.Close()
conn_back.Close()
Expand Down

0 comments on commit 8269d68

Please sign in to comment.