-
-
Notifications
You must be signed in to change notification settings - Fork 5.5k
/
distributed_exec.jl
1858 lines (1596 loc) · 55 KB
/
distributed_exec.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# This file is a part of Julia. License is MIT: https://julialang.org/license
using Test, Distributed, Random, Serialization, Sockets
import Distributed: launch, manage
@test cluster_cookie() isa String
include(joinpath(Sys.BINDIR, "..", "share", "julia", "test", "testenv.jl"))
@test Distributed.extract_imports(:(begin; import Foo, Bar; let; using Baz; end; end)) ==
Any[:(import Foo, Bar), :(using Baz)]
# Test a few "remote" invocations when no workers are present
@test remote(myid)() == 1
@test pmap(identity, 1:100) == [1:100...]
@test 100 == @distributed (+) for i in 1:100
1
end
addprocs_with_testenv(4)
@test nprocs() == 5
# distributed loading of packages
# setup
@everywhere begin
old_act_proj = Base.ACTIVE_PROJECT[]
pushfirst!(Base.LOAD_PATH, "@")
Base.ACTIVE_PROJECT[] = joinpath(Sys.BINDIR, "..", "share", "julia", "test", "TestPkg")
end
# cause precompilation of TestPkg to avoid race condition
Base.compilecache(Base.identify_package("TestPkg"))
@everywhere using TestPkg
@everywhere using TestPkg
@everywhere begin
Base.ACTIVE_PROJECT[] = old_act_proj
popfirst!(Base.LOAD_PATH)
end
@everywhere using Test, Random, LinearAlgebra
id_me = myid()
id_other = filter(x -> x != id_me, procs())[rand(1:(nprocs()-1))]
# Test role
@everywhere using Distributed
@test Distributed.myrole() === :master
for wid = workers()
wrole = remotecall_fetch(wid) do
Distributed.myrole()
end
@test wrole === :worker
end
# Test remote(): remote(f) returns a closure that executes f via the default
# worker pool; while every worker is blocked inside such a call the pool must
# report !isready, and become ready again as calls complete.
let
    pool = default_worker_pool()
    count = 0                      # tasks currently blocked inside remote(take!)
    count_condition = Condition()  # signaled each time a blocked task finishes
    # Start a task that blocks in remote(take!) on channel `c` until a value
    # is put! into it. NOTE(review): @async_logerr comes from testenv.jl,
    # included near the top of this file.
    function remote_wait(c)
        @async_logerr begin
            count += 1
            remote(take!)(c)
            count -= 1
            notify(count_condition)
        end
        yield()
    end
    testchannels = [RemoteChannel() for i in 1:nworkers()]
    testcount = 0
    @test isready(pool) == true
    # Occupy every worker: one blocked remote call per worker drains the pool.
    for c in testchannels
        @test count == testcount
        remote_wait(c)
        testcount += 1
    end
    @test count == testcount
    @test isready(pool) == false
    # Release the workers one at a time; the pool must be ready after each.
    for c in testchannels
        @test count == testcount
        put!(c, "foo")
        testcount -= 1
        (count == testcount) || wait(count_condition)
        @test count == testcount
        @test isready(pool) == true
    end
    @test count == 0
    # Repeat the cycle, releasing in reverse order to vary scheduling.
    for c in testchannels
        @test count == testcount
        remote_wait(c)
        testcount += 1
    end
    @test count == testcount
    @test isready(pool) == false
    for c in reverse(testchannels)
        @test count == testcount
        put!(c, "foo")
        testcount -= 1
        (count == testcount) || wait(count_condition)
        @test count == testcount
        @test isready(pool) == true
    end
    @test count == 0
end
# Test Futures
# Exercise the Future lifecycle for a future owned by worker `id`:
# starts unset, becomes ready after put!, and is write-once thereafter.
function testf(id)
    fut = Future(id)
    # A freshly created Future holds no value and no local cache.
    @test !isready(fut)
    @test fut.v === nothing
    put!(fut, :OK)
    # After put! the future is ready and the value is cached locally.
    @test isready(fut)
    @test fut.v !== nothing
    # Futures are write-once: a second put! errors, and take! has no method.
    @test_throws ErrorException put!(fut, :OK)
    @test_throws MethodError take!(fut)
    @test fetch(fut) == :OK
end
testf(id_me)
testf(id_other)
"""
    poll_while(f::Function; timeout_seconds::Real = 120)

Poll `f` once per second while it returns `true`. Return `true` when `f`
became `false` within the timeout, or log an error and return `false` if
the timeout elapsed first.

The keyword previously required an `Integer`; it now accepts any `Real`
(e.g. `0.5`), which is backward compatible.
"""
function poll_while(f::Function; timeout_seconds::Real = 120)
    start_time = time_ns()
    while f()
        sleep(1)
        # time_ns() is monotonic, so the elapsed-time check is immune to
        # wall-clock adjustments during the test run.
        if ((time_ns() - start_time) / 1e9) > timeout_seconds
            @error "Timed out" timeout_seconds
            return false
        end
    end
    return true
