Skip to content

Commit

Permalink
Document code (#296)
Browse files Browse the repository at this point in the history
* Adds documentation

* Fix broken build
  • Loading branch information
mhenrixon authored Jul 22, 2018
1 parent 22cfbc8 commit c747674
Show file tree
Hide file tree
Showing 22 changed files with 361 additions and 17 deletions.
21 changes: 21 additions & 0 deletions lib/sidekiq_unique_jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@
require 'sidekiq_unique_jobs/sidekiq_unique_ext'
require 'sidekiq_unique_jobs/on_conflict'

# Namespace for this gem
#
# Contains configuration and utility methods that belongs top level
#
# @author Mikael Henriksson <[email protected]>
module SidekiqUniqueJobs
include SidekiqUniqueJobs::Connection

Expand All @@ -44,6 +49,7 @@ module SidekiqUniqueJobs
:logger,
)

# The current configuration (See: {.configure} on how to configure)
def config
# Arguments here need to match the definition of the new class (see above)
@config ||= Concurrent::MutableStruct::Config.new(
Expand All @@ -54,14 +60,20 @@ def config
)
end

# The current logger
# @return [Logger] the configured logger
def logger
config.logger
end

# Set a new logger
# @param [Logger] other a new logger
def logger=(other)
config.logger = other
end

# Change global configuration while yielding
# @yield control to the caller
def use_config(tmp_config)
fail ::ArgumentError, "#{name}.#{__method__} needs a block" unless block_given?

Expand All @@ -71,6 +83,15 @@ def use_config(tmp_config)
configure(old_config)
end

# Configure the gem
#
# This is usually called once at startup of an application
# @param [Hash] options global gem options
# @option options [Integer] :default_lock_timeout (default is 0)
# @option options [true,false] :enabled (default is true)
# @option options [String] :unique_prefix (default is 'uniquejobs')
# @option options [Logger] :logger (default is Sidekiq.logger)
# @yield control to the caller when given block
def configure(options = {})
if block_given?
yield config
Expand Down
13 changes: 12 additions & 1 deletion lib/sidekiq_unique_jobs/client/middleware.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,20 @@

module SidekiqUniqueJobs
module Client
# The unique sidekiq middleware for the client push
#
# @author Mikael Henriksson <[email protected]>
class Middleware
include SidekiqUniqueJobs::Logging
include OptionsWithFallback

# :reek:LongParameterList { max_params: 4 }
# Calls this client middleware
# Used from Sidekiq.process_single
# @param [String] worker_class name of the sidekiq worker class
# @param [Hash] item a sidekiq job hash
# @param [String] queue name of the queue
# @param [Sidekiq::RedisConnection, ConnectionPool] redis_pool the redis connection
# @yield when uniqueness is disable or lock successful
def call(worker_class, item, queue, redis_pool = nil)
@worker_class = worker_class
@item = item
Expand All @@ -20,6 +29,8 @@ def call(worker_class, item, queue, redis_pool = nil)

private

# The sidekiq job hash
# @return [Hash] the Sidekiq job hash
attr_reader :item

def success?
Expand Down
6 changes: 5 additions & 1 deletion lib/sidekiq_unique_jobs/connection.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
# frozen_string_literal: true

module SidekiqUniqueJobs
# Shared module for dealing with redis connections
#
# @author Mikael Henriksson <[email protected]>
module Connection
def self.included(base)
base.send(:extend, self)
end

# :reek:UtilityFunction { enabled: false }
# Creates a connection to redis
# @return [Sidekiq::RedisConnection, ConnectionPool] a connection to redis
def redis(redis_pool = nil)
if redis_pool
redis_pool.with { |conn| yield conn }
Expand Down
11 changes: 11 additions & 0 deletions lib/sidekiq_unique_jobs/exceptions.rb
Original file line number Diff line number Diff line change
@@ -1,18 +1,29 @@
# frozen_string_literal: true

module SidekiqUniqueJobs
# Error raised when a Lua script fails to execute
#
# @author Mikael Henriksson <[email protected]>
class Conflict < StandardError
def initialize(item)
super("Item with the key: #{item[UNIQUE_DIGEST_KEY]} is already scheduled or processing")
end
end

