Skip to content

Commit

Permalink
Merge pull request #188 from php-enqueue/kafka-serializer
Browse files Browse the repository at this point in the history
[rdkafka] Add abilito change the way a message is serialized.
  • Loading branch information
makasim authored Aug 31, 2017
2 parents 009a5c1 + 4132833 commit 2b1094c
Show file tree
Hide file tree
Showing 12 changed files with 368 additions and 39 deletions.
24 changes: 24 additions & 0 deletions docs/transport/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The transport uses [Kafka](https://kafka.apache.org/) streaming platform as a MQ
* [Send message to topic](#send-message-to-topic)
* [Send message to queue](#send-message-to-queue)
* [Consume message](#consume-message)
* [Serialize message](#serialize-message)

## Installation

Expand Down Expand Up @@ -88,4 +89,27 @@ $consumer->acknowledge($message);
// $consumer->reject($message);
```

## Serialize message

By default the transport serializes messages to json format but you might want to use another format such as [Apache Avro](https://avro.apache.org/docs/1.2.0/).
For that you have to implement Serializer interface and set it to the context, producer or consumer.
If a serializer set to context it will be injected to all consumers and producers created by the context.

```php
<?php
use Enqueue\RdKafka\Serializer;
use Enqueue\RdKafka\RdKafkaMessage;

class FooSerializer implements Serializer
{
public function toMessage($string) {}

public function toString(RdKafkaMessage $message) {}
}

/** @var \Enqueue\RdKafka\RdKafkaContext $psrContext */

$psrContext->setSerializer(new FooSerializer());
```

[back to index](index.md)
45 changes: 45 additions & 0 deletions pkg/rdkafka/JsonSerializer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<?php

namespace Enqueue\RdKafka;

class JsonSerializer implements Serializer
{
/**
* {@inheritdoc}
*/
public function toString(RdKafkaMessage $message)
{
$json = json_encode([
'body' => $message->getBody(),
'properties' => $message->getProperties(),
'headers' => $message->getHeaders(),
]);

if (JSON_ERROR_NONE !== json_last_error()) {
throw new \InvalidArgumentException(sprintf(
'The malformed json given. Error %s and message %s',
json_last_error(),
json_last_error_msg()
));
}

return $json;
}

/**
* {@inheritdoc}
*/
public function toMessage($string)
{
$data = json_decode($string, true);
if (JSON_ERROR_NONE !== json_last_error()) {
throw new \InvalidArgumentException(sprintf(
'The malformed json given. Error %s and message %s',
json_last_error(),
json_last_error_msg()
));
}

return new RdKafkaMessage($data['body'], $data['properties'], $data['headers']);
}
}
9 changes: 7 additions & 2 deletions pkg/rdkafka/RdKafkaConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

class RdKafkaConsumer implements PsrConsumer
{
use SerializerAwareTrait;

/**
* @var KafkaConsumer
*/
Expand Down Expand Up @@ -38,14 +40,17 @@ class RdKafkaConsumer implements PsrConsumer
* @param KafkaConsumer $consumer
* @param RdKafkaContext $context
* @param RdKafkaTopic $topic
* @param Serializer $serializer
*/
public function __construct(KafkaConsumer $consumer, RdKafkaContext $context, RdKafkaTopic $topic)
public function __construct(KafkaConsumer $consumer, RdKafkaContext $context, RdKafkaTopic $topic, Serializer $serializer)
{
$this->consumer = $consumer;
$this->context = $context;
$this->topic = $topic;
$this->subscribed = false;
$this->commitAsync = false;

$this->setSerializer($serializer);
}

/**
Expand Down Expand Up @@ -151,7 +156,7 @@ private function doReceive($timeout)
case RD_KAFKA_RESP_ERR__TIMED_OUT:
break;
case RD_KAFKA_RESP_ERR_NO_ERROR:
$message = RdKafkaMessage::jsonUnserialize($kafkaMessage->payload);
$message = $this->serializer->toMessage($kafkaMessage->payload);
$message->setKey($kafkaMessage->key);
$message->setPartition($kafkaMessage->partition);
$message->setKafkaMessage($kafkaMessage);
Expand Down
13 changes: 11 additions & 2 deletions pkg/rdkafka/RdKafkaContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

class RdKafkaContext implements PsrContext
{
use SerializerAwareTrait;

/**
* @var array
*/
Expand All @@ -33,6 +35,8 @@ class RdKafkaContext implements PsrContext
public function __construct(array $config)
{
$this->config = $config;

$this->setSerializer(new JsonSerializer());
}

/**
Expand Down Expand Up @@ -78,7 +82,7 @@ public function createTemporaryQueue()
*/
public function createProducer()
{
return new RdKafkaProducer($this->getProducer());
return new RdKafkaProducer($this->getProducer(), $this->getSerializer());
}

/**
Expand All @@ -90,7 +94,12 @@ public function createConsumer(PsrDestination $destination)
{
InvalidDestinationException::assertDestinationInstanceOf($destination, RdKafkaTopic::class);

$consumer = new RdKafkaConsumer(new KafkaConsumer($this->getConf()), $this, $destination);
$consumer = new RdKafkaConsumer(
new KafkaConsumer($this->getConf()),
$this,
$destination,
$this->getSerializer()
);

if (isset($this->config['commit_async'])) {
$consumer->setCommitAsync($this->config['commit_async']);
Expand Down
21 changes: 6 additions & 15 deletions pkg/rdkafka/RdKafkaMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
use Interop\Queue\PsrMessage;
use RdKafka\Message;

/**
* TODO: \JsonSerializable will be removed in next version (probably 0.8.x)
* The serialization logic was moved to JsonSerializer.
*/
class RdKafkaMessage implements PsrMessage, \JsonSerializable
{
/**
Expand Down Expand Up @@ -270,11 +274,7 @@ public function setKafkaMessage(Message $message)
*/
public function jsonSerialize()
{
return [
'body' => $this->getBody(),
'properties' => $this->getProperties(),
'headers' => $this->getHeaders(),
];
return (new JsonSerializer())->toString($this);
}

/**
Expand All @@ -284,15 +284,6 @@ public function jsonSerialize()
*/
public static function jsonUnserialize($json)
{
$data = json_decode($json, true);
if (JSON_ERROR_NONE !== json_last_error()) {
throw new \InvalidArgumentException(sprintf(
'The malformed json given. Error %s and message %s',
json_last_error(),
json_last_error_msg()
));
}

return new self($data['body'], $data['properties'], $data['headers']);
return (new JsonSerializer())->toMessage($json);
}
}
11 changes: 8 additions & 3 deletions pkg/rdkafka/RdKafkaProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,22 @@

class RdKafkaProducer implements PsrProducer
{
use SerializerAwareTrait;

/**
* @var Producer
*/
private $producer;

/**
* @param Producer $producer
* @param Producer $producer
* @param Serializer $serializer
*/
public function __construct(Producer $producer)
public function __construct(Producer $producer, Serializer $serializer)
{
$this->producer = $producer;

$this->setSerializer($serializer);
}

/**
Expand All @@ -37,7 +42,7 @@ public function send(PsrDestination $destination, PsrMessage $message)

$partition = $message->getPartition() ?: $destination->getPartition() ?: RD_KAFKA_PARTITION_UA;
$key = $message->getKey() ?: $destination->getKey() ?: null;
$payload = json_encode($message);
$payload = $this->serializer->toString($message);

$topic = $this->producer->newTopic($destination->getTopicName(), $destination->getConf());
$topic->produce($partition, 0 /* must be 0 */, $payload, $key);
Expand Down
20 changes: 20 additions & 0 deletions pkg/rdkafka/Serializer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php

namespace Enqueue\RdKafka;

interface Serializer
{
/**
* @param RdKafkaMessage $message
*
* @return string
*/
public function toString(RdKafkaMessage $message);

/**
* @param string $string
*
* @return RdKafkaMessage
*/
public function toMessage($string);
}
27 changes: 27 additions & 0 deletions pkg/rdkafka/SerializerAwareTrait.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

namespace Enqueue\RdKafka;

trait SerializerAwareTrait
{
/**
* @var Serializer
*/
private $serializer;

/**
* @param Serializer $serializer
*/
public function setSerializer(Serializer $serializer)
{
$this->serializer = $serializer;
}

/**
* @return Serializer
*/
public function getSerializer()
{
return $this->serializer;
}
}
73 changes: 73 additions & 0 deletions pkg/rdkafka/Tests/JsonSerializerTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
<?php

namespace Enqueue\RdKafka\Tests;

use Enqueue\RdKafka\JsonSerializer;
use Enqueue\RdKafka\RdKafkaMessage;
use Enqueue\RdKafka\Serializer;
use Enqueue\Test\ClassExtensionTrait;
use PHPUnit\Framework\TestCase;

class JsonSerializerTest extends TestCase
{
use ClassExtensionTrait;

public function testShouldImplementSerializerInterface()
{
$this->assertClassImplements(Serializer::class, JsonSerializer::class);
}

public function testCouldBeConstructedWithoutAnyArguments()
{
new JsonSerializer();
}

public function testShouldConvertMessageToJsonString()
{
$serializer = new JsonSerializer();

$message = new RdKafkaMessage('theBody', ['aProp' => 'aPropVal'], ['aHeader' => 'aHeaderVal']);

$json = $serializer->toString($message);

$this->assertSame('{"body":"theBody","properties":{"aProp":"aPropVal"},"headers":{"aHeader":"aHeaderVal"}}', $json);
}

public function testThrowIfFailedToEncodeMessageToJson()
{
$serializer = new JsonSerializer();

$resource = fopen(__FILE__, 'r');

//guard
$this->assertInternalType('resource', $resource);

$message = new RdKafkaMessage('theBody', ['aProp' => $resource]);

$this->expectException(\LogicException::class);
$this->expectExceptionMessage('The malformed json given.');
$serializer->toString($message);
}

public function testShouldConvertJsonStringToMessage()
{
$serializer = new JsonSerializer();

$message = $serializer->toMessage('{"body":"theBody","properties":{"aProp":"aPropVal"},"headers":{"aHeader":"aHeaderVal"}}');

$this->assertInstanceOf(RdKafkaMessage::class, $message);

$this->assertSame('theBody', $message->getBody());
$this->assertSame(['aProp' => 'aPropVal'], $message->getProperties());
$this->assertSame(['aHeader' => 'aHeaderVal'], $message->getHeaders());
}

public function testThrowIfFailedToDecodeJsonToMessage()
{
$serializer = new JsonSerializer();

$this->expectException(\LogicException::class);
$this->expectExceptionMessage('The malformed json given.');
$serializer->toMessage('{]');
}
}
Loading

0 comments on commit 2b1094c

Please sign in to comment.