Skip to content

Commit

Permalink
Remove broker_id as first argument of all Lua scripts
Browse files Browse the repository at this point in the history
This was intended to register the broker each time a Lua script
was executed. It was never used.
  • Loading branch information
NicolasLM committed Mar 28, 2021
1 parent 0c6bce2 commit 86efca1
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 45 deletions.
5 changes: 2 additions & 3 deletions spinach/brokers/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,13 @@ def _load_script(self, filename: str) -> Script:
return rv

def _run_script(self, script: Script, *args):
# Always insert broker_id as first argument
args = [str(self._id)] + list(args)

if script not in self._idempotency_protected_scripts:
return script(args=args)

# Script is protected by idempotency token, can be retried safely.
# Insert idempotency token as first argument.
idempotency_token = generate_idempotency_token()
args = list(args)
args.insert(
0, self._to_namespaced('idempotency_{}'.format(idempotency_token))
)
Expand Down Expand Up @@ -236,6 +234,7 @@ def _subscriber_func(self):
logger.debug('Deregistering broker')
self._run_script(
self._deregister,
str(self._id),
self._to_namespaced(ALL_BROKERS_HASH_KEY),
self._to_namespaced(ALL_BROKERS_ZSET_KEY)
)
Expand Down
13 changes: 6 additions & 7 deletions spinach/brokers/redis_scripts/enqueue_job.lua
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
-- idempotency protected script, do not remove comment
local idempotency_token = ARGV[1]
local broker_id = ARGV[2]
local notifications = ARGV[3]
local running_jobs_key = ARGV[4]
local namespace = ARGV[5]
local future_jobs = ARGV[6]
-- jobs starting at ARGV[7]
local notifications = ARGV[2]
local running_jobs_key = ARGV[3]
local namespace = ARGV[4]
local future_jobs = ARGV[5]
-- jobs starting at ARGV[6]

if not redis.call('set', idempotency_token, 'true', 'EX', 3600, 'NX') then
redis.log(redis.LOG_WARNING, "Not reprocessing script")
return -1
end

for i=7, #ARGV do
for i=6, #ARGV do
local job_json = ARGV[i]
local job = cjson.decode(job_json)
if job["status"] == 2 then
Expand Down
13 changes: 6 additions & 7 deletions spinach/brokers/redis_scripts/enqueue_jobs_from_dead_broker.lua
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
local broker_id = ARGV[1]
local dead_broker_id = ARGV[2]
local running_jobs_key = ARGV[3]
local all_brokers_hash_key = ARGV[4]
local all_brokers_zset_key = ARGV[5]
local namespace = ARGV[6]
local notifications = ARGV[7]
local dead_broker_id = ARGV[1]
local running_jobs_key = ARGV[2]
local all_brokers_hash_key = ARGV[3]
local all_brokers_zset_key = ARGV[4]
local namespace = ARGV[5]
local notifications = ARGV[6]

local num_enqueued_jobs = 0

Expand Down
4 changes: 2 additions & 2 deletions spinach/brokers/redis_scripts/flush.lua
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
local broker_id = ARGV[1]
local namespace = ARGV[2]
local namespace = ARGV[1]

local pattern = string.format("%s/*", namespace)

for _, key in ipairs(redis.call('keys', pattern)) do
Expand Down
9 changes: 4 additions & 5 deletions spinach/brokers/redis_scripts/get_jobs_from_queue.lua
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
local broker_id = ARGV[1]
local queue = ARGV[2]
local running_jobs_key = ARGV[3]
local job_status_running = tonumber(ARGV[4])
local max_jobs = tonumber(ARGV[5])
local queue = ARGV[1]
local running_jobs_key = ARGV[2]
local job_status_running = tonumber(ARGV[3])
local max_jobs = tonumber(ARGV[4])

local jobs = {}

Expand Down
29 changes: 14 additions & 15 deletions spinach/brokers/redis_scripts/move_future_jobs.lua
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
local broker_id = ARGV[1]
local namespace = ARGV[2]
local future_jobs = ARGV[3]
local notifications = ARGV[4]
local now = ARGV[5]
local job_status_queued = tonumber(ARGV[6])
local periodic_tasks_hash = ARGV[7]
local periodic_tasks_queue = ARGV[8]
local all_brokers_hash_key = ARGV[9]
local all_brokers_zset_key = ARGV[10]
local broker_info_json = ARGV[11]
local broker_dead_threshold_seconds = ARGV[12]
-- uuids starting at ARGV[13]
local namespace = ARGV[1]
local future_jobs = ARGV[2]
local notifications = ARGV[3]
local now = ARGV[4]
local job_status_queued = tonumber(ARGV[5])
local periodic_tasks_hash = ARGV[6]
local periodic_tasks_queue = ARGV[7]
local all_brokers_hash_key = ARGV[8]
local all_brokers_zset_key = ARGV[9]
local broker_info_json = ARGV[10]
local broker_dead_threshold_seconds = ARGV[11]
-- uuids starting at ARGV[12]
-- lua in Redis cannot generate random UUIDs, so they are generated in Python
-- and passed with each calls

Expand All @@ -28,7 +27,7 @@ local jobs_json = redis.call('zrangebyscore', future_jobs, '-inf', now, 'LIMIT',
local jobs_moved = 0

-- Create jobs from due periodic tasks
local number_of_uuids = #ARGV + 1 - 13 -- as uuids start at ARGV[13]
local number_of_uuids = #ARGV + 1 - 12 -- as uuids start at ARGV[12]
local task_names = redis.call('zrangebyscore', periodic_tasks_queue, '-inf', now, 'LIMIT', 0, number_of_uuids)
for i, task_name in ipairs(task_names) do

Expand All @@ -40,7 +39,7 @@ for i, task_name in ipairs(task_names) do
else
local task = cjson.decode(task_json)
local job = {}
job["id"] = ARGV[13 + i - 1]
job["id"] = ARGV[12 + i - 1]
job["status"] = job_status_queued
job["task_name"] = task_name
job["queue"] = task["queue"]
Expand Down
11 changes: 5 additions & 6 deletions spinach/brokers/redis_scripts/register_periodic_tasks.lua
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
local broker_id = ARGV[1]
local now = ARGV[2]
local periodic_tasks_hash = ARGV[3]
local periodic_tasks_queue = ARGV[4]
-- tasks to register starting at ARGV[5]
local now = ARGV[1]
local periodic_tasks_hash = ARGV[2]
local periodic_tasks_queue = ARGV[3]
-- tasks to register starting at ARGV[4]


local function contains(t, e)
Expand All @@ -13,7 +12,7 @@ end
local old_task_names = redis.call('hkeys', periodic_tasks_hash)
local new_task_names = {}

for i=5, #ARGV do
for i=4, #ARGV do
local task_json = ARGV[i]
local task = cjson.decode(task_json)
local next_event_time = now + task["periodicity"]
Expand Down

0 comments on commit 86efca1

Please sign in to comment.