Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable parallelization #38

Merged
merged 23 commits into from
Apr 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .JuliaFormatter.toml
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
style = "blue"
pipe_to_function_call = false
7 changes: 6 additions & 1 deletion .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ concurrency:
cancel-in-progress: ${{ startsWith(github.ref, 'refs/pull/') }}
jobs:
test:
name: Julia ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }} - ${{ github.event_name }}
name: Julia ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }} - ${{ matrix.num_threads }} threads - ${{ github.event_name }}
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
Expand All @@ -26,6 +26,9 @@ jobs:
- ubuntu-latest
arch:
- x64
num_threads:
- 1
- 2
steps:
- uses: actions/checkout@v2
- uses: julia-actions/setup-julia@v1
Expand All @@ -35,6 +38,8 @@ jobs:
- uses: julia-actions/cache@v1
- uses: julia-actions/julia-buildpkg@v1
- uses: julia-actions/julia-runtest@v1
env:
JULIA_NUM_THREADS: ${{ matrix.num_threads }}
- uses: julia-actions/julia-processcoverage@v1
- uses: codecov/codecov-action@v2
with:
Expand Down
6 changes: 5 additions & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
name = "Pathfinder"
uuid = "b1d3bc72-d0e7-4279-b92f-7fa5d6d2d454"
authors = ["Seth Axen <[email protected]> and contributors"]
version = "0.3.5"
version = "0.3.6"

[deps]
AbstractDifferentiation = "c29ec348-61ec-40c8-8164-b8c60e9d9f3d"
Distributions = "31c24e10-a181-5473-b8eb-7969acd0382f"
Folds = "41a02a25-b8f0-4f67-bc48-60067656b558"
ForwardDiff = "f6369f11-7733-5829-9624-2563aa707210"
GalacticOptim = "a75be94c-b780-496d-a8a9-0878b188d577"
LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
Expand All @@ -17,10 +18,12 @@ Requires = "ae029012-a4dd-5104-9daa-d747884805df"
Statistics = "10745b16-79ce-11e8-11f9-7d13ad32a3b2"
StatsBase = "2913bbd2-ae8a-5f71-8c99-4fb6c76f3a91"
StatsFuns = "4c63d2b9-4356-54db-8cca-17b64c39e42c"
Transducers = "28d57a85-8fef-5791-bfe6-a80928e7c999"

[compat]
AbstractDifferentiation = "0.4"
Distributions = "0.25"
Folds = "0.2"
ForwardDiff = "0.10"
GalacticOptim = "2"
Optim = "1.4"
Expand All @@ -29,6 +32,7 @@ PSIS = "0.2, 0.3, 0.4"
Requires = "1"
StatsBase = "0.33"
StatsFuns = "0.9"
Transducers = "0.4.5"
julia = "1.6"

[extras]
Expand Down
3 changes: 3 additions & 0 deletions src/Pathfinder.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ module Pathfinder

using AbstractDifferentiation: AD
using Distributions: Distributions
using Folds: Folds
# ensure that ForwardDiff is conditionally loaded by GalacticOptim
using ForwardDiff: ForwardDiff
using LinearAlgebra
Expand All @@ -14,6 +15,7 @@ using Requires: Requires
using Statistics: Statistics
using StatsBase: StatsBase
using StatsFuns: log2π
using Transducers: Transducers

export pathfinder, multipathfinder

Expand All @@ -25,6 +27,7 @@ const DEFAULT_OPTIMIZER = Optim.LBFGS(;
m=DEFAULT_HISTORY_LENGTH, linesearch=DEFAULT_LINE_SEARCH
)

include("transducers.jl")
include("woodbury.jl")
include("optimize.jl")
include("inverse_hessian.jl")
Expand Down
16 changes: 11 additions & 5 deletions src/elbo.jl
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
function maximize_elbo(rng, logp, dists, ndraws)
elbo_ϕ_logqϕ = map(dists) do dist
elbo_and_samples(rng, logp, dist, ndraws)
function maximize_elbo(rng, logp, dists, ndraws, executor)
elbo_ϕ_logqϕ1 = elbo_and_samples(rng, logp, dists[begin], ndraws)
elbo_ϕ_logqϕ = similar(dists, typeof(elbo_ϕ_logqϕ1))
elbo_ϕ_logqϕ[begin] = elbo_ϕ_logqϕ1
@views Folds.map!(
elbo_ϕ_logqϕ[(begin + 1):end], dists[(begin + 1):end], executor
) do dist
return elbo_and_samples(rng, logp, dist, ndraws)
end
lopt = argmax(first.(elbo_ϕ_logqϕ))
_, lopt = _findmax(elbo_ϕ_logqϕ |> Transducers.Map(first))
return (lopt, elbo_ϕ_logqϕ[lopt]...)
end

