Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework DriverFactory, add separator option to Client Config. #646

Merged
merged 3 commits into from
Nov 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions docs/bundle/config_reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,16 @@ enqueue:
client:
traceable_producer: true
prefix: enqueue
separator: .
app_name: app
router_topic: default
router_queue: default
router_processor: null
default_processor_queue: default
redelivered_delay_time: 0
default_queue: default

# The array contains driver specific options
driver_options: []

# The "monitoring" option could accept a string DSN, an array with DSN key, or null. It accept extra options. To find out what option you can set, look at stats storage constructor doc block.
monitoring:
Expand All @@ -57,9 +61,12 @@ enqueue:

# The factory service should be a class that implements "Enqueue\Monitoring\StatsStorageFactory" interface
storage_factory_class: ~
job: false
async_commands:
enabled: false
job:
enabled: false
async_events:
enabled: false
extensions:
doctrine_ping_connection_extension: false
doctrine_clear_identity_map_extension: false
Expand Down
2 changes: 1 addition & 1 deletion docs/laravel/quick_tour.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ return [
'client' => [
'router_topic' => 'default',
'router_queue' => 'default',
'default_processor_queue' => 'default',
'default_queue' => 'default',
],
],
];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class CustomAppKernel extends Kernel
'app_name' => '',
'router_topic' => 'test',
'router_queue' => 'test',
'default_processor_queue' => 'test',
'default_queue' => 'test',
],
],
];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use PHPUnit\Framework\TestCase;
use Symfony\Component\Config\Definition\ConfigurationInterface;
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
use Symfony\Component\Config\Definition\Exception\InvalidTypeException;
use Symfony\Component\Config\Definition\Processor;

class ConfigurationTest extends TestCase
Expand Down Expand Up @@ -96,14 +97,66 @@ public function testShouldSetDefaultConfigurationForClient()
'router_processor' => null,
'router_topic' => 'default',
'router_queue' => 'default',
'default_processor_queue' => 'default',
'default_queue' => 'default',
'traceable_producer' => true,
'redelivered_delay_time' => 0,
],
],
], $config);
}

public function testThrowIfClientDriverOptionsIsNotArray()
{
$configuration = new Configuration(true);

$processor = new Processor();

$this->expectException(InvalidTypeException::class);
$this->expectExceptionMessage('Invalid type for path "enqueue.default.client.driver_options". Expected array, but got string');
$processor->processConfiguration($configuration, [[
'default' => [
'transport' => 'null:',
'client' => [
'driver_options' => 'invalidOption',
],
],
]]);
}

public function testShouldConfigureClientDriverOptions()
{
$configuration = new Configuration(true);

$processor = new Processor();
$config = $processor->processConfiguration($configuration, [[
'default' => [
'transport' => 'null:',
'client' => [
'driver_options' => [
'foo' => 'fooVal',
],
],
],
]]);

$this->assertConfigEquals([
'default' => [
'client' => [
'prefix' => 'enqueue',
'app_name' => 'app',
'router_processor' => null,
'router_topic' => 'default',
'router_queue' => 'default',
'default_queue' => 'default',
'traceable_producer' => true,
'driver_options' => [
'foo' => 'fooVal',
],
],
],
], $config);
}

public function testThrowExceptionIfRouterTopicIsEmpty()
{
$this->expectException(InvalidConfigurationException::class);
Expand Down Expand Up @@ -147,12 +200,12 @@ public function testShouldThrowExceptionIfDefaultProcessorQueueIsEmpty()
$processor = new Processor();

$this->expectException(InvalidConfigurationException::class);
$this->expectExceptionMessage('The path "enqueue.default.client.default_processor_queue" cannot contain an empty value, but got "".');
$this->expectExceptionMessage('The path "enqueue.default.client.default_queue" cannot contain an empty value, but got "".');
$processor->processConfiguration($configuration, [[
'default' => [
'transport' => ['dsn' => 'null:'],
'client' => [
'default_processor_queue' => '',
'default_queue' => '',
],
],
]]);
Expand Down
49 changes: 39 additions & 10 deletions pkg/enqueue/Client/Config.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,22 @@ class Config
*/
private $transportConfig;

public function __construct(string $prefix, string $app, string $routerTopic, string $routerQueue, string $defaultQueue, string $routerProcessor, array $transportConfig = [])
{
/**
* @var array
*/
private $driverConfig;

public function __construct(
string $prefix,
string $separator,
string $app,
string $routerTopic,
string $routerQueue,
string $defaultQueue,
string $routerProcessor,
array $transportConfig,
array $driverConfig
) {
$this->prefix = trim($prefix);
$this->app = trim($app);

Expand All @@ -78,8 +92,9 @@ public function __construct(string $prefix, string $app, string $routerTopic, st
}

$this->transportConfig = $transportConfig;
$this->driverConfig = $driverConfig;

$this->separator = '.';
$this->separator = $separator;
}

public function getPrefix(): string
Expand Down Expand Up @@ -117,33 +132,47 @@ public function getRouterProcessor(): string
return $this->routerProcessor;
}

/**
* @deprecated
*
* @param null|mixed $default
*/
public function getTransportOption(string $name, $default = null)
{
return array_key_exists($name, $this->transportConfig) ? $this->transportConfig[$name] : $default;
}

public function getTransportOptions(): array
{
return $this->transportConfig;
}

public function getDriverOption(string $name, $default = null)
{
return array_key_exists($name, $this->driverConfig) ? $this->driverConfig[$name] : $default;
}

public function getDriverOptions(): array
{
return $this->driverConfig;
}

public static function create(
string $prefix = null,
string $separator = null,
string $app = null,
string $routerTopic = null,
string $routerQueue = null,
string $defaultQueue = null,
string $routerProcessor = null,
array $transportConfig = []
array $transportConfig = [],
array $driverConfig = []
): self {
return new static(
$prefix ?: '',
$separator ?: '.',
$app ?: '',
$routerTopic ?: 'router',
$routerQueue ?: 'default',
$defaultQueue ?: 'default',
$routerProcessor ?: 'router',
$transportConfig
$transportConfig,
$driverConfig
);
}
}
73 changes: 24 additions & 49 deletions pkg/enqueue/Client/DriverFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,72 +2,31 @@

