Skip to content

Commit

Permalink
Merge pull request #83 from dpkp/handle_queue_cancels
Browse files Browse the repository at this point in the history
Handle canceled replyQueue.get
  • Loading branch information
fiorix committed Jun 29, 2015
2 parents 018a83a + 488af3f commit af3e0ac
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 1 deletion.
52 changes: 52 additions & 0 deletions tests/test_cancel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# coding: utf-8
# Copyright 2009 Alexandre Fiori
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from twisted.internet import defer, reactor
from twisted.trial import unittest

import txredisapi as redis

from tests.mixins import REDIS_HOST, REDIS_PORT


class TestRedisCancels(unittest.TestCase):
@defer.inlineCallbacks
def test_cancel(self):
db = yield redis.Connection(REDIS_HOST, REDIS_PORT)

prefix = 'txredisapi:cancel'

# Set + Get
key = prefix + '1'
value = 'first'
res = yield db.set(key, value)
self.assertEquals('OK', res)
val = yield db.get(key)
self.assertEquals(val, value)

# Cancel a method
d = db.time()
d.addErrback(lambda _: True)
d.cancel()

# And Set + Get
key = prefix + '2'
value = 'second'
res = yield db.set(key, value)
#self.assertEquals('OK', res)
val = yield db.get(key)
self.assertEquals(val, value)

yield db.disconnect()
19 changes: 18 additions & 1 deletion txredisapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,23 @@ def lineLengthExceeded(self, line):
return self.transport.loseConnection()


class ReplyQueue(defer.DeferredQueue):
"""
Subclass defer.DeferredQueue to maintain consistency of
producers / consumers in light of defer.cancel
"""
def _cancelGet(self, d):
# rather than remove(d), the default twisted behavior
# we need to maintain an entry in the waiting list
# because the reply code assumes that every call
# to transport.write() generates a corresponding
# reply value in the queue.
# so we will just replace the cancelled deferred
# with a noop
i = self.waiting.index(d)
self.waiting[i] = defer.Deferred()


class BaseRedisProtocol(LineReceiver, policies.TimeoutMixin):
"""
Redis client protocol.
Expand All @@ -212,7 +229,7 @@ def __init__(self, charset="utf-8", errors="strict"):
self.post_proc = []
self.multi_bulk = MultiBulkStorage()

self.replyQueue = defer.DeferredQueue()
self.replyQueue = ReplyQueue()

self.transactions = 0
self.inTransaction = False
Expand Down

0 comments on commit af3e0ac

Please sign in to comment.