Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added: ability to choose different entity manager #1042

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/bundle/config_reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ enqueue:
queue_name: ~
job:
enabled: false
default_mapping: true
async_events:
enabled: false
extensions:
Expand Down
3 changes: 3 additions & 0 deletions docs/bundle/job_queue.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ enqueue:
# plus basic bundle configuration

job: true

# if you configure doctrine mapping yourself, disable default mapping
default_mapping: false
balabis marked this conversation as resolved.
Show resolved Hide resolved

doctrine:
# plus basic bundle configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ class DoctrineClearIdentityMapExtension implements MessageReceivedExtensionInter
*/
protected $registry;

/**
* @param ManagerRegistry $registry
*/
public function __construct(ManagerRegistry $registry)
{
$this->registry = $registry;
Expand Down
3 changes: 3 additions & 0 deletions pkg/enqueue-bundle/DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ private function getJobConfiguration(): ArrayNodeDefinition
}

return (new ArrayNodeDefinition('job'))
->children()
->booleanNode('default_mapping')->defaultTrue()->end()
->end()
->addDefaultsIfNotSet()
->canBeEnabled()
;
Expand Down
8 changes: 8 additions & 0 deletions pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,14 @@ private function registerJobQueueDoctrineEntityMapping(ContainerBuilder $contain
return;
}

foreach ($container->getExtensionConfig('enqueue') as $modules) {
foreach ($modules as $config) {
if (isset($config['job']) && false === $config['job']['default_mapping']) {
return;
}
}
}

foreach ($container->getExtensionConfig('doctrine') as $config) {
// do not register mappings if dbal not configured.
if (!empty($config['dbal'])) {
Expand Down
3 changes: 0 additions & 3 deletions pkg/enqueue-bundle/Tests/Functional/App/AppKernel.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@ public function getLogDir()
return sys_get_temp_dir().'/EnqueueBundle/cache/logs';
}

/**
* @param \Symfony\Component\Config\Loader\LoaderInterface $loader
*/
public function registerContainerConfiguration(LoaderInterface $loader)
{
$loader->load(__DIR__.'/config/config.yml');
Expand Down
11 changes: 1 addition & 10 deletions pkg/job-queue/CalculateRootJobStatusService.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,12 @@ class CalculateRootJobStatusService
*/
private $jobStorage;

/**
* @param JobStorage $jobStorage
*/
public function __construct(JobStorage $jobStorage)
{
$this->jobStorage = $jobStorage;
}

/**
* @param Job $job
*
* @return bool true if root job was stopped
*/
public function calculate(Job $job)
Expand Down Expand Up @@ -91,11 +86,7 @@ protected function calculateRootJobStatus(array $jobs)
$success++;
break;
default:
throw new \LogicException(sprintf(
'Got unsupported job status: id: "%s" status: "%s"',
$job->getId(),
$job->getStatus()
));
throw new \LogicException(sprintf('Got unsupported job status: id: "%s" status: "%s"', $job->getId(), $job->getStatus()));
}
}

Expand Down
3 changes: 0 additions & 3 deletions pkg/job-queue/DependentJobContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@ class DependentJobContext
*/
private $dependentJobs;

