Skip to content

Commit

Permalink
Extract some queue logic.
Browse files Browse the repository at this point in the history
  • Loading branch information
taylorotwell committed Dec 29, 2016
1 parent 6f4076a commit e030231
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 106 deletions.
10 changes: 10 additions & 0 deletions src/Illuminate/Contracts/Queue/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,16 @@ public function later($delay, $job, $data = '', $queue = null);
*/
public function laterOn($queue, $delay, $job, $data = '');

/**
* Push an array of jobs onto the queue.
*
* @param array $jobs
* @param mixed $data
* @param string $queue
* @return mixed
*/
public function bulk($jobs, $data = '', $queue = null);

/**
* Pop the next job off of the queue.
*
Expand Down
19 changes: 19 additions & 0 deletions src/Illuminate/Queue/InvalidPayloadException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?php

namespace Illuminate\Queue;

use InvalidArgumentException;

class InvalidPayloadException extends InvalidArgumentException
{
/**
* Create a new exception instance.
*
* @param string|null $message
* @return void
*/
public function __construct($message = null)
{
parent::__construct($message ?: json_last_error());
}
}
91 changes: 31 additions & 60 deletions src/Illuminate/Queue/Jobs/Job.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,22 +58,9 @@ public function fire()
{
$payload = $this->payload();

list($class, $method) = $this->parseJob($payload['job']);
list($class, $method) = JobName::parse($payload['job']);

$this->instance = $this->resolve($class);

$this->instance->{$method}($this, $payload['data']);
}

/**
* Resolve the given job handler.
*
* @param string $class
* @return mixed
*/
protected function resolve($class)
{
return $this->container->make($class);
($this->instance = $this->resolve($class))->{$method}($this, $payload['data']);
}

/**
Expand Down Expand Up @@ -137,60 +124,22 @@ public function failed($e)
{
$payload = $this->payload();

list($class, $method) = $this->parseJob($payload['job']);
list($class, $method) = JobName::parse($payload['job']);

$this->instance = $this->resolve($class);

if (method_exists($this->instance, 'failed')) {
if (method_exists($this->instance = $this->resolve($class), 'failed')) {
$this->instance->failed($payload['data'], $e);
}
}

/**
* Parse the job declaration into class and method.
*
* @param string $job
* @return array
*/
protected function parseJob($job)
{
$segments = explode('@', $job);

return count($segments) > 1 ? $segments : [$segments[0], 'fire'];
}

/**
* Get the name of the queued job class.
* Resolve the given class.
*
* @return string
*/
public function getName()
{
return $this->payload()['job'];
}

/**
* Get the resolved name of the queued job class.
*
* Resolves the name of "wrapped" jobs such as class-based handlers.
*
* @return string
* @param string $class
* @return mixed
*/
public function resolveName()
protected function resolve($class)
{
$name = $this->getName();

$payload = $this->payload();

if ($name === 'Illuminate\Queue\CallQueuedHandler@call') {
return Arr::get($payload, 'data.commandName', $name);
}

if ($name === 'Illuminate\Events\CallQueuedHandler@call') {
return $payload['data']['class'].'@'.$payload['data']['method'];
}

return $name;
return $this->container->make($class);
}

/**
Expand Down Expand Up @@ -223,6 +172,28 @@ public function timeout()
return array_get($this->payload(), 'timeout');
}

/**
* Get the name of the queued job class.
*
* @return string
*/
public function getName()
{
return $this->payload()['job'];
}

/**
* Get the resolved name of the queued job class.
*
* Resolves the name of "wrapped" jobs such as class-based handlers.
*
* @return string
*/
public function resolveName()
{
return JobName::resolve($this->getName(), $this->payload());
}

/**
* Get the name of the connection the job belongs to.
*
Expand Down
41 changes: 41 additions & 0 deletions src/Illuminate/Queue/Jobs/JobName.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?php

namespace Illuminate\Queue\Jobs;

use Illuminate\Support\Arr;

class JobName
{
/**
* Parse the given job name into a class / method array.
*
* @param string $job
* @return array
*/
public static function parse($job)
{
$segments = explode('@', $job);

return count($segments) > 1 ? $segments : [$segments[0], 'fire'];
}

/**
* Get the resolved name of the queued job class.
*
* @param string $name
* @param array $payload
* @return string
*/
public static function resolve($name, $payload)
{
if ($name === 'Illuminate\Queue\CallQueuedHandler@call') {
return Arr::get($payload, 'data.commandName', $name);
}

if ($name === 'Illuminate\Events\CallQueuedHandler@call') {
return $payload['data']['class'].'@'.$payload['data']['method'];
}

return $name;
}
}
72 changes: 47 additions & 25 deletions src/Illuminate/Queue/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -74,31 +74,53 @@ public function bulk($jobs, $data = '', $queue = null)
* @param string $queue
* @return string
*
* @throws \InvalidArgumentException
* @throws \Illuminate\Queue\InvalidPayloadException
*/
protected function createPayload($job, $data = '', $queue = null)

