Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve documentation for ClusterManagers, tighten function types #17688

Merged
merged 5 commits into from
Jul 30, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 0 additions & 168 deletions base/docs/helpdb/Base.jl
Original file line number Diff line number Diff line change
Expand Up @@ -646,16 +646,6 @@ A string giving the literal bit representation of a number.
"""
bits

"""
launch(manager::FooManager, params::Dict, launched::Vector{WorkerConfig}, launch_ntfy::Condition)

Implemented by cluster managers. For every Julia worker launched by this function, it should
append a `WorkerConfig` entry to `launched` and notify `launch_ntfy`. The function MUST exit
once all workers, requested by `manager` have been launched. `params` is a dictionary of all
keyword arguments `addprocs` was called with.
"""
launch

"""
invdigamma(x)

Expand Down Expand Up @@ -1569,19 +1559,6 @@ Connect to the named pipe / UNIX domain socket at `path`.
"""
connect(path)

"""
connect(manager::FooManager, pid::Int, config::WorkerConfig) -> (instrm::AsyncStream, outstrm::AsyncStream)

Implemented by cluster managers using custom transports. It should establish a logical
connection to worker with id `pid`, specified by `config` and return a pair of `AsyncStream`
objects. Messages from `pid` to current process will be read off `instrm`, while messages to
be sent to `pid` will be written to `outstrm`. The custom transport implementation must
ensure that messages are delivered and received completely and in order.
`Base.connect(manager::ClusterManager.....)` sets up TCP/IP socket connections in-between
workers.
"""
connect(manager, pid::Int, config::WorkerConfig)

"""
mean(v[, region])

Expand Down Expand Up @@ -2918,109 +2895,6 @@ See [`RoundingMode`](:obj:`RoundingMode`) for available rounding modes.
"""
Float64

"""
```
addprocs(n::Integer; exeflags=``) -> List of process identifiers
```

Launches workers using the in-built `LocalManager` which only launches workers on the
local host. This can be used to take advantage of multiple cores. `addprocs(4)` will add 4
processes on the local machine.
"""
addprocs(n::Integer)

"""
addprocs() -> List of process identifiers

Equivalent to `addprocs(Sys.CPU_CORES)`

Note that workers do not run a `.juliarc.jl` startup script, nor do they synchronize their
global state (such as global variables, new method definitions, and loaded modules) with any
of the other running processes.
"""
addprocs()

"""
```
addprocs(machines; keyword_args...) -> List of process identifiers
```

Add processes on remote machines via SSH. Requires `julia` to be installed in the same
location on each node, or to be available via a shared file system.

`machines` is a vector of machine specifications. Worker are started for each specification.

A machine specification is either a string `machine_spec` or a tuple - `(machine_spec, count)`.

`machine_spec` is a string of the form `[user@]host[:port] [bind_addr[:port]]`. `user` defaults
to current user, `port` to the standard ssh port. If `[bind_addr[:port]]` is specified, other
workers will connect to this worker at the specified `bind_addr` and `port`.

`count` is the number of workers to be launched on the specified host. If specified as `:auto`
it will launch as many workers as the number of cores on the specific host.


Keyword arguments:

* `tunnel`: if `true` then SSH tunneling will be used to connect to the worker from the
master process. Default is `false`.

* `sshflags`: specifies additional ssh options, e.g.

```
sshflags=`-i /home/foo/bar.pem`
```

* `max_parallel`: specifies the maximum number of workers connected to in parallel at a host.
Defaults to 10.

* `dir`: specifies the working directory on the workers. Defaults to the host's current
directory (as found by `pwd()`)

* `exename`: name of the `julia` executable. Defaults to `"\$JULIA_HOME/julia"` or
`"\$JULIA_HOME/julia-debug"` as the case may be.

* `exeflags`: additional flags passed to the worker processes.

* `topology`: Specifies how the workers connect to each other. Sending a message
between unconnected workers results in an error.

+ `topology=:all_to_all` : All processes are connected to each other.
This is the default.

+ `topology=:master_slave` : Only the driver process, i.e. pid 1 connects to the
workers. The workers do not connect to each other.

+ `topology=:custom` : The `launch` method of the cluster manager specifes the
connection topology via fields `ident` and `connect_idents` in
`WorkerConfig`. A worker with a cluster manager identity `ident`
will connect to all workers specified in `connect_idents`.


Environment variables :

If the master process fails to establish a connection with a newly launched worker within
60.0 seconds, the worker treats it a fatal situation and terminates. This timeout can be
controlled via environment variable `JULIA_WORKER_TIMEOUT`. The value of
`JULIA_WORKER_TIMEOUT` on the master process, specifies the number of seconds a newly
launched worker waits for connection establishment.
"""
addprocs(machines)

"""
addprocs(manager::ClusterManager; kwargs...) -> List of process identifiers