end
# Read the JULIA_TEST_INCLUDE_THREAD_UNSAFE environment variable and parse
# it as a Bool, defaulting to false when unset. Surrounding whitespace in
# the value is ignored.
function _getenv_include_thread_unsafe()
    raw = get(ENV, "JULIA_TEST_INCLUDE_THREAD_UNSAFE", "false")
    return parse(Bool, strip(raw))::Bool
end
const _env_include_thread_unsafe = _getenv_include_thread_unsafe()
# Decide whether a thread-unsafe test may run in this session.
# Single-threaded sessions always qualify; multi-threaded sessions qualify
# only when JULIA_TEST_INCLUDE_THREAD_UNSAFE opted in, otherwise the test is
# recorded as broken and skipped.
function include_thread_unsafe()
    if Threads.nthreads() == 1 || _env_include_thread_unsafe
        return true
    end
    @warn "Skipping a thread-unsafe test because `Threads.nthreads() > 1`" Threads.nthreads()
    Test.@test_broken false
    return false
end
# Distributed GC tests for Futures: the entry in the owner's backing store
# (Distributed.PGRP.refs) must be released once the Future has been fetched,
# or finalized while unfetched.
function test_futures_dgc(id)
    f = remotecall(myid, id)
    fid = remoteref_id(f)
    # remote value should be deleted after a fetch
    @test remotecall_fetch(k->(yield();haskey(Distributed.PGRP.refs, k)), id, fid) == true
    @test f.v === nothing  # not fetched yet, so no local cache
    @test fetch(f) == id
    @test f.v !== nothing  # fetch caches the value locally
    yield(); # flush gc msgs
    # deletion on the owner is asynchronous; poll until the entry disappears
    @test poll_while(() -> remotecall_fetch(k->(yield();haskey(Distributed.PGRP.refs, k)), id, fid))
    # if unfetched, it should be deleted after a finalize
    f = remotecall(myid, id)
    fid = remoteref_id(f)
    @test remotecall_fetch(k->(yield();haskey(Distributed.PGRP.refs, k)), id, fid) == true
    @test f.v === nothing
    finalize(f)
    yield(); # flush gc msgs
    @test poll_while(() -> remotecall_fetch(k->(yield();haskey(Distributed.PGRP.refs, k)), id, fid))
end
test_futures_dgc(id_me)
test_futures_dgc(id_other)
# if sent to another worker, it should not be deleted till all references are fetched.
wid1 = workers()[1]
wid2 = workers()[2]
f = remotecall(myid, wid1)
fid = remoteref_id(f)
fstore = RemoteChannel(wid2)
put!(fstore, f)
@test fetch(f) == wid1
@test remotecall_fetch(k->haskey(Distributed.PGRP.refs, k), wid1, fid) == true
remotecall_fetch(r->(fetch(fetch(r)); yield()), wid2, fstore)
sleep(0.5) # to ensure that wid2 gc messages have been executed on wid1
@test remotecall_fetch(k->haskey(Distributed.PGRP.refs, k), wid1, fid) == false
# put! should release remote reference since it would have been cached locally
f = Future(wid1)
fid = remoteref_id(f)
# should not be created remotely till accessed
@test remotecall_fetch(k->haskey(Distributed.PGRP.refs, k), wid1, fid) == false
# create it remotely
isready(f)
@test remotecall_fetch(k->haskey(Distributed.PGRP.refs, k), wid1, fid) == true
put!(f, :OK)
@test remotecall_fetch(k->haskey(Distributed.PGRP.refs, k), wid1, fid) == false
@test fetch(f) == :OK
# RemoteException should be thrown on a put! when another process has set the value
f = Future(wid1)
fid = remoteref_id(f)
fstore = RemoteChannel(wid2)
put!(fstore, f) # send f to wid2
put!(f, :OK) # set value from master
@test remotecall_fetch(k->haskey(Distributed.PGRP.refs, k), wid1, fid) == true
testval = remotecall_fetch(wid2, fstore) do x
try
put!(fetch(x), :OK)
return 0
catch e
if isa(e, RemoteException)
return 1
else
return 2
end
end
end
@test testval == 1
# Issue number #25847
@everywhere function f25847(ref)
fetch(ref)
return true
end
f = remotecall_wait(identity, id_other, ones(10))
rrid = Distributed.RRID(f.whence, f.id)
remotecall_fetch(f25847, id_other, f)
@test BitSet([id_me]) == remotecall_fetch(()->Distributed.PGRP.refs[rrid].clientset, id_other)
remotecall_fetch(f25847, id_other, f)
@test BitSet([id_me]) == remotecall_fetch(()->Distributed.PGRP.refs[rrid].clientset, id_other)
finalize(f)
yield() # flush gc msgs
@test false == remotecall_fetch(chk_rrid->(yield(); haskey(Distributed.PGRP.refs, chk_rrid)), id_other, rrid)
# Distributed GC tests for RemoteChannels: unlike a Future, fetching does NOT
# release the owner's backing-store entry; only finalizing the reference does.
function test_remoteref_dgc(id)
    rr = RemoteChannel(id)
    put!(rr, :OK)
    rrid = remoteref_id(rr)
    # remote value should be deleted after finalizing the ref
    @test remotecall_fetch(k->(yield();haskey(Distributed.PGRP.refs, k)), id, rrid) == true
    @test fetch(rr) == :OK
    # still present after a fetch — channels are reusable
    @test remotecall_fetch(k->(yield();haskey(Distributed.PGRP.refs, k)), id, rrid) == true
    finalize(rr)
    yield(); # flush gc msgs
    # remote deletion is asynchronous; poll until the entry disappears
    @test poll_while(() -> remotecall_fetch(k->(yield();haskey(Distributed.PGRP.refs, k)), id, rrid))
