Skip to content

Commit

Permalink
Add a ThreadPool class to the threads package.
Browse files Browse the repository at this point in the history
The new class is documented in Appendix C of the Unicon book. Tests have been
added to tests/threads to test priority queuing and the queuing of methods.
  • Loading branch information
Don-Ward committed Nov 24, 2024
1 parent c90c3be commit 6b1ff1e
Show file tree
Hide file tree
Showing 9 changed files with 850 additions and 5 deletions.
18 changes: 15 additions & 3 deletions doc/book/thread.tex
Original file line number Diff line number Diff line change
Expand Up @@ -1439,6 +1439,7 @@ \subsection*{Private communication channels}
}

\subsection*{A simple thread pool}
\label{SimpleThreadPool}
In some cases the explicit creation of a thread for each concurrent activity is
the simplest and most transparent way of writing the program, especially if the
threads need access to the local variables of the procedure that created them.
Expand All @@ -1450,7 +1451,7 @@ \subsection*{A simple thread pool}
\bigskip\hrule\vspace{0.1cm}
\noindent\makebox[1.5in][l]{\texttt{MakePool(n)}\vspace{0.75in}}
Create a pool of \texttt{n} worker threads. The default value for \texttt{n} is
2 + the number of processors reported in \texttt{\&features}.There is usually
2 + the number of processors reported in \texttt{\&features}. There is usually
not much to be gained by having many more active threads than the number of
available processors (unless a significant number are idle, waiting for an event
to happen).
Expand Down Expand Up @@ -1489,8 +1490,19 @@ \subsection*{A simple thread pool}

The thread pool is minimalist by design. There are a number of extra facilities
that could, perhaps, be added --- cancellation of a task, place a task at the
front of the queue, rather than the rear --- but these are left as an exercise
for the reader who needs them.
front of the queue, rather than the rear. % --- but these are left as an exercise
%for the reader who needs them.
A \texttt{ThreadPool} class, which is also in the \texttt{threads} package
and described on page \pageref{ThreadPoolClass}, provides some extra
functionality. The main features are
\begin{itemize}
\item
It allows several pools to be created, with different characteristics.
\item
Tasks may have a priority that controls whereabouts in the queue the
task is placed: higher priority tasks will be started before lower
priority tasks.
\end{itemize}

\subsection*{Thread-local storage}
It is notable that thread--local storage can be implemented in Unicon without
Expand Down
137 changes: 137 additions & 0 deletions doc/book/ucl.tex
Original file line number Diff line number Diff line change
Expand Up @@ -956,3 +956,140 @@ \section{GUI Classes}
\texttt{set\_which\_one(x)} sets the currently displayed OverlayItem;
default is the first.\\
\texttt{add(c:OverlayItem)} adds the given OverlayItem to the OverlaySet.

\newpage
\section{The \texttt{ThreadPool} class}
\label{ThreadPoolClass}
The \texttt{ThreadPool} class is a development of the original (global) thread pool
in the \texttt{threads} package described on page \pageref{SimpleThreadPool}.
The interface is broadly the same, with methods in place of the procedures of
the global pool, plus extra methods for additional functionality.
The main extra facilities provided are
\begin{itemize}
\item
More than one pool may be created with different characteristics -- number
of worker threads, initial string, block and stack size, priority queuing
(or not).
\item
Tasks may have an optional numerical priority (real or integer) that controls
the position that the task is placed in the queue to be dispatched.
\item
Worker threads may be added after the pool is created. One possible use
is to create a pool with no workers, build up a body of work by adding
tasks to the queue in an arbitrary order (but using priority to control
the position in the queue) and, once all the tasks have been added, add
worker threads to execute the tasks.
\end{itemize}
Note that the task priority is not a runtime execution priority: it only
controls the order in which tasks are started.

\bigskip\hrule\vspace{0.1cm}
\noindent\makebox[2.5in][l]{\texttt{ThreadPool(n, pr, blk, str, stk)}\vspace{0.75in}}

Create an instance of a thread pool with \texttt{n} worker threads (\texttt{n}
can be zero which means create no threads).
If \texttt{n} is null the number of threads will be the default of 2 + the
number of processors reported in \texttt{\&features}.

