Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a sorted set container type #3

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
"mrpoundsign/pheanstalk-5.3": "dev-master",
"aws/aws-sdk-php": "dev-master",
"amazonwebservices/aws-sdk-for-php": "dev-master",
"predis/predis": "v0.8.0",
"predis/predis": "1.*",
"iron-io/iron_mq": "dev-master",
"ext-memcache": "*",
"microsoft/windowsazure": "dev-master"
Expand Down
149 changes: 136 additions & 13 deletions src/PHPQueue/Backend/Predis.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,22 @@
use PHPQueue\Interfaces\FifoQueueStore;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!! Just a few minor comments:

  • Maybe be a little more consistent with terminology? For example, in the PHPdoc on line 11, you talk about a key-value store, but in the exception messages in push() you use "zset".
  • A good approach might be to mention in the PHPdoc that this is backed by a Redis zset... and also maybe note that that's the origin of the term "score".
  • A bit more inline comments explaining the why of some stuff would be cool, I think... Since I'm not looking at the whole codebase I'm not sure what's obvious and what's not, but... it does feel like there could be more inline explanations.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point about "score", I'm hiding that detail in favor of "order key" which should be more descriptive.


/**
* NOTE: The FIFO index is not usable as a key-value selector in this backend.
* Wraps several styles of redis use:
* - If constructed with a "order_key" option, the data will be accessible
* as a key-value store, and will also provide pop and push using
* $data[$order_key] as the FIFO ordering. If the ordering value is a
* timestamp, for example, then the queue will have real-world FIFO
* behavior over time, and even if the data comes in out of order, we will
* always pop the true oldest record.
* If you wish to push to this type of store, you'll also need to provide
* the "correlation_key" option so the random-access key can be
* extracted from data.
* - Pushing scalar data will store it as a queue under queue_name.
* - Setting scalar data will store it under the key.
* - If data is an array, setting will store it as a hash, under the key.
*
* TODO: The different behaviors should be modeled as several backends which
* perhaps inherit from an AbstractPredis.
*/
class Predis
extends Base
Expand All @@ -18,9 +33,15 @@ class Predis
const TYPE_SET='set';
const TYPE_NONE='none';

// Internal sub-key to hold the ordering.
const FIFO_INDEX = 'fifo';

public $servers;
public $redis_options = array();
public $queue_name;
public $expiry;
public $order_key;
public $correlation_key;

public function __construct($options=array())
{
Expand All @@ -34,11 +55,21 @@ public function __construct($options=array())
if (!empty($options['queue'])) {
$this->queue_name = $options['queue'];
}
if (!empty($options['expiry'])) {
$this->expiry = $options['expiry'];
}
if (!empty($options['order_key'])) {
$this->order_key = $options['order_key'];
$this->redis_options['prefix'] = $this->queue_name . ':';
}
if (!empty($options['correlation_key'])) {
$this->correlation_key = $options['correlation_key'];
}
}