end
test_remoteref_dgc(id_me)
test_remoteref_dgc(id_other)
# if sent to another worker, it should not be deleted till the other worker has also finalized.
let wid1 = workers()[1],
wid2 = workers()[2],
rr = RemoteChannel(wid1),
rrid = remoteref_id(rr),
fstore = RemoteChannel(wid2)
put!(fstore, rr)
if include_thread_unsafe()
@test remotecall_fetch(k -> haskey(Distributed.PGRP.refs, k), wid1, rrid) == true
end
finalize(rr) # finalize locally
yield() # flush gc msgs
if include_thread_unsafe()
@test remotecall_fetch(k -> haskey(Distributed.PGRP.refs, k), wid1, rrid) == true
end
remotecall_fetch(r -> (finalize(take!(r)); yield(); nothing), wid2, fstore) # finalize remotely
sleep(0.5) # to ensure that wid2 messages have been executed on wid1
@test poll_while(() -> remotecall_fetch(k -> haskey(Distributed.PGRP.refs, k), wid1, rrid))
end
# Tests for issue #23109 - should not hang.
f = @spawnat :any rand(1, 1)
@Base.Experimental.sync begin
for _ in 1:10
@async fetch(f)
end
end
wid1, wid2 = workers()[1:2]
f = @spawnat wid1 rand(1,1)
@Base.Experimental.sync begin
@async fetch(f)
@async remotecall_fetch(()->fetch(f), wid2)
end
@test fetch(@spawnat id_other myid()) == id_other
@test (@fetchfrom id_other myid()) == id_other
pids=[]
for i in 1:nworkers()
push!(pids, @fetch myid())
end
@test sort(pids) == sort(workers())
# test getindex on Futures and RemoteChannels: `ref[inds...]` must index the
# stored value and `ref[]` must return it whole.
function test_indexing(rr)
    payload = rand(5, 5)
    put!(rr, payload)
    @test rr[2, 3] == payload[2, 3]
    @test rr[] == payload
end
test_indexing(Future())
test_indexing(Future(id_other))
test_indexing(RemoteChannel())
test_indexing(RemoteChannel(id_other))
# Test ser/deser to non-ClusterSerializer objects.
# A remote ref serialized to a plain IO (not a ClusterSerializer) must lose
# its identity: all numeric fields round-trip as zero, the lock comes back
# as a fresh unlocked ReentrantLock, and every other field must be `nothing`.
# The final `error` branch deliberately fails loudly if a new field is ever
# added to the ref types without extending this test.
function test_regular_io_ser(ref::Distributed.AbstractRemoteRef)
    io = IOBuffer()
    serialize(io, ref)
    seekstart(io)
    ref2 = deserialize(io)
    for fld in fieldnames(typeof(ref))
        v = getfield(ref2, fld)
        if isa(v, Number)
            @test v === zero(typeof(v))
        elseif fld == :lock
            @test v isa ReentrantLock
            @test !islocked(v)
        elseif v !== nothing
            error(string("Add test for field ", fld))
        end
    end
