Skip to content

Commit

Permalink
Priority as number (#90)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Stefan Krastanov <[email protected]>
Co-authored-by: Stefan Krastanov <[email protected]>
  • Loading branch information
3 people authored Aug 3, 2023
1 parent c62a6e5 commit bb72add
Show file tree
Hide file tree
Showing 13 changed files with 125 additions and 58 deletions.
2 changes: 1 addition & 1 deletion docs/src/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ Private = false
```

```@docs
unlock(res::Container; priority::Int=0)
unlock(res::Resource; priority::Number=0)
take!(sto::Store, filter::Function=get_any_item; priority::Int=0)
```
2 changes: 1 addition & 1 deletion src/base.jl
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ function remove_callback(cb::Function, ev::AbstractEvent)
i != 0 && deleteat!(ev.bev.callbacks, i)
end

function schedule(ev::AbstractEvent, delay::Number=zero(Float64); priority::Int=0, value::Any=nothing)
function schedule(ev::AbstractEvent, delay::Number=0; priority::Number=0, value::Any=nothing)
state(ev) === processed && throw(EventProcessed(ev))
env = environment(ev)
bev = ev.bev
Expand Down
2 changes: 1 addition & 1 deletion src/deprecated.jl
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Base.@deprecate put(args...) put!(args...)
Base.@deprecate put(args...; kwargs...) put!(args...; kwargs...)
#Base.@deprecate request(args...; kwargs...) lock(args...; kwargs...) # Not the same: `request` needs to be yielded, while `lock` yields itself
#Base.@deprecate tryrequest(args...; kwargs...) trylock(args...; kwargs...) # Not the same: `request` needs to be yielded, while `lock` yields itself
Base.@deprecate release(args...; kwargs...) unlock(args...; kwargs...)
8 changes: 4 additions & 4 deletions src/events.jl
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ struct Event <: AbstractEvent
end
end

function succeed(ev::Event; priority::Int=0, value::Any=nothing) :: Event
function succeed(ev::Event; priority::Number=0, value::Any=nothing) :: Event
state(ev) !== idle && throw(EventNotIdle(ev))
schedule(ev; priority=priority, value=value)
end

function fail(ev::Event, exc::Exception; priority::Int=0) :: Event
function fail(ev::Event, exc::Exception; priority::Number=0) :: Event
succeed(ev; priority=priority, value=exc)
end

Expand All @@ -21,10 +21,10 @@ struct Timeout <: AbstractEvent
end
end

function timeout(env::Environment, delay::Number=0; priority::Int=0, value::Any=nothing)
function timeout(env::Environment, delay::Number=0; priority::Number=0, value::Any=nothing)
schedule(Timeout(env), delay; priority=priority, value=value)
end

function run(env::Environment, until::Number=typemax(Float64))
function run(env::Environment, until::Number=Inf)
run(env, timeout(env, until-now(env)))
end
2 changes: 1 addition & 1 deletion src/operators.jl
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ function check(ev::AbstractEvent, op::Operator, event_state_values::Dict{Abstrac
end
elseif state(op) === scheduled
if isa(val, Exception)
schedule(op; priority=typemax(Int), value=val)
schedule(op; priority=Inf, value=val)
else
event_state_values[ev] = StateValue(state(ev), val)
end
Expand Down
6 changes: 3 additions & 3 deletions src/processes.jl
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ end
function interrupt(proc::Process, cause::Any=nothing)
env = environment(proc)
if proc.fsmi._state !== 0xff
proc.target isa Initialize && schedule(proc.target; priority=typemax(Int))
target = schedule(Interrupt(env); priority=typemax(Int), value=InterruptException(active_process(env), cause))
proc.target isa Initialize && schedule(proc.target; priority=Inf)
target = schedule(Interrupt(env); priority=Inf, value=InterruptException(active_process(env), cause))
@callback execute_interrupt(target, proc)
end
timeout(env; priority=typemax(Int))
timeout(env; priority=Inf)
end
46 changes: 25 additions & 21 deletions src/resources/containers.jl
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
struct ContainerKey{N<:Real} <: ResourceKey
priority :: Int
struct ContainerKey{N<:Real, T<:Number} <: ResourceKey
id :: UInt
amount :: N
priority :: T
end

"""
Container{N}(env::Environment, capacity::N=one(N); level::N=zero(N))
Container{N<:Real, T<:Number}(env::Environment, capacity::N=one(N); level::N=zero(N))
A "Container" resource object, storing up to `capacity` units of a resource (of type `N`).
There is a `Resource` alias for `Container{Int}`.
There is a `Resource` alias for `Container{Int, Int}`.
`Resource()` with default capacity of `1` is very similar to a typical lock.
The [`request`](@ref) and [`unlock`](@ref) functions are a convenient way to interact with such a "lock",
Expand All @@ -19,27 +19,31 @@ See [`Store`](@ref) for a more channel-like resource.
Think of `Resource` and `Container` as locks and of `Store` as channels. They block only if empty (on taking) or full (on storing).
"""
mutable struct Container{N<:Real} <: AbstractResource
mutable struct Container{N<:Real, T<:Number} <: AbstractResource
env :: Environment
capacity :: N
level :: N
seid :: UInt
put_queue :: DataStructures.PriorityQueue{Put, ContainerKey{N}}
get_queue :: DataStructures.PriorityQueue{Get, ContainerKey{N}}
function Container{N}(env::Environment, capacity::N=one(N); level::N=zero(N)) where {N<:Real}
new(env, capacity, level, zero(UInt), DataStructures.PriorityQueue{Put, ContainerKey{N}}(), DataStructures.PriorityQueue{Get, ContainerKey{N}}())
put_queue :: DataStructures.PriorityQueue{Put, ContainerKey{N, T}}
get_queue :: DataStructures.PriorityQueue{Get, ContainerKey{N, T}}
function Container{N, T}(env::Environment, capacity::N=one(N); level=zero(N)) where {N<:Real, T<:Number}
new(env, capacity, N(level), zero(UInt), DataStructures.PriorityQueue{Put, ContainerKey{N, T}}(), DataStructures.PriorityQueue{Get, ContainerKey{N, T}}())
end
end

function Container(env::Environment, capacity::N=one(N); level::N=zero(N)) where {N<:Real}
Container{N}(env, capacity, level=level)
function Container(env::Environment, capacity::N=one(N); level=zero(N)) where {N<:Real}
Container{N, Int}(env, capacity; level=N(level))
end

const Resource = Container{Int}
function Container{T}(env::Environment, capacity::N=one(N); level=zero(N)) where {N<:Real, T<:Number}
Container{N, T}(env, capacity; level=N(level))
end

const Resource = Container{Int, Int}

function put!(con::Container{N}, amount::N; priority::Int=0) where N<:Real
function put!(con::Container{N, T}, amount::N; priority=zero(T)) where {N<:Real, T<:Number}
put_ev = Put(con.env)
con.put_queue[put_ev] = ContainerKey(priority, con.seid+=one(UInt), amount)
con.put_queue[put_ev] = ContainerKey{N,T}(con.seid+=one(UInt), amount, T(priority))
@callback trigger_get(put_ev, con)
trigger_put(put_ev, con)
put_ev
Expand All @@ -52,7 +56,7 @@ Locks the Container (or Resources) and return the lock event.
If the capacity of the Container is greater than 1,
multiple requests can be made before blocking occurs.
"""
request(res::Container; priority::Int=0) = put!(res, 1; priority=priority)
request(res::Resource; priority=0) = put!(res, 1; priority)

"""
tryrequest(res::Container)
Expand All @@ -76,14 +80,14 @@ julia> tryrequest(res)
false
```
"""
function tryrequest(res::Container; priority::Int=0)
function tryrequest(res::Container; priority=0)
islocked(res) && return false # TODO check priority
request(res; priority)
end

function get(con::Container{N}, amount::N; priority::Int=0) where N<:Real
function get(con::Container{N, T}, amount::N; priority=zero(T)) where {N<:Real, T<:Number}
get_ev = Get(con.env)
con.get_queue[get_ev] = ContainerKey(priority, con.seid+=one(UInt), amount)
con.get_queue[get_ev] = ContainerKey(con.seid+=one(UInt), amount, T(priority))
@callback trigger_put(get_ev, con)
trigger_get(get_ev, con)
get_ev
Expand All @@ -94,16 +98,16 @@ end
Unlocks the Container and return the unlock event.
"""
unlock(res::Container; priority::Int=0) = get(res, 1; priority=priority)
unlock(res::Resource; priority::Number=0) = get(res, 1; priority=priority)

function do_put(con::Container{N}, put_ev::Put, key::ContainerKey{N}) where N<:Real
function do_put(con::Container{N, T}, put_ev::Put, key::ContainerKey{N, T}) where {N<:Real, T<:Number}
con.level + key.amount > con.capacity && return false
schedule(put_ev)
con.level += key.amount
true
end

function do_get(con::Container{N}, get_ev::Get, key::ContainerKey{N}) where N<:Real
function do_get(con::Container{N, T}, get_ev::Get, key::ContainerKey{N, T}) where {N<:Real, T<:Number}
con.level - key.amount < zero(N) && return false
schedule(get_ev)
con.level -= key.amount
Expand Down
47 changes: 25 additions & 22 deletions src/resources/stores.jl
Original file line number Diff line number Diff line change
@@ -1,63 +1,66 @@
struct StorePutKey{T} <: ResourceKey
priority :: Int
struct StorePutKey{N, T<:Number} <: ResourceKey
id :: UInt
item :: T
StorePutKey{T}(priority, id, item) where T = new(priority, id, item)
item :: N
priority :: T
end

struct StoreGetKey <: ResourceKey
priority :: Int
struct StoreGetKey{T<:Number} <: ResourceKey
id :: UInt
filter :: Function
priority :: T
end

"""
Store{T}(env::Environment; capacity::UInt=typemax(UInt))
Store{N, T<:Number}(env::Environment; capacity::UInt=typemax(UInt))
A store is a resource that can hold a number of items of type `T`. It is similar to a `Base.Channel` with a finite capacity ([`put!`](@ref) blocks after reaching capacity).
A store is a resource that can hold a number of items of type `N`. It is similar to a `Base.Channel` with a finite capacity ([`put!`](@ref) blocks after reaching capacity).
The [`put!`](@ref) and [`take!`](@ref) functions are a convenient way to interact with such a "channel" in a way mostly compatible with other discrete event and concurrency frameworks.
See [`Container`](@ref) for a more lock-like resource.
Think of `Resource` and `Container` as locks and of `Store` as channels. They block only if empty (on taking) or full (on storing).
"""
mutable struct Store{T} <: AbstractResource
mutable struct Store{N, T<:Number} <: AbstractResource
env :: Environment
capacity :: UInt
load :: UInt
items :: Dict{T, UInt}
items :: Dict{N, UInt}
seid :: UInt
put_queue :: DataStructures.PriorityQueue{Put, StorePutKey{T}}
get_queue :: DataStructures.PriorityQueue{Get, StoreGetKey}
function Store{T}(env::Environment; capacity=typemax(UInt)) where {T}
new(env, UInt(capacity), zero(UInt), Dict{T, UInt}(), zero(UInt), DataStructures.PriorityQueue{Put, StorePutKey{T}}(), DataStructures.PriorityQueue{Get, StoreGetKey}())
put_queue :: DataStructures.PriorityQueue{Put, StorePutKey{N, T}}
get_queue :: DataStructures.PriorityQueue{Get, StoreGetKey{T}}
function Store{N, T}(env::Environment; capacity=typemax(UInt)) where {N, T<:Number}
new(env, UInt(capacity), zero(UInt), Dict{N, UInt}(), zero(UInt), DataStructures.PriorityQueue{Put, StorePutKey{N, T}}(), DataStructures.PriorityQueue{Get, StoreGetKey{T}}())
end
end

function Store{N}(env::Environment; capacity=typemax(UInt)) where {N}
Store{N, Int}(env; capacity=UInt(capacity))
end

"""
put!(sto::Store, item::T)
Put an item into the store. Returns the put event, blocking if the store is full.
"""
function put!(sto::Store{T}, item::T; priority::Int=0) where T
function put!(sto::Store{N, T}, item::N; priority=zero(T)) where {N, T<:Number}
put_ev = Put(sto.env)
sto.put_queue[put_ev] = StorePutKey{T}(priority, sto.seid+=one(UInt), item)
sto.put_queue[put_ev] = StorePutKey{N, T}(sto.seid+=one(UInt), item, T(priority))
@callback trigger_get(put_ev, sto)
trigger_put(put_ev, sto)
put_ev
end

get_any_item(::T) where T = true
get_any_item(::N) where N = true

function get(sto::Store{T}, filter::Function=get_any_item; priority::Int=0) where T
function get(sto::Store{N, T}, filter::Function=get_any_item; priority=zero(T)) where {N, T<:Number}
get_ev = Get(sto.env)
sto.get_queue[get_ev] = StoreGetKey(priority, sto.seid+=one(UInt), filter)
sto.get_queue[get_ev] = StoreGetKey(sto.seid+=one(UInt), filter, T(priority))
@callback trigger_put(get_ev, sto)
trigger_get(get_ev, sto)
get_ev
end

function do_put(sto::Store{T}, put_ev::Put, key::StorePutKey{T}) where {T}
function do_put(sto::Store{N, T}, put_ev::Put, key::StorePutKey{N, T}) where {N, T<:Number}
if sto.load < sto.capacity
sto.load += one(UInt)
sto.items[key.item] = get(sto.items, key.item, zero(UInt)) + one(UInt)
Expand All @@ -66,7 +69,7 @@ function do_put(sto::Store{T}, put_ev::Put, key::StorePutKey{T}) where {T}
false
end

function do_get(sto::Store{T}, get_ev::Get, key::StoreGetKey) where {T}
function do_get(sto::Store{N, T}, get_ev::Get, key::StoreGetKey{T}) where {N, T<:Number}
for (item, number) in sto.items
if key.filter(item)
sto.load -= one(UInt)
Expand Down Expand Up @@ -126,4 +129,4 @@ tryrequest(::Store) = error("There is no well defined way to \"request\" a Store
An alias for `get(::Store)` for easier interoperability with the `Base.Channel` interface. Blocks if the store is empty.
"""
take!(sto::Store, filter::Function=get_any_item; priority::Int=0) = get(sto, filter; priority)
take!(sto::Store, filter::Function=get_any_item; priority=0) = get(sto, filter; priority)
4 changes: 2 additions & 2 deletions src/simulations.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ end

struct EmptySchedule <: Exception end

struct EventKey
struct EventKey{N<:Number}
time :: Float64
priority :: Int
priority :: N
id :: UInt
end

Expand Down
2 changes: 1 addition & 1 deletion src/utils/time.jl
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ function run(env::Environment, until::DateTime)
run(env, Dates.datetime2epochms(until))
end

function timeout(env::Environment, delay::Period; priority::Int=0, value::Any=nothing)
function timeout(env::Environment, delay::Period; priority::Number=0, value::Any=nothing)
time = now(env)
del = Dates.datetime2epochms(Dates.epochms2datetime(time)+delay)-time
timeout(env, del; priority=priority, value=value)
Expand Down
1 change: 1 addition & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ println("Starting tests with $(Threads.nthreads()) threads out of `Sys.CPU_THREA
@doset "resources_containers_deprecated"
@doset "resources_stores"
@doset "resources_stores_deprecated"
@doset "resource_priorities"
@doset "utils_time"
VERSION >= v"1.9" && @doset "doctests"
VERSION >= v"1.9" && @doset "aqua"
Expand Down
59 changes: 59 additions & 0 deletions test/test_resource_priorities.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
using ConcurrentSim

## Containers with Float64 priority
sim = Simulation()
con = Container{Float64}(sim,0)
put!(con, 1; priority = 10)
put!(con, 1; priority = 9.1)
put!(con, 1; priority = UInt(9))
put!(con, 1; priority = BigInt(8))
put!(con, 1; priority = BigFloat(7.1))
put!(con, 1; priority = 7)
put!(con, 1; priority = 6.1)
put!(con, 1; priority = BigInt(5))
put!(con, 1; priority = BigFloat(-Inf))
@show keys(con.put_queue)

## Containers with Int64 priority
sim = Simulation()
con = Container{Int}(sim,0)
put!(con, 1; priority = 10)
put!(con, 1; priority = 9.0)
put!(con, 1; priority = UInt(9))
put!(con, 1; priority = BigInt(8))
put!(con, 1; priority = BigFloat(7.0))
put!(con, 1; priority = 7)
put!(con, 1; priority = 6.0)
put!(con, 1; priority = BigInt(5))
put!(con, 1; priority = typemin(Int))
@show keys(con.put_queue)

## Stores with Float64 priority
sim = Simulation()
sto = Store{Symbol,Float64}(sim; capacity = UInt(5))
put!(sto, :a; priority = 10)
put!(sto, :a; priority = 9.1)
put!(sto, :a; priority = UInt(9))
put!(sto, :a; priority = BigInt(8))
put!(sto, :a; priority = BigFloat(7.1))
put!(sto, :b; priority = 7)
put!(sto, :b; priority = 6.1)
put!(sto, :b; priority = BigInt(5))
put!(sto, :b; priority = BigFloat(-Inf))
@show sto.items
@show keys(sto.put_queue)

## Stores with Int64 priority
sim = Simulation()
sto = Store{Symbol,Int}(sim; capacity = UInt(5))
put!(sto, :a; priority = 10)
put!(sto, :a; priority = 9.0)
put!(sto, :a; priority = UInt(9))
put!(sto, :a; priority = BigInt(8))
put!(sto, :a; priority = BigFloat(7.0))
put!(sto, :b; priority = 7)
put!(sto, :b; priority = 6.0)
put!(sto, :b; priority = BigInt(5))
put!(sto, :b; priority = typemin(UInt))
@show sto.items
@show keys(sto.put_queue)
2 changes: 1 addition & 1 deletion test/test_resources_containers.jl
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using ConcurrentSim
using ResumableFunctions

@resumable function client(sim::Simulation, res::Resource, i::Int, priority::Int)
@resumable function client(sim::Simulation, res::Resource, i::Int, priority::Number)
println("$(now(sim)), client $i is waiting")
@yield request(res, priority=priority)
println("$(now(sim)), client $i is being served")
Expand Down

0 comments on commit bb72add

Please sign in to comment.