Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FS] Polling Interval #192

Merged
merged 3 commits into from
Sep 8, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions bin/test
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ function waitForService()
waitForService rabbitmq 5672 50
waitForService mysql 3306 50
waitForService redis 6379 50
waitForService beanstalkd 11300
waitForService gearmand 4730
waitForService kafka 9092
waitForService beanstalkd 11300 50
waitForService gearmand 4730 50
waitForService kafka 9092 50

php pkg/job-queue/Tests/Functional/app/console doctrine:database:create
php pkg/job-queue/Tests/Functional/app/console doctrine:schema:update --force
Expand Down
9 changes: 8 additions & 1 deletion pkg/fs/FsConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class FsConnectionFactory implements PsrConnectionFactory
* 'path' => 'the directory where all queue\topic files remain. For example /home/foo/enqueue',
* 'pre_fetch_count' => 'Integer. Defines how many messages to fetch from the file.',
* 'chmod' => 'Defines a mode the files are created with',
* 'polling_interval' => 'How often query for new messages, default 100 (milliseconds)',
* ]
*
* or
Expand Down Expand Up @@ -48,7 +49,12 @@ public function __construct($config = 'file://')
*/
public function createContext()
{
return new FsContext($this->config['path'], $this->config['pre_fetch_count'], $this->config['chmod']);
return new FsContext(
$this->config['path'],
$this->config['pre_fetch_count'],
$this->config['chmod'],
$this->config['polling_interval']
);
}

/**
Expand Down Expand Up @@ -99,6 +105,7 @@ private function defaultConfig()
'path' => null,
'pre_fetch_count' => 1,
'chmod' => 0600,
'polling_interval' => 100,
];
}
}
45 changes: 41 additions & 4 deletions pkg/fs/FsConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ class FsConsumer implements PsrConsumer
*/
private $preFetchedMessages;

/**
* @var int microseconds
*/
private $pollingInterval = 100000;

/**
* @param FsContext $context
* @param FsDestination $destination
Expand All @@ -42,6 +47,26 @@ public function __construct(FsContext $context, FsDestination $destination, $pre
$this->preFetchedMessages = [];
}

/**
* Set polling interval in milliseconds.
*
* @param int $msec
*/
public function setPollingInterval($msec)
{
$this->pollingInterval = $msec * 1000;
}

/**
* Get polling interval in milliseconds.
*
* @return int
*/
public function getPollingInterval()
{
return (int) $this->pollingInterval / 1000;
}

/**
* {@inheritdoc}
*
Expand All @@ -59,13 +84,25 @@ public function getQueue()
*/
public function receive($timeout = 0)
{
$end = microtime(true) + ($timeout / 1000);
while (0 === $timeout || microtime(true) < $end) {
if ($message = $this->receiveNoWait()) {
$timeout /= 1000;
$startAt = microtime(true);

while (true) {
$message = $this->receiveNoWait();

if ($message) {
return $message;
}

usleep(100);
if ($timeout && (microtime(true) - $startAt) >= $timeout) {
return;
}

usleep($this->pollingInterval);

if ($timeout && (microtime(true) - $startAt) >= $timeout) {
return;
}
}
}

Expand Down
17 changes: 15 additions & 2 deletions pkg/fs/FsContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,26 @@ class FsContext implements PsrContext
*/
private $lockHandlers;

/**
* @var null
*/
private $pollingInterval;

/**
* @param string $storeDir
* @param int $preFetchCount
* @param int $chmod
* @param null $pollingInterval
*/
public function __construct($storeDir, $preFetchCount, $chmod)
public function __construct($storeDir, $preFetchCount, $chmod, $pollingInterval = null)
{
$fs = new Filesystem();
$fs->mkdir($storeDir);

$this->storeDir = $storeDir;
$this->preFetchCount = $preFetchCount;
$this->chmod = $chmod;
$this->pollingInterval = $pollingInterval;

$this->lockHandlers = [];
}
Expand Down Expand Up @@ -160,7 +167,13 @@ public function createConsumer(PsrDestination $destination)
{
InvalidDestinationException::assertDestinationInstanceOf($destination, FsDestination::class);

return new FsConsumer($this, $destination, $this->preFetchCount);
$consumer = new FsConsumer($this, $destination, $this->preFetchCount);

if ($this->pollingInterval) {
$consumer->setPollingInterval($this->pollingInterval);
}

return $consumer;
}

public function close()
Expand Down
5 changes: 5 additions & 0 deletions pkg/fs/Symfony/FsTransportFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ public function addConfiguration(ArrayNodeDefinition $builder)
->defaultValue(0600)
->info('The queue files are created with this given permissions if not exist.')
->end()
->integerNode('polling_interval')
->defaultValue(100)
->min(50)
->info('How often query for new messages.')
->end()
;
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/fs/Tests/FsConnectionFactoryConfigTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public static function provideConfigs()
'path' => sys_get_temp_dir().'/enqueue',
'pre_fetch_count' => 1,
'chmod' => 0600,
'polling_interval' => 100,
],
];