end
test_regular_io_ser(Future())
test_regular_io_ser(RemoteChannel())
# Test @distributed load balancing - all processors should get either M or M+1
# iterations out of the loop range for some M.
ids = @distributed((a,b)->[a;b], for i=1:7; myid(); end)
workloads = Int[sum(ids .== i) for i in 2:nprocs()]
@test maximum(workloads) - minimum(workloads) <= 1
# @distributed reduction should work even with very short ranges
@test @distributed(+, for i=1:2; i; end) == 3
@test_throws ArgumentError sleep(-1)
@test_throws ArgumentError timedwait(()->false, 0.1, pollint=-0.5)
# specify pids for pmap
@test sort(workers()[1:2]) == sort(unique(pmap(x->(sleep(0.1);myid()), WorkerPool(workers()[1:2]), 1:10)))
# Testing buffered and unbuffered reads
# This large array should write directly to the socket
a = fill(1, 10^6)
@test a == remotecall_fetch((x)->x, id_other, a)
# Not a bitstype, should be buffered
s = [randstring() for x in 1:10^5]
@test s == remotecall_fetch((x)->x, id_other, s)
#large number of small requests
num_small_requests = 10000
@test fill(id_other, num_small_requests) == [remotecall_fetch(myid, id_other) for i in 1:num_small_requests]
# test parallel sends of large arrays from multiple tasks to the same remote worker
ntasks = 10
rr_list = [Channel(1) for x in 1:ntasks]
for rr in rr_list
local rr
let rr = rr
@async try
for i in 1:10
a = rand(2*10^5)
@test a == remotecall_fetch(x->x, id_other, a)
yield()
end
put!(rr, :OK)
catch
put!(rr, :ERROR)
end
end
end
@test [fetch(rr) for rr in rr_list] == [:OK for x in 1:ntasks]
# Exercise the common Channel/RemoteChannel API: FIFO put!/take!,
# non-destructive fetch, readiness, and the open/close state machine.
function test_channel(c)
    @test isopen(c)
    for item in (1, "Hello", 5.0)
        put!(c, item)
    end
    @test isready(c)
    @test isopen(c)
    # fetch peeks at the head without removing it
    @test fetch(c) == 1
    @test fetch(c) == 1 # Should not have been popped previously
    # take! pops elements in FIFO order
    @test take!(c) == 1
    @test take!(c) == "Hello"
    @test fetch(c) == 5.0
    @test take!(c) == 5.0
    # drained, but a channel stays open until explicitly closed
    @test !isready(c)
    @test isopen(c)
    close(c)
    @test !isopen(c)
end
test_channel(Channel(10))
test_channel(RemoteChannel(()->Channel(10)))
c=Channel{Int}(1)
@test_throws MethodError put!(c, "Hello")
# test channel iterations
# Iterating a channel in a `for` loop must yield values as they are put!,
# deliver values queued before close, and terminate the loop cleanly once
# the closed channel is drained.
function test_iteration(in_c, out_c)
    t=@async for v in in_c
        put!(out_c, v)
    end
    @test isopen(in_c) == true
    put!(in_c, 1)
    @test take!(out_c) == 1
    put!(in_c, "Hello")
    close(in_c)
    # a value put! before close is still delivered
    @test take!(out_c) == "Hello"
    @test isopen(in_c) == false
    @test_throws InvalidStateException put!(in_c, :foo)
    yield()
    # the iterating task must have exited after the channel closed
    @test istaskdone(t) == true
end
test_iteration(Channel(10), Channel(10))
# make sure exceptions propagate when waiting on Tasks
@test_throws CompositeException (@sync (@async error("oops")))
try
@sync begin
for i in 1:5
@async error(i)
end
end
error("unexpected")
catch ex
@test typeof(ex) == CompositeException
@test length(ex) == 5
@test typeof(ex.exceptions[1]) == TaskFailedException
@test typeof(ex.exceptions[1].task.exception) == ErrorException
# test start, next, and done
for (i, i_ex) in enumerate(ex)
@test i == parse(Int, i_ex.task.exception.msg)
end
# test showerror
err_str = sprint(showerror, ex)
err_one_str = sprint(showerror, ex.exceptions[1])
@test err_str == err_one_str * "\n\n...and 4 more exceptions.\n"
end
@test sprint(showerror, CompositeException()) == "CompositeException()\n"
# Run `expr` (a zero-argument closure performing a remote call) and verify
# it throws a RemoteException wrapping a captured ErrorException("foobar").
# If `expr` returns normally the sentinel error makes the checks fail.
function test_remoteexception_thrown(expr)
    thrown = try
        expr()
        error("unexpected")
    catch ex
        ex
    end
    @test typeof(thrown) == RemoteException
    @test typeof(thrown.captured) == CapturedException
    @test typeof(thrown.captured.ex) == ErrorException
    @test thrown.captured.ex.msg == "foobar"
