From 4e8b20990583b8b09eb760e6be5ab2cac62f34f8 Mon Sep 17 00:00:00 2001 From: Julian P Samaroo Date: Fri, 3 Feb 2023 09:00:10 -0600 Subject: [PATCH 1/2] thunk: Improve ThunkFailedException printing --- src/thunk.jl | 58 ++++++++++++++++++++++++++++++++++++++++++++------- test/thunk.jl | 6 +++--- 2 files changed, 54 insertions(+), 10 deletions(-) diff --git a/src/thunk.jl b/src/thunk.jl index 72fb2576f..6168ff330 100644 --- a/src/thunk.jl +++ b/src/thunk.jl @@ -193,15 +193,59 @@ ThunkFailedException(thunk, origin, ex::E) where E = ThunkFailedException{E}(convert(WeakThunk, thunk), convert(WeakThunk, origin), ex) function Base.showerror(io::IO, ex::ThunkFailedException) t = unwrap_weak(ex.thunk) - o = unwrap_weak(ex.origin) - t_str = t !== nothing ? "$t" : "?" - o_str = o !== nothing ? "$o" : "?" + + # Find root-cause thunk + last_tfex = ex + failed_tasks = Union{Thunk,Nothing}[] + while last_tfex.ex isa ThunkFailedException && unwrap_weak(last_tfex.ex.origin) !== nothing + push!(failed_tasks, unwrap_weak(last_tfex.thunk)) + last_tfex = last_tfex.ex + end + o = unwrap_weak(last_tfex.origin) + root_ex = last_tfex.ex + + function thunk_string(t) + if t === nothing + return "Thunk(?)" + end + Tinputs = Any[] + for input in t.inputs + input = unwrap_weak(input) + if istask(input) + push!(Tinputs, "Thunk(id=$(input.id))") + else + push!(Tinputs, input) + end + end + t_sig = if length(Tinputs) <= 4 + "$(t.f)($(join(Tinputs, ", "))))" + else + "$(t.f)($(length(Tinputs)) inputs...)" + end + return "Thunk(id=$(t.id), $t_sig" + end + t_str = thunk_string(t) + o_str = thunk_string(o) t_id = t !== nothing ? t.id : '?' o_id = o !== nothing ? o.id : '?' - println(io, "ThunkFailedException ($t failure", - (o !== nothing && t != o) ? " due to a failure in $o)" : ")", - ":") - Base.showerror(io, ex.ex) + println(io, "ThunkFailedException:") + println(io, " Root Exception Type: $(typeof(root_ex))") + println(io, " Root Exception:") + Base.showerror(io, root_ex); println(io) + if t !== o + println(io, " Root Thunk: $o_str") + if length(failed_tasks) <= 4 + for i in failed_tasks + i_str = thunk_string(i) + println(io, " Inner Thunk: $i_str") + end + else + println(io, " ...") + println(io, " $(length(failed_tasks)) Inner Thunks...") + println(io, " ...") + end + end + print(io, " This Thunk: $t_str") end """ diff --git a/test/thunk.jl b/test/thunk.jl index 1055819a2..3df8d36ef 100644 --- a/test/thunk.jl +++ b/test/thunk.jl @@ -95,9 +95,9 @@ end end ex = Dagger.Sch.unwrap_nested_exception(ex) ex_str = sprint(io->Base.showerror(io,ex)) - @test occursin(r"^ThunkFailedException \(Thunk.*failure\):", ex_str) + @test occursin(r"^ThunkFailedException:", ex_str) @test occursin("Test", ex_str) - @test !occursin("due to a failure in", ex_str) + @test !occursin("Root Thunk", ex_str) ex = try fetch(b) @@ -106,8 +106,8 @@ end end ex = Dagger.Sch.unwrap_nested_exception(ex) ex_str = sprint(io->Base.showerror(io,ex)) - @test occursin(r"Thunk.*failure due to a failure in", ex_str) @test occursin("Test", ex_str) + @test occursin("Root Thunk", ex_str) end @testset "single dependent" begin a = @spawn error("Test") From 2ebb51b54168beaa6841b47c474d5962ec32b3f8 Mon Sep 17 00:00:00 2001 From: Julian P Samaroo Date: Fri, 3 Feb 2023 09:00:32 -0600 Subject: [PATCH 2/2] Sch: Fix task error-related assertion --- src/sch/Sch.jl | 17 ++++++++++++++++- src/sch/util.jl | 5 ++++- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/src/sch/Sch.jl b/src/sch/Sch.jl index d9e7e242a..1fb0652d7 100644 --- a/src/sch/Sch.jl +++ b/src/sch/Sch.jl @@ -610,7 +610,22 @@ function schedule!(ctx, state, procs=procs_to_use(ctx)) end task = pop!(state.ready) timespan_start(ctx, :schedule, task.id, (;thunk_id=task.id)) - @assert !haskey(state.cache, task) + if haskey(state.cache, task) + if haskey(state.errored, task) + # An error was eagerly propagated to this task + finish_failed!(state, task) + else + # This shouldn't have happened + iob = IOBuffer() + println(iob, "Scheduling inconsistency: Task being scheduled is already cached!") + println(iob, " Task: $(task.id)") + println(iob, " Cache Entry: $(typeof(state.cache[task]))") + ex = SchedulingException(String(take!(iob))) + state.cache[task] = ex + state.errored[task] = true + end + @goto pop_task + end opts = merge(ctx.options, task.options) sig = signature(task, state) diff --git a/src/sch/util.jl b/src/sch/util.jl index e1704b11b..864ac65e2 100644 --- a/src/sch/util.jl +++ b/src/sch/util.jl @@ -145,6 +145,9 @@ function set_failed!(state, origin, thunk=origin) filter!(x->x!==thunk, state.ready) state.cache[thunk] = ThunkFailedException(thunk, origin, state.cache[origin]) state.errored[thunk] = true + finish_failed!(state, thunk, origin) +end +function finish_failed!(state, thunk, origin=nothing) fill_registered_futures!(state, thunk, true) if haskey(state.waiting_data, thunk) for dep in state.waiting_data[thunk] @@ -152,7 +155,7 @@ function set_failed!(state, origin, thunk=origin) delete!(state.waiting, dep) haskey(state.errored, dep) && continue - set_failed!(state, origin, dep) + origin !== nothing && set_failed!(state, origin, dep) end delete!(state.waiting_data, thunk) end