Skip to content

Commit

Permalink
Add compatibility with queueingbundle 0.4
Browse files Browse the repository at this point in the history
  • Loading branch information
gggeek committed Nov 7, 2015
1 parent 36fe4fe commit 2224593
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 9 deletions.
12 changes: 10 additions & 2 deletions Adapter/SQS/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class Consumer implements ConsumerInterface
/** @var \Aws\Sqs\SqsClient */
protected $client;
protected $queueUrl;
protected $queueName;
protected $callback;
protected $requestBatchSize = 1;
protected $routingKey;
Expand Down Expand Up @@ -105,6 +106,13 @@ public function setRequestBatchSize($amount)
return $this;
}

public function setQueueName($queueName)
{
$this->queueName = $queueName;

return $this;
}

/**
* @see http://docs.aws.amazon.com/aws-sdk-php/v3/api/api-sqs-2012-11-05.html#receivemessage
* Will throw an exception if $amount is > 10.000
Expand Down Expand Up @@ -163,13 +171,13 @@ public function consume($amount, $timeout=0)
$message['MessageAttributes'][$this->contentTypeAttribute]['StringValue'] : '';

if ($contentType != '') {
$this->callback->receive(new Message($data, $message, $contentType));
$this->callback->receive(new Message($data, $message, $contentType, $this->queueName));
} else {
if ($this->logger) {
$this->logger->warning('The SQS Consumer received a message with no content-type attribute. Assuming default');
}

$this->callback->receive(new Message($data, $message));
$this->callback->receive(new Message($data, $message, null, $this->queueName));
}
}
}
Expand Down
5 changes: 2 additions & 3 deletions Adapter/SQS/Driver.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ class Driver extends ContainerAware implements DriverInterface
*/
public function getProducer($queueName)
{

return $this->container->get("kaliop_queueing.sqs.{$queueName}_producer")->setDebug($this->debug);
}

Expand All @@ -37,7 +36,7 @@ public function getProducer($queueName)
*/
public function getConsumer($queueName, MessageConsumerInterface $callback = null)
{
return $this->container->get("kaliop_queueing.sqs.{$queueName}_consumer")->setDebug($this->debug);
return $this->container->get("kaliop_queueing.sqs.{$queueName}_consumer")->setDebug($this->debug)->setQueueName($queueName);
}

public function acceptMessage($message)
Expand Down Expand Up @@ -126,7 +125,7 @@ public function createConsumer($queueName, $queueUrl, $connectionId, $callback=n
{
$class = $this->container->getParameter('kaliop_queueing.sqs.consumer.class');
$consumer = new $class($this->getConnectionConfig($connectionId));
$consumer->setQueueUrl($queueUrl)->setRoutingKey($routingKey);
$consumer->setQueueUrl($queueUrl)->setRoutingKey($routingKey)->setQueueName($queueName);
if ($callback != null) {
$consumer->setCallBack($callback);
}
Expand Down
19 changes: 15 additions & 4 deletions Adapter/SQS/Message.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@ class Message implements MessageInterface
{
protected $body;
protected $properties = array();
protected $contentType;
protected $contentType = 'application/json';
protected $queueName;

public function __construct($body, array $properties = array(), $contentType='application/json')
public function __construct($body, array $properties = array(), $contentType = null, $queueName = '')
{
$this->body = $body;
$this->properties = $properties;
$this->contentType = $contentType;
if ($contentType != null) {
$this->contentType = $contentType;
}
$this->queueName = $queueName;
}

public function getBody()
Expand All @@ -23,14 +27,21 @@ public function getBody()
}

/**
* This is hardcoded because
* @return string
*/
public function getContentType()
{
return $this->contentType;
}

/**
* @return string
*/
public function getQueueName()
{
return $this->queueName;
}

/**
* Check whether a property exists in the 'properties' dictionary
* @param string $name
Expand Down
13 changes: 13 additions & 0 deletions news.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Ver 0.3

* NEW: introduce compatibility with queueingbundle 0.4


# Ver 0.2.1

* FIXED: php error when received messages miss the content-type attribute


# Ver 0.2

* first release announced to the world

0 comments on commit 2224593

Please sign in to comment.