public function connect()
{
if (empty($this->servers)) {
if (!$this->servers) {
throw new BackendException("No servers specified");
}
$this->connection = new \Predis\Client($this->servers, $this->redis_options);
Expand All @@ -47,7 +78,7 @@ public function connect()
/** @deprecated */
public function add($data=array())
{
if (empty($data)) {
if (!$data) {
throw new BackendException("No data.");
}
$this->push($data);
Expand All @@ -61,21 +92,64 @@ public function push($data)
throw new BackendException("No queue specified.");
}
$encoded_data = json_encode($data);
// Note that we're ignoring the "new length" return value, cos I don't
// see how to make it useful.
$this->getConnection()->rpush($this->queue_name, $encoded_data);
if ($this->order_key) {
if (!$this->correlation_key) {
throw new BackendException("Cannot push to indexed fifo queue without a correlation key.");
}
$key = $data[$this->correlation_key];
if (!$key) {
throw new BackendException("Cannot push to indexed fifo queue without correlation data.");
}
$status = $this->addToIndexedFifoQueue($key, $data);
if (!$status) {
throw new BackendException("Couldn't push to indexed fifo queue.");
}
} else {
// Note that we're ignoring the "new length" return value, cos I don't
// see how to make it useful.
$this->getConnection()->rpush($this->queue_name, $encoded_data);
}
}

/**
* @return array|null
*/
public function pop()
{
$data = null;
$this->beforeGet();
if (!$this->hasQueue()) {
throw new BackendException("No queue specified.");
}
$data = $this->getConnection()->lpop($this->queue_name);
if ($this->order_key) {
// Pop the first element.
//
// Adapted from https://github.com/nrk/predis/blob/v1.0/examples/transaction_using_cas.php
$options = array(
'cas' => true,
'watch' => self::FIFO_INDEX,
'retry' => 3,
);
$order_key = $this->order_key;
$this->getConnection()->transaction($options, function ($tx) use ($order_key, &$data) {
// Look up the first element in the FIFO ordering.
$values = $tx->zrange(self::FIFO_INDEX, 0, 0);
if ($values) {
// Use that value as a key into the key-value block, to get the data.
$key = $values[0];
$data = $tx->get($key);

// Begin transaction.
$tx->multi();

// Remove from both indexes.
$tx->zrem(self::FIFO_INDEX, $key);
$tx->del($key);
}
});
} else {
$data = $this->getConnection()->lpop($this->queue_name);
}
if (!$data) {
return null;
}
Expand Down Expand Up @@ -111,24 +185,30 @@ public function setKey($key=null, $data=null)
/**
* @param string $key
* @param array|string $data
* @return boolean
* @throws \PHPQueue\Exception
*/
public function set($key, $data)
{
if (empty($key) && !is_string($key)) {
if (!$key || !is_string($key)) {
throw new BackendException("Key is invalid.");
}
if (empty($data)) {
if (!$data) {
throw new BackendException("No data.");
}
$this->beforeAdd();
try {
$status = false;
if (is_array($data)) {
if ($this->order_key) {
$status = $this->addToIndexedFifoQueue($key, $data);
} elseif (is_array($data)) {
// FIXME: Assert
$status = $this->getConnection()->hmset($key, $data);
} elseif (is_string($data) || is_numeric($data)) {
$status = $this->getConnection()->set($key, $data);
if ($this->expiry) {
$status = $this->getConnection()->setex($key, $this->expiry, $data);
} else {
$status = $this->getConnection()->set($key, $data);
}
}
if (!$status) {
throw new BackendException("Unable to save data.");
Expand All @@ -138,6 +218,35 @@ public function set($key, $data)
}
}

/**
* Store the data under its order and correlation keys
*
* @param string $key
* @param array $data
*/
protected function addToIndexedFifoQueue($key, $data)
{
$options = array(
'cas' => true,
'watch' => self::FIFO_INDEX,
'retry' => 3,
);
$score = $data[$this->order_key];
$encoded_data = json_encode($data);
$status = false;
$expiry = $this->expiry;
$this->getConnection()->transaction($options, function ($tx) use ($key, $score, $encoded_data, $expiry, &$status) {
$tx->multi();
$tx->zadd(self::FIFO_INDEX, $score, $key);
if ($expiry) {
$status = $tx->setex($key, $expiry, $encoded_data);
} else {
$status = $tx->set($key, $encoded_data);
}
});
return $status;
}

/** @deprecated */
public function getKey($key=null)
{
Expand All @@ -159,6 +268,10 @@ public function get($key=null)
return null;
}
$this->beforeGet($key);
if ($this->order_key) {
$data = $this->getConnection()->get($key);
return json_decode($data, true);
}
$type = $this->getConnection()->type($key);
switch ($type) {
case self::TYPE_STRING:
Expand Down Expand Up @@ -193,7 +306,17 @@ public function clearKey($key=null)
public function clear($key)
{
$this->beforeClear($key);
$num_removed = $this->getConnection()->del($key);

if ($this->order_key) {
$result = $this->getConnection()->pipeline()
->zrem(self::FIFO_INDEX, $key)
->del($key)
->execute();

$num_removed = $result[1];
} else {
$num_removed = $this->getConnection()->del($key);
}

$this->afterClearRelease();

Expand Down
106 changes: 106 additions & 0 deletions test/PHPQueue/Backend/PredisZsetTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
<?php
namespace PHPQueue\Backend;
class PredisZsetTest extends \PHPUnit_Framework_TestCase
{
private $object;

public function setUp()
{
parent::setUp();
if (!class_exists('\Predis\Client')) {
$this->markTestSkipped('Predis not installed');
} else {
$options = array(
'servers' => array('host' => '127.0.0.1', 'port' => 6379)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this go somewhere central? (It's repeated in a few places... Just a thought, may not be worth consolidating, tho...)

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. I started documenting that as CoderKungfu#42

, 'queue' => 'testqueue-' . mt_rand()
, 'score_key' => 'timestamp'
, 'correlation_key' => 'txn_id'
);
$this->object = new Predis($options);
}
}

public function tearDown()
{
if ($this->object) {
$this->object->getConnection()->flushall();
}
parent::tearDown();
}

public function testSet()
{
$key = 'A0001';
$data = array('name' => 'Michael', 'timestamp' => 1);
$this->object->set($key, $data);

$key = 'A0001';
$data = array('name' => 'Michael Cheng', 'timestamp' => 2);
$this->object->set($key, $data);

$key = 'A0002';
$data = array('name' => 'Michael Cheng', 'timestamp' => 3);
$this->object->set($key, $data);
}

public function testGet()
{
$key = 'A0001';
$data1 = array('name' => 'Michael', 'timestamp' => 1);
$this->object->set($key, $data1);

$key = 'A0001';
$data2 = array('name' => 'Michael Cheng', 'timestamp' => 2);
$this->object->set($key, $data2);

$key = 'A0002';
$data3 = array('name' => 'Michael Cheng', 'timestamp' => 3);
$this->object->set($key, $data3);

$result = $this->object->get('A0001');
$this->assertEquals($data2, $result);

$result = $this->object->getKey('A0002');
$this->assertEquals($data3, $result);
}

public function testClear()
{
$key = 'A0002';
$data = array('name' => 'Adam Wight', 'timestamp' => 2718);
$result = $this->object->set($key, $data);

$result = $this->object->clear($key);
$this->assertTrue($result);

$result = $this->object->get($key);
$this->assertNull($result);
}

public function testClearEmpty()
{
$jobId = 'xxx';
$this->assertFalse($this->object->clear($jobId));
}

public function testPushPop()
{
$data = array(
'name' => 'Weezle-' . mt_rand(),
'timestamp' => mt_rand(),
'txn_id' => mt_rand(),
);
$this->object->push($data);

$this->assertEquals($data, $this->object->get($data['txn_id']));

$this->assertEquals($data, $this->object->pop());

$this->assertNull($this->object->get($data['txn_id']));
}

public function testPopEmpty()
{
$this->assertNull($this->object->pop());
}
}