Skip to content

Commit

Permalink
New functions imap and amap.
Browse files Browse the repository at this point in the history
  • Loading branch information
samoconnor committed Feb 13, 2016
1 parent 140d004 commit 4b38b92
Show file tree
Hide file tree
Showing 5 changed files with 282 additions and 0 deletions.
3 changes: 3 additions & 0 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1210,8 +1210,11 @@ export

# multiprocessing
addprocs,
amap,
amap!,
ClusterManager,
fetch,
imap,
init_worker,
interrupt,
isready,
Expand Down
249 changes: 249 additions & 0 deletions base/mapiterator.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
# This file is a part of Julia. License is MIT: http://julialang.org/license

#-------------------------------------------------------------------------------
# MapItr
#
# Simple synchronous map iterator.
#
# collect(MapItr(f, c...)) == map(f, c...)
#
#-------------------------------------------------------------------------------

import Base: start, done, next

# Synchronous map iterator: yields f(args...) for each argument tuple obtained
# by zipping the source collections together.
type MapItr
    # Mapped function (or type used as a constructor).
    f::Union{Function,Type}
    # Iterator of argument tuples, always built as zip(c...).
    arg_itr

    # An inner constructor is used deliberately. With the default two-argument
    # constructor in place, a call with a single collection, MapItr(f, c),
    # dispatched to that more specific method instead of a vararg outer
    # constructor, so `c` was stored un-zipped and next() splatted each
    # *element* (e.g. f("ab"...) == f('a', 'b')). Always zipping here makes
    # collect(MapItr(f, c...)) == map(f, c...) hold for any number of
    # collections and any element type.
    MapItr(f, c...) = new(f, zip(c...))
end


# The iteration state of a MapItr is simply the state of its argument iterator.
function start(itr::MapItr)
    return start(itr.arg_itr)
end

# Finished exactly when the wrapped argument iterator is exhausted.
function done(itr::MapItr, state)
    return done(itr.arg_itr, state)
end

# Fetch the next argument tuple, apply the mapped function to it and return
# the value together with the advanced state.
function next(itr::MapItr, state)
    arg_tuple, next_state = next(itr.arg_itr, state)
    return (itr.f(arg_tuple...), next_state)
end



#-------------------------------------------------------------------------------
# AsyncMapItr
#
# Asynchronous map iterator.
# Creates a @sync block (between start() and done()).
# Executes mapped function calls using @async.
# Function results are asynchronously stored in "results" collection
# (iterator returns nothing).
#
# for task in AsyncMapItr(f, results, c...) end
#
# ==
#
# map!(f, results, c...)
#
# The maximum number of concurrent @async tasks can be set with the
# "ntasks" option. e.g.
#
# AsyncMapItr(f, results, c...; ntasks=10)
#
#-------------------------------------------------------------------------------


# Asynchronous map iterator: each iteration step launches one call of `f` in
# an @async task and stores the return value in `results`.
type AsyncMapItr
# Mapped function; invoked as f(args...) on each zipped argument tuple.
f
# Destination collection; the result for the i-th argument tuple is written
# to results[i].
results
# Enumerated stream of argument tuples (the keyword constructor builds it as
# enumerate(zip(c...))).
arg_enum::Enumerate
# Number of in-flight tasks at which the iterator counts as busy.
ntasks::Int
end

# Convenience constructor: zip the collections `c` together and enumerate the
# resulting argument tuples so that every result can be stored at its index
# in `results`.
#
# `ntasks` caps the number of concurrently running tasks; `nothing` selects
# the default of 100. Throws ArgumentError for `ntasks == 0`: with a limit of
# zero the iterator is "busy" before any task has been started, so next()
# would wait forever for a notification that can never arrive.
function AsyncMapItr(f, results, c...; ntasks=nothing)
    # Identity comparison is the reliable way to test for `nothing`.
    limit = ntasks === nothing ? 100 : ntasks
    if limit == 0
        throw(ArgumentError("ntasks must not be zero"))
    end
    return AsyncMapItr(f, results, enumerate(zip(c...)), limit)
end


# Mutable iteration state for AsyncMapItr.
type AsyncMapState
# State of the underlying enumerate(zip(...)) iterator.
enum_state
# Number of launched tasks that have not finished yet.
active_count::Int
# Notified by every task when it finishes; next() waits on it while busy.
task_done::Condition
# Set to true once the input is exhausted and the sync block has been closed.
done::Bool
end


# True when the number of running tasks has reached the configured limit,
# i.e. no further task may be started right now.
function isbusy(itr::AsyncMapItr, state::AsyncMapState)
    return itr.ntasks == state.active_count
end


# Block the current task until one of the @async tasks signals completion.
function wait(state::AsyncMapState)
    return wait(state.task_done)
