diff --git a/.gitignore b/.gitignore index c0e7bf821..6ac624141 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,7 @@ bin/jp.php bin/php-parse bin/google-cloud-batch vendor +var .php_cs .php_cs.cache composer.lock \ No newline at end of file diff --git a/README.md b/README.md index 517eba52c..a96aac98b 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ Features: * [Feature rich](docs/quick_tour.md). * Implements [JMS](https://docs.oracle.com/javaee/7/api/javax/jms/package-summary.html) like transports based on [queue-interop](https://github.com/queue-interop/queue-interop) interfaces. * Supported transports - * Amqp based on [the ext](docs/transport/amqp.md), [bunny](docs/transport/amqp_bunny.md), [the lib](docs/transport/amqp_lib.md) + * AMQP(S) based on [the ext](docs/transport/amqp.md), [bunny](docs/transport/amqp_bunny.md), [the lib](docs/transport/amqp_lib.md) * [Beanstalk](docs/transport/pheanstalk.md) * [STOMP](docs/transport/stomp.md) * [Amazon SQS](docs/transport/sqs.md) diff --git a/bin/build-rabbitmq-image.sh b/bin/build-rabbitmq-image.sh new file mode 100755 index 000000000..fb6d6bb20 --- /dev/null +++ b/bin/build-rabbitmq-image.sh @@ -0,0 +1,8 @@ +#!/usr/bin/env bash + +set -e +set -x + +(cd docker && docker build --rm --force-rm --no-cache --pull --squash --tag "enqueue/rabbitmq:latest" -f Dockerfile.rabbitmq .) +(cd docker && docker login --username="$DOCKER_USER" --password="$DOCKER_PASSWORD") +(cd docker && docker push "enqueue/rabbitmq:latest") \ No newline at end of file diff --git a/bin/build-rabbitmq-ssl-image.sh b/bin/build-rabbitmq-ssl-image.sh new file mode 100755 index 000000000..635bde953 --- /dev/null +++ b/bin/build-rabbitmq-ssl-image.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env bash + + +set -e +set -x + +mkdir -p /tmp/roboconf +rm -rf /tmp/roboconf/* + +(cd /tmp/roboconf && git clone git@github.com:roboconf/rabbitmq-with-ssl-in-docker.git) + +(cd /tmp/roboconf/rabbitmq-with-ssl-in-docker && docker build --rm --force-rm --no-cache --pull --squash --tag "enqueue/rabbitmq-ssl:latest" .) + +(cd /tmp/roboconf/rabbitmq-with-ssl-in-docker && docker login --username="$DOCKER_USER" --password="$DOCKER_PASSWORD") +(cd /tmp/roboconf/rabbitmq-with-ssl-in-docker && docker push "enqueue/rabbitmq-ssl:latest") + +docker run --rm -v "`pwd`/var/rabbitmq_certificates:/enqueue" "enqueue/rabbitmq-ssl:latest" cp /home/testca/cacert.pem /enqueue/cacert.pem + + + diff --git a/bin/test b/bin/test index ea5789a92..e53034ebb 100755 --- a/bin/test +++ b/bin/test @@ -4,6 +4,9 @@ # $1 host # $2 port # $3 attempts + +FORCE_EXIT=false + function waitForService() { ATTEMPTS=0 @@ -14,13 +17,20 @@ function waitForService() printf "service is not running %s:%s\n" $1 $2 exit 1 fi + if [ "$FORCE_EXIT" = true ]; then + exit; + fi + sleep 1 done printf "service is online %s:%s\n" $1 $2 } +trap "FORCE_EXIT=true" SIGTERM SIGINT + waitForService rabbitmq 5672 50 +waitForService rabbitmq_ssl 5671 50 waitForService mysql 3306 50 waitForService redis 6379 50 waitForService beanstalkd 11300 50 diff --git a/docker-compose.yml b/docker-compose.yml index 35a71db69..7d247510c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -6,6 +6,7 @@ services: # build: { context: docker, dockerfile: Dockerfile } depends_on: - rabbitmq + - rabbitmq_ssl - mysql - redis - beanstalkd @@ -17,6 +18,7 @@ services: - './:/mqdev' environment: - AMQP_DSN=amqp://guest:guest@rabbitmq:5672/mqdev + - AMQPS_DSN=amqps://guest:guest@rabbitmq_ssl:5671 - DOCTINE_DSN=mysql://root:rootpass@mysql/mqdev - SYMFONY__RABBITMQ__HOST=rabbitmq - SYMFONY__RABBITMQ__USER=guest @@ -54,8 +56,16 @@ services: ports: - "15677:15672" + rabbitmq_ssl: + image: enqueue/rabbitmq-ssl:latest + environment: + - RABBITMQ_DEFAULT_USER=guest + - RABBITMQ_DEFAULT_PASS=guest + volumes: + - './var/rabbitmq_certificates:/home/client' + beanstalkd: - image: 'schickling/beanstalkd' + image: 'jonbaldie/beanstalkd' gearmand: image: 'artefactual/gearmand' diff --git a/docs/bundle/config_reference.md b/docs/bundle/config_reference.md index cac0ccd6b..947e40ce2 100644 --- a/docs/bundle/config_reference.md +++ b/docs/bundle/config_reference.md @@ -87,6 +87,21 @@ enqueue: # The options that are specific to the amqp transport you chose. For example amqp+lib have insist, keepalive, stream options. amqp+bunny has tcp_nodelay extra option. driver_options: ~ + + # Should be true if you want to use secure connections. False by default + ssl_on: ~ + + # This option determines whether ssl client verifies that the server cert is for the server it is known as. True by default. + ssl_verify: ~ + + # Location of Certificate Authority file on local filesystem which should be used with the verify_peer context option to authenticate the identity of the remote peer. A string. + ssl_cacert: ~ + + # Path to local certificate file on filesystem. It must be a PEM encoded file which contains your certificate and private key. A string + ssl_cert: ~ + + # Path to local private key file on filesystem in case of separate files for certificate (local_cert) and private key. A string. + ssl_key: ~ rabbitmq_amqp: driver: ~ # One of "ext"; "lib"; "bunny" @@ -137,6 +152,21 @@ enqueue: # The options that are specific to the amqp transport you chose. For example amqp+lib have insist, keepalive, stream options. amqp+bunny has tcp_nodelay extra option. driver_options: ~ + # Should be true if you want to use secure connections. False by default + ssl_on: ~ + + # This option determines whether ssl client verifies that the server cert is for the server it is known as. True by default. + ssl_verify: ~ + + # Location of Certificate Authority file on local filesystem which should be used with the verify_peer context option to authenticate the identity of the remote peer. A string. + ssl_cacert: ~ + + # Path to local certificate file on filesystem. It must be a PEM encoded file which contains your certificate and private key. A string + ssl_cert: ~ + + # Path to local private key file on filesystem in case of separate files for certificate (local_cert) and private key. A string. + ssl_key: ~ + # The delay strategy to be used. Possible values are "dlx", "delayed_message_plugin" or service id delay_strategy: dlx fs: @@ -196,6 +226,25 @@ enqueue: # the connection will be performed as later as possible, if the option set to true lazy: true + gps: + + # The connection to Google Pub/Sub broker set as a string. Other parameters are ignored if set + dsn: ~ + + # The project ID from the Google Developer's Console. + projectId: ~ + + # The full path to your service account credentials.json file retrieved from the Google Developers Console. + keyFilePath: ~ + + # Number of retries for a failed request. + retries: 3 + + # Scopes to be used for the request. + scopes: [] + + # The connection will be performed as later as possible, if the option set to true + lazy: true client: traceable_producer: false prefix: enqueue diff --git a/docs/transport/amqp.md b/docs/transport/amqp.md index fcfdd1436..218d5a3ac 100644 --- a/docs/transport/amqp.md +++ b/docs/transport/amqp.md @@ -50,6 +50,14 @@ $factory = new AmqpConnectionFactory([ // same as above but given as DSN string $factory = new AmqpConnectionFactory('amqp://user:pass@example.com:10000/%2f'); +// SSL or secure connection +$factory = new AmqpConnectionFactory([ + 'dsn' => 'amqps:', + 'ssl_cacert' => '/path/to/cacert.pem', + 'ssl_cert' => '/path/to/cert.pem', + 'ssl_key' => '/path/to/key.pem', +]); + $psrContext = $factory->createContext(); // if you have enqueue/enqueue library installed you can use a function from there to create the context diff --git a/docs/transport/amqp_lib.md b/docs/transport/amqp_lib.md index 8ce661433..e1cd3c8db 100644 --- a/docs/transport/amqp_lib.md +++ b/docs/transport/amqp_lib.md @@ -50,6 +50,14 @@ $factory = new AmqpConnectionFactory([ // same as above but given as DSN string $factory = new AmqpConnectionFactory('amqp://user:pass@example.com:10000/%2f'); +// SSL or secure connection +$factory = new AmqpConnectionFactory([ + 'dsn' => 'amqps:', + 'ssl_cacert' => '/path/to/cacert.pem', + 'ssl_cert' => '/path/to/cert.pem', + 'ssl_key' => '/path/to/key.pem', +]); + $psrContext = $factory->createContext(); // if you have enqueue/enqueue library installed you can use a function from there to create the context diff --git a/pkg/amqp-bunny/AmqpConnectionFactory.php b/pkg/amqp-bunny/AmqpConnectionFactory.php index 470819778..c5c567c40 100644 --- a/pkg/amqp-bunny/AmqpConnectionFactory.php +++ b/pkg/amqp-bunny/AmqpConnectionFactory.php @@ -85,6 +85,10 @@ public function getConfig() */ private function establishConnection() { + if ($this->config->isSslOn()) { + throw new \LogicException('The bunny library does not support SSL connections'); + } + if (false == $this->client) { $bunnyConfig = []; $bunnyConfig['host'] = $this->config->getHost(); diff --git a/pkg/amqp-bunny/Tests/AmqpConnectionFactoryTest.php b/pkg/amqp-bunny/Tests/AmqpConnectionFactoryTest.php index c4e6a1e64..43ae04fdd 100644 --- a/pkg/amqp-bunny/Tests/AmqpConnectionFactoryTest.php +++ b/pkg/amqp-bunny/Tests/AmqpConnectionFactoryTest.php @@ -22,7 +22,7 @@ public function testShouldSupportAmqpLibScheme() new AmqpConnectionFactory('amqp+bunny:'); $this->expectException(\LogicException::class); - $this->expectExceptionMessage('The given DSN scheme "amqp+foo" is not supported. Could be one of "amqp", "amqp+bunny" only.'); + $this->expectExceptionMessage('The given DSN scheme "amqp+foo" is not supported. Could be one of "amqp", "amqps", "amqp+bunny" only.'); new AmqpConnectionFactory('amqp+foo:'); } } diff --git a/pkg/amqp-bunny/Tests/Spec/AmqpSslSendToAndReceiveFromQueueTest.php b/pkg/amqp-bunny/Tests/Spec/AmqpSslSendToAndReceiveFromQueueTest.php new file mode 100644 index 000000000..1f53ae398 --- /dev/null +++ b/pkg/amqp-bunny/Tests/Spec/AmqpSslSendToAndReceiveFromQueueTest.php @@ -0,0 +1,59 @@ +expectException(\LogicException::class); + $this->expectExceptionMessage('The bunny library does not support SSL connections'); + parent::test(); + } + + /** + * {@inheritdoc} + */ + protected function createContext() + { + $baseDir = realpath(__DIR__.'/../../../../'); + + // guard + $this->assertNotEmpty($baseDir); + + $certDir = $baseDir.'/var/rabbitmq_certificates'; + $this->assertDirectoryExists($certDir); + + $factory = new AmqpConnectionFactory([ + 'dsn' => getenv('AMQPS_DSN'), + 'ssl_verify' => false, + 'ssl_cacert' => $certDir.'/cacert.pem', + 'ssl_cert' => $certDir.'/cert.pem', + 'ssl_key' => $certDir.'/key.pem', + ]); + + return $factory->createContext(); + } + + /** + * {@inheritdoc} + * + * @param AmqpContext $context + */ + protected function createQueue(PsrContext $context, $queueName) + { + $queue = $context->createQueue($queueName); + $context->declareQueue($queue); + $context->purgeQueue($queue); + + return $queue; + } +} diff --git a/pkg/amqp-ext/AmqpConnectionFactory.php b/pkg/amqp-ext/AmqpConnectionFactory.php index 18de675e6..c6127133c 100644 --- a/pkg/amqp-ext/AmqpConnectionFactory.php +++ b/pkg/amqp-ext/AmqpConnectionFactory.php @@ -33,6 +33,7 @@ public function __construct($config = 'amqp:') { $this->config = (new ConnectionConfig($config)) ->addSupportedScheme('amqp+ext') + ->addSupportedScheme('amqps+ext') ->addDefaultOption('receive_method', 'basic_get') ->parse() ; @@ -113,11 +114,20 @@ private function establishConnection() $extConfig['read_timeout'] = $this->config->getReadTimeout(); $extConfig['write_timeout'] = $this->config->getWriteTimeout(); $extConfig['connect_timeout'] = $this->config->getConnectionTimeout(); + $extConfig['heartbeat'] = $this->config->getHeartbeat(); + + if ($this->config->isSslOn()) { + $extConfig['verify'] = $this->config->isSslVerify(); + $extConfig['cacert'] = $this->config->getSslCaCert(); + $extConfig['cert'] = $this->config->getSslCert(); + $extConfig['key'] = $this->config->getSslKey(); + } $this->connection = new \AMQPConnection($extConfig); $this->config->isPersisted() ? $this->connection->pconnect() : $this->connection->connect(); } + if (false == $this->connection->isConnected()) { $this->config->isPersisted() ? $this->connection->preconnect() : $this->connection->reconnect(); } diff --git a/pkg/amqp-ext/Tests/AmqpConnectionFactoryTest.php b/pkg/amqp-ext/Tests/AmqpConnectionFactoryTest.php index 251e57439..b4e1201ef 100644 --- a/pkg/amqp-ext/Tests/AmqpConnectionFactoryTest.php +++ b/pkg/amqp-ext/Tests/AmqpConnectionFactoryTest.php @@ -21,9 +21,10 @@ public function testShouldSupportAmqpExtScheme() { // no exception here new AmqpConnectionFactory('amqp+ext:'); + new AmqpConnectionFactory('amqps+ext:'); $this->expectException(\LogicException::class); - $this->expectExceptionMessage('The given DSN scheme "amqp+foo" is not supported. Could be one of "amqp", "amqp+ext" only.'); + $this->expectExceptionMessage('The given DSN scheme "amqp+foo" is not supported. Could be one of "amqp", "amqps", "amqp+ext", "amqps+ext" only.'); new AmqpConnectionFactory('amqp+foo:'); } diff --git a/pkg/amqp-ext/Tests/Spec/AmqpSslSendToAndReceiveFromQueueTest.php b/pkg/amqp-ext/Tests/Spec/AmqpSslSendToAndReceiveFromQueueTest.php new file mode 100644 index 000000000..4313b9b5e --- /dev/null +++ b/pkg/amqp-ext/Tests/Spec/AmqpSslSendToAndReceiveFromQueueTest.php @@ -0,0 +1,52 @@ +assertNotEmpty($baseDir); + + $certDir = $baseDir.'/var/rabbitmq_certificates'; + $this->assertDirectoryExists($certDir); + + $factory = new AmqpConnectionFactory([ + 'dsn' => getenv('AMQPS_DSN'), + 'ssl_verify' => false, + 'ssl_cacert' => $certDir.'/cacert.pem', + 'ssl_cert' => $certDir.'/cert.pem', + 'ssl_key' => $certDir.'/key.pem', + ]); + + return $factory->createContext(); + } + + /** + * {@inheritdoc} + * + * @param AmqpContext $context + */ + protected function createQueue(PsrContext $context, $queueName) + { + $queue = $context->createQueue($queueName); + $context->declareQueue($queue); + $context->purgeQueue($queue); + + return $queue; + } +} diff --git a/pkg/amqp-lib/AmqpConnectionFactory.php b/pkg/amqp-lib/AmqpConnectionFactory.php index a44f67146..de9de0318 100644 --- a/pkg/amqp-lib/AmqpConnectionFactory.php +++ b/pkg/amqp-lib/AmqpConnectionFactory.php @@ -10,6 +10,7 @@ use PhpAmqpLib\Connection\AMQPLazyConnection; use PhpAmqpLib\Connection\AMQPLazySocketConnection; use PhpAmqpLib\Connection\AMQPSocketConnection; +use PhpAmqpLib\Connection\AMQPSSLConnection; use PhpAmqpLib\Connection\AMQPStreamConnection; class AmqpConnectionFactory implements InteropAmqpConnectionFactory, DelayStrategyAware @@ -38,6 +39,7 @@ public function __construct($config = 'amqp:') { $this->config = (new ConnectionConfig($config)) ->addSupportedScheme('amqp+lib') + ->addSupportedScheme('amqps+lib') ->addDefaultOption('stream', true) ->addDefaultOption('insist', false) ->addDefaultOption('login_method', 'AMQPLAIN') @@ -84,7 +86,32 @@ private function establishConnection() { if (false == $this->connection) { if ($this->config->getOption('stream')) { - if ($this->config->isLazy()) { + if ($this->config->isSslOn()) { + $con = new AMQPSSLConnection( + $this->config->getHost(), + $this->config->getPort(), + $this->config->getUser(), + $this->config->getPass(), + $this->config->getVHost(), + [ + 'cafile' => $this->config->getSslCaCert(), + 'local_cert' => $this->config->getSslCert(), + 'local_pk' => $this->config->getSslKey(), + 'verify_peer' => $this->config->isSslVerify(), + 'verify_peer_name' => $this->config->isSslVerify(), + ], + [ + 'insist' => $this->config->getOption('insist'), + 'login_method' => $this->config->getOption('login_method'), + 'login_response' => $this->config->getOption('login_response'), + 'locale' => $this->config->getOption('locale'), + 'connection_timeout' => $this->config->getConnectionTimeout(), + 'read_write_timeout' => (int) round(min($this->config->getReadTimeout(), $this->config->getWriteTimeout())), + 'keepalive' => $this->config->getOption('keepalive'), + 'heartbeat' => (int) round($this->config->getHeartbeat()), + ] + ); + } elseif ($this->config->isLazy()) { $con = new AMQPLazyConnection( $this->config->getHost(), $this->config->getPort(), @@ -120,6 +147,10 @@ private function establishConnection() ); } } else { + if ($this->config->isSslOn()) { + throw new \LogicException('The socket connection implementation does not support ssl connections.'); + } + if ($this->config->isLazy()) { $con = new AMQPLazySocketConnection( $this->config->getHost(), diff --git a/pkg/amqp-lib/Tests/AmqpConnectionFactoryTest.php b/pkg/amqp-lib/Tests/AmqpConnectionFactoryTest.php index 4bcf8f156..4f4c3e0e5 100644 --- a/pkg/amqp-lib/Tests/AmqpConnectionFactoryTest.php +++ b/pkg/amqp-lib/Tests/AmqpConnectionFactoryTest.php @@ -20,9 +20,10 @@ public function testShouldSupportAmqpLibScheme() { // no exception here new AmqpConnectionFactory('amqp+lib:'); + new AmqpConnectionFactory('amqps+lib:'); $this->expectException(\LogicException::class); - $this->expectExceptionMessage('The given DSN scheme "amqp+foo" is not supported. Could be one of "amqp", "amqp+lib" only.'); + $this->expectExceptionMessage('The given DSN scheme "amqp+foo" is not supported. Could be one of "amqp", "amqps", "amqp+lib'); new AmqpConnectionFactory('amqp+foo:'); } } diff --git a/pkg/amqp-lib/Tests/Spec/AmqpSslSendToAndReceiveFromQueueTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSslSendToAndReceiveFromQueueTest.php new file mode 100644 index 000000000..87fef80a0 --- /dev/null +++ b/pkg/amqp-lib/Tests/Spec/AmqpSslSendToAndReceiveFromQueueTest.php @@ -0,0 +1,52 @@ +assertNotEmpty($baseDir); + + $certDir = $baseDir.'/var/rabbitmq_certificates'; + $this->assertDirectoryExists($certDir); + + $factory = new AmqpConnectionFactory([ + 'dsn' => getenv('AMQPS_DSN'), + 'ssl_verify' => false, + 'ssl_cacert' => $certDir.'/cacert.pem', + 'ssl_cert' => $certDir.'/cert.pem', + 'ssl_key' => $certDir.'/key.pem', + ]); + + return $factory->createContext(); + } + + /** + * {@inheritdoc} + * + * @param AmqpContext $context + */ + protected function createQueue(PsrContext $context, $queueName) + { + $queue = $context->createQueue($queueName); + $context->declareQueue($queue); + $context->purgeQueue($queue); + + return $queue; + } +} diff --git a/pkg/amqp-tools/ConnectionConfig.php b/pkg/amqp-tools/ConnectionConfig.php index 8a844d1b6..932847250 100644 --- a/pkg/amqp-tools/ConnectionConfig.php +++ b/pkg/amqp-tools/ConnectionConfig.php @@ -20,11 +20,17 @@ * qos_prefetch_size - The server will send a message in advance if it is equal to or smaller in size than the available prefetch size. May be set to zero, meaning "no specific limit" * qos_prefetch_count - Specifies a prefetch window in terms of whole messages * qos_global - If "false" the QoS settings apply to the current channel only. If this field is "true", they are applied to the entire connection. + * ssl_on - Should be true if you want to use secure connections. False by default + * ssl_verify - This option determines whether ssl client verifies that the server cert is for the server it is known as. True by default. + * ssl_cacert - Location of Certificate Authority file on local filesystem which should be used with the verify_peer context option to authenticate the identity of the remote peer. A string. + * ssl_cert - Path to local certificate file on filesystem. It must be a PEM encoded file which contains your certificate and private key. A string + * ssl_key - Path to local private key file on filesystem in case of separate files for certificate (local_cert) and private key. A string. * - * 2. null - in this case it tries to connect to locahost with default settings + * 2. null - in this case it tries to connect to localhost with default settings * 3. amqp: same as 2. * 4. amqp://user:pass@host:10000/vhost?lazy=true&persisted=false&read_timeout=2 * 5. amqp+foo: - the scheme driver could be used. (make sure you added it to the list of supported schemes) + * 6. amqps: - secure connection to localhost * * @see https://www.rabbitmq.com/uri-spec.html */ @@ -73,9 +79,15 @@ public function __construct($config = null) 'qos_global' => false, 'qos_prefetch_size' => 0, 'qos_prefetch_count' => 1, + 'ssl_on' => false, + 'ssl_verify' => true, + 'ssl_cacert' => null, + 'ssl_cert' => null, + 'ssl_key' => null, ]; $this->addSupportedScheme('amqp'); + $this->addSupportedScheme('amqps'); } /** @@ -139,6 +151,11 @@ public function parse() $config['qos_global'] = !empty($config['qos_global']); $config['qos_prefetch_count'] = max((int) ($config['qos_prefetch_count']), 0); $config['qos_prefetch_size'] = max((int) ($config['qos_prefetch_size']), 0); + $config['ssl_on'] = !empty($config['ssl_on']); + $config['ssl_verify'] = !empty($config['ssl_verify']); + $config['ssl_cacert'] = (string) $config['ssl_cacert']; + $config['ssl_cert'] = (string) $config['ssl_cert']; + $config['ssl_key'] = (string) $config['ssl_key']; $this->config = $config; @@ -257,6 +274,46 @@ public function getQosPrefetchCount() return $this->getOption('qos_prefetch_count'); } + /** + * @return bool + */ + public function isSslOn() + { + return $this->getOption('ssl_on'); + } + + /** + * @return bool + */ + public function isSslVerify() + { + return $this->getOption('ssl_verify'); + } + + /** + * @return bool + */ + public function getSslCaCert() + { + return $this->getOption('ssl_cacert'); + } + + /** + * @return bool + */ + public function getSslCert() + { + return $this->getOption('ssl_cert'); + } + + /** + * @return bool + */ + public function getSslKey() + { + return $this->getOption('ssl_key'); + } + /** * @param string $name * @param mixed $default @@ -326,6 +383,10 @@ private function parseDsn($dsn) $config['vhost'] = ltrim($path, '/'); } + if (0 === strpos($scheme, 'amqps')) { + $config['ssl_on'] = true; + } + return array_map('urldecode', $config); } } diff --git a/pkg/amqp-tools/Tests/ConnectionConfigTest.php b/pkg/amqp-tools/Tests/ConnectionConfigTest.php index 8438ac19a..4472fd7f2 100644 --- a/pkg/amqp-tools/Tests/ConnectionConfigTest.php +++ b/pkg/amqp-tools/Tests/ConnectionConfigTest.php @@ -24,7 +24,7 @@ public function testThrowNeitherArrayStringNorNullGivenAsConfig() public function testThrowIfSchemeIsNotSupported() { $this->expectException(\LogicException::class); - $this->expectExceptionMessage('The given DSN scheme "http" is not supported. Could be one of "amqp" only.'); + $this->expectExceptionMessage('The given DSN scheme "http" is not supported. Could be one of "amqp", "amqps" only.'); (new ConnectionConfig('http://example.com'))->parse(); } @@ -32,7 +32,7 @@ public function testThrowIfSchemeIsNotSupported() public function testThrowIfSchemeIsNotSupportedIncludingAdditionalSupportedSchemes() { $this->expectException(\LogicException::class); - $this->expectExceptionMessage('The given DSN scheme "http" is not supported. Could be one of "amqp", "amqp+foo" only.'); + $this->expectExceptionMessage('The given DSN scheme "http" is not supported. Could be one of "amqp", "amqps", "amqp+foo" only.'); (new ConnectionConfig('http://example.com')) ->addSupportedScheme('amqp+foo') @@ -70,6 +70,11 @@ public function testShouldParseEmptyDsnWithDriverSet() 'qos_prefetch_count' => 1, 'qos_global' => false, 'heartbeat' => 0.0, + 'ssl_on' => false, + 'ssl_verify' => true, + 'ssl_cacert' => null, + 'ssl_cert' => null, + 'ssl_key' => null, ], $config->getConfig()); } @@ -95,6 +100,11 @@ public function testShouldParseCustomDsnWithDriverSet() 'qos_prefetch_count' => 1, 'qos_global' => false, 'heartbeat' => 0.0, + 'ssl_on' => false, + 'ssl_verify' => true, + 'ssl_cacert' => null, + 'ssl_cert' => null, + 'ssl_key' => null, ], $config->getConfig()); } @@ -131,6 +141,11 @@ public static function provideConfigs() 'qos_prefetch_count' => 1, 'qos_global' => false, 'heartbeat' => 0.0, + 'ssl_on' => false, + 'ssl_verify' => true, + 'ssl_cacert' => null, + 'ssl_cert' => null, + 'ssl_key' => null, ], ]; @@ -151,6 +166,36 @@ public static function provideConfigs() 'qos_prefetch_count' => 1, 'qos_global' => false, 'heartbeat' => 0.0, + 'ssl_on' => false, + 'ssl_verify' => true, + 'ssl_cacert' => null, + 'ssl_cert' => null, + 'ssl_key' => null, + ], + ]; + + yield [ + 'amqps:', + [ + 'host' => 'localhost', + 'port' => 5672, + 'vhost' => '/', + 'user' => 'guest', + 'pass' => 'guest', + 'read_timeout' => 3., + 'write_timeout' => 3., + 'connection_timeout' => 3., + 'persisted' => false, + 'lazy' => true, + 'qos_prefetch_size' => 0, + 'qos_prefetch_count' => 1, + 'qos_global' => false, + 'heartbeat' => 0.0, + 'ssl_on' => true, + 'ssl_verify' => true, + 'ssl_cacert' => null, + 'ssl_cert' => null, + 'ssl_key' => null, ], ]; @@ -171,6 +216,11 @@ public static function provideConfigs() 'qos_prefetch_count' => 1, 'qos_global' => false, 'heartbeat' => 0.0, + 'ssl_on' => false, + 'ssl_verify' => true, + 'ssl_cacert' => null, + 'ssl_cert' => null, + 'ssl_key' => null, ], ]; @@ -191,6 +241,11 @@ public static function provideConfigs() 'qos_prefetch_count' => 1, 'qos_global' => false, 'heartbeat' => 0.0, + 'ssl_on' => false, + 'ssl_verify' => true, + 'ssl_cacert' => null, + 'ssl_cert' => null, + 'ssl_key' => null, ], ]; @@ -211,6 +266,11 @@ public static function provideConfigs() 'qos_prefetch_count' => 1, 'qos_global' => false, 'heartbeat' => 23.3, + 'ssl_on' => false, + 'ssl_verify' => true, + 'ssl_cacert' => null, + 'ssl_cert' => null, + 'ssl_key' => null, ], ]; @@ -231,6 +291,11 @@ public static function provideConfigs() 'qos_prefetch_count' => 1, 'qos_global' => true, 'heartbeat' => 0.0, + 'ssl_on' => false, + 'ssl_verify' => true, + 'ssl_cacert' => null, + 'ssl_cert' => null, + 'ssl_key' => null, ], ]; @@ -251,6 +316,11 @@ public static function provideConfigs() 'qos_prefetch_count' => 1, 'qos_global' => false, 'heartbeat' => 0.0, + 'ssl_on' => false, + 'ssl_verify' => true, + 'ssl_cacert' => null, + 'ssl_cert' => null, + 'ssl_key' => null, ], ]; @@ -271,6 +341,11 @@ public static function provideConfigs() 'qos_prefetch_count' => 1, 'qos_global' => true, 'heartbeat' => 0.0, + 'ssl_on' => false, + 'ssl_verify' => true, + 'ssl_cacert' => null, + 'ssl_cert' => null, + 'ssl_key' => null, ], ]; @@ -291,6 +366,11 @@ public static function provideConfigs() 'qos_prefetch_size' => 0, 'qos_global' => false, 'heartbeat' => 0.0, + 'ssl_on' => false, + 'ssl_verify' => true, + 'ssl_cacert' => null, + 'ssl_cert' => null, + 'ssl_key' => null, ], ]; @@ -311,6 +391,11 @@ public static function provideConfigs() 'qos_prefetch_size' => 0, 'qos_global' => false, 'heartbeat' => 0.0, + 'ssl_on' => false, + 'ssl_verify' => true, + 'ssl_cacert' => null, + 'ssl_cert' => null, + 'ssl_key' => null, ], ]; @@ -337,6 +422,39 @@ public static function provideConfigs() 'qos_prefetch_size' => 0, 'qos_global' => false, 'heartbeat' => 0.0, + 'ssl_on' => false, + 'ssl_verify' => true, + 'ssl_cacert' => null, + 'ssl_cert' => null, + 'ssl_key' => null, + ], + ]; + + yield [ + [ + 'ssl_on' => false, + 'dsn' => 'amqps:', + ], + [ + 'host' => 'localhost', + 'port' => 5672, + 'vhost' => '/', + 'user' => 'guest', + 'pass' => 'guest', + 'read_timeout' => 3., + 'write_timeout' => 3., + 'connection_timeout' => 3., + 'persisted' => false, + 'lazy' => true, + 'qos_prefetch_size' => 0, + 'qos_prefetch_count' => 1, + 'qos_global' => false, + 'heartbeat' => 0.0, + 'ssl_on' => true, + 'ssl_verify' => true, + 'ssl_cacert' => null, + 'ssl_cert' => null, + 'ssl_key' => null, ], ]; } diff --git a/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php b/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php index facfdf9d5..c90b18697 100644 --- a/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php +++ b/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php @@ -23,6 +23,14 @@ public function setUp() public function provideEnqueueConfigs() { + $baseDir = realpath(__DIR__.'/../../../../'); + + // guard + $this->assertNotEmpty($baseDir); + + $certDir = $baseDir.'/var/rabbitmq_certificates'; + $this->assertDirectoryExists($certDir); + yield 'amqp' => [[ 'transport' => [ 'default' => 'amqp', @@ -45,6 +53,19 @@ public function provideEnqueueConfigs() ], ]]; + yield 'amqps_dsn' => [[ + 'transport' => [ + 'default' => 'amqp', + 'amqp' => [ + 'dsn' => getenv('AMQPS_DSN'), + 'ssl_verify' => false, + 'ssl_cacert' => $certDir.'/cacert.pem', + 'ssl_cert' => $certDir.'/cert.pem', + 'ssl_key' => $certDir.'/key.pem', + ], + ], + ]]; + yield 'default_amqp_as_dsn' => [[ 'transport' => [ 'default' => getenv('AMQP_DSN'), diff --git a/pkg/enqueue/Symfony/AmqpTransportFactory.php b/pkg/enqueue/Symfony/AmqpTransportFactory.php index 8b64a61f2..d098762ab 100644 --- a/pkg/enqueue/Symfony/AmqpTransportFactory.php +++ b/pkg/enqueue/Symfony/AmqpTransportFactory.php @@ -116,6 +116,21 @@ public function addConfiguration(ArrayNodeDefinition $builder) ->variableNode('driver_options') ->info('The options that are specific to the amqp transport you chose. For example amqp+lib have insist, keepalive, stream options. amqp+bunny has tcp_nodelay extra option.') ->end() + ->booleanNode('ssl_on') + ->info('Should be true if you want to use secure connections. False by default') + ->end() + ->booleanNode('ssl_verify') + ->info('This option determines whether ssl client verifies that the server cert is for the server it is known as. True by default.') + ->end() + ->scalarNode('ssl_cacert') + ->info('Location of Certificate Authority file on local filesystem which should be used with the verify_peer context option to authenticate the identity of the remote peer. A string.') + ->end() + ->scalarNode('ssl_cert') + ->info('Path to local certificate file on filesystem. It must be a PEM encoded file which contains your certificate and private key. A string') + ->end() + ->scalarNode('ssl_key') + ->info('Path to local private key file on filesystem in case of separate files for certificate (local_cert) and private key. A string.') + ->end() ; } diff --git a/pkg/enqueue/Tests/Functions/DsnToConnectionFactoryFunctionTest.php b/pkg/enqueue/Tests/Functions/DsnToConnectionFactoryFunctionTest.php index 59c5333b6..6596a79af 100644 --- a/pkg/enqueue/Tests/Functions/DsnToConnectionFactoryFunctionTest.php +++ b/pkg/enqueue/Tests/Functions/DsnToConnectionFactoryFunctionTest.php @@ -2,7 +2,9 @@ namespace Enqueue\Tests\Functions; -use Enqueue\AmqpExt\AmqpConnectionFactory; +use Enqueue\AmqpBunny\AmqpConnectionFactory as AmqpBunnyConnectionFactory; +use Enqueue\AmqpExt\AmqpConnectionFactory as AmqpExtConnectionFactory; +use Enqueue\AmqpLib\AmqpConnectionFactory as AmqpLibConnectionFactory; use Enqueue\Dbal\DbalConnectionFactory; use Enqueue\Fs\FsConnectionFactory; use Enqueue\Gearman\GearmanConnectionFactory; @@ -56,9 +58,21 @@ public function testReturnsExpectedFactoryInstance($dsn, $expectedFactoryClass) public static function provideDSNs() { - yield ['amqp:', AmqpConnectionFactory::class]; + yield ['amqp:', AmqpExtConnectionFactory::class]; - yield ['amqp://user:pass@foo/vhost', AmqpConnectionFactory::class]; + yield ['amqps:', AmqpExtConnectionFactory::class]; + + yield ['amqp+ext:', AmqpExtConnectionFactory::class]; + + yield ['amqps+ext:', AmqpExtConnectionFactory::class]; + + yield ['amqp+lib:', AmqpLibConnectionFactory::class]; + + yield ['amqps+lib:', AmqpLibConnectionFactory::class]; + + yield ['amqp+bunny:', AmqpBunnyConnectionFactory::class]; + + yield ['amqp://user:pass@foo/vhost', AmqpExtConnectionFactory::class]; yield ['file:', FsConnectionFactory::class]; @@ -72,7 +86,7 @@ public static function provideDSNs() yield ['beanstalk:', PheanstalkConnectionFactory::class]; - // yield ['gearman:', GearmanConnectionFactory::class]; + yield ['gearman:', GearmanConnectionFactory::class]; yield ['kafka:', RdKafkaConnectionFactory::class]; diff --git a/pkg/enqueue/Tests/Symfony/AmqpTransportFactoryTest.php b/pkg/enqueue/Tests/Symfony/AmqpTransportFactoryTest.php index d468b33c2..d8bb5e1fb 100644 --- a/pkg/enqueue/Tests/Symfony/AmqpTransportFactoryTest.php +++ b/pkg/enqueue/Tests/Symfony/AmqpTransportFactoryTest.php @@ -112,6 +112,31 @@ public function testShouldAllowAddConfigurationWithDriverOptions() ], $config); } + public function testShouldAllowAddSslOptions() + { + $transport = new AmqpTransportFactory(); + $tb = new TreeBuilder(); + $rootNode = $tb->root('foo'); + + $transport->addConfiguration($rootNode); + $processor = new Processor(); + $config = $processor->process($tb->buildTree(), [[ + 'ssl_on' => true, + 'ssl_verify' => false, + 'ssl_cacert' => '/path/to/cacert.pem', + 'ssl_cert' => '/path/to/cert.pem', + 'ssl_key' => '/path/to/key.pem', + ]]); + + $this->assertEquals([ + 'ssl_on' => true, + 'ssl_verify' => false, + 'ssl_cacert' => '/path/to/cacert.pem', + 'ssl_cert' => '/path/to/cert.pem', + 'ssl_key' => '/path/to/key.pem', + ], $config); + } + public function testShouldAllowAddConfigurationAsString() { $transport = new AmqpTransportFactory(); diff --git a/pkg/enqueue/functions.php b/pkg/enqueue/functions.php index 25d443890..8960785b7 100644 --- a/pkg/enqueue/functions.php +++ b/pkg/enqueue/functions.php @@ -34,9 +34,11 @@ function dsn_to_connection_factory($dsn) if (class_exists(AmqpExtConnectionFactory::class)) { $map['amqp+ext'] = AmqpExtConnectionFactory::class; + $map['amqps+ext'] = AmqpExtConnectionFactory::class; } if (class_exists(AmqpLibConnectionFactory::class)) { $map['amqp+lib'] = AmqpLibConnectionFactory::class; + $map['amqps+lib'] = AmqpLibConnectionFactory::class; } if (class_exists(AmqpBunnyConnectionFactory::class)) { $map['amqp+bunny'] = AmqpBunnyConnectionFactory::class; @@ -50,6 +52,12 @@ function dsn_to_connection_factory($dsn) $map['amqp'] = AmqpLibConnectionFactory::class; } + if (class_exists(AmqpExtConnectionFactory::class)) { + $map['amqps'] = AmqpExtConnectionFactory::class; + } elseif (class_exists(AmqpLibConnectionFactory::class)) { + $map['amqps'] = AmqpLibConnectionFactory::class; + } + if (class_exists(NullConnectionFactory::class)) { $map['null'] = NullConnectionFactory::class; }