From 6b1ff1ed8816aa39765afb4c7d63a732ffcf256a Mon Sep 17 00:00:00 2001 From: Don Ward Date: Sun, 24 Nov 2024 12:38:51 +0000 Subject: [PATCH] Add a ThreadPool class to the threads package. The new class is documented in Appendix C of the Unicon book. Tests have been added to tests/threads to test priority queuing and the queuing of methods. --- doc/book/thread.tex | 18 +- doc/book/ucl.tex | 137 ++++++++ tests/thread/methodinvocation.icn | 106 ++++++ tests/thread/stand/methodinvocation.std | 1 + tests/thread/stand/testprioritypool-down.std | 1 + tests/thread/stand/testprioritypool-up.std | 1 + tests/thread/testprioritypool-down.icn | 132 ++++++++ tests/thread/testprioritypool-up.icn | 132 ++++++++ uni/lib/thread.icn | 327 ++++++++++++++++++- 9 files changed, 850 insertions(+), 5 deletions(-) create mode 100644 tests/thread/methodinvocation.icn create mode 100644 tests/thread/stand/methodinvocation.std create mode 100644 tests/thread/stand/testprioritypool-down.std create mode 100644 tests/thread/stand/testprioritypool-up.std create mode 100644 tests/thread/testprioritypool-down.icn create mode 100644 tests/thread/testprioritypool-up.icn diff --git a/doc/book/thread.tex b/doc/book/thread.tex index b1c62dea5..65736d8f0 100644 --- a/doc/book/thread.tex +++ b/doc/book/thread.tex @@ -1439,6 +1439,7 @@ \subsection*{Private communication channels} } \subsection*{A simple thread pool} +\label{SimpleThreadPool} In some cases the explicit creation of a thread for each concurrent activity is the simplest and most transparent way of writing the program, especially if the threads need access to the local variables of the procedure that created them. @@ -1450,7 +1451,7 @@ \subsection*{A simple thread pool} \bigskip\hrule\vspace{0.1cm} \noindent\makebox[1.5in][l]{\texttt{MakePool(n)}\vspace{0.75in}} Create a pool of \texttt{n} worker threads. The default value for \texttt{n} is -2 + the number of processors reported in \texttt{\&features}.There is usually +2 + the number of processors reported in \texttt{\&features}. There is usually not much to be gained by having many more active threads than the number of available processors (unless a significant number are idle, waiting for an event to happen). @@ -1489,8 +1490,19 @@ \subsection*{A simple thread pool} The thread pool is minimalist by design. There are a number of extra facilities that could, perhaps, be added --- cancellation of a task, place a task at the -front of the queue, rather than the rear --- but these are left as an exercise -for the reader who needs them. +front of the queue, rather than the rear. % --- but these are left as an exercise +%for the reader who needs them. +A \texttt{ThreadPool} class, which is also in the \texttt{threads} package +and described on page \pageref{ThreadPoolClass}, provides some extra +functionality. The main features are +\begin{itemize} +\item + It allows several pools to be created, with different characteristics. +\item + Tasks may have a priority that controls whereabouts in the queue the + task is placed: higher priority tasks will be started before lower + priority tasks. +\end{itemize} \subsection*{Thread-local storage} It is notable that thread--local storage can be implemented in Unicon without diff --git a/doc/book/ucl.tex b/doc/book/ucl.tex index 0bca9fe36..72a774dcb 100644 --- a/doc/book/ucl.tex +++ b/doc/book/ucl.tex @@ -956,3 +956,140 @@ \section{GUI Classes} \texttt{set\_which\_one(x)} sets the currently displayed OverlayItem; default is the first.\\ \texttt{add(c:OverlayItem)} adds the given OverlayItem to the OverlaySet. + +\newpage +\section{The \texttt{ThreadPool} class} +\label{ThreadPoolClass} +The \texttt{ThreadPool} class is a development of the original (global) thread pool +in the \texttt{threads} package described on page \pageref{SimpleThreadPool}. +The interface is broadly the same, with methods in place of the procedures of +the global pool, plus extra methods for additional functionality. +The main extra facilities provided are +\begin{itemize} +\item + More than one pool may be created with different characteristics -- number + of worker threads, initial string, block and stack size, priority queuing + (or not). +\item + Tasks may have an optional numerical priority (real or integer) that controls + the position that the task is placed in the queue to be dispatched. +\item + Worker threads may be added after the pool is created. One possible use + is to create a pool with no workers, build up a body of work by adding + tasks to the queue in an arbitrary order (but using priority to control + the position in the queue) and, once all the tasks have been added, add + worker threads to execute the tasks. +\end{itemize} +Note that the task priority is not a runtime execution priority: it only +controls the order in which tasks are started. + +\bigskip\hrule\vspace{0.1cm} +\noindent\makebox[2.5in][l]{\texttt{ThreadPool(n, pr, blk, str, stk)}\vspace{0.75in}} + +Create an instance of a thread pool with \texttt{n} worker threads (\texttt{n} +can be zero which means create no threads). +If \texttt{n} is null the number of threads will be the default of 2 + the +number of processors reported in \texttt{\&features}. + +If \texttt{pr} is negative then tasks with lower numerical values of priority +will be dequeued before those having a higher value; i.e. \texttt{0} is the most +important. +% +If \texttt{pr} is positive then tasks with a higher numerical priority will be +dequeued before those with a lower value. +% +If \texttt{pr} is zero the pool has no priority. +% +If \texttt{pr} is not supplied, its default value is \texttt{-1}. +% +Tasks with no value of priority are dequeued after tasks with a priority, +irrespective of the value of \texttt{pr}. + +\texttt{blk}, \texttt{str} and \texttt{stk} are optional and provide non-default +values for the block space, string space, and stack space for each worker +thread.\\ + +\bigskip\hrule\vspace{0.1cm} +\noindent\makebox[1.5in][l]{\texttt{MakePool(n)}\vspace{0.75in}} + +Create \texttt{n} worker threads and add them to the pool. If \texttt{n} is +zero or absent, the default number will be added. The threads will have the +amount of stack space, block space and string space specified in the arguments +supplied to the class constructor. + +%\pagebreak[2] +\bigskip\hrule\vspace{0.1cm} +\noindent\makebox[2.5in][l]{\texttt{Dispatch([pr,] proc, params, ...)}\vspace{0.75in}} + +Queue a task to be executed by a thread from the pool. If a thread is available +the procedure will be called immediately with the supplied parameters, otherwise +it will be called when a thread becomes available. If \texttt{pr} is not +supplied, The task has the lowest possible priority. The position in the queue +depends on whether the pool was created with priority or without. +If the pool has priority queueing, the task will be placed after any higher +priority tasks and before any lower priority tasks. +If the pool has no priority a ``simple'' rule applies: tasks with a positive +priority will be placed at the front (LIFO) and zero (or no) priority tasks will +be placed at the back (FIFO). + +\bigskip\hrule\vspace{0.1cm} +\noindent\makebox[2.5in][l]{\texttt{DispatchMethod([pr,] obj, name, params, ...)}\vspace{0.75in}} + +Queue a method (given by a class instance and a method name) to queue a task to +be executed by a thread from the pool. The treatment of the optional priority +value is the same as \texttt{Dispatch}. + +\bigskip\hrule\vspace{0.1cm} +\noindent\makebox[1in][l]{\texttt{Active()}\vspace{0.75in}} + +Returns the number of worker threads in the pool that are not idle.\\ + +\bigskip\hrule\vspace{0.1cm} +\noindent\makebox[1in][l]{\texttt{Capacity()}\vspace{0.75in}} + +Returns the number of worker threads in the pool.\\ + +\bigskip\hrule\vspace{0.1cm} +\noindent\makebox[1in][l]{\texttt{Cancel()}\vspace{0.75in}} + +Removes all tasks from the queue. Tasks that are active will continue. + +\bigskip\hrule\vspace{0.1cm} +\noindent\makebox[1in][l]{\texttt{isIdle()}\vspace{0.75in}} + +Succeeds if no worker threads are active and there are no tasks in the queue.\\ + +\bigskip\hrule\vspace{0.1cm} +\noindent\makebox[1.5in][l]{\texttt{ClosePool()}\vspace{0.75in}} + +Shuts down the pool after remaining tasks have finished +(including those that are in the queue). \texttt{ClosePool} does not return +until the pool has been shut down and all the threads have finished, which +provides a simple way of synchronizing the concurrent activities with the +controller thread (often \texttt{\&main}). +\texttt{ClosePool} should not be called from a thread in the pool: in particular, +\texttt{DispatchMethod(self,ClosePool)} will deadlock because the task will be +waiting for itself to terminate.\\ + +\bigskip\hrule\vspace{0.1cm} +\noindent\makebox[1.5in][l]{\texttt{Running(depth)}\vspace{0.75in}} + +Returns a list of the procedures that are being run by the active worker threads +(i.e. the procedures that were supplied to \texttt{Dispatch}) The stack of each +worker is searched up to a limit of \texttt{depth} frames for a variable that is +known to exist in the top level procedure of a worker thread, so a task with deeply +nested procedures might be missed. The default value for \texttt{depth} is 10.\\ + +\bigskip\hrule\vspace{0.1cm} +\noindent\makebox[1.5in][l]{\texttt{Snapshot(depth)}\vspace{0.75in}} + +Calls \texttt{Running(depth)}, formats the result and prints it out to the +standard error output. + +\bigskip\hrule\vspace{0.1cm} +\noindent\makebox[1.5in][l]{\texttt{DefaultPoolSize()}\vspace{0.75in}} + +Returns the default number of threads in a pool (also available by calling the +\linebreak% forbid hyphenation of default_pool_size +\texttt{default\_pool\_size()} procedure. + diff --git a/tests/thread/methodinvocation.icn b/tests/thread/methodinvocation.icn new file mode 100644 index 000000000..547452914 --- /dev/null +++ b/tests/thread/methodinvocation.icn @@ -0,0 +1,106 @@ +# Test that methods can be queued to a thread pool by using CallMethod and DispatchMethod +# +# Don Ward +# November 2024 + +import threads + +global f + +# -------------------------------------------------------------------------------- +# A test class that adds to and subtracts from an integer total. +# It expects n requests (given as a parameter to the constructor) and after +# that many requests a listener thread sends the total to the main thread. +class AddSub (total, mtx, listener) + + method add(n) + write(f, "Doing add ", n) + critical mtx: total +:= n + @>> \listener + end + + method sub(n) + write(f, "Doing subtract ", n) + critical mtx: total -:= n + @>> \listener + end + + method result(n) + while 0 <= (n -:= 1) do <<@ # Wait until add or sub have been called n times + total @>> &main + end + + initially(n) + listener := thread CallMethod(self, "result", n) + mtx := mutex() + total := 0 +end + + +# -------------------------------------------------------------------------------- +procedure main ( args ) + local AS, TP, total := 0, tasks, val, theAnswer + local resultFile := "./local/methodinvocation-results.out" + + if /(f := open(resultFile, "w")) then { + write(&errout, "Cannot open ", resultFile, " ( ", &errortext, " )") + f := &errout + } + + if not (tasks := numeric(\args[1])) then tasks := 50 + AS := AddSub(tasks) + TP := ThreadPool(0, 0) # A pool with no priority and no workers + + # Add the specified number of add or sub requests to the task queue. + # Split them approximately evenly between add and sub. + # Also split them between using DispatchMethod(...) and Dispatch(CallMethod, ....) + every 1 to tasks do { + val := ?100 + if 0 = val%2 then { + write(f, "adding ", val) + total +:= val + if val > 50 then TP.Dispatch(CallMethod, AS, "add", val) else TP.DispatchMethod(AS, "add", val) + } else { + write(f, "subtracting ", val) + total -:= val + if val < 50 then TP.Dispatch(CallMethod, AS, "sub", val) else TP.DispatchMethod(AS, "sub", val) + } + } + + PrQ(TP, "Before MakePool()", f) + + TP.MakePool(0) # Add the default number of threads to the pool + TP.ClosePool() # Wait for them to finish + theAnswer := <<@ # and for the final total + + if theAnswer = total then { + write(&errout, "No errors") + } else { + write(&errout, "Total = ", total, " The answer = ", theAnswer) + } +end + +procedure PrQ(p, mess, f) + local n, t, proc, np, priority + + writes(f, \mess, ": ") + write(f, "No. of tasks ", n := *(p.ToDo)) + every t := p.ToDo[1 to n] do { + priority := t[1] + if numeric(priority) then { + proc := t[2]; np := *t - 2 + } else { + proc := priority; priority := &null; np := *t -1 + } + writes(f, left(\priority | "none",6)) + writes(f, image(proc), " ", left(np || plural(" parameter", np), 15)) + writes(f, " [") + n := if \priority then 3 else 2 + every writes(f, " ", image(t[n to *t])) + write(f, " ]") + } +end + +procedure plural (s,n ) + if n = 1 then return s else return s || "s" +end diff --git a/tests/thread/stand/methodinvocation.std b/tests/thread/stand/methodinvocation.std new file mode 100644 index 000000000..4269126fc --- /dev/null +++ b/tests/thread/stand/methodinvocation.std @@ -0,0 +1 @@ +No errors diff --git a/tests/thread/stand/testprioritypool-down.std b/tests/thread/stand/testprioritypool-down.std new file mode 100644 index 000000000..481748b61 --- /dev/null +++ b/tests/thread/stand/testprioritypool-down.std @@ -0,0 +1 @@ +no errors diff --git a/tests/thread/stand/testprioritypool-up.std b/tests/thread/stand/testprioritypool-up.std new file mode 100644 index 000000000..481748b61 --- /dev/null +++ b/tests/thread/stand/testprioritypool-up.std @@ -0,0 +1 @@ +no errors diff --git a/tests/thread/testprioritypool-down.icn b/tests/thread/testprioritypool-down.icn new file mode 100644 index 000000000..db263ed64 --- /dev/null +++ b/tests/thread/testprioritypool-down.icn @@ -0,0 +1,132 @@ +# Queue a series of tasks to a priority thread pool and make sure they +# are executed in reverse numerical order of priority (lowest first). +# +# Don Ward +# November 2024 + +import threads + +global priority, order +global errs +global tests + +$define LOWEST_PR 999999999 +$define HIGHEST_PR 0 + +procedure main ( args ) + local pr, TP, n := 50, i := 0 + local f, resultFile := "./local/tpp-down--queue.out" + + errs := tests := 0 + TP := ThreadPool(0) # Create a priority pool with no workers + + if /(f := open(resultFile, "w")) then { + write(&errout, "Cannot open ", resultFile, " ( ", &errortext, " )") + f := &errout + } + + if *args > 0 then { # Queue a series of tasks in the priority order specified + priority := LOWEST_PR + n := *args + every pr := !args do { + # pr is a string. We must convert it to a number because (believe it or not) + # max("-7", 0) returns "-7" + if pr := numeric(pr) then { + priority >:= pr + # Special case -ve priorities in the argument to P to avoid a false failure + TP.Dispatch(pr, P, max(pr,0), i+:=1) + } else { + TP.Dispatch(P,LOWEST_PR, i+:=1) + priority >:= 0 + } + } + PrQ(TP, "Task list", &output) + } else { # Queue a series of tasks with random priorities + priority := 100 + writes(f, "Tasks: ") + every i:= 1 to n do { + if 0 = (i%10) then writes(f, "\n ") + pr := ?priority + # Arbitrarily treat some priorities as zero + if 11 <= pr <= 21 then { + writes(f, " (", pr, ")") + TP.Dispatch(P, LOWEST_PR, i, pr) + } else if 42 <= pr <= 52 then { + writes(f, " null(", pr, ")") + TP.Dispatch(&null, P, LOWEST_PR, i, pr) + } else { + writes(f, " ", pr) + TP.Dispatch(pr, P, pr, i) + } + } + write(f) + + PrQ(TP, "Task list", f) + } + + priority := HIGHEST_PR + order := 0 + + TP.MakePool(1) # Add a single worker thread, so tasks are done one at a time. + TP.ClosePool() # Wait for every task to finish + + if tests ~= n then { + write(&errout, n - tests, " tests not performed") + } else if errs > 0 then { + write(&errout, errs, " errors") + write(&errout, "see ", resultFile, " for details of the task queue") + exit(1) + } else { + write(&errout, "no errors") + exit(0) + } + +end + +# ---------------------------------------------------------------------- +# Check that the priority is monotically non decreasing +# Don't need mutual exclusion: there is only one thread in the pool. +procedure P(x, sequence) + if x < priority then { + write(&errout, "Priority inversion: old = ", priority, " new = ", x) + errs +:= 1 + } + if (x = priority) & (sequence < order) then { + write(&errout, "non FIFO at : priority = ", x) + errs +:= 1 + } + priority := x + order := sequence + tests +:= 1 +end + +# ---------------------------------------------------------------------- +# Print the work queue of the pool to a file +# In the output the first parameter is the priority, the second is the order +# of adding to the queue. If there is a third parameter, it is the original +# priority before setting it to zero. Priority "none" meaans Dispatch was used. +procedure PrQ(p, mess, f) + local n, t, proc, np, priority + + writes(f, \mess, ": ") + write(f, "No. of tasks ", n := *(p.ToDo)) + every t := p.ToDo[1 to n] do { + priority := t[1] + if numeric(priority) then { + proc := t[2]; np := *t - 2 + } else { + proc := priority; priority := &null; np := *t -1 + } + writes(f, left(\priority | "none",6)) + writes(f, image(proc), " ", left(np || plural(" parameter", np), 15)) + writes(f, " [") + n := if \priority then 3 else 2 + every writes(f, " ", image(t[n to *t])) + write(f, " ]") + } +end + +procedure plural (s,n ) + if n = 1 then return s else return s || "s" +end + diff --git a/tests/thread/testprioritypool-up.icn b/tests/thread/testprioritypool-up.icn new file mode 100644 index 000000000..a627b2758 --- /dev/null +++ b/tests/thread/testprioritypool-up.icn @@ -0,0 +1,132 @@ +# Queue a series of tasks to a priority thread pool and make sure they +# are executed in numerical order of priority (highest first). +# +# Don Ward +# November 2024 + +import threads + +global priority, order +global errs +global tests + +$define LOWEST_PR 0 +$define HIGHEST_PR 999999999 + +procedure main ( args ) + local pr, TP, n := 50, i := 0 + local f, resultFile := "./local/tpp-up--queue.out" + + errs := tests := 0 + TP := ThreadPool(0 , 1) # Create a priority pool with no workers + + if /(f := open(resultFile, "w")) then { + write(&errout, "Cannot open ", resultFile, " ( ", &errortext, " )") + f := &errout + } + + if *args > 0 then { # Queue a series of tasks in the priority order specified + priority := HIGHEST_PR + n := *args + every pr := !args do { + # pr is a string. We must convert it to a number because (believe it or not) + # max("-7", 0) returns "-7" + if pr := numeric(pr) then { + priority <:= pr + # Special case -ve priorities in the argument to P to avoid a false failure + TP.Dispatch(pr, P, max(pr,0), i+:=1) + } else { + TP.Dispatch(P, LOWEST_PR, i+:=1) + priority <:= 0 + } + } + PrQ(TP, "Task list", &output) + } else { # Queue a series of tasks with random priorities + priority := 100 + writes(f, "Tasks: ") + every i:= 1 to n do { + if 0 = (i%10) then writes(f, "\n ") + pr := ?priority + # Arbitrarily treat some priorities as zero + if 11 <= pr <= 21 then { + writes(f, " (", pr, ")") + TP.Dispatch(P, LOWEST_PR, i, pr) + } else if 42 <= pr <= 52 then { + writes(f, " null(", pr, ")") + TP.Dispatch(&null, P, LOWEST_PR, i, pr) + } else { + writes(f, " ", pr) + TP.Dispatch(pr, P, pr, i) + } + } + write(f) + + PrQ(TP, "Task list", f) + } + + priority := HIGHEST_PR + order := 0 + + TP.MakePool(1) # Add a single worker thread, so tasks are done one at a time. + TP.ClosePool() # Wait for every task to finish + + if tests ~= n then { + write(&errout, n - tests, " tests not performed") + } else if errs > 0 then { + write(&errout, errs, " errors") + write(&errout, "see ", resultFile, " for details of the task queue") + exit(1) + } else { + write(&errout, "no errors") + exit(0) + } + +end + +# ---------------------------------------------------------------------- +# Check that the priority is monotically non increasing. +# Don't need mutual exclusion: there is only one thread in the pool. +procedure P(x, sequence) + if x > priority then { + write(&errout, "Priority inversion: old = ", priority, " new = ", x) + errs +:= 1 + } + if (x = priority) & (sequence < order) then { + write(&errout, "non FIFO at : priority = ", x) + errs +:= 1 + } + priority := x + order := sequence + tests +:= 1 +end + +# ---------------------------------------------------------------------- +# Print the work queue of the pool to a file +# In the output the first parameter is the priority, the second is the order +# of adding to the queue. If there is a third parameter, it is the original +# priority before setting it to zero. Priority "none" meaans Dispatch was used. +procedure PrQ(p, mess, f) + local n, t, proc, np, priority + + writes(f, \mess, ": ") + write(f, "No. of tasks ", n := *(p.ToDo)) + every t := p.ToDo[1 to n] do { + priority := t[1] + if numeric(priority) then { + proc := t[2]; np := *t - 2 + } else { + proc := priority; priority := &null; np := *t -1 + } + writes(f, left(\priority | "none",6)) + writes(f, image(proc), " ", left(np || plural(" parameter", np), 15)) + writes(f, " [") + n := if \priority then 3 else 2 + every writes(f, " ", image(t[n to *t])) + write(f, " ]") + } +end + +procedure plural (s,n ) + if n = 1 then return s else return s || "s" +end + diff --git a/uni/lib/thread.icn b/uni/lib/thread.icn index cdc0b17fb..de79ff823 100644 --- a/uni/lib/thread.icn +++ b/uni/lib/thread.icn @@ -7,8 +7,8 @@ # package threads - - +import lang +link ximage # # create a communication channel with thread x # @@ -504,6 +504,329 @@ procedure ClosePool() return #success end +# ------------------------------------------------------------------------------- +# Default no of threads = 2 + number of cores +# We only need to do this calculation once. +procedure default_pool_size() + static n + initial { + &features ? { ="CPU cores " & n := 2 + ::tab(0) } + } + return n +end + +# ------------------------------------------------------------------------------- +# A development of the global pool (above) which allows +# Separate pools with different characteristics (no of threads, stack sizes etc.). +# Queueing tasks based on priority. +class ThreadPool( + ToDo, # A List of tasks to execute. Each task is itself a list where + # the first element is a procedure to call and the rest are its parameters. + # In a pool with priority, the priority is prepended to the list. + Idlers, # A count of waiting worker threads + WorkCv, # A condition variable for "there is work to do" + Workforce, # The collection of all worker threads for this pool. + Work, # A mutex protecting the above + strSpace, # Initial allocation of string space for each thread + blkSpace, # Initial allocation of block space for each thread + stkSpace, # Initial allocation of stack space for each thread + havePr # If non zero, tasks are queued in priority order + # -1 (default) lower values are more important (dequeued first) + # +1 higher values are more important + ) + + #-------------------------------------------------------------------------------- + # Construct a pool of n worker threads with the stack, string and block allocations + # specified by the parameters of the class constructor. + # Subsequent calls add workers to the pool. + method MakePool(n: integer: 0) + if n <= 0 then n := default_pool_size() + # Create the requested number of workers + critical Work: { + if /strSpace & /blkSpace & /stkSpace then { # Use default allocation + every 1 to n do ::put(Workforce, thread classworker(self)) + } else { + every 1 to n do ::put(Workforce, + ::spawn(create(classworker(self)), + blkSpace, strSpace, stkSpace)) + } + } + return self + end + + #-------------------------------------------------------------------------------- + # Return the number of worker threads in the pool + method Capacity() + return *Workforce + end + + #-------------------------------------------------------------------------------- + # Return the number of non-idle threads at this moment + method Active() + return *Workforce - Idlers + end + + # -------------------------------------------------------------------------------- + # Dispatch a method described by class instance and method name + # Can have an optional priority before, and optional arguments after + method DispatchMethod(args[]) + local obj, name, pr + + pr := args[1] + if numeric(pr) then { + pop(args); push(args, CallMethod); push(args,pr) + } else { + push(args, CallMethod) + } + + Dispatch ! args + end + + #-------------------------------------------------------------------------------- + # Add a task (described by an optional priority plus procedure plus optional parameters) + # to the list of tasks to be executed by (one of) the pool of worker threads. + # The priority controls whereabouts in the queue the task is placed. + # The priorities must be numeric and non negative (but can be real or integer or a mixture). + method Dispatch(args[]) + local priority, pos, lo, hi, pr, p + + pr := args[1] + if ::type(pr) == "procedure" then { + pr := &null # No priority supplied + } else { + if ::type(args[2]) ~== "procedure" then fail + if \pr then if (not numeric(pr)) | (pr < 0) then fail + } + + if havePr = 0 then { # This is a pool without priority + # In a non-priority pool, interpret pr (if supplied) as follows: + # null or =0 Insert task at the back + # >0 Insert task at the front + critical Work: (if \pr > 0 then ::push else ::put) (ToDo, args) + } else { # A pool with priority + # The task will be placed in the work queue ahead of all tasks that are less important + # and behind all tasks that are more important. Tasks with the same priority are in FIFO order. + # Importance is determined by the value of havePr: + # -1: lower priority values are more important + # +1: higher priority values are more important + # + # Note that if there is more than one worker thread (which will be the normal case) the + # order in which tasks are started cannot be guaranteed: They will be dequeued in order but + # execution order is dependent on the system thread scheduler. + + if /pr then { # no priority is the least important of all, irrespective of havePr. + if type(args[1]) ~== "procedure" then pop(args) + critical Work: ::put(ToDo, args) + } else { + # to guarantee FIFO within priority, subtract a tiny amount so that the task ends up + # behind all its siblings that have the same priority. Note that what "subtraction" means + # is dependent on whether low values of priority mean more important (the default) or + # the other way around. + pr := pr - (havePr * 1E-12) + ::lock(Work) + case *ToDo of { + 0: { # easy + ::push(ToDo, args) + } + 1: { # simple + if comparePR(ToDo[1][1], pr) = -1 then { + ::push(ToDo, args) + } else { + ::put(ToDo, args) + } + } + default: { # Binary search for position in the queue + lo := 1; hi := *ToDo; + while lo <= hi do { + pos := lo + (hi -lo)/2 + priority := ToDo[pos][1] + case comparePR(priority, pr) of { + -1: { hi := pos - 1 } + 1: { lo := pos + 1 } + 0: { break } + } + } + + if comparePR(priority,pr) >= 0 then pos+:= 1 + ::insert(ToDo, pos, args) + } + } + ::unlock(Work) + } + } + ::signal(WorkCv) # Wake up the workforce + return # success + end + + # ---------------------------------------- + # Internal method to compare priorities + # returns + # -1 p1 less important than p2 + # 0 p1 and p2 are equally important + # +1 p1 more important than p2 + method comparePR(p1,p2) + # A procedure value is less than any numerical value irrespective of havePr + if type(p1) == "procedure" then { + if type(p2) == "procedure" then return 0 else return -1 + } else if type(p2) == "procedure" then { + return 1 + } else { + if havePr < 0 then { + if p1 < p2 then return 1 + else if p1 > p2 then return -1 + } else { + if p1 > p2 then return 1 + else if p1 < p2 then return -1 + } + return 0 + } + end + + #-------------------------------------------------------------------------------- + # Empty the work queue + method Cancel() + local n + critical Work: { + n := *ToDo + if n > 0 then { + if n > 20 then { # 20 is a guess at when creating a new queue is more efficient + ToDo := ::mutex([], Work) + } else { # re-use the queue + every 1 to n do ::pull(ToDo) + } + } + } + end + + #-------------------------------------------------------------------------------- + # Fail if there is work in progress, or waiting to be done. + method IsIdle() + ::lock(Work) + if ((Idlers ~= *\Workforce) | (*ToDo > 0)) then { ::unlock(Work); fail } + + ::unlock(Work) + return # success + end + + #-------------------------------------------------------------------------------- + # Shut up shop. Remaining work placed in the queue before the call of ClosePool + # will be executed before closure. Fails if all work has not been done. + method ClosePool() + every 1 to *Workforce do Dispatch(stopWork) + every ::wait(!Workforce) + + # At this point, Idlers must be zero. + if Idlers ~= 0 then { + ::write(&errout, "Warning (ClosePool): Forcing count of waiting threads to zero") + Idlers := 0 + } + + Workforce := &null + + # There ought to be no work to do, unless Dispatch() has been + # called in parallel with ClosePool() + if *ToDo > 0 then { + ::write(&errout, "Warning (ClosePool): Some work was left in the queue") + ToDo := ::mutex([], Work) # Create a new work queue for a subsequent MakePool() + fail + } + return #success + end + + #-------------------------------------------------------------------------------- + # Print out running tasks + # If mutual exclusion fails, carry on anyway because this procedure can + # sometimes be used under error conditions when the pool is jammed. + method Snapshot(msg, depth) + local haveLock, n, p, plist + + haveLock := ::trylock(Work) + if 0 < *(plist := Running(depth)) then ::write(\msg) + every ::write(&errout, " ", ::image(!plist)) + ::unlock(\haveLock) + end + + #-------------------------------------------------------------------------------- + # Return a list of the procedures used by running tasks by searching + # for the variable taskProc (declared in the classworker procedure) up to + # depth levels above the current stack frame. + method Running(depth) + local d, haveLock, runner, runners := [] + + /depth := 10 + haveLock := ::trylock(Work) + every runner := !Workforce do { + every d := 1 to depth do { + if put(runners, ::variable("taskProc", runner, d)) then { break } + } + } + ::unlock(\haveLock) + return runners + end + + # ------------------------------------------------------------------------------- + method DefaultPoolSize() + return default_pool_size() + end + + #-------------------------------------------------------------------------------- + initially(n, pr, blkSp, strSp, stkSp) + # reduce pr to -1 or 0 or +1 with a default of -1 + havePr := if /pr | pr < 0 then -1 else if pr > 0 then 1 else 0 + strSpace := \strSp + blkSpace := \blkSp + stkSpace := \stkSp + Work := ::mutex() + WorkCv := ::condvar(Work) + ToDo := ::mutex([], Work) + Idlers := 0 + Workforce := [] + + # Allow a pool with no workers. Can be useful if creating a series of tasks + # (with or without priority) and then, once the series is complete, setting it + # all running with an explicit call of MakePool. + if (/n | n~= 0) then MakePool(n) +end + + +#-------------------------------------------------------------------------------- +# work thread: repeatedly get a task, which is a list [proc, param1, param2 ....] +# and call the procedure with the supplied parameters. +# The task may have a priority as the first element of the list. +procedure classworker(mySelf) + local task + local taskProc # Don't change the name of taskProc without changing Running() + + repeat { + ::lock(mySelf.WorkCv) + mySelf.Idlers +:= 1 + while 0 = *(mySelf.ToDo) do ::wait(mySelf.WorkCv) + task := ::pop(mySelf.ToDo) # Remove the next task from the queue + mySelf.Idlers -:= 1 + ::unlock(mySelf.WorkCv) + + # Discard the priority (if any) and recover the procedure placed by Dispatch() + taskProc := ::pop(task) + if numeric(taskProc) then taskProc := ::pop(task) + + if (taskProc === stopWork) then { return } else { taskProc ! task } + task := taskProc := &null # enable any GC to clear up after previous task + } +end + +# -------------------------------------------------------------------------------- +# A delegation procedure to call a method +procedure CallMethod (obj, methodName, args[] ) + local p + + if p := lang::find_method(obj, methodName) then { + ::push(args, obj) + return p ! args + } else { + write(&errout, "Cannot find ", methodName) + } +end + #-------------------------------------------------------------------------------- # Thread-local storage procedure TLS(var)