Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

--bind-to check prioritized. Use iface address instead of loopback if available. #6030

Merged
merged 1 commit into from
Apr 17, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions base/client.jl
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,27 @@ end
# try to include() a file, ignoring if not found
try_include(path::String) = isfile(path) && include(path)

function init_bind_addr(args::Vector{UTF8String})
# Treat --bind-to in a position independent manner in ARGS since
# --worker, -n and --machinefile options are affected by it
btoidx = findfirst(args, "--bind-to")
if btoidx > 0
bind_addr = parseip(args[btoidx+1])
else
try
bind_addr = getipaddr()
catch
# All networking is unavailable, initialize bind_addr to the loopback address
# Will cause an exception to be raised only when used.
bind_addr = ip"127.0.0.1"
end
end
global LPROC
LPROC.bind_addr = bind_addr
end


function process_options(args::Vector{UTF8String})
global bind_addr
quiet = false
repl = true
startup = true
Expand All @@ -186,8 +205,7 @@ function process_options(args::Vector{UTF8String})
start_worker()
# doesn't return
elseif args[i]=="--bind-to"
i += 1
bind_addr = args[i]
i+=1 # has already been processed
elseif args[i]=="-e" || args[i]=="--eval"
repl = false
i+=1
Expand Down Expand Up @@ -322,6 +340,7 @@ function _start()
early_init()

try
init_bind_addr(ARGS)
any(a->(a=="--worker"), ARGS) || init_head_sched()
init_load_path()
(quiet,repl,startup,color_set,no_history_file) = process_options(copy(ARGS))
Expand Down
160 changes: 100 additions & 60 deletions base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,6 @@ end

abstract ClusterManager

type WorkerLocalInfo
# Currently only one field, in the future we may add more fields like
# local OS pid, system info like RAM, CPU type, etc.
#
privipaddr::IpAddr
WorkerLocalInfo() = new(getipaddr())
end

type Worker
host::ByteString
port::Uint16
Expand All @@ -104,21 +96,26 @@ type Worker
add_msgs::Array{Any,1}
id::Int
gcflag::Bool
privhost::ByteString
bind_addr::IpAddr
manage::Function
config::Dict
winfo::WorkerLocalInfo

Worker(host::String, port::Integer, sock::TcpSocket, id::Int) =
new(bytestring(host), uint16(port), sock, IOBuffer(), {}, {}, id, false, "")
new(bytestring(host), uint16(port), sock, IOBuffer(), {}, {}, id, false)
end
Worker(host::String, port::Integer, sock::TcpSocket) =
Worker(host, port, sock, 0)
Worker(host::String, port::Integer) =
Worker(host, port, connect(host,uint16(port)))
Worker(host::String, privhost::String, port::Integer, tunnel_user::String, sshflags) =
Worker(host, port, connect("localhost",
ssh_tunnel(tunnel_user, host, privhost, uint16(port), sshflags)))
function Worker(host::String, port::Integer)
w = Worker(host, port, connect(host,uint16(port)))
w.bind_addr = getaddrinfo(host)
w
end
function Worker(host::String, bind_addr::String, port::Integer, tunnel_user::String, sshflags)
w = Worker(host, port, connect("localhost",
ssh_tunnel(tunnel_user, host, bind_addr, uint16(port), sshflags)))
w.bind_addr = parseip(bind_addr)
w
end


function send_msg_now(w::Worker, kind, args...)
Expand Down Expand Up @@ -182,10 +179,11 @@ end

type LocalProcess
id::Int
winfo::WorkerLocalInfo
bind_addr::IpAddr
LocalProcess() = new()
end

const LPROC = LocalProcess(1, WorkerLocalInfo())
const LPROC = LocalProcess()

const map_pid_wrkr = Dict{Int, Union(Worker, LocalProcess)}()
const map_sock_wrkr = ObjectIdDict()
Expand All @@ -211,17 +209,18 @@ type ProcessGroup
end
const PGRP = ProcessGroup({})

getprivipaddr(pid::Integer) = getprivipaddr(worker_from_id(pid))
getprivipaddr(w::Union(Worker, LocalProcess)) = fetchwinfo(w).privipaddr

fetchwinfo(pid::Integer) = fetchwinfo(worker_from_id(pid))
function fetchwinfo(w::Union(Worker, LocalProcess))
# retrieve and cache upon first access
if !isdefined(w, :winfo)
w.winfo = remotecall_fetch(w.id, fetchwinfo, w.id)
get_bind_addr(pid::Integer) = get_bind_addr(worker_from_id(pid))
function get_bind_addr(w::Union(Worker, LocalProcess))
if !isdefined(w, :bind_addr)
if w.id != myid()
w.bind_addr = remotecall_fetch(w.id, get_bind_addr, w.id)
else
error("LPROC.bind_addr not defined") # Should never happend since LPROC.bind_addr
# is defined early on during process init.
end
end
w.winfo
end
w.bind_addr
end