This comment has been minimized.

Copy link
@echnio

echnio May 11, 2017

Why is this place so changed? What is the problem?

{
if (is_object($job)) {
$payload = json_encode([
'job' => 'Illuminate\Queue\CallQueuedHandler@call',
'maxTries' => isset($job->tries) ? $job->tries : null,
'timeout' => isset($job->timeout) ? $job->timeout : null,
'data' => [
'commandName' => get_class($job),
'command' => serialize(clone $job),
],
]);
} else {
$payload = json_encode($this->createPlainPayload($job, $data));
}
$payload = json_encode($this->createPayloadArray($job, $data, $queue));

if (JSON_ERROR_NONE !== json_last_error()) {
throw new InvalidArgumentException('Unable to create payload: '.json_last_error_msg());
throw new InvalidPayloadException;
}

return $payload;
}

/**
* Create a payload array from the given job and data.
*
* @param string $job
* @param mixed $data
* @param string $queue
* @return array
*/
protected function createPayloadArray($job, $data = '', $queue = null)
{
return is_object($job)
? $this->createObjectPayload($job)
: $this->createPlainPayload($job, $data);
}

/**
* Create a payload for an object-based queue handler.
*
* @param mixed $job
* @return array
*/
protected function createObjectPayload($job)
{
return [
'job' => 'Illuminate\Queue\CallQueuedHandler@call',
'maxTries' => isset($job->tries) ? $job->tries : null,
'timeout' => isset($job->timeout) ? $job->timeout : null,
'data' => [
'commandName' => get_class($job),
'command' => serialize(clone $job),
],
];
}

/**
* Create a typical, "plain" queue payload array.
*
Expand All @@ -119,20 +141,20 @@ protected function createPlainPayload($job, $data)
* @param string $value
* @return string
*
* @throws \InvalidArgumentException
* @throws \Illuminate\Queue\InvalidPayloadException
*/
protected function setMeta($payload, $key, $value)
{
$payload = json_decode($payload, true);
// protected function setMeta($payload, $key, $value)
// {
// $payload = json_decode($payload, true);

$payload = json_encode(Arr::set($payload, $key, $value));
// $payload = json_encode(Arr::set($payload, $key, $value));

if (JSON_ERROR_NONE !== json_last_error()) {
throw new InvalidArgumentException('Unable to create payload: '.json_last_error_msg());
}
// if (JSON_ERROR_NONE !== json_last_error()) {
// throw new InvalidPayloadException;
// }

return $payload;
}
// return $payload;
// }

/**
* Set the IoC container instance.
Expand Down
11 changes: 5 additions & 6 deletions src/Illuminate/Queue/RedisQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -195,13 +195,12 @@ public function migrateExpiredJobs($from, $to)
* @param string $queue
* @return string
*/
protected function createPayload($job, $data = '', $queue = null)
protected function createPayloadArray($job, $data = '', $queue = null)
{
$payload = $this->setMeta(
parent::createPayload($job, $data), 'id', $this->getRandomId()
);

return $this->setMeta($payload, 'attempts', 1);
return array_merge(parent::createPayloadArray($job, $data, $queue), [
'id' => $this->getRandomId(),
'attempts' => 1,
]);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/Illuminate/Redis/Connections/PhpRedisConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class PhpRedisConnection extends Connection
/**
* Create a new Predis connection.
*
* @param \Predis\Client $client
* @param \Redis $client
* @return void
*/
public function __construct($client)
Expand Down
14 changes: 0 additions & 14 deletions tests/Queue/QueueDatabaseQueueUnitTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -76,20 +76,6 @@ public function testFailureToCreatePayloadFromArray()
]);
}

public function testFailureToCreatePayloadAfterAddingMeta()
{
$this->expectException('InvalidArgumentException');

$queue = $this->getMockForAbstractClass('Illuminate\Queue\Queue');
$class = new ReflectionClass('Illuminate\Queue\Queue');

$setMeta = $class->getMethod('setMeta');
$setMeta->setAccessible(true);
$setMeta->invokeArgs($queue, [
json_encode(['valid']), 'key', "\xc3\x28",
]);
}

public function testBulkBatchPushesOntoDatabase()
{
$database = m::mock('Illuminate\Database\Connection');
Expand Down

0 comments on commit e030231

Please sign in to comment.