end
for id in [id_other, id_me]
local id
test_remoteexception_thrown() do
remotecall_fetch(id) do
throw(ErrorException("foobar"))
end
end
test_remoteexception_thrown() do
remotecall_wait(id) do
throw(ErrorException("foobar"))
end
end
test_remoteexception_thrown() do
wait(remotecall(id) do
throw(ErrorException("foobar"))
end)
end
end
# make sure the stackframe from the remote error can be serialized:
# the module is defined only on id_other, so the captured backtrace must
# round-trip through serialization as processed StackFrame tuples.
let ex
    try
        remotecall_fetch(id_other) do
            @eval module AModuleLocalToOther
                foo() = throw(ErrorException("A.error"))
                foo()
            end
        end
    catch ex
    end
    @test (ex::RemoteException).pid == id_other
    @test ((ex.captured::CapturedException).ex::ErrorException).msg == "A.error"
    bt = ex.captured.processed_bt::Array{Any,1}
    @test length(bt) > 1
    # the innermost frame is `foo`; linfo must not have been serialized
    frame, repeated = bt[1]::Tuple{Base.StackTraces.StackFrame, Int}
    @test frame.func == :foo
    @test frame.linfo === nothing
    @test repeated == 1
end
# pmap tests. Needs at least 4 processors dedicated to the below tests. Which we currently have
# since the distributed tests are now spawned as a separate set.
# Test all combinations of pmap keyword args.
pmap_args = [
(:distributed, [:default, false]),
(:batch_size, [:default,2]),
(:on_error, [:default, e -> (e.msg == "foobar" ? true : rethrow())]),
(:retry_delays, [:default, fill(0.001, 1000)]),
(:retry_check, [:default, (s,e) -> (s,endswith(e.msg,"foobar"))]),
]
# kwdict holds the keyword combination currently being built by walk_args.
kwdict = Dict()

"""
    walk_args(i)

Recursively enumerate every combination of the pmap keyword options listed
in `pmap_args` (depth-first over option index `i`). At each leaf, build the
actual kwargs list (skipping `:default` placeholders), choose the map
function and result checker that match the combination, and run `pmap`
against them. On failure, the offending kwargs are printed before rethrow.
"""
function walk_args(i)
    if i > length(pmap_args)
        # Leaf: materialize kwargs for this combination.
        kwargs = []
        for (k,v) in kwdict
            if v !== :default
                push!(kwargs, (k,v))
            end
        end
        data = 1:100
        # Workers in play: only the master when distributed=false.
        testw = kwdict[:distributed] === false ? [1] : workers()
        if kwdict[:retry_delays] !== :default
            # Retries enabled: even-pid workers always fail, so retries must
            # eventually land every element on an odd-pid worker.
            mapf = x -> iseven(myid()) ? error("notfoobar") : (x*2, myid())
            results_test = pmap_res -> begin
                results = [x[1] for x in pmap_res]
                pids = [x[2] for x in pmap_res]
                @test results == [2:2:200...]
                for p in testw
                    if isodd(p)
                        @test p in pids
                    else
                        @test !(p in pids)
                    end
                end
            end
        elseif kwdict[:on_error] === :default
            # No errors: every worker in the pool should process something.
            mapf = x -> (x*2, myid())
            results_test = pmap_res -> begin
                results = [x[1] for x in pmap_res]
                pids = [x[2] for x in pmap_res]
                @test results == [2:2:200...]
                for p in testw
                    @test p in pids
                end
            end
        else
            # on_error handler maps the "foobar" failures (even inputs) to `true`.
            mapf = x -> iseven(x) ? error("foobar") : (x*2, myid())
            results_test = pmap_res -> begin
                w = testw
                for (idx,x) in enumerate(data)
                    if iseven(x)
                        @test pmap_res[idx] == true
                    else
                        @test pmap_res[idx][1] == x*2
                        @test pmap_res[idx][2] in w
                    end
                end
            end
        end
        try
            results_test(pmap(mapf, data; kwargs...))
        catch
            println("pmap executing with args : ", kwargs)
            rethrow()
        end
        return
    end
    # Recurse with both values (default and override) for option i.
    kwdict[pmap_args[i][1]] = pmap_args[i][2][1]
    walk_args(i+1)
    kwdict[pmap_args[i][1]] = pmap_args[i][2][2]
    walk_args(i+1)
