Skip to content

Commit

Permalink
Merge pull request #246 from kpamnany/fix_dt_thread
Browse files Browse the repository at this point in the history
Reserve a thread to drive Dtree correctly
  • Loading branch information
jeff-regier authored Jul 22, 2016
2 parents e93fa5d + eeefcf2 commit 1c7033f
Showing 1 changed file with 67 additions and 29 deletions.
96 changes: 67 additions & 29 deletions src/api.jl
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,34 @@ function time_puts(elapsedtime, bytes, gctime, allocs)
nputs(dt_nodeid, s)
end

"""
Divide `N` into `np` parts as evenly as possible, returning `my` part as
a (first, last) tuple.
"""
function divparts(N, np, my)
len, rem = divrem(N, np)
if len == 0
if my > rem
return 1, 0
end
len, rem = 1, 0
end
# compute my part
f = 1 + ((my-1) * len)
l = f + len - 1
# distribute remaining evenly
if rem > 0
if my <= rem
f = f + (my-1)
l = l + my
else
f = f + rem
l = l + rem
end
end
return f, l
end


"""
Divide the given ra, dec range into sky areas of `wira`x`widec` and
Expand Down Expand Up @@ -236,7 +264,6 @@ function divide_and_infer(ra_range::Tuple{Float64, Float64},
tic()
while rundt[]
rundtree(rundt)
cpu_pause()
end
finalize(dt)
timing.wait_done = toq()
Expand Down Expand Up @@ -299,7 +326,11 @@ function infer(fieldids::Vector{Tuple{Int, Int, Int}},
thread_fun=phalse,
timing=InferTiming())

Lumberjack.info("Running with $(nthreads()) threads")
nprocthreads = nthreads()
if reserve_thread[]
nprocthreads = nprocthreads-1
end
Lumberjack.info("Running with $(nprocthreads) threads")

# Read all primary objects in these fields.
tic()
Expand Down Expand Up @@ -353,42 +384,49 @@ function infer(fieldids::Vector{Tuple{Int, Int, Int}},
# iterate over sources
results = Dict{Int, Dict}()
results_lock = SpinLock()
tic()
@threads for ts in 1:length(target_sources)
function process_sources()
tid = threadid()

if reserve_thread[] && tid == 1
while reserve_thread[]
thread_fun(reserve_thread)
cpu_pause()
end
else
s = target_sources[ts]
entry = catalog[s]

# try
nputs(dt_nodeid, "processing source $s: objid = $(entry.objid)")
gc()

t0 = time()
# TODO: subset images to images_local too.
vs_opt = Infer.infer_source(images,
catalog[neighbor_map[ts]],
entry)
runtime = time() - t0

lock!(results_lock)
results[entry.thing_id] = Dict(
"objid"=>entry.objid,
"ra"=>entry.pos[1],
"dec"=>entry.pos[2],
"vs"=>vs_opt,
"runtime"=>runtime)
unlock!(results_lock)
# catch ex
# Lumberjack.err(ex)
# end
# divide loop iterations among threads
f, l = divparts(length(target_sources), nprocthreads, tid)
for ts = f:l
s = target_sources[ts]
entry = catalog[s]

# try
nputs(dt_nodeid, "processing source $s: objid = $(entry.objid)")
gc()

t0 = time()
# TODO: subset images to images_local too.
vs_opt = Infer.infer_source(images,
catalog[neighbor_map[ts]],
entry)
runtime = time() - t0

lock!(results_lock)
results[entry.thing_id] = Dict(
"objid"=>entry.objid,
"ra"=>entry.pos[1],
"dec"=>entry.pos[2],
"vs"=>vs_opt,
"runtime"=>runtime)
unlock!(results_lock)
# catch ex
# Lumberjack.err(ex)
# end
end
end
end

tic()
ccall(:jl_threading_run, Void, (Any,), Core.svec(process_sources))
timing.opt_srcs = toq()
timing.num_srcs = length(target_sources)

Expand Down

0 comments on commit 1c7033f

Please sign in to comment.