end


# Begin iteration: open a @sync block (closed again in done()) and build the
# initial state with no running tasks.
function start(itr::AsyncMapItr)
    Base.sync_begin()
    enum_state = start(itr.arg_enum)
    return AsyncMapState(enum_state, 0, Condition(), false)
end

# Report whether iteration has finished. The first time the input is found to
# be exhausted, the @sync block opened in start() is closed, which waits for
# all outstanding tasks. Later calls just return the cached flag.
function done(itr::AsyncMapItr, state::AsyncMapState)
    if state.done
        return true
    end
    if done(itr.arg_enum, state.enum_state)
        state.done = true
        Base.sync_end()
    end
    return state.done
end

# Launch the next mapped function call as an @async task.
#
# Blocks while the maximum number of concurrent tasks is running. The result
# of the call is stored asynchronously in itr.results; the value returned by
# the iterator itself is always `nothing`.
function next(itr::AsyncMapItr, state::AsyncMapState)

    # Wait if the maximum number of concurrent tasks are already running...
    while isbusy(itr, state)
        wait(state)
    end

    # Get index and mapped function arguments from enumeration iterator...
    (i, args), state.enum_state = next(itr.arg_enum, state.enum_state)

    # Count the task before it is scheduled. The task body cannot run until
    # this function yields, so the counter is never decremented first.
    state.active_count += 1

    # Execute function call and save result asynchronously...
    @async begin
        try
            itr.results[i] = itr.f(args...)
        finally
            # Release the slot and wake waiters even when f throws. Otherwise
            # a failed task would occupy its slot forever and, once enough
            # tasks had failed, next() would block indefinitely. The exception
            # itself still propagates out of the task.
            state.active_count -= 1
            notify(state.task_done, nothing)
        end
    end

    return (nothing, state)
end



#-------------------------------------------------------------------------------
# StreamMapItr
#
# Streaming map iterator.
# Applies mapped function asynchronously and returns results as they become
# available.
#
# collect(StreamMapItr(f, c...)) == map(f, c...)
#
# The maximum number of concurrent @async tasks can be set with the
# "ntasks" option. e.g.
#
# StreamMapItr(f, c...; ntasks=10)
#
#-------------------------------------------------------------------------------


# Streaming map iterator: wraps an AsyncMapItr and hands out its results one
# at a time, in input order.
type StreamMapItr
# Source iterator; its `results` collection is used as the buffer from which
# finished results are taken and deleted.
async_itr::AsyncMapItr
end

# Build a streaming iterator over `c...`; finished results are buffered in a
# Dict keyed by input position until they are consumed.
function StreamMapItr(f, c...; ntasks=nothing)
    buffer = Dict{Int,Any}()
    source = AsyncMapItr(f, buffer, c...; ntasks=ntasks)
    return StreamMapItr(source)
end


# Iteration state for StreamMapItr.
type StreamMapState
# Index of the most recently requested result (0 before the first one).
i::Int
# State of the wrapped AsyncMapItr.
async_state::AsyncMapState
end


# Start the wrapped async iterator; no result has been requested yet.
function start(itr::StreamMapItr)
    async_state = start(itr.async_itr)
    return StreamMapState(0, async_state)
end

# Finished once the source async iterator is done and every buffered result
# has been consumed. The source is queried first; the buffer is inspected
# only if the source reports done.
function done(itr::StreamMapItr, state::StreamMapState)
    if !done(itr.async_itr, state.async_state)
        return false
    end
    return isempty(itr.async_itr.results)
end


# Advance the source async iterator by one step unless it is busy or already
# done. Returns true if a new task was started, false otherwise.
function pump_source(itr::StreamMapItr, state::StreamMapState)
    source = itr.async_itr
    # Busy is checked first; done() is consulted only when not busy.
    if isbusy(source, state.async_state) || done(source, state.async_state)
        return false
    end
    state.async_state = next(source, state.async_state)[2]
    return true
end

# Return the next result in input order, starting further tasks and waiting
# for running ones as required. The result is removed from the buffer.
function next(itr::StreamMapItr, state::StreamMapState)
    state.i += 1
    idx = state.i
    buffer = itr.async_itr.results

    while !haskey(buffer, idx)
        # Keep the source fed; sleep only when nothing new could be started
        # and the wanted result has still not arrived.
        started = pump_source(itr, state)
        if !started && !haskey(buffer, idx)
            wait(state.async_state)
        end
    end

    value = buffer[idx]
    delete!(buffer, idx)
    return (value, state)
end



#-------------------------------------------------------------------------------
# Interface: amap and imap
#-------------------------------------------------------------------------------