Launches worker processes via the specified cluster manager.

For example Beowulf clusters are supported via a custom cluster manager implemented in
package `ClusterManagers`.

The number of seconds a newly launched worker waits for connection establishment from the
master can be specified via variable `JULIA_WORKER_TIMEOUT` in the worker process's
environment. Relevant only when using TCP/IP as transport.
"""
addprocs(manager::ClusterManager)

"""
mkpath(path, [mode])

Expand Down Expand Up @@ -3946,15 +3820,6 @@ Send a signal to a process. The default is to terminate the process.
"""
kill(p::Process, signum=SIGTERM)

"""
kill(manager::FooManager, pid::Int, config::WorkerConfig)

Implemented by cluster managers. It is called on the master process, by `rmprocs`. It should
cause the remote worker specified by `pid` to exit. `Base.kill(manager::ClusterManager.....)`
executes a remote `exit()` on `pid`
"""
kill(manager, pid::Int, config::WorkerConfig)

"""
sylvester(A, B, C)

Expand Down Expand Up @@ -4441,19 +4306,6 @@ Byte-swap an integer.
"""
bswap

"""
manage(manager::FooManager, pid::Int, config::WorkerConfig. op::Symbol)

Implemented by cluster managers. It is called on the master process, during a worker's
lifetime, with appropriate `op` values:

- with `:register`/`:deregister` when a worker is added / removed from the Julia worker pool.
- with `:interrupt` when `interrupt(workers)` is called. The [`ClusterManager`](:class:`ClusterManager`)
should signal the appropriate worker with an interrupt signal.
- with `:finalize` for cleanup purposes.
"""
manage

"""
resize!(collection, n) -> collection

Expand Down Expand Up @@ -4723,15 +4575,6 @@ Quit the program indicating that the processes completed successfully. This func
"""
quit

"""
init_worker(manager::FooManager)

Called by cluster managers implementing custom transports. It initializes a newly launched
process as a worker. Command line argument `--worker` has the effect of initializing a
process as a worker using TCP/IP sockets for transport.
"""
init_worker

"""
escape_string(io, str::AbstractString, esc::AbstractString)

Expand Down Expand Up @@ -8085,17 +7928,6 @@ true
"""
applicable

"""
Base.process_messages(instrm::AsyncStream, outstrm::AsyncStream)

Called by cluster managers using custom transports. It should be called when the custom
transport implementation receives the first message from a remote worker. The custom
transport must manage a logical connection to the remote worker and provide two
`AsyncStream` objects, one for incoming messages and the other for messages addressed to the
remote worker.
"""
Base.process_messages

"""
RandomDevice()

Expand Down
118 changes: 118 additions & 0 deletions base/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,65 @@ end
# to be mutually reachable without a tunnel, as is often the case in a cluster.
# Default value of kw arg max_parallel is the default value of MaxStartups in sshd_config
# A machine is either a <hostname> or a tuple of (<hostname>, count)
"""
addprocs(machines; tunnel=false, sshflags=\`\`, max_parallel=10, kwargs...) -> List of process identifiers

Add processes on remote machines via SSH. Requires `julia` to be installed in the same
location on each node, or to be available via a shared file system.

`machines` is a vector of machine specifications. Workers are started for each specification.

A machine specification is either a string `machine_spec` or a tuple - `(machine_spec, count)`.

`machine_spec` is a string of the form `[user@]host[:port] [bind_addr[:port]]`. `user` defaults
to current user, `port` to the standard ssh port. If `[bind_addr[:port]]` is specified, other
workers will connect to this worker at the specified `bind_addr` and `port`.

`count` is the number of workers to be launched on the specified host. If specified as `:auto`
it will launch as many workers as the number of cores on the specific host.

Keyword arguments:

* `tunnel`: if `true` then SSH tunneling will be used to connect to the worker from the
master process. Default is `false`.

* `sshflags`: specifies additional ssh options, e.g.
```sshflags=\`-i /home/foo/bar.pem\` ```

* `max_parallel`: specifies the maximum number of workers connected to in parallel at a host.
Defaults to 10.

* `dir`: specifies the working directory on the workers. Defaults to the host's current
directory (as found by `pwd()`)

* `exename`: name of the `julia` executable. Defaults to `"\$JULIA_HOME/julia"` or
`"\$JULIA_HOME/julia-debug"` as the case may be.

* `exeflags`: additional flags passed to the worker processes.

* `topology`: Specifies how the workers connect to each other. Sending a message
between unconnected workers results in an error.

+ `topology=:all_to_all` : All processes are connected to each other.
This is the default.

+ `topology=:master_slave` : Only the driver process, i.e. pid 1 connects to the
workers. The workers do not connect to each other.

+ `topology=:custom` : The `launch` method of the cluster manager specifes the
connection topology via fields `ident` and `connect_idents` in
`WorkerConfig`. A worker with a cluster manager identity `ident`
will connect to all workers specified in `connect_idents`.


Environment variables :

If the master process fails to establish a connection with a newly launched worker within
60.0 seconds, the worker treats it as a fatal situation and terminates.
This timeout can be controlled via environment variable `JULIA_WORKER_TIMEOUT`.
The value of JULIA_WORKER_TIMEOUT` on the master process specifies the number of seconds a
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mismatched backticks

newly launched worker waits for connection establishment.
"""
function addprocs(machines::AbstractVector; tunnel=false, sshflags=``, max_parallel=10, kwargs...)
check_addprocs_args(kwargs)
addprocs(SSHManager(machines); tunnel=tunnel, sshflags=sshflags, max_parallel=max_parallel, kwargs...)
Expand Down Expand Up @@ -223,7 +282,25 @@ immutable LocalManager <: ClusterManager
restrict::Bool # Restrict binding to 127.0.0.1 only
end

