Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

[wip] FairQ: An implementation of K8s Workqueue #21

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 94 additions & 0 deletions k8s/fairqueue/dedupe_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package fairqueue

// This is a non thread-safe version of of the workqueue.Type. It has been liberally copied
// with some functions eliminated. The thread-safe version should be built on top of this implementation.
type dedupingQueue struct {
// queue defines the order in which we will work on items. Every
// element of queue should be in the dirty set and not in the
// processing set.
queue []t

// dirty defines all of the items that need to be processed.
dirty set

// Things that are currently being processed are in the processing set.
// These things may be simultaneously in the dirty set. When we finish
// processing something and remove it from this set, we'll check if
// it's in the dirty set, and if so, add it to the queue.
processing set
}

type empty struct{}
type t interface{}
type set map[t]empty

func (s set) has(item t) bool {
_, exists := s[item]
return exists
}

func (s set) insert(item t) {
s[item] = empty{}
}

func (s set) delete(item t) {
delete(s, item)
}

// Add marks item as needing processing.
func (q *dedupingQueue) Add(item interface{}) {
if q.dirty.has(item) {
return
}

q.dirty.insert(item)
if q.processing.has(item) {
return
}

q.queue = append(q.queue, item)
}

// Len returns the current queue length, for informational purposes only. You
// shouldn't e.g. gate a call to Add() or Get() on Len() being a particular
// value, that can't be synchronized properly.
func (q *dedupingQueue) Len() int {
return len(q.queue)
}

// Get blocks until it can return an item to be processed. If shutdown = true,
// the caller should end their goroutine. You must call Done with item when you
// have finished processing it.
func (q *dedupingQueue) Get() (item interface{}) {
if len(q.queue) == 0 {
return nil
}

item, q.queue = q.queue[0], q.queue[1:]

q.processing.insert(item)
q.dirty.delete(item)

return item
}

// Done marks item as done processing, and if it has been marked as dirty again
// while it was being processed, it will be re-added to the queue for
// re-processing. Returns if the item was re-added for processing.
func (q *dedupingQueue) Done(item interface{}) bool {
if q.processing.has(item) {
q.processing.delete(item)
if q.dirty.has(item) {
q.queue = append(q.queue, item)
return true
}
}
return false
}

func newDedupingQueue() *dedupingQueue {
return &dedupingQueue{
dirty: set{},
processing: set{},
}
}
98 changes: 98 additions & 0 deletions k8s/fairqueue/dedupe_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package fairqueue

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestDedupingQueue(t *testing.T) {
t.Run("get-empty", func(t *testing.T) {

q := newDedupingQueue()
assert.Equal(t, nil, q.Get())
})

t.Run("add-normal", func(t *testing.T) {
q := newDedupingQueue()
assert.False(t, q.dirty.has("x"))
q.Add("x")
assert.True(t, q.dirty.has("x"))
assert.False(t, q.processing.has("x"))
assert.Equal(t, 1, len(q.queue))
})

t.Run("add-when-processing", func(t *testing.T) {
q := newDedupingQueue()
q.Add("x")
assert.True(t, q.dirty.has("x"))
assert.False(t, q.processing.has("x"))
assert.Equal(t, 1, len(q.queue))
assert.Equal(t, 1, q.Len())

assert.Equal(t, "x", q.Get())

assert.False(t, q.dirty.has("x"))
assert.True(t, q.processing.has("x"))
assert.Equal(t, 0, len(q.queue))
assert.Equal(t, 0, q.Len())

q.Add("x")
assert.True(t, q.dirty.has("x"))
assert.True(t, q.processing.has("x"))
assert.Equal(t, 0, len(q.queue))
assert.Equal(t, 0, q.Len())

q.Done("x")
assert.True(t, q.dirty.has("x"))
assert.False(t, q.processing.has("x"))
assert.Equal(t, 1, len(q.queue))
assert.Equal(t, 1, q.Len())

})

t.Run("add-when-dirty", func(t *testing.T) {
q := newDedupingQueue()
q.Add("x")
assert.True(t, q.dirty.has("x"))
assert.False(t, q.processing.has("x"))
assert.Equal(t, 1, len(q.queue))
assert.Equal(t, 1, q.Len())

q.Add("x")
assert.True(t, q.dirty.has("x"))
assert.False(t, q.processing.has("x"))
assert.Equal(t, 1, len(q.queue))
assert.Equal(t, 1, q.Len())

q.Done("x")
assert.True(t, q.dirty.has("x"))
assert.False(t, q.processing.has("x"))
assert.Equal(t, 1, len(q.queue))
assert.Equal(t, 1, q.Len())

})

t.Run("done-processing-one", func(t *testing.T) {
q := newDedupingQueue()
q.Add("x")
assert.True(t, q.dirty.has("x"))
assert.False(t, q.processing.has("x"))
assert.Equal(t, 1, len(q.queue))
assert.Equal(t, 1, q.Len())

assert.Equal(t, "x", q.Get())

assert.False(t, q.dirty.has("x"))
assert.True(t, q.processing.has("x"))
assert.Equal(t, 0, len(q.queue))
assert.Equal(t, 0, q.Len())

q.Done("x")
assert.False(t, q.dirty.has("x"))
assert.False(t, q.processing.has("x"))
assert.Equal(t, 0, len(q.queue))
assert.Equal(t, 0, q.Len())

})
}
4 changes: 4 additions & 0 deletions k8s/fairqueue/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package fairqueue

