From eeefcf2f7c3b6d073b33a4dc39e203d10fc6c8ac Mon Sep 17 00:00:00 2001 From: Kiran Pamnany Date: Fri, 22 Jul 2016 13:04:44 -0700 Subject: [PATCH] Reserve a thread to drive Dtree correctly --- src/api.jl | 96 +++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 67 insertions(+), 29 deletions(-) diff --git a/src/api.jl b/src/api.jl index b50ea1f4..4438c305 100644 --- a/src/api.jl +++ b/src/api.jl @@ -142,6 +142,34 @@ function time_puts(elapsedtime, bytes, gctime, allocs) nputs(dt_nodeid, s) end +""" +Divide `N` into `np` parts as evenly as possible, returning `my` part as +a (first, last) tuple. +""" +function divparts(N, np, my) + len, rem = divrem(N, np) + if len == 0 + if my > rem + return 1, 0 + end + len, rem = 1, 0 + end + # compute my part + f = 1 + ((my-1) * len) + l = f + len - 1 + # distribute remaining evenly + if rem > 0 + if my <= rem + f = f + (my-1) + l = l + my + else + f = f + rem + l = l + rem + end + end + return f, l +end + """ Divide the given ra, dec range into sky areas of `wira`x`widec` and @@ -236,7 +264,6 @@ function divide_and_infer(ra_range::Tuple{Float64, Float64}, tic() while rundt[] rundtree(rundt) - cpu_pause() end finalize(dt) timing.wait_done = toq() @@ -299,7 +326,11 @@ function infer(fieldids::Vector{Tuple{Int, Int, Int}}, thread_fun=phalse, timing=InferTiming()) - Lumberjack.info("Running with $(nthreads()) threads") + nprocthreads = nthreads() + if reserve_thread[] + nprocthreads = nprocthreads-1 + end + Lumberjack.info("Running with $(nprocthreads) threads") # Read all primary objects in these fields. tic() @@ -353,42 +384,49 @@ function infer(fieldids::Vector{Tuple{Int, Int, Int}}, # iterate over sources results = Dict{Int, Dict}() results_lock = SpinLock() - tic() - @threads for ts in 1:length(target_sources) + function process_sources() tid = threadid() + if reserve_thread[] && tid == 1 while reserve_thread[] thread_fun(reserve_thread) cpu_pause() end else - s = target_sources[ts] - entry = catalog[s] - -# try - nputs(dt_nodeid, "processing source $s: objid = $(entry.objid)") - gc() - - t0 = time() - # TODO: subset images to images_local too. - vs_opt = Infer.infer_source(images, - catalog[neighbor_map[ts]], - entry) - runtime = time() - t0 - - lock!(results_lock) - results[entry.thing_id] = Dict( - "objid"=>entry.objid, - "ra"=>entry.pos[1], - "dec"=>entry.pos[2], - "vs"=>vs_opt, - "runtime"=>runtime) - unlock!(results_lock) -# catch ex -# Lumberjack.err(ex) -# end + # divide loop iterations among threads + f, l = divparts(length(target_sources), nprocthreads, tid) + for ts = f:l + s = target_sources[ts] + entry = catalog[s] + +# try + nputs(dt_nodeid, "processing source $s: objid = $(entry.objid)") + gc() + + t0 = time() + # TODO: subset images to images_local too. + vs_opt = Infer.infer_source(images, + catalog[neighbor_map[ts]], + entry) + runtime = time() - t0 + + lock!(results_lock) + results[entry.thing_id] = Dict( + "objid"=>entry.objid, + "ra"=>entry.pos[1], + "dec"=>entry.pos[2], + "vs"=>vs_opt, + "runtime"=>runtime) + unlock!(results_lock) +# catch ex +# Lumberjack.err(ex) +# end + end end end + + tic() + ccall(:jl_threading_run, Void, (Any,), Core.svec(process_sources)) timing.opt_srcs = toq() timing.num_srcs = length(target_sources)