Skip to content

Commit

Permalink
Try to capture RS input backpressure metric
Browse files Browse the repository at this point in the history
  • Loading branch information
neilalexander committed Jul 2, 2021
1 parent 2647f6e commit a9ddbfa
Showing 1 changed file with 22 additions and 0 deletions.
22 changes: 22 additions & 0 deletions roomserver/internal/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/gomatrixserverlib"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"go.uber.org/atomic"
)
Expand All @@ -52,6 +53,7 @@ type inputWorker struct {
r *Inputer
running atomic.Bool
input *fifoQueue
roomID string
}

// Guarded by a CAS on w.running
Expand All @@ -64,6 +66,9 @@ func (w *inputWorker) start() {
if !ok {
continue
}
roomserverInputBackpressure.With(prometheus.Labels{
"room_id": task.event.Event.RoomID(),
}).Observe(float64(w.input.count))
hooks.Run(hooks.KindNewEventReceived, task.event.Event)
_, task.err = w.r.processRoomEvent(task.ctx, task.event)
if task.err == nil {
Expand Down Expand Up @@ -120,6 +125,20 @@ func (r *Inputer) WriteOutputEvents(roomID string, updates []api.OutputEvent) er
return errs
}

func init() {
prometheus.MustRegister(processRoomEventDuration)
}

var roomserverInputBackpressure = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: "dendrite",
Subsystem: "roomserver",
Name: "input_backpressure",
Help: "How many events are queued for input for a given room",
},
[]string{"room_id"},
)

// InputRoomEvents implements api.RoomserverInternalAPI
func (r *Inputer) InputRoomEvents(
_ context.Context,
Expand Down Expand Up @@ -164,6 +183,9 @@ func (r *Inputer) InputRoomEvents(
go worker.start()
}
worker.input.push(tasks[i])
roomserverInputBackpressure.With(prometheus.Labels{
"room_id": roomID,
}).Observe(float64(worker.input.count))
}

// Wait for all of the workers to return results about our tasks.
Expand Down

0 comments on commit a9ddbfa

Please sign in to comment.