namespace Enqueue\Client;

use Enqueue\Client\Driver\RabbitMqDriver;
use Enqueue\Client\Driver\RabbitMqStompDriver;
use Enqueue\Client\Driver\StompManagementClient;
use Enqueue\Dsn\Dsn;
use Enqueue\Stomp\StompConnectionFactory;
use Interop\Amqp\AmqpConnectionFactory;
use Interop\Queue\ConnectionFactory;

final class DriverFactory implements DriverFactoryInterface
{
/**
* @var Config
*/
private $config;

/**
* @var RouteCollection
*/
private $routeCollection;

public function __construct(Config $config, RouteCollection $routeCollection)
public function create(ConnectionFactory $factory, Config $config, RouteCollection $collection): DriverInterface
{
$this->config = $config;
$this->routeCollection = $routeCollection;
}
$dsn = $config->getTransportOption('dsn');

if (empty($dsn)) {
throw new \LogicException('This driver factory relies on dsn option from transport config. The option is empty or not set.');
}

public function create(ConnectionFactory $factory, string $dsn, array $config): DriverInterface
{
$dsn = Dsn::parseFirst($dsn);

if ($driverInfo = $this->findDriverInfo($dsn, Resources::getAvailableDrivers())) {
$driverClass = $driverInfo['driverClass'];

if (RabbitMqDriver::class === $driverClass) {
if (false == $factory instanceof AmqpConnectionFactory) {
throw new \LogicException(sprintf(
'The factory must be instance of "%s", got "%s"',
AmqpConnectionFactory::class,
get_class($factory)
));
}

return new RabbitMqDriver($factory->createContext(), $this->config, $this->routeCollection);
}

if (RabbitMqStompDriver::class === $driverClass) {
if (false == $factory instanceof StompConnectionFactory) {
throw new \LogicException(sprintf(
'The factory must be instance of "%s", got "%s"',
StompConnectionFactory::class,
get_class($factory)
));
}

$managementClient = StompManagementClient::create(
ltrim($dsn->getPath(), '/'),
$dsn->getHost() ?: 'localhost',
$config['management_plugin_port'] ?? 15672,
(string) $dsn->getUser(),
(string) $dsn->getPassword()
);

return new RabbitMqStompDriver($factory->createContext(), $this->config, $this->routeCollection, $managementClient);
return $this->createRabbitMqStompDriver($factory, $dsn, $config, $collection);
}

return new $driverClass($factory->createContext(), $this->config, $this->routeCollection);
return new $driverClass($factory->createContext(), $config, $collection);
}

$knownDrivers = Resources::getKnownDrivers();
Expand Down Expand Up @@ -121,4 +80,20 @@ private function findDriverInfo(Dsn $dsn, array $factories): ?array

return null;
}

private function createRabbitMqStompDriver(ConnectionFactory $factory, Dsn $dsn, Config $config, RouteCollection $collection): RabbitMqStompDriver
{
$defaultManagementHost = $dsn->getHost() ?: $config->getTransportOption('host', 'localhost');
$managementVast = ltrim($dsn->getPath(), '/') ?: $config->getTransportOption('vhost', '/');

$managementClient = StompManagementClient::create(
urldecode($managementVast),
$config->getDriverOption('rabbitmq_management_host', $defaultManagementHost),
$config->getDriverOption('rabbitmq_management_port', 15672),
(string) $dsn->getUser() ?: $config->getTransportOption('user', 'guest'),
(string) $dsn->getPassword() ?: $config->getTransportOption('pass', 'guest')
);

return new RabbitMqStompDriver($factory->createContext(), $config, $collection, $managementClient);
}
}
2 changes: 1 addition & 1 deletion pkg/enqueue/Client/DriverFactoryInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@

interface DriverFactoryInterface
{
public function create(ConnectionFactory $factory, string $dsn, array $config): DriverInterface;
public function create(ConnectionFactory $factory, Config $config, RouteCollection $collection): DriverInterface;
}
Loading