Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Because replace is a client strategy, it should only remove client locks aka queue locks. #778

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 45 additions & 12 deletions lib/sidekiq_unique_jobs/digests.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ module SidekiqUniqueJobs
# @author Mikael Henriksson <[email protected]>
#
class Digests < Redis::SortedSet
#
# @return [Integer] the number of matches to return by default
DEFAULT_COUNT = 1_000
#
# @return [String] the default pattern to use for matching
SCAN_PATTERN = "*"
#
# @return [Array(String, String, String, String)] The empty runtime or queuetime keys.
EMPTY_KEYS_SEGMENT = ["", "", "", ""].freeze

def initialize(digests_key = DIGESTS)
super(digests_key)
end
Expand Down Expand Up @@ -41,19 +51,14 @@ def delete_by_pattern(pattern, count: DEFAULT_COUNT)
# Also deletes the :AVAILABLE, :EXPIRED etc keys
#
# @param [String] digest a unique digest to delete
def delete_by_digest(digest) # rubocop:disable Metrics/MethodLength
# @param queuetime [Boolean] Whether to delete queue locks.
# @param runtime [Boolean] Whether to delete run locks.
def delete_by_digest(digest, queuetime: true, runtime: true)
result, elapsed = timed do
call_script(:delete_by_digest, [
digest,
"#{digest}:QUEUED",
"#{digest}:PRIMED",
"#{digest}:LOCKED",
"#{digest}:RUN",
"#{digest}:RUN:QUEUED",
"#{digest}:RUN:PRIMED",
"#{digest}:RUN:LOCKED",
key,
])
call_script(
:delete_by_digest,
queuetime_keys(queuetime ? digest : nil) + runtime_keys(runtime ? digest : nil) + [key],
)
end

log_info("#{__method__}(#{digest}) completed in #{elapsed}ms")
Expand Down Expand Up @@ -97,5 +102,33 @@ def page(cursor: 0, pattern: SCAN_PATTERN, page_size: 100)
]
end
end

private

# @param digest [String, nil] The digest to form runtime keys from.
# @return [Array(String, String, String, String)] The list of runtime keys or empty strings if +digest+ was +nil+.
def runtime_keys(digest)
return EMPTY_KEYS_SEGMENT unless digest

[
"#{digest}:RUN",
"#{digest}:RUN:QUEUED",
"#{digest}:RUN:PRIMED",
"#{digest}:RUN:LOCKED",
]
end

# @param digest [String, nil] The digest to form queuetime keys from.
# @return [Array(String, String, String, String)] The list of queuetime keys or empty strings if +digest+ was +nil+.
def queuetime_keys(digest)
return EMPTY_KEYS_SEGMENT unless digest

[
digest,
"#{digest}:QUEUED",
"#{digest}:PRIMED",
"#{digest}:LOCKED",
]
end
end
end
2 changes: 1 addition & 1 deletion lib/sidekiq_unique_jobs/on_conflict/replace.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def delete_job_by_digest
# @return [Integer] the number of keys deleted
#
def delete_lock
digests.delete_by_digest(lock_digest)
digests.delete_by_digest(lock_digest, runtime: false)
end

#
Expand Down