Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

restart worker during tests depending on resident memory size #13577

Merged
merged 1 commit into from
Oct 15, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 37 additions & 19 deletions base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -336,30 +336,35 @@ function rmprocs(args...; waitfor = 0.0)
error("only process 1 can add and remove processes")
end

rmprocset = []
for i in vcat(args...)
if i == 1
warn("rmprocs: process 1 not removed")
else
if haskey(map_pid_wrkr, i)
w = map_pid_wrkr[i]
set_worker_state(w, W_TERMINATING)
kill(w.manager, i, w.config)
push!(rmprocset, w)
lock(worker_lock)
try
rmprocset = []
for i in vcat(args...)
if i == 1
warn("rmprocs: process 1 not removed")
else
if haskey(map_pid_wrkr, i)
w = map_pid_wrkr[i]
set_worker_state(w, W_TERMINATING)
kill(w.manager, i, w.config)
push!(rmprocset, w)
end
end
end
end

start = time()
while (time() - start) < waitfor
if all(w -> w.state == W_TERMINATED, rmprocset)
break;
else
sleep(0.1)
start = time()
while (time() - start) < waitfor
if all(w -> w.state == W_TERMINATED, rmprocset)
break;
else
sleep(0.1)
end
end
end

((waitfor > 0) && any(w -> w.state != W_TERMINATED, rmprocset)) ? :timed_out : :ok
((waitfor > 0) && any(w -> w.state != W_TERMINATED, rmprocset)) ? :timed_out : :ok
finally
unlock(worker_lock)
end
end


Expand Down Expand Up @@ -1075,7 +1080,20 @@ end
# `manager` is of type ClusterManager. The respective managers are responsible
# for launching the workers. All keyword arguments (plus a few default values)
# are available as a dictionary to the `launch` methods
#
# Only one addprocs can be in progress at any time
#
const worker_lock = ReentrantLock()
function addprocs(manager::ClusterManager; kwargs...)
lock(worker_lock)
try
addprocs_locked(manager::ClusterManager; kwargs...)
finally
unlock(worker_lock)
end
end

function addprocs_locked(manager::ClusterManager; kwargs...)

params = merge(default_addprocs_params(), AnyDict(kwargs))
topology(symbol(params[:topology]))
Expand Down
2 changes: 2 additions & 0 deletions base/sysinfo.jl
Original file line number Diff line number Diff line change
Expand Up @@ -146,4 +146,6 @@ function set_process_title(title::AbstractString)
uv_error("set_process_title", err)
end

maxrss() = ccall(:jl_maxrss, Csize_t, ())

end # module Sys
28 changes: 27 additions & 1 deletion src/sys.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#ifndef _OS_WINDOWS_
#ifdef _OS_WINDOWS_
#include <psapi.h>
#else
#include <sys/sysctl.h>
#include <sys/wait.h>
#include <sys/ptrace.h>
Expand Down Expand Up @@ -742,6 +744,30 @@ DLLEXPORT jl_sym_t* jl_get_ARCH()
return ARCH;
}

DLLEXPORT size_t jl_maxrss()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it worth using the libuv function directly from julia instead of this? (https://github.com/nodejs/node-v0.x-archive/blob/master/deps/uv/include/uv.h#L1020-L1039)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had that in the first commit, but Windows is not supported in the libuv interface - it returns 0 for maxrss size. Hence decided to just have a simple solution that works on all 3 test platforms.

{
#if defined(_OS_WINDOWS_)
PROCESS_MEMORY_COUNTERS counter;
GetProcessMemoryInfo( GetCurrentProcess( ), &counter, sizeof(counter) );
return (size_t)counter.PeakWorkingSetSize;

#elif defined(_OS_LINUX_) || defined(_OS_DARWIN_) || defined (_OS_FREEBSD_)
struct rusage rusage;
getrusage( RUSAGE_SELF, &rusage );
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any idea about bsd?


#if defined(_OS_LINUX_)
return (size_t)(rusage.ru_maxrss * 1024);
#else
return (size_t)rusage.ru_maxrss;
#endif

#else
return (size_t)0;
#endif
}



#ifdef __cplusplus
}
#endif
5 changes: 1 addition & 4 deletions test/choosetests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ function choosetests(choices = [])
"nullable", "meta", "profile", "libgit2", "docs", "markdown",
"base64", "serialize", "functors", "misc",
"enums", "cmdlineargs", "i18n", "workspace", "libdl", "int",
"intset", "floatfuncs", "compile"
"intset", "floatfuncs", "compile", "parallel"
]

if Base.USE_GPL_LIBS
Expand All @@ -43,9 +43,6 @@ function choosetests(choices = [])
push!(testnames, "examples")
end

# parallel tests depend on other workers - do them last
push!(testnames, "parallel")

tests = []
skip_tests = []

Expand Down
9 changes: 4 additions & 5 deletions test/examples.jl
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,13 @@ end
dc_path = joinpath(dir, "dictchannel.jl")
include(dc_path)

w_set=filter!(x->x != myid(), workers())
pid = length(w_set) > 0 ? w_set[1] : myid()

remotecall_fetch(pid, dc_path) do f
# Run the remote on pid 1, since runtests may terminate workers
# at any time depending on memory usage
remotecall_fetch(1, dc_path) do f
include(f)
nothing
end
dc=RemoteRef(()->DictChannel(), pid)
dc=RemoteRef(()->DictChannel(), 1)
@test typeof(dc) == RemoteRef{DictChannel}

@test isready(dc) == false
Expand Down
Loading