Skip to content

Commit

Permalink
Merge pull request #5897 from magento-performance/MC-35827-2
Browse files Browse the repository at this point in the history
Fixed issues:
- MC-35884 Consumers improvement
- MC-35827 Test and merge PR with extended message queue consumer configuration
  • Loading branch information
slavvka authored Jul 17, 2020
2 parents 1e64fa5 + 3d9136d commit e0bf5fe
Show file tree
Hide file tree
Showing 26 changed files with 788 additions and 60 deletions.
10 changes: 9 additions & 1 deletion app/code/Magento/AsynchronousOperations/Model/MassConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,19 @@ public function process($maxNumberOfMessages = null)
$this->registry->register('isSecureArea', true, true);

$queue = $this->configuration->getQueue();
$maxIdleTime = $this->configuration->getMaxIdleTime();
$sleep = $this->configuration->getSleep();

if (!isset($maxNumberOfMessages)) {
$queue->subscribe($this->getTransactionCallback($queue));
} else {
$this->invoker->invoke($queue, $maxNumberOfMessages, $this->getTransactionCallback($queue));
$this->invoker->invoke(
$queue,
$maxNumberOfMessages,
$this->getTransactionCallback($queue),
$maxIdleTime,
$sleep
);
}

$this->registry->unregister('isSecureArea');
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
<?php
/**
* Copyright © Magento, Inc. All rights reserved.
* See COPYING.txt for license details.
*/
declare(strict_types=1);

namespace Magento\MessageQueue\Model;

use Magento\Framework\MessageQueue\QueueRepository;

/**
* Class CheckIsAvailableMessagesInQueue for checking messages available in queue
*/
class CheckIsAvailableMessagesInQueue
{
/**
* @var QueueRepository
*/
private $queueRepository;

/**
* Initialize dependencies.
*
* @param QueueRepository $queueRepository
*/
public function __construct(QueueRepository $queueRepository)
{
$this->queueRepository = $queueRepository;
}

/**
* Checks if there is available messages in the queue
*
* @param string $connectionName connection name
* @param string $queueName queue name
* @return bool
* @throws \LogicException if queue is not available
*/
public function execute($connectionName, $queueName)
{
$queue = $this->queueRepository->get($connectionName, $queueName);
$message = $queue->dequeue();
if ($message) {
$queue->reject($message);
return true;
}
return false;
}
}
31 changes: 30 additions & 1 deletion app/code/Magento/MessageQueue/Model/Cron/ConsumersRunner.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use Psr\Log\LoggerInterface;
use Symfony\Component\Process\PhpExecutableFinder;
use Magento\Framework\Lock\LockManagerInterface;
use Magento\MessageQueue\Model\CheckIsAvailableMessagesInQueue;

/**
* Class for running consumers processes by cron
Expand Down Expand Up @@ -65,6 +66,11 @@ class ConsumersRunner
*/
private $lockManager;

/**
* @var CheckIsAvailableMessagesInQueue
*/
private $checkIsAvailableMessages;

