From 7e03d70dffd16025c673192b7b7a0c2f84076622 Mon Sep 17 00:00:00 2001 From: Ugljesa Zivaljevic Date: Thu, 30 May 2024 12:18:15 +0200 Subject: [PATCH 01/18] Add new command for emptying queus in RabbitMq and MySql --- Console/Command/NostoClearQueueCommand.php | 214 +++++++++++++++++++++ etc/di.xml | 9 + 2 files changed, 223 insertions(+) create mode 100644 Console/Command/NostoClearQueueCommand.php mode change 100644 => 100755 etc/di.xml diff --git a/Console/Command/NostoClearQueueCommand.php b/Console/Command/NostoClearQueueCommand.php new file mode 100644 index 000000000..2457274e9 --- /dev/null +++ b/Console/Command/NostoClearQueueCommand.php @@ -0,0 +1,214 @@ +resourceConnection = $resourceConnection; + $this->amqpConfig = $amqpConfig; + + parent::__construct(); + } + + protected function configure() + { + // Define command name. + $this->setName('nosto:clear:queue') + ->setDescription('Clear all message queues for Nosto product sync topics.'); + + parent::configure(); + } + + protected function execute(InputInterface $input, OutputInterface $output) + { + $io = new SymfonyStyle($input, $output); + + try { + foreach (self::QUEUE_TOPICS as $topicName) { + $this->clearQueue($topicName, $io); + } + + $io->success('Successfully cleared message queues.'); + return 0; + } catch (NostoException $e) { + $io->error('An error occurred while clearing message queues: ' . $e->getMessage()); + return 1; + } + } + + /** + * Clear MySql and RabbitMq queues by name. + * + * @param string $topicName + * @param SymfonyStyle $io + * @return void + */ + private function clearQueue(string $topicName, SymfonyStyle $io) + { + $this->clearRabbitMQQueue($topicName, $io); + $this->clearDBQueues($topicName, $io); + } + + /** + * Clear DB. + * + * @param string $topicName + * @param $io + * + * @return void + */ + private function clearDBQueues(string $topicName, $io) + { + // Get connection. + $connection = $this->resourceConnection->getConnection(); + + // Start DB transaction. + $connection->beginTransaction(); + try { + // Emtyig DB tables. + $this->clearQueueMessages($topicName, $connection); + $this->clearRelatedRecords($topicName, $connection, $io); + + $connection->commit(); + } catch (\Exception $exception) { + $connection->rollBack(); + $io->error('An error occurred while clearing DB queues for topic ' . $topicName . ': ' . $exception->getMessage()); + } + } + + /** + * Emtying queue message tables. + * + * @param $topicName + * @param $connection + * + * @return void + */ + private function clearQueueMessages($topicName, $connection) + { + $queueMessageTable = $this->resourceConnection->getTableName('queue_message'); + $queueMessageStatusTable = $this->resourceConnection->getTableName('queue_message_status'); + + // Get all IDs from "queue_message" table. + $select = $connection->select() + ->from($queueMessageTable, ['id']) + ->where('topic_name = ?', $topicName); + $messageIds = $connection->fetchCol($select); + + // Delete related records from "queue_message_status" table. + if (!empty($messageIds)) { + $connection->delete($queueMessageStatusTable, ['message_id IN (?)' => $messageIds]); + } + + // Delete records from "queue_message" table. + $connection->delete($queueMessageTable, ['topic_name = ?' => $topicName]); + } + + /** + * Emtying related tables. + * + * @param $topicName + * @param $connection + * @param $io + * + * @return void + */ + private function clearRelatedRecords($topicName, $connection, $io) + { + $magentoOperationTable = $this->resourceConnection->getTableName('magento_operation'); + $magentoBulkTable = $this->resourceConnection->getTableName('magento_bulk'); + + // Get all IDs from "magento_operation" table. + $selectBulkUuids = $connection->select() + ->from($magentoOperationTable, ['bulk_uuid']) + ->where('topic_name = ?', $topicName); + $bulkUuids = $connection->fetchCol($selectBulkUuids); + + // Delete related records from "magento_bulk" table. + if (!empty($bulkUuids)) { + $connection->delete($magentoBulkTable, ['uuid IN (?)' => $bulkUuids]); + } + + // Delete records from "magento_operation" table. + $connection->delete($magentoOperationTable, ['topic_name = ?' => $topicName]); + } + + /** + * Clear RabbitMq Queues by name. + * + * @param $queueName + * @param $io + * + * @return void + */ + private function clearRabbitMQQueue($queueName, $io) + { + try { + // Get RabbitMq channel. + $channel = $this->amqpConfig->getChannel(); + + // Empty queue if queue exists. + if ($this->queueExists($channel, $queueName)) { + $channel->queue_purge($queueName); + } + } catch (\Exception $e) { + // Log the error or handle it as required. + $io->error('An error occurred while clearing RabbitMQ queue ' . $queueName . ': ' . $e->getMessage()); + throw new \RuntimeException('Failed to clear RabbitMQ queue: ' . $e->getMessage()); + } + } + + /** + * Check queue exist. + * + * @param $channel + * @param $queueName + * + * @return bool + */ + protected function queueExists($channel, $queueName) + { + $queueInfo = $channel->queue_declare($queueName, passive: true); + + return !empty($queueInfo); + } +} diff --git a/etc/di.xml b/etc/di.xml old mode 100644 new mode 100755 index 3f85137fd..c274eba3d --- a/etc/di.xml +++ b/etc/di.xml @@ -98,6 +98,9 @@ Nosto\Tagging\Console\Command\NostoGenerateCustomerReferenceCommand + + Nosto\Tagging\Console\Command\NostoClearQueueCommand + @@ -234,4 +237,10 @@ 500 + + + Magento\Framework\App\ResourceConnection + Magento\Framework\Amqp\Config + + From 26ee8ccaf11c268d94af2c384a2cc1eb939b0c22 Mon Sep 17 00:00:00 2001 From: Ugljesa Zivaljevic Date: Thu, 30 May 2024 12:23:46 +0200 Subject: [PATCH 02/18] Clear code --- Console/Command/NostoClearQueueCommand.php | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/Console/Command/NostoClearQueueCommand.php b/Console/Command/NostoClearQueueCommand.php index 2457274e9..10f8c7f6b 100644 --- a/Console/Command/NostoClearQueueCommand.php +++ b/Console/Command/NostoClearQueueCommand.php @@ -41,7 +41,7 @@ class NostoClearQueueCommand extends Command public function __construct( ResourceConnection $resourceConnection, - Config $amqpConfig, + Config $amqpConfig ){ $this->resourceConnection = $resourceConnection; $this->amqpConfig = $amqpConfig; @@ -106,7 +106,7 @@ private function clearDBQueues(string $topicName, $io) try { // Emtyig DB tables. $this->clearQueueMessages($topicName, $connection); - $this->clearRelatedRecords($topicName, $connection, $io); + $this->clearRelatedRecords($topicName, $connection); $connection->commit(); } catch (\Exception $exception) { @@ -148,11 +148,10 @@ private function clearQueueMessages($topicName, $connection) * * @param $topicName * @param $connection - * @param $io * * @return void */ - private function clearRelatedRecords($topicName, $connection, $io) + private function clearRelatedRecords($topicName, $connection) { $magentoOperationTable = $this->resourceConnection->getTableName('magento_operation'); $magentoBulkTable = $this->resourceConnection->getTableName('magento_bulk'); From f733ad9d8906fc9b0b5b323967e621ef9bb2ec4c Mon Sep 17 00:00:00 2001 From: Ugljesa Zivaljevic Date: Thu, 30 May 2024 15:43:09 +0200 Subject: [PATCH 03/18] Formatting --- Console/Command/NostoClearQueueCommand.php | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/Console/Command/NostoClearQueueCommand.php b/Console/Command/NostoClearQueueCommand.php index 10f8c7f6b..fbc91e16c 100644 --- a/Console/Command/NostoClearQueueCommand.php +++ b/Console/Command/NostoClearQueueCommand.php @@ -111,7 +111,10 @@ private function clearDBQueues(string $topicName, $io) $connection->commit(); } catch (\Exception $exception) { $connection->rollBack(); - $io->error('An error occurred while clearing DB queues for topic ' . $topicName . ': ' . $exception->getMessage()); + $io->error('An error occurred while clearing DB queues for topic ' + . $topicName . ': ' + . $exception->getMessage() + ); } } From a9f269a4aaac5f023da285fac5083ab880da1675 Mon Sep 17 00:00:00 2001 From: Ugljesa Zivaljevic Date: Thu, 30 May 2024 16:06:37 +0200 Subject: [PATCH 04/18] Formatting Code --- Console/Command/NostoClearQueueCommand.php | 52 ++++++++-------------- 1 file changed, 19 insertions(+), 33 deletions(-) diff --git a/Console/Command/NostoClearQueueCommand.php b/Console/Command/NostoClearQueueCommand.php index fbc91e16c..7b8fe48b3 100644 --- a/Console/Command/NostoClearQueueCommand.php +++ b/Console/Command/NostoClearQueueCommand.php @@ -4,20 +4,11 @@ use Magento\Framework\Amqp\Config; use Magento\Framework\App\ResourceConnection; -use Magento\Framework\Bulk\BulkManagementInterface; -use Magento\Framework\MessageQueue\Publisher\Config\PublisherConfigItemInterface; use Nosto\NostoException; -use PhpAmqpLib\Connection\AMQPStreamConnection; -use PhpAmqpLib\Message\AMQPMessage; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Style\SymfonyStyle; use Symfony\Component\Console\Input\InputInterface; -use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; -use Magento\Framework\App\ObjectManager; -use Magento\Framework\App\Config\ScopeConfigInterface; -use Magento\Framework\App\Bootstrap; -use Magento\Framework\App\DeploymentConfig; class NostoClearQueueCommand extends Command { @@ -42,7 +33,7 @@ class NostoClearQueueCommand extends Command public function __construct( ResourceConnection $resourceConnection, Config $amqpConfig - ){ + ) { $this->resourceConnection = $resourceConnection; $this->amqpConfig = $amqpConfig; @@ -58,7 +49,7 @@ protected function configure() parent::configure(); } - protected function execute(InputInterface $input, OutputInterface $output) + protected function execute(InputInterface $input, OutputInterface $output): int { $io = new SymfonyStyle($input, $output); @@ -82,7 +73,7 @@ protected function execute(InputInterface $input, OutputInterface $output) * @param SymfonyStyle $io * @return void */ - private function clearQueue(string $topicName, SymfonyStyle $io) + private function clearQueue(string $topicName, SymfonyStyle $io): void { $this->clearRabbitMQQueue($topicName, $io); $this->clearDBQueues($topicName, $io); @@ -92,11 +83,10 @@ private function clearQueue(string $topicName, SymfonyStyle $io) * Clear DB. * * @param string $topicName - * @param $io - * + * @param SymfonyStyle $io * @return void */ - private function clearDBQueues(string $topicName, $io) + private function clearDBQueues(string $topicName, SymfonyStyle $io): void { // Get connection. $connection = $this->resourceConnection->getConnection(); @@ -104,7 +94,7 @@ private function clearDBQueues(string $topicName, $io) // Start DB transaction. $connection->beginTransaction(); try { - // Emtyig DB tables. + // Emptying DB tables. $this->clearQueueMessages($topicName, $connection); $this->clearRelatedRecords($topicName, $connection); @@ -119,14 +109,13 @@ private function clearDBQueues(string $topicName, $io) } /** - * Emtying queue message tables. + * Emptying queue message tables. * - * @param $topicName + * @param string $topicName * @param $connection - * * @return void */ - private function clearQueueMessages($topicName, $connection) + private function clearQueueMessages(string $topicName, $connection): void { $queueMessageTable = $this->resourceConnection->getTableName('queue_message'); $queueMessageStatusTable = $this->resourceConnection->getTableName('queue_message_status'); @@ -147,14 +136,13 @@ private function clearQueueMessages($topicName, $connection) } /** - * Emtying related tables. + * Emptying related tables. * - * @param $topicName + * @param string $topicName * @param $connection - * * @return void */ - private function clearRelatedRecords($topicName, $connection) + private function clearRelatedRecords(string $topicName, $connection): void { $magentoOperationTable = $this->resourceConnection->getTableName('magento_operation'); $magentoBulkTable = $this->resourceConnection->getTableName('magento_bulk'); @@ -175,14 +163,13 @@ private function clearRelatedRecords($topicName, $connection) } /** - * Clear RabbitMq Queues by name. - * - * @param $queueName - * @param $io + * Clear RabbitMq Queues by name. * + * @param string $queueName + * @param SymfonyStyle $io * @return void */ - private function clearRabbitMQQueue($queueName, $io) + private function clearRabbitMQQueue(string $queueName, SymfonyStyle $io): void { try { // Get RabbitMq channel. @@ -203,13 +190,12 @@ private function clearRabbitMQQueue($queueName, $io) * Check queue exist. * * @param $channel - * @param $queueName - * + * @param string $queueName * @return bool */ - protected function queueExists($channel, $queueName) + protected function queueExists($channel, string $queueName): bool { - $queueInfo = $channel->queue_declare($queueName, passive: true); + $queueInfo = $channel->queue_declare($queueName, true); return !empty($queueInfo); } From af5452696eb73da79f7cb3c14038b23f8883a610 Mon Sep 17 00:00:00 2001 From: Ugljesa Zivaljevic Date: Thu, 30 May 2024 16:09:56 +0200 Subject: [PATCH 05/18] Formatting Code --- Console/Command/NostoClearQueueCommand.php | 3 --- 1 file changed, 3 deletions(-) diff --git a/Console/Command/NostoClearQueueCommand.php b/Console/Command/NostoClearQueueCommand.php index 7b8fe48b3..5467c78fa 100644 --- a/Console/Command/NostoClearQueueCommand.php +++ b/Console/Command/NostoClearQueueCommand.php @@ -36,7 +36,6 @@ public function __construct( ) { $this->resourceConnection = $resourceConnection; $this->amqpConfig = $amqpConfig; - parent::__construct(); } @@ -45,7 +44,6 @@ protected function configure() // Define command name. $this->setName('nosto:clear:queue') ->setDescription('Clear all message queues for Nosto product sync topics.'); - parent::configure(); } @@ -97,7 +95,6 @@ private function clearDBQueues(string $topicName, SymfonyStyle $io): void // Emptying DB tables. $this->clearQueueMessages($topicName, $connection); $this->clearRelatedRecords($topicName, $connection); - $connection->commit(); } catch (\Exception $exception) { $connection->rollBack(); From 378dc83355e43f2eb539be4c896357eab7d247ff Mon Sep 17 00:00:00 2001 From: Ugljesa Zivaljevic Date: Fri, 31 May 2024 09:47:01 +0200 Subject: [PATCH 06/18] Formatting Code --- Console/Command/NostoClearQueueCommand.php | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/Console/Command/NostoClearQueueCommand.php b/Console/Command/NostoClearQueueCommand.php index 5467c78fa..4b3e04c0a 100644 --- a/Console/Command/NostoClearQueueCommand.php +++ b/Console/Command/NostoClearQueueCommand.php @@ -2,9 +2,11 @@ namespace Nosto\Tagging\Console\Command; +use Exception; use Magento\Framework\Amqp\Config; use Magento\Framework\App\ResourceConnection; use Nosto\NostoException; +use RuntimeException; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Style\SymfonyStyle; use Symfony\Component\Console\Input\InputInterface; @@ -23,12 +25,12 @@ class NostoClearQueueCommand extends Command /** * @var Config */ - private $amqpConfig; + private Config $amqpConfig; /** * @var ResourceConnection */ - private $resourceConnection; + private ResourceConnection $resourceConnection; public function __construct( ResourceConnection $resourceConnection, @@ -96,7 +98,7 @@ private function clearDBQueues(string $topicName, SymfonyStyle $io): void $this->clearQueueMessages($topicName, $connection); $this->clearRelatedRecords($topicName, $connection); $connection->commit(); - } catch (\Exception $exception) { + } catch (Exception $exception) { $connection->rollBack(); $io->error('An error occurred while clearing DB queues for topic ' . $topicName . ': ' @@ -176,10 +178,10 @@ private function clearRabbitMQQueue(string $queueName, SymfonyStyle $io): void if ($this->queueExists($channel, $queueName)) { $channel->queue_purge($queueName); } - } catch (\Exception $e) { + } catch (Exception $e) { // Log the error or handle it as required. $io->error('An error occurred while clearing RabbitMQ queue ' . $queueName . ': ' . $e->getMessage()); - throw new \RuntimeException('Failed to clear RabbitMQ queue: ' . $e->getMessage()); + throw new RuntimeException('Failed to clear RabbitMQ queue: ' . $e->getMessage()); } } From da798c14f7acfc0fe51c2c7ba2a62fe50d91ca7d Mon Sep 17 00:00:00 2001 From: Cid Lopes Date: Tue, 4 Jun 2024 15:42:13 +0300 Subject: [PATCH 07/18] Drop MySQL support for message queueing --- etc/queue.xml | 4 ++-- etc/queue_consumer.xml | 4 ++-- etc/queue_publisher.xml | 4 ++-- etc/queue_topology.xml | 2 +- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/etc/queue.xml b/etc/queue.xml index 95666bc8a..ea8724a15 100644 --- a/etc/queue.xml +++ b/etc/queue.xml @@ -35,13 +35,13 @@ --> - + - + diff --git a/etc/queue_publisher.xml b/etc/queue_publisher.xml index 0d044bc71..65ed577fd 100644 --- a/etc/queue_publisher.xml +++ b/etc/queue_publisher.xml @@ -36,9 +36,9 @@ - + - + diff --git a/etc/queue_topology.xml b/etc/queue_topology.xml index 8f70a8190..0ef798cc6 100644 --- a/etc/queue_topology.xml +++ b/etc/queue_topology.xml @@ -35,7 +35,7 @@ --> - + From 4806432600010b4c8b6d2e023849fe6e275f1a39 Mon Sep 17 00:00:00 2001 From: Cid Lopes Date: Tue, 4 Jun 2024 15:45:46 +0300 Subject: [PATCH 08/18] Rename default exchange name to magento-amqp --- etc/queue.xml | 4 ++-- etc/queue_publisher.xml | 4 ++-- etc/queue_topology.xml | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/etc/queue.xml b/etc/queue.xml index ea8724a15..f8ad78b09 100644 --- a/etc/queue.xml +++ b/etc/queue.xml @@ -35,13 +35,13 @@ --> - + - + - + - + diff --git a/etc/queue_topology.xml b/etc/queue_topology.xml index 0ef798cc6..2b929aa7b 100644 --- a/etc/queue_topology.xml +++ b/etc/queue_topology.xml @@ -35,7 +35,7 @@ --> - + From d2f8f9a702a0526be09f7bc349d3cada2add046e Mon Sep 17 00:00:00 2001 From: Ugljesa Zivaljevic Date: Wed, 5 Jun 2024 12:08:51 +0200 Subject: [PATCH 09/18] Tidying code, clearing only rabbitmq queues --- Console/Command/NostoClearQueueCommand.php | 163 +++++---------------- etc/di.xml | 1 - 2 files changed, 37 insertions(+), 127 deletions(-) diff --git a/Console/Command/NostoClearQueueCommand.php b/Console/Command/NostoClearQueueCommand.php index 4b3e04c0a..f6188daf8 100644 --- a/Console/Command/NostoClearQueueCommand.php +++ b/Console/Command/NostoClearQueueCommand.php @@ -5,8 +5,11 @@ use Exception; use Magento\Framework\Amqp\Config; use Magento\Framework\App\ResourceConnection; +use Magento\Framework\Bulk\BulkManagementInterface; +use Magento\Framework\DB\Adapter\Pdo\Mysql\Interceptor; use Nosto\NostoException; use RuntimeException; +use PhpAmqpLib\Channel\AMQPChannel; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Style\SymfonyStyle; use Symfony\Component\Console\Input\InputInterface; @@ -15,52 +18,62 @@ class NostoClearQueueCommand extends Command { /** - * Nosto Queues + * Nosto Product Sync Update label. + * + * @var string */ - private const QUEUE_TOPICS = [ - 'nosto_product_sync.update', - 'nosto_product_sync.delete' - ]; + public const NOSTO_UPDATE_SYNC_MESSAGE_QUEUE = 'nosto_product_sync.update'; /** - * @var Config + * Nosto Product Sync Delete label. + * + * @var string */ - private Config $amqpConfig; + public const NOSTO_DELETE_MESSAGE_QUEUE = 'nosto_product_sync.delete'; /** - * @var ResourceConnection + * @var Config */ - private ResourceConnection $resourceConnection; + private Config $amqpConfig; public function __construct( - ResourceConnection $resourceConnection, Config $amqpConfig ) { - $this->resourceConnection = $resourceConnection; $this->amqpConfig = $amqpConfig; parent::__construct(); } + /** + * Configure the command and the arguments + */ protected function configure() { // Define command name. - $this->setName('nosto:clear:queue') + $this->setName('nosto:clear:messagequeue') ->setDescription('Clear all message queues for Nosto product sync topics.'); parent::configure(); } + /** + * @inheritDoc + */ protected function execute(InputInterface $input, OutputInterface $output): int { $io = new SymfonyStyle($input, $output); try { - foreach (self::QUEUE_TOPICS as $topicName) { - $this->clearQueue($topicName, $io); + $queues = [ + self::NOSTO_DELETE_MESSAGE_QUEUE, + self::NOSTO_UPDATE_SYNC_MESSAGE_QUEUE, + ]; + + foreach ($queues as $queueName) { + $this->clearQueue($queueName, $io); } $io->success('Successfully cleared message queues.'); return 0; - } catch (NostoException $e) { + } catch (RuntimeException $e) { $io->error('An error occurred while clearing message queues: ' . $e->getMessage()); return 1; } @@ -69,130 +82,28 @@ protected function execute(InputInterface $input, OutputInterface $output): int /** * Clear MySql and RabbitMq queues by name. * - * @param string $topicName - * @param SymfonyStyle $io - * @return void - */ - private function clearQueue(string $topicName, SymfonyStyle $io): void - { - $this->clearRabbitMQQueue($topicName, $io); - $this->clearDBQueues($topicName, $io); - } - - /** - * Clear DB. - * - * @param string $topicName - * @param SymfonyStyle $io - * @return void - */ - private function clearDBQueues(string $topicName, SymfonyStyle $io): void - { - // Get connection. - $connection = $this->resourceConnection->getConnection(); - - // Start DB transaction. - $connection->beginTransaction(); - try { - // Emptying DB tables. - $this->clearQueueMessages($topicName, $connection); - $this->clearRelatedRecords($topicName, $connection); - $connection->commit(); - } catch (Exception $exception) { - $connection->rollBack(); - $io->error('An error occurred while clearing DB queues for topic ' - . $topicName . ': ' - . $exception->getMessage() - ); - } - } - - /** - * Emptying queue message tables. - * - * @param string $topicName - * @param $connection - * @return void - */ - private function clearQueueMessages(string $topicName, $connection): void - { - $queueMessageTable = $this->resourceConnection->getTableName('queue_message'); - $queueMessageStatusTable = $this->resourceConnection->getTableName('queue_message_status'); - - // Get all IDs from "queue_message" table. - $select = $connection->select() - ->from($queueMessageTable, ['id']) - ->where('topic_name = ?', $topicName); - $messageIds = $connection->fetchCol($select); - - // Delete related records from "queue_message_status" table. - if (!empty($messageIds)) { - $connection->delete($queueMessageStatusTable, ['message_id IN (?)' => $messageIds]); - } - - // Delete records from "queue_message" table. - $connection->delete($queueMessageTable, ['topic_name = ?' => $topicName]); - } - - /** - * Emptying related tables. - * - * @param string $topicName - * @param $connection - * @return void - */ - private function clearRelatedRecords(string $topicName, $connection): void - { - $magentoOperationTable = $this->resourceConnection->getTableName('magento_operation'); - $magentoBulkTable = $this->resourceConnection->getTableName('magento_bulk'); - - // Get all IDs from "magento_operation" table. - $selectBulkUuids = $connection->select() - ->from($magentoOperationTable, ['bulk_uuid']) - ->where('topic_name = ?', $topicName); - $bulkUuids = $connection->fetchCol($selectBulkUuids); - - // Delete related records from "magento_bulk" table. - if (!empty($bulkUuids)) { - $connection->delete($magentoBulkTable, ['uuid IN (?)' => $bulkUuids]); - } - - // Delete records from "magento_operation" table. - $connection->delete($magentoOperationTable, ['topic_name = ?' => $topicName]); - } - - /** - * Clear RabbitMq Queues by name. - * * @param string $queueName - * @param SymfonyStyle $io * @return void */ - private function clearRabbitMQQueue(string $queueName, SymfonyStyle $io): void + private function clearQueue(string $queueName): void { - try { - // Get RabbitMq channel. - $channel = $this->amqpConfig->getChannel(); + // Get RabbitMq channel. + $channel = $this->amqpConfig->getChannel(); - // Empty queue if queue exists. - if ($this->queueExists($channel, $queueName)) { - $channel->queue_purge($queueName); - } - } catch (Exception $e) { - // Log the error or handle it as required. - $io->error('An error occurred while clearing RabbitMQ queue ' . $queueName . ': ' . $e->getMessage()); - throw new RuntimeException('Failed to clear RabbitMQ queue: ' . $e->getMessage()); + // Empty queue if queue exists. + if ($this->queueExists($channel, $queueName)) { + $channel->queue_purge($queueName); } } /** - * Check queue exist. + * Check the expected queue exist. * - * @param $channel + * @param AMQPChannel $channel * @param string $queueName * @return bool */ - protected function queueExists($channel, string $queueName): bool + protected function queueExists(AMQPChannel $channel, string $queueName): bool { $queueInfo = $channel->queue_declare($queueName, true); diff --git a/etc/di.xml b/etc/di.xml index c274eba3d..91e2a8564 100755 --- a/etc/di.xml +++ b/etc/di.xml @@ -239,7 +239,6 @@ - Magento\Framework\App\ResourceConnection Magento\Framework\Amqp\Config From 570a839e5d39745008ec19fe336c76d2cd4aa276 Mon Sep 17 00:00:00 2001 From: Ugljesa Zivaljevic Date: Wed, 5 Jun 2024 12:12:28 +0200 Subject: [PATCH 10/18] Remove unused variable --- Console/Command/NostoClearQueueCommand.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Console/Command/NostoClearQueueCommand.php b/Console/Command/NostoClearQueueCommand.php index f6188daf8..a3b79cd35 100644 --- a/Console/Command/NostoClearQueueCommand.php +++ b/Console/Command/NostoClearQueueCommand.php @@ -68,7 +68,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int ]; foreach ($queues as $queueName) { - $this->clearQueue($queueName, $io); + $this->clearQueue($queueName); } $io->success('Successfully cleared message queues.'); From f23fdcfd5f27f6a64971f86e6f399e957f82e828 Mon Sep 17 00:00:00 2001 From: Ugljesa Zivaljevic Date: Wed, 5 Jun 2024 14:57:38 +0200 Subject: [PATCH 11/18] Fixing for CI --- Console/Command/NostoClearQueueCommand.php | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Console/Command/NostoClearQueueCommand.php b/Console/Command/NostoClearQueueCommand.php index a3b79cd35..651820a18 100644 --- a/Console/Command/NostoClearQueueCommand.php +++ b/Console/Command/NostoClearQueueCommand.php @@ -3,7 +3,7 @@ namespace Nosto\Tagging\Console\Command; use Exception; -use Magento\Framework\Amqp\Config; +use Magento\Framework\Amqp\Config as AmqpConfig; use Magento\Framework\App\ResourceConnection; use Magento\Framework\Bulk\BulkManagementInterface; use Magento\Framework\DB\Adapter\Pdo\Mysql\Interceptor; @@ -32,12 +32,12 @@ class NostoClearQueueCommand extends Command public const NOSTO_DELETE_MESSAGE_QUEUE = 'nosto_product_sync.delete'; /** - * @var Config + * @var AmqpConfig */ - private Config $amqpConfig; + private AmqpConfig $amqpConfig; public function __construct( - Config $amqpConfig + AmqpConfig $amqpConfig ) { $this->amqpConfig = $amqpConfig; parent::__construct(); From f7c2eca44ffe0f8e6652f6421cfa3e2d72158fbd Mon Sep 17 00:00:00 2001 From: Cid Lopes Date: Thu, 6 Jun 2024 17:36:34 +0300 Subject: [PATCH 12/18] Use Magento\Framework\MessageQueue to clear message queues --- Console/Command/NostoClearQueueCommand.php | 90 +++++++++++----------- 1 file changed, 44 insertions(+), 46 deletions(-) diff --git a/Console/Command/NostoClearQueueCommand.php b/Console/Command/NostoClearQueueCommand.php index 651820a18..87febbca3 100644 --- a/Console/Command/NostoClearQueueCommand.php +++ b/Console/Command/NostoClearQueueCommand.php @@ -2,14 +2,10 @@ namespace Nosto\Tagging\Console\Command; -use Exception; -use Magento\Framework\Amqp\Config as AmqpConfig; -use Magento\Framework\App\ResourceConnection; -use Magento\Framework\Bulk\BulkManagementInterface; -use Magento\Framework\DB\Adapter\Pdo\Mysql\Interceptor; -use Nosto\NostoException; +use Magento\Framework\Exception\LocalizedException; +use Magento\Framework\MessageQueue\Consumer\ConfigInterface as ConsumerConfig; +use Magento\Framework\MessageQueue\QueueRepository; use RuntimeException; -use PhpAmqpLib\Channel\AMQPChannel; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Style\SymfonyStyle; use Symfony\Component\Console\Input\InputInterface; @@ -32,14 +28,32 @@ class NostoClearQueueCommand extends Command public const NOSTO_DELETE_MESSAGE_QUEUE = 'nosto_product_sync.delete'; /** - * @var AmqpConfig + * @var ConsumerConfig */ - private AmqpConfig $amqpConfig; + private $consumerConfig; + /** + * @var QueueRepository + */ + private $queueRepository; + + private array $consumers = [ + self::NOSTO_DELETE_MESSAGE_QUEUE, + self::NOSTO_UPDATE_SYNC_MESSAGE_QUEUE, + ]; + + /** + * NostoClearQueueCommand constructor. + * + * @param ConsumerConfig $consumerConfig + * @param QueueRepository $queueRepository + */ public function __construct( - AmqpConfig $amqpConfig + ConsumerConfig $consumerConfig, + QueueRepository $queueRepository ) { - $this->amqpConfig = $amqpConfig; + $this->consumerConfig = $consumerConfig; + $this->queueRepository = $queueRepository; parent::__construct(); } @@ -48,8 +62,7 @@ public function __construct( */ protected function configure() { - // Define command name. - $this->setName('nosto:clear:messagequeue') + $this->setName('nosto:clear:message-queue') ->setDescription('Clear all message queues for Nosto product sync topics.'); parent::configure(); } @@ -62,51 +75,36 @@ protected function execute(InputInterface $input, OutputInterface $output): int $io = new SymfonyStyle($input, $output); try { - $queues = [ - self::NOSTO_DELETE_MESSAGE_QUEUE, - self::NOSTO_UPDATE_SYNC_MESSAGE_QUEUE, - ]; - - foreach ($queues as $queueName) { - $this->clearQueue($queueName); + foreach ($this->consumers as $queueName) { + $this->clearQueue($io, $queueName); } - $io->success('Successfully cleared message queues.'); - return 0; - } catch (RuntimeException $e) { + } catch (RuntimeException|LocalizedException $e) { $io->error('An error occurred while clearing message queues: ' . $e->getMessage()); return 1; } + return 0; } /** - * Clear MySql and RabbitMq queues by name. + * Clear message queues by consumer name. * - * @param string $queueName + * @param SymfonyStyle $io + * @param string $consumerName * @return void + * @throws LocalizedException */ - private function clearQueue(string $queueName): void + private function clearQueue(SymfonyStyle $io, string $consumerName): void { - // Get RabbitMq channel. - $channel = $this->amqpConfig->getChannel(); - - // Empty queue if queue exists. - if ($this->queueExists($channel, $queueName)) { - $channel->queue_purge($queueName); + $io->writeln(sprintf('Clearing messages from %s', $consumerName)); + $io->createProgressBar(); + $io->progressStart(); + $consumerConfig = $this->consumerConfig->getConsumer($consumerName); + $queue = $this->queueRepository->get($consumerConfig->getConnection(), $consumerConfig->getQueue()); + while ($message = $queue->dequeue()) { + $io->progressAdvance(1); + $queue->acknowledge($message); } - } - - /** - * Check the expected queue exist. - * - * @param AMQPChannel $channel - * @param string $queueName - * @return bool - */ - protected function queueExists(AMQPChannel $channel, string $queueName): bool - { - $queueInfo = $channel->queue_declare($queueName, true); - - return !empty($queueInfo); + $io->progressFinish(); } } From dda0870a4a00cfa32929414bc5565b89d841a90f Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 10 Jun 2024 21:51:16 +0000 Subject: [PATCH 13/18] Bump composer/composer from 2.2.23 to 2.2.24 Bumps [composer/composer](https://github.com/composer/composer) from 2.2.23 to 2.2.24. - [Release notes](https://github.com/composer/composer/releases) - [Changelog](https://github.com/composer/composer/blob/2.2.24/CHANGELOG.md) - [Commits](https://github.com/composer/composer/compare/2.2.23...2.2.24) --- updated-dependencies: - dependency-name: composer/composer dependency-type: indirect ... Signed-off-by: dependabot[bot] --- composer.lock | 51 +++++++++++++++++++++++++++------------------------ 1 file changed, 27 insertions(+), 24 deletions(-) diff --git a/composer.lock b/composer.lock index 3510127d3..db586637f 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "9dcdb02accf946756f26a0c83c6deb7a", + "content-hash": "c08b01fdc7786f36fb6b4536c44379ea", "packages": [ { "name": "brick/math", @@ -284,16 +284,16 @@ }, { "name": "composer/composer", - "version": "2.2.23", + "version": "2.2.24", "source": { "type": "git", "url": "https://github.com/composer/composer.git", - "reference": "d1542e89636abf422fde328cb28d53752efb69e5" + "reference": "91d9d38ebc274267f952ee1fd3892dc7962075f4" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/composer/composer/zipball/d1542e89636abf422fde328cb28d53752efb69e5", - "reference": "d1542e89636abf422fde328cb28d53752efb69e5", + "url": "https://api.github.com/repos/composer/composer/zipball/91d9d38ebc274267f952ee1fd3892dc7962075f4", + "reference": "91d9d38ebc274267f952ee1fd3892dc7962075f4", "shasum": "" }, "require": { @@ -363,7 +363,7 @@ "support": { "irc": "ircs://irc.libera.chat:6697/composer", "issues": "https://github.com/composer/composer/issues", - "source": "https://github.com/composer/composer/tree/2.2.23" + "source": "https://github.com/composer/composer/tree/2.2.24" }, "funding": [ { @@ -379,7 +379,7 @@ "type": "tidelift" } ], - "time": "2024-02-08T14:08:53+00:00" + "time": "2024-06-10T20:51:52+00:00" }, { "name": "composer/metadata-minifier", @@ -1128,12 +1128,12 @@ "version": "v5.2.13", "source": { "type": "git", - "url": "https://github.com/justinrainbow/json-schema.git", + "url": "https://github.com/jsonrainbow/json-schema.git", "reference": "fbbe7e5d79f618997bc3332a6f49246036c45793" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/justinrainbow/json-schema/zipball/fbbe7e5d79f618997bc3332a6f49246036c45793", + "url": "https://api.github.com/repos/jsonrainbow/json-schema/zipball/fbbe7e5d79f618997bc3332a6f49246036c45793", "reference": "fbbe7e5d79f618997bc3332a6f49246036c45793", "shasum": "" }, @@ -1188,8 +1188,8 @@ "schema" ], "support": { - "issues": "https://github.com/justinrainbow/json-schema/issues", - "source": "https://github.com/justinrainbow/json-schema/tree/v5.2.13" + "issues": "https://github.com/jsonrainbow/json-schema/issues", + "source": "https://github.com/jsonrainbow/json-schema/tree/v5.2.13" }, "time": "2023-09-26T02:20:38+00:00" }, @@ -4702,16 +4702,16 @@ }, { "name": "symfony/filesystem", - "version": "v5.4.38", + "version": "v5.4.40", "source": { "type": "git", "url": "https://github.com/symfony/filesystem.git", - "reference": "899330a01056077271e2f614c7b28b0379a671eb" + "reference": "26dd9912df6940810ea00f8f53ad48d6a3424995" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/filesystem/zipball/899330a01056077271e2f614c7b28b0379a671eb", - "reference": "899330a01056077271e2f614c7b28b0379a671eb", + "url": "https://api.github.com/repos/symfony/filesystem/zipball/26dd9912df6940810ea00f8f53ad48d6a3424995", + "reference": "26dd9912df6940810ea00f8f53ad48d6a3424995", "shasum": "" }, "require": { @@ -4720,6 +4720,9 @@ "symfony/polyfill-mbstring": "~1.8", "symfony/polyfill-php80": "^1.16" }, + "require-dev": { + "symfony/process": "^5.4|^6.4" + }, "type": "library", "autoload": { "psr-4": { @@ -4746,7 +4749,7 @@ "description": "Provides basic utilities for the filesystem", "homepage": "https://symfony.com", "support": { - "source": "https://github.com/symfony/filesystem/tree/v5.4.38" + "source": "https://github.com/symfony/filesystem/tree/v5.4.40" }, "funding": [ { @@ -4762,20 +4765,20 @@ "type": "tidelift" } ], - "time": "2024-03-21T08:05:07+00:00" + "time": "2024-05-31T14:33:22+00:00" }, { "name": "symfony/finder", - "version": "v5.4.35", + "version": "v5.4.40", "source": { "type": "git", "url": "https://github.com/symfony/finder.git", - "reference": "abe6d6f77d9465fed3cd2d029b29d03b56b56435" + "reference": "f51cff4687547641c7d8180d74932ab40b2205ce" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/finder/zipball/abe6d6f77d9465fed3cd2d029b29d03b56b56435", - "reference": "abe6d6f77d9465fed3cd2d029b29d03b56b56435", + "url": "https://api.github.com/repos/symfony/finder/zipball/f51cff4687547641c7d8180d74932ab40b2205ce", + "reference": "f51cff4687547641c7d8180d74932ab40b2205ce", "shasum": "" }, "require": { @@ -4809,7 +4812,7 @@ "description": "Finds files and directories via an intuitive fluent interface", "homepage": "https://symfony.com", "support": { - "source": "https://github.com/symfony/finder/tree/v5.4.35" + "source": "https://github.com/symfony/finder/tree/v5.4.40" }, "funding": [ { @@ -4825,7 +4828,7 @@ "type": "tidelift" } ], - "time": "2024-01-23T13:51:25+00:00" + "time": "2024-05-31T14:33:22+00:00" }, { "name": "symfony/http-client-contracts", @@ -12392,5 +12395,5 @@ "ext-json": "*" }, "platform-dev": [], - "plugin-api-version": "2.1.0" + "plugin-api-version": "2.6.0" } From a60a9b723a4179e78447d1382cb7fddec3f36a67 Mon Sep 17 00:00:00 2001 From: Cid Lopes Date: Tue, 11 Jun 2024 14:25:39 +0300 Subject: [PATCH 14/18] Bump version && update changelog --- CHANGELOG.md | 4 ++++ composer.json | 2 +- etc/module.xml | 2 +- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1245c1d07..67bb59094 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ All notable changes to this project will be documented in this file. This project adheres to Semantic Versioning. +### 7.6.0 +* Deprecate MySQL as message queue provider, the extension now uses RabbitMQ as the default message queue provider +* Add command to clear the Nosto message queue + ### 7.5.1 * Upgrade SDK to 7.4 * Remove PII information from Order Export diff --git a/composer.json b/composer.json index 1adf6dbfc..4a128c076 100644 --- a/composer.json +++ b/composer.json @@ -2,7 +2,7 @@ "name": "nosto/module-nostotagging", "description": "Increase your conversion rate and average order value by delivering your customers personalized product recommendations throughout their shopping journey.", "type": "magento2-module", - "version": "7.5.1", + "version": "7.6.0", "require-dev": { "phpmd/phpmd": "^2.5", "sebastian/phpcpd": "*", diff --git a/etc/module.xml b/etc/module.xml index 933a24fbf..d7070be90 100755 --- a/etc/module.xml +++ b/etc/module.xml @@ -37,5 +37,5 @@ - + From 2dbed625ce88ca151fba0ee8b5ad4e0378fc5ee8 Mon Sep 17 00:00:00 2001 From: Cid Lopes Date: Thu, 13 Jun 2024 11:01:51 +0300 Subject: [PATCH 15/18] di.xml remove unused param --- etc/di.xml | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) mode change 100755 => 100644 etc/di.xml diff --git a/etc/di.xml b/etc/di.xml old mode 100755 new mode 100644 index 91e2a8564..1084b6ff8 --- a/etc/di.xml +++ b/etc/di.xml @@ -237,9 +237,4 @@ 500 - - - Magento\Framework\Amqp\Config - - - + \ No newline at end of file From 311abd46406bcc13d674c28aa8d8dd34765d50b3 Mon Sep 17 00:00:00 2001 From: Cid Lopes Date: Thu, 13 Jun 2024 13:25:30 +0300 Subject: [PATCH 16/18] Move logs to own Nosto files --- Logger/DebugHandler.php | 46 ++++++++++++++++++++++++++++++++++++++++ Logger/SystemHandler.php | 46 ++++++++++++++++++++++++++++++++++++++++ etc/di.xml | 4 ++-- 3 files changed, 94 insertions(+), 2 deletions(-) create mode 100644 Logger/DebugHandler.php create mode 100644 Logger/SystemHandler.php diff --git a/Logger/DebugHandler.php b/Logger/DebugHandler.php new file mode 100644 index 000000000..64d9568b8 --- /dev/null +++ b/Logger/DebugHandler.php @@ -0,0 +1,46 @@ + + * @copyright 2020 Nosto Solutions Ltd + * @license http://opensource.org/licenses/BSD-3-Clause BSD 3-Clause + * + */ + +namespace Nosto\Tagging\Logger; + +use Magento\Framework\Logger\Handler\Base; +use Monolog\Logger; + +class DebugHandler extends Base +{ + protected $fileName = '/var/log/nosto-debug.log'; + protected $loggerType = Logger::DEBUG; +} diff --git a/Logger/SystemHandler.php b/Logger/SystemHandler.php new file mode 100644 index 000000000..2c3e961b6 --- /dev/null +++ b/Logger/SystemHandler.php @@ -0,0 +1,46 @@ + + * @copyright 2020 Nosto Solutions Ltd + * @license http://opensource.org/licenses/BSD-3-Clause BSD 3-Clause + * + */ + +namespace Nosto\Tagging\Logger; + +use Magento\Framework\Logger\Handler\Base; +use Monolog\Logger; + +class SystemHandler extends Base +{ + protected $fileName = '/var/log/nosto-system.log'; + protected $loggerType = Logger::INFO; +} diff --git a/etc/di.xml b/etc/di.xml index 91e2a8564..66a6e2189 100755 --- a/etc/di.xml +++ b/etc/di.xml @@ -76,8 +76,8 @@ nosto - Magento\Framework\Logger\Handler\System - Magento\Framework\Logger\Handler\Debug + Nosto\Tagging\Logger\SystemHandler + Nosto\Tagging\Logger\DebugHandler From 9a01f93f97edcddd56fc245c80e645a534e12ea1 Mon Sep 17 00:00:00 2001 From: Cid Lopes Date: Thu, 13 Jun 2024 15:00:18 +0300 Subject: [PATCH 17/18] Log exceptions to nosto-exception.log file --- CHANGELOG.md | 1 + Logger/ExceptionHandler.php | 46 +++++++++++++++++++++++++++++++++++++ etc/di.xml | 1 + 3 files changed, 48 insertions(+) create mode 100644 Logger/ExceptionHandler.php diff --git a/CHANGELOG.md b/CHANGELOG.md index 67bb59094..d2c235f79 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ All notable changes to this project will be documented in this file. This projec ### 7.6.0 * Deprecate MySQL as message queue provider, the extension now uses RabbitMQ as the default message queue provider * Add command to clear the Nosto message queue +* Move Nosto logfiles to its own files nosto-debug.log(debug) and nosto-system.log(info) ### 7.5.1 * Upgrade SDK to 7.4 diff --git a/Logger/ExceptionHandler.php b/Logger/ExceptionHandler.php new file mode 100644 index 000000000..267facc47 --- /dev/null +++ b/Logger/ExceptionHandler.php @@ -0,0 +1,46 @@ + + * @copyright 2020 Nosto Solutions Ltd + * @license http://opensource.org/licenses/BSD-3-Clause BSD 3-Clause + * + */ + +namespace Nosto\Tagging\Logger; + +use Magento\Framework\Logger\Handler\Base; +use Monolog\Logger; + +class ExceptionHandler extends Base +{ + protected $fileName = '/var/log/nosto-exception.log'; + protected $loggerType = Logger::INFO; +} diff --git a/etc/di.xml b/etc/di.xml index 11a04303d..cccf6dccd 100644 --- a/etc/di.xml +++ b/etc/di.xml @@ -78,6 +78,7 @@ Nosto\Tagging\Logger\SystemHandler Nosto\Tagging\Logger\DebugHandler + Nosto\Tagging\Logger\ExceptionHandler From 1857550295baa221024693ed21fd144beaae8d50 Mon Sep 17 00:00:00 2001 From: Cid Lopes Date: Thu, 13 Jun 2024 15:01:50 +0300 Subject: [PATCH 18/18] Update changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d2c235f79..5908b10e7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,7 @@ All notable changes to this project will be documented in this file. This projec ### 7.6.0 * Deprecate MySQL as message queue provider, the extension now uses RabbitMQ as the default message queue provider * Add command to clear the Nosto message queue -* Move Nosto logfiles to its own files nosto-debug.log(debug) and nosto-system.log(info) +* Move Nosto logfiles to its own files nosto-debug.log(debug), nosto-system.log(info) and nosto-exception.log(error) ### 7.5.1 * Upgrade SDK to 7.4