"""
addprocs(; kwargs...) -> List of process identifiers

Equivalent to `addprocs(Sys.CPU_CORES; kwargs...)`

Note that workers do not run a `.juliarc.jl` startup script, nor do they synchronize their
global state (such as global variables, new method definitions, and loaded modules) with any
of the other running processes.
"""
addprocs(; kwargs...) = addprocs(Sys.CPU_CORES; kwargs...)

"""
addprocs(np::Integer; restrict=true, kwargs...) -> List of process identifiers

Launches workers using the in-built `LocalManager` which only launches workers on the
local host. This can be used to take advantage of multiple cores. `addprocs(4)` will add 4
processes on the local machine. If `restrict` is `true`, binding is restricted to
`127.0.0.1`.
"""
function addprocs(np::Integer; restrict=true, kwargs...)
check_addprocs_args(kwargs)
addprocs(LocalManager(np, restrict); kwargs...)
Expand Down Expand Up @@ -256,6 +333,28 @@ function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Sy
end
end

"""
launch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)

Implemented by cluster managers. For every Julia worker launched by this function, it should
append a `WorkerConfig` entry to `launched` and notify `launch_ntfy`. The function MUST exit
once all workers, requested by `manager` have been launched. `params` is a dictionary of all
keyword arguments `addprocs` was called with.
"""
launch

"""
manage(manager::ClusterManager, id::Integer, config::WorkerConfig. op::Symbol)

Implemented by cluster managers. It is called on the master process, during a worker's
lifetime, with appropriate `op` values:

- with `:register`/`:deregister` when a worker is added / removed from the Julia worker pool.
- with `:interrupt` when `interrupt(workers)` is called. The [`ClusterManager`](:class:`ClusterManager`)
should signal the appropriate worker with an interrupt signal.
- with `:finalize` for cleanup purposes.
"""
manage

# DefaultClusterManager for the default TCP transport - used by both SSHManager and LocalManager

Expand All @@ -264,6 +363,17 @@ end

const tunnel_hosts_map = Dict{AbstractString, Semaphore}()

"""
connect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)

Implemented by cluster managers using custom transports. It should establish a logical
connection to worker with id `pid`, specified by `config` and return a pair of `IO`
objects. Messages from `pid` to current process will be read off `instrm`, while messages to
be sent to `pid` will be written to `outstrm`. The custom transport implementation must
ensure that messages are delivered and received completely and in order.
`Base.connect(manager::ClusterManager.....)` sets up TCP/IP socket connections in-between
workers.
"""
function connect(manager::ClusterManager, pid::Int, config::WorkerConfig)
if !isnull(config.connect_at)
# this is a worker-to-worker setup call.
Expand Down Expand Up @@ -389,6 +499,14 @@ function connect_to_worker(host::AbstractString, bind_addr::AbstractString, port
(s, bind_addr)
end


"""
kill(manager::ClusterManager, pid::Int, config::WorkerConfig)

Implemented by cluster managers. It is called on the master process, by `rmprocs`. It should
cause the remote worker specified by `pid` to exit. `Base.kill(manager::ClusterManager.....)`
executes a remote `exit()` on `pid`
"""
function kill(manager::ClusterManager, pid::Int, config::WorkerConfig)
remote_do(exit, pid) # For TCP based transports this will result in a close of the socket
# at our end, which will result in a cleanup of the worker.
Expand Down
Loading