If \texttt{pr} is negative then tasks with lower numerical values of priority
will be dequeued before those having a higher value; i.e. \texttt{0} is the most
important.
%
If \texttt{pr} is positive then tasks with a higher numerical priority will be
dequeued before those with a lower value.
%
If \texttt{pr} is zero the pool has no priority.
%
If \texttt{pr} is not supplied, its default value is \texttt{-1}.
%
Tasks with no value of priority are dequeued after tasks with a priority,
irrespective of the value of \texttt{pr}.

\texttt{blk}, \texttt{str} and \texttt{stk} are optional and provide non-default
values for the block space, string space, and stack space for each worker
thread.\\

\bigskip\hrule\vspace{0.1cm}
\noindent\makebox[1.5in][l]{\texttt{MakePool(n)}\vspace{0.75in}}

Create \texttt{n} worker threads and add them to the pool. If \texttt{n} is
zero or absent, the default number will be added. The threads will have the
amount of stack space, block space and string space specified in the arguments
supplied to the class constructor.

%\pagebreak[2]
\bigskip\hrule\vspace{0.1cm}
\noindent\makebox[2.5in][l]{\texttt{Dispatch([pr,] proc, params, ...)}\vspace{0.75in}}

Queue a task to be executed by a thread from the pool. If a thread is available
the procedure will be called immediately with the supplied parameters, otherwise
it will be called when a thread becomes available. If \texttt{pr} is not
supplied, The task has the lowest possible priority. The position in the queue
depends on whether the pool was created with priority or without.
If the pool has priority queueing, the task will be placed after any higher
priority tasks and before any lower priority tasks.
If the pool has no priority a ``simple'' rule applies: tasks with a positive
priority will be placed at the front (LIFO) and zero (or no) priority tasks will
be placed at the back (FIFO).

\bigskip\hrule\vspace{0.1cm}
\noindent\makebox[2.5in][l]{\texttt{DispatchMethod([pr,] obj, name, params, ...)}\vspace{0.75in}}

Queue a method (given by a class instance and a method name) to queue a task to
be executed by a thread from the pool. The treatment of the optional priority
value is the same as \texttt{Dispatch}.

\bigskip\hrule\vspace{0.1cm}
\noindent\makebox[1in][l]{\texttt{Active()}\vspace{0.75in}}

Returns the number of worker threads in the pool that are not idle.\\

\bigskip\hrule\vspace{0.1cm}
\noindent\makebox[1in][l]{\texttt{Capacity()}\vspace{0.75in}}

Returns the number of worker threads in the pool.\\

\bigskip\hrule\vspace{0.1cm}
\noindent\makebox[1in][l]{\texttt{Cancel()}\vspace{0.75in}}

Removes all tasks from the queue. Tasks that are active will continue.

\bigskip\hrule\vspace{0.1cm}
\noindent\makebox[1in][l]{\texttt{isIdle()}\vspace{0.75in}}

Succeeds if no worker threads are active and there are no tasks in the queue.\\

\bigskip\hrule\vspace{0.1cm}
\noindent\makebox[1.5in][l]{\texttt{ClosePool()}\vspace{0.75in}}

Shuts down the pool after remaining tasks have finished
(including those that are in the queue). \texttt{ClosePool} does not return
until the pool has been shut down and all the threads have finished, which
provides a simple way of synchronizing the concurrent activities with the
controller thread (often \texttt{\&main}).
\texttt{ClosePool} should not be called from a thread in the pool: in particular,
\texttt{DispatchMethod(self,ClosePool)} will deadlock because the task will be
waiting for itself to terminate.\\

\bigskip\hrule\vspace{0.1cm}
\noindent\makebox[1.5in][l]{\texttt{Running(depth)}\vspace{0.75in}}

Returns a list of the procedures that are being run by the active worker threads
(i.e. the procedures that were supplied to \texttt{Dispatch}) The stack of each
worker is searched up to a limit of \texttt{depth} frames for a variable that is
known to exist in the top level procedure of a worker thread, so a task with deeply
nested procedures might be missed. The default value for \texttt{depth} is 10.\\

