Fix scheduled jobs breaking with new_unique_for method #101

Closed
pik wants to merge 7 commits from the bug/scheduled-jobs branch

Conversation


@pik (Contributor) commented Aug 9, 2015

#93 #98 So having dug into the code it looks like set_nx was not a viable option for new_unique_for and neither was my update to the old unique_for whereby it was altered to always use item['jid'] without a special assignment if it's being scheduled rather than enqueued. This patch fixes the broken behaviour for both new_unique_for and old_unique_for.

pik added 4 commits August 9, 2015 21:36
 * set_nx is not used except for scheduling a job
 * a job is allowed to be queued if a job with the same payload_hash
   is already scheduled, but not vice versa
 * fixes jobs scheduled with perform_in / perform_at not being run
 * use lua script in new_unique_for
@pik force-pushed the bug/scheduled-jobs branch from df42767 to 82ee754 on August 9, 2015 16:17
- pid = conn.get(payload_hash).to_i
- if pid == 1 || (pid == 2 && item['at'])
+ pid = conn.get(payload_hash)
+ if pid && (pid != 'scheduled' || item['at'])
@mhenrixon (Owner)

I'm sorry but if != || == WAT?!

@mhenrixon (Owner)

The code was a mess already. What I'd like to see is something more along the lines of:

def old_unique_for?
  connection do |conn|
    conn.watch(payload_hash)
    pid = conn.get(payload_hash)
    return create_lock(conn) unless pid == 'scheduled' || item['at']
    conn.unwatch
  end
end

def create_lock(conn)
  return clear_expired_lock(conn) unless expires_at > 0
  conn.multi do
    conn.setex(payload_hash, expires_at, item['at'] ? 'scheduled' : item['jid'])
  end
end

def clear_expired_lock(conn)
  conn.del(payload_hash) # already expired
end

Thoughts on that?

@mhenrixon (Owner)

Love the contribution! Had a few notes but if something is out of order or doesn't make sense just say the word and I will spend some time when I am back from the dead.

@pik (Contributor, Author) commented Aug 11, 2015

@mhenrixon Thanks for the feedback on this. I'm going to try to walk through my understanding of what's happening, just to make sure this is the correct fix and that it isn't introducing any subtle bugs.

The old_unique_for never removed the locks for a scheduled job, since it was possible to just promote (overwrite) a scheduled lock to a queued lock (I think these were 1 and 2 respectively). The alternative would be to have one Mutex for scheduling and one Mutex for queueing, but this seems more involved, as I'm not sure there is an easy way to tell, once item['at'] has been deleted, whether a job being queued had previously been scheduled or not.

As far as I understand the Sidekiq behaviour:
When a job has item['at'] set, the Sidekiq client's atomic_push sends it to the schedule queue (because item['at'] != nil) and the middleware is called once. At schedule time, when the job is fetched and item['at'] is in the past, item['at'] is deleted and the middleware is called again.
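
For reference, this is roughly what that client-side split looks like. The sketch below paraphrases Sidekiq's atomic_push from memory of Sidekiq ~3.x; names and details are approximate, not the library's exact source.

# Paraphrased sketch of Sidekiq's client push (~3.x), not verbatim source.
def atomic_push(conn, payloads)
  if payloads.first['at']
    # Scheduled job: goes into the 'schedule' sorted set, scored by run time.
    conn.zadd('schedule', payloads.map { |item|
      at = item.delete('at').to_s
      [at, Sidekiq.dump_json(item)]
    })
  else
    # Immediate job: pushed straight onto its queue.
    queue = payloads.first['queue']
    conn.sadd('queues', queue)
    conn.lpush("queue:#{queue}", payloads.map { |item| Sidekiq.dump_json(item) })
  end
end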

So the old_unique_for behaviour would have been (pseudo-code):

Middleware run 1 (client):
  # Mutex is nil
If not Mutex, or (Mutex is 1 ('scheduled') and not item['at']):
  Set Mutex to -> 1 ('scheduled')
  Added to 'schedule' queue
else:
  # Job is not scheduled

Middleware run 2 (client):
# Mutex is 1
# item['at'] has been deleted
If not Mutex, or (Mutex is 1 ('scheduled') and not item['at']):
  Set Mutex to -> 2 (normal queue)
  Added to the get_sidekiq_options['queue'] queue
else:
  # Do not Schedule another job, or add a duplicate job to a normal queue.

Middleware run 3 (server):
  # Jobs always try to Unlock, even if unique: true is not set.
  # Issue #100
  # Does not check whether the Mutex still belongs to this job (e.g. after a timeout); fixed later by checking the JID
  If before_yield:
    Set Mutex -> Nil
    # If other jobs are queued and fetched after this, they can start to run at the same time as this one
    Run Job
  Else:
    Run Job
    # If jobs are dropped while this is running, some values might remain outdated
    Set Mutex -> Nil
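
In plain Ruby, the old client-side check amounts to something like the sketch below; the SCHEDULED/QUEUED constants and the method name are made up for illustration and are not the gem's actual API.

# Illustrative sketch of the old client-side lock check (not the gem's code).
SCHEDULED = 1
QUEUED    = 2

def acquire_old_lock?(conn, payload_hash, item, expires_at)
  current = conn.get(payload_hash)
  # Acquire when there is no lock yet, or when a 'scheduled' lock can be
  # promoted because item['at'] has already been deleted.
  if current.nil? || (current.to_i == SCHEDULED && item['at'].nil?)
    conn.setex(payload_hash, expires_at, item['at'] ? SCHEDULED : QUEUED)
    true
  else
    false # duplicate: neither scheduled nor enqueued again
  end
end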

The reason SETNX was not working with new_unique_for is that SETNX always fails if any value is present. That means step 2 (above) would have looked like this (a concrete redis-rb illustration follows the pseudocode):

Middleware run 2 (client):
# Mutex is 1
# item['at'] has been deleted
If SETNX(<key>, 2):
  # Never reached: SETNX fails because the 'scheduled' value is still present
  Set Mutex to -> 2 (normal queue)
  Added to the get_sidekiq_options['queue'] queue
else:
  # Payload is considered a duplicate
  # Do not schedule another job, or add a duplicate job to a normal queue.
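
The failure is easy to see directly against redis-rb (illustrative values; payload_hash stands for whatever key the middleware computes):

conn.setnx(payload_hash, 1)  #=> true  (run 1: lock created when the job is scheduled)
conn.setnx(payload_hash, 2)  #=> false (run 2: SETNX refuses to overwrite, so the lock is never promoted)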

New Version:

Middleware run 1 (client):
# Mutex is nil
If not Mutex, or (Mutex is 'scheduled' (1) and not item['at']):
  Set Mutex to -> 'scheduled' (1)
  Added to 'schedule' queue
else:
  # Job is not scheduled


Middleware run 2 (client):
# Mutex is 'scheduled' (1)
# item['at'] has been deleted
If not Mutex, or (Mutex is 'scheduled' (1) and not item['at']):
  Set Mutex to -> item['jid']
  Added to the get_sidekiq_options['queue'] queue
else:
  # Do not Schedule another job, or add a duplicate job to a normal queue.
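
The same promotion rule can be expressed as a single atomic step, which is what the "use lua script in new_unique_for" commit points at. The sketch below is illustrative only, not the PR's actual script:

# Illustrative acquire script: take the lock if it is free, or if it only
# holds the 'scheduled' marker and we are now enqueueing (ARGV[3] empty).
ACQUIRE_SCRIPT = <<-LUA
  local current = redis.call('GET', KEYS[1])
  if current == false or (current == 'scheduled' and ARGV[3] == '') then
    redis.call('SETEX', KEYS[1], tonumber(ARGV[1]), ARGV[2])
    return 1
  end
  return 0
LUA

def acquire_lock?(conn, payload_hash, item, expires_at)
  value = item['at'] ? 'scheduled' : item['jid']
  conn.eval(ACQUIRE_SCRIPT,
            keys: [payload_hash],
            argv: [expires_at, value, item['at'].to_s]) == 1
end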

Previously there was no check of whether the Mutex (e.g. after a job timeout) actually belonged to the job clearing it; fixed by checking the JID in b51e733 (a sketch of that owner check follows the pseudocode below).

Middleware run 3 (server):
  If before_yield:
    # Compare JID
    If Mutex Set By Me:
      Set Mutex -> Nil
    # If other jobs are queued and fetched after this, they can start to run at the same time as this one
    Run Job
  else if after_yield:
    Run Job
    # If jobs are dropped while this is running, some values might remain outdated
    # Compare JID
    If Mutex Set By Me:
      Set Mutex -> Nil

From the RunLock PR (#99):

  else if run_lock:
    # Optional spin-lock to acquire
    If acquire Run-Mutex:
      Set Run-Mutex -> Item['jid']
      Set Mutex -> Nil
      Run Job
      # Another job can still be queued, but will not run till this is complete
      Set Run-Mutex -> Nil
    Else:
      Drop or Reschedule
  else:
    # Not using Unique
    # Run Job

pik@74bafc4#diff-40bba6fd31628d2a95fd0ab341dcde73R42
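
For the "Compare JID" step above, here is a minimal sketch of a delete-only-if-owner unlock. It is written as a Lua script so the compare and the delete are atomic; the script and method names are illustrative, not the PR's exact code.

UNLOCK_IF_OWNER = <<-LUA
  if redis.call('GET', KEYS[1]) == ARGV[1] then
    return redis.call('DEL', KEYS[1])
  end
  return 0
LUA

def unlock_if_owner(conn, payload_hash, jid)
  # Only clears the mutex if it still holds this job's JID.
  conn.eval(UNLOCK_IF_OWNER, keys: [payload_hash], argv: [jid]) == 1
end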

@mhenrixon (Owner)

There is still a problem with #98. The thing we need to do is to create a composite key with the time to run the job and the argument. We then need to make sure that the job's unique arguments are used for locking the run, so as to prevent the same job running twice (simultaneously) if the first job gets delayed.

Does that make sense at all? I don't see the code taking this into consideration.
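
Purely as an illustration of the composite-key idea (not the gem's actual digest code), folding the scheduled run time into the uniqueness key could look something like this:

require 'json'
require 'digest'

# Hypothetical composite key: the scheduled run time becomes part of the
# digest, so the same args at a different run time produce a different lock.
def composite_payload_hash(item)
  digestable = [item['class'], item['queue'], item['args'], item['at']].to_json
  Digest::MD5.hexdigest(digestable)
end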

@pik (Contributor, Author) commented Aug 11, 2015

@mhenrixon Are you sure? All of the jobs in #98 run correctly for me with this patch.

> We then need to make sure that the job's unique arguments are used for locking the run, so as to prevent the same job running twice (simultaneously) if the first job gets delayed.

This should already happen because a JID-type lock can only override a 'scheduled' lock, so when the scheduled job comes up and runs the middleware again, either it can promote its previous 'scheduled' lock to a JID, or a JID will already be there and it will fail. I can try and add some more specs for this?
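
A spec along those lines might look like the sketch below; acquire_lock is a stand-in for whichever client-side acquire method the gem ends up exposing, so treat it as an outline rather than a ready-made test.

# Hypothetical spec for the promotion rule: 'scheduled' can be upgraded to a
# JID lock, but an existing JID lock blocks further scheduling or enqueueing.
it 'promotes a scheduled lock to a JID lock but not the reverse' do
  scheduled_item = item.merge('at' => (Time.now + 60).to_f)

  expect(acquire_lock(scheduled_item)).to be true   # sets the 'scheduled' marker
  expect(acquire_lock(item)).to be true             # promotes it to item['jid']
  expect(acquire_lock(scheduled_item)).to be false  # the JID lock wins
end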

@pik force-pushed the bug/scheduled-jobs branch 2 times, most recently from 8608d35 to 6afc38a on August 19, 2015 10:24
 * split old_unique_for? into unique_schedule_old and unique_enqueue_old
@deltaroe (Contributor)

I'd like to propose another solution. #105

@pik (Contributor, Author) commented Aug 21, 2015

@deltaroe That doesn't solve anything; please see the failing specs. I added a reversion for it in #107.

@mhenrixon RE: Tests
This branch is failing only on Sidekiq 3.1 and older; I believe this has to do with a bug in the Redis connection pool implementation for watch/multi/exec on those versions. Since the problem doesn't exist on newer versions of Sidekiq, I think it would make sense to set the minimum requirement to at least 3.2 (Sidekiq is at 3.4.2 now and a lot has changed since).

@deltaroe (Contributor)

@pik It sounds like we have different use cases. In our environment we don't want a job to be able to run now if it's scheduled to run in the future. This was brought up by my colleague in #91; however, PR #96 changed the behavior to what we expected and was merged between when the issue was reported and when @mhenrixon tried to reproduce it.

I wonder if adding some method to specify which mode is desired would make sense here.

@pik (Contributor, Author) commented Aug 22, 2015

@deltaroe #96 only changed old_unique_for to work the same way as the new_unique_for method; I was unaware at the time that the new_unique_for method was responsible for the scheduling issue, hence this current PR.

@mhenrixon closed this in 745ebea on Aug 29, 2015