From 2224593d699bc7dad77ed5547a1080af9e8652dd Mon Sep 17 00:00:00 2001 From: gggeek Date: Sat, 7 Nov 2015 22:45:13 +0000 Subject: [PATCH] Add compatibility with queueingbundle 0.4 --- Adapter/SQS/Consumer.php | 12 ++++++++++-- Adapter/SQS/Driver.php | 5 ++--- Adapter/SQS/Message.php | 19 +++++++++++++++---- news.md | 13 +++++++++++++ 4 files changed, 40 insertions(+), 9 deletions(-) create mode 100644 news.md diff --git a/Adapter/SQS/Consumer.php b/Adapter/SQS/Consumer.php index 27c1310..4b10f19 100644 --- a/Adapter/SQS/Consumer.php +++ b/Adapter/SQS/Consumer.php @@ -16,6 +16,7 @@ class Consumer implements ConsumerInterface /** @var \Aws\Sqs\SqsClient */ protected $client; protected $queueUrl; + protected $queueName; protected $callback; protected $requestBatchSize = 1; protected $routingKey; @@ -105,6 +106,13 @@ public function setRequestBatchSize($amount) return $this; } + public function setQueueName($queueName) + { + $this->queueName = $queueName; + + return $this; + } + /** * @see http://docs.aws.amazon.com/aws-sdk-php/v3/api/api-sqs-2012-11-05.html#receivemessage * Will throw an exception if $amount is > 10.000 @@ -163,13 +171,13 @@ public function consume($amount, $timeout=0) $message['MessageAttributes'][$this->contentTypeAttribute]['StringValue'] : ''; if ($contentType != '') { - $this->callback->receive(new Message($data, $message, $contentType)); + $this->callback->receive(new Message($data, $message, $contentType, $this->queueName)); } else { if ($this->logger) { $this->logger->warning('The SQS Consumer received a message with no content-type attribute. Assuming default'); } - $this->callback->receive(new Message($data, $message)); + $this->callback->receive(new Message($data, $message, null, $this->queueName)); } } } diff --git a/Adapter/SQS/Driver.php b/Adapter/SQS/Driver.php index 2463062..b48891a 100644 --- a/Adapter/SQS/Driver.php +++ b/Adapter/SQS/Driver.php @@ -21,7 +21,6 @@ class Driver extends ContainerAware implements DriverInterface */ public function getProducer($queueName) { - return $this->container->get("kaliop_queueing.sqs.{$queueName}_producer")->setDebug($this->debug); } @@ -37,7 +36,7 @@ public function getProducer($queueName) */ public function getConsumer($queueName, MessageConsumerInterface $callback = null) { - return $this->container->get("kaliop_queueing.sqs.{$queueName}_consumer")->setDebug($this->debug); + return $this->container->get("kaliop_queueing.sqs.{$queueName}_consumer")->setDebug($this->debug)->setQueueName($queueName); } public function acceptMessage($message) @@ -126,7 +125,7 @@ public function createConsumer($queueName, $queueUrl, $connectionId, $callback=n { $class = $this->container->getParameter('kaliop_queueing.sqs.consumer.class'); $consumer = new $class($this->getConnectionConfig($connectionId)); - $consumer->setQueueUrl($queueUrl)->setRoutingKey($routingKey); + $consumer->setQueueUrl($queueUrl)->setRoutingKey($routingKey)->setQueueName($queueName); if ($callback != null) { $consumer->setCallBack($callback); } diff --git a/Adapter/SQS/Message.php b/Adapter/SQS/Message.php index d3c3b1a..a6f2c17 100644 --- a/Adapter/SQS/Message.php +++ b/Adapter/SQS/Message.php @@ -8,13 +8,17 @@ class Message implements MessageInterface { protected $body; protected $properties = array(); - protected $contentType; + protected $contentType = 'application/json'; + protected $queueName; - public function __construct($body, array $properties = array(), $contentType='application/json') + public function __construct($body, array $properties = array(), $contentType = null, $queueName = '') { $this->body = $body; $this->properties = $properties; - $this->contentType = $contentType; + if ($contentType != null) { + $this->contentType = $contentType; + } + $this->queueName = $queueName; } public function getBody() @@ -23,7 +27,6 @@ public function getBody() } /** - * This is hardcoded because * @return string */ public function getContentType() @@ -31,6 +34,14 @@ public function getContentType() return $this->contentType; } + /** + * @return string + */ + public function getQueueName() + { + return $this->queueName; + } + /** * Check whether a property exists in the 'properties' dictionary * @param string $name diff --git a/news.md b/news.md new file mode 100644 index 0000000..1c67fac --- /dev/null +++ b/news.md @@ -0,0 +1,13 @@ +# Ver 0.3 + +* NEW: introduce compatibility with queueingbundle 0.4 + + +# Ver 0.2.1 + +* FIXED: php error when received messages miss the content-type attribute + + +# Ver 0.2 + +* first release announced to the world