# Error raised from {OnConflict::Raise}
#
# @author Mikael Henriksson <[email protected]>
class ScriptError < StandardError
# @param [Symbol] file_name the name of the lua script
# @param [Redis::CommandError] ex exception to handle
def initialize(file_name:, source_exception:)
super("Problem compiling #{file_name}. Message: #{source_exception.message}")
end
end

# Error raised from {OptionsWithFallback#lock_class}
#
# @author Mikael Henriksson <[email protected]>
class UnknownLock < StandardError
end
end
37 changes: 35 additions & 2 deletions lib/sidekiq_unique_jobs/lock/base_lock.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,25 @@

module SidekiqUniqueJobs
class Lock
# Abstract base class for locks
#
# @abstract
# @author Mikael Henriksson <[email protected]>
class BaseLock
include SidekiqUniqueJobs::Logging

# @param [Hash] item the Sidekiq job hash
# @param [Proc] callback the callback to use after unlock
# @param [Sidekiq::RedisConnection, ConnectionPool] redis_pool the redis connection
def initialize(item, callback, redis_pool = nil)
@item = prepare_item(item)
@callback = callback
@redis_pool = redis_pool
end

# Handles locking of sidekiq jobs.
# Will call a conflict strategy if lock can't be achieved.
# @return [String] the sidekiq job id
def lock
if (token = locksmith.lock(item[LOCK_TIMEOUT_KEY]))
token
Expand All @@ -19,30 +29,53 @@ def lock
end
end

# Execute the job in the Sidekiq server processor
# @raise [NotImplementedError] needs to be implemented in child class
def execute
raise NotImplementedError, "##{__method__} needs to be implemented in #{self.class}"
end

# Unlocks the job from redis
# @return [String] sidekiq job id when successful
# @return [false] when unsuccessful
def unlock
locksmith.signal(item[JID_KEY]) # Only signal to release the lock
end

# Deletes the job from redis if it is locked.
def delete
locksmith.delete # Soft delete (don't forcefully remove when expiration is set)
end

# Forcefully deletes the job from redis.
# This is good for jobs when a previous lock was not unlocked
def delete!
locksmith.delete! # Force delete the lock
end

# Checks if the item has achieved a lock
# @return [true] when this jid has locked the job
# @return [false] when this jid has not locked the job
def locked?
locksmith.locked?(item[JID_KEY])
end

private

attr_reader :item, :redis_pool, :callback
# The sidekiq job hash
# @return [Hash] the Sidekiq job hash
attr_reader :item

# The sidekiq redis pool
# @return [Sidekiq::RedisConnection, ConnectionPool, NilClass] the redis connection
attr_reader :redis_pool

# The sidekiq job hash
# @return [Proc] the callback to use after unlock
attr_reader :callback

# The interface to the locking mechanism
# @return [SidekiqUniqueJobs::Locksmith]
def locksmith
@locksmith ||= SidekiqUniqueJobs::Locksmith.new(item, redis_pool)
end
Expand Down Expand Up @@ -84,7 +117,7 @@ def callback_safely
end

def strategy
OnConflict.find_strategy(item[ON_CONFLICT_KEY]).new(item)
@strategy ||= OnConflict.find_strategy(item[ON_CONFLICT_KEY]).new(item)
end
end
end
Expand Down
12 changes: 12 additions & 0 deletions lib/sidekiq_unique_jobs/lock/until_and_while_executing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,19 @@

module SidekiqUniqueJobs
class Lock
# Locks jobs while the job is executing in the server process
# - Locks on perform_in or perform_async (see {UntilExecuting})
# - Unlocks before yielding to the worker's perform method (see {UntilExecuting})
# - Locks before yielding to the worker's perform method (see {WhileExecuting})
# - Unlocks after yielding to the worker's perform method (see {WhileExecuting})
#
# See {#lock} for more information about the client.
# See {#execute} for more information about the server
#
# @author Mikael Henriksson <[email protected]>
class UntilAndWhileExecuting < BaseLock
# Executes in the Sidekiq server process
# @yield to the worker class perform method
def execute
return unless locked?
unlock
Expand Down
7 changes: 7 additions & 0 deletions lib/sidekiq_unique_jobs/lock/until_executed.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,16 @@

