Skip to content
This repository has been archived by the owner on Oct 16, 2024. It is now read-only.

Handle context being closed better #8

Merged
merged 1 commit into from
Dec 15, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 72 additions & 15 deletions timers/timers.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,20 @@ type Timers struct {
// from outside of it on multitple timers. And it is easier to just use this then redo half the work it does
// to make that safe
taskQueue *taskqueue.TaskQueue
// used to synchronize around context closing
taskQueueCh chan struct{}
}

var (
_ modules.Module = &RootModule{}
_ modules.Instance = &Timers{}
)

const (
setTimeoutName = "setTimeout"
setIntervalName = "setInterval"
)

// New returns a pointer to a new RootModule instance.
func New() *RootModule {
return &RootModule{}
Expand Down Expand Up @@ -92,8 +99,7 @@ func (e *Timers) clearTimeout(id uint64) {

func (e *Timers) freeEventLoopIfPossible() {
if e.queue.length() == 0 && e.taskQueue != nil {
e.taskQueue.Close()
e.taskQueue = nil
e.closeTaskQueue()
}
}

Expand Down Expand Up @@ -138,24 +144,27 @@ func (e *Timers) timerInitialization(
return err
}

e.runAfterTimeout(timeout, task, id)
name := setTimeoutName
if repeat {
name = setIntervalName
}

e.runAfterTimeout(&timer{
id: id,
task: task,
nextTrigger: time.Now().Add(time.Duration(timeout * float64(time.Millisecond))),
name: name,
})
}

// https://html.spec.whatwg.org/multipage/timers-and-user-prompts.html#run-steps-after-a-timeout
// Notes:
// orderingId is not really used in this case
// id is also required for us unlike how it is defined. Maybe in the future if this moves to core it will be expanded
func (e *Timers) runAfterTimeout(timeout float64, task func() error, id uint64) {
delay := time.Duration(timeout * float64(time.Millisecond))
triggerTime := time.Now().Add(delay)
e.timers[id] = triggerTime
// Notes: this just takes timers as makes the implementation way easier and we do not currently need
// most of the functionality provided
func (e *Timers) runAfterTimeout(t *timer) {
e.timers[t.id] = t.nextTrigger

// as we have only one orderingId we have one queue
index := e.queue.add(&timer{
id: id,
task: task,
nextTrigger: triggerTime,
})
index := e.queue.add(t)

if index != 0 {
return // not a timer at the very beginning
Expand Down Expand Up @@ -186,18 +195,66 @@ func (e *Timers) setupTaskTimeout() {
delay := -time.Since(e.timers[e.queue.first().id])
if e.taskQueue == nil {
e.taskQueue = taskqueue.New(e.vu.RegisterCallback)
e.setupTaskQueueCloserOnIterationEnd()
}
q := e.taskQueue
e.queue.head = time.AfterFunc(delay, func() {
q.Queue(e.runFirstTask)
})
}

func (e *Timers) closeTaskQueue() {
// this only runs on the event loop
if e.taskQueueCh == nil {
return
}
ch := e.taskQueueCh
// so that we do not execute it twice
e.taskQueueCh = nil

// wait for this to happen so we don't need to hit the event loop again
// instead this just closes the queue
ch <- struct{}{}
<-ch
}

func (e *Timers) setupTaskQueueCloserOnIterationEnd() {
ctx := e.vu.Context()
q := e.taskQueue
ch := make(chan struct{})
e.taskQueueCh = ch
go func() {
select { // wait for one of the two
case <-ctx.Done():
// lets report timers won't be executed and clean the fields for the next execution
// we need to do this on the event loop as we don't want to have a race
q.Queue(func() error {
logger := e.vu.State().Logger
for _, timer := range e.queue.queue {
logger.Warnf("%s %d was stopped because the VU iteration was interrupted", timer.name, timer.id)
}

// TODO: use `clear` when we only support go 1.21 and above
e.timers = make(map[uint64]time.Time)
e.queue = new(timerQueue)
e.taskQueue = nil
return nil
})
case <-ch:
e.taskQueue = nil
close(ch)
}
e.queue.stopTimer()
q.Close()
}()
}

// this is just a small struct to keep the internals of a timer
type timer struct {
id uint64
nextTrigger time.Time
task func() error
name string
}

// this is just a list of timers that should be ordered once after the other
Expand Down
Loading