Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[rdkafka] Add abilito change the way a message is serialized. #188

Merged
merged 5 commits into from
Aug 31, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions docs/transport/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The transport uses [Kafka](https://kafka.apache.org/) streaming platform as a MQ
* [Send message to topic](#send-message-to-topic)
* [Send message to queue](#send-message-to-queue)
* [Consume message](#consume-message)
* [Serialize message](#serialize-message)

## Installation

Expand Down Expand Up @@ -88,4 +89,27 @@ $consumer->acknowledge($message);
// $consumer->reject($message);
```

## Serialize message

By default the transport serializes messages to json format but you might want to use another format such as [Apache Avro](https://avro.apache.org/docs/1.2.0/).
For that you have to implement Serializer interface and set it to the context, producer or consumer.
If a serializer set to context it will be injected to all consumers and producers created by the context.

```php
<?php
use Enqueue\RdKafka\Serializer;
use Enqueue\RdKafka\RdKafkaMessage;

class FooSerializer implements Serializer
{
public function toMessage($string) {}

public function toString(RdKafkaMessage $message) {}
}

/** @var \Enqueue\RdKafka\RdKafkaContext $psrContext */

$psrContext->setSerializer(new FooSerializer());
```

[back to index](index.md)
45 changes: 45 additions & 0 deletions pkg/rdkafka/JsonSerializer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<?php

namespace Enqueue\RdKafka;

class JsonSerializer implements Serializer
{
/**
* {@inheritdoc}
*/
public function toString(RdKafkaMessage $message)
{
$json = json_encode([
'body' => $message->getBody(),
'properties' => $message->getProperties(),
'headers' => $message->getHeaders(),
]);

if (JSON_ERROR_NONE !== json_last_error()) {
throw new \InvalidArgumentException(sprintf(
'The malformed json given. Error %s and message %s',
json_last_error(),
json_last_error_msg()
));
}

return $json;
}

/**
* {@inheritdoc}
*/
public function toMessage($string)
{
$data = json_decode($string, true);
if (JSON_ERROR_NONE !== json_last_error()) {
throw new \InvalidArgumentException(sprintf(
'The malformed json given. Error %s and message %s',
json_last_error(),
json_last_error_msg()
));
}

return new RdKafkaMessage($data['body'], $data['properties'], $data['headers']);
}
}
9 changes: 7 additions & 2 deletions pkg/rdkafka/RdKafkaConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

class RdKafkaConsumer implements PsrConsumer
{
use SerializerAwareTrait;

/**
* @var KafkaConsumer
*/
Expand Down Expand Up @@ -38,14 +40,17 @@ class RdKafkaConsumer implements PsrConsumer
* @param KafkaConsumer $consumer
* @param RdKafkaContext $context
* @param RdKafkaTopic $topic
* @param Serializer $serializer
*/
public function __construct(KafkaConsumer $consumer, RdKafkaContext $context, RdKafkaTopic $topic)
public function __construct(KafkaConsumer $consumer, RdKafkaContext $context, RdKafkaTopic $topic, Serializer $serializer)
{
$this->consumer = $consumer;
$this->context = $context;
$this->topic = $topic;
$this->subscribed = false;
$this->commitAsync = false;

$this->setSerializer($serializer);
}

/**
Expand Down Expand Up @@ -151,7 +156,7 @@ private function doReceive($timeout)
case RD_KAFKA_RESP_ERR__TIMED_OUT:
break;
case RD_KAFKA_RESP_ERR_NO_ERROR:
$message = RdKafkaMessage::jsonUnserialize($kafkaMessage->payload);
$message = $this->serializer->toMessage($kafkaMessage->payload);
$message->setKey($kafkaMessage->key);
$message->setPartition($kafkaMessage->partition);
$message->setKafkaMessage($kafkaMessage);
Expand Down
13 changes: 11 additions & 2 deletions pkg/rdkafka/RdKafkaContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

class RdKafkaContext implements PsrContext
{
use SerializerAwareTrait;

/**
* @var array
*/
Expand All @@ -33,6 +35,8 @@ class RdKafkaContext implements PsrContext
public function __construct(array $config)
{
$this->config = $config;

$this->setSerializer(new JsonSerializer());
}

/**
Expand Down Expand Up @@ -78,7 +82,7 @@ public function createTemporaryQueue()
*/
public function createProducer()
{
return new RdKafkaProducer($this->getProducer());
return new RdKafkaProducer($this->getProducer(), $this->getSerializer());
}

/**
Expand All @@ -90,7 +94,12 @@ public function createConsumer(PsrDestination $destination)
{
InvalidDestinationException::assertDestinationInstanceOf($destination, RdKafkaTopic::class);

$consumer = new RdKafkaConsumer(new KafkaConsumer($this->getConf()), $this, $destination);
$consumer = new RdKafkaConsumer(
new KafkaConsumer($this->getConf()),
$this,
$destination,
$this->getSerializer()
);

if (isset($this->config['commit_async'])) {
$consumer->setCommitAsync($this->config['commit_async']);
Expand Down
21 changes: 6 additions & 15 deletions pkg/rdkafka/RdKafkaMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
use Interop\Queue\PsrMessage;
use RdKafka\Message;

/**
* TODO: \JsonSerializable will be removed in next version (probably 0.8.x)
* The serialization logic was moved to JsonSerializer.
*/
class RdKafkaMessage implements PsrMessage, \JsonSerializable
{
/**
Expand Down Expand Up @@ -270,11 +274,7 @@ public function setKafkaMessage(Message $message)
*/
public function jsonSerialize()
{
return [
'body' => $this->getBody(),
'properties' => $this->getProperties(),
'headers' => $this->getHeaders(),
];
return (new JsonSerializer())->toString($this);
}

/**
Expand All @@ -284,15 +284,6 @@ public function jsonSerialize()
*/
public static function jsonUnserialize($json)
{
$data = json_decode($json, true);
if (JSON_ERROR_NONE !== json_last_error()) {
throw new \InvalidArgumentException(sprintf(
'The malformed json given. Error %s and message %s',
json_last_error(),
json_last_error_msg()
));
}

return new self($data['body'], $data['properties'], $data['headers']);
return (new JsonSerializer())->toMessage($json);
}
}
11 changes: 8 additions & 3 deletions pkg/rdkafka/RdKafkaProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,22 @@

class RdKafkaProducer implements PsrProducer
{
use SerializerAwareTrait;

/**
* @var Producer
*/
private $producer;

/**
* @param Producer $producer
* @param Producer $producer
* @param Serializer $serializer
*/
public function __construct(Producer $producer)
public function __construct(Producer $producer, Serializer $serializer)
{
$this->producer = $producer;

$this->setSerializer($serializer);
}

/**
Expand All @@ -37,7 +42,7 @@ public function send(PsrDestination $destination, PsrMessage $message)

$partition = $message->getPartition() ?: $destination->getPartition() ?: RD_KAFKA_PARTITION_UA;
$key = $message->getKey() ?: $destination->getKey() ?: null;
$payload = json_encode($message);
$payload = $this->serializer->toString($message);

$topic = $this->producer->newTopic($destination->getTopicName(), $destination->getConf());
$topic->produce($partition, 0 /* must be 0 */, $payload, $key);
Expand Down
20 changes: 20 additions & 0 deletions pkg/rdkafka/Serializer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php

namespace Enqueue\RdKafka;

interface Serializer
{
/**
* @param RdKafkaMessage $message
*
* @return string
*/
public function toString(RdKafkaMessage $message);

/**
* @param string $string
*
* @return RdKafkaMessage
*/
public function toMessage($string);
}
27 changes: 27 additions & 0 deletions pkg/rdkafka/SerializerAwareTrait.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

namespace Enqueue\RdKafka;

trait SerializerAwareTrait
{
/**
* @var Serializer
*/
private $serializer;

/**
* @param Serializer $serializer
*/
public function setSerializer(Serializer $serializer)
{
$this->serializer = $serializer;
}

/**
* @return Serializer
*/
public function getSerializer()
{
return $this->serializer;
}
}
73 changes: 73 additions & 0 deletions pkg/rdkafka/Tests/JsonSerializerTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
<?php

namespace Enqueue\RdKafka\Tests;

use Enqueue\RdKafka\JsonSerializer;
use Enqueue\RdKafka\RdKafkaMessage;
use Enqueue\RdKafka\Serializer;
use Enqueue\Test\ClassExtensionTrait;
use PHPUnit\Framework\TestCase;

class JsonSerializerTest extends TestCase
{
use ClassExtensionTrait;

public function testShouldImplementSerializerInterface()
{
$this->assertClassImplements(Serializer::class, JsonSerializer::class);
}

public function testCouldBeConstructedWithoutAnyArguments()
{
new JsonSerializer();
}

public function testShouldConvertMessageToJsonString()
{
$serializer = new JsonSerializer();

$message = new RdKafkaMessage('theBody', ['aProp' => 'aPropVal'], ['aHeader' => 'aHeaderVal']);

$json = $serializer->toString($message);

$this->assertSame('{"body":"theBody","properties":{"aProp":"aPropVal"},"headers":{"aHeader":"aHeaderVal"}}', $json);
}

public function testThrowIfFailedToEncodeMessageToJson()
{
$serializer = new JsonSerializer();

$resource = fopen(__FILE__, 'r');

//guard
$this->assertInternalType('resource', $resource);

$message = new RdKafkaMessage('theBody', ['aProp' => $resource]);

$this->expectException(\LogicException::class);
$this->expectExceptionMessage('The malformed json given.');
$serializer->toString($message);
}

public function testShouldConvertJsonStringToMessage()
{
$serializer = new JsonSerializer();

$message = $serializer->toMessage('{"body":"theBody","properties":{"aProp":"aPropVal"},"headers":{"aHeader":"aHeaderVal"}}');

$this->assertInstanceOf(RdKafkaMessage::class, $message);

$this->assertSame('theBody', $message->getBody());
$this->assertSame(['aProp' => 'aPropVal'], $message->getProperties());
$this->assertSame(['aHeader' => 'aHeaderVal'], $message->getHeaders());
}

public function testThrowIfFailedToDecodeJsonToMessage()
{
$serializer = new JsonSerializer();

$this->expectException(\LogicException::class);
$this->expectExceptionMessage('The malformed json given.');
$serializer->toMessage('{]');
}
}
Loading