module SidekiqUniqueJobs
class Lock
# Locks jobs until the server is done executing the job
# - Locks on perform_in or perform_async
# - Unlocks after yielding to the worker's perform method
#
# @author Mikael Henriksson <[email protected]>
class UntilExecuted < BaseLock
OK ||= 'OK'

# Executes in the Sidekiq server process
# @yield to the worker class perform method
def execute
return unless locked?
with_cleanup { yield }
Expand Down
7 changes: 7 additions & 0 deletions lib/sidekiq_unique_jobs/lock/until_executing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,14 @@

module SidekiqUniqueJobs
class Lock
# Locks jobs until {#execute} starts
# - Locks on perform_in or perform_async
# - Unlocks after yielding to the worker's perform method
#
# @author Mikael Henriksson <[email protected]>
class UntilExecuting < BaseLock
# Executes in the Sidekiq server process
# @yield to the worker class perform method
def execute
unlock_with_callback
yield
Expand Down
12 changes: 12 additions & 0 deletions lib/sidekiq_unique_jobs/lock/until_expired.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,23 @@

module SidekiqUniqueJobs
class Lock
# Locks jobs until the lock has expired
# - Locks on perform_in or perform_async
# - Unlocks when the expiration is hit
#
# See {#lock} for more information about the client.
# See {#execute} for more information about the server
#
# @author Mikael Henriksson <[email protected]>
class UntilExpired < BaseLock
# Prevents these locks from being unlocked
# @return [true] always returns true
def unlock
true
end

# Executes in the Sidekiq server process
# @yield to the worker class perform method
def execute
return unless locked?
yield
Expand Down
20 changes: 17 additions & 3 deletions lib/sidekiq_unique_jobs/lock/while_executing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,35 @@

module SidekiqUniqueJobs
class Lock
# Locks jobs while the job is executing in the server process
# - Locks before yielding to the worker's perform method
# - Unlocks after yielding to the worker's perform method
#
# See {#lock} for more information about the client.
# See {#execute} for more information about the server
#
# @author Mikael Henriksson <[email protected]>
class WhileExecuting < BaseLock
RUN_SUFFIX ||= ':RUN'

# @param [Hash] item the Sidekiq job hash
# @param [Proc] callback callback to call after unlock
# @param [Sidekiq::RedisConnection, ConnectionPool] redis_pool the redis connection
def initialize(item, callback, redis_pool = nil)
super(item, callback, redis_pool)
append_unique_key_suffix
end

# Returning true makes sure the client
# can push the job on the queue
# Simulate that a client lock was achieved.
# These locks should only ever be created in the server process.
# @return [true] always returns true
def lock
true
end

# Locks the job with the RUN_SUFFIX appended
# Executes in the Sidekiq server process.
# These jobs are locked in the server process not from the client
# @yield to the worker class perform method
def execute
return strategy.call unless locksmith.lock(item[LOCK_TIMEOUT_KEY])
with_cleanup { yield }
Expand Down
12 changes: 12 additions & 0 deletions lib/sidekiq_unique_jobs/lock/while_executing_reject.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,25 @@

module SidekiqUniqueJobs
class Lock
# Locks jobs while executing
# Locks from the server process
# Unlocks after the server is done processing
#
# See {#lock} for more information about the client.
# See {#execute} for more information about the server
#
# @author Mikael Henriksson <[email protected]>
class WhileExecutingReject < WhileExecuting
# Executes in the Sidekiq server process
# @yield to the worker class perform method
def execute
return strategy.call unless locksmith.lock(item[LOCK_TIMEOUT_KEY])

with_cleanup { yield }
end

# Overridden with a forced {OnConflict::Reject} strategy
# @return [OnConflict::Reject] a reject strategy
def strategy
@strategy ||= OnConflict.find_strategy(:reject).new(item)
end
Expand Down
Loading

0 comments on commit c747674

Please sign in to comment.