Skip to content
This repository has been archived by the owner on Dec 19, 2019. It is now read-only.

Commit

Permalink
fix AsynchronousOperations multistore issue by passing store_id from …
Browse files Browse the repository at this point in the history
…in amqp application_headers property
  • Loading branch information
anvasiliev committed May 28, 2019
1 parent 15100ac commit fd6cf5e
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
<?php
/**
* Copyright © Magento, Inc. All rights reserved.
* See COPYING.txt for license details.
*/

declare(strict_types=1);

namespace Magento\Amqp\Plugin\Framework\MessageQueue;

use Magento\Store\Model\StoreManagerInterface;
use Magento\Framework\MessageQueue\EnvelopeFactory;
use PhpAmqpLib\Wire\AMQPTable;

/**
* Plugin to set 'store_id' to the new custom header 'store_id' in amqp
* 'application_headers'.
*/
class EnvelopeFactoryPlugin
{
/**
* @var \Magento\Store\Model\StoreManagerInterface
*/
private $storeManager;

/**
* @param \Magento\Store\Model\StoreManagerInterface $storeManager
*/
public function __construct(
StoreManagerInterface $storeManager
) {
$this->storeManager = $storeManager;
}

/**
* Pass current 'store_id' to the new custom header 'store_id' in amqp
* 'application_headers' Magento\AsynchronousOperations\Model\MassConsumer
* will use store_id to setCurrentStore and will execute messages for
* correct store instead of default.
*
* @return array
* @SuppressWarnings(PHPMD.UnusedFormalParameter)
*/
public function beforeCreate(EnvelopeFactory $subject, array $data = [])
{
if (!isset($data['publisher_flag'])) {
return null;
} else {
unset($data['publisher_flag']);
}
try {
$storeId = $this->storeManager->getStore()->getId();

if (isset($storeId)) {
if (isset($data['properties'])) {
$properties = $data['properties'];
if (isset($properties['application_headers'])) {
$headers = $properties['application_headers'];
if ($headers instanceof AMQPTable) {
$headers->set('store_id', $storeId);
$data['properties']['application_headers'] = $headers;
}
} else {
$data['properties']['application_headers'] = new AMQPTable(['store_id' => $storeId]);
}
}
}
} catch (\Exception $e) {
return null;
}

return [$data];
}
}
3 changes: 3 additions & 0 deletions app/code/Magento/Amqp/etc/di.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,7 @@
<argument name="instanceName" xsi:type="string">\Magento\Framework\Amqp\Bulk\Exchange</argument>
</arguments>
</virtualType>
<type name="Magento\Framework\MessageQueue\EnvelopeFactory">
<plugin name="amqpStoreIdFieldForMessageQueueEnvelopeFactory" type="Magento\Amqp\Plugin\Framework\MessageQueue\EnvelopeFactoryPlugin" />
</type>
</config>
28 changes: 27 additions & 1 deletion app/code/Magento/AsynchronousOperations/Model/MassConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

use Magento\Framework\App\ResourceConnection;
use Magento\Framework\Registry;
use Magento\Store\Model\StoreManagerInterface;
use PhpAmqpLib\Wire\AMQPTable;
use Psr\Log\LoggerInterface;
use Magento\Framework\MessageQueue\MessageLockException;
use Magento\Framework\MessageQueue\ConnectionLostException;
Expand Down Expand Up @@ -63,6 +65,10 @@ class MassConsumer implements ConsumerInterface
* @var Registry
*/
private $registry;
/**
* @var \Magento\Store\Model\StoreManagerInterface
*/
private $storeManager;

/**
* Initialize dependencies.
Expand All @@ -74,6 +80,7 @@ class MassConsumer implements ConsumerInterface
* @param OperationProcessorFactory $operationProcessorFactory
* @param LoggerInterface $logger
* @param Registry $registry
* @param StoreManagerInterface $storeManager
*/
public function __construct(
CallbackInvokerInterface $invoker,
Expand All @@ -82,7 +89,8 @@ public function __construct(
ConsumerConfigurationInterface $configuration,
OperationProcessorFactory $operationProcessorFactory,
LoggerInterface $logger,
Registry $registry = null
Registry $registry = null,
StoreManagerInterface $storeManager = null
) {
$this->invoker = $invoker;
$this->resource = $resource;
Expand All @@ -94,6 +102,8 @@ public function __construct(
$this->logger = $logger;
$this->registry = $registry ?? \Magento\Framework\App\ObjectManager::getInstance()
->get(Registry::class);
$this->storeManager = $storeManager ?? \Magento\Framework\App\ObjectManager::getInstance()
->get(StoreManagerInterface::class);
}

/**
Expand Down Expand Up @@ -126,6 +136,22 @@ private function getTransactionCallback(QueueInterface $queue)
/** @var LockInterface $lock */
$lock = null;
try {
$amqpProperties = $message->getProperties();
if (isset($amqpProperties['application_headers'])) {
$headers = $amqpProperties['application_headers'];
if ($headers instanceof AMQPTable) {
$headers = $headers->getNativeData();
}
if (isset($headers['store_id'])) {
$storeId = $headers['store_id'];
$currentStoreId = $this->storeManager->getStore()->getId();

if (isset($storeId) && $storeId !== $currentStoreId) {
$this->storeManager->setCurrentStore($storeId);
}
}
}

$topicName = $message->getProperties()['topic_name'];
$lock = $this->messageController->lock($message, $this->configuration->getConsumerName());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ public function publish($topicName, $data)
'properties' => [
'delivery_mode' => 2,
'message_id' => $this->messageIdGenerator->generate($topicName),
]
],
'publisher_flag'=>true
]
);
}
Expand Down

0 comments on commit fd6cf5e

Please sign in to comment.