From bb72addeed22386b4264d526fb5b4188807539b8 Mon Sep 17 00:00:00 2001 From: Hector Perez Date: Wed, 2 Aug 2023 22:35:33 -0400 Subject: [PATCH] Priority as number (#90) --------- Co-authored-by: Stefan Krastanov Co-authored-by: Stefan Krastanov --- docs/src/api.md | 2 +- src/base.jl | 2 +- src/deprecated.jl | 2 +- src/events.jl | 8 ++--- src/operators.jl | 2 +- src/processes.jl | 6 ++-- src/resources/containers.jl | 46 +++++++++++++----------- src/resources/stores.jl | 47 ++++++++++++------------ src/simulations.jl | 4 +-- src/utils/time.jl | 2 +- test/runtests.jl | 1 + test/test_resource_priorities.jl | 59 +++++++++++++++++++++++++++++++ test/test_resources_containers.jl | 2 +- 13 files changed, 125 insertions(+), 58 deletions(-) create mode 100644 test/test_resource_priorities.jl diff --git a/docs/src/api.md b/docs/src/api.md index 6c1f145a..11a8741e 100755 --- a/docs/src/api.md +++ b/docs/src/api.md @@ -6,6 +6,6 @@ Private = false ``` ```@docs -unlock(res::Container; priority::Int=0) +unlock(res::Resource; priority::Number=0) take!(sto::Store, filter::Function=get_any_item; priority::Int=0) ``` \ No newline at end of file diff --git a/src/base.jl b/src/base.jl index d61e5953..a319acf3 100755 --- a/src/base.jl +++ b/src/base.jl @@ -63,7 +63,7 @@ function remove_callback(cb::Function, ev::AbstractEvent) i != 0 && deleteat!(ev.bev.callbacks, i) end -function schedule(ev::AbstractEvent, delay::Number=zero(Float64); priority::Int=0, value::Any=nothing) +function schedule(ev::AbstractEvent, delay::Number=0; priority::Number=0, value::Any=nothing) state(ev) === processed && throw(EventProcessed(ev)) env = environment(ev) bev = ev.bev diff --git a/src/deprecated.jl b/src/deprecated.jl index 9c049119..0b5a46dc 100755 --- a/src/deprecated.jl +++ b/src/deprecated.jl @@ -1,4 +1,4 @@ -Base.@deprecate put(args...) put!(args...) +Base.@deprecate put(args...; kwargs...) put!(args...; kwargs...) #Base.@deprecate request(args...; kwargs...) lock(args...; kwargs...) # Not the same: `request` needs to be yielded, while `lock` yields itself #Base.@deprecate tryrequest(args...; kwargs...) trylock(args...; kwargs...) # Not the same: `request` needs to be yielded, while `lock` yields itself Base.@deprecate release(args...; kwargs...) unlock(args...; kwargs...) diff --git a/src/events.jl b/src/events.jl index f0631575..eed5e93d 100755 --- a/src/events.jl +++ b/src/events.jl @@ -5,12 +5,12 @@ struct Event <: AbstractEvent end end -function succeed(ev::Event; priority::Int=0, value::Any=nothing) :: Event +function succeed(ev::Event; priority::Number=0, value::Any=nothing) :: Event state(ev) !== idle && throw(EventNotIdle(ev)) schedule(ev; priority=priority, value=value) end -function fail(ev::Event, exc::Exception; priority::Int=0) :: Event +function fail(ev::Event, exc::Exception; priority::Number=0) :: Event succeed(ev; priority=priority, value=exc) end @@ -21,10 +21,10 @@ struct Timeout <: AbstractEvent end end -function timeout(env::Environment, delay::Number=0; priority::Int=0, value::Any=nothing) +function timeout(env::Environment, delay::Number=0; priority::Number=0, value::Any=nothing) schedule(Timeout(env), delay; priority=priority, value=value) end -function run(env::Environment, until::Number=typemax(Float64)) +function run(env::Environment, until::Number=Inf) run(env, timeout(env, until-now(env))) end diff --git a/src/operators.jl b/src/operators.jl index 24623b6e..04b7c82c 100755 --- a/src/operators.jl +++ b/src/operators.jl @@ -32,7 +32,7 @@ function check(ev::AbstractEvent, op::Operator, event_state_values::Dict{Abstrac end elseif state(op) === scheduled if isa(val, Exception) - schedule(op; priority=typemax(Int), value=val) + schedule(op; priority=Inf, value=val) else event_state_values[ev] = StateValue(state(ev), val) end diff --git a/src/processes.jl b/src/processes.jl index 16699865..20ab5b19 100755 --- a/src/processes.jl +++ b/src/processes.jl @@ -57,9 +57,9 @@ end function interrupt(proc::Process, cause::Any=nothing) env = environment(proc) if proc.fsmi._state !== 0xff - proc.target isa Initialize && schedule(proc.target; priority=typemax(Int)) - target = schedule(Interrupt(env); priority=typemax(Int), value=InterruptException(active_process(env), cause)) + proc.target isa Initialize && schedule(proc.target; priority=Inf) + target = schedule(Interrupt(env); priority=Inf, value=InterruptException(active_process(env), cause)) @callback execute_interrupt(target, proc) end - timeout(env; priority=typemax(Int)) + timeout(env; priority=Inf) end diff --git a/src/resources/containers.jl b/src/resources/containers.jl index 56aecc45..751045a5 100755 --- a/src/resources/containers.jl +++ b/src/resources/containers.jl @@ -1,15 +1,15 @@ -struct ContainerKey{N<:Real} <: ResourceKey - priority :: Int +struct ContainerKey{N<:Real, T<:Number} <: ResourceKey id :: UInt amount :: N + priority :: T end """ - Container{N}(env::Environment, capacity::N=one(N); level::N=zero(N)) + Container{N<:Real, T<:Number}(env::Environment, capacity::N=one(N); level::N=zero(N)) A "Container" resource object, storing up to `capacity` units of a resource (of type `N`). -There is a `Resource` alias for `Container{Int}`. +There is a `Resource` alias for `Container{Int, Int}`. `Resource()` with default capacity of `1` is very similar to a typical lock. The [`request`](@ref) and [`unlock`](@ref) functions are a convenient way to interact with such a "lock", @@ -19,27 +19,31 @@ See [`Store`](@ref) for a more channel-like resource. Think of `Resource` and `Container` as locks and of `Store` as channels. They block only if empty (on taking) or full (on storing). """ -mutable struct Container{N<:Real} <: AbstractResource +mutable struct Container{N<:Real, T<:Number} <: AbstractResource env :: Environment capacity :: N level :: N seid :: UInt - put_queue :: DataStructures.PriorityQueue{Put, ContainerKey{N}} - get_queue :: DataStructures.PriorityQueue{Get, ContainerKey{N}} - function Container{N}(env::Environment, capacity::N=one(N); level::N=zero(N)) where {N<:Real} - new(env, capacity, level, zero(UInt), DataStructures.PriorityQueue{Put, ContainerKey{N}}(), DataStructures.PriorityQueue{Get, ContainerKey{N}}()) + put_queue :: DataStructures.PriorityQueue{Put, ContainerKey{N, T}} + get_queue :: DataStructures.PriorityQueue{Get, ContainerKey{N, T}} + function Container{N, T}(env::Environment, capacity::N=one(N); level=zero(N)) where {N<:Real, T<:Number} + new(env, capacity, N(level), zero(UInt), DataStructures.PriorityQueue{Put, ContainerKey{N, T}}(), DataStructures.PriorityQueue{Get, ContainerKey{N, T}}()) end end -function Container(env::Environment, capacity::N=one(N); level::N=zero(N)) where {N<:Real} - Container{N}(env, capacity, level=level) +function Container(env::Environment, capacity::N=one(N); level=zero(N)) where {N<:Real} + Container{N, Int}(env, capacity; level=N(level)) end -const Resource = Container{Int} +function Container{T}(env::Environment, capacity::N=one(N); level=zero(N)) where {N<:Real, T<:Number} + Container{N, T}(env, capacity; level=N(level)) +end + +const Resource = Container{Int, Int} -function put!(con::Container{N}, amount::N; priority::Int=0) where N<:Real +function put!(con::Container{N, T}, amount::N; priority=zero(T)) where {N<:Real, T<:Number} put_ev = Put(con.env) - con.put_queue[put_ev] = ContainerKey(priority, con.seid+=one(UInt), amount) + con.put_queue[put_ev] = ContainerKey{N,T}(con.seid+=one(UInt), amount, T(priority)) @callback trigger_get(put_ev, con) trigger_put(put_ev, con) put_ev @@ -52,7 +56,7 @@ Locks the Container (or Resources) and return the lock event. If the capacity of the Container is greater than 1, multiple requests can be made before blocking occurs. """ -request(res::Container; priority::Int=0) = put!(res, 1; priority=priority) +request(res::Resource; priority=0) = put!(res, 1; priority) """ tryrequest(res::Container) @@ -76,14 +80,14 @@ julia> tryrequest(res) false ``` """ -function tryrequest(res::Container; priority::Int=0) +function tryrequest(res::Container; priority=0) islocked(res) && return false # TODO check priority request(res; priority) end -function get(con::Container{N}, amount::N; priority::Int=0) where N<:Real +function get(con::Container{N, T}, amount::N; priority=zero(T)) where {N<:Real, T<:Number} get_ev = Get(con.env) - con.get_queue[get_ev] = ContainerKey(priority, con.seid+=one(UInt), amount) + con.get_queue[get_ev] = ContainerKey(con.seid+=one(UInt), amount, T(priority)) @callback trigger_put(get_ev, con) trigger_get(get_ev, con) get_ev @@ -94,16 +98,16 @@ end Unlocks the Container and return the unlock event. """ -unlock(res::Container; priority::Int=0) = get(res, 1; priority=priority) +unlock(res::Resource; priority::Number=0) = get(res, 1; priority=priority) -function do_put(con::Container{N}, put_ev::Put, key::ContainerKey{N}) where N<:Real +function do_put(con::Container{N, T}, put_ev::Put, key::ContainerKey{N, T}) where {N<:Real, T<:Number} con.level + key.amount > con.capacity && return false schedule(put_ev) con.level += key.amount true end -function do_get(con::Container{N}, get_ev::Get, key::ContainerKey{N}) where N<:Real +function do_get(con::Container{N, T}, get_ev::Get, key::ContainerKey{N, T}) where {N<:Real, T<:Number} con.level - key.amount < zero(N) && return false schedule(get_ev) con.level -= key.amount diff --git a/src/resources/stores.jl b/src/resources/stores.jl index f8328f0c..a396ef23 100755 --- a/src/resources/stores.jl +++ b/src/resources/stores.jl @@ -1,63 +1,66 @@ -struct StorePutKey{T} <: ResourceKey - priority :: Int +struct StorePutKey{N, T<:Number} <: ResourceKey id :: UInt - item :: T - StorePutKey{T}(priority, id, item) where T = new(priority, id, item) + item :: N + priority :: T end -struct StoreGetKey <: ResourceKey - priority :: Int +struct StoreGetKey{T<:Number} <: ResourceKey id :: UInt filter :: Function + priority :: T end """ - Store{T}(env::Environment; capacity::UInt=typemax(UInt)) + Store{N, T<:Number}(env::Environment; capacity::UInt=typemax(UInt)) -A store is a resource that can hold a number of items of type `T`. It is similar to a `Base.Channel` with a finite capacity ([`put!`](@ref) blocks after reaching capacity). +A store is a resource that can hold a number of items of type `N`. It is similar to a `Base.Channel` with a finite capacity ([`put!`](@ref) blocks after reaching capacity). The [`put!`](@ref) and [`take!`](@ref) functions are a convenient way to interact with such a "channel" in a way mostly compatible with other discrete event and concurrency frameworks. See [`Container`](@ref) for a more lock-like resource. Think of `Resource` and `Container` as locks and of `Store` as channels. They block only if empty (on taking) or full (on storing). """ -mutable struct Store{T} <: AbstractResource +mutable struct Store{N, T<:Number} <: AbstractResource env :: Environment capacity :: UInt load :: UInt - items :: Dict{T, UInt} + items :: Dict{N, UInt} seid :: UInt - put_queue :: DataStructures.PriorityQueue{Put, StorePutKey{T}} - get_queue :: DataStructures.PriorityQueue{Get, StoreGetKey} - function Store{T}(env::Environment; capacity=typemax(UInt)) where {T} - new(env, UInt(capacity), zero(UInt), Dict{T, UInt}(), zero(UInt), DataStructures.PriorityQueue{Put, StorePutKey{T}}(), DataStructures.PriorityQueue{Get, StoreGetKey}()) + put_queue :: DataStructures.PriorityQueue{Put, StorePutKey{N, T}} + get_queue :: DataStructures.PriorityQueue{Get, StoreGetKey{T}} + function Store{N, T}(env::Environment; capacity=typemax(UInt)) where {N, T<:Number} + new(env, UInt(capacity), zero(UInt), Dict{N, UInt}(), zero(UInt), DataStructures.PriorityQueue{Put, StorePutKey{N, T}}(), DataStructures.PriorityQueue{Get, StoreGetKey{T}}()) end end +function Store{N}(env::Environment; capacity=typemax(UInt)) where {N} + Store{N, Int}(env; capacity=UInt(capacity)) +end + """ put!(sto::Store, item::T) Put an item into the store. Returns the put event, blocking if the store is full. """ -function put!(sto::Store{T}, item::T; priority::Int=0) where T +function put!(sto::Store{N, T}, item::N; priority=zero(T)) where {N, T<:Number} put_ev = Put(sto.env) - sto.put_queue[put_ev] = StorePutKey{T}(priority, sto.seid+=one(UInt), item) + sto.put_queue[put_ev] = StorePutKey{N, T}(sto.seid+=one(UInt), item, T(priority)) @callback trigger_get(put_ev, sto) trigger_put(put_ev, sto) put_ev end -get_any_item(::T) where T = true +get_any_item(::N) where N = true -function get(sto::Store{T}, filter::Function=get_any_item; priority::Int=0) where T +function get(sto::Store{N, T}, filter::Function=get_any_item; priority=zero(T)) where {N, T<:Number} get_ev = Get(sto.env) - sto.get_queue[get_ev] = StoreGetKey(priority, sto.seid+=one(UInt), filter) + sto.get_queue[get_ev] = StoreGetKey(sto.seid+=one(UInt), filter, T(priority)) @callback trigger_put(get_ev, sto) trigger_get(get_ev, sto) get_ev end -function do_put(sto::Store{T}, put_ev::Put, key::StorePutKey{T}) where {T} +function do_put(sto::Store{N, T}, put_ev::Put, key::StorePutKey{N, T}) where {N, T<:Number} if sto.load < sto.capacity sto.load += one(UInt) sto.items[key.item] = get(sto.items, key.item, zero(UInt)) + one(UInt) @@ -66,7 +69,7 @@ function do_put(sto::Store{T}, put_ev::Put, key::StorePutKey{T}) where {T} false end -function do_get(sto::Store{T}, get_ev::Get, key::StoreGetKey) where {T} +function do_get(sto::Store{N, T}, get_ev::Get, key::StoreGetKey{T}) where {N, T<:Number} for (item, number) in sto.items if key.filter(item) sto.load -= one(UInt) @@ -126,4 +129,4 @@ tryrequest(::Store) = error("There is no well defined way to \"request\" a Store An alias for `get(::Store)` for easier interoperability with the `Base.Channel` interface. Blocks if the store is empty. """ -take!(sto::Store, filter::Function=get_any_item; priority::Int=0) = get(sto, filter; priority) +take!(sto::Store, filter::Function=get_any_item; priority=0) = get(sto, filter; priority) diff --git a/src/simulations.jl b/src/simulations.jl index b26b2c05..2e01948d 100755 --- a/src/simulations.jl +++ b/src/simulations.jl @@ -8,9 +8,9 @@ end struct EmptySchedule <: Exception end -struct EventKey +struct EventKey{N<:Number} time :: Float64 - priority :: Int + priority :: N id :: UInt end diff --git a/src/utils/time.jl b/src/utils/time.jl index d32c2509..4c3cd64e 100755 --- a/src/utils/time.jl +++ b/src/utils/time.jl @@ -6,7 +6,7 @@ function run(env::Environment, until::DateTime) run(env, Dates.datetime2epochms(until)) end -function timeout(env::Environment, delay::Period; priority::Int=0, value::Any=nothing) +function timeout(env::Environment, delay::Period; priority::Number=0, value::Any=nothing) time = now(env) del = Dates.datetime2epochms(Dates.epochms2datetime(time)+delay)-time timeout(env, del; priority=priority, value=value) diff --git a/test/runtests.jl b/test/runtests.jl index 2c53e56a..2bb15ca6 100755 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -35,6 +35,7 @@ println("Starting tests with $(Threads.nthreads()) threads out of `Sys.CPU_THREA @doset "resources_containers_deprecated" @doset "resources_stores" @doset "resources_stores_deprecated" +@doset "resource_priorities" @doset "utils_time" VERSION >= v"1.9" && @doset "doctests" VERSION >= v"1.9" && @doset "aqua" diff --git a/test/test_resource_priorities.jl b/test/test_resource_priorities.jl new file mode 100644 index 00000000..5b481ad0 --- /dev/null +++ b/test/test_resource_priorities.jl @@ -0,0 +1,59 @@ +using ConcurrentSim + +## Containers with Float64 priority +sim = Simulation() +con = Container{Float64}(sim,0) +put!(con, 1; priority = 10) +put!(con, 1; priority = 9.1) +put!(con, 1; priority = UInt(9)) +put!(con, 1; priority = BigInt(8)) +put!(con, 1; priority = BigFloat(7.1)) +put!(con, 1; priority = 7) +put!(con, 1; priority = 6.1) +put!(con, 1; priority = BigInt(5)) +put!(con, 1; priority = BigFloat(-Inf)) +@show keys(con.put_queue) + +## Containers with Int64 priority +sim = Simulation() +con = Container{Int}(sim,0) +put!(con, 1; priority = 10) +put!(con, 1; priority = 9.0) +put!(con, 1; priority = UInt(9)) +put!(con, 1; priority = BigInt(8)) +put!(con, 1; priority = BigFloat(7.0)) +put!(con, 1; priority = 7) +put!(con, 1; priority = 6.0) +put!(con, 1; priority = BigInt(5)) +put!(con, 1; priority = typemin(Int)) +@show keys(con.put_queue) + +## Stores with Float64 priority +sim = Simulation() +sto = Store{Symbol,Float64}(sim; capacity = UInt(5)) +put!(sto, :a; priority = 10) +put!(sto, :a; priority = 9.1) +put!(sto, :a; priority = UInt(9)) +put!(sto, :a; priority = BigInt(8)) +put!(sto, :a; priority = BigFloat(7.1)) +put!(sto, :b; priority = 7) +put!(sto, :b; priority = 6.1) +put!(sto, :b; priority = BigInt(5)) +put!(sto, :b; priority = BigFloat(-Inf)) +@show sto.items +@show keys(sto.put_queue) + +## Stores with Int64 priority +sim = Simulation() +sto = Store{Symbol,Int}(sim; capacity = UInt(5)) +put!(sto, :a; priority = 10) +put!(sto, :a; priority = 9.0) +put!(sto, :a; priority = UInt(9)) +put!(sto, :a; priority = BigInt(8)) +put!(sto, :a; priority = BigFloat(7.0)) +put!(sto, :b; priority = 7) +put!(sto, :b; priority = 6.0) +put!(sto, :b; priority = BigInt(5)) +put!(sto, :b; priority = typemin(UInt)) +@show sto.items +@show keys(sto.put_queue) \ No newline at end of file diff --git a/test/test_resources_containers.jl b/test/test_resources_containers.jl index f95c3af4..840c1360 100755 --- a/test/test_resources_containers.jl +++ b/test/test_resources_containers.jl @@ -1,7 +1,7 @@ using ConcurrentSim using ResumableFunctions -@resumable function client(sim::Simulation, res::Resource, i::Int, priority::Int) +@resumable function client(sim::Simulation, res::Resource, i::Int, priority::Number) println("$(now(sim)), client $i is waiting") @yield request(res, priority=priority) println("$(now(sim)), client $i is being served")