Skip to content

Commit

Permalink
Merge pull request #19052 from JuliaLang/amitm/channel_asyncmap_fixes
Browse files Browse the repository at this point in the history
asyncmap retains shape. Closes #18500
  • Loading branch information
amitmurthy authored Oct 22, 2016
2 parents 0d8a738 + 2cd49f2 commit 467e75b
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 6 deletions.
4 changes: 2 additions & 2 deletions base/asyncmap.jl
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,8 @@ function next(itr::AsyncGenerator, state::AsyncGeneratorState)
return (r, state)
end

iteratorsize(::Type{AsyncGenerator}) = SizeUnknown()

iteratorsize(itr::AsyncGenerator) = iteratorsize(itr.collector.enumerator)
size(itr::AsyncGenerator) = size(itr.collector.enumerator)

"""
asyncmap(f, c...) -> collection
Expand Down
16 changes: 12 additions & 4 deletions base/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,19 @@ eltype{T}(::Type{Channel{T}}) = T

show(io::IO, c::Channel) = print(io, "$(typeof(c))(sz_max:$(c.sz_max),sz_curr:$(n_avail(c)))")

start{T}(c::Channel{T}) = Ref{Nullable{T}}()
function done(c::Channel, state::Ref)
type ChannelState{T}
hasval::Bool
val::T
ChannelState(x) = new(x)
end

start{T}(c::Channel{T}) = ChannelState{T}(false)
function done(c::Channel, state::ChannelState)
try
# we are waiting either for more data or channel to be closed
state[] = take!(c)
state.hasval && return false
state.val = take!(c)
state.hasval = true
return false
catch e
if isa(e, InvalidStateException) && e.state==:closed
Expand All @@ -193,6 +201,6 @@ function done(c::Channel, state::Ref)
end
end
end
next{T}(c::Channel{T}, state) = (v=get(state[]); state[]=nothing; (v, state))
next{T}(c::Channel{T}, state) = (v=state.val; state.hasval=false; (v, state))

iteratorsize{C<:Channel}(::Type{C}) = SizeUnknown()
15 changes: 15 additions & 0 deletions test/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,21 @@ results=[]
end
@test sum(results) == 15

# Test channel iterator with done() being called multiple times
# This needs to be explicitly tested since `take!` is called
# in `done()` and not `next()`
c=Channel(32); foreach(i->put!(c,i), 1:10); close(c)
s=start(c)
@test done(c,s) == false
res = Int[]
while !done(c,s)
@test done(c,s) == false
v,s = next(c,s)
push!(res,v)
end
@test res == Int[1:10...]


# Testing timedwait on multiple channels
@sync begin
rr1 = Channel(1)
Expand Down
12 changes: 12 additions & 0 deletions test/parallel_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,18 @@ end
# Test asyncmap
@test allunique(asyncmap(x->(sleep(1.0);object_id(current_task())), 1:10))

# check whether shape is retained
a=rand(2,2)
b=asyncmap(identity, a)
@test a == b
@test size(a) == size(b)

# check with an iterator that does not implement size()
c=Channel(32); foreach(i->put!(c,i), 1:10); close(c)
b=asyncmap(identity, c)
@test Int[1:10...] == b
@test size(b) == (10,)

# CachingPool tests
wp = CachingPool(workers())
@test [1:100...] == pmap(wp, x->x, 1:100)
Expand Down

1 comment on commit 467e75b

@nanosoldier
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Executing the daily benchmark build, I will reply here when finished:

@nanosoldier runbenchmarks(ALL, isdaily = true)

Please sign in to comment.