Skip to content

Commit

Permalink
Separate keywords worker_prefix and manager_pod_name (#104)
Browse files Browse the repository at this point in the history
  • Loading branch information
omus authored Apr 12, 2023
1 parent 0971fd6 commit e8a90b3
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 25 deletions.
11 changes: 7 additions & 4 deletions src/native_driver.jl
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const DEREGISTER_ALERT = Condition()

struct K8sClusterManager <: ClusterManager
np::Int
pod_name::String
worker_prefix::String
image::String
cpu::String
memory::String
Expand All @@ -32,6 +32,8 @@ available.
`current_namespace()`.
- `manager_pod_name`: the name of the manager pod. Defaults to `gethostname()` which is
the name of the pod when executed inside of a Kubernetes pod.
- `worker_prefix`: the prefix given to spawned workers. Defaults to
`"\$(manager_pod_name)-worker"`.
- `image`: Docker image to use for the workers. Defaults to using the image of the Julia
caller if running within a pod using a single container otherwise is a required argument.
- `cpu`: [CPU resources requested](https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#meaning-of-cpu)
Expand All @@ -47,7 +49,8 @@ available.
"""
function K8sClusterManager(np::Integer;
namespace::AbstractString=current_namespace(),
manager_pod_name::AbstractString=gethostname(),
manager_pod_name=gethostname(),
worker_prefix::AbstractString="$(manager_pod_name)-worker",
image=nothing,
cpu=DEFAULT_WORKER_CPU,
memory=DEFAULT_WORKER_MEMORY,
Expand All @@ -68,7 +71,7 @@ function K8sClusterManager(np::Integer;
end
end

return K8sClusterManager(np, manager_pod_name, image, string(cpu), string(memory), pending_timeout, configure)
return K8sClusterManager(np, worker_prefix, image, string(cpu), string(memory), pending_timeout, configure)
end

struct TimeoutException <: Exception
Expand All @@ -78,7 +81,7 @@ end
Base.showerror(io::IO, e::TimeoutException) = print(io, "TimeoutException: ", e.msg)

function worker_pod_spec(manager::K8sClusterManager; kwargs...)
pod = worker_pod_spec(; manager_name=manager.pod_name,
pod = worker_pod_spec(; worker_prefix=manager.worker_prefix,
image=manager.image,
cpu=manager.cpu,
memory=manager.memory,
Expand Down
8 changes: 4 additions & 4 deletions src/pod.jl
Original file line number Diff line number Diff line change
Expand Up @@ -191,16 +191,16 @@ function worker_pod_spec(pod::AbstractDict=POD_TEMPLATE; kwargs...)
end

function worker_pod_spec!(pod::AbstractDict;
manager_name::AbstractString,
worker_prefix::AbstractString,
image::AbstractString,
cmd::Cmd,
cpu=DEFAULT_WORKER_CPU,
memory=DEFAULT_WORKER_MEMORY,
service_account_name=nothing)
pod["metadata"]["generateName"] = "$(manager_name)-worker-"
pod["metadata"]["generateName"] = "$(worker_prefix)-"

# Set a label with the manager name to support easy termination of all workers
pod["metadata"]["labels"]["manager"] = manager_name
# Set a label with the `worker_prefix` to support easy termination of all workers
pod["metadata"]["labels"]["worker-prefix"] = worker_prefix

worker_container =
rdict("name" => "worker",
Expand Down
32 changes: 20 additions & 12 deletions test/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,10 @@ end

let job_name = "test-success"
@testset "$job_name" begin
worker_prefix = "$(job_name)-worker"
code = """
using Distributed, K8sClusterManagers
worker_prefix = "$worker_prefix"
function configure(pod)
push!(pod["metadata"]["labels"], "test" => "$job_name", $COMMON_LABELS...)
Expand All @@ -223,7 +225,7 @@ let job_name = "test-success"
pod["spec"]["containers"][1]["imagePullPolicy"] = "Never"
return pod
end
pids = addprocs(K8sClusterManager(1; configure, pending_timeout=60, cpu="0.5", memory="300Mi"))
pids = addprocs(K8sClusterManager(1; configure, worker_prefix, pending_timeout=60, cpu="0.5", memory="300Mi"))
println("Num Processes: ", nprocs())
for i in workers()
Expand All @@ -250,8 +252,8 @@ let job_name = "test-success"
@info "Waiting for $job_name job. This could take up to 4 minutes..."
wait_job(job_name, condition=!isempty, timeout=4 * 60)

manager_pod = first(pod_names("job-name" => job_name))
worker_pod = first(pod_names("manager" => manager_pod))
manager_pod = only(pod_names("job-name" => job_name))
worker_pod = only(pod_names("worker-prefix" => worker_prefix))

manager_log = pod_logs(manager_pod)
call_matches = collect(eachmatch(POD_NAME_REGEX, manager_log))
Expand Down Expand Up @@ -291,8 +293,10 @@ end

let job_name = "test-multi-addprocs"
@testset "$job_name" begin
worker_prefix = "$(job_name)-worker"
code = """
using Distributed, K8sClusterManagers
worker_prefix = "$worker_prefix"
function configure(pod)
push!(pod["metadata"]["labels"], "test" => "$job_name", $COMMON_LABELS...)
Expand All @@ -302,7 +306,7 @@ let job_name = "test-multi-addprocs"
return pod
end
mgr = K8sClusterManager(1; configure, pending_timeout=60, cpu="0.5", memory="300Mi")
mgr = K8sClusterManager(1; configure, worker_prefix, pending_timeout=60, cpu="0.5", memory="300Mi")
addprocs(mgr)
addprocs(mgr)
Expand All @@ -322,8 +326,8 @@ let job_name = "test-multi-addprocs"
@info "Waiting for $job_name job. This could take up to 4 minutes..."
wait_job(job_name, condition=!isempty, timeout=4 * 60)

manager_pod = first(pod_names("job-name" => job_name))
worker_pods = pod_names("manager" => manager_pod; sort_by="{.metadata.labels.worker-id}")
manager_pod = only(pod_names("job-name" => job_name))
worker_pods = pod_names("worker-prefix" => worker_prefix; sort_by="{.metadata.labels.worker-id}")

manager_log = pod_logs(manager_pod)
matches = collect(eachmatch(POD_NAME_REGEX, manager_log))
Expand Down Expand Up @@ -366,8 +370,10 @@ end

let job_name = "test-interrupt"
@testset "$job_name" begin
worker_prefix = "$(job_name)-worker"
code = """
using Distributed, K8sClusterManagers
worker_prefix = "$worker_prefix"
function configure(pod)
push!(pod["metadata"]["labels"], "test" => "$job_name", $COMMON_LABELS...)
Expand All @@ -377,7 +383,7 @@ let job_name = "test-interrupt"
return pod
end
mgr = K8sClusterManager(1; configure, pending_timeout=60, cpu="0.5", memory="300Mi")
mgr = K8sClusterManager(1; configure, worker_prefix, pending_timeout=60, cpu="0.5", memory="300Mi")
pids = addprocs(mgr)
interrupt(pids)
"""
Expand All @@ -391,8 +397,8 @@ let job_name = "test-interrupt"
@info "Waiting for $job_name job. This could take up to 4 minutes..."
wait_job(job_name, condition=!isempty, timeout=4 * 60)

manager_pod = first(pod_names("job-name" => job_name))
worker_pod = first(pod_names("manager" => manager_pod))
manager_pod = only(pod_names("job-name" => job_name))
worker_pod = only(pod_names("worker-prefix" => worker_prefix))

manager_log = pod_logs(manager_pod)

Expand All @@ -417,8 +423,10 @@ end

let job_name = "test-oom"
@testset "$job_name" begin
worker_prefix = "$(job_name)-worker"
code = """
using Distributed, K8sClusterManagers
worker_prefix = "$worker_prefix"
function configure(pod)
push!(pod["metadata"]["labels"], "test" => "$job_name", $COMMON_LABELS...)
Expand All @@ -428,7 +436,7 @@ let job_name = "test-oom"
return pod
end
mgr = K8sClusterManager(1; configure, pending_timeout=60, cpu="0.5", memory="300Mi")
mgr = K8sClusterManager(1; configure, worker_prefix, pending_timeout=60, cpu="0.5", memory="300Mi")
pids = addprocs(mgr)
@everywhere begin
Expand Down Expand Up @@ -462,8 +470,8 @@ let job_name = "test-oom"
@info "Waiting for $job_name job. This could take up to 4 minutes..."
wait_job(job_name, condition=!isempty, timeout=4 * 60)

manager_pod = first(pod_names("job-name" => job_name))
worker_pod = first(pod_names("manager" => manager_pod))
manager_pod = only(pod_names("job-name" => job_name))
worker_pod = only(pod_names("worker-prefix" => worker_prefix))

manager_log = pod_logs(manager_pod)
worker_oom_msg = "Worker 2 on pod $worker_pod was terminated due to: OOMKilled"
Expand Down
17 changes: 16 additions & 1 deletion test/native_driver.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,29 @@
@testset "basic" begin
mgr = K8sClusterManager(1; image="julia:1")
@test mgr.np == 1
@test mgr.pod_name == gethostname()
@test mgr.worker_prefix == "$(gethostname())-worker"
@test mgr.image == "julia:1"
@test mgr.cpu == string(K8sClusterManagers.DEFAULT_WORKER_CPU)
@test mgr.memory == K8sClusterManagers.DEFAULT_WORKER_MEMORY
@test mgr.pending_timeout == 180
@test mgr.configure === identity
end

@testset "worker_prefix" begin
kwargs = (; image="julia:1")
mgr = K8sClusterManager(1; kwargs...)
@test mgr.worker_prefix == "$(gethostname())-worker"

mgr = K8sClusterManager(1; worker_prefix="wkr-group", kwargs...)
@test mgr.worker_prefix == "wkr-group"

mgr = K8sClusterManager(1; manager_pod_name="pod", kwargs...)
@test mgr.worker_prefix == "pod-worker"

mgr = K8sClusterManager(1; worker_prefix="wkr-group", manager_pod_name="pod", kwargs...)
@test mgr.worker_prefix == "wkr-group"
end

@testset "pods not found" begin
try
K8sClusterManager(1)
Expand Down
8 changes: 4 additions & 4 deletions test/pod.jl
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,17 @@ end
end

@testset "worker_pod_spec" begin
kwargs = (; manager_name="test", image="julia", cmd=`julia`)
kwargs = (; worker_prefix="test-wkr", image="julia", cmd=`julia`)
pod = K8sClusterManagers.worker_pod_spec(; kwargs...)

@test keys(pod) == Set(["apiVersion", "kind", "metadata", "spec"])
@test pod["apiVersion"] == "v1"
@test pod["kind"] == "Pod"

@test keys(pod["metadata"]) == Set(["generateName", "labels"])
@test pod["metadata"]["generateName"] == "test-worker-"
@test keys(pod["metadata"]["labels"]) == Set(["manager"])
@test pod["metadata"]["labels"]["manager"] == "test"
@test pod["metadata"]["generateName"] == "test-wkr-"
@test keys(pod["metadata"]["labels"]) == Set(["worker-prefix"])
@test pod["metadata"]["labels"]["worker-prefix"] == "test-wkr"

@test pod["spec"]["restartPolicy"] == "Never"
@test length(pod["spec"]["containers"]) == 1
Expand Down

0 comments on commit e8a90b3

Please sign in to comment.