Skip to content

Commit

Permalink
Merge pull request #108 from php-enqueue/amqp-prefetch-support
Browse files Browse the repository at this point in the history
[amqp] Add pre_fetch_count, pre_fetch_size options.
  • Loading branch information
makasim authored Jun 8, 2017
2 parents 65c5b9c + 1c6d283 commit b923d2c
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 2 deletions.
26 changes: 24 additions & 2 deletions pkg/amqp-ext/AmqpConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,30 @@ public function createContext()
{
if ($this->config['lazy']) {
return new AmqpContext(function () {
return new \AMQPChannel($this->establishConnection());
return $this->createExtContext($this->establishConnection());
});
}

return new AmqpContext(new \AMQPChannel($this->establishConnection()));
return new AmqpContext($this->createExtContext($this->establishConnection()));
}

/**
* @param \AMQPConnection $extConnection
*
* @return \AMQPChannel
*/
private function createExtContext(\AMQPConnection $extConnection)
{
$channel = new \AMQPChannel($extConnection);
if (false == empty($this->config['pre_fetch_count'])) {
$channel->setPrefetchCount((int) $this->config['pre_fetch_count']);
}

if (false == empty($this->config['pre_fetch_size'])) {
$channel->setPrefetchSize((int) $this->config['pre_fetch_size']);
}

return $channel;
}

/**
Expand Down Expand Up @@ -118,6 +137,7 @@ private function parseDsn($dsn)
if ($dsnConfig['query']) {
$query = [];
parse_str($dsnConfig['query'], $query);

$dsnConfig = array_replace($query, $dsnConfig);
}

Expand Down Expand Up @@ -149,6 +169,8 @@ private function defaultConfig()
'connect_timeout' => null,
'persisted' => false,
'lazy' => true,
'pre_fetch_count' => null,
'pre_fetch_size' => null,
];
}
}
50 changes: 50 additions & 0 deletions pkg/amqp-ext/Tests/AmqpConnectionFactoryConfigTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public static function provideConfigs()
'connect_timeout' => null,
'persisted' => false,
'lazy' => true,
'pre_fetch_count' => null,
'pre_fetch_size' => null,
],
];

Expand All @@ -83,6 +85,8 @@ public static function provideConfigs()
'connect_timeout' => null,
'persisted' => false,
'lazy' => true,
'pre_fetch_count' => null,
'pre_fetch_size' => null,
],
];

Expand All @@ -99,6 +103,8 @@ public static function provideConfigs()
'connect_timeout' => null,
'persisted' => false,
'lazy' => true,
'pre_fetch_count' => null,
'pre_fetch_size' => null,
],
];

Expand All @@ -115,6 +121,8 @@ public static function provideConfigs()
'connect_timeout' => null,
'persisted' => false,
'lazy' => true,
'pre_fetch_count' => null,
'pre_fetch_size' => null,
],
];

Expand All @@ -131,6 +139,8 @@ public static function provideConfigs()
'connect_timeout' => '2',
'persisted' => false,
'lazy' => '',
'pre_fetch_count' => null,
'pre_fetch_size' => null,
],
];

Expand All @@ -147,6 +157,8 @@ public static function provideConfigs()
'connect_timeout' => null,
'persisted' => false,
'lazy' => true,
'pre_fetch_count' => null,
'pre_fetch_size' => null,
],
];

Expand All @@ -163,6 +175,44 @@ public static function provideConfigs()
'connect_timeout' => null,
'persisted' => false,
'lazy' => false,
'pre_fetch_count' => null,
'pre_fetch_size' => null,
],
];

yield [
['pre_fetch_count' => 123, 'pre_fetch_size' => 321],
[
'host' => 'localhost',
'port' => 5672,
'vhost' => '/',
'user' => 'guest',
'pass' => 'guest',
'read_timeout' => null,
'write_timeout' => null,
'connect_timeout' => null,
'persisted' => false,
'lazy' => true,
'pre_fetch_count' => 123,
'pre_fetch_size' => 321,
],
];

yield [
'amqp://user:pass@host:10000/vhost?pre_fetch_count=123&pre_fetch_size=321',
[
'host' => 'host',
'port' => '10000',
'vhost' => 'vhost',
'user' => 'user',
'pass' => 'pass',
'read_timeout' => null,
'write_timeout' => null,
'connect_timeout' => null,
'persisted' => false,
'lazy' => true,
'pre_fetch_count' => 123,
'pre_fetch_size' => 321,
],
];
}
Expand Down

0 comments on commit b923d2c

Please sign in to comment.