Skip to content
This repository has been archived by the owner on Nov 25, 2024. It is now read-only.

Commit

Permalink
Use a custom FIFO queue for the RS input API (#1888)
Browse files Browse the repository at this point in the history
* Use a FIFO queue instead of a channel to reduce backpressure

* Make sure someone wakes up

* Tweaks

* Add comments
  • Loading branch information
neilalexander authored Jun 28, 2021
1 parent a6f7e83 commit 7c3991e
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 6 deletions.
15 changes: 9 additions & 6 deletions roomserver/internal/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ type Inputer struct {
ServerName gomatrixserverlib.ServerName
ACLs *acls.ServerACLs
OutputRoomEventTopic string

workers sync.Map // room ID -> *inputWorker
workers sync.Map // room ID -> *inputWorker
}

type inputTask struct {
Expand All @@ -52,15 +51,19 @@ type inputTask struct {
type inputWorker struct {
r *Inputer
running atomic.Bool
input chan *inputTask
input *fifoQueue
}

// Guarded by a CAS on w.running
func (w *inputWorker) start() {
defer w.running.Store(false)
for {
select {
case task := <-w.input:
case <-w.input.wait():
task, ok := w.input.pop()
if !ok {
continue
}
hooks.Run(hooks.KindNewEventReceived, task.event.Event)
_, task.err = w.r.processRoomEvent(task.ctx, task.event)
if task.err == nil {
Expand Down Expand Up @@ -143,7 +146,7 @@ func (r *Inputer) InputRoomEvents(
// room - the channel will be quite small as it's just pointer types.
w, _ := r.workers.LoadOrStore(roomID, &inputWorker{
r: r,
input: make(chan *inputTask, 32),
input: newFIFOQueue(),
})
worker := w.(*inputWorker)

Expand All @@ -160,7 +163,7 @@ func (r *Inputer) InputRoomEvents(
if worker.running.CAS(false, true) {
go worker.start()
}
worker.input <- tasks[i]
worker.input.push(tasks[i])
}

// Wait for all of the workers to return results about our tasks.
Expand Down
64 changes: 64 additions & 0 deletions roomserver/internal/input/input_fifo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package input

import (
"sync"
)

type fifoQueue struct {
tasks []*inputTask
count int
mutex sync.Mutex
notifs chan struct{}
}

func newFIFOQueue() *fifoQueue {
q := &fifoQueue{
notifs: make(chan struct{}, 1),
}
return q
}

func (q *fifoQueue) push(frame *inputTask) {
q.mutex.Lock()
defer q.mutex.Unlock()
q.tasks = append(q.tasks, frame)
q.count++
select {
case q.notifs <- struct{}{}:
default:
}
}

// pop returns the first item of the queue, if there is one.
// The second return value will indicate if a task was returned.
// You must check this value, even after calling wait().
func (q *fifoQueue) pop() (*inputTask, bool) {
q.mutex.Lock()
defer q.mutex.Unlock()
if q.count == 0 {
return nil, false
}
frame := q.tasks[0]
q.tasks[0] = nil
q.tasks = q.tasks[1:]
q.count--
if q.count == 0 {
// Force a GC of the underlying array, since it might have
// grown significantly if the queue was hammered for some reason
q.tasks = nil
}
return frame, true
}

// wait returns a channel which can be used to detect when an
// item is waiting in the queue.
func (q *fifoQueue) wait() <-chan struct{} {
q.mutex.Lock()
defer q.mutex.Unlock()
if q.count > 0 && len(q.notifs) == 0 {
ch := make(chan struct{})
close(ch)
return ch
}
return q.notifs
}

0 comments on commit 7c3991e

Please sign in to comment.