end
# Start test for various kw arg combinations
walk_args(1)
include(joinpath(Sys.BINDIR, "..", "share", "julia", "test", "generic_map_tests.jl"))
empty_pool = WorkerPool([myid()])
pmap_fallback = (f, c...) -> pmap(f, empty_pool, c...)
generic_map_tests(pmap_fallback)
# pmap with various types. Test for equivalence with map
run_map_equivalence_tests(pmap)
@test pmap(uppercase, "Hello World!") == map(uppercase, "Hello World!")
# Simple test for pmap throws error
let error_thrown = false
try
pmap(x -> x == 50 ? error("foobar") : x, 1:100)
catch e
@test e.captured.ex.msg == "foobar"
error_thrown = true
end
@test error_thrown
end
# Test pmap with a generator type iterator
@test [1:100...] == pmap(x->x, Base.Generator(x->(sleep(0.0001); x), 1:100))
# Test pgenerate
n = 10
as = [rand(4,4) for i in 1:n]
bs = deepcopy(as)
cs = collect(Distributed.pgenerate(x->(sleep(rand()*0.1); svd(x)), bs))
svdas = map(svd, as)
for i in 1:n
@test cs[i].U ≈ svdas[i].U
@test cs[i].S ≈ svdas[i].S
@test cs[i].V ≈ svdas[i].V
end
# Test that the default worker pool cycles through all workers
pmap(_->myid(), 1:nworkers()) # priming run
@test nworkers() == length(unique(pmap(_->myid(), 1:100)))
# Test same behaviour when executed on a worker
@test nworkers() == length(unique(remotecall_fetch(()->pmap(_->myid(), 1:100), id_other)))
# Same tests with custom worker pools.
wp = WorkerPool(workers())
@test nworkers() == length(unique(pmap(_->myid(), wp, 1:100)))
@test nworkers() == length(unique(remotecall_fetch(wp->pmap(_->myid(), wp, 1:100), id_other, wp)))
# CachingPool tests
wp = CachingPool(workers())
@test [1:100...] == pmap(x->x, wp, 1:100)
clear!(wp)
@test length(wp.map_obj2ref) == 0
# The below block of tests are usually run only on local development systems, since:
# - tests which print errors
# - addprocs tests are memory intensive
# - ssh addprocs requires sshd to be running locally with passwordless login enabled.
# The test block is enabled by defining env JULIA_TESTFULL=1
DoFullTest = Bool(parse(Int,(get(ENV, "JULIA_TESTFULL", "0"))))
if DoFullTest
println("Testing exception printing on remote worker from a `remote_do` call")
println("Please ensure the remote error and backtrace is displayed on screen")
remote_do(id_other) do
throw(ErrorException("TESTING EXCEPTION ON REMOTE DO. PLEASE IGNORE"))
end
sleep(0.5) # Give some time for the above error to be printed
println("\n\nThe following 'invalid connection credentials' error messages are to be ignored.")
all_w = workers()
# Test sending fake data to workers. The worker processes will print an
# error message but should not terminate.
for w in Distributed.PGRP.workers
if isa(w, Distributed.Worker)
local s = connect(w.config.host, w.config.port)
write(s, randstring(32))
end
end
@test workers() == all_w
@test all([p == remotecall_fetch(myid, p) for p in all_w])
if Sys.isunix() # aka have ssh
# Verify every freshly added worker can see all of its peers in its own
# workers() list, then remove the new workers (rmprocs is invoked on pid 1).
function test_n_remove_pids(new_pids)
    for p in new_pids
        w_in_remote = sort(remotecall_fetch(workers, p))
        try
            @test intersect(new_pids, w_in_remote) == new_pids
        catch
            # Dump diagnostics before rethrowing so CI logs show the mismatch.
            print("p : $p\n")
            print("newpids : $new_pids\n")
            print("w_in_remote : $w_in_remote\n")
            print("intersect : $(intersect(new_pids, w_in_remote))\n\n\n")
            rethrow()
        end
    end
    remotecall_fetch(rmprocs, 1, new_pids)
