Skip to content

Commit

Permalink
prepare redis interface and impl to consume from multiple queues.
Browse files Browse the repository at this point in the history
  • Loading branch information
makasim committed Aug 13, 2018
1 parent d6d2148 commit 64d907a
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 72 deletions.
30 changes: 20 additions & 10 deletions PRedis.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ public function __construct(array $config)
/**
* {@inheritdoc}
*/
public function lpush($key, $value)
public function lpush(string $key, string $value): int
{
try {
$this->redis->lpush($key, [$value]);
return $this->redis->lpush($key, [$value]);
} catch (PRedisServerException $e) {
throw new ServerException('lpush command has failed', null, $e);
}
Expand All @@ -51,12 +51,14 @@ public function lpush($key, $value)
/**
* {@inheritdoc}
*/
public function brpop($key, $timeout)
public function brpop(array $keys, int $timeout): ?RedisResult
{
try {
if ($result = $this->redis->brpop([$key], $timeout)) {
return $result[1];
if ($result = $this->redis->brpop($keys, $timeout)) {
return new RedisResult($result[0], $result[1]);
}

return null;
} catch (PRedisServerException $e) {
throw new ServerException('brpop command has failed', null, $e);
}
Expand All @@ -65,10 +67,14 @@ public function brpop($key, $timeout)
/**
* {@inheritdoc}
*/
public function rpop($key)
public function rpop(string $key): ?RedisResult
{
try {
return $this->redis->rpop($key);
if ($message = $this->redis->rpop($key)) {
return new RedisResult($key, $message);
}

return null;
} catch (PRedisServerException $e) {
throw new ServerException('rpop command has failed', null, $e);
}
Expand All @@ -77,8 +83,12 @@ public function rpop($key)
/**
* {@inheritdoc}
*/
public function connect()
public function connect(): void
{
if ($this->redis) {
return;
}

$this->redis = new Client($this->config, ['exceptions' => true]);

if ($this->config['pass']) {
Expand All @@ -91,15 +101,15 @@ public function connect()
/**
* {@inheritdoc}
*/
public function disconnect()
public function disconnect(): void
{
$this->redis->disconnect();
}

/**
* {@inheritdoc}
*/
public function del($key)
public function del(string $key): void
{
$this->redis->del([$key]);
}
Expand Down
82 changes: 44 additions & 38 deletions PhpRedis.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,80 +24,86 @@ public function __construct(array $config)
'port' => null,
'pass' => null,
'user' => null,
'timeout' => null,
'timeout' => .0,
'reserved' => null,
'retry_interval' => null,
'persisted' => false,
'database' => 0,
], $config);

var_dump($this->config);
}

/**
* {@inheritdoc}
*/
public function lpush($key, $value)
public function lpush(string $key, string $value): int
{
if (false == $this->redis->lPush($key, $value)) {
throw new ServerException($this->redis->getLastError());
}
return $this->redis->lPush($key, $value);
}

/**
* {@inheritdoc}
*/
public function brpop($key, $timeout)
public function brpop(array $keys, int $timeout): ?RedisResult
{
if ($result = $this->redis->brPop([$key], $timeout)) {
return $result[1];
if ($result = $this->redis->brPop($keys, $timeout)) {
return new RedisResult($result[0], $result[1]);
}

return null;
}

/**
* {@inheritdoc}
*/
public function rpop($key)
public function rpop(string $key): ?RedisResult
{
return $this->redis->rPop($key);
if ($message = $this->redis->rPop($key)) {
return new RedisResult($key, $message);
}

return null;
}

/**
* {@inheritdoc}
*/
public function connect()
public function connect(): void
{
if (false == $this->redis) {
$this->redis = new \Redis();

if ($this->config['persisted']) {
$this->redis->pconnect(
$this->config['host'],
$this->config['port'],
$this->config['timeout']
);
} else {
$this->redis->connect(
$this->config['host'],
$this->config['port'],
$this->config['timeout'],
$this->config['reserved'],
$this->config['retry_interval']
);
}

if ($this->config['pass']) {
$this->redis->auth($this->config['pass']);
}

$this->redis->select($this->config['database']);
if ($this->redis) {
return;
}

$this->redis = new \Redis();

if ($this->config['persisted']) {
$this->redis->pconnect(
$this->config['host'],
$this->config['port'],
$this->config['timeout']
);
} else {
$this->redis->connect(
$this->config['host'],
$this->config['port'],
$this->config['timeout'],
$this->config['reserved'],
$this->config['retry_interval']
);
}

if ($this->config['pass']) {
$this->redis->auth($this->config['pass']);
}

return $this->redis;
$this->redis->select($this->config['database']);
}

/**
* {@inheritdoc}
*/
public function disconnect()
public function disconnect(): void
{
if ($this->redis) {
$this->redis->close();
Expand All @@ -107,7 +113,7 @@ public function disconnect()
/**
* {@inheritdoc}
*/
public function del($key)
public function del(string $key): void
{
$this->redis->del($key);
}
Expand Down
22 changes: 12 additions & 10 deletions Redis.php
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
<?php

declare(strict_types=1);

namespace Enqueue\Redis;

interface Redis
Expand All @@ -10,29 +12,29 @@ interface Redis
*
* @return int length of the list
*/
public function lpush($key, $value);
public function lpush(string $key, string $value): int;

/**
* @param string $key
* @param int $timeout in seconds
* @param string[] $keys
* @param int $timeout in seconds
*
* @return string|null
* @return RedisResult|null
*/
public function brpop($key, $timeout);
public function brpop(array $keys, int $timeout): ?RedisResult;

/**
* @param string $key
*
* @return string|null
* @return RedisResult|null
*/
public function rpop($key);
public function rpop(string $key): ?RedisResult;

public function connect();
public function connect(): void;

public function disconnect();
public function disconnect(): void;

/**
* @param string $key
*/
public function del($key);
public function del(string $key): void;
}
6 changes: 5 additions & 1 deletion RedisConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ private function parseDsn($dsn)
throw new \LogicException(sprintf('Failed to parse DSN "%s"', $dsn));
}

if (array_key_exists('port', $config)) {
$config['port'] = (int) $config['port'];
}

if ($query = parse_url($dsn, PHP_URL_QUERY)) {
$queryConfig = [];
parse_str($query, $queryConfig);
Expand All @@ -159,7 +163,7 @@ private function defaultConfig()
return [
'host' => 'localhost',
'port' => 6379,
'timeout' => null,
'timeout' => .0,
'reserved' => null,
'retry_interval' => null,
'vendor' => 'phpredis',
Expand Down
18 changes: 9 additions & 9 deletions RedisConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ public function receive($timeout = 0)
{
$timeout = (int) ($timeout / 1000);
if (empty($timeout)) {
// Caused by
// Predis\Response\ServerException: ERR timeout is not an integer or out of range
// /mqdev/vendor/predis/predis/src/Client.php:370

return $this->receiveNoWait();
while (true) {
if ($message = $this->receive(5000)) {
return $message;
}
}
}

if ($message = $this->getRedis()->brpop($this->queue->getName(), $timeout)) {
return RedisMessage::jsonUnserialize($message);
if ($result = $this->getRedis()->brpop([$this->queue->getName()], $timeout)) {
return RedisMessage::jsonUnserialize($result->getMessage());
}
}

Expand All @@ -66,8 +66,8 @@ public function receive($timeout = 0)
*/
public function receiveNoWait()
{
if ($message = $this->getRedis()->rpop($this->queue->getName())) {
return RedisMessage::jsonUnserialize($message);
if ($result = $this->getRedis()->rpop($this->queue->getName())) {
return RedisMessage::jsonUnserialize($result->getMessage());
}
}

Expand Down
32 changes: 32 additions & 0 deletions RedisResult.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?php

namespace Enqueue\Redis;

class RedisResult
{
/**
* @var string
*/
private $key;

/**
* @var string
*/
private $message;

public function __construct(string $key, string $message)
{
$this->key = $key;
$this->message = $message;
}

public function getKey(): string
{
return $this->key;
}

public function getMessage(): string
{
return $this->message;
}
}
Loading

0 comments on commit 64d907a

Please sign in to comment.