
RFC: User definable transports using ClusterManagers #9046

Closed
wants to merge 3 commits

Conversation

amitmurthy
Contributor

This is an attempt to make it possible to use alternate transport mechanisms for a Julia cluster.

NOTE: multi.jl has been split into manager.jl and multi.jl. The former has all code related to connection
setup and cluster managers, while the latter has the multi-processing part.

Concept

Currently, all nodes are set up in a mesh network with each node connected to every other node via TCP sockets.

From a scaling point of view, in some cases, a star configuration or even a tree configuration may be preferred.

This patch makes it possible to, for example,

  • Use 0MQ for transport. For example, use a 0MQ router in the middle for a star configuration.
  • Better integrate with MPI.
    • Use MPI as transport, or
    • Have the same set of Julia processes participate in both the MPI cluster as well as the Julia cluster. This will allow for folks
      to use Julia libraries that leverage MPI as well as the regular Julia parallel infrastructure

NOTE: I am unfamiliar with MPI, so the MPI comments are based on a sketchy knowledge of MPI.

As an example, say we write a ZMQClusterManager and we interconnect thousands of Julia processes
via a 0MQ router into a star network.

So while a Julia worker connects to the 0MQ router with a single 0MQ socket,
the interface between ZMQClusterManager and multi.jl is via a pair of "StreamBuffer"s. A "StreamBuffer" is a
non-OS AsyncStream. It essentially wraps a PipeBuffer and read/write Condition variables to
provide waitable in-process stream objects.

While the implementation may have similarities with a Channel{Uint8} (#8507),
it is different in the sense that StreamBuffers implement AsyncStream, while Channel
implements single-element put!, take!, etc.
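
A minimal sketch of what such a "StreamBuffer" could look like (illustrative only; the actual implementation is still on the TODO list below). It is just a PipeBuffer plus a Condition, so that a task can block in readavailable until another task writes data:

type StreamBuffer <: Base.AsyncStream
    buffer::IOBuffer
    cond::Condition
end
StreamBuffer() = StreamBuffer(PipeBuffer(), Condition())

function Base.write(s::StreamBuffer, data::Vector{Uint8})
    n = write(s.buffer, data)
    notify(s.cond)               # wake up any task blocked in readavailable
    n
end

function Base.readavailable(s::StreamBuffer)
    while nb_available(s.buffer) == 0
        wait(s.cond)             # block this task until another task writes
    end
    takebuf_array(s.buffer)
end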

Implementation

Custom cluster managers should implement the following methods:

launch - launches multiple worker processes
connect_m2w - sets up connections between the master and workers. Optional.
connect_w2w - sets up connections between the workers. Optional.
manage - bookkeeping / interrupting workers

A default implementation of connect_m2w and connect_w2w using TCPSocket is provided in Base.
NOTE: Custom implementations which return "StreamBuffer"s may not even set up any actual "connection"s.
They may just associate the Julia pid with the appropriate routing information.

Base provides:

launch{LocalManager}
launch{SSHManager}
connect_m2w{ClusterManager}
connect_w2w{ClusterManager}
manage{LocalManager}
manage{SSHManager}
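
To give a feel for the interface, here is a hedged skeleton of a custom cluster manager using the signatures described in the control flow below. MyTransportManager is hypothetical and the bodies only indicate what each method is responsible for:

import Base: launch, manage, connect_m2w, connect_w2w   # assuming these generic functions live in Base as described above

type MyTransportManager <: ClusterManager end

function launch{T<:MyTransportManager}(::Type{T}, config::Dict, launched::Array, c::Condition)
    # start workers, push one config Dict per worker onto `launched`,
    # and notify(c) as each worker becomes available
end

function connect_m2w{T<:MyTransportManager}(::Type{T}, pid::Int, config::Dict)
    # return (read_stream::AsyncStream, write_stream::AsyncStream) for worker `pid`
end

function connect_w2w{T<:MyTransportManager}(::Type{T}, pid::Int, config::Dict)
    # return a stream pair for a worker-to-worker connection, using config[:connect_at]
end

function manage{T<:MyTransportManager}(::Type{T}, id::Integer, config::Dict, op::Symbol)
    # bookkeeping: op is one of :register, :deregister, :interrupt, :finalize
end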

The control flow starts with an addprocs(np::Int; kwargs...) where one of the
keyword args is manager which defaults to LocalManager()

  • addprocs calls launch(::Type, config::Dict, launched::Array, c::Condition) to start julia workers.
    The launch method, which is run in a separate task, should

    • launch and add a Dict to launched::Array for every worker launched. This will typically be a copy of
      config with additional fields that may be required to connect to the worker.
    • notify the Condition variable c, as and when a worker is launched, so that connections to the launched workers can be set up in parallel

    Base provides two launch methods :

    launch{T<:LocalManager}(::Type{T}, .... and launch{T<:SSHManager}(::Type{T}, ....

  • Next, connect_m2w(::Type, pid::Int, config::Dict) is called to setup
    connections between the master and workers. It is called for each and every
    config returned by the launch method above. config also has an additional
    key :pid which is the pid of the worker.

    connect_m2w should return a tuple, (read_stream::AsyncStream, write_stream::AsyncStream).
    It can also add any additional key-value pairs to config.

    Any bytes read from read_stream must have been sent from process pid
    Any bytes written to write_stream should be delivered to process pid

    Base provides a default implementation which uses regular TCP sockets to
    set up the mesh network:
    connect{T<:ClusterManager}(::Type{T}, ....)

    Both read_stream and write_stream are the same TCPSocket in this case.

    connect{T<:ClusterManager} expects either :io or :host/:port to be defined
    in config, where io is the stdout of the launched worker.

    If :io is defined, it is used to read the host/port information that is printed by the worker.
    If :host is defined, it overrides the host value read from io. (A launch sketch that uses this
    default path is shown after this list.)

    Both launch and connect_m2w could add a key :connect_at. The value of this
    field is sent to all workers to be used in worker-to-worker connection setups.

  • connect_w2w(::Type, pid::Int, config::Dict) is called to setup worker-to-worker connections.
    Since this is called from the workers, very limited config information is available.

    config in this case has a field :connect_at which can be used by the
    cluster manager's connect_w2w implementation to connect to pid.

  • manage(::Type{T}, id::Integer, config::Dict, op::Symbol) is called only from the master
    process, with op being one of :register, :deregister, :interrupt and :finalize.
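
As a concrete, hedged example of the launch contract, here is a sketch of a launch method that relies on Base's default TCP connect_m2w by handing each worker's stdout over via :io, so Base can read the host/port the worker prints. HostManager, the :np and :exename config keys, and the --worker argument form are illustrative assumptions:

type HostManager <: ClusterManager end

function launch{T<:HostManager}(::Type{T}, config::Dict, launched::Array, c::Condition)
    for i in 1:config[:np]
        io, pobj = open(detach(`$(config[:exename]) --worker HostManager`), "r")
        wconfig = copy(config)
        wconfig[:io] = io        # Base's default connect_m2w reads host/port from this
        push!(launched, wconfig)
        notify(c)                # allow connection setup to proceed in parallel
    end
end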

Startup and specifying the cluster manager in workers

  • worker processes are started with --worker <cluster_manager> where cluster_manager
    is the type of the cluster manager.
  • ClusterManagers implementing their own transport should launch Julia workers using
    the -e option or a program file.
  • Base.start_worker{T<:ClusterManager}(::Type{T}, out::IO) is the current entry point
    for a worker, which listens on a free port and prints out connection information.
  • custom ClusterManagers are free to write their own entry points, which can be called
    after loading the cluster manager via -e or a program file.
  • custom ClusterManagers should call process_messages(r_stream::AsyncStream, w_stream::AsyncStream; kwargs...)
    once for every incoming connection/data from another node (a sketch of such an entry point follows below).
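
A hedged sketch of such a custom worker entry point; wait_for_connection and the manager name are illustrative assumptions, and the only requirement described above is one process_messages call per incoming connection:

function start_my_worker()
    init_worker(MyTransportManager)                  # register the (hypothetical) manager on this worker
    while true
        (r_stream, w_stream) = wait_for_connection() # block until another node connects over the custom transport
        Base.process_messages(r_stream, w_stream)    # hand the stream pair to Julia's message loop
    end
end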

TODO

  • Implement StreamBuffer
  • Reference implementation of a ClusterManager using StreamBuffers
  • Documentation

@amitmurthy
Contributor Author

Currently, the worker knows about its pid, only when it receives the first message. This leads to issues like

  • we cannot create RemoteRefs on the worker till the first message is received.
  • a custom transport layer does not know the worker pid at launch time. Especially true if we want launch to be able to start as many workers as there are cores on a remote host.

We can address this by exporting Base.get_next_pid() for the launch methods to call, and adding a command line argument --pid <pid> to the worker process which initializes the worker pid at startup itself.

@eschnett
Contributor

With MPI, the workers are started by MPI. There is no special manager process, there are just workers -- if you want to have a manager, you need to use one of the workers. MPI offers an API to send messages between workers. There is also additional functionality (e.g. efficient broadcast and reduce operations) but that's too high level to be mapped to a ClusterManager.

MPI has no pids; instead, workers are numbered sequentially starting from zero. It is possible to add new workers (by connecting several MPI jobs), but let's ignore that for the moment.

Would this MPI model fit with your changes? Essentially, when Julia starts, all workers have already been set up and have been connected, and one uses small integers to address them?

@amitmurthy
Contributor Author

It would broadly work in the following manner (pseudo code):

Julia processes started with

julia -e "using MPIClustermanager; MPIClustermanager::do_mpi_loop()"

MPIClusterManager implementation:

type MPIClusterManager <: ClusterManager
    map_mpi_julia::Dict{Int, Tuple} # maps mpi_id => (julia_pid, read_stream, write_stream)
    map_julia_mpi::Dict{Int, Tuple}   # maps julia_pid => (mpi_id, read_stream, write_stream)
end


function do_mpi_loop()
    manager = MPIClusterManager(Dict{Int,Tuple}(), Dict{Int,Tuple}())

    # rank 0 acts as the Julia master (treated as julia pid 1), all other ranks as workers.
    # MPI_comm_rank() is a stand-in here, like the other MPI_* calls in this pseudo code.
    if MPI_comm_rank() == 0
        np = MPI_comm_size() - 1
        addprocs(np, manager=manager)
    else
        init_worker(MPIClusterManager)
        set_cluster_manager(manager)
    end

    while true
        data, from_mpi_id = MPI_recv()
        from_julia_pid, actual_data = data

        connection = get(manager.map_mpi_julia, from_mpi_id, nothing)
        if connection == nothing
           # new incoming connection, will happen only on the workers
            read_stream=BufferStream()
            write_stream=BufferStream()

            manager.map_mpi_julia[from_mpi_id] = (from_julia_pid, read_stream, write_stream)
            manager.map_julia_mpi[from_julia_pid] = (from_mpi_id, read_stream, write_stream)


            @schedule begin
                while true
                    data = read(write_stream)
                    # add a small header of self julia_pid to the raw data
                    MPI_send((myid(), data), from_mpi_id)     # Send tuple (julia_pid, raw_data) to process from_mpi_id
                end
            end

            process_messages(read_stream, write_stream)
        else
            (from_julia_pid, read_stream, write_stream) = connection
        end

        write(read_stream, actual_data)    # This is read by the Julia end
    end
end

function launch{T<:MPIClusterManager}(::Type{T}, config::Dict, launched::Array, c::Condition)
    # the workers have already been launched
    for i in 1:config[:np]
        wconfig = copy(config)
        wconfig[:mpi_id] = i
        push!(launched, wconfig)
    end

    notify(c)
end

function connect_m2w{T<:MPIClusterManager}(::Type{T}, pid::Int, config::Dict)
  mpi_id = config[:mpi_id]
  manager = config[:manager]

  read_stream=BufferStream()
  write_stream=BufferStream()

  manager.map_mpi_julia[mpi_id] = (pid, read_stream, write_stream)
  manager.map_julia_mpi[pid] = (mpi_id, read_stream, write_stream)

  @schedule begin
    while true
        data = read(write_stream)

        # add a small header of self julia_pid to the raw data
        MPI_send((myid(), data), mpi_id)     # Send tuple (julia_pid, raw_data) to process mpi_id
    end
  end

  config[:connect_to] = mpi_id    # This will be useful in the worker-to-worker connection setup.
  (read_stream, write_stream)
end


function connect_w2w{T<:MPIClusterManager}(::Type{T}, pid::Int, config::Dict)
  mpi_id = config[:connect_to]
  manager = get_cluster_manager()

  read_stream=BufferStream()
  write_stream=BufferStream()

  manager.map_mpi_julia[mpi_id] = (pid, read_stream, write_stream)
  manager.map_julia_mpi[pid] = (mpi_id, read_stream, write_stream)

  @schedule begin
    while true
        data = read(write_stream)
        MPI_send(data, mpi_id)     # Send data to process mpi_id
    end
  end

  (read_stream, write_stream)
end

@eschnett
Contributor

When you receive data via MPI, you already know who sent the data; the additional headers are not necessary. Also, MPI already implements buffers, so it should not be necessary to implement buffers in Julia as well. Unbuffered reads/writes should work just fine.

In an ideal world, the manager would not need to store any data regarding connections, ids, workers etc., as one can get all this information by querying MPI instead. In your code above, the whole if connection == nothing block would go away. BufferStream() objects would have a trivial implementation that just call something like MPI_Recv or MPI_Send.

The @schedule begin block may also not be necessary, as a write to write_stream could directly call MPI_Send, but I'm less sure about this since MPI implementations are usually not multi-threaded, and once Julia is multi-threaded, such write buffering will become necessary.

I'm sorry that I haven't looked at your implementation yet, but do you think that the above is feasible? Your quick response and implementation are very encouraging.

@amitmurthy
Contributor Author

The issue lies in the differences between the two communication infrastructures.

In Julia, the model is:

  • have one long running task for every worker connection
  • the task waits on an AsyncStream (currently a TCPSocket), deserializes the request, executes it and then waits for the next one
  • requests sent to other workers can happen from any task, at any time. These result in serialization of the request and writing the same to an
    AsyncStream. The single-threaded nature of Julia ensures that a single serialization goes through without interruption
    (also because serialization of a request is first done to an IOBuffer and then written to the socket)

We do not want to change the above model just for MPI.

For MPI communication we are not aware (and don't care) of its internal communication infrastructure.

However, if we want Julia parallel constructs (@parallel, pmap, etc) to use MPI for transport, we need to bridge these two worlds.

StreamBuffers are a way to do this. The Julia communication infrastructure remains unchanged,
i.e., one task per connection, "connections" to multiple workers, waits/reads/writes from/to AsyncStream objects, etc.
Except in this case they are not TCPSockets but StreamBuffers.

As part of the bridging, data received from a worker (via MPI) is sent to the Julia task handling that particular worker connection via
the correct StreamBuffer. And the other way around: any data written to the write StreamBuffer is read and sent to the correct worker via MPI.

We need to store the mapping between a Julia process id (used by @parallel, pmap, etc) and its MPI counterpart (which we will use for transporting the request).
And hence the dictionary that maps the identifiers between these two worlds.

The additional header is required just for the initial state. A Julia worker does not know its Julia process id until the master connects to it and sends the pid
over as part of the first message. We could treat this initialization process differently and maybe have a callback into the cluster manager. That would do away with the
requirement for a header.

@ViralBShah ViralBShah added the parallelism Parallel or distributed computation label Nov 19, 2014
@amitmurthy
Contributor Author

The Julia pid needs to be sent only the first time a node sends a message to any other node - the same way Julia does it today. That will do away with the requirement for a header.

However, a header may still be useful in other circumstances - for example, if the MPIClusterManager is written to support both regular MPI calls as well as MPI-Julia bridge calls in the same user code.

Also MPI broadcast can probably be supported by having another version of @everywhere in Base which does not wait for results - i.e., it calls remote_do internally instead of remotecalls within an @sync.
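
A hedged sketch of what such a no-wait variant could look like (the macro name and hygiene details are illustrative only); it issues remote_do calls and returns without waiting, instead of remotecall_fetch calls inside an @sync:

macro everywhere_nowait(ex)
    quote
        for p in workers()
            # fire-and-forget: remote_do does not return a result to wait on
            remote_do(p, ()->(eval(Main, $(Expr(:quote, ex))); nothing))
        end
    end
end

# usage: @everywhere_nowait println("hello from ", myid())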

@eschnett
Contributor

Maybe I am conflating two issues. Yes, this cluster manager can be used with MPI, this is good. We should probably proceed this way, and address the remaining issues later.

My other worry is that having one task per connection leads to O(n^2) tasks when there are n workers, if we assume a long-running application with a "random" communication pattern. This will not scale to the process counts I'm interested in (say, 10,000 workers). It will likely work fine if there are 100 workers, and that is probably the majority of the use cases.

However -- at the moment, it is probably more important to support MPI at all than to worry about this kind of scalability. So, please go ahead, I like your proposal very much.

@amitmurthy
Contributor Author

Tasks are very, very cheap.

For example:

julia> function foo(n)
           a=[]
           c=Condition()

           for i in 1:n
               @schedule begin
                   wait(c)
                   push!(a, 1)
               end
           end

           sleep(1)
           notify(c, all=true)

           while length(a) != n
               sleep(0.01)
           end
       end
foo (generic function with 1 method)

julia> @time foo(10^4)
elapsed time: 1.080626024 seconds (37438568 bytes allocated)

julia> @time foo(10^4)
elapsed time: 1.043263574 seconds (23727440 bytes allocated)

There is a segfault issue that came up with this code - #9066 - but the fact remains that tasks are not expensive at all. However, at 10K+ workers, we will need to have efficient broadcast mechanisms.

@amitmurthy
Contributor Author

I have added a proof-of-concept that uses ZMQ for transport. It is in examples/clustermanager

It uses a star topology as opposed to the native mesh network.
Package ZMQ must be installed. All workers only run on localhost.

All Julia nodes only connect to a "broker" process that listens on known ports 8100 and 8101 via ZMQ sockets.

All commands must be run from the examples/clustermanager directory.

First, start the broker. In a new console type:
julia broker.jl

Next, start a Julia REPL and type:
push!(LOAD_PATH, "$(pwd())/ZMQCM")
using ZMQCM
ZMQCM.start_master(4) # start with four workers

Alternatively, the test script head.jl can be run. It just launches the requested number of workers, executes a simple command on all of them and exits.
julia head.jl 4

NOTE: As stated this is a proof-of-concept. A real Julia cluster using ZMQ will probably use different ZMQ socket types and optimize the transport further.

@amitmurthy amitmurthy closed this Nov 24, 2014
@amitmurthy amitmurthy reopened this Nov 24, 2014
@ViralBShah
Member

cc @shashi

@amitmurthy amitmurthy changed the title WIP/RFC: User definable transports using ClusterManagers RFC: User definable transports using ClusterManagers Nov 25, 2014
@amitmurthy
Contributor Author

Some timing information for the simple ZMQ vs native transport sockets

@time @async remotecall_fetch(2, ()->ones(10^7)); - 0.22 seconds (ZMQ), 0.1 seconds (built-in TCP)

With 8 workers

@time @sync begin
    for x in workers()
        @async remotecall_fetch(x, ()->ones(10^7))
    end
end

1.7 seconds (ZMQ), 0.75 seconds (built-in TCP)

Not too bad given that an extra hop is involved in the above model. Using ZMQ for transport will be beneficial when we need to scale to thousands of workers, or when bridging clusters of nodes in different locations - i.e., when the need to leverage extra computing resources justifies the extra network latency.

@eschnett do you think you can use the above as a template for an MPI cluster manager?

@eschnett
Contributor

I think that any system allowing thousands of workers will have some kind of management software installed, such as Condor or MPI. But then, maybe someone discovers a cool project running thousands of Julia workers distributed over the globe...

@amitmurthy I will try this with MPI. I don't know when I'll have time for this, though.

@amitmurthy
Contributor Author

Superseded by #9434

@amitmurthy amitmurthy closed this Dec 21, 2014