\bigskip\hrule\vspace{0.1cm}
\noindent\makebox[1.5in][l]{\texttt{Snapshot(depth)}\vspace{0.75in}}

Calls \texttt{Running(depth)}, formats the result and prints it out to the
standard error output.

\bigskip\hrule\vspace{0.1cm}
\noindent\makebox[1.5in][l]{\texttt{DefaultPoolSize()}\vspace{0.75in}}

Returns the default number of threads in a pool (also available by calling the
\linebreak% forbid hyphenation of default_pool_size
\texttt{default\_pool\_size()} procedure.

106 changes: 106 additions & 0 deletions tests/thread/methodinvocation.icn
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# Test that methods can be queued to a thread pool by using CallMethod and DispatchMethod
#
# Don Ward
# November 2024

import threads

global f

# --------------------------------------------------------------------------------
# A test class that adds to and subtracts from an integer total.
# It expects n requests (given as a parameter to the constructor) and after
# that many requests a listener thread sends the total to the main thread.
class AddSub (total, mtx, listener)

method add(n)
write(f, "Doing add ", n)
critical mtx: total +:= n
@>> \listener
end

method sub(n)
write(f, "Doing subtract ", n)
critical mtx: total -:= n
@>> \listener
end

method result(n)
while 0 <= (n -:= 1) do <<@ # Wait until add or sub have been called n times
total @>> &main
end

initially(n)
listener := thread CallMethod(self, "result", n)
mtx := mutex()
total := 0
end


# --------------------------------------------------------------------------------
procedure main ( args )
local AS, TP, total := 0, tasks, val, theAnswer
local resultFile := "./local/methodinvocation-results.out"

if /(f := open(resultFile, "w")) then {
write(&errout, "Cannot open ", resultFile, " ( ", &errortext, " )")
f := &errout
}

if not (tasks := numeric(\args[1])) then tasks := 50
AS := AddSub(tasks)
TP := ThreadPool(0, 0) # A pool with no priority and no workers

# Add the specified number of add or sub requests to the task queue.
# Split them approximately evenly between add and sub.
# Also split them between using DispatchMethod(...) and Dispatch(CallMethod, ....)
every 1 to tasks do {
val := ?100
if 0 = val%2 then {
write(f, "adding ", val)
total +:= val
if val > 50 then TP.Dispatch(CallMethod, AS, "add", val) else TP.DispatchMethod(AS, "add", val)
} else {
write(f, "subtracting ", val)
total -:= val
if val < 50 then TP.Dispatch(CallMethod, AS, "sub", val) else TP.DispatchMethod(AS, "sub", val)
}
}

PrQ(TP, "Before MakePool()", f)

TP.MakePool(0) # Add the default number of threads to the pool
TP.ClosePool() # Wait for them to finish
theAnswer := <<@ # and for the final total

if theAnswer = total then {
write(&errout, "No errors")
} else {
write(&errout, "Total = ", total, " The answer = ", theAnswer)
}
end

procedure PrQ(p, mess, f)
local n, t, proc, np, priority

writes(f, \mess, ": ")
write(f, "No. of tasks ", n := *(p.ToDo))
every t := p.ToDo[1 to n] do {
priority := t[1]
if numeric(priority) then {
proc := t[2]; np := *t - 2
} else {
proc := priority; priority := &null; np := *t -1
}
writes(f, left(\priority | "none",6))
writes(f, image(proc), " ", left(np || plural(" parameter", np), 15))
writes(f, " [")
n := if \priority then 3 else 2
every writes(f, " ", image(t[n to *t]))
write(f, " ]")
}
end

procedure plural (s,n )
if n = 1 then return s else return s || "s"
end
1 change: 1 addition & 0 deletions tests/thread/stand/methodinvocation.std
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
No errors
1 change: 1 addition & 0 deletions tests/thread/stand/testprioritypool-down.std
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
no errors
1 change: 1 addition & 0 deletions tests/thread/stand/testprioritypool-up.std
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
no errors
Loading

0 comments on commit 6b1ff1e

Please sign in to comment.