// A Queue implementation that provides fairness of work execution across namespaces. Beneficial in guaranteeing that a
// resource in a namespace will not be affected by other noisy namespaces.
103 changes: 103 additions & 0 deletions k8s/fairqueue/indexed_circular_buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package fairqueue

import "container/ring"

// This is an implementation of a circular buffer using the container/ring package in golang
// This buffer is specialized to allow accessing a specific entry by its key.
// Iterating through the buffer using the Next method is stateful and always returns the sequentially next item in the buffer
type IndexedCircularBuffer struct {
head *ring.Ring
current *ring.Ring
index map[string]*ring.Ring
}

// Returns sequentially next item in the Circular buffer. This method is stateful
func (c *IndexedCircularBuffer) Next() interface{} {
if c.current.Next() == c.head {
c.current = c.head
}
v := c.current.Next()
c.current = v
return v.Value
}

// Returns the value in the circular buffer that matches the given Key. If the key is not found, returns a false
func (c *IndexedCircularBuffer) Get(key string) (interface{}, bool) {
v, ok := c.index[key]
if !ok {
return nil, false
}
return v.Value, true
}

// Returns the value in the circular buffer that matches the given Key and true indicating that it existed.
// If the key is not found, uses the provided function to get a default item, adds it to the buffer and returns it with a false
// indicating that the value was created
func (c *IndexedCircularBuffer) GetOrDefault(key string, defaultItemGetter func() interface{}) (interface{}, bool) {
v, ok := c.index[key]
if !ok {
i := defaultItemGetter()
c.Add(key, i)
return i, false
}
return v.Value, true
}

// Adds a new Key,item at the last position in the buffer in an order preserving way
func (c *IndexedCircularBuffer) Add(key string, item interface{}) bool {
if _, ok := c.index[key]; ok {
return false
}
r := ring.New(1)
r.Value = item
c.index[key] = r
last := c.head.Prev()
last.Link(r)
r.Link(c.head)
return true
}

// Returns a length of the circular buffer
func (c *IndexedCircularBuffer) Len() int {
return len(c.index)
}

// Iterates over all the elements in the circular buffer in the order of insertion
func (c *IndexedCircularBuffer) Range(do func(v interface{}) bool) {
for ptr := c.head.Next(); ptr != c.head; ptr = ptr.Next() {
if do(ptr.Value) == false {
return
}
}
}

// Checks if the current is pointing to the first element in the buffer
func (c *IndexedCircularBuffer) IsCurrentAtHead() bool {
return c.current == c.head
}

// Iterates through the buffer order-preserving. Stops iterating when the provided function returns false or
func (c *IndexedCircularBuffer) RangeNext(do func(v interface{}) bool) {
ptr := c.current.Next()
for ; ptr != c.current; ptr = ptr.Next() {
if ptr != c.head {
if do(ptr.Value) == false {
// Move the current to the iterated location
c.current = ptr
return
}
}
}
if ptr != c.head {
_ = do(ptr.Value)
}
}

func NewIndexedCircularBuffer() *IndexedCircularBuffer {
head := ring.New(1)
return &IndexedCircularBuffer{
head: head,
current: head,
index: map[string]*ring.Ring{},
}
}
Loading