/**
* @param PhpExecutableFinder $phpExecutableFinder The executable finder specifically designed
* for the PHP executable
Expand All @@ -74,6 +80,7 @@ class ConsumersRunner
* @param LockManagerInterface $lockManager The lock manager
* @param ConnectionTypeResolver $mqConnectionTypeResolver Consumer connection resolver
* @param LoggerInterface $logger Logger
* @param CheckIsAvailableMessagesInQueue $checkIsAvailableMessages
*/
public function __construct(
PhpExecutableFinder $phpExecutableFinder,
Expand All @@ -82,7 +89,8 @@ public function __construct(
ShellInterface $shellBackground,
LockManagerInterface $lockManager,
ConnectionTypeResolver $mqConnectionTypeResolver = null,
LoggerInterface $logger = null
LoggerInterface $logger = null,
CheckIsAvailableMessagesInQueue $checkIsAvailableMessages = null
) {
$this->phpExecutableFinder = $phpExecutableFinder;
$this->consumerConfig = $consumerConfig;
Expand All @@ -93,6 +101,8 @@ public function __construct(
?: ObjectManager::getInstance()->get(ConnectionTypeResolver::class);
$this->logger = $logger
?: ObjectManager::getInstance()->get(LoggerInterface::class);
$this->checkIsAvailableMessages = $checkIsAvailableMessages
?: ObjectManager::getInstance()->get(CheckIsAvailableMessagesInQueue::class);
}

/**
Expand Down Expand Up @@ -166,6 +176,25 @@ private function canBeRun(ConsumerConfigItemInterface $consumerConfig, array $al
return false;
}

if ($consumerConfig->getOnlySpawnWhenMessageAvailable()) {
try {
return $this->checkIsAvailableMessages->execute(
$connectionName,
$consumerConfig->getQueue()
);
} catch (\LogicException $e) {
$this->logger->info(
sprintf(
'Consumer "%s" skipped as its related queue "%s" is not available. %s',
$consumerName,
$consumerConfig->getQueue(),
$e->getMessage()
)
);
return false;
}
}

return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use Magento\Framework\MessageQueue\Consumer\ConfigInterface as ConsumerConfigInterface;
use Magento\Framework\ShellInterface;
use Magento\MessageQueue\Model\Cron\ConsumersRunner;
use Magento\MessageQueue\Model\CheckIsAvailableMessagesInQueue;
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Process\PhpExecutableFinder;
Expand Down Expand Up @@ -48,10 +49,15 @@ class ConsumersRunnerTest extends TestCase
*/
private $phpExecutableFinderMock;

/**
* @var CheckIsAvailableMessagesInQueue|MockObject
*/
private $checkIsAvailableMessagesMock;

/**
* @var ConnectionTypeResolver
*/
private $connectionTypeResover;
private $connectionTypeResolver;

/**
* @var ConsumersRunner
Expand All @@ -77,18 +83,21 @@ protected function setUp(): void
$this->deploymentConfigMock = $this->getMockBuilder(DeploymentConfig::class)
->disableOriginalConstructor()
->getMock();
$this->connectionTypeResover = $this->getMockBuilder(ConnectionTypeResolver::class)
$this->checkIsAvailableMessagesMock = $this->createMock(CheckIsAvailableMessagesInQueue::class);
$this->connectionTypeResolver = $this->getMockBuilder(ConnectionTypeResolver::class)
->disableOriginalConstructor()
->getMock();
$this->connectionTypeResover->method('getConnectionType')->willReturn('something');
$this->connectionTypeResolver->method('getConnectionType')->willReturn('something');

$this->consumersRunner = new ConsumersRunner(
$this->phpExecutableFinderMock,
$this->consumerConfigMock,
$this->deploymentConfigMock,
$this->shellBackgroundMock,
$this->lockManagerMock,
$this->connectionTypeResover
$this->connectionTypeResolver,
null,
$this->checkIsAvailableMessagesMock
);
}

Expand Down Expand Up @@ -262,4 +271,95 @@ public function runDataProvider()
],
];
}

/**
* @param boolean $onlySpawnWhenMessageAvailable
* @param boolean $isMassagesAvailableInTheQueue
* @param int $shellBackgroundExpects
* @dataProvider runBasedOnOnlySpawnWhenMessageAvailableConsumerConfigurationDataProvider
*/
public function testRunBasedOnOnlySpawnWhenMessageAvailableConsumerConfiguration(
$onlySpawnWhenMessageAvailable,
$isMassagesAvailableInTheQueue,
$shellBackgroundExpects
) {
$consumerName = 'consumerName';
$connectionName = 'connectionName';
$queueName = 'queueName';
$this->deploymentConfigMock->expects($this->exactly(3))
->method('get')
->willReturnMap(
[
['cron_consumers_runner/cron_run', true, true],
['cron_consumers_runner/max_messages', 10000, 1000],
['cron_consumers_runner/consumers', [], []],
]
);

/** @var ConsumerConfigInterface|MockObject $firstCunsumer */
$consumer = $this->getMockBuilder(ConsumerConfigItemInterface::class)
->getMockForAbstractClass();
$consumer->expects($this->any())
->method('getName')
->willReturn($consumerName);
$consumer->expects($this->once())
->method('getConnection')
->willReturn($connectionName);
$consumer->expects($this->any())
->method('getQueue')
->willReturn($queueName);
$consumer->expects($this->once())
->method('getOnlySpawnWhenMessageAvailable')
->willReturn($onlySpawnWhenMessageAvailable);
$this->consumerConfigMock->expects($this->once())
->method('getConsumers')
->willReturn([$consumer]);

$this->phpExecutableFinderMock->expects($this->once())
->method('find')
->willReturn('');

$this->lockManagerMock->expects($this->once())
->method('isLocked')
->willReturn(false);

$this->checkIsAvailableMessagesMock->expects($this->exactly((int)$onlySpawnWhenMessageAvailable))
->method('execute')
->willReturn($isMassagesAvailableInTheQueue);

$this->shellBackgroundMock->expects($this->exactly($shellBackgroundExpects))
->method('execute');

$this->consumersRunner->run();
}

/**
* @return array
*/
public function runBasedOnOnlySpawnWhenMessageAvailableConsumerConfigurationDataProvider()
{
return [
[
'onlySpawnWhenMessageAvailable' => true,
'isMassagesAvailableInTheQueue' => true,
'shellBackgroundExpects' => 1
],
[
'onlySpawnWhenMessageAvailable' => true,
'isMassagesAvailableInTheQueue' => false,
'shellBackgroundExpects' => 0
],
[
'onlySpawnWhenMessageAvailable' => false,
'isMassagesAvailableInTheQueue' => true,
'shellBackgroundExpects' => 1
],
[
'onlySpawnWhenMessageAvailable' => false,
'isMassagesAvailableInTheQueue' => false,
'shellBackgroundExpects' => 1
],

];
}
}
21 changes: 18 additions & 3 deletions lib/internal/Magento/Framework/MessageQueue/CallbackInvoker.php
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,31 @@ public function __construct(
* @param QueueInterface $queue
* @param int $maxNumberOfMessages
* @param \Closure $callback
* @param int|null $maxIdleTime
* @param int|null $sleep
* @return void
*
* @SuppressWarnings(PHPMD.CyclomaticComplexity)
*/
public function invoke(QueueInterface $queue, $maxNumberOfMessages, $callback)
{
public function invoke(
QueueInterface $queue,
$maxNumberOfMessages,
$callback,
$maxIdleTime = null,
$sleep = null
) {
$this->poisonPillVersion = $this->poisonPillRead->getLatestVersion();
$sleep = (int) $sleep ?: 1;
$maxIdleTime = $maxIdleTime ? (int) $maxIdleTime : PHP_INT_MAX;
for ($i = $maxNumberOfMessages; $i > 0; $i--) {
$idleStartTime = microtime(true);
do {
$message = $queue->dequeue();
if (!$message && microtime(true) - $idleStartTime > $maxIdleTime) {
break 2;
}
// phpcs:ignore Magento2.Functions.DiscouragedFunction
} while ($message === null && $this->isWaitingNextMessage() && (sleep(1) === 0));
} while ($message === null && $this->isWaitingNextMessage() && (sleep($sleep) === 0));

if ($message === null) {
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
namespace Magento\Framework\MessageQueue;

/**
* Callback invoker interface
* Callback invoker interface. Invoke callbacks for consumer classes.
*/
interface CallbackInvokerInterface
{
Expand All @@ -18,7 +18,15 @@ interface CallbackInvokerInterface
* @param QueueInterface $queue
* @param int $maxNumberOfMessages
* @param \Closure $callback
* @param int|null $maxIdleTime
* @param int|null $sleep
* @return void
*/
public function invoke(QueueInterface $queue, $maxNumberOfMessages, $callback);
public function invoke(
QueueInterface $queue,
$maxNumberOfMessages,
$callback,
$maxIdleTime = null,
$sleep = null
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ private function getConsumerConfigDataFromQueueConfig()
'consumerInstance' => $consumerData['instance_type'],
'handlers' => $handlers,
'connection' => $consumerData['connection'],
'maxMessages' => $consumerData['max_messages']
'maxMessages' => $consumerData['max_messages'],
'maxIdleTime' => null,
'sleep' => null,
'onlySpawnWhenMessageAvailable' => false
];
}

Expand Down
11 changes: 9 additions & 2 deletions lib/internal/Magento/Framework/MessageQueue/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,18 @@ public function __construct(
public function process($maxNumberOfMessages = null)
{
$queue = $this->configuration->getQueue();

$maxIdleTime = $this->configuration->getMaxIdleTime();
$sleep = $this->configuration->getSleep();
if (!isset($maxNumberOfMessages)) {
$queue->subscribe($this->getTransactionCallback($queue));
} else {
$this->invoker->invoke($queue, $maxNumberOfMessages, $this->getTransactionCallback($queue));
$this->invoker->invoke(
$queue,
$maxNumberOfMessages,
$this->getTransactionCallback($queue),
$maxIdleTime,
$sleep
);
}
}

Expand Down
Loading

0 comments on commit e0bf5fe

Please sign in to comment.