function elbo_and_samples(rng, logp, dist, ndraws)
ϕ, logqϕ = rand_and_logpdf(rng, dist, ndraws)
logpϕ = logp.(eachcol(ϕ))
logpϕ = similar(logqϕ)
logpϕ .= logp.(eachcol(ϕ))
elbo = elbo_from_logpdfs(logpϕ, logqϕ)
return elbo, ϕ, logqϕ
end
Expand Down
32 changes: 24 additions & 8 deletions src/multipath.jl
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ resulting draws better approximate draws from the target distribution ``p`` inst
- `ad_backend=AD.ForwardDiffBackend()`: AbstractDifferentiation.jl AD backend.
- `ndraws_per_run::Int=5`: The number of draws to take for each component before resampling.
- `importance::Bool=true`: Perform Pareto smoothed importance resampling of draws.
- `rng::AbstractRNG=Random.GLOBAL_RNG`: Pseudorandom number generator. It is recommended to
use a parallelization-friendly PRNG like the default PRNG on Julia 1.7 and up.
- `executor::Transducers.Executor`: Transducers.jl executor that determines if and how
to run the single-path runs in parallel. If `rng` is known to be thread-safe, the
  default is `Transducers.PreferParallel(; basesize=1)` (parallel execution, defaulting
to multi-threading). Otherwise, it is `Transducers.SequentialEx()` (no parallelization).
- `executor_per_run::Transducers.Executor=Transducers.SequentialEx()`: Transducers.jl
executor used within each run to parallelize PRNG calls. Defaults to no parallelization.
See [`pathfinder`](@ref) for a description.
- `kwargs...` : Remaining keywords are forwarded to [`pathfinder`](@ref).