end
print("\n\nTesting SSHManager. A minimum of 4GB of RAM is recommended.\n")
print("Please ensure: \n")
print("1) sshd is running locally with passwordless login enabled.\n")
print("2) Env variable USER is defined and is the ssh user.\n")
print("3) Port 9300 is not in use.\n")
sshflags = `-o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -o LogLevel=ERROR `
#Issue #9951
hosts=[]
localhost_aliases = ["localhost", string(getipaddr()), "127.0.0.1"]
num_workers = parse(Int,(get(ENV, "JULIA_ADDPROCS_NUM", "9")))
for i in 1:(num_workers/length(localhost_aliases))
append!(hosts, localhost_aliases)
end
print("\nTesting SSH addprocs with $(length(hosts)) workers...\n")
new_pids = addprocs_with_testenv(hosts; sshflags=sshflags)
@test length(new_pids) == length(hosts)
test_n_remove_pids(new_pids)
print("\nMixed ssh addprocs with :auto\n")
new_pids = addprocs_with_testenv(["localhost", ("127.0.0.1", :auto), "localhost"]; sshflags=sshflags)
@test length(new_pids) == (2 + Sys.CPU_THREADS)
test_n_remove_pids(new_pids)
print("\nMixed ssh addprocs with numeric counts\n")
new_pids = addprocs_with_testenv([("localhost", 2), ("127.0.0.1", 2), "localhost"]; sshflags=sshflags)
@test length(new_pids) == 5
test_n_remove_pids(new_pids)
print("\nssh addprocs with tunnel\n")
new_pids = addprocs_with_testenv([("localhost", num_workers)]; tunnel=true, sshflags=sshflags)
@test length(new_pids) == num_workers
test_n_remove_pids(new_pids)
print("\nssh addprocs with tunnel (SSH multiplexing)\n")
new_pids = addprocs_with_testenv([("localhost", num_workers)]; tunnel=true, multiplex=true, sshflags=sshflags)
@test length(new_pids) == num_workers
controlpath = joinpath(homedir(), ".ssh", "julia-$(ENV["USER"])@localhost:22")
@test issocket(controlpath)
test_n_remove_pids(new_pids)
@test :ok == timedwait(()->!issocket(controlpath), 10.0; pollint=0.5)
print("\nAll supported formats for hostname\n")
h1 = "localhost"
user = ENV["USER"]
h2 = "$user@$h1"
h3 = "$h2:22"
h4 = "$h3 $(string(getipaddr()))"
h5 = "$h4:9300"
new_pids = addprocs_with_testenv([h1, h2, h3, h4, h5]; sshflags=sshflags)
@test length(new_pids) == 5
test_n_remove_pids(new_pids)
print("\nkeyword arg exename\n")
for exename in [`$(joinpath(Sys.BINDIR, Base.julia_exename()))`, "$(joinpath(Sys.BINDIR, Base.julia_exename()))"]
for addp_func in [()->addprocs_with_testenv(["localhost"]; exename=exename, exeflags=test_exeflags, sshflags=sshflags),
()->addprocs_with_testenv(1; exename=exename, exeflags=test_exeflags)]
local new_pids = addp_func()
@test length(new_pids) == 1
test_n_remove_pids(new_pids)
end
end
end # unix-only
end # full-test
# A task scheduled with error=true must fail, and waiting on it must throw
# a TaskFailedException wrapping that exact task.
let t = @task 42
    schedule(t, ErrorException(""), error=true)
    @test_throws TaskFailedException(t) Base.wait(t)
end
# issue #8207: the range expression of a @distributed loop must be evaluated
# exactly once (on the caller), so push!(A,1) runs a single time.
let A = Any[]
    @distributed (+) for i in (push!(A,1); 1:2)
        i
    end
    @test length(A) == 1
end
# issue #13168: CPU-bound workload — repeatedly square a random n×n matrix
# and accumulate the sum of its entries. The accumulator intentionally
# starts as the Int 0 (so f13168(0) returns 0) and widens to Float64 once
# the loop body runs.
function f13168(n)
    acc = 0
    iter = 1
    while iter <= n
        acc += sum(rand(n, n)^2)
        iter += 1
    end
    return acc
end
let t = schedule(@task f13168(100))
    # scheduled but not yet run: queued and runnable
    @test t.state == :runnable
    @test t.queue !== nothing
    @test_throws ErrorException schedule(t)  # cannot schedule twice
    yield()
    @test t.state == :done
    @test t.queue === nothing
    @test_throws ErrorException schedule(t)  # nor after completion
    @test isa(fetch(t), Float64)
end
# issue #13122
@test remotecall_fetch(identity, workers()[1], C_NULL) === C_NULL
# issue #11062: regression test for variable scoping with @async — the
# assignment inside the @async body must not determine the function's
# return value; t11062() must return 2 (checked by the caller below).
# NOTE(review): do not "clean up" this function; the odd shape is the test.
function t11062()
    @async v11062 = 1
    v11062 = 2
end
@test t11062() == 2
# issue #15406
v15406 = remotecall_wait(() -> 1, id_other)
fetch(v15406)
remotecall_wait(fetch, id_other, v15406)
# issue #43396
# Covers the remote fetch where the value returned is `nothing`
# May be caused by attempting to unwrap a non-`Some` type with `something`
# `call_on_owner` ref fetches return values not wrapped in `Some`
# and have to be returned directly
@test nothing === fetch(remotecall(() -> nothing, workers()[1]))
@test 10 === fetch(remotecall(() -> 10, workers()[1]))
# Test various forms of remotecall* invocations
@everywhere f_args(v1, v2=0; kw1=0, kw2=0) = v1+v2+kw1+kw2
# Verify that every remotecall* flavor forwards positional and keyword
# arguments identically: each invocation must evaluate to `result`.
function test_f_args(result, args...; kwargs...)
    for remote_f in (remotecall, remotecall_wait)
        @test fetch(remote_f(args...; kwargs...)) == result
    end
    @test remotecall_fetch(args...; kwargs...) == result
    # A visual test - remote_do should NOT print any errors
    remote_do(args...; kwargs...)
