Skip to content

Commit

Permalink
Merge pull request #203 from php-enqueue/fs-edge-case-with-message-si…
Browse files Browse the repository at this point in the history
…ze-2

[fs] fix bugs introduced in #181.
  • Loading branch information
makasim authored Sep 20, 2017
2 parents c4cf691 + ac81bca commit bdfdaf6
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 1 deletion.
16 changes: 15 additions & 1 deletion pkg/fs/FsConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,15 @@ public function receiveNoWait()
$count = $this->preFetchCount;
while ($count) {
$frame = $this->readFrame($file, 1);

//guards
if ($frame && false == ('|' == $frame[0] || ' ' == $frame[0])) {
throw new \LogicException(sprintf('The frame could start from either " " or "|". The malformed frame starts with "%s".', $frame[0]));
}
if (0 !== $reminder = strlen($frame) % 64) {
throw new \LogicException(sprintf('The frame size is "%d" and it must divide exactly to 64 but it leaves a reminder "%d".', strlen($frame), $reminder));
}

ftruncate($file, fstat($file)['size'] - strlen($frame));
rewind($file);

Expand Down Expand Up @@ -212,7 +221,12 @@ private function readFrame($file, $frameNumber)
$previousFrame = $this->readFrame($file, $frameNumber + 1);

if ('|' === substr($previousFrame, -1) && '{' === $frame[0]) {
return '|'.$frame;
$matched = [];
if (false === preg_match('/\ *?\|$/', $previousFrame, $matched)) {
throw new \LogicException('Something went completely wrong.');
}

return $matched[0].$frame;
}

return $previousFrame.$frame;
Expand Down
80 changes: 80 additions & 0 deletions pkg/fs/Tests/Functional/FsConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Enqueue\Fs\FsConnectionFactory;
use Enqueue\Fs\FsContext;
use Enqueue\Fs\FsDestination;
use Enqueue\Fs\FsMessage;
use PHPUnit\Framework\TestCase;

Expand Down Expand Up @@ -69,6 +70,7 @@ public function testShouldConsumeMessagesFromFileOneByOne()

/**
* @group bug
* @group bug170
*/
public function testShouldNotFailOnSpecificMessageSize()
{
Expand All @@ -91,4 +93,82 @@ public function testShouldNotFailOnSpecificMessageSize()
$message = $consumer->receiveNoWait();
$this->assertNull($message);
}

/**
* @group bug
* @group bug170
*/
public function testShouldNotCorruptFrameSize()
{
$context = $this->fsContext;
$queue = $context->createQueue('fs_test_queue');
$context->purge($queue);

$consumer = $context->createConsumer($queue);
$producer = $context->createProducer();

$producer->send($queue, $context->createMessage(str_repeat('a', 23)));
$producer->send($queue, $context->createMessage(str_repeat('b', 24)));

$message = $consumer->receiveNoWait();
$this->assertNotNull($message);
$context->workWithFile($queue, 'a+', function (FsDestination $destination, $file) {
$this->assertSame(0, fstat($file)['size'] % 64);
});

$message = $consumer->receiveNoWait();
$this->assertNotNull($message);
$context->workWithFile($queue, 'a+', function (FsDestination $destination, $file) {
$this->assertSame(0, fstat($file)['size'] % 64);
});

$message = $consumer->receiveNoWait();
$this->assertNull($message);
}

/**
* @group bug
* @group bug202
*/
public function testShouldThrowExceptionForTheCorruptedQueueFile()
{
$context = $this->fsContext;
$queue = $context->createQueue('fs_test_queue');
$context->purge($queue);

$context->workWithFile($queue, 'a+', function (FsDestination $destination, $file) {
fwrite($file, '|{"body":"{\"path\":\"\\\/p\\\/r\\\/pr_swoppad_6_4910_red_1.jpg\",\"filters\":null,\"force\":false}","properties":{"enqueue.topic_name":"liip_imagine_resolve_cache"},"headers":{"content_type":"application\/json","message_id":"46fdc345-5d0c-426e-95ac-227c7e657839","timestamp":1505379216,"reply_to":null,"correlation_id":""}} |{"body":"{\"path\":\"\\\/p\\\/r\\\/pr_swoppad_6_4910_black_1.jpg\",\"filters\":null,\"force\":false}","properties":{"enqueue.topic_name":"liip_imagine_resolve_cache"},"headers":{"content_type":"application\/json","message_id":"c4d60e39-3a8c-42df-b536-c8b7c13e006d","timestamp":1505379216,"reply_to":null,"correlation_id":""}} |{"body":"{\"path\":\"\\\/p\\\/r\\\/pr_swoppad_6_4910_green_1.jpg\",\"filters\":null,\"force\":false}","properties":{"enqueue.topic_name":"liip_imagine_resolve_cache"},"headers":{"content_type":"application\/json","message_id":"3a6aa176-c879-4435-9626-c48e0643defa","timestamp":1505379216,"reply_to":null,"correlation_id":""}}');
});

$consumer = $context->createConsumer($queue);

$this->expectException(\LogicException::class);
$this->expectExceptionMessage('The frame could start from either " " or "|". The malformed frame starts with """.');
$consumer->receiveNoWait();
}

/**
* @group bug
* @group bug202
*/
public function testShouldThrowExceptionWhenFrameSizeNotDivideExactly()
{
$context = $this->fsContext;
$queue = $context->createQueue('fs_test_queue');
$context->purge($queue);

$context->workWithFile($queue, 'a+', function (FsDestination $destination, $file) {
$msg = '|{"body":""}';
//guard
$this->assertNotSame(0, strlen($msg) % 64);

fwrite($file, $msg);
});

$consumer = $context->createConsumer($queue);

$this->expectException(\LogicException::class);
$this->expectExceptionMessage('The frame size is "12" and it must divide exactly to 64 but it leaves a reminder "12".');
$consumer->receiveNoWait();
}
}

0 comments on commit bdfdaf6

Please sign in to comment.