Skip to content

Commit

Permalink
move Distributed to stdlib
Browse files Browse the repository at this point in the history
  • Loading branch information
amitmurthy committed Nov 25, 2017
1 parent f8243b7 commit e3c89ad
Show file tree
Hide file tree
Showing 46 changed files with 848 additions and 772 deletions.
45 changes: 10 additions & 35 deletions base/client.jl
Original file line number Diff line number Diff line change
Expand Up @@ -287,24 +287,10 @@ function process_options(opts::JLOptions)
# remove filename from ARGS
global PROGRAM_FILE = arg_is_program ? shift!(ARGS) : ""

# startup worker.
# opts.startupfile, opts.load, etc should should not be processed for workers.
if opts.worker == 1
# does not return
if opts.cookie != C_NULL
start_worker(unsafe_string(opts.cookie))
else
start_worker()
end
end

# add processors
if opts.nprocs > 0
addprocs(opts.nprocs)
end
# load processes from machine file
if opts.machinefile != C_NULL
addprocs(load_machine_file(unsafe_string(opts.machinefile)))
# Load Distributed module only if any of the Distributed options have been specified.
if (opts.worker == 1) || (opts.nprocs > 0) || (opts.machinefile != C_NULL)
eval(Main, :(using Distributed))
invokelatest(Main.Distributed.process_opts, opts)
end

# load ~/.juliarc file
Expand All @@ -319,8 +305,12 @@ function process_options(opts::JLOptions)
println()
elseif cmd == 'L'
# load file immediately on all processors
@sync for p in procs()
@async remotecall_wait(include, p, Main, arg)
if nprocs() == 1
include(Main, arg)
else
@sync for p in invokelatest(Main.procs)
@async invokelatest(Main.remotecall_wait, include, p, Main, arg)
end
end
end
end
Expand Down Expand Up @@ -349,21 +339,6 @@ function load_juliarc()
nothing
end

function load_machine_file(path::AbstractString)
machines = []
for line in split(read(path, String),'\n'; keep=false)
s = split(line, '*'; keep = false)
map!(strip, s, s)
if length(s) > 1
cnt = isnumber(s[1]) ? parse(Int,s[1]) : Symbol(s[1])
push!(machines,(s[2], cnt))
else
push!(machines,line)
end
end
return machines
end

import .Terminals
import .REPL

Expand Down
36 changes: 36 additions & 0 deletions base/deprecated.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1272,6 +1272,42 @@ export conv, conv2, deconv, filt, filt!, xcorr
@deprecate_moved PollingFileWatcher "FileWatching" true true
@deprecate_moved watch_file "FileWatching" true true
@deprecate_moved FileMonitor "FileWatching" true true
# FIXME : This causes problems with the `myid` in loading.jl
#@deprecate_binding Distributed nothing true ", run `using Distributed` instead"

@eval @deprecate_moved $(Symbol("@spawn")) "Distributed" true true
@eval @deprecate_moved $(Symbol("@spawnat")) "Distributed" true true
@eval @deprecate_moved $(Symbol("@fetch")) "Distributed" true true
@eval @deprecate_moved $(Symbol("@fetchfrom")) "Distributed" true true
@eval @deprecate_moved $(Symbol("@everywhere")) "Distributed" true true
@eval @deprecate_moved $(Symbol("@parallel")) "Distributed" true true

@deprecate_moved addprocs "Distributed" true true
@deprecate_moved CachingPool "Distributed" true true
@deprecate_moved clear! "Distributed" true true
@deprecate_moved ClusterManager "Distributed" true true
@deprecate_moved default_worker_pool "Distributed" true true
@deprecate_moved init_worker "Distributed" true true
@deprecate_moved interrupt "Distributed" true true
@deprecate_moved launch "Distributed" true true
@deprecate_moved manage "Distributed" true true
@deprecate_moved nworkers "Distributed" true true
@deprecate_moved pmap "Distributed" true true
@deprecate_moved procs "Distributed" true true
@deprecate_moved remote "Distributed" true true
@deprecate_moved remotecall "Distributed" true true
@deprecate_moved remotecall_fetch "Distributed" true true
@deprecate_moved remotecall_wait "Distributed" true true
@deprecate_moved remote_do "Distributed" true true
@deprecate_moved rmprocs "Distributed" true true
@deprecate_moved workers "Distributed" true true
@deprecate_moved WorkerPool "Distributed" true true
@deprecate_moved RemoteChannel "Distributed" true true
@deprecate_moved Future "Distributed" true true
@deprecate_moved WorkerConfig "Distributed" true true
@deprecate_moved RemoteException "Distributed" true true
@deprecate_moved ProcessExitedException "Distributed" true true


@deprecate_moved crc32c "CRC32c" true true

Expand Down
3 changes: 0 additions & 3 deletions base/event.jl
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ end
Block the current task until some event occurs, depending on the type of the argument:
* [`RemoteChannel`](@ref) : Wait for a value to become available on the specified remote
channel.
* [`Future`](@ref) : Wait for a value to become available for the specified future.
* [`Channel`](@ref): Wait for a value to be appended to the channel.
* [`Condition`](@ref): Wait for [`notify`](@ref) on a condition.
* `Process`: Wait for a process or process chain to exit. The `exitcode` field of a process
Expand Down
38 changes: 3 additions & 35 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ export
Markdown,
Threads,
Iterators,
Distributed,

# Types
AbstractChannel,
Expand Down Expand Up @@ -1262,38 +1261,7 @@ export
nzrange,
nnz,

# Distributed module re-exports
@spawn,
@spawnat,
@fetch,
@fetchfrom,
@everywhere,
@parallel,

addprocs,
CachingPool,
clear!,
ClusterManager,
default_worker_pool,
init_worker,
interrupt,
launch,
manage,
# Minimal set of Distributed exports - useful for a program to check if running
# in distributed mode or not.
myid,
nprocs,
nworkers,
pmap,
procs,
remote,
remotecall,
remotecall_fetch,
remotecall_wait,
remote_do,
rmprocs,
workers,
WorkerPool,
RemoteChannel,
Future,
WorkerConfig,
RemoteException,
ProcessExitedException
nprocs
10 changes: 5 additions & 5 deletions base/loading.jl
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,9 @@ end
# require always works in Main scope and loads files from node 1
const toplevel_load = Ref(true)

myid() = isdefined(Main, :Distributed) ? invokelatest(Main.Distributed.myid) : 1
nprocs() = isdefined(Main, :Distributed) ? invokelatest(Main.Distributed.nprocs) : 1

"""
require(module::Symbol)
Expand All @@ -301,12 +304,9 @@ function require(mod::Symbol)
# After successfully loading, notify downstream consumers
if toplevel_load[] && myid() == 1 && nprocs() > 1
# broadcast top-level import/using from node 1 (only)
@sync for p in procs()
@sync for p in invokelatest(Main.procs)
p == 1 && continue
@async remotecall_wait(p) do
require(mod)
nothing
end
@async invokelatest(Main.remotecall_wait, ()->(require(mod); nothing), p)
end
end
for callback in package_callbacks
Expand Down
Loading

0 comments on commit e3c89ad

Please sign in to comment.