From e8269677ed6cd60ef6d570a7a8e22c2cc43d1bca Mon Sep 17 00:00:00 2001 From: Cid Lopes Date: Thu, 1 Aug 2024 22:32:05 +0300 Subject: [PATCH] WIP - Avoid collections during full reindex --- Model/Indexer/ProductIndexer.php | 16 +++ .../Magento/Product/ProductBatchFetcher.php | 104 ++++++++++++++++++ Model/Service/Update/ProductUpdateService.php | 23 ++++ 3 files changed, 143 insertions(+) create mode 100644 Model/ResourceModel/Magento/Product/ProductBatchFetcher.php diff --git a/Model/Indexer/ProductIndexer.php b/Model/Indexer/ProductIndexer.php index c6dc1e725..b4cf3a94f 100644 --- a/Model/Indexer/ProductIndexer.php +++ b/Model/Indexer/ProductIndexer.php @@ -48,6 +48,7 @@ use Nosto\Tagging\Model\Indexer\Dimensions\StoreDimensionProvider; use Nosto\Tagging\Model\ResourceModel\Magento\Product\Collection as ProductCollection; use Nosto\Tagging\Model\ResourceModel\Magento\Product\CollectionBuilder; +use Nosto\Tagging\Model\ResourceModel\Magento\Product\ProductBatchFetcher; use Nosto\Tagging\Model\Service\Indexer\IndexerStatusServiceInterface; use Nosto\Tagging\Model\Service\Update\ProductUpdateService; use Symfony\Component\Console\Input\InputInterface; @@ -69,6 +70,9 @@ class ProductIndexer extends AbstractIndexer /** @var ProductModeSwitcher */ private ProductModeSwitcher $modeSwitcher; + /** @var ProductBatchFetcher */ + private ProductBatchFetcher $productBatchFetcher; + /** * Invalidate constructor. * @param NostoHelperScope $nostoHelperScope @@ -87,6 +91,7 @@ public function __construct( ProductUpdateService $productUpdateService, NostoLogger $logger, CollectionBuilder $productCollectionBuilder, + ProductBatchFetcher $productBatchFetcher, ProductModeSwitcher $modeSwitcher, StoreDimensionProvider $dimensionProvider, Emulation $storeEmulation, @@ -96,6 +101,7 @@ public function __construct( ) { $this->productUpdateService = $productUpdateService; $this->productCollectionBuilder = $productCollectionBuilder; + $this->productBatchFetcher = $productBatchFetcher; $this->modeSwitcher = $modeSwitcher; parent::__construct( $nostoHelperScope, @@ -123,6 +129,16 @@ public function getModeSwitcher(): ModeSwitcherInterface */ public function doIndex(Store $store, array $ids = []) { + if (empty($ids)) { // Full reindexing + foreach ($this->productBatchFetcher->fetchProductIdBatches() as $productIdsBatch) { + $this->productUpdateService->addIdsToUpsertMessageQueue( + $productIdsBatch, + $store + ); + } + return; + } + $collection = $this->getCollection($store, $ids); $this->productUpdateService->addCollectionToUpdateMessageQueue( $collection, diff --git a/Model/ResourceModel/Magento/Product/ProductBatchFetcher.php b/Model/ResourceModel/Magento/Product/ProductBatchFetcher.php new file mode 100644 index 000000000..a7e4635b3 --- /dev/null +++ b/Model/ResourceModel/Magento/Product/ProductBatchFetcher.php @@ -0,0 +1,104 @@ +resourceConnection = $resourceConnection; + } + + public function fetchProductIdBatches() + { + $connection = $this->resourceConnection->getConnection(); + $tableName = $connection->getTableName('catalog_product_entity'); + $visibilityTable = $connection->getTableName('catalog_product_entity_int'); + $statusTable = $connection->getTableName('catalog_product_entity_int'); + + $offset = 0; + $this->startBenchmark(self::BENCHMARK_SYNC_NAME, self::BENCHMARK_SYNC_BREAKPOINT); + $visibility = $this->getAttributeId('visibility'); + $status = $this->getAttributeId('status'); + do { + $this->checkMemoryConsumption('indexer by ID product sync'); + $query = $connection->select() + ->from(['e' => $tableName], ['entity_id']) + ->join(['visibility' => $visibilityTable], 'e.entity_id = visibility.entity_id', []) + ->join(['status' => $statusTable], 'e.entity_id = status.entity_id', []) + ->where('visibility.attribute_id = ?', $visibility) + ->where('status.attribute_id = ?', $status) // @TODO: abstract those like in the collections + ->where('visibility.value != ?', ProductVisibility::VISIBILITY_NOT_VISIBLE) + ->where('status.value = ?', Status::STATUS_ENABLED) + ->limit(self::BATCH_SIZE, $offset); + + $results = $connection->fetchAll($query); + + if (count($results) === 0) { + break; + } + + $productIdsBatch = []; + foreach ($results as $row) { + $productIdsBatch[] = (int) $row['entity_id']; + } + + yield $productIdsBatch; + $this->tickBenchmark(self::BENCHMARK_SYNC_NAME, true); + $offset += self::BATCH_SIZE; + } while (count($results) > 0); + } + + /** + * @param string $attributeCode + * @return string + */ + protected function getAttributeId(string $attributeCode): string + { + $connection = $this->resourceConnection->getConnection(); + $attributeTable = $connection->getTableName('eav_attribute'); + + $query = $connection->select() + ->from($attributeTable, 'attribute_id') + ->where('attribute_code = ?', $attributeCode) + ->where('entity_type_id = ?', $this->getEntityTypeId('catalog_product')); + + return $connection->fetchOne($query); + } + + /** + * @param string $entityTypeCode + * @return string + */ + protected function getEntityTypeId(string $entityTypeCode): string + { + $connection = $this->resourceConnection->getConnection(); + $entityTypeTable = $connection->getTableName('eav_entity_type'); + + $query = $connection->select() + ->from($entityTypeTable, 'entity_type_id') + ->where('entity_type_code = ?', $entityTypeCode); + + return $connection->fetchOne($query); + } +} \ No newline at end of file diff --git a/Model/Service/Update/ProductUpdateService.php b/Model/Service/Update/ProductUpdateService.php index 665d286fa..c67f64efe 100644 --- a/Model/Service/Update/ProductUpdateService.php +++ b/Model/Service/Update/ProductUpdateService.php @@ -123,6 +123,29 @@ public function addCollectionToUpdateMessageQueue(ProductCollection $collection, } } + /** + * Sets the product ids into the update message queue + * + * @param array $productIds + * @param Store $store + */ + public function addIdsToUpsertMessageQueue(array $productIds, Store $store) + { + if ($this->getAccountHelper()->findAccount($store) === null) { + $this->logDebugWithStore('No nosto account found for the store', $store); + return; + } + $batchedIds = array_chunk($productIds, $this->batchSize); + $this->logDebug(sprintf( + "Adding queue message with %s products to upsert queue. Batch size is %s", + count($productIds), + $this->batchSize + )); + foreach ($batchedIds as $idBatch) { + $this->upsertBulkPublisher->execute($store->getId(), $idBatch); + } + } + /** * Sets the product ids into the delete message queue *