end
for tid in [id_other, id_me, default_worker_pool()]
test_f_args(1, f_args, tid, 1)
test_f_args(3, f_args, tid, 1, 2)
test_f_args(5, f_args, tid, 1; kw1=4)
test_f_args(13, f_args, tid, 1; kw1=4, kw2=8)
test_f_args(15, f_args, tid, 1, 2; kw1=4, kw2=8)
end
# Test remote_do
f=Future(id_me)
remote_do(fut->put!(fut, myid()), id_me, f)
@test fetch(f) == id_me
f=Future(id_other)
remote_do(fut->put!(fut, myid()), id_other, f)
@test fetch(f) == id_other
# Github issue #29932
rc_unbuffered = RemoteChannel(()->Channel{Vector{Float64}}(0))
@test eltype(rc_unbuffered) == Vector{Float64}
@async begin
# Trigger direct write (no buffering) of largish array
array_sz = Int(Base.SZ_UNBUFFERED_IO/8) + 1
largev = zeros(array_sz)
for i in 1:10
largev[1] = float(i)
put!(rc_unbuffered, largev)
end
end
@test remotecall_fetch(rc -> begin
for i in 1:10
take!(rc)[1] != float(i) && error("Failed")
end
return :OK
end, id_other, rc_unbuffered) == :OK
# github issue 33972
rc_unbuffered_other = RemoteChannel(()->Channel{Int}(0), id_other)
close(rc_unbuffered_other)
try; take!(rc_unbuffered_other); catch; end
@test !remotecall_fetch(rc -> islocked(Distributed.lookup_ref(remoteref_id(rc)).synctake),
id_other, rc_unbuffered_other)
# github PR #14456
n = DoFullTest ? 6 : 5
for i = 1:10^n
fetch(@spawnat myid() myid())
end
# issue #15451
@test remotecall_fetch(x->(y->2y)(x)+1, workers()[1], 3) == 7
# issue #16091
mutable struct T16091 end
wid = workers()[1]
@test try
remotecall_fetch(()->T16091, wid)
false
catch ex
((ex::RemoteException).captured::CapturedException).ex === UndefVarError(:T16091)
end
@test try
remotecall_fetch(identity, wid, T16091)
false
catch ex
((ex::RemoteException).captured::CapturedException).ex === UndefVarError(:T16091)
end
f16091a() = 1
remotecall_fetch(()->eval(:(f16091a() = 2)), wid)
@test remotecall_fetch(f16091a, wid) === 2
@test remotecall_fetch((myid)->remotecall_fetch(f16091a, myid), wid, myid()) === 1
# these will only heisen-fail, since it depends on the gensym counter collisions:
f16091b = () -> 1
remotecall_fetch(()->eval(:(f16091b = () -> 2)), wid)
@test remotecall_fetch(f16091b, 2) === 1
# Global anonymous functions are over-written...
@test remotecall_fetch((myid)->remotecall_fetch(f16091b, myid), wid, myid()) === 1
# ...while local anonymous functions are by definition, local.
let
f16091c = () -> 1
@test remotecall_fetch(f16091c, 2) === 1
@test remotecall_fetch(
myid -> begin
let
f16091c = () -> 2
remotecall_fetch(f16091c, myid)
end
end, wid, myid()) === 2
end
# issue #16451
rng=RandomDevice()
retval = @distributed (+) for _ in 1:10
rand(rng)
end
@test retval > 0.0 && retval < 10.0
rand(rng)
retval = @distributed (+) for _ in 1:10
rand(rng)
end
@test retval > 0.0 && retval < 10.0
# serialization tests
wrkr1 = workers()[1]
wrkr2 = workers()[end]
@test remotecall_fetch(p->remotecall_fetch(myid, p), wrkr1, wrkr2) == wrkr2
# Send f to wrkr1 and wrkr2. Then try calling f on wrkr2 from wrkr1
f_myid = ()->myid()
@test wrkr1 == remotecall_fetch(f_myid, wrkr1)
@test wrkr2 == remotecall_fetch(f_myid, wrkr2)
@test wrkr2 == remotecall_fetch((f, p)->remotecall_fetch(f, p), wrkr1, f_myid, wrkr2)
# Deserialization error recovery test
# locally defined module, but unavailable on workers
module LocalFoo
global foo=1
end
let
@test_throws RemoteException remotecall_fetch(()->LocalFoo.foo, 2)
bad_thunk = ()->NonexistantModule.f()
@test_throws RemoteException remotecall_fetch(bad_thunk, 2)
# Test that the stream is still usable
@test remotecall_fetch(()->:test,2) == :test