-
-
Notifications
You must be signed in to change notification settings - Fork 5.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
threading branch #13410
threading branch #13410
Changes from all commits
5cdc03b
6cfe327
8c98205
65c9d89
0afad35
6c7e175
1db03d2
4ee059e
72de07b
766c608
8673d74
6311538
7aadf02
fbfd8e9
4a2d2cc
0f17e69
3f4afe0
4573cdb
a14a8b5
9604d75
2f0d759
ac027dc
236c5e4
1a0ddf3
0152ea1
a715b4e
c0983e0
4df1fe2
3fd7aca
2fc8d97
67a93b9
b29089c
95f22cf
8b69400
cc71a4a
707e81f
87f9a9c
cf70bc9
3a86a98
c129d1e
0f49b2f
6eab323
f3402da
66bf7be
a113dd3
0921b94
f68130c
5203429
74f906f
49d582b
2cc1371
e6a8b1e
db59e7a
a56a3bc
89636aa
f5da3f8
407b00a
8d840bd
b3b22e8
f54fbb2
4282ba2
79da037
7723bd2
d164587
173cd22
eaf6115
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
# This file is a part of Julia. License is MIT: http://julialang.org/license | ||
|
||
using Base.Intrinsics: llvmcall | ||
|
||
import Base: setindex!, getindex | ||
|
||
export | ||
Atomic, | ||
atomic_cas!, | ||
atomic_xchg!, | ||
atomic_add!, atomic_sub!, | ||
atomic_and!, atomic_nand!, atomic_or!, atomic_xor!, | ||
atomic_max!, atomic_min!, atomic_umax!, atomic_umin! | ||
|
||
type Atomic{T<:Integer} | ||
value::T | ||
Atomic() = new(zero(T)) | ||
Atomic(value) = new(value) | ||
end | ||
|
||
Atomic() = Atomic{Int}() | ||
|
||
atomicintsmap = Dict(Int8 => "i8", UInt8 => "i8", | ||
Int16 => "i16", UInt16 => "i16", | ||
Int32 => "i32", UInt32 => "i32", | ||
Int64 => "i64", UInt64 => "i64", | ||
Int128 => "i128", UInt128 => "i128") | ||
|
||
unsafe_convert{T}(::Type{Ptr{T}}, x::Atomic{T}) = convert(Ptr{T}, pointer_from_objref(x)) | ||
setindex!{T}(x::Atomic{T}, v) = setindex!(x, convert(T, v)) | ||
|
||
for (typ, lt) in atomicintsmap | ||
rt = VersionNumber(Base.libllvm_version) >= v"3.6" ? "$lt, $lt*" : "$lt*" | ||
@eval getindex(x::Atomic{$typ}) = | ||
llvmcall($""" | ||
%rv = load atomic volatile $rt %0 monotonic, align $WORD_SIZE | ||
ret $lt %rv | ||
""", $typ, Tuple{Ptr{$typ}}, unsafe_convert(Ptr{$typ}, x)) | ||
@eval setindex!(x::Atomic{$typ}, v::$typ) = | ||
llvmcall($""" | ||
store atomic volatile $lt %1, $lt* %0 monotonic, align $WORD_SIZE | ||
ret void | ||
""", Void, Tuple{Ptr{$typ},$typ}, unsafe_convert(Ptr{$typ}, x), v) | ||
@eval atomic_cas!(x::Atomic{$typ}, cmp::$typ, new::$typ) = | ||
llvmcall($""" | ||
%rv = cmpxchg $lt* %0, $lt %1, $lt %2 acq_rel monotonic | ||
%bv = extractvalue { $lt, i1 } %rv, 1 | ||
ret i1 %bv | ||
""", Bool, Tuple{Ptr{$typ},$typ,$typ}, unsafe_convert(Ptr{$typ}, x), cmp, new) | ||
for rmwop in [:xchg, :add, :sub, :and, :nand, :or, :xor, :max, :min, :umax, :umin] | ||
rmw = string(rmwop) | ||
fn = symbol("atomic_", rmw, "!") | ||
@eval $fn(x::Atomic{$typ}, v::$typ) = | ||
llvmcall($""" | ||
%rv = atomicrmw volatile $rmw $lt* %0, $lt %1 acquire | ||
ret $lt %rv | ||
""", $typ, Tuple{Ptr{$typ}, $typ}, unsafe_convert(Ptr{$typ}, x), v) | ||
end | ||
end | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,138 @@ | ||
# This file is a part of Julia. License is MIT: http://julialang.org/license | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is already a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. I'm working on the interaction between threads and tasks, which is also needed for I/O. When that is properly designed, these will get merged. |
||
|
||
export SpinLock, Mutex, init_lock!, destroy_lock!, lock!, trylock!, unlock! | ||
|
||
abstract AbstractLock | ||
|
||
# Test-and-test-and-set spin locks are quickest up to about 30ish | ||
# contending threads. If you have more contention than that, perhaps | ||
# a lock is the wrong way to synchronize. | ||
type TatasLock <: AbstractLock | ||
handle::Atomic{Int} | ||
TatasLock() = new(Atomic{Int}(0)) | ||
end | ||
|
||
typealias SpinLock TatasLock | ||
|
||
function lock!(l::TatasLock) | ||
while true | ||
if l.handle[] == 0 | ||
p = atomic_xchg!(l.handle, 1) | ||
if p == 0 | ||
return 0 | ||
end | ||
end | ||
# TODO: pause | ||
end | ||
end | ||
|
||
function trylock!(l::TatasLock) | ||
if l.handle[] == 0 | ||
return atomic_xchg!(l.handle, 1) | ||
end | ||
return 1 | ||
end | ||
|
||
function unlock!(l::TatasLock) | ||
l.handle[] = 0 | ||
return 0 | ||
end | ||
|
||
|
||
# Recursive test-and-test-and-set lock. Slower. | ||
type RecursiveTatasLock <: AbstractLock | ||
ownertid::Atomic{Int16} | ||
handle::Atomic{Int} | ||
RecursiveTatasLock() = new(0, Atomic{Int}(0)) | ||
end | ||
|
||
typealias RecursiveSpinLock RecursiveTatasLock | ||
|
||
function lock!(l::RecursiveTatasLock) | ||
if l.ownertid[] == threadid() | ||
return 0 | ||
end | ||
while true | ||
if l.handle[] == 0 | ||
p = atomic_xchg!(l.handle, 1) | ||
if p == 0 | ||
l.ownertid[] = threadid() | ||
return 0 | ||
end | ||
end | ||
# TODO: pause | ||
end | ||
end | ||
|
||
function trylock!(l::RecursiveTatasLock) | ||
if l.ownertid[] == threadid() | ||
return 0 | ||
end | ||
if l.handle[] == 0 | ||
p = atomic_xchg!(l.handle, 1) | ||
if p == 0 | ||
l.ownertid[] = threadid() | ||
end | ||
return p | ||
end | ||
return 1 | ||
end | ||
|
||
function unlock!(l::RecursiveTatasLock) | ||
if l.ownertid[] != threadid() | ||
return 1 | ||
end | ||
l.ownertid[] = 0 | ||
l.handle[] = 0 | ||
return 0 | ||
end | ||
|
||
|
||
# These are mutexes from libuv, which abstract pthread mutexes and | ||
# Windows critical sections. We're doing some error checking (and | ||
# paying for it in overhead), but regardless, in some situations, | ||
# passing a bad parameter will cause an abort. | ||
|
||
# TODO: how defensive to get, and how to turn it off? | ||
# TODO: how to catch an abort? | ||
|
||
const UV_MUTEX_SIZE = ccall(:jl_sizeof_uv_mutex, Cint, ()) | ||
|
||
type Mutex <: AbstractLock | ||
ownertid::Int16 | ||
handle::Array{Int8} | ||
Mutex() = (m = new(zero(Int16), zeros(Int8, UV_MUTEX_SIZE)); | ||
ccall(:uv_mutex_init, Void, (Ptr{Void},), m.handle); | ||
finalizer(m, (x -> ccall(:uv_mutex_destroy, Void, (Ptr{Void},), x.handle))); | ||
m) | ||
end | ||
|
||
function lock!(m::Mutex) | ||
if m.ownertid == threadid() | ||
return 0 | ||
end | ||
ccall(:uv_mutex_lock, Void, (Ptr{Void},), m.handle) | ||
m.ownertid = threadid() | ||
return 0 | ||
end | ||
|
||
function trylock!(m::Mutex) | ||
if m.ownertid == threadid() | ||
return 0 | ||
end | ||
r = ccall(:uv_mutex_trylock, Cint, (Ptr{Void},), m.handle) | ||
if r == 0 | ||
m.ownertid = threadid() | ||
end | ||
return r | ||
end | ||
|
||
function unlock!(m::Mutex) | ||
if m.ownertid != threadid() | ||
return Base.UV_EPERM | ||
end | ||
m.ownertid = 0 | ||
ccall(:uv_mutex_unlock, Void, (Ptr{Void},), m.handle) | ||
return 0 | ||
end | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
# This file is a part of Julia. License is MIT: http://julialang.org/license | ||
|
||
export threadid, maxthreads, nthreads, @threads | ||
|
||
threadid() = Int(ccall(:jl_threadid, Int16, ())+1) | ||
maxthreads() = Int(unsafe_load(cglobal(:jl_max_threads, Cint))) | ||
nthreads() = Int(unsafe_load(cglobal(:jl_n_threads, Cint))) | ||
|
||
function _threadsfor(forexpr) | ||
fun = gensym("_threadsfor") | ||
lidx = forexpr.args[1].args[1] # index | ||
lf = forexpr.args[1].args[2].args[1] # first | ||
ll = forexpr.args[1].args[2].args[2] # last | ||
lbody = forexpr.args[2] # body | ||
quote | ||
function $fun() | ||
tid = threadid() | ||
# divide loop iterations among threads | ||
len, rem = divrem($(esc(ll))-$(esc(lf))+1, nthreads()) | ||
# not enough iterations for all the threads? | ||
if len == 0 | ||
if tid > rem | ||
return | ||
end | ||
len, rem = 1, 0 | ||
end | ||
# compute this thread's range | ||
f = $(esc(lf)) + ((tid-1) * len) | ||
l = f + len - 1 | ||
# distribute remaining iterations evenly | ||
if rem > 0 | ||
if tid <= rem | ||
f = f + (tid-1) | ||
l = l + tid | ||
else | ||
f = f + rem | ||
l = l + rem | ||
end | ||
end | ||
# run this thread's iterations | ||
for $(esc(lidx)) = f:l | ||
$(esc(lbody)) | ||
end | ||
end | ||
ccall(:jl_threading_run, Void, (Any, Any), $fun, ()) | ||
end | ||
end | ||
|
||
function _threadsblock(blk) | ||
fun = gensym("_threadsblock") | ||
esc(quote | ||
function $fun() | ||
$blk | ||
end | ||
ccall(:jl_threading_run, Void, (Any, Any), $fun, ()) | ||
end) | ||
end | ||
|
||
function _threadscall(callexpr) | ||
fun = callexpr.args[1] | ||
esc(quote | ||
ccall(:jl_threading_run, Void, (Any, Any), $fun, $(Expr(:call, Core.svec, callexpr.args[2:end]...))) | ||
end) | ||
end | ||
|
||
macro threads(args...) | ||
na = length(args) | ||
if na != 2 | ||
throw(ArgumentError("wrong number of arguments in @threads")) | ||
end | ||
tg = args[1] | ||
if !is(tg, :all) | ||
throw(ArgumentError("only 'all' supported as thread group for @threads")) | ||
end | ||
ex = args[2] | ||
if !isa(ex, Expr) | ||
throw(ArgumentError("need an expression argument to @threads")) | ||
end | ||
if is(ex.head, :for) | ||
return _threadsfor(ex) | ||
elseif is(ex.head, :block) | ||
return _threadsblock(ex) | ||
elseif is(ex.head, :call) | ||
return _threadscall(ex) | ||
else | ||
throw(ArgumentError("unrecognized argument to @threads")) | ||
end | ||
end | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tests for
@nany
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd add a couple, but I can't find tests for any of the existing macros in
Cartesian
.