function add_workers(pg::ProcessGroup, ws::Array{Any,1})
# NOTE: currently only node 1 can add new nodes, since nobody else
Expand All @@ -232,10 +231,11 @@ function add_workers(pg::ProcessGroup, ws::Array{Any,1})
register_worker(w)
create_message_handler_loop(w.socket)
end
all_locs = map(x -> isa(x, Worker) ? (x.privhost, x.port, x.id) : ("", 0, x.id), pg.workers)

all_locs = map(x -> isa(x, Worker) ? (string(x.bind_addr), x.port, x.id, x.manage == manage_local_worker) : ("", 0, x.id, true), pg.workers)

for w in ws
send_msg_now(w, :join_pgrp, w.id, all_locs)
send_msg_now(w, :join_pgrp, w.id, all_locs, w.manage == manage_local_worker)
end
for w in ws
@schedule begin
Expand All @@ -255,8 +255,16 @@ end

procs() = Int[x.id for x in PGRP.workers]
function procs(pid::Integer)
ipatpid = getprivipaddr(pid)
Int[x.id for x in filter(w -> getprivipaddr(w) == ipatpid, PGRP.workers)]
if myid() == 1
if (pid == 1) || (map_pid_wrkr[pid].manage == manage_local_worker)
Int[x.id for x in filter(w -> (w.id==1) || (w.manage == manage_local_worker), PGRP.workers)]
else
ipatpid = get_bind_addr(pid)
Int[x.id for x in filter(w -> get_bind_addr(w) == ipatpid, PGRP.workers)]
end
else
remotecall_fetch(1, procs, pid)
end
end

function workers()
Expand Down Expand Up @@ -854,16 +862,24 @@ function create_message_handler_loop(sock::AsyncStream) #returns immediately
# first connection; get process group info from client
self_pid = LPROC.id = deserialize(sock)
locs = deserialize(sock)
self_is_local = deserialize(sock)
#print("\nLocation: ",locs,"\nId:",myid(),"\n")
# joining existing process group

register_worker(Worker("", 0, sock, 1))
register_worker(LPROC)

for (rhost, rport, rpid) in locs
for (rhost, rport, rpid, r_is_local) in locs
if (rpid < self_pid) && (!(rpid == 1))
# Connect to them
w = Worker(rhost, rport)
if self_is_local && r_is_local
# If on localhost, use the loopback address - this addresses
# the special case of system suspend wherein the local ip
# may be changed upon system awake.
w = Worker("127.0.0.1", rport)
else
w = Worker(rhost, rport)
end
w.id = rpid
register_worker(w)
create_message_handler_loop(w.socket)
Expand Down Expand Up @@ -919,8 +935,6 @@ end
# argument is descriptor to write listening port # to.
start_worker() = start_worker(STDOUT)
function start_worker(out::IO)
global bind_addr

# we only explicitly monitor worker STDOUT on the console, so redirect
# stderr to stdout so we can see the output.
# at some point we might want some or all worker output to go to log
Expand All @@ -929,14 +943,11 @@ function start_worker(out::IO)
# exit when process 1 shut down. Don't yet know why.
#redirect_stderr(STDOUT)

if !isdefined(Base,:bind_addr)
bind_addr = getipaddr()
end
(actual_port,sock) = listenany(uint16(9009))
sock.ccb = accept_handler
print(out, "julia_worker:") # print header
print(out, "$(dec(actual_port))#") # print port
print(out, bind_addr) #TODO: print hostname
print(out, LPROC.bind_addr)
print(out, '\n')
flush(out)
# close STDIN; workers will not use it
Expand Down Expand Up @@ -973,8 +984,8 @@ function start_cluster_workers(np::Integer, config::Dict, cman::ClusterManager)
elseif insttype == :io_host
read_cb_response(inst) =
begin
(priv_hostname, port) = read_worker_host_port(inst[1])
inst[1], priv_hostname, port, inst[2], inst[3]
(bind_addr, port) = read_worker_host_port(inst[1])
inst[1], bind_addr, port, inst[2], inst[3]
end
elseif insttype == :io_host_port
read_cb_response(inst) = (inst[1], inst[2], inst[3], inst[2], inst[4])
Expand All @@ -985,8 +996,8 @@ function start_cluster_workers(np::Integer, config::Dict, cman::ClusterManager)
end

for i=1:np
(io, privhost, port, pubhost, wconfig) = read_cb_response(instances[i])
ws[i] = create_worker(privhost, port, pubhost, io, wconfig, cman.manage)
(io, bind_addr, port, pubhost, wconfig) = read_cb_response(instances[i])
ws[i] = create_worker(bind_addr, port, pubhost, io, wconfig, cman.manage)
end
ws
end
Expand All @@ -995,14 +1006,14 @@ function read_worker_host_port(io::IO)
io.line_buffered = true
while true
conninfo = readline(io)
private_hostname, port = parse_connection_info(conninfo)
if private_hostname != ""
return private_hostname, port
bind_addr, port = parse_connection_info(conninfo)
if bind_addr != ""
return bind_addr, port
end
end
end

