Skip to content

Commit

Permalink
symfony messenger for queue (#83)
Browse files Browse the repository at this point in the history
  • Loading branch information
benwalch committed Jan 9, 2024
1 parent ce9751f commit 3778acb
Show file tree
Hide file tree
Showing 20 changed files with 363 additions and 471 deletions.
5 changes: 4 additions & 1 deletion config/pimcore/config.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
imports:
- { resource: messenger.yaml }

doctrine_migrations:
migrations_paths:
'DynamicSearchBundle\Migrations': '@DynamicSearchBundle/src/Migrations'
Expand Down Expand Up @@ -28,4 +31,4 @@ monolog:
VERBOSITY_VERY_VERBOSE: DEBUG
VERBOSITY_DEBUG: DEBUG
channels: ['dynamic_search']
formatter: dynamic_search.log.formatter.console.provider
formatter: dynamic_search.log.formatter.console.provider
15 changes: 15 additions & 0 deletions config/pimcore/messenger.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
parameters:
dynamic_search.queue.table_name: messenger_dynamic_search

framework:
messenger:
transports:
dynamic_search_async:
dsn: 'doctrine://default'
options:
table_name: '%dynamic_search.queue.table_name%'
routing:
DynamicSearchBundle\Queue\Message\QueueResourceMessage: dynamic_search_async
DynamicSearchBundle\Queue\Message\ProcessResourceMessage: dynamic_search_async
buses:
dynamic_search.bus: ~
4 changes: 0 additions & 4 deletions config/services/command.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,3 @@ services:
DynamicSearchBundle\Command\SearchCommand:
tags:
- { name: console.command }

DynamicSearchBundle\Command\QueuedDataCommand:
tags:
- { name: console.command }
4 changes: 3 additions & 1 deletion config/services/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,6 @@ services:
DynamicSearchBundle\Manager\NormalizerManager: ~

DynamicSearchBundle\Manager\QueueManagerInterface: '@DynamicSearchBundle\Manager\QueueManager'
DynamicSearchBundle\Manager\QueueManager: ~
DynamicSearchBundle\Manager\QueueManager:
arguments:
$tableName: '%dynamic_search.queue.table_name%'
14 changes: 13 additions & 1 deletion config/services/queue.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,20 @@ services:
autoconfigure: true
public: false

DynamicSearchBundle\Queue\MessageHandler\QueuedResourcesHandler:
arguments:
$messageBus: '@dynamic_search.bus'
tags:
- { name: messenger.message_handler, bus: dynamic_search.bus }

DynamicSearchBundle\Queue\MessageHandler\ProcessResourceHandler:
tags:
- { name: messenger.message_handler, bus: dynamic_search.bus }

DynamicSearchBundle\Queue\DataCollectorInterface: '@DynamicSearchBundle\Queue\DataCollector'
DynamicSearchBundle\Queue\DataCollector: ~
DynamicSearchBundle\Queue\DataCollector:
arguments:
$messageBus: '@dynamic_search.bus'

DynamicSearchBundle\Queue\DataProcessorInterface: '@DynamicSearchBundle\Queue\DataProcessor'
DynamicSearchBundle\Queue\DataProcessor: ~
31 changes: 0 additions & 31 deletions src/Command/QueuedDataCommand.php

This file was deleted.

2 changes: 1 addition & 1 deletion src/Context/ContextDefinitionInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ interface ContextDefinitionInterface
/*
* Allowed dispatch types for queue
*/
public const ALLOWED_QUEUE_DISPATCH_TYPES = ['insert', 'update', 'delete'];
public const ALLOWED_QUEUE_DISPATCH_TYPES = ['index', 'insert', 'update', 'delete'];

public function getName(): string;

Expand Down
18 changes: 0 additions & 18 deletions src/EventListener/Maintenance/QueuedDataTask.php

This file was deleted.

43 changes: 17 additions & 26 deletions src/EventSubscriber/DataProcessingEventSubscriber.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,20 @@
use DynamicSearchBundle\Logger\LoggerInterface;
use DynamicSearchBundle\Processor\ResourceModificationProcessorInterface;
use DynamicSearchBundle\Provider\DataProviderInterface;
use DynamicSearchBundle\Queue\DataCollectorInterface;
use DynamicSearchBundle\Queue\Message\ProcessResourceMessage;
use DynamicSearchBundle\Queue\Message\QueueResourceMessage;
use DynamicSearchBundle\Validator\ResourceValidatorInterface;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\MessageBusInterface;

class DataProcessingEventSubscriber implements EventSubscriberInterface
{
public function __construct(
protected LoggerInterface $logger,
protected ContextDefinitionBuilderInterface $contextDefinitionBuilder,
protected ResourceModificationProcessorInterface $resourceModificationProcessor,
protected ResourceValidatorInterface $resourceValidator
protected DataCollectorInterface $dataCollector
) {
}

Expand All @@ -32,32 +36,19 @@ public function dispatchResourceModification(NewDataEvent $event): void
{
$contextDefinition = $this->contextDefinitionBuilder->buildContextDefinition($event->getContextName(), $event->getContextDispatchType());

try {
// validate and allow rewriting resource based on current data behaviour
$isImmutableResource = $event->getProviderBehaviour() === DataProviderInterface::PROVIDER_BEHAVIOUR_SINGLE_DISPATCH;
$resourceCandidate = $this->resourceValidator->validateResource($event->getContextName(), $event->getContextDispatchType(), false, $isImmutableResource, $event->getData());
} catch (\Throwable $e) {
$this->logger->error(
sprintf(
'Error while validate resource candidate: %s',
$e->getMessage()), $contextDefinition->getDataProviderName(), $event->getContextName()
);

return;
}

if ($resourceCandidate->getResource() === null) {
$this->logger->debug(
sprintf(
'Resource has been removed due to validation. Skipping...'),
$contextDefinition->getDataProviderName(), $contextDefinition->getName()
);

return;
}

if ($event->getProviderBehaviour() === DataProviderInterface::PROVIDER_BEHAVIOUR_FULL_DISPATCH) {
$this->resourceModificationProcessor->process($contextDefinition, $resourceCandidate->getResource());
// data collector add to queue
$this->dataCollector->addToContextQueue(
$contextDefinition->getName(),
$contextDefinition->getContextDispatchType(),
$event->getData(),
[
'resourceValidation' => [
'unknownResource' => false,
'immutableResource' => false
]
]
);
} elseif ($event->getProviderBehaviour() === DataProviderInterface::PROVIDER_BEHAVIOUR_SINGLE_DISPATCH) {
$this->resourceModificationProcessor->processByResourceMeta($contextDefinition, $event->getResourceMeta(), $event->getData());
} else {
Expand Down
146 changes: 12 additions & 134 deletions src/Manager/QueueManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,150 +2,28 @@

namespace DynamicSearchBundle\Manager;

use Doctrine\DBAL\Connection;
use DynamicSearchBundle\Logger\LoggerInterface;
use DynamicSearchBundle\Queue\Data\Envelope;
use Pimcore\Model\Tool\TmpStore;

class QueueManager implements QueueManagerInterface
{
public function __construct(protected LoggerInterface $logger)
{
}
public function __construct(
protected LoggerInterface $logger,
protected Connection $connection,
protected string $tableName
)
{}

public function clearQueue(): void
{
try {
$activeJobs = $this->getActiveJobs();
$this->logger->debug(sprintf('data queue cleared. Affected jobs: %d', count($activeJobs)), 'queue', 'maintenance');
foreach ($activeJobs as $envelope) {
TmpStore::delete($envelope->getId());
}
$stmt = sprintf('SELECT COUNT(*) FROM %s', $this->tableName);
$affectedRows = $this->connection->executeQuery($stmt)->fetchFirstColumn();
$sql = $this->connection->getDatabasePlatform()->getTruncateTableSQL($this->tableName);
$this->connection->executeStatement($sql);
$this->logger->debug(sprintf('data queue cleared. Affected jobs: %d', $affectedRows), 'queue', 'maintenance');
} catch (\Throwable $e) {
$this->logger->error(sprintf('Error while clearing queue. Message was: %s', $e->getMessage()), 'queue', 'maintenance');
}
}

public function getQueuedEnvelopes(): array
{
$jobs = $this->getActiveJobs();

$existingKeys = [];
$filteredResourceStack = [];

/*
* A resource can be added multiple times (saving an pimcore document 3 or more times in short intervals for example).
* Only the latest resource of its kind should be used in index processing to improve performance.
*
* Filter Jobs:
*
* -> first sort jobs by date (ASC) to receive latest entries first!
* -> create sub array for each context and dispatch type: stack[ context ][ dispatch_type ][]
* -> only add resource once per "context_name - document_id"
* -> only return [ resource_meta, corresponding envelope ]
*/

usort($jobs, static function (TmpStore $a, TmpStore $b) {

/** @var Envelope $envelopeA */
$envelopeA = $a->getData();
/** @var Envelope $envelopeB */
$envelopeB = $b->getData();

if ($envelopeA->getCreationTime() === $envelopeB->getCreationTime()) {
return 0;
}

return $envelopeA->getCreationTime() < $envelopeB->getCreationTime() ? 1 : -1;
});

/** @var TmpStore $job */
foreach ($jobs as $job) {

/** @var Envelope $envelope */
$envelope = $job->getData();
$contextName = $envelope->getContextName();
$dispatchType = $envelope->getDispatchType();
$resourceMetaStack = $envelope->getResourceMetaStack();

if (!isset($filteredResourceStack[$contextName])) {
$filteredResourceStack[$contextName] = [];
}

if (!isset($filteredResourceStack[$contextName][$dispatchType])) {
$filteredResourceStack[$contextName][$dispatchType] = [];
}

foreach ($resourceMetaStack as $resourceMeta) {
$key = sprintf('%s_%s', $contextName, $resourceMeta->getDocumentId());

if (in_array($key, $existingKeys, true)) {
continue;
}

$filteredResourceStack[$contextName][$dispatchType][] = [
'resourceMeta' => $resourceMeta,
'envelope' => $envelope
];

$existingKeys[] = $key;
}

$this->deleteEnvelope($envelope);
}

return $filteredResourceStack;
}

public function deleteEnvelope(Envelope $envelope): void
{
try {
TmpStore::delete($envelope->getId());
} catch (\Exception $e) {
$this->logger->error(sprintf('Could not delete queued job with id %s', $envelope->getId()), 'queue', $envelope->getContextName());
}
}

public function addJobToQueue(string $jobId, string $contextName, string $dispatchType, array $metaResources, array $options): void
{
$envelope = new Envelope($jobId, $contextName, $dispatchType, $metaResources, $options, microtime(true));

TmpStore::add($jobId, $envelope, self::QUEUE_IDENTIFIER);
}

public function hasActiveJobs(): bool
{
$activeJobs = TmpStore::getIdsByTag(self::QUEUE_IDENTIFIER);

return count($activeJobs) > 0;
}

public function getActiveJobs(): array
{
$activeJobs = TmpStore::getIdsByTag(self::QUEUE_IDENTIFIER);

$jobs = [];
foreach ($activeJobs as $processId) {
$process = $this->getJob($processId);
if (!$process instanceof TmpStore) {
continue;
}

$jobs[] = $process;
}

return $jobs;
}

protected function getJob(string $processId): ?TmpStore
{
$job = null;

try {
$job = TmpStore::get($processId);
} catch (\Exception $e) {
return null;
}

return $job;
}
}
18 changes: 0 additions & 18 deletions src/Manager/QueueManagerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,7 @@

namespace DynamicSearchBundle\Manager;

use DynamicSearchBundle\Queue\Data\Envelope;
use Pimcore\Model\Tool\TmpStore;

interface QueueManagerInterface
{
public const QUEUE_IDENTIFIER = 'dynamic_search_index_queue';

public function clearQueue(): void;

public function getQueuedEnvelopes(): array;

public function deleteEnvelope(Envelope $envelope): void;

public function addJobToQueue(string $jobId, string $contextName, string $dispatchType, array $metaResources, array $options): void;

public function hasActiveJobs(): bool;

/**
* @return array<int, TmpStore>
*/
public function getActiveJobs(): array;
}
Loading

0 comments on commit 3778acb

Please sign in to comment.