/**
* @param Job $job
*/
public function __construct(Job $job)
{
$this->job = $job;
Expand Down
5 changes: 0 additions & 5 deletions pkg/job-queue/DependentJobProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@ class DependentJobProcessor implements Processor, TopicSubscriberInterface
*/
private $logger;

/**
* @param JobStorage $jobStorage
* @param ProducerInterface $producer
* @param LoggerInterface $logger
*/
public function __construct(JobStorage $jobStorage, ProducerInterface $producer, LoggerInterface $logger)
{
$this->jobStorage = $jobStorage;
Expand Down
10 changes: 1 addition & 9 deletions pkg/job-queue/DependentJobService.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,17 @@ public function __construct(JobStorage $jobStorage)
}

/**
* @param Job $job
*
* @return DependentJobContext
*/
public function createDependentJobContext(Job $job)
{
return new DependentJobContext($job);
}

/**
* @param DependentJobContext $context
*/
public function saveDependentJob(DependentJobContext $context)
{
if (!$context->getJob()->isRoot()) {
throw new \LogicException(sprintf(
'Only root jobs allowed but got child. jobId: "%s"',
$context->getJob()->getId()
));
throw new \LogicException(sprintf('Only root jobs allowed but got child. jobId: "%s"', $context->getJob()->getId()));
}

$this->jobStorage->saveJob($context->getJob(), function (Job $job) use ($context) {
Expand Down
26 changes: 9 additions & 17 deletions pkg/job-queue/Doctrine/JobStorage.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,13 @@ class JobStorage
private $uniqueTableName;

/**
* @param ManagerRegistry $doctrine
* @param string $entityClass
* @param string $uniqueTableName
* @var string
*/
private $entityManagerName;
balabis marked this conversation as resolved.
Show resolved Hide resolved

/**
* @param string $entityClass
* @param string $uniqueTableName
*/
public function __construct(ManagerRegistry $doctrine, $entityClass, $uniqueTableName)
{
Expand Down Expand Up @@ -90,7 +94,6 @@ public function findRootJobByOwnerIdAndJobName($ownerId, $jobName)

/**
* @param string $name
* @param Job $rootJob
*
* @return Job
*/
Expand Down Expand Up @@ -119,20 +122,13 @@ public function createJob()
}

/**
* @param Job $job
* @param \Closure|null $lockCallback
*
* @throws DuplicateJobException
*/
public function saveJob(Job $job, \Closure $lockCallback = null)
{
$class = $this->getEntityRepository()->getClassName();
if (!$job instanceof $class) {
throw new \LogicException(sprintf(
'Got unexpected job instance: expected: "%s", actual" "%s"',
$class,
get_class($job)
));
throw new \LogicException(sprintf('Got unexpected job instance: expected: "%s", actual" "%s"', $class, get_class($job)));
}

if ($lockCallback) {
Expand Down Expand Up @@ -175,11 +171,7 @@ public function saveJob(Job $job, \Closure $lockCallback = null)
]);
}
} catch (UniqueConstraintViolationException $e) {
throw new DuplicateJobException(sprintf(
'Duplicate job. ownerId:"%s", name:"%s"',
$job->getOwnerId(),
$job->getName()
));
throw new DuplicateJobException(sprintf('Duplicate job. ownerId:"%s", name:"%s"', $job->getOwnerId(), $job->getName()));
}

$this->getEntityManager()->persist($job);
Expand Down
9 changes: 0 additions & 9 deletions pkg/job-queue/Job.php
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,6 @@ public function getCreatedAt()
* Do not call from the outside.
*
* @internal
*
* @param \DateTime $createdAt
*/
public function setCreatedAt(\DateTime $createdAt)
{
Expand All @@ -258,8 +256,6 @@ public function getStartedAt()
* Do not call from the outside.
*
* @internal
*
* @param \DateTime $startedAt
*/
public function setStartedAt(\DateTime $startedAt)
{
Expand All @@ -279,8 +275,6 @@ public function getStoppedAt()
* Do not call from the outside.
*
* @internal
*
* @param \DateTime $stoppedAt
*/
public function setStoppedAt(\DateTime $stoppedAt)
{
Expand Down Expand Up @@ -324,9 +318,6 @@ public function getData()
return $this->data;
}

/**
* @param array $data
*/
public function setData(array $data)
{
$this->data = $data;
Expand Down
46 changes: 4 additions & 42 deletions pkg/job-queue/JobProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ class JobProcessor
*/
private $producer;

/**
* @param JobStorage $jobStorage
* @param ProducerInterface $producer
*/
public function __construct(JobStorage $jobStorage, ProducerInterface $producer)
{
$this->jobStorage = $jobStorage;
Expand Down Expand Up @@ -74,7 +70,6 @@ public function findOrCreateRootJob($ownerId, $jobName, $unique = false)

/**
* @param string $jobName
* @param Job $rootJob
*
* @return Job
*/
Expand Down Expand Up @@ -104,9 +99,6 @@ public function findOrCreateChildJob($jobName, Job $rootJob)
return $job;
}

/**
* @param Job $job
*/
public function startChildJob(Job $job)
{
if ($job->isRoot()) {
Expand All @@ -116,11 +108,7 @@ public function startChildJob(Job $job)
$job = $this->jobStorage->findJobById($job->getId());

if (Job::STATUS_NEW !== $job->getStatus()) {
throw new \LogicException(sprintf(
'Can start only new jobs: id: "%s", status: "%s"',
$job->getId(),
$job->getStatus()
));
throw new \LogicException(sprintf('Can start only new jobs: id: "%s", status: "%s"', $job->getId(), $job->getStatus()));
}

$job->setStatus(Job::STATUS_RUNNING);
Expand All @@ -131,9 +119,6 @@ public function startChildJob(Job $job)
$this->sendCalculateRootJobStatusEvent($job);
}

/**
* @param Job $job
*/
public function successChildJob(Job $job)
{
if ($job->isRoot()) {
Expand All @@ -143,11 +128,7 @@ public function successChildJob(Job $job)
$job = $this->jobStorage->findJobById($job->getId());

if (Job::STATUS_RUNNING !== $job->getStatus()) {
throw new \LogicException(sprintf(
'Can success only running jobs. id: "%s", status: "%s"',
$job->getId(),
$job->getStatus()
));
throw new \LogicException(sprintf('Can success only running jobs. id: "%s", status: "%s"', $job->getId(), $job->getStatus()));
}

$job->setStatus(Job::STATUS_SUCCESS);
Expand All @@ -158,9 +139,6 @@ public function successChildJob(Job $job)
$this->sendCalculateRootJobStatusEvent($job);
}

/**
* @param Job $job
*/
public function failChildJob(Job $job)
{
if ($job->isRoot()) {
Expand All @@ -170,11 +148,7 @@ public function failChildJob(Job $job)
$job = $this->jobStorage->findJobById($job->getId());

if (Job::STATUS_RUNNING !== $job->getStatus()) {
throw new \LogicException(sprintf(
'Can fail only running jobs. id: "%s", status: "%s"',
$job->getId(),
$job->getStatus()
));
throw new \LogicException(sprintf('Can fail only running jobs. id: "%s", status: "%s"', $job->getId(), $job->getStatus()));
}

$job->setStatus(Job::STATUS_FAILED);
Expand All @@ -185,9 +159,6 @@ public function failChildJob(Job $job)
$this->sendCalculateRootJobStatusEvent($job);
}

/**
* @param Job $job
*/
public function cancelChildJob(Job $job)
{
if ($job->isRoot()) {
Expand All @@ -197,11 +168,7 @@ public function cancelChildJob(Job $job)
$job = $this->jobStorage->findJobById($job->getId());

if (!in_array($job->getStatus(), [Job::STATUS_NEW, Job::STATUS_RUNNING], true)) {
throw new \LogicException(sprintf(
'Can cancel only new or running jobs. id: "%s", status: "%s"',
$job->getId(),
$job->getStatus()
));
throw new \LogicException(sprintf('Can cancel only new or running jobs. id: "%s", status: "%s"', $job->getId(), $job->getStatus()));
}

$job->setStatus(Job::STATUS_CANCELLED);
Expand All @@ -217,7 +184,6 @@ public function cancelChildJob(Job $job)
}

/**
* @param Job $job
* @param bool $force
*/
public function interruptRootJob(Job $job, $force = false)
Expand Down Expand Up @@ -245,8 +211,6 @@ public function interruptRootJob(Job $job, $force = false)

/**
* @see https://github.com/php-enqueue/enqueue-dev/pull/222#issuecomment-336102749 See for rationale
*
* @param Job $job
*/
protected function saveJob(Job $job)
{
Expand All @@ -255,8 +219,6 @@ protected function saveJob(Job $job)

/**
* @see https://github.com/php-enqueue/enqueue-dev/pull/222#issuecomment-336102749 See for rationale
*
* @param Job $job
*/
protected function sendCalculateRootJobStatusEvent(Job $job)
{
Expand Down
Loading