From ac81bcae4947ed043e123f220db06a2311cc552b Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Wed, 20 Sep 2017 16:47:54 +0300 Subject: [PATCH] [fs] fix bugs intrdoced in #181. The #181 introduce a bug that corrupt the file structure and make it impossible to read. --- pkg/fs/FsConsumer.php | 16 ++++- pkg/fs/Tests/Functional/FsConsumerTest.php | 80 ++++++++++++++++++++++ 2 files changed, 95 insertions(+), 1 deletion(-) diff --git a/pkg/fs/FsConsumer.php b/pkg/fs/FsConsumer.php index 1bd36c498..c42ea4dfc 100644 --- a/pkg/fs/FsConsumer.php +++ b/pkg/fs/FsConsumer.php @@ -119,6 +119,15 @@ public function receiveNoWait() $count = $this->preFetchCount; while ($count) { $frame = $this->readFrame($file, 1); + + //guards + if ($frame && false == ('|' == $frame[0] || ' ' == $frame[0])) { + throw new \LogicException(sprintf('The frame could start from either " " or "|". The malformed frame starts with "%s".', $frame[0])); + } + if (0 !== $reminder = strlen($frame) % 64) { + throw new \LogicException(sprintf('The frame size is "%d" and it must divide exactly to 64 but it leaves a reminder "%d".', strlen($frame), $reminder)); + } + ftruncate($file, fstat($file)['size'] - strlen($frame)); rewind($file); @@ -212,7 +221,12 @@ private function readFrame($file, $frameNumber) $previousFrame = $this->readFrame($file, $frameNumber + 1); if ('|' === substr($previousFrame, -1) && '{' === $frame[0]) { - return '|'.$frame; + $matched = []; + if (false === preg_match('/\ *?\|$/', $previousFrame, $matched)) { + throw new \LogicException('Something went completely wrong.'); + } + + return $matched[0].$frame; } return $previousFrame.$frame; diff --git a/pkg/fs/Tests/Functional/FsConsumerTest.php b/pkg/fs/Tests/Functional/FsConsumerTest.php index 33e676ae4..f3a641159 100644 --- a/pkg/fs/Tests/Functional/FsConsumerTest.php +++ b/pkg/fs/Tests/Functional/FsConsumerTest.php @@ -4,6 +4,7 @@ use Enqueue\Fs\FsConnectionFactory; use Enqueue\Fs\FsContext; +use Enqueue\Fs\FsDestination; use Enqueue\Fs\FsMessage; use PHPUnit\Framework\TestCase; @@ -69,6 +70,7 @@ public function testShouldConsumeMessagesFromFileOneByOne() /** * @group bug + * @group bug170 */ public function testShouldNotFailOnSpecificMessageSize() { @@ -91,4 +93,82 @@ public function testShouldNotFailOnSpecificMessageSize() $message = $consumer->receiveNoWait(); $this->assertNull($message); } + + /** + * @group bug + * @group bug170 + */ + public function testShouldNotCorruptFrameSize() + { + $context = $this->fsContext; + $queue = $context->createQueue('fs_test_queue'); + $context->purge($queue); + + $consumer = $context->createConsumer($queue); + $producer = $context->createProducer(); + + $producer->send($queue, $context->createMessage(str_repeat('a', 23))); + $producer->send($queue, $context->createMessage(str_repeat('b', 24))); + + $message = $consumer->receiveNoWait(); + $this->assertNotNull($message); + $context->workWithFile($queue, 'a+', function (FsDestination $destination, $file) { + $this->assertSame(0, fstat($file)['size'] % 64); + }); + + $message = $consumer->receiveNoWait(); + $this->assertNotNull($message); + $context->workWithFile($queue, 'a+', function (FsDestination $destination, $file) { + $this->assertSame(0, fstat($file)['size'] % 64); + }); + + $message = $consumer->receiveNoWait(); + $this->assertNull($message); + } + + /** + * @group bug + * @group bug202 + */ + public function testShouldThrowExceptionForTheCorruptedQueueFile() + { + $context = $this->fsContext; + $queue = $context->createQueue('fs_test_queue'); + $context->purge($queue); + + $context->workWithFile($queue, 'a+', function (FsDestination $destination, $file) { + fwrite($file, '|{"body":"{\"path\":\"\\\/p\\\/r\\\/pr_swoppad_6_4910_red_1.jpg\",\"filters\":null,\"force\":false}","properties":{"enqueue.topic_name":"liip_imagine_resolve_cache"},"headers":{"content_type":"application\/json","message_id":"46fdc345-5d0c-426e-95ac-227c7e657839","timestamp":1505379216,"reply_to":null,"correlation_id":""}} |{"body":"{\"path\":\"\\\/p\\\/r\\\/pr_swoppad_6_4910_black_1.jpg\",\"filters\":null,\"force\":false}","properties":{"enqueue.topic_name":"liip_imagine_resolve_cache"},"headers":{"content_type":"application\/json","message_id":"c4d60e39-3a8c-42df-b536-c8b7c13e006d","timestamp":1505379216,"reply_to":null,"correlation_id":""}} |{"body":"{\"path\":\"\\\/p\\\/r\\\/pr_swoppad_6_4910_green_1.jpg\",\"filters\":null,\"force\":false}","properties":{"enqueue.topic_name":"liip_imagine_resolve_cache"},"headers":{"content_type":"application\/json","message_id":"3a6aa176-c879-4435-9626-c48e0643defa","timestamp":1505379216,"reply_to":null,"correlation_id":""}}'); + }); + + $consumer = $context->createConsumer($queue); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The frame could start from either " " or "|". The malformed frame starts with """.'); + $consumer->receiveNoWait(); + } + + /** + * @group bug + * @group bug202 + */ + public function testShouldThrowExceptionWhenFrameSizeNotDivideExactly() + { + $context = $this->fsContext; + $queue = $context->createQueue('fs_test_queue'); + $context->purge($queue); + + $context->workWithFile($queue, 'a+', function (FsDestination $destination, $file) { + $msg = '|{"body":""}'; + //guard + $this->assertNotSame(0, strlen($msg) % 64); + + fwrite($file, $msg); + }); + + $consumer = $context->createConsumer($queue); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The frame size is "12" and it must divide exactly to 64 but it leaves a reminder "12".'); + $consumer->receiveNoWait(); + } }