Track bucket instead of sizes (#913)
jianoaix authored Nov 20, 2024
1 parent cf32712 commit d6b99b5
Showing 2 changed files with 11 additions and 12 deletions.
7 changes: 3 additions & 4 deletions disperser/encoder/metrics.go
@@ -6,7 +6,6 @@ import (
 	"net/http"
 	"time"
 
-	"github.com/Layr-Labs/eigenda/disperser/common"
 	"github.com/Layr-Labs/eigensdk-go/logging"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/collectors"
@@ -107,9 +106,9 @@ func (m *Metrics) ObserveLatency(stage string, duration time.Duration) {
 	m.Latency.WithLabelValues(stage).Observe(float64(duration.Milliseconds()))
 }
 
-func (m *Metrics) ObserveQueue(queueStats map[int]int) {
-	for blobSize, num := range queueStats {
-		m.BlobQueue.With(prometheus.Labels{"size_bucket": common.BlobSizeBucket(blobSize)}).Set(float64(num))
+func (m *Metrics) ObserveQueue(queueStats map[string]int) {
+	for bucket, num := range queueStats {
+		m.BlobQueue.With(prometheus.Labels{"size_bucket": bucket}).Set(float64(num))
 	}
 }

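With this change, ObserveQueue no longer does any bucketing itself: it receives counts that are already keyed by bucket label and mirrors them into the "size_bucket"-labelled gauge. The following minimal sketch shows that contract in isolation; it assumes BlobQueue is a prometheus GaugeVec (implied by the With(...).Set(...) call in the diff), and the metric name and bucket labels are made up for illustration.

// Minimal sketch of the reworked ObserveQueue contract, not the repository's
// actual wiring: the caller hands over counts already keyed by bucket label.
// Metric name and bucket labels below are illustrative assumptions.
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

type Metrics struct {
	BlobQueue *prometheus.GaugeVec // assumed type, implied by With(...).Set(...)
}

func (m *Metrics) ObserveQueue(queueStats map[string]int) {
	for bucket, num := range queueStats {
		m.BlobQueue.With(prometheus.Labels{"size_bucket": bucket}).Set(float64(num))
	}
}

func main() {
	m := &Metrics{
		BlobQueue: prometheus.NewGaugeVec(
			prometheus.GaugeOpts{Name: "blob_queue", Help: "Queued encoding requests by size bucket."},
			[]string{"size_bucket"},
		),
	}
	// Bucket labels are whatever the caller's bucketing helper produces.
	m.ObserveQueue(map[string]int{"100KiB": 2, "1MiB": 1})
	fmt.Println("gauge set for each bucket")
}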
16 changes: 8 additions & 8 deletions disperser/encoder/server.go
@@ -12,6 +12,7 @@ import (
 	"github.com/Layr-Labs/eigenda/common/healthcheck"
 	"github.com/Layr-Labs/eigenda/disperser"
 	pb "github.com/Layr-Labs/eigenda/disperser/api/grpc/encoder"
+	"github.com/Layr-Labs/eigenda/disperser/common"
 	"github.com/Layr-Labs/eigenda/encoding"
 	"github.com/Layr-Labs/eigensdk-go/logging"
 	"google.golang.org/grpc"
@@ -30,7 +31,7 @@ type EncoderServer struct {
 	runningRequests chan struct{}
 	requestPool     chan blobRequest
 
-	queueStats map[int]int
+	queueStats map[string]int
 	queueLock  sync.Mutex
 }
@@ -47,7 +48,7 @@ func NewEncoderServer(config ServerConfig, logger logging.Logger, prover encodin
 
 		runningRequests: make(chan struct{}, config.MaxConcurrentRequests),
 		requestPool:     make(chan blobRequest, config.RequestPoolSize),
-		queueStats:      make(map[int]int),
+		queueStats:      make(map[string]int),
 	}
 }
@@ -92,16 +93,15 @@ func (s *EncoderServer) EncodeBlob(ctx context.Context, req *pb.EncodeBlobReques
 	blobSize := len(req.GetData())
 	select {
 	case s.requestPool <- blobRequest{blobSizeByte: blobSize}:
+		s.queueLock.Lock()
+		s.queueStats[common.BlobSizeBucket(blobSize)]++
+		s.metrics.ObserveQueue(s.queueStats)
+		s.queueLock.Unlock()
 	default:
 		s.metrics.IncrementRateLimitedBlobRequestNum(len(req.GetData()))
 		s.logger.Warn("rate limiting as request pool is full", "requestPoolSize", s.config.RequestPoolSize, "maxConcurrentRequests", s.config.MaxConcurrentRequests)
 		return nil, errors.New("too many requests")
 	}
-	s.queueLock.Lock()
-	s.queueStats[blobSize]++
-	s.metrics.ObserveQueue(s.queueStats)
-	s.queueLock.Unlock()
 
 	s.runningRequests <- struct{}{}
 	defer s.popRequest()
@@ -126,7 +126,7 @@ func (s *EncoderServer) popRequest() {
 	blobRequest := <-s.requestPool
 	<-s.runningRequests
 	s.queueLock.Lock()
-	s.queueStats[blobRequest.blobSizeByte]--
+	s.queueStats[common.BlobSizeBucket(blobRequest.blobSizeByte)]--
 	s.metrics.ObserveQueue(s.queueStats)
 	s.queueLock.Unlock()
 }
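On the server side, queueStats is now keyed by the bucket label rather than the raw byte size, with bucketing done once at enqueue and once at dequeue. The practical effect is that the map stays bounded at one entry per bucket instead of growing with every distinct blob size, and the increment and the later decrement are guaranteed to resolve to the same key. The real common.BlobSizeBucket helper lives in disperser/common and is not shown in this diff; the sketch below is a hypothetical stand-in whose boundaries and labels are invented, with only its signature (int in, string out) taken from the calls above.

// Hypothetical stand-in for common.BlobSizeBucket. The real helper in
// disperser/common is not part of this diff; boundaries and label strings
// here are invented for illustration only.
package common

import "fmt"

// bucketBoundsBytes is an assumed, ascending set of bucket upper bounds.
var bucketBoundsBytes = []int{1 << 10, 32 << 10, 128 << 10, 512 << 10, 1 << 20}

// BlobSizeBucket maps a blob size in bytes to a coarse, human-readable label,
// so a map keyed by it holds at most len(bucketBoundsBytes)+1 entries.
func BlobSizeBucket(blobSizeBytes int) string {
	for _, bound := range bucketBoundsBytes {
		if blobSizeBytes <= bound {
			return fmt.Sprintf("<=%dKiB", bound/1024)
		}
	}
	return ">1MiB"
}

Under that assumption, both the s.queueStats[...]++ in EncodeBlob and the matching decrement in popRequest touch the same small, fixed set of keys, which is exactly what the metrics side now expects from its map[string]int argument.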
