Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MPM-253 Add avro-validator #43

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
}
},
"suggest": {
"flix-tech/avro-serde-php": "Is needed for Avro support"
"flix-tech/avro-serde-php": "Is needed for Avro support",
"jobcloud/avro-validator": "Useful for debug purposes in development, not recommended for production use"
},
"extra": {
"branch-alias": {
Expand Down
4 changes: 4 additions & 0 deletions phpstan.neon
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
parameters:
level: 8
paths: [ src ]
ignoreErrors:
- '#Comparison operation ">" between int and RdKafka\\TopicPartition results in an error.#'
- '#Call to static method fromSchema\(\) on an unknown class Jobcloud\\Avro\\Validator\\RecordRegistry#'
- '#Parameter \#[0-9] [a-z\$]+ of method [a-zA-Z0-9\\_]+::setOffset\(\) expects string, int given.#'
10 changes: 10 additions & 0 deletions src/Exception/AvroValidatorException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

declare(strict_types=1);

namespace Jobcloud\Kafka\Exception;

/**
 * Raised by the Avro encoder when a record fails to encode and the optional
 * avro-validator package is available; the message carries the JSON-encoded
 * validation errors.
 */
class AvroValidatorException extends \Exception
{
}
56 changes: 45 additions & 11 deletions src/Message/Encoder/AvroEncoder.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@
namespace Jobcloud\Kafka\Message\Encoder;

use AvroSchema;
use FlixTech\AvroSerializer\Objects\Exceptions\AvroEncodingException;
use FlixTech\AvroSerializer\Objects\RecordSerializer;
use FlixTech\SchemaRegistryApi\Exception\SchemaRegistryException;
use Jobcloud\Avro\Validator\Exception\RecordRegistryException;
use Jobcloud\Avro\Validator\Exception\ValidatorException;
use Jobcloud\Avro\Validator\RecordRegistry;
use Jobcloud\Avro\Validator\Validator;
use Jobcloud\Kafka\Exception\AvroEncoderException;
use Jobcloud\Kafka\Exception\AvroValidatorException;
use Jobcloud\Kafka\Message\KafkaAvroSchemaInterface;
use Jobcloud\Kafka\Message\KafkaProducerMessageInterface;
use Jobcloud\Kafka\Message\Registry\AvroSchemaRegistryInterface;
Expand Down Expand Up @@ -40,8 +46,8 @@ public function __construct(
/**
* @param KafkaProducerMessageInterface $producerMessage
* @return KafkaProducerMessageInterface
* @throws AvroValidatorException
* @throws SchemaRegistryException
* @throws AvroEncoderException
*/
public function encode(KafkaProducerMessageInterface $producerMessage): KafkaProducerMessageInterface
{
Expand All @@ -53,6 +59,7 @@ public function encode(KafkaProducerMessageInterface $producerMessage): KafkaPro
/**
* @param KafkaProducerMessageInterface $producerMessage
* @return KafkaProducerMessageInterface
* @throws AvroValidatorException
* @throws SchemaRegistryException
*/
private function encodeBody(KafkaProducerMessageInterface $producerMessage): KafkaProducerMessageInterface
Expand All @@ -70,18 +77,15 @@ private function encodeBody(KafkaProducerMessageInterface $producerMessage): Kaf

$avroSchema = $this->registry->getBodySchemaForTopic($topicName);

$encodedBody = $this->recordSerializer->encodeRecord(
$avroSchema->getName(),
$this->getAvroSchemaDefinition($avroSchema),
$body
);
$encodedBody = $this->encodeRecord($avroSchema, $body, $topicName);

return $producerMessage->withBody($encodedBody);
}

/**
* @param KafkaProducerMessageInterface $producerMessage
* @return KafkaProducerMessageInterface
* @throws AvroValidatorException
* @throws SchemaRegistryException
*/
private function encodeKey(KafkaProducerMessageInterface $producerMessage): KafkaProducerMessageInterface
Expand All @@ -99,11 +103,7 @@ private function encodeKey(KafkaProducerMessageInterface $producerMessage): Kafk

$avroSchema = $this->registry->getKeySchemaForTopic($topicName);

$encodedKey = $this->recordSerializer->encodeRecord(
$avroSchema->getName(),
$this->getAvroSchemaDefinition($avroSchema),
$key
);
$encodedKey = $this->encodeRecord($avroSchema, $key, $topicName);

return $producerMessage->withKey($encodedKey);
}
Expand Down Expand Up @@ -131,4 +131,38 @@ public function getRegistry(): AvroSchemaRegistryInterface
{
return $this->registry;
}

/**
 * Encodes $data with the record serializer using the given schema.
 *
 * When the serializer throws an AvroEncodingException and the optional
 * avro-validator package is installed, the data is validated against the
 * schema and the validation errors are reported as an AvroValidatorException
 * (with the original encoding exception attached as "previous"). Without the
 * validator, or when the schema/data cannot be turned into JSON for it, the
 * original AvroEncodingException is rethrown unchanged.
 *
 * @param KafkaAvroSchemaInterface $avroSchema
 * @param mixed $data
 * @param string $topicName
 * @return string
 * @throws SchemaRegistryException
 * @throws AvroValidatorException
 */
private function encodeRecord(KafkaAvroSchemaInterface $avroSchema, $data, string $topicName): string
{
    try {
        $encodedData = $this->recordSerializer->encodeRecord(
            $avroSchema->getName(),
            $this->getAvroSchemaDefinition($avroSchema),
            $data
        );
    } catch (AvroEncodingException $exception) {
        // The validator is only a suggested dependency; fall back to the raw error without it.
        if (!class_exists(Validator::class)) {
            throw $exception;
        }

        /** @var AvroSchema $schemaDefinition */
        $schemaDefinition = $avroSchema->getDefinition();
        $schemaJson = json_encode($schemaDefinition->to_avro());
        $dataJson = json_encode($data);

        // json_encode() returns false on failure; passing that on would raise a
        // TypeError under strict_types and mask the real encoding problem.
        if ($schemaJson === false || $dataJson === false) {
            throw $exception;
        }

        $recordRegistry = RecordRegistry::fromSchema($schemaJson);
        $validator = new Validator($recordRegistry);

        $validationErrors = $validator->validate($dataJson, $topicName);

        // Keep the serializer's exception in the chain so its message and trace are not lost.
        throw new AvroValidatorException((string) json_encode($validationErrors), 0, $exception);
    }

    return $encodedData;
}
}
170 changes: 168 additions & 2 deletions tests/Unit/Message/Encoder/AvroEncoderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,51 @@

namespace Jobcloud\Kafka\Tests\Unit\Kafka\Message\Encoder;

use FlixTech\AvroSerializer\Objects\Exceptions\AvroEncodingException;
use FlixTech\AvroSerializer\Objects\RecordSerializer;
use Jobcloud\Kafka\Exception\AvroEncoderException;
use Jobcloud\Kafka\Message\Encoder\AvroEncoderInterface;
use Jobcloud\Kafka\Exception\AvroValidatorException;
use Jobcloud\Kafka\Message\KafkaAvroSchemaInterface;
use Jobcloud\Kafka\Message\KafkaProducerMessageInterface;
use Jobcloud\Kafka\Message\Encoder\AvroEncoder;
use Jobcloud\Kafka\Message\Registry\AvroSchemaRegistryInterface;
use PHPStan\Testing\TestCase;
use \AvroSchema;

/**
* @covers \Jobcloud\Kafka\Message\Encoder\AvroEncoder
*/
class AvroEncoderTest extends TestCase
{
private $avroValidatorClass = "./src/Message/Encoder/AvroEncoder.php";

private $originalNamespaces = [
"Jobcloud\Avro\Validator\RecordRegistry",
"Jobcloud\Avro\Validator\Validator"
];

private $replacedNamespaces = [
"Jobcloud\Kafka\Tests\Unit\Kafka\Message\Encoder\RecordRegistry",
"Jobcloud\Kafka\Tests\Unit\Kafka\Message\Encoder\Validator"
];

/**
 * Rewrites the file at $this->avroValidatorClass on disk, replacing every
 * entry of $this->originalNamespaces with the matching entry of
 * $this->replacedNamespaces, before each test.
 *
 * Fails loudly when the file cannot be read or written, so a failed read can
 * never lead to the source file being overwritten with bogus content.
 *
 * @throws \RuntimeException when the file cannot be read or written
 */
protected function setUp(): void
{
    $source = file_get_contents($this->avroValidatorClass);

    if ($source === false) {
        throw new \RuntimeException(sprintf('Unable to read %s', $this->avroValidatorClass));
    }

    $patched = str_replace($this->originalNamespaces, $this->replacedNamespaces, $source);

    if (file_put_contents($this->avroValidatorClass, $patched) === false) {
        throw new \RuntimeException(sprintf('Unable to write %s', $this->avroValidatorClass));
    }
}

/**
 * Reverts the on-disk rewrite after each test: every entry of
 * $this->replacedNamespaces in the file at $this->avroValidatorClass is
 * replaced with the matching entry of $this->originalNamespaces.
 *
 * Fails loudly when the file cannot be read or written, so a failed read can
 * never lead to the source file being overwritten with bogus content.
 *
 * @throws \RuntimeException when the file cannot be read or written
 */
protected function tearDown(): void
{
    $source = file_get_contents($this->avroValidatorClass);

    if ($source === false) {
        throw new \RuntimeException(sprintf('Unable to read %s', $this->avroValidatorClass));
    }

    $restored = str_replace($this->replacedNamespaces, $this->originalNamespaces, $source);

    if (file_put_contents($this->avroValidatorClass, $restored) === false) {
        throw new \RuntimeException(sprintf('Unable to write %s', $this->avroValidatorClass));
    }
}

public function testEncodeTombstone()
{
$producerMessage = $this->getMockForAbstractClass(KafkaProducerMessageInterface::class);
Expand Down Expand Up @@ -169,4 +199,140 @@ public function testGetRegistry()

self::assertSame($registry, $encoder->getRegistry());
}

/**
 * Expects encode() to throw AvroEncodingException when the serializer's
 * second encodeRecord() call (the one receiving 'test-key') throws it, after
 * the first call (the body, []) returned 'encodedValue'.
 *
 * NOTE(review): the setup mirrors testAvroValidatorKeyException, which expects
 * AvroValidatorException instead; which exception surfaces presumably depends
 * on class_exists(Validator::class) inside the encoder - confirm.
 */
public function testAvroEncodingException()
{
$schemaDefinition = $this->getMockBuilder(\AvroSchema::class)->disableOriginalConstructor()->getMock();

// Two of the expected getName()/getDefinition() calls are made by this test
// itself when building the withConsecutive() argument lists below.
$avroSchema = $this->getMockForAbstractClass(KafkaAvroSchemaInterface::class);
$avroSchema->expects(self::exactly(4))->method('getName')->willReturn('schemaName');
$avroSchema->expects(self::never())->method('getVersion');
$avroSchema->expects(self::exactly(4))->method('getDefinition')->willReturn($schemaDefinition);

// The registry reports both a body and a key schema and hands out the same schema mock for each.
$registry = $this->getMockForAbstractClass(AvroSchemaRegistryInterface::class);
$registry->expects(self::once())->method('getBodySchemaForTopic')->willReturn($avroSchema);
$registry->expects(self::once())->method('getKeySchemaForTopic')->willReturn($avroSchema);
$registry->expects(self::once())->method('hasBodySchemaForTopic')->willReturn(true);
$registry->expects(self::once())->method('hasKeySchemaForTopic')->willReturn(true);

// withBody() is expected once with 'encodedValue'; no withKey() expectation is set.
$producerMessage = $this->getMockForAbstractClass(KafkaProducerMessageInterface::class);
$producerMessage->expects(self::exactly(2))->method('getTopicName')->willReturn('test');
$producerMessage->expects(self::once())->method('getBody')->willReturn([]);
$producerMessage->expects(self::once())->method('getKey')->willReturn('test-key');
$producerMessage->expects(self::once())->method('withBody')->with('encodedValue')->willReturn($producerMessage);

$avroEncodingException = $this->getMockBuilder(AvroEncodingException::class)->disableOriginalConstructor()->getMock();

// First encodeRecord() call returns 'encodedValue', the second throws the mocked exception.
$recordSerializer = $this->getMockBuilder(RecordSerializer::class)->disableOriginalConstructor()->getMock();
$recordSerializer
->expects(self::exactly(2))
->method('encodeRecord')
->withConsecutive(
[$avroSchema->getName(), $avroSchema->getDefinition(), []],
[$avroSchema->getName(), $avroSchema->getDefinition(), 'test-key']
)
->willReturnOnConsecutiveCalls('encodedValue', $this->throwException($avroEncodingException));

$encoder = new AvroEncoder($registry, $recordSerializer);

// If encode() throws as expected, the assertNotSame() comparison itself is never evaluated.
self::expectException(AvroEncodingException::class);
self::assertNotSame($producerMessage, $encoder->encode($producerMessage));
}

/**
 * Expects encode() to throw AvroValidatorException when the single
 * encodeRecord() call (the body) throws AvroEncodingException. The expected
 * message is the JSON encoding of ['test' => 'test'], which is what the
 * Validator class declared at the bottom of this file returns from validate().
 */
public function testAvroValidatorBodyException()
{
$schemaDefinition = $this->getMockBuilder(\AvroSchema::class)->disableOriginalConstructor()->getMock();

$avroSchema = $this->getMockForAbstractClass(KafkaAvroSchemaInterface::class);
$avroSchema->expects(self::once())->method('getName')->willReturn('schemaName');
$avroSchema->expects(self::exactly(2))->method('getDefinition')->willReturn($schemaDefinition);
$schemaDefinition->method('to_avro')->willReturn(['type' => 'record']);

// Only body-schema lookups are expected here; no key-schema expectations are configured.
$registry = $this->getMockForAbstractClass(AvroSchemaRegistryInterface::class);
$registry->expects(self::once())->method('getBodySchemaForTopic')->willReturn($avroSchema);
$registry->expects(self::once())->method('hasBodySchemaForTopic')->willReturn(true);

$producerMessage = $this->getMockForAbstractClass(KafkaProducerMessageInterface::class);
$producerMessage->expects(self::once())->method('getTopicName')->willReturn('test');
$producerMessage->expects(self::once())->method('getBody')->willReturn(['id' => 123]);

$avroEncodingException = $this->getMockBuilder(AvroEncodingException::class)
->disableOriginalConstructor()
->getMock();
$recordSerializer = $this->getMockBuilder(RecordSerializer::class)
->disableOriginalConstructor()
->getMock();
// NOTE(review): two stubs are configured on the same expectation below; presumably the
// later willThrowException() is the one that takes effect, making
// willReturnOnConsecutiveCalls('encodedValue') redundant - confirm against PHPUnit.
$recordSerializer
->expects(self::once())
->method('encodeRecord')
->willReturnOnConsecutiveCalls('encodedValue')
->willThrowException($avroEncodingException);

$encoder = new AvroEncoder($registry, $recordSerializer);

// If encode() throws as expected, the assertNotSame() comparison itself is never evaluated.
self::expectException(AvroValidatorException::class);
self::expectExceptionMessage(json_encode(['test' => 'test']));
self::assertNotSame($producerMessage, $encoder->encode($producerMessage));
}

/**
 * Expects encode() to throw AvroValidatorException when the serializer's
 * second encodeRecord() call (the one receiving 'test-key') throws
 * AvroEncodingException, after the first call (the body, []) returned
 * 'encodedValue'. The expected message is the JSON encoding of
 * ['test' => 'test'], which is what the Validator class declared at the
 * bottom of this file returns from validate().
 */
public function testAvroValidatorKeyException()
{
$schemaDefinition = $this->getMockBuilder(\AvroSchema::class)->disableOriginalConstructor()->getMock();

// Two of the expected getName()/getDefinition() calls are made by this test
// itself when building the withConsecutive() argument lists below.
$avroSchema = $this->getMockForAbstractClass(KafkaAvroSchemaInterface::class);
$avroSchema->expects(self::exactly(4))->method('getName')->willReturn('schemaName');
$avroSchema->expects(self::never())->method('getVersion');
$avroSchema->expects(self::exactly(5))->method('getDefinition')->willReturn($schemaDefinition);
$schemaDefinition->method('to_avro')->willReturn([]);

// The registry reports both a body and a key schema and hands out the same schema mock for each.
$registry = $this->getMockForAbstractClass(AvroSchemaRegistryInterface::class);
$registry->expects(self::once())->method('getBodySchemaForTopic')->willReturn($avroSchema);
$registry->expects(self::once())->method('getKeySchemaForTopic')->willReturn($avroSchema);
$registry->expects(self::once())->method('hasBodySchemaForTopic')->willReturn(true);
$registry->expects(self::once())->method('hasKeySchemaForTopic')->willReturn(true);

// withBody() is expected once with 'encodedValue'; no withKey() expectation is set.
$producerMessage = $this->getMockForAbstractClass(KafkaProducerMessageInterface::class);
$producerMessage->expects(self::exactly(2))->method('getTopicName')->willReturn('test');
$producerMessage->expects(self::once())->method('getBody')->willReturn([]);
$producerMessage->expects(self::once())->method('getKey')->willReturn('test-key');
$producerMessage->expects(self::once())->method('withBody')->with('encodedValue')->willReturn($producerMessage);

$avroEncodingException = $this->getMockBuilder(AvroEncodingException::class)->disableOriginalConstructor()->getMock();

// First encodeRecord() call returns 'encodedValue', the second throws the mocked exception.
$recordSerializer = $this->getMockBuilder(RecordSerializer::class)->disableOriginalConstructor()->getMock();
$recordSerializer
->expects(self::exactly(2))
->method('encodeRecord')
->withConsecutive(
[$avroSchema->getName(), $avroSchema->getDefinition(), []],
[$avroSchema->getName(), $avroSchema->getDefinition(), 'test-key']
)
->willReturnOnConsecutiveCalls('encodedValue', $this->throwException($avroEncodingException));

$encoder = new AvroEncoder($registry, $recordSerializer);

// If encode() throws as expected, the assertNotSame() comparison itself is never evaluated.
self::expectException(AvroValidatorException::class);
self::expectExceptionMessage(json_encode(['test' => 'test']));
self::assertNotSame($producerMessage, $encoder->encode($producerMessage));
}
}

/**
 * Test stand-in for Jobcloud\Avro\Validator\RecordRegistry; the test's setUp()
 * rewrites the encoder source so that it refers to this class instead.
 */
class RecordRegistry
{
    /**
     * Returns the given schema JSON unchanged.
     *
     * Declared static because the encoder invokes it as
     * RecordRegistry::fromSchema(...); calling a non-static method statically
     * throws an Error on PHP 8.
     *
     * @param string $schema
     * @return string
     */
    public static function fromSchema(string $schema): string
    {
        return $schema;
    }
}

/**
 * Test stand-in for Jobcloud\Avro\Validator\Validator; the test's setUp()
 * rewrites the encoder source so that it refers to this class instead.
 */
class Validator
{
    /**
     * Always reports the same single, fixed validation error.
     *
     * @return array<string, string>
     */
    public function validate(): array
    {
        $errors = [];
        $errors['test'] = 'test';

        return $errors;
    }
}

/**
 * Empty placeholder class declared alongside the test doubles.
 *
 * NOTE(review): nothing in the visible part of this file references it, and it
 * does not extend \Exception - confirm whether it is still needed.
 */
class AvroValidationException
{
}