diff --git a/docs/transport/kafka.md b/docs/transport/kafka.md index 116dc2b5b..915f1b08b 100644 --- a/docs/transport/kafka.md +++ b/docs/transport/kafka.md @@ -7,6 +7,7 @@ The transport uses [Kafka](https://kafka.apache.org/) streaming platform as a MQ * [Send message to topic](#send-message-to-topic) * [Send message to queue](#send-message-to-queue) * [Consume message](#consume-message) +* [Serialize message](#serialize-message) ## Installation @@ -88,4 +89,27 @@ $consumer->acknowledge($message); // $consumer->reject($message); ``` +## Serialize message + +By default the transport serializes messages to json format but you might want to use another format such as [Apache Avro](https://avro.apache.org/docs/1.2.0/). +For that you have to implement Serializer interface and set it to the context, producer or consumer. +If a serializer set to context it will be injected to all consumers and producers created by the context. + +```php +setSerializer(new FooSerializer()); +``` + [back to index](index.md) \ No newline at end of file diff --git a/pkg/rdkafka/JsonSerializer.php b/pkg/rdkafka/JsonSerializer.php new file mode 100644 index 000000000..763c432a9 --- /dev/null +++ b/pkg/rdkafka/JsonSerializer.php @@ -0,0 +1,45 @@ + $message->getBody(), + 'properties' => $message->getProperties(), + 'headers' => $message->getHeaders(), + ]); + + if (JSON_ERROR_NONE !== json_last_error()) { + throw new \InvalidArgumentException(sprintf( + 'The malformed json given. Error %s and message %s', + json_last_error(), + json_last_error_msg() + )); + } + + return $json; + } + + /** + * {@inheritdoc} + */ + public function toMessage($string) + { + $data = json_decode($string, true); + if (JSON_ERROR_NONE !== json_last_error()) { + throw new \InvalidArgumentException(sprintf( + 'The malformed json given. Error %s and message %s', + json_last_error(), + json_last_error_msg() + )); + } + + return new RdKafkaMessage($data['body'], $data['properties'], $data['headers']); + } +} diff --git a/pkg/rdkafka/RdKafkaConsumer.php b/pkg/rdkafka/RdKafkaConsumer.php index 0cbc48f8a..491da858c 100644 --- a/pkg/rdkafka/RdKafkaConsumer.php +++ b/pkg/rdkafka/RdKafkaConsumer.php @@ -9,6 +9,8 @@ class RdKafkaConsumer implements PsrConsumer { + use SerializerAwareTrait; + /** * @var KafkaConsumer */ @@ -38,14 +40,17 @@ class RdKafkaConsumer implements PsrConsumer * @param KafkaConsumer $consumer * @param RdKafkaContext $context * @param RdKafkaTopic $topic + * @param Serializer $serializer */ - public function __construct(KafkaConsumer $consumer, RdKafkaContext $context, RdKafkaTopic $topic) + public function __construct(KafkaConsumer $consumer, RdKafkaContext $context, RdKafkaTopic $topic, Serializer $serializer) { $this->consumer = $consumer; $this->context = $context; $this->topic = $topic; $this->subscribed = false; $this->commitAsync = false; + + $this->setSerializer($serializer); } /** @@ -151,7 +156,7 @@ private function doReceive($timeout) case RD_KAFKA_RESP_ERR__TIMED_OUT: break; case RD_KAFKA_RESP_ERR_NO_ERROR: - $message = RdKafkaMessage::jsonUnserialize($kafkaMessage->payload); + $message = $this->serializer->toMessage($kafkaMessage->payload); $message->setKey($kafkaMessage->key); $message->setPartition($kafkaMessage->partition); $message->setKafkaMessage($kafkaMessage); diff --git a/pkg/rdkafka/RdKafkaContext.php b/pkg/rdkafka/RdKafkaContext.php index d8cbfc028..e158423b9 100644 --- a/pkg/rdkafka/RdKafkaContext.php +++ b/pkg/rdkafka/RdKafkaContext.php @@ -12,6 +12,8 @@ class RdKafkaContext implements PsrContext { + use SerializerAwareTrait; + /** * @var array */ @@ -33,6 +35,8 @@ class RdKafkaContext implements PsrContext public function __construct(array $config) { $this->config = $config; + + $this->setSerializer(new JsonSerializer()); } /** @@ -78,7 +82,7 @@ public function createTemporaryQueue() */ public function createProducer() { - return new RdKafkaProducer($this->getProducer()); + return new RdKafkaProducer($this->getProducer(), $this->getSerializer()); } /** @@ -90,7 +94,12 @@ public function createConsumer(PsrDestination $destination) { InvalidDestinationException::assertDestinationInstanceOf($destination, RdKafkaTopic::class); - $consumer = new RdKafkaConsumer(new KafkaConsumer($this->getConf()), $this, $destination); + $consumer = new RdKafkaConsumer( + new KafkaConsumer($this->getConf()), + $this, + $destination, + $this->getSerializer() + ); if (isset($this->config['commit_async'])) { $consumer->setCommitAsync($this->config['commit_async']); diff --git a/pkg/rdkafka/RdKafkaMessage.php b/pkg/rdkafka/RdKafkaMessage.php index c5621c13d..aa51a3cc0 100644 --- a/pkg/rdkafka/RdKafkaMessage.php +++ b/pkg/rdkafka/RdKafkaMessage.php @@ -5,6 +5,10 @@ use Interop\Queue\PsrMessage; use RdKafka\Message; +/** + * TODO: \JsonSerializable will be removed in next version (probably 0.8.x) + * The serialization logic was moved to JsonSerializer. + */ class RdKafkaMessage implements PsrMessage, \JsonSerializable { /** @@ -270,11 +274,7 @@ public function setKafkaMessage(Message $message) */ public function jsonSerialize() { - return [ - 'body' => $this->getBody(), - 'properties' => $this->getProperties(), - 'headers' => $this->getHeaders(), - ]; + return (new JsonSerializer())->toString($this); } /** @@ -284,15 +284,6 @@ public function jsonSerialize() */ public static function jsonUnserialize($json) { - $data = json_decode($json, true); - if (JSON_ERROR_NONE !== json_last_error()) { - throw new \InvalidArgumentException(sprintf( - 'The malformed json given. Error %s and message %s', - json_last_error(), - json_last_error_msg() - )); - } - - return new self($data['body'], $data['properties'], $data['headers']); + return (new JsonSerializer())->toMessage($json); } } diff --git a/pkg/rdkafka/RdKafkaProducer.php b/pkg/rdkafka/RdKafkaProducer.php index ffad80075..df0185e34 100644 --- a/pkg/rdkafka/RdKafkaProducer.php +++ b/pkg/rdkafka/RdKafkaProducer.php @@ -11,17 +11,22 @@ class RdKafkaProducer implements PsrProducer { + use SerializerAwareTrait; + /** * @var Producer */ private $producer; /** - * @param Producer $producer + * @param Producer $producer + * @param Serializer $serializer */ - public function __construct(Producer $producer) + public function __construct(Producer $producer, Serializer $serializer) { $this->producer = $producer; + + $this->setSerializer($serializer); } /** @@ -37,7 +42,7 @@ public function send(PsrDestination $destination, PsrMessage $message) $partition = $message->getPartition() ?: $destination->getPartition() ?: RD_KAFKA_PARTITION_UA; $key = $message->getKey() ?: $destination->getKey() ?: null; - $payload = json_encode($message); + $payload = $this->serializer->toString($message); $topic = $this->producer->newTopic($destination->getTopicName(), $destination->getConf()); $topic->produce($partition, 0 /* must be 0 */, $payload, $key); diff --git a/pkg/rdkafka/Serializer.php b/pkg/rdkafka/Serializer.php new file mode 100644 index 000000000..e1414150f --- /dev/null +++ b/pkg/rdkafka/Serializer.php @@ -0,0 +1,20 @@ +serializer = $serializer; + } + + /** + * @return Serializer + */ + public function getSerializer() + { + return $this->serializer; + } +} diff --git a/pkg/rdkafka/Tests/JsonSerializerTest.php b/pkg/rdkafka/Tests/JsonSerializerTest.php new file mode 100644 index 000000000..2d9cb3451 --- /dev/null +++ b/pkg/rdkafka/Tests/JsonSerializerTest.php @@ -0,0 +1,73 @@ +assertClassImplements(Serializer::class, JsonSerializer::class); + } + + public function testCouldBeConstructedWithoutAnyArguments() + { + new JsonSerializer(); + } + + public function testShouldConvertMessageToJsonString() + { + $serializer = new JsonSerializer(); + + $message = new RdKafkaMessage('theBody', ['aProp' => 'aPropVal'], ['aHeader' => 'aHeaderVal']); + + $json = $serializer->toString($message); + + $this->assertSame('{"body":"theBody","properties":{"aProp":"aPropVal"},"headers":{"aHeader":"aHeaderVal"}}', $json); + } + + public function testThrowIfFailedToEncodeMessageToJson() + { + $serializer = new JsonSerializer(); + + $resource = fopen(__FILE__, 'r'); + + //guard + $this->assertInternalType('resource', $resource); + + $message = new RdKafkaMessage('theBody', ['aProp' => $resource]); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The malformed json given.'); + $serializer->toString($message); + } + + public function testShouldConvertJsonStringToMessage() + { + $serializer = new JsonSerializer(); + + $message = $serializer->toMessage('{"body":"theBody","properties":{"aProp":"aPropVal"},"headers":{"aHeader":"aHeaderVal"}}'); + + $this->assertInstanceOf(RdKafkaMessage::class, $message); + + $this->assertSame('theBody', $message->getBody()); + $this->assertSame(['aProp' => 'aPropVal'], $message->getProperties()); + $this->assertSame(['aHeader' => 'aHeaderVal'], $message->getHeaders()); + } + + public function testThrowIfFailedToDecodeJsonToMessage() + { + $serializer = new JsonSerializer(); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The malformed json given.'); + $serializer->toMessage('{]'); + } +} diff --git a/pkg/rdkafka/Tests/RdKafkaConsumerTest.php b/pkg/rdkafka/Tests/RdKafkaConsumerTest.php index 3c40873bf..123c336c9 100644 --- a/pkg/rdkafka/Tests/RdKafkaConsumerTest.php +++ b/pkg/rdkafka/Tests/RdKafkaConsumerTest.php @@ -6,6 +6,7 @@ use Enqueue\RdKafka\RdKafkaContext; use Enqueue\RdKafka\RdKafkaMessage; use Enqueue\RdKafka\RdKafkaTopic; +use Enqueue\RdKafka\Serializer; use PHPUnit\Framework\TestCase; use RdKafka\KafkaConsumer; use RdKafka\Message; @@ -14,14 +15,24 @@ class RdKafkaConsumerTest extends TestCase { public function testCouldBeConstructedWithRequiredArguments() { - new RdKafkaConsumer($this->createKafkaConsumerMock(), $this->createContextMock(), new RdKafkaTopic('')); + new RdKafkaConsumer( + $this->createKafkaConsumerMock(), + $this->createContextMock(), + new RdKafkaTopic(''), + $this->createSerializerMock() + ); } public function testShouldReturnQueueSetInConstructor() { $destination = new RdKafkaTopic(''); - $consumer = new RdKafkaConsumer($this->createKafkaConsumerMock(), $this->createContextMock(), $destination); + $consumer = new RdKafkaConsumer( + $this->createKafkaConsumerMock(), + $this->createContextMock(), + $destination, + $this->createSerializerMock() + ); $this->assertSame($destination, $consumer->getQueue()); } @@ -50,7 +61,12 @@ public function testShouldReceiveFromQueueAndReturnNullIfNoMessageInQueue() ->method('unsubscribe') ; - $consumer = new RdKafkaConsumer($kafkaConsumer, $this->createContextMock(), $destination); + $consumer = new RdKafkaConsumer( + $kafkaConsumer, + $this->createContextMock(), + $destination, + $this->createSerializerMock() + ); $this->assertNull($consumer->receive(1000)); } @@ -59,11 +75,11 @@ public function testShouldReceiveFromQueueAndReturnMessageIfMessageInQueue() { $destination = new RdKafkaTopic('dest'); - $message = new RdKafkaMessage('theBody', ['foo' => 'fooVal'], ['bar' => 'barVal']); + $expectedMessage = new RdKafkaMessage('theBody', ['foo' => 'fooVal'], ['bar' => 'barVal']); $kafkaMessage = new Message(); $kafkaMessage->err = RD_KAFKA_RESP_ERR_NO_ERROR; - $kafkaMessage->payload = json_encode($message); + $kafkaMessage->payload = 'theSerializedMessage'; $kafkaConsumer = $this->createKafkaConsumerMock(); $kafkaConsumer @@ -82,19 +98,35 @@ public function testShouldReceiveFromQueueAndReturnMessageIfMessageInQueue() ->method('unsubscribe') ; - $consumer = new RdKafkaConsumer($kafkaConsumer, $this->createContextMock(), $destination); + $serializer = $this->createSerializerMock(); + $serializer + ->expects($this->once()) + ->method('toMessage') + ->with('theSerializedMessage') + ->willReturn($expectedMessage) + ; + + $consumer = new RdKafkaConsumer( + $kafkaConsumer, + $this->createContextMock(), + $destination, + $serializer + ); $actualMessage = $consumer->receive(1000); - $this->assertSame('theBody', $actualMessage->getBody()); - $this->assertSame(['foo' => 'fooVal'], $actualMessage->getProperties()); - $this->assertSame(['bar' => 'barVal'], $actualMessage->getHeaders()); + $this->assertSame($actualMessage, $expectedMessage); $this->assertSame($kafkaMessage, $actualMessage->getKafkaMessage()); } public function testShouldThrowExceptionNotImplementedOnReceiveNoWait() { - $consumer = new RdKafkaConsumer($this->createKafkaConsumerMock(), $this->createContextMock(), new RdKafkaTopic('')); + $consumer = new RdKafkaConsumer( + $this->createKafkaConsumerMock(), + $this->createContextMock(), + new RdKafkaTopic(''), + $this->createSerializerMock() + ); $this->expectException(\LogicException::class); $this->expectExceptionMessage('Not implemented'); @@ -102,6 +134,25 @@ public function testShouldThrowExceptionNotImplementedOnReceiveNoWait() $consumer->receiveNoWait(); } + public function testShouldAllowGetPreviouslySetSerializer() + { + $consumer = new RdKafkaConsumer( + $this->createKafkaConsumerMock(), + $this->createContextMock(), + new RdKafkaTopic(''), + $this->createSerializerMock() + ); + + $expectedSerializer = $this->createSerializerMock(); + + //guard + $this->assertNotSame($consumer->getSerializer(), $expectedSerializer); + + $consumer->setSerializer($expectedSerializer); + + $this->assertSame($expectedSerializer, $consumer->getSerializer()); + } + /** * @return \PHPUnit_Framework_MockObject_MockObject|KafkaConsumer */ @@ -117,4 +168,12 @@ private function createContextMock() { return $this->createMock(RdKafkaContext::class); } + + /** + * @return Serializer|\PHPUnit_Framework_MockObject_MockObject|Serializer + */ + private function createSerializerMock() + { + return $this->createMock(Serializer::class); + } } diff --git a/pkg/rdkafka/Tests/RdKafkaContextTest.php b/pkg/rdkafka/Tests/RdKafkaContextTest.php index b07b82987..4758e0bbd 100644 --- a/pkg/rdkafka/Tests/RdKafkaContextTest.php +++ b/pkg/rdkafka/Tests/RdKafkaContextTest.php @@ -3,7 +3,9 @@ namespace Enqueue\RdKafka\Tests; use Enqueue\Null\NullQueue; +use Enqueue\RdKafka\JsonSerializer; use Enqueue\RdKafka\RdKafkaContext; +use Enqueue\RdKafka\Serializer; use Interop\Queue\InvalidDestinationException; use PHPUnit\Framework\TestCase; @@ -25,4 +27,42 @@ public function testThrowInvalidDestinationIfInvalidDestinationGivenOnCreateCons $this->expectException(InvalidDestinationException::class); $context->createConsumer(new NullQueue('aQueue')); } + + public function testShouldSetJsonSerializerInConstructor() + { + $context = new RdKafkaContext([]); + + $this->assertInstanceOf(JsonSerializer::class, $context->getSerializer()); + } + + public function testShouldAllowGetPreviouslySetSerializer() + { + $context = new RdKafkaContext([]); + + $expectedSerializer = $this->createMock(Serializer::class); + + $context->setSerializer($expectedSerializer); + + $this->assertSame($expectedSerializer, $context->getSerializer()); + } + + public function testShouldInjectItsSerializerToProducer() + { + $context = new RdKafkaContext([]); + + $producer = $context->createProducer(); + + $this->assertSame($context->getSerializer(), $producer->getSerializer()); + } + + public function testShouldInjectItsSerializerToConsumer() + { + $context = new RdKafkaContext(['global' => [ + 'group.id' => uniqid('', true), + ]]); + + $producer = $context->createConsumer($context->createQueue('aQueue')); + + $this->assertSame($context->getSerializer(), $producer->getSerializer()); + } } diff --git a/pkg/rdkafka/Tests/RdKafkaProducerTest.php b/pkg/rdkafka/Tests/RdKafkaProducerTest.php index c31c588ae..8a9ee1588 100644 --- a/pkg/rdkafka/Tests/RdKafkaProducerTest.php +++ b/pkg/rdkafka/Tests/RdKafkaProducerTest.php @@ -7,6 +7,7 @@ use Enqueue\RdKafka\RdKafkaMessage; use Enqueue\RdKafka\RdKafkaProducer; use Enqueue\RdKafka\RdKafkaTopic; +use Enqueue\RdKafka\Serializer; use Interop\Queue\InvalidDestinationException; use Interop\Queue\InvalidMessageException; use PHPUnit\Framework\TestCase; @@ -16,14 +17,14 @@ class RdKafkaProducerTest extends TestCase { - public function testCouldBeConstructedWithKafkaProducerAsFirstArgument() + public function testCouldBeConstructedWithKafkaProducerAndSerializerAsArguments() { - new RdKafkaProducer($this->createKafkaProducerMock()); + new RdKafkaProducer($this->createKafkaProducerMock(), $this->createSerializerMock()); } public function testThrowIfDestinationInvalid() { - $producer = new RdKafkaProducer($this->createKafkaProducerMock()); + $producer = new RdKafkaProducer($this->createKafkaProducerMock(), $this->createSerializerMock()); $this->expectException(InvalidDestinationException::class); $this->expectExceptionMessage('The destination must be an instance of Enqueue\RdKafka\RdKafkaTopic but got Enqueue\Null\NullQueue.'); @@ -32,14 +33,14 @@ public function testThrowIfDestinationInvalid() public function testThrowIfMessageInvalid() { - $producer = new RdKafkaProducer($this->createKafkaProducerMock()); + $producer = new RdKafkaProducer($this->createKafkaProducerMock(), $this->createSerializerMock()); $this->expectException(InvalidMessageException::class); $this->expectExceptionMessage('The message must be an instance of Enqueue\RdKafka\RdKafkaMessage but it is Enqueue\Null\NullMessage.'); $producer->send(new RdKafkaTopic('aQueue'), new NullMessage()); } - public function testShouldJsonEncodeMessageAndPutToExpectedTube() + public function testShouldUseSerializerToEncodeMessageAndPutToExpectedTube() { $message = new RdKafkaMessage('theBody', ['foo' => 'fooVal'], ['bar' => 'barVal']); $message->setKey('key'); @@ -51,7 +52,7 @@ public function testShouldJsonEncodeMessageAndPutToExpectedTube() ->with( RD_KAFKA_PARTITION_UA, 0, - '{"body":"theBody","properties":{"foo":"fooVal"},"headers":{"bar":"barVal"}}', + 'theSerializedMessage', 'key' ) ; @@ -64,11 +65,33 @@ public function testShouldJsonEncodeMessageAndPutToExpectedTube() ->willReturn($kafkaTopic) ; - $producer = new RdKafkaProducer($kafkaProducer); + $serializer = $this->createSerializerMock(); + $serializer + ->expects($this->once()) + ->method('toString') + ->with($this->identicalTo($message)) + ->willReturn('theSerializedMessage') + ; + + $producer = new RdKafkaProducer($kafkaProducer, $serializer); $producer->send(new RdKafkaTopic('theQueueName'), $message); } + public function testShouldAllowGetPreviouslySetSerializer() + { + $producer = new RdKafkaProducer($this->createKafkaProducerMock(), $this->createSerializerMock()); + + $expectedSerializer = $this->createSerializerMock(); + + //guard + $this->assertNotSame($producer->getSerializer(), $expectedSerializer); + + $producer->setSerializer($expectedSerializer); + + $this->assertSame($expectedSerializer, $producer->getSerializer()); + } + /** * @return \PHPUnit_Framework_MockObject_MockObject|ProducerTopic */ @@ -84,4 +107,12 @@ private function createKafkaProducerMock() { return $this->createMock(Producer::class); } + + /** + * @return Serializer|\PHPUnit_Framework_MockObject_MockObject|Serializer + */ + private function createSerializerMock() + { + return $this->createMock(Serializer::class); + } }