# Returns
Expand Down Expand Up @@ -80,7 +89,9 @@ function multipathfinder(
θ₀s,
ndraws;
ndraws_per_run::Int=5,
rng::Random.AbstractRNG=Random.default_rng(),
rng::Random.AbstractRNG=Random.GLOBAL_RNG,
executor::Transducers.Executor=_default_executor(rng; basesize=1),
executor_per_run=Transducers.SequentialEx(),
importance::Bool=true,
kwargs...,
)
Expand All @@ -93,18 +104,23 @@ function multipathfinder(
logp(x) = -optim_fun.f(x, nothing)

# run pathfinder independently from each starting point
# TODO: allow to be parallelized
res = map(θ₀s) do θ₀
return pathfinder(optim_fun, θ₀, ndraws_per_run; rng, kwargs...)
trans = Transducers.Map() do θ₀
return pathfinder(
optim_fun, θ₀, ndraws_per_run; rng, executor=executor_per_run, kwargs...
)
end
qs = reduce(vcat, first.(res))
ϕs = reduce(hcat, getindex.(res, 2))
iter_sp = Transducers.withprogress(θ₀s; interval=1e-3) |> trans
res = Folds.collect(iter_sp, executor)
qs = res |> Transducers.Map(first) |> collect
ϕs = reduce(hcat, res |> Transducers.Map(x -> x[2]))

# draw samples from augmented mixture model
inds = axes(ϕs, 2)
sample_inds = if importance
logqϕs = reduce(vcat, last.(res))
log_ratios = map(((ϕ, logqϕ),) -> logp(ϕ) - logqϕ, zip(eachcol(ϕs), logqϕs))
logqϕs = res |> Transducers.MapCat(last) |> collect
iter_logp = eachcol(ϕs) |> Transducers.Map(logp)
logpϕs = Folds.collect(iter_logp, executor)
log_ratios = logpϕs - logqϕs
resample(rng, inds, log_ratios, ndraws)
else
resample(rng, inds, ndraws)
Expand Down
7 changes: 5 additions & 2 deletions src/mvnormal.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ point are returned.
"""
function fit_mvnormals(θs, ∇logpθs; kwargs...)
Σs = lbfgs_inverse_hessians(θs, ∇logpθs; kwargs...)
trans = Transducers.MapSplat() do Σ, ∇logpθ, θ
μ = muladd(Σ, ∇logpθ, θ)
return Distributions.MvNormal(μ, Σ)
end
l = length(Σs)
μs = @views map(muladd, Σs, ∇logpθs[1:l], θs[1:l])
return Distributions.MvNormal.(μs, Σs)
return @views(zip(Σs, ∇logpθs[1:l], θs[1:l])) |> trans |> collect
end

# faster than computing `logpdf` and `rand` independently
Expand Down
16 changes: 8 additions & 8 deletions src/optimize.jl
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,10 @@ function build_optim_problem(optim_fun, x₀; kwargs...)
return GalacticOptim.OptimizationProblem(optim_fun, x₀, nothing; kwargs...)
end

function optimize_with_trace(prob, optimizer)
function optimize_with_trace(prob, optimizer, executor=Transducers.SequentialEx())
u0 = prob.u0
fun = prob.f
grad! = fun.grad
function ∇f(x)
∇fx = similar(x)
grad!(∇fx, x, nothing)
rmul!(∇fx, -1)
return ∇fx
end
# caches for the trace of x and f(x)
xs = typeof(u0)[]
fxs = typeof(fun.f(u0, nothing))[]
Expand All @@ -52,6 +46,12 @@ function optimize_with_trace(prob, optimizer)
# NOTE: GalacticOptim doesn't have an interface for accessing the gradient trace,
# so we need to recompute it ourselves
# see https://github.com/SciML/GalacticOptim.jl/issues/149
∇fxs = ∇f.(xs)
∇fxs = [similar(u0) for _ in xs]
trans = Transducers.Map() do (∇fx, x)
grad!(∇fx, x, nothing)
rmul!(∇fx, -1)
return nothing
end
Transducers.transduce(trans, (_...,) -> nothing, nothing, zip(∇fxs, xs), executor)
return xs, fxs, ∇fxs
end
20 changes: 15 additions & 5 deletions src/singlepath.jl
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ constructed using at most the previous `history_length` steps.
# Keywords
- `ad_backend=AD.ForwardDiffBackend()`: AbstractDifferentiation.jl AD backend.
- `rng::Random.AbstractRNG`: The random number generator to be used for drawing samples
- `executor::Transducers.Executor=Transducers.SequentialEx()`: Transducers.jl executor that
determines if and how to perform ELBO computation in parallel. The default
(`SequentialEx()`) performs no parallelization. If `rng` is known to be thread-safe, and
the log-density function is known to have no internal state, then
`Transducers.PreferParallel()` may be used to parallelize log-density evaluation.
This is generally only faster for expensive log density functions.
- `optimizer`: Optimizer to be used for constructing trajectory. Can be any optimizer
compatible with GalacticOptim, so long as it supports callbacks. Defaults to
`Optim.LBFGS(; m=$DEFAULT_HISTORY_LENGTH, linesearch=LineSearches.MoreThuente())`. See
Expand Down Expand Up @@ -65,14 +71,17 @@ function pathfinder(
optim_fun::GalacticOptim.OptimizationFunction,
θ₀,
ndraws;
rng::Random.AbstractRNG=Random.default_rng(),
rng::Random.AbstractRNG=Random.GLOBAL_RNG,
executor::Transducers.Executor=Transducers.SequentialEx(),
optimizer=DEFAULT_OPTIMIZER,
history_length::Int=optimizer isa Optim.LBFGS ? optimizer.m : DEFAULT_HISTORY_LENGTH,
ndraws_elbo::Int=5,
kwargs...,
)
optim_prob = build_optim_problem(optim_fun, θ₀; kwargs...)
return pathfinder(optim_prob, ndraws; rng, optimizer, history_length, ndraws_elbo)
return pathfinder(
optim_prob, ndraws; rng, executor, optimizer, history_length, ndraws_elbo
)
end

"""
Expand All @@ -91,7 +100,8 @@ See [`pathfinder`](@ref) for a description of remaining arguments.
function pathfinder(
optim_prob::GalacticOptim.OptimizationProblem,
ndraws;
rng::Random.AbstractRNG=Random.default_rng(),
rng::Random.AbstractRNG=Random.GLOBAL_RNG,
executor::Transducers.Executor=Transducers.SequentialEx(),
optimizer=DEFAULT_OPTIMIZER,
history_length::Int=optimizer isa Optim.LBFGS ? optimizer.m : DEFAULT_HISTORY_LENGTH,
ndraws_elbo::Int=5,
Expand All @@ -101,15 +111,15 @@ function pathfinder(
end
logp(x) = -optim_prob.f.f(x, nothing)
# compute trajectory
θs, logpθs, ∇logpθs = optimize_with_trace(optim_prob, optimizer)
θs, logpθs, ∇logpθs = optimize_with_trace(optim_prob, optimizer, executor)
L = length(θs) - 1
@assert L + 1 == length(logpθs) == length(∇logpθs)

# fit mv-normal distributions to trajectory
qs = fit_mvnormals(θs, ∇logpθs; history_length)

# find ELBO-maximizing distribution
lopt, elbo, ϕ, logqϕ = maximize_elbo(rng, logp, qs[2:end], ndraws_elbo)
lopt, elbo, ϕ, logqϕ = maximize_elbo(rng, logp, qs[2:end], ndraws_elbo, executor)
@info "Optimized for $L iterations. Maximum ELBO of $(round(elbo; digits=2)) reached at iteration $lopt."

# get parameters of ELBO-maximizing distribution
Expand Down
27 changes: 27 additions & 0 deletions src/transducers.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# utility functions to work with Transducers.jl

# Report whether `rng` may be shared across threads without external locking.
# Only the task-local RNG introduced in Julia 1.7 (and the global RNG, which
# forwards to it on those versions) is known to be thread-safe; every other RNG
# is conservatively reported as unsafe.
_is_thread_safe_rng(::Any) = false
if VERSION ≥ v"1.7.0"
    _is_thread_safe_rng(::Random.TaskLocalRNG) = true
    _is_thread_safe_rng(::typeof(Random.GLOBAL_RNG)) = true
end

"""
    _default_executor(rng; kwargs...)

Choose a default Transducers.jl executor for computations that draw from `rng`.

If `rng` is known to be thread-safe, return `Transducers.PreferParallel(; kwargs...)`
(parallel execution); otherwise return `Transducers.SequentialEx()` so that PRNG
state is never mutated from multiple threads.
"""
function _default_executor(rng; kwargs...)
    return if _is_thread_safe_rng(rng)
        Transducers.PreferParallel(; kwargs...)
    else
        Transducers.SequentialEx()
    end
end

# transducer-friendly findmax
function _findmax(x)
return Transducers.foldxl(x |> Transducers.Enumerate(); init=missing) do xmax_imax, i_xi
xmax_imax === missing && return reverse(i_xi)
return last(i_xi) > first(xmax_imax) ? reverse(i_xi) : xmax_imax
end
end

# WARNING: Type piracy! Neither `size` nor `ProgressLoggingFoldable` is owned by
# this package. Forwards `size` to the wrapped foldable so that progress-wrapped
# collections behave like the underlying collection — presumably needed by the
# `Folds`/`Transducers` collection machinery; remove once fixed upstream:
# https://github.com/JuliaFolds/Transducers.jl/issues/521
Base.size(x::Transducers.ProgressLoggingFoldable) = size(x.foldable)
26 changes: 22 additions & 4 deletions test/elbo.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ using Distributions
using Pathfinder
using Random
using Test
using Transducers

@testset "ELBO estimation" begin
@testset "elbo_and_samples" begin
Expand Down Expand Up @@ -29,9 +30,26 @@ using Test
logp(x) = logpdf(target_dist, x[1])
σs = [1e-3, 0.05, σ_target, 1.0, 1.1, 1.2, 5.0, 10.0]
dists = Normal.(0, σs)
rng = MersenneTwister(42)
lopt, elbo, ϕ, logqϕ = @inferred Pathfinder.maximize_elbo(rng, logp, dists, 100)
@test lopt == 3
@test elbo ≈ 0
if VERSION ≥ v"1.7.0"
executors = [SequentialEx(), ThreadedEx()]
else
executors = [SequentialEx()]
end
@testset "$executor" for executor in executors
rng = Random.seed!(42)
lopt, elbo, ϕ, logqϕ = @inferred Pathfinder.maximize_elbo(
rng, logp, dists, 100, executor
)
@test lopt == 3
@test elbo ≈ 0
rng = Random.seed!(42)
lopt2, elbo2, ϕ2, logqϕ2 = Pathfinder.maximize_elbo(
rng, logp, dists, 100, executor
)
@test lopt2 == lopt
@test elbo2 == elbo
@test ϕ2 ≈ ϕ
@test logqϕ ≈ logqϕ2
end
end
end
Loading