"""
    amap(f, c...; ntasks=100) -> collection

Transform collection `c` by applying `f` to each element using at most
`ntasks` asynchronous tasks (100 by default). For multiple collection
arguments, apply `f` elementwise.

Note: `amap(f, c...; ntasks=1)` is equivalent to `map(f, c...)`.
"""
function amap(f, c...; kv...)
    lazy_results = imap(f, c...; kv...)
    return collect(lazy_results)
end


"""
    imap(f, c...; ntasks=100) -> iterator

Apply `f` to each element of `c` using at most `ntasks` asynchronous tasks
(100 by default). For multiple collection arguments, apply `f` elementwise.

Note: `collect(imap(f, c...; ntasks=1))` is equivalent to `map(f, c...)`.
"""
function imap(f, c...; ntasks=nothing)
    return StreamMapItr(f, c...; ntasks=ntasks)
end


"""
    amap!(function, collection; ntasks=100)

In-place version of [`amap`](:func:`amap`): apply `function` to each element
of `collection` and store the results back in `collection`.
"""
# Declaration only: introduces the generic function so the docstring above can
# be attached to it; the method itself is defined further down in this file.
function amap! end

"""
amap!(function, destination, collection...; ntasks=100)
Like [`amap`](:func:`amap`), but stores the result in `destination` rather than a new collection.
`destination` must be at least as large as the first collection.
"""
function amap!(f, c...; ntasks=nothing)
    # The first collection receives the results. When it is the only one it
    # also serves as the input, so the mapping happens in place.
    destination = c[1]
    sources = length(c) > 1 ? c[2:end] : c

    # Drain the iterator; each step only launches a task, and the results are
    # written into `destination` asynchronously.
    for task in AsyncMapItr(f, destination, sources...; ntasks=ntasks)
    end

    return destination
end
1 change: 1 addition & 0 deletions base/sysimg.jl
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ importall .Serializer
include("channels.jl")
include("multi.jl")
include("managers.jl")
include("mapiterator.jl")

# code loading
include("loading.jl")
Expand Down
24 changes: 24 additions & 0 deletions doc/stdlib/parallel.rst
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,30 @@ General Parallel Computing Support
Get the id of the current process.

.. function:: imap(f, c...; ntasks=100) -> iterator

.. Docstring generated from Julia source
Apply f to each element of c using at most 100 asynchronous tasks. For multiple collection arguments, apply f elementwise. Note: ``collect(imap(f, c...; ntasks=1))`` is equivalent to ``map(f, c...)``\ .

.. function:: amap(f, c...; ntasks=100) -> collection

.. Docstring generated from Julia source
Transform collection c by applying f to each element using at most 100 asynchronous tasks. For multiple collection arguments, apply f elementwise. Note: ``amap(f, c...; ntasks=1)`` is equivalent to ``map(f, c...)``\ .

.. function:: amap!(function, collection; ntasks=100)

.. Docstring generated from Julia source
In-place version of :func:`amap`\ .

.. function:: amap!(function, destination, collection...; ntasks=100)

.. Docstring generated from Julia source
Like :func:`amap`\ , but stores the result in ``destination`` rather than a new collection. ``destination`` must be at least as large as the first collection.

.. function:: pmap(f, lsts...; err_retry=true, err_stop=false, pids=workers())

.. Docstring generated from Julia source
Expand Down
5 changes: 5 additions & 0 deletions test/abstractarray.jl
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ function test_map(::Type{TestAbstractArray})
f(x) = x + 1
I = GenericIterator{10}()
@test map(f, I) == Any[2:11...]
@test amap(f, I) == Any[2:11...]

# AbstractArray map for 2 arg case
f(x, y) = x + y
Expand All @@ -413,15 +414,19 @@ function test_map(::Type{TestAbstractArray})
C = Float64[1:10...]
@test Base.map_to!(f, 1, A, B, C) == Real[ 2 * i for i in 1:10 ]
@test map(f, Int[], Float64[]) == Float64[]
@test amap(f, Int[], Float64[]) == Float64[]

# AbstractArray map for N-arg case
f(x, y, z) = x + y + z
D = Float64[1:10...]

@test map!(f, A, B, C, D) == Int[ 3 * i for i in 1:10 ]
@test amap!(f, A, B, C, D) == Int[ 3 * i for i in 1:10 ]
@test Base.map_to_n!(f, 1, A, (B, C, D)) == Real[ 3 * i for i in 1:10 ]
@test map(f, B, C, D) == Float64[ 3 * i for i in 1:10 ]
@test amap(f, B, C, D) == Float64[ 3 * i for i in 1:10 ]
@test map(f, Int[], Int[], Complex{Int}[]) == Number[]
@test amap(f, Int[], Int[], Complex{Int}[]) == Number[]
end

function test_map_promote(::Type{TestAbstractArray})
Expand Down

0 comments on commit 4b38b92

Please sign in to comment.