-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathscheduler.lisp
130 lines (114 loc) · 4.63 KB
/
scheduler.lisp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
(in-package #:scheduler)
(defstruct node timestamp task)
(defclass scheduler ()
((name
:initarg :name
:initform nil
:reader sched-name)
(mutex
:reader mutex)
(condition-var
:initform #-ecl (bt:make-condition-variable)
#+ecl (bt-sem:make-semaphore)
:reader condition-var)
(in-queue
:initform (pileup:make-heap #'<= :size 100 :key #'node-timestamp)
:reader in-queue)
(sched-thread
:initform nil
:accessor sched-thread)
(status
:initform :stop
:accessor sched-status)
(ahead
:initarg :sched-ahead
:initform .3
:accessor sched-ahead)
(timestamp
:initarg :timestamp
:initform #'unix-time
:reader timestamp
:documentation
"This Function is get current scheduler time. That must based on seconds.")))
(defmethod initialize-instance :after ((self scheduler) &key)
;;; pilep:heap include lock. so scheduler use that lock.
(with-slots (mutex in-queue) self
#-ecl (setf mutex (slot-value in-queue 'pileup::lock))
#+ecl (setf mutex (bt:make-recursive-lock))))
;;; timed wait -----------------------------------------------------------------------------------------
(defun condition-wait (condition-variable lock)
#-ecl (bt:condition-wait condition-variable lock)
#+ecl
(progn
(bt:release-lock lock)
(unwind-protect (bt-sem:wait-on-semaphore condition-variable)
(bt:acquire-lock lock t))))
(defun condition-timed-wait (condition-variable lock time)
#+sbcl (unless (sb-thread:condition-wait condition-variable lock :timeout time)
(bt:acquire-lock lock t))
#-sbcl
(progn
(bt:release-lock lock)
(unwind-protect
#+ccl (ccl:timed-wait-on-semaphore condition-variable time)
#+ecl(bt-sem:wait-on-semaphore condition-variable :timeout time)
(bt:acquire-lock lock t))))
;;; -----------------------------------------------------------------------------------------------------
(defun sched-time (scheduler)
(funcall (timestamp scheduler)))
(defun sched-quant (scheduler quantized-time &optional (offset-time 0.0d0))
"Return a time which quantized to given a quantized-time."
(let ((time (+ offset-time (sched-time scheduler))))
(+ time (- quantized-time (mod time quantized-time)))))
(defun sched-run (scheduler)
(when (eql (sched-status scheduler) :stop)
(setf (sched-thread scheduler)
(bt:make-thread
(lambda ()
(labels ((run ()
(handler-case
(loop
(loop :while (pileup:heap-empty-p (in-queue scheduler))
:do (condition-wait (condition-var scheduler) (mutex scheduler)))
(loop :while (not (pileup:heap-empty-p (in-queue scheduler)))
:do (let ((timeout (- (node-timestamp (pileup:heap-top (in-queue scheduler))) (sched-time scheduler))))
(unless (plusp timeout) (return))
(condition-timed-wait (condition-var scheduler) (mutex scheduler) timeout)))
(loop :while (and (not (pileup:heap-empty-p (in-queue scheduler)))
(>= (sched-time scheduler) (node-timestamp (pileup:heap-top (in-queue scheduler)))))
:do (funcall (node-task (pileup:heap-pop (in-queue scheduler))))))
(error (c) (format t "~&Error \"~a\" in scheduler thread~%" c)
(run)))))
(set-thread-realtime-priority) ;thread-boost!!
(bt:with-lock-held ((mutex scheduler))
(setf (sched-status scheduler) :running)
(sched-clear scheduler)
(run))))
:name (format nil "~@[~a ~]scheduler thread" (sched-name scheduler))))
:running))
(defun sched-add (scheduler time f &rest args)
"Insert task and time-info to scheduler queue. scheduler have ahead of time value(default to 0.3).
'(- time (sched-ahead scheduler)) is actual time it runs to f."
(bt:with-recursive-lock-held ((mutex scheduler))
(pileup:heap-insert (make-node :timestamp (- time (sched-ahead scheduler))
:task (lambda () (apply f args)))
(in-queue scheduler))
#-ecl (bt:condition-notify (condition-var scheduler))
#+ecl (bt-sem:signal-semaphore (condition-var scheduler)))
(values))
(defun sched-clear (scheduler)
"Clear to scheduler queue."
(bt:with-recursive-lock-held ((mutex scheduler))
(let ((queue (in-queue scheduler)))
(loop :while (not (pileup:heap-empty-p queue))
:do (pileup:heap-pop queue)))
#-ecl (bt:condition-notify (condition-var scheduler))
#+ecl (bt-sem:signal-semaphore (condition-var scheduler)))
(values))
(defun sched-stop (scheduler)
"Stop the scheduler."
(when (eql (sched-status scheduler) :running)
(bt:destroy-thread (sched-thread scheduler))
;; when kill thread by destroy-thread then join-thread not work in sbcl
#-sbcl (bt:join-thread (sched-thread scheduler))
(setf (sched-status scheduler) :stop)))