From 71afeae32155128edade012fdb947184b73b6070 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Wed, 30 Aug 2017 11:27:19 +0300 Subject: [PATCH 1/5] [rdkafka] Add abilito change the way a message is serialized. --- pkg/rdkafka/JsonSerializer.php | 45 +++++++++++++ pkg/rdkafka/RdKafkaConsumer.php | 9 ++- pkg/rdkafka/RdKafkaContext.php | 13 +++- pkg/rdkafka/RdKafkaMessage.php | 21 ++---- pkg/rdkafka/RdKafkaProducer.php | 11 +++- pkg/rdkafka/Serializer.php | 20 ++++++ pkg/rdkafka/SerializerAwareTrait.php | 27 ++++++++ pkg/rdkafka/Tests/JsonSerializerTest.php | 68 +++++++++++++++++++ pkg/rdkafka/Tests/RdKafkaConsumerTest.php | 79 ++++++++++++++++++++--- pkg/rdkafka/Tests/RdKafkaContextTest.php | 38 +++++++++++ pkg/rdkafka/Tests/RdKafkaProducerTest.php | 45 +++++++++++-- 11 files changed, 337 insertions(+), 39 deletions(-) create mode 100644 pkg/rdkafka/JsonSerializer.php create mode 100644 pkg/rdkafka/Serializer.php create mode 100644 pkg/rdkafka/SerializerAwareTrait.php create mode 100644 pkg/rdkafka/Tests/JsonSerializerTest.php diff --git a/pkg/rdkafka/JsonSerializer.php b/pkg/rdkafka/JsonSerializer.php new file mode 100644 index 000000000..763c432a9 --- /dev/null +++ b/pkg/rdkafka/JsonSerializer.php @@ -0,0 +1,45 @@ + $message->getBody(), + 'properties' => $message->getProperties(), + 'headers' => $message->getHeaders(), + ]); + + if (JSON_ERROR_NONE !== json_last_error()) { + throw new \InvalidArgumentException(sprintf( + 'The malformed json given. Error %s and message %s', + json_last_error(), + json_last_error_msg() + )); + } + + return $json; + } + + /** + * {@inheritdoc} + */ + public function toMessage($string) + { + $data = json_decode($string, true); + if (JSON_ERROR_NONE !== json_last_error()) { + throw new \InvalidArgumentException(sprintf( + 'The malformed json given. Error %s and message %s', + json_last_error(), + json_last_error_msg() + )); + } + + return new RdKafkaMessage($data['body'], $data['properties'], $data['headers']); + } +} diff --git a/pkg/rdkafka/RdKafkaConsumer.php b/pkg/rdkafka/RdKafkaConsumer.php index 0cbc48f8a..491da858c 100644 --- a/pkg/rdkafka/RdKafkaConsumer.php +++ b/pkg/rdkafka/RdKafkaConsumer.php @@ -9,6 +9,8 @@ class RdKafkaConsumer implements PsrConsumer { + use SerializerAwareTrait; + /** * @var KafkaConsumer */ @@ -38,14 +40,17 @@ class RdKafkaConsumer implements PsrConsumer * @param KafkaConsumer $consumer * @param RdKafkaContext $context * @param RdKafkaTopic $topic + * @param Serializer $serializer */ - public function __construct(KafkaConsumer $consumer, RdKafkaContext $context, RdKafkaTopic $topic) + public function __construct(KafkaConsumer $consumer, RdKafkaContext $context, RdKafkaTopic $topic, Serializer $serializer) { $this->consumer = $consumer; $this->context = $context; $this->topic = $topic; $this->subscribed = false; $this->commitAsync = false; + + $this->setSerializer($serializer); } /** @@ -151,7 +156,7 @@ private function doReceive($timeout) case RD_KAFKA_RESP_ERR__TIMED_OUT: break; case RD_KAFKA_RESP_ERR_NO_ERROR: - $message = RdKafkaMessage::jsonUnserialize($kafkaMessage->payload); + $message = $this->serializer->toMessage($kafkaMessage->payload); $message->setKey($kafkaMessage->key); $message->setPartition($kafkaMessage->partition); $message->setKafkaMessage($kafkaMessage); diff --git a/pkg/rdkafka/RdKafkaContext.php b/pkg/rdkafka/RdKafkaContext.php index d8cbfc028..e158423b9 100644 --- a/pkg/rdkafka/RdKafkaContext.php +++ b/pkg/rdkafka/RdKafkaContext.php @@ -12,6 +12,8 @@ class RdKafkaContext implements PsrContext { + use SerializerAwareTrait; + /** * @var array */ @@ -33,6 +35,8 @@ class RdKafkaContext implements PsrContext public function __construct(array $config) { $this->config = $config; + + $this->setSerializer(new JsonSerializer()); } /** @@ -78,7 +82,7 @@ public function createTemporaryQueue() */ public function createProducer() { - return new RdKafkaProducer($this->getProducer()); + return new RdKafkaProducer($this->getProducer(), $this->getSerializer()); } /** @@ -90,7 +94,12 @@ public function createConsumer(PsrDestination $destination) { InvalidDestinationException::assertDestinationInstanceOf($destination, RdKafkaTopic::class); - $consumer = new RdKafkaConsumer(new KafkaConsumer($this->getConf()), $this, $destination); + $consumer = new RdKafkaConsumer( + new KafkaConsumer($this->getConf()), + $this, + $destination, + $this->getSerializer() + ); if (isset($this->config['commit_async'])) { $consumer->setCommitAsync($this->config['commit_async']); diff --git a/pkg/rdkafka/RdKafkaMessage.php b/pkg/rdkafka/RdKafkaMessage.php index c5621c13d..aa51a3cc0 100644 --- a/pkg/rdkafka/RdKafkaMessage.php +++ b/pkg/rdkafka/RdKafkaMessage.php @@ -5,6 +5,10 @@ use Interop\Queue\PsrMessage; use RdKafka\Message; +/** + * TODO: \JsonSerializable will be removed in next version (probably 0.8.x) + * The serialization logic was moved to JsonSerializer. + */ class RdKafkaMessage implements PsrMessage, \JsonSerializable { /** @@ -270,11 +274,7 @@ public function setKafkaMessage(Message $message) */ public function jsonSerialize() { - return [ - 'body' => $this->getBody(), - 'properties' => $this->getProperties(), - 'headers' => $this->getHeaders(), - ]; + return (new JsonSerializer())->toString($this); } /** @@ -284,15 +284,6 @@ public function jsonSerialize() */ public static function jsonUnserialize($json) { - $data = json_decode($json, true); - if (JSON_ERROR_NONE !== json_last_error()) { - throw new \InvalidArgumentException(sprintf( - 'The malformed json given. Error %s and message %s', - json_last_error(), - json_last_error_msg() - )); - } - - return new self($data['body'], $data['properties'], $data['headers']); + return (new JsonSerializer())->toMessage($json); } } diff --git a/pkg/rdkafka/RdKafkaProducer.php b/pkg/rdkafka/RdKafkaProducer.php index ffad80075..df0185e34 100644 --- a/pkg/rdkafka/RdKafkaProducer.php +++ b/pkg/rdkafka/RdKafkaProducer.php @@ -11,17 +11,22 @@ class RdKafkaProducer implements PsrProducer { + use SerializerAwareTrait; + /** * @var Producer */ private $producer; /** - * @param Producer $producer + * @param Producer $producer + * @param Serializer $serializer */ - public function __construct(Producer $producer) + public function __construct(Producer $producer, Serializer $serializer) { $this->producer = $producer; + + $this->setSerializer($serializer); } /** @@ -37,7 +42,7 @@ public function send(PsrDestination $destination, PsrMessage $message) $partition = $message->getPartition() ?: $destination->getPartition() ?: RD_KAFKA_PARTITION_UA; $key = $message->getKey() ?: $destination->getKey() ?: null; - $payload = json_encode($message); + $payload = $this->serializer->toString($message); $topic = $this->producer->newTopic($destination->getTopicName(), $destination->getConf()); $topic->produce($partition, 0 /* must be 0 */, $payload, $key); diff --git a/pkg/rdkafka/Serializer.php b/pkg/rdkafka/Serializer.php new file mode 100644 index 000000000..e1414150f --- /dev/null +++ b/pkg/rdkafka/Serializer.php @@ -0,0 +1,20 @@ +serializer = $serializer; + } + + /** + * @return Serializer + */ + public function getSerializer() + { + return $this->serializer; + } +} diff --git a/pkg/rdkafka/Tests/JsonSerializerTest.php b/pkg/rdkafka/Tests/JsonSerializerTest.php new file mode 100644 index 000000000..033ee1ced --- /dev/null +++ b/pkg/rdkafka/Tests/JsonSerializerTest.php @@ -0,0 +1,68 @@ +assertClassImplements(Serializer::class, JsonSerializer::class); + } + + public function testCouldBeConstructedWithoutAnyArguments() + { + new JsonSerializer(); + } + + public function testShouldConvertMessageToJsonString() + { + $serializer = new JsonSerializer(); + + $message = new RdKafkaMessage('theBody', ['aProp' => 'aPropVal'], ['aHeader' => 'aHeaderVal']); + + $json = $serializer->toString($message); + + $this->assertSame('{"body":"theBody","properties":{"aProp":"aPropVal"},"headers":{"aHeader":"aHeaderVal"}}', $json); + } + + public function testThrowIfFailedToEncodeMessageToJson() + { + $serializer = new JsonSerializer(); + + $message = new RdKafkaMessage('theBody', ['aProp' => STDIN]); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The malformed json given. Error 8 and message Type is not supported'); + $serializer->toString($message); + } + + public function testShouldConvertJsonStringToMessage() + { + $serializer = new JsonSerializer(); + + $message = $serializer->toMessage('{"body":"theBody","properties":{"aProp":"aPropVal"},"headers":{"aHeader":"aHeaderVal"}}'); + + $this->assertInstanceOf(RdKafkaMessage::class, $message); + + $this->assertSame('theBody', $message->getBody()); + $this->assertSame(['aProp' => 'aPropVal'], $message->getProperties()); + $this->assertSame(['aHeader' => 'aHeaderVal'], $message->getHeaders()); + } + + public function testThrowIfFailedToDecodeJsonToMessage() + { + $serializer = new JsonSerializer(); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The malformed json given. Error 2 and message State mismatch (invalid or malformed JSON)'); + $serializer->toMessage('{]'); + } +} diff --git a/pkg/rdkafka/Tests/RdKafkaConsumerTest.php b/pkg/rdkafka/Tests/RdKafkaConsumerTest.php index 3c40873bf..123c336c9 100644 --- a/pkg/rdkafka/Tests/RdKafkaConsumerTest.php +++ b/pkg/rdkafka/Tests/RdKafkaConsumerTest.php @@ -6,6 +6,7 @@ use Enqueue\RdKafka\RdKafkaContext; use Enqueue\RdKafka\RdKafkaMessage; use Enqueue\RdKafka\RdKafkaTopic; +use Enqueue\RdKafka\Serializer; use PHPUnit\Framework\TestCase; use RdKafka\KafkaConsumer; use RdKafka\Message; @@ -14,14 +15,24 @@ class RdKafkaConsumerTest extends TestCase { public function testCouldBeConstructedWithRequiredArguments() { - new RdKafkaConsumer($this->createKafkaConsumerMock(), $this->createContextMock(), new RdKafkaTopic('')); + new RdKafkaConsumer( + $this->createKafkaConsumerMock(), + $this->createContextMock(), + new RdKafkaTopic(''), + $this->createSerializerMock() + ); } public function testShouldReturnQueueSetInConstructor() { $destination = new RdKafkaTopic(''); - $consumer = new RdKafkaConsumer($this->createKafkaConsumerMock(), $this->createContextMock(), $destination); + $consumer = new RdKafkaConsumer( + $this->createKafkaConsumerMock(), + $this->createContextMock(), + $destination, + $this->createSerializerMock() + ); $this->assertSame($destination, $consumer->getQueue()); } @@ -50,7 +61,12 @@ public function testShouldReceiveFromQueueAndReturnNullIfNoMessageInQueue() ->method('unsubscribe') ; - $consumer = new RdKafkaConsumer($kafkaConsumer, $this->createContextMock(), $destination); + $consumer = new RdKafkaConsumer( + $kafkaConsumer, + $this->createContextMock(), + $destination, + $this->createSerializerMock() + ); $this->assertNull($consumer->receive(1000)); } @@ -59,11 +75,11 @@ public function testShouldReceiveFromQueueAndReturnMessageIfMessageInQueue() { $destination = new RdKafkaTopic('dest'); - $message = new RdKafkaMessage('theBody', ['foo' => 'fooVal'], ['bar' => 'barVal']); + $expectedMessage = new RdKafkaMessage('theBody', ['foo' => 'fooVal'], ['bar' => 'barVal']); $kafkaMessage = new Message(); $kafkaMessage->err = RD_KAFKA_RESP_ERR_NO_ERROR; - $kafkaMessage->payload = json_encode($message); + $kafkaMessage->payload = 'theSerializedMessage'; $kafkaConsumer = $this->createKafkaConsumerMock(); $kafkaConsumer @@ -82,19 +98,35 @@ public function testShouldReceiveFromQueueAndReturnMessageIfMessageInQueue() ->method('unsubscribe') ; - $consumer = new RdKafkaConsumer($kafkaConsumer, $this->createContextMock(), $destination); + $serializer = $this->createSerializerMock(); + $serializer + ->expects($this->once()) + ->method('toMessage') + ->with('theSerializedMessage') + ->willReturn($expectedMessage) + ; + + $consumer = new RdKafkaConsumer( + $kafkaConsumer, + $this->createContextMock(), + $destination, + $serializer + ); $actualMessage = $consumer->receive(1000); - $this->assertSame('theBody', $actualMessage->getBody()); - $this->assertSame(['foo' => 'fooVal'], $actualMessage->getProperties()); - $this->assertSame(['bar' => 'barVal'], $actualMessage->getHeaders()); + $this->assertSame($actualMessage, $expectedMessage); $this->assertSame($kafkaMessage, $actualMessage->getKafkaMessage()); } public function testShouldThrowExceptionNotImplementedOnReceiveNoWait() { - $consumer = new RdKafkaConsumer($this->createKafkaConsumerMock(), $this->createContextMock(), new RdKafkaTopic('')); + $consumer = new RdKafkaConsumer( + $this->createKafkaConsumerMock(), + $this->createContextMock(), + new RdKafkaTopic(''), + $this->createSerializerMock() + ); $this->expectException(\LogicException::class); $this->expectExceptionMessage('Not implemented'); @@ -102,6 +134,25 @@ public function testShouldThrowExceptionNotImplementedOnReceiveNoWait() $consumer->receiveNoWait(); } + public function testShouldAllowGetPreviouslySetSerializer() + { + $consumer = new RdKafkaConsumer( + $this->createKafkaConsumerMock(), + $this->createContextMock(), + new RdKafkaTopic(''), + $this->createSerializerMock() + ); + + $expectedSerializer = $this->createSerializerMock(); + + //guard + $this->assertNotSame($consumer->getSerializer(), $expectedSerializer); + + $consumer->setSerializer($expectedSerializer); + + $this->assertSame($expectedSerializer, $consumer->getSerializer()); + } + /** * @return \PHPUnit_Framework_MockObject_MockObject|KafkaConsumer */ @@ -117,4 +168,12 @@ private function createContextMock() { return $this->createMock(RdKafkaContext::class); } + + /** + * @return Serializer|\PHPUnit_Framework_MockObject_MockObject|Serializer + */ + private function createSerializerMock() + { + return $this->createMock(Serializer::class); + } } diff --git a/pkg/rdkafka/Tests/RdKafkaContextTest.php b/pkg/rdkafka/Tests/RdKafkaContextTest.php index b07b82987..da42750fd 100644 --- a/pkg/rdkafka/Tests/RdKafkaContextTest.php +++ b/pkg/rdkafka/Tests/RdKafkaContextTest.php @@ -3,7 +3,9 @@ namespace Enqueue\RdKafka\Tests; use Enqueue\Null\NullQueue; +use Enqueue\RdKafka\JsonSerializer; use Enqueue\RdKafka\RdKafkaContext; +use Enqueue\RdKafka\Serializer; use Interop\Queue\InvalidDestinationException; use PHPUnit\Framework\TestCase; @@ -25,4 +27,40 @@ public function testThrowInvalidDestinationIfInvalidDestinationGivenOnCreateCons $this->expectException(InvalidDestinationException::class); $context->createConsumer(new NullQueue('aQueue')); } + + public function testShouldSetJsonSerializerInConstructor() + { + $context = new RdKafkaContext([]); + + $this->assertInstanceOf(JsonSerializer::class, $context->getSerializer()); + } + + public function testShouldAllowGetPreviouslySetSerializer() + { + $context = new RdKafkaContext([]); + + $expectedSerializer = $this->createMock(Serializer::class); + + $context->setSerializer($expectedSerializer); + + $this->assertSame($expectedSerializer, $context->getSerializer()); + } + + public function testShouldInjectItsSerializerToProducer() + { + $context = new RdKafkaContext([]); + + $producer = $context->createProducer(); + + $this->assertSame($context->getSerializer(), $producer->getSerializer()); + } + + public function testShouldInjectItsSerializerToConsumer() + { + $context = new RdKafkaContext([]); + + $producer = $context->createConsumer($context->createQueue('aQueue')); + + $this->assertSame($context->getSerializer(), $producer->getSerializer()); + } } diff --git a/pkg/rdkafka/Tests/RdKafkaProducerTest.php b/pkg/rdkafka/Tests/RdKafkaProducerTest.php index c31c588ae..8a9ee1588 100644 --- a/pkg/rdkafka/Tests/RdKafkaProducerTest.php +++ b/pkg/rdkafka/Tests/RdKafkaProducerTest.php @@ -7,6 +7,7 @@ use Enqueue\RdKafka\RdKafkaMessage; use Enqueue\RdKafka\RdKafkaProducer; use Enqueue\RdKafka\RdKafkaTopic; +use Enqueue\RdKafka\Serializer; use Interop\Queue\InvalidDestinationException; use Interop\Queue\InvalidMessageException; use PHPUnit\Framework\TestCase; @@ -16,14 +17,14 @@ class RdKafkaProducerTest extends TestCase { - public function testCouldBeConstructedWithKafkaProducerAsFirstArgument() + public function testCouldBeConstructedWithKafkaProducerAndSerializerAsArguments() { - new RdKafkaProducer($this->createKafkaProducerMock()); + new RdKafkaProducer($this->createKafkaProducerMock(), $this->createSerializerMock()); } public function testThrowIfDestinationInvalid() { - $producer = new RdKafkaProducer($this->createKafkaProducerMock()); + $producer = new RdKafkaProducer($this->createKafkaProducerMock(), $this->createSerializerMock()); $this->expectException(InvalidDestinationException::class); $this->expectExceptionMessage('The destination must be an instance of Enqueue\RdKafka\RdKafkaTopic but got Enqueue\Null\NullQueue.'); @@ -32,14 +33,14 @@ public function testThrowIfDestinationInvalid() public function testThrowIfMessageInvalid() { - $producer = new RdKafkaProducer($this->createKafkaProducerMock()); + $producer = new RdKafkaProducer($this->createKafkaProducerMock(), $this->createSerializerMock()); $this->expectException(InvalidMessageException::class); $this->expectExceptionMessage('The message must be an instance of Enqueue\RdKafka\RdKafkaMessage but it is Enqueue\Null\NullMessage.'); $producer->send(new RdKafkaTopic('aQueue'), new NullMessage()); } - public function testShouldJsonEncodeMessageAndPutToExpectedTube() + public function testShouldUseSerializerToEncodeMessageAndPutToExpectedTube() { $message = new RdKafkaMessage('theBody', ['foo' => 'fooVal'], ['bar' => 'barVal']); $message->setKey('key'); @@ -51,7 +52,7 @@ public function testShouldJsonEncodeMessageAndPutToExpectedTube() ->with( RD_KAFKA_PARTITION_UA, 0, - '{"body":"theBody","properties":{"foo":"fooVal"},"headers":{"bar":"barVal"}}', + 'theSerializedMessage', 'key' ) ; @@ -64,11 +65,33 @@ public function testShouldJsonEncodeMessageAndPutToExpectedTube() ->willReturn($kafkaTopic) ; - $producer = new RdKafkaProducer($kafkaProducer); + $serializer = $this->createSerializerMock(); + $serializer + ->expects($this->once()) + ->method('toString') + ->with($this->identicalTo($message)) + ->willReturn('theSerializedMessage') + ; + + $producer = new RdKafkaProducer($kafkaProducer, $serializer); $producer->send(new RdKafkaTopic('theQueueName'), $message); } + public function testShouldAllowGetPreviouslySetSerializer() + { + $producer = new RdKafkaProducer($this->createKafkaProducerMock(), $this->createSerializerMock()); + + $expectedSerializer = $this->createSerializerMock(); + + //guard + $this->assertNotSame($producer->getSerializer(), $expectedSerializer); + + $producer->setSerializer($expectedSerializer); + + $this->assertSame($expectedSerializer, $producer->getSerializer()); + } + /** * @return \PHPUnit_Framework_MockObject_MockObject|ProducerTopic */ @@ -84,4 +107,12 @@ private function createKafkaProducerMock() { return $this->createMock(Producer::class); } + + /** + * @return Serializer|\PHPUnit_Framework_MockObject_MockObject|Serializer + */ + private function createSerializerMock() + { + return $this->createMock(Serializer::class); + } } From 00b3bc5fd1853cae8de7f0c441aec00e2cf3e82d Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Wed, 30 Aug 2017 12:38:50 +0300 Subject: [PATCH 2/5] [rdkafka] fix tests. --- pkg/rdkafka/Tests/JsonSerializerTest.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/rdkafka/Tests/JsonSerializerTest.php b/pkg/rdkafka/Tests/JsonSerializerTest.php index 033ee1ced..c0626c2be 100644 --- a/pkg/rdkafka/Tests/JsonSerializerTest.php +++ b/pkg/rdkafka/Tests/JsonSerializerTest.php @@ -40,7 +40,7 @@ public function testThrowIfFailedToEncodeMessageToJson() $message = new RdKafkaMessage('theBody', ['aProp' => STDIN]); $this->expectException(\LogicException::class); - $this->expectExceptionMessage('The malformed json given. Error 8 and message Type is not supported'); + $this->expectExceptionMessage('The malformed json given.'); $serializer->toString($message); } @@ -62,7 +62,7 @@ public function testThrowIfFailedToDecodeJsonToMessage() $serializer = new JsonSerializer(); $this->expectException(\LogicException::class); - $this->expectExceptionMessage('The malformed json given. Error 2 and message State mismatch (invalid or malformed JSON)'); + $this->expectExceptionMessage('The malformed json given.'); $serializer->toMessage('{]'); } } From f1601fdb9ad69b2e55b6dd805e8816a582c8caf7 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Wed, 30 Aug 2017 12:38:58 +0300 Subject: [PATCH 3/5] [rdkafka] add docs --- docs/transport/kafka.md | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/docs/transport/kafka.md b/docs/transport/kafka.md index 116dc2b5b..915f1b08b 100644 --- a/docs/transport/kafka.md +++ b/docs/transport/kafka.md @@ -7,6 +7,7 @@ The transport uses [Kafka](https://kafka.apache.org/) streaming platform as a MQ * [Send message to topic](#send-message-to-topic) * [Send message to queue](#send-message-to-queue) * [Consume message](#consume-message) +* [Serialize message](#serialize-message) ## Installation @@ -88,4 +89,27 @@ $consumer->acknowledge($message); // $consumer->reject($message); ``` +## Serialize message + +By default the transport serializes messages to json format but you might want to use another format such as [Apache Avro](https://avro.apache.org/docs/1.2.0/). +For that you have to implement Serializer interface and set it to the context, producer or consumer. +If a serializer set to context it will be injected to all consumers and producers created by the context. + +```php +setSerializer(new FooSerializer()); +``` + [back to index](index.md) \ No newline at end of file From 387f5c6f6fbb8b0c4cc64c4fa95d87b912de9fa6 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Thu, 31 Aug 2017 11:44:35 +0300 Subject: [PATCH 4/5] fix test --- pkg/rdkafka/Tests/RdKafkaContextTest.php | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/rdkafka/Tests/RdKafkaContextTest.php b/pkg/rdkafka/Tests/RdKafkaContextTest.php index da42750fd..4758e0bbd 100644 --- a/pkg/rdkafka/Tests/RdKafkaContextTest.php +++ b/pkg/rdkafka/Tests/RdKafkaContextTest.php @@ -57,7 +57,9 @@ public function testShouldInjectItsSerializerToProducer() public function testShouldInjectItsSerializerToConsumer() { - $context = new RdKafkaContext([]); + $context = new RdKafkaContext(['global' => [ + 'group.id' => uniqid('', true), + ]]); $producer = $context->createConsumer($context->createQueue('aQueue')); From 4132833e8e6a46b90af8ecb3674e9e2604b37e21 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Thu, 31 Aug 2017 11:46:58 +0300 Subject: [PATCH 5/5] attempt fix test. --- pkg/rdkafka/Tests/JsonSerializerTest.php | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/rdkafka/Tests/JsonSerializerTest.php b/pkg/rdkafka/Tests/JsonSerializerTest.php index c0626c2be..2d9cb3451 100644 --- a/pkg/rdkafka/Tests/JsonSerializerTest.php +++ b/pkg/rdkafka/Tests/JsonSerializerTest.php @@ -37,7 +37,12 @@ public function testThrowIfFailedToEncodeMessageToJson() { $serializer = new JsonSerializer(); - $message = new RdKafkaMessage('theBody', ['aProp' => STDIN]); + $resource = fopen(__FILE__, 'r'); + + //guard + $this->assertInternalType('resource', $resource); + + $message = new RdKafkaMessage('theBody', ['aProp' => $resource]); $this->expectException(\LogicException::class); $this->expectExceptionMessage('The malformed json given.');