function create_worker(privhost, port, pubhost, stream, config, manage)
function create_worker(bind_addr, port, pubhost, stream, config, manage)
tunnel = config[:tunnel]

s = split(pubhost,'@')
Expand All @@ -1020,12 +1031,11 @@ function create_worker(privhost, port, pubhost, stream, config, manage)

if tunnel
sshflags = config[:sshflags]
w = Worker(pubhost, privhost, port, user, sshflags)
w = Worker(pubhost, bind_addr, port, user, sshflags)
else
w = Worker(pubhost, port)
w = Worker(bind_addr, port)
end

w.privhost = privhost
w.config = config
w.manage = manage

Expand Down Expand Up @@ -1060,10 +1070,10 @@ end
tunnel_port = 9201
# establish an SSH tunnel to a remote worker
# returns P such that localhost:P connects to host:port
function ssh_tunnel(user, host, privhost, port, sshflags)
function ssh_tunnel(user, host, bind_addr, port, sshflags)
global tunnel_port
localp = tunnel_port::Int
while !success(detach(`ssh -f -o ExitOnForwardFailure=yes $sshflags $(user)@$host -L $localp:$privhost:$(int(port)) sleep 60`)) && localp < 10000
while !success(detach(`ssh -f -o ExitOnForwardFailure=yes $sshflags $(user)@$host -L $localp:$bind_addr:$(int(port)) sleep 60`)) && localp < 10000
localp += 1
end

Expand Down Expand Up @@ -1095,7 +1105,7 @@ function launch_local_workers(cman::LocalManager, np::Integer, config::Dict)

# start the processes first...
for i in 1:np
io, pobj = readsfrom(detach(`$(dir)/$(exename) --bind-to 127.0.0.1 $exeflags`))
io, pobj = readsfrom(detach(`$(dir)/$(exename) --bind-to $(LPROC.bind_addr) $exeflags`))
io_objs[i] = io
configs[i] = merge(config, {:process => pobj})
end
Expand Down Expand Up @@ -1123,30 +1133,43 @@ show(io::IO, cman::SSHManager) = println("SSHManager(machines=", cman.machines,
function launch_ssh_workers(cman::SSHManager, np::Integer, config::Dict)
dir = config[:dir]
exename = config[:exename]
exeflags = config[:exeflags]
exeflags_base = config[:exeflags]

io_objs = cell(np)
configs = cell(np)

# start the processes first...

for i in 1:np
# machine could be of the format [user@]host[:port]
machine_def = split(cman.machines[i], ':')
# machine could be of the format [user@]host[:port] bind_addr
machine_bind = split(cman.machines[i])
if length(machine_bind) > 1
exeflags = `--bind-to $(machine_bind[2]) $exeflags_base`
else
exeflags = exeflags_base
end
machine_def = machine_bind[1]

machine_def = split(machine_def, ':')
portopt = length(machine_def) == 2 ? ` -p $(machine_def[2]) ` : ``
config[:sshflags] = `$(config[:sshflags]) $portopt`

sshflags = config[:sshflags]
cman.machines[i] = machine_def[1]
host = cman.machines[i] = machine_def[1]

io, pobj = readsfrom(detach(`ssh -n $sshflags $(machine_def[1]) "sh -l -c \"cd $dir && $exename $exeflags\""`))
# Build up the ssh command
cmd = `cd $dir && $exename $exeflags` # launch julia
cmd = `sh -l -c $(shell_escape(cmd))` # shell to launch under
cmd = `ssh -n $sshflags $host $(shell_escape(cmd))` # use ssh to remote launch

io, pobj = readsfrom(detach(cmd))
io_objs[i] = io
configs[i] = merge(config, {:machine => cman.machines[i]})
end

# ...and then read the host:port info. This optimizes overall start times.
# For ssh, the tunnel connection, if any, has to be with the specified machine name.
# but the port needs to be forwarded to the private hostname/ip-address
# but the port needs to be forwarded to the bound hostname/ip-address
return (:io_host, collect(zip(io_objs, cman.machines, configs)))
end

Expand Down Expand Up @@ -1537,3 +1560,20 @@ function disable_nagle(sock)
end
end
end

function check_same_host(pids)
if myid() != 1
return remotecall_fetch(1, check_same_host, pids)
else
# We checkfirst if all test pids have been started using the local manager,
# else we check for the same bind_to addr. This handles the special case
# where the local ip address may change - as during a system sleep/awake
if all(p -> (p==1) || (map_pid_wrkr[p].manage == manage_local_worker), pids)
return true
else
first_bind_addr = map_pid_wrkr[pids[1]].bind_addr
return all(p -> (p != 1) && (map_pid_wrkr[p].bind_addr == first_bind_addr), pids[2:end])
end
end
end

Loading