Skip to content

Commit

Permalink
FolderDict experimental support multithreaded
Browse files Browse the repository at this point in the history
  • Loading branch information
dehann committed Feb 17, 2024
1 parent 7d09d24 commit bedfd09
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 13 deletions.
57 changes: 44 additions & 13 deletions src/dev/FolderDict.jl
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ using UUIDs
using DocStringExtensions
using Serialization

import Base: getindex, setindex!, delete!
import Base: getindex, setindex!, delete!, keys

##

Expand All @@ -21,12 +21,12 @@ Special Features:
- User can set working folder for storage
- User can set serialization and deserialization functions, e.g. use JSON3 or Serialization
- User can set how Dict keys map to stored id's (see DFG)
- EXPERIMENTAL: thread safe
Developer Notes
- all keys must always be in `.keydict`, regardless of cache or priority
WIP Constraints:
- not yet thread safe
- Had trouble inheriting from `Base.AbstractDict`
"""
@kwdef struct FolderDict{K,V}
Expand All @@ -41,8 +41,10 @@ WIP Constraints:
""" mapping keys and ids for different use cases, default is always new uuid.
overwrite with `(k) -> k` to make keys and ids identical """
key_to_id::Function = (k) -> uuid4()
# # write lock via Task to
# writetask::Dict{K, Task} = Dict{K, Task}()
""" read lock via Tasks """
readtasks::Dict{K, Task} = Dict{K, Task}()
""" write lock via Tasks """
writetasks::Dict{K, Task} = Dict{K, Task}()
""" working directory where elemental files are stored """
wdir::String = begin
wdir_ = joinpath(tempdir(), "$(uuid4())")
Expand All @@ -62,6 +64,19 @@ function Base.getindex(
sd::FolderDict,
f
)
# first check if there is an ongoing writetask on this key
if haskey(sd.writetasks, f)
wait(sd.writetasks[f])
# now it is safe to proceed in reading newly written value to this key
# TODO slightly excessive lock, since only unlocks once storage write is done, but cache was available sooner.
end
# also check if there is an ongoing reader on this key
if haskey(sd.readtasks, f)
# NOTE super remote possibility that a task is deleted before this dict lookup and wait starts
wait(sd.readtasks[f])
# values should now be cached for multithreaded reads
end

# if already cached, and assuming a write task has not deleted the cache element yet (MUST delete from pqueue first)
if haskey(sd.pqueue, f)
# increase this priority, but be wary of over emphasis for newcomer keys
Expand All @@ -72,7 +87,7 @@ function Base.getindex(
# get id associated with this key (also throw KeyError if not present)
flb = sd.keydict[f]
# performance trick, start async load from IO resources while doing some housekeeping
tsk = @async begin
sd.readtasks[f] = @async begin
# All keys must always be present in keydict
toload = joinpath(sd.wdir, "$flb")
# fetch from cold storage
Expand All @@ -98,16 +113,15 @@ function Base.getindex(
end

# assume middle of the pack priority for this cache-miss
newp = round(Int, maxpriority/2)
sd.pqueue[f] = round(Int, maxpriority/2) # pqueue is arbitor, hence populated last
# block on IO resources fetching data
data = fetch(tsk)

# add to previously missed data to cache and pqueue
sd.cache[f] = data # cache first
sd.pqueue[f] = newp # pqueue is arbitor, hence populated last
sd.cache[f] = fetch(sd.readtasks[f])
# TODO, possible race condition in slight delay betweem writing to cache[f] after fetching data unblocks.
delete!(sd.readtasks, f)

# return data to user
return data
return sd.cache[f]
end


Expand All @@ -116,10 +130,22 @@ function setindex!(
v,
k
)
# first check if there is an ongoing reader on this key
if haskey(sd.readtasks, k)
# NOTE super remote possibility that a task is deleted before this dict lookup and wait starts
wait(sd.readtasks[k])
end
# also check if there is an ongoing writetask on this key
if haskey(sd.writetasks, k)
wait(sd.writetasks[k])
# now it is safe to proceed in reading newly written value to this key
# TODO slightly excessive lock, since only unlocks once storage write is done, but cache was available sooner.
end

# immediately/always insert new data into folder store with a unique id
id = sd.key_to_id(k)
flb = joinpath(sd.wdir, "$id")
wtsk = @async sd.serialize(flb, v) # for sluggish IO
sd.writetasks[k] = @async sd.serialize(flb, v) # for sluggish IO

# should any obsolete files be deleted from the filesystem?
dtsk = if haskey(sd.keydict, k)
Expand Down Expand Up @@ -151,7 +177,9 @@ function setindex!(

# wait for any disk mutations to finish
wait(dtsk)
wait(wtsk)
# last thing is to wait and free write task locks, assuming waiting readers
wait(sd.writetasks[k])
delete!(sd.writetasks, k)

# return the value
return v
Expand Down Expand Up @@ -184,4 +212,7 @@ function delete!(
return sd
end


keys(sd::FolderDict) = keys(sd.keydict)

##
2 changes: 2 additions & 0 deletions test/testFolderDict.jl
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,7 @@ delete!(fd, :b)
@test_throws KeyError fd[:b]


@test 2 == length(intersect([:a; :c], collect(keys(fd))))

##
end

0 comments on commit bedfd09

Please sign in to comment.