Expand All @@ -67,6 +68,7 @@ public static function provideConfigs()
'path' => sys_get_temp_dir().'/enqueue',
'pre_fetch_count' => 1,
'chmod' => 0600,
'polling_interval' => 100,
],
];

Expand All @@ -76,6 +78,7 @@ public static function provideConfigs()
'path' => sys_get_temp_dir().'/enqueue',
'pre_fetch_count' => 1,
'chmod' => 0600,
'polling_interval' => 100,
],
];

Expand All @@ -85,6 +88,7 @@ public static function provideConfigs()
'path' => sys_get_temp_dir().'/enqueue',
'pre_fetch_count' => 1,
'chmod' => 0600,
'polling_interval' => 100,
],
];

Expand All @@ -94,6 +98,7 @@ public static function provideConfigs()
'path' => '/foo/bar/baz',
'pre_fetch_count' => 1,
'chmod' => 0600,
'polling_interval' => 100,
],
];

Expand All @@ -103,6 +108,7 @@ public static function provideConfigs()
'path' => '/foo/bar/baz',
'pre_fetch_count' => 1,
'chmod' => 0600,
'polling_interval' => 100,
],
];

Expand All @@ -112,6 +118,7 @@ public static function provideConfigs()
'path' => '/foo/bar/baz',
'pre_fetch_count' => 100,
'chmod' => 0666,
'polling_interval' => 100,
],
];
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/fs/Tests/FsConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ public function testShouldDoNothingOnReject()
$consumer->reject(new FsMessage());
}

public function testCouldSetAndGetPollingInterval()
{
$consumer = new FsConsumer($this->createContextMock(), new FsDestination(TempFile::generate()), 123);
$consumer->setPollingInterval(123456);

$this->assertEquals(123456, $consumer->getPollingInterval());
}

public function testShouldSendSameMessageToDestinationOnReQueue()
{
$message = new FsMessage();
Expand Down
14 changes: 14 additions & 0 deletions pkg/fs/Tests/FsContextTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -233,4 +233,18 @@ public function testShouldCreateFileOnFilesystemIfNotExistOnDeclareDestination()

unlink($tmpFile);
}

public function testShouldCreateMessageConsumerAndSetPollingInterval()
{
$tmpFile = new TempFile(sys_get_temp_dir().'/foo');

$context = new FsContext(sys_get_temp_dir(), 1, 0666, 123456);

$queue = $context->createQueue($tmpFile->getFilename());

$consumer = $context->createConsumer($queue);

$this->assertInstanceOf(FsConsumer::class, $consumer);
$this->assertEquals(123456, $consumer->getPollingInterval());
}
}
5 changes: 5 additions & 0 deletions pkg/fs/Tests/Symfony/FsTransportFactoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public function testShouldAllowAddConfiguration()
'path' => sys_get_temp_dir(),
'pre_fetch_count' => 1,
'chmod' => 0600,
'polling_interval' => 100,
], $config);
}

Expand All @@ -69,6 +70,7 @@ public function testShouldAllowAddConfigurationAsString()
'dsn' => 'fileDSN',
'pre_fetch_count' => 1,
'chmod' => 0600,
'polling_interval' => 100,
], $config);
}

Expand All @@ -82,6 +84,7 @@ public function testShouldCreateConnectionFactory()
'path' => sys_get_temp_dir(),
'pre_fetch_count' => 1,
'chmod' => 0600,
'polling_interval' => 100,
]);

$this->assertTrue($container->hasDefinition($serviceId));
Expand All @@ -91,6 +94,7 @@ public function testShouldCreateConnectionFactory()
'path' => sys_get_temp_dir(),
'pre_fetch_count' => 1,
'chmod' => 0600,
'polling_interval' => 100,
]], $factory->getArguments());
}

Expand Down Expand Up @@ -120,6 +124,7 @@ public function testShouldCreateContext()
'path' => sys_get_temp_dir(),
'pre_fetch_count' => 1,
'chmod' => 0600,
'polling_interval' => 100,
]);

$this->assertEquals('enqueue.transport.fs.context', $serviceId);
Expand Down