Skip to content

Commit

Permalink
rdoc'ing
Browse files Browse the repository at this point in the history
  • Loading branch information
mperham committed Jun 8, 2022
1 parent 7f3f6ac commit f8c7579
Showing 1 changed file with 31 additions and 21 deletions.
52 changes: 31 additions & 21 deletions lib/sidekiq/api.rb
Original file line number Diff line number Diff line change
Expand Up @@ -217,24 +217,30 @@ class Queue
include Enumerable

##
# Return all known queues within Redis.
# Fetch all known queues within Redis.
#
# @return [Array<Sidekiq::Queue>]
def self.all
Sidekiq.redis { |c| c.sscan_each("queues").to_a }.sort.map { |q| Sidekiq::Queue.new(q) }
end

attr_reader :name

# @param name [String] the name of the queue
def initialize(name = "default")
@name = name.to_s
@rname = "queue:#{name}"
end

# The current size of the queue within Redis.
# This value is real-time and can change between calls.
#
# @return [Integer] the size
def size
Sidekiq.redis { |con| con.llen(@rname) }
end

# Sidekiq Pro overrides this
# @return [Boolean] if the queue is currently paused
def paused?
false
end
Expand All @@ -243,7 +249,7 @@ def paused?
# Calculates this queue's latency, the difference in seconds since the oldest
# job in the queue was enqueued.
#
# @return Float
# @return [Float] in seconds
def latency
entry = Sidekiq.redis { |conn|
conn.lrange(@rname, -1, -1)
Expand Down Expand Up @@ -279,24 +285,21 @@ def each
##
# Find the job with the given JID within this queue.
#
# This is a slow, inefficient operation. Do not use under
# This is a *slow, inefficient* operation. Do not use under
# normal conditions.
#
# @param jid string - looks for job id within the queue
#
# @return success: A Sidekiq::JobRecord
# @return failure: nil
# @param jid [String] the job_id to look for
# @return [Sidekiq::JobRecord]
# @return [nil] if not found
def find_job(jid)
detect { |j| j.jid == jid }
end

##
# multioperation transaction
#
# delete all jobs within this queue
def clear
Sidekiq.redis do |conn|
conn.multi do |transaction|
transaction.unlink(@rname) # high performance delete
transaction.unlink(@rname)
transaction.srem("queues", name)
end
end
Expand All @@ -318,8 +321,9 @@ def as_json(options = nil) # :nodoc:
class JobRecord
attr_reader :item
attr_reader :value
attr_reader :queue

def initialize(item, queue_name = nil)
def initialize(item, queue_name = nil) # :nodoc:
@args = nil
@value = item
@item = item.is_a?(Hash) ? item : parse(item)
Expand All @@ -337,7 +341,7 @@ def parse(item) # :nodoc:
{}
end

def klass # :nodoc:
def klass
self["class"]
end

Expand Down Expand Up @@ -424,22 +428,20 @@ def error_backtrace
end
end

attr_reader :queue

def latency
now = Time.now.to_f
now - (@item["enqueued_at"] || @item["created_at"] || now)
end

##
# Remove this job from the queue.
# Remove this job from the queue
def delete
count = Sidekiq.redis { |conn|
conn.lrem("queue:#{@queue}", 1, @value)
}
count != 0
end

# Access arbitrary attributes within the job hash
def [](name)
# nil will happen if the JSON fails to parse.
# We don't guarantee Sidekiq will work with bad job JSON but we should
Expand Down Expand Up @@ -477,11 +479,13 @@ def uncompress_backtrace(backtrace)
end
end

# Represents a job within a Redis sorted set where the score
# represents a timestamp for the job.
class SortedEntry < JobRecord
attr_reader :score
attr_reader :parent

def initialize(parent, score, item)
def initialize(parent, score, item) # :nodoc:
super(item)
@score = Float(score)
@parent = parent
Expand All @@ -499,19 +503,26 @@ def delete
end
end

# Change the scheduled time for this job.
#
# @param [Time] the new timestamp when this job will be enqueued.
def reschedule(at)
Sidekiq.redis do |conn|
conn.zincrby(@parent.name, at.to_f - @score, Sidekiq.dump_json(@item))
end
end

# Enqueue this job from the scheduled or dead set so it will
# be executed at some point in the near future.
def add_to_queue
remove_job do |message|
msg = Sidekiq.load_json(message)
Sidekiq::Client.push(msg)
end
end

# enqueue this job from the retry set so it will be executed
# at some point in the near future.
def retry
remove_job do |message|
msg = Sidekiq.load_json(message)
Expand All @@ -520,8 +531,7 @@ def retry
end
end

##
# Place job in the dead set
# Move this job from its current set into the Dead set.
def kill
remove_job do |message|
DeadSet.new.kill(message)
Expand Down

0 comments on commit f8c7579

Please sign in to comment.