Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Thread safety improvements #4

Closed

Conversation

jpsamaroo
Copy link
Member

Rebase of JuliaLang/julia#42239 plus fixes a thread safety issue in init_multi.

Intended to fix/improve #73

@vchuravy vchuravy closed this Oct 14, 2023
@vchuravy vchuravy reopened this Oct 14, 2023
@jpsamaroo jpsamaroo force-pushed the jps/threadsafe_workerstate branch from 55ba912 to 0704fe5 Compare October 14, 2023 21:49
@jpsamaroo
Copy link
Member Author

Depends on JuliaLang/julia#5 for CI

@codecov
Copy link

codecov bot commented Oct 14, 2023

Codecov Report

Merging #4 (88680df) into master (8799660) will increase coverage by 0.15%.
The diff coverage is 97.05%.

@@            Coverage Diff             @@
##           master       JuliaLang/julia#4      +/-   ##
==========================================
+ Coverage   79.19%   79.34%   +0.15%     
==========================================
  Files          10       10              
  Lines        1951     1966      +15     
==========================================
+ Hits         1545     1560      +15     
  Misses        406      406              
Files Coverage Δ
src/cluster.jl 74.15% <100.00%> (+0.67%) ⬆️
src/managers.jl 46.53% <0.00%> (ø)

@JamesWrigley
Copy link
Collaborator

I've tried a rebased version of this branch on latest master, can confirm the example test from #73 passes 👍

➜  Distributed.jl git:(jps/threadsafe_workerstate) ~/git/julia/julia                              
               _
   _       _ _(_)_     |  Documentation: https://docs.julialang.org
  (_)     | (_) (_)    |
   _ _   _| |_  __ _   |  Type "?" for help, "]?" for Pkg help.
  | | | | | | |/ _` |  |
  | | |_| | | | (_| |  |  Version 1.12.0-DEV.321 (2024-04-09)
 _/ |\__'_|_|_|\__'_|  |  Commit 7099bddd5f* (0 days old master)
|__/                   |

(@v1.12) pkg> st
Status `~/.julia/environments/v1.12/Project.toml`
  [295af30f] Revise v3.5.14

julia> include("distributed-thread-safety.jl")
Test Summary:               | Pass  Total     Time
RemoteChannel is threadsafe |   64     64  1m53.2s
Test.DefaultTestSet("RemoteChannel is threadsafe", Any[Test.DefaultTestSet("from worker 1 to 1 via 1", Any[Test.DefaultTestSet("from thread 1.1 to 1.1", Any[], 4, false, false, true, 1.712685455788765e9, 1.712685465251433e9, false, "/home/james/git/Distributed.jl/distributed-thread-safety.jl"), Test.DefaultTestSet("from thread 1.2 to 1.1", Any[], 4, false, false, true, 1.71268546525156e9, 1.712685470893932e9, false, "/home/james/git/Distributed.jl/distributed-thread-safety.jl"), Test.DefaultTestSet("from thread 1.1 to 1.2", Any[], 4, false, false, true, 1.712685470894084e9, 1.712685476819892e9, false, "/home/james/git/Distributed.jl/distributed-thread-safety.jl"), Test.DefaultTestSet("from thread 1.2 to 1.2"
, Any[], 4, false, false, true, 1.712685476820029e9, 1.71268548236758e9, false, "/home/james/git/Distributed.jl/distributed-thread-safety.jl")], 0, false, false, true, 1.712685455788721e9, 1.712685482371097e9, false, "/home/james/git/Distributed.jl/distributed-thread-safety.jl"), Test.DefaultTestSet("from worker 2 to 1 via 1", Any[Test.DefaultTestSet("from thread 2.1 to 1.1", Any[], 4, false, false, true, 1.712685482371207e9, 1.712685489590523e9, false, "/home/james/git/Distributed.jl/distributed-thread-safety.jl"), Test.DefaultTestSet("from thread 2.2 to 1.1", Any[], 4, false, false, true, 1.712685489590582e9, 1.712685496875588e9, false, "/home/james/git/Distributed.jl/distributed-thread-safety.jl"), Test.DefaultTestSet("from thread 2.1 to 1.2", Any[], 4, false, false, true, 1.712685496875651e9, 1.712685504261359e9, false, "/home/james/git/Distributed.jl/distributed-thread-safety.jl"), Test.DefaultTestSet("from thread 2.2 to 1.2", Any[], 4, 
false, false, true, 1.712685504261427e9, 1.712685511562309e9, false, "/home/james/git/Distributed.jl/distributed-thread-safety.jl")], 0, false, false, true, 1.712685482371161e9, 1.712685511562352e9, false, "/home/james/git/Distributed.jl/distributed-thread-safety.jl"), Test.DefaultTestSet("from worker 1 to 2 via 1", Any[Test.DefaultTestSet("from thread 1.1 to 2.1", Any[], 4, false, false, true, 1.712685511562474e9, 1.712685518686331e9, false, "/home/james/git/Distributed.jl/distributed-thread-safety.jl"), Test.DefaultTestSet("from thread 1.2 to 2.1", Any[], 4, false, false, true, 1.712685518686465e9, 1.712685525761654e9, false, "/home/james/git/Distributed.jl/distributed-thread-safety.jl"), Test.DefaultTestSet("from thread 1.1 to 2.2", Any[], 4, false, false, true, 1.712685525761734e9, 1.712685532856677e9, false, "/home/james/git/Distributed.jl/distributed-thread-safety.jl"), Test.DefaultTestSet("from thread 1.2 to 2.2", Any[], 4, false, false, true, 1.712685532856761e9, 1.712685539948613e9, false, "/home/james/git/Distributed.jl/distributed-thread-safety.jl")], 0, false, false, true, 1.712685511562421e9, 1.712685539948652e9, false, "/home/james/git/Distributed.jl/distributed-thread-safety.jl"), Test.DefaultTestSet("from worker 2 to 2 via 1", Any[Test.DefaultTestSet("from thread 2.1 to 2.1", Any[], 4, false, false, true, 1.712685539948781e9, 1.712685547250845e9, false, "/home/james/git/Distributed.jl/distributed-thread-safety.jl"), Test.DefaultTestSet("from thread 2.2 to 2.1", Any[], 4, false, false, true, 1.712685547250929e9, 1.712685554467226e9, false, "/home/james/git/Distributed.jl/distributed-thread-safety.jl"), Test.DefaultTestSet("from th
read 2.1 to 2.2", Any[], 4, false, false, true, 1.712685554467299e9, 1.712685561768197e9, false, "/home/james/git/Distributed.jl/distributed-thread-safety.jl"), Test.DefaultTestSet("from thread 2.2 to 2.2", Any[], 4, false, false, true, 1.71268556176827e9, 1.712685568972127e9, false, "/home/james/git/Distributed.jl/distributed-thread-safety.jl")], 0, false, false, true, 1.712685539948732e9, 1.712685568972152e9, false, "/home/james/git/Distributed.jl/distributed-thread-safety.jl")], 0, false, false, true, 1.71268545575782e9, 1.712685568972163e9, false, "/home/james/git/Distributed.jl/distributed-thread-safety.jl")

I'll also try running the Julia tests with that version.

@JamesWrigley
Copy link
Collaborator

I had some issues with my machine running out of memory so I skipped the SparseArrays and LinearAlgebra tests with tests/runtests.jl -LinearAlgebra -SparseArrays. I saw failures with Artifacts:

Error in testset Artifacts:
Test Failed at /home/james/git/julia/usr/share/julia/stdlib/v1.12/Artifacts/test/runtests.jl:57
  Expression: load_overrides() == empty_output
   Evaluated: Dict{Symbol, Any}(:UUID => Dict{Base.UUID, Dict{String, Union{Base.SHA1, String}}}(Base.UUID("6b5019fb-a83d-5b4e-a9f7-678a36c28df7") => Dict("jlqml" => "/home/james/git/jlqml/build")), :hash => Dict{Base.SHA1, Union{Base.SHA1
, String}}()) == Dict{Symbol, Any}(:UUID => Dict{Base.UUID, Dict{String, Union{Base.SHA1, String}}}(), :hash => Dict{Base.SHA1, Union{Base.SHA1, String}}())

Error in testset Artifacts:
Test Failed at /home/james/git/julia/usr/share/julia/stdlib/v1.12/Artifacts/test/runtests.jl:73
  Expression: load_overrides() == empty_output
   Evaluated: Dict{Symbol, Any}(:UUID => Dict{Base.UUID, Dict{String, Union{Base.SHA1, String}}}(Base.UUID("6b5019fb-a83d-5b4e-a9f7-678a36c28df7") => Dict("jlqml" => "/home/james/git/jlqml/build")), :hash => Dict{Base.SHA1, Union{Base.SHA1
, String}}()) == Dict{Symbol, Any}(:UUID => Dict{Base.UUID, Dict{String, Union{Base.SHA1, String}}}(), :hash => Dict{Base.SHA1, Union{Base.SHA1, String}}())

Error in testset Artifacts:
Test Failed at /home/james/git/julia/usr/share/julia/stdlib/v1.12/Artifacts/test/runtests.jl:87
  Expression: load_overrides(force = true) == empty_output
   Evaluated: Dict{Symbol, Any}(:UUID => Dict{Base.UUID, Dict{String, Union{Base.SHA1, String}}}(Base.UUID("6b5019fb-a83d-5b4e-a9f7-678a36c28df7") => Dict("jlqml" => "/home/james/git/jlqml/build")), :hash => Dict{Base.SHA1, Union{Base.SHA1
, String}}()) == Dict{Symbol, Any}(:UUID => Dict{Base.UUID, Dict{String, Union{Base.SHA1, String}}}(), :hash => Dict{Base.SHA1, Union{Base.SHA1, String}}())

Sockets (but I think this is something to do with my machine):

Error in testset Sockets:
Error During Test at /home/james/git/julia/usr/share/julia/stdlib/v1.12/Sockets/test/runtests.jl:228
  Test threw exception
  Expression: getnameinfo(ip"::ffff:0.1.1.1") == "::ffff:0.1.1.1"
  DNSError: ip"::ffff:1:101", temporary failure (EAI_AGAIN) 
  Stacktrace:
   [1] getnameinfo(address::Sockets.IPv6)
     @ Sockets ~/git/julia/usr/share/julia/stdlib/v1.12/Sockets/src/addrinfo.jl:246
   [2] top-level scope
     @ ~/git/julia/usr/share/julia/stdlib/v1.12/Sockets/test/runtests.jl:199
   [3] macro expansion
     @ ~/git/julia/usr/share/julia/stdlib/v1.12/Test/src/Test.jl:164 [inlined]
   [4] macro expansion
     @ ~/git/julia/usr/share/julia/stdlib/v1.12/Sockets/test/runtests.jl:228 [inlined]
   [5] macro expansion
     @ ~/git/julia/usr/share/julia/stdlib/v1.12/Test/src/Test.jl:164 [inlined]

And some errors from FileWatching:

      From worker 10:   error in running finalizer: ErrorException("`ccall` requires the compiler")                                                                                                                                            
      From worker 10:   iolock_begin at libuv.jl:48                                                                                                                                                                                            
      From worker 10:   uvfinalize at /home/james/git/julia/usr/share/julia/stdlib/v1.12/FileWatching/src/FileWatching.jl:331                                                                                                                  

But as far as I can tell these are unrelated to Distributed. If that's the case, would it be possible to get this merged and make a new release?

@@ -491,7 +509,10 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
while true
if isempty(launched)
istaskdone(t_launch) && break
@async (sleep(1); notify(launch_ntfy))
@async begin
sleep(1)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this why addprocs() always takes at least 1s for me? 🤔

@JamesWrigley
Copy link
Collaborator

Bump 🙈 Is there any chance of getting this merged in time for 1.11? CC @vchuravy

@JamesWrigley
Copy link
Collaborator

Superseded by #101.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants