Skip to content

Commit

Permalink
Unify daemon processes (#848)
Browse files Browse the repository at this point in the history
* Refactoring Daemon to use new JobInterface

* Add InterruptTrait, separate parent daemon/child daemons

* Make daemon a JobInterface

* WIP checkin that can run a simple loop job

* Debugger Daemon can run through the google-cloud-batch daemon

* Ignore debugger if the daemon hasn't registered and reported its status

* Updating phpdoc

* Cleanup job traits

* clean up JobConfig variable names

* Fix debugger tests

* BatchConfigTest->JobConfigTest

* Fix tests

* Fix snippet tests

* Fix merge

* Fix unit tests

* Fixing tests

* New daemon options used by the lazy loaded debuggee

* Fix debug failure

* Fix type values for traits

* Agent provides storage implementation to daemon

* Add exponential backoff and exception handling to Agent/Daemon

* Force GC in Daemon loop

* With combined daemons, you no longer need the additional supervisord config

* Fix snippet test

* Use eventually consistent test trait for checking debuggeeId in E2E test

* Fix typo in E2ETest

* We no longer need the google-cloud-debugger script

* Remove __wakeup bootstrap file hack

* Fix test expectation

* Remove commented out test

* Fix tests from merge

* phpcs fix from merge

* Address PR feedback

* Adding tests for JobTrait, SimpleJob, SimpleJobTrait

* Extra closure serialization client config into a trait

* Ensure the deployment succeeds. Retry on 5xx errors

* Fix agent starting timing with the daemon registration
  • Loading branch information
chingor13 authored and dwsupplee committed Feb 8, 2018
1 parent 0876c0f commit 4f79a5e
Show file tree
Hide file tree
Showing 41 changed files with 1,140 additions and 500 deletions.
3 changes: 1 addition & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,7 @@
"google-cloud": "dev/google-cloud"
},
"bin": [
"src/Core/bin/google-cloud-batch",
"src/Debugger/bin/google-cloud-debugger"
"src/Core/bin/google-cloud-batch"
],
"extra": {
"component": {
Expand Down
113 changes: 19 additions & 94 deletions src/Core/Batch/BatchDaemon.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,11 @@ class BatchDaemon
use BatchDaemonTrait;
use HandleFailureTrait;
use SysvTrait;
use InterruptTrait;

/* @var BatchRunner */
private $runner;

/* @var bool */
private $shutdown;

/* @var array */
private $descriptorSpec;

Expand Down Expand Up @@ -72,53 +70,31 @@ public function __construct($entrypoint)
1 => ['file', 'php://stdout', 'w'],
2 => ['file', 'php://stderr', 'w']
];
// setup signal handlers
pcntl_signal(SIGTERM, [$this, "sigHandler"]);
pcntl_signal(SIGINT, [$this, "sigHandler"]);
pcntl_signal(SIGHUP, [$this, "sigHandler"]);
pcntl_signal(SIGALRM, [$this, "sigHandler"]);

$this->command = sprintf('exec php -d auto_prepend_file="" %s daemon', $entrypoint);
$this->initFailureFile();
}

/**
* A signal handler for setting the terminate switch.
* {@see http://php.net/manual/en/function.pcntl-signal.php}
*
* @param int $signo The received signal.
* @param mixed $siginfo [optional] An array representing the signal
* information. **Defaults to** null.
*
* @return void
*/
public function sigHandler($signo, $signinfo = null)
{
switch ($signo) {
case SIGINT:
case SIGTERM:
$this->shutdown = true;
break;
}
}

/**
* A loop for the parent.
*
* @return void
*/
public function runParent()
public function run()
{
$this->setupSignalHandlers();

$procs = [];
while (true) {
$jobs = $this->runner->getJobs();
foreach ($jobs as $job) {
if (! array_key_exists($job->getIdentifier(), $procs)) {
$procs[$job->getIdentifier()] = [];
if (! array_key_exists($job->identifier(), $procs)) {
$procs[$job->identifier()] = [];
}
while (count($procs[$job->getIdentifier()]) > $job->getWorkerNum()) {
while (count($procs[$job->identifier()]) > $job->numWorkers()) {
// Stopping an excessive child.
echo 'Stopping an excessive child.' . PHP_EOL;
$proc = array_pop($procs[$job->getIdentifier()]);
$proc = array_pop($procs[$job->identifier()]);
$status = proc_get_status($proc);
// Keep sending SIGTERM until the child exits.
while ($status['running'] === true) {
Expand All @@ -128,10 +104,10 @@ public function runParent()
}
@proc_close($proc);
}
for ($i = 0; $i < $job->getWorkerNum(); $i++) {
for ($i = 0; $i < $job->numWorkers(); $i++) {
$needStart = false;
if (array_key_exists($i, $procs[$job->getIdentifier()])) {
$status = proc_get_status($procs[$job->getIdentifier()][$i]);
if (array_key_exists($i, $procs[$job->identifier()])) {
$status = proc_get_status($procs[$job->identifier()][$i]);
if ($status['running'] !== true) {
$needStart = true;
}
Expand All @@ -140,8 +116,8 @@ public function runParent()
}
if ($needStart) {
echo 'Starting a child.' . PHP_EOL;
$procs[$job->getIdentifier()][$i] = proc_open(
sprintf('%s %d', $this->command, $job->getIdNum()),
$procs[$job->identifier()][$i] = proc_open(
sprintf('%s %d', $this->command, $job->id()),
$this->descriptorSpec,
$pipes
);
Expand Down Expand Up @@ -173,64 +149,13 @@ public function runParent()
}

/**
* A loop for the children.
* Fetch the child job by id.
*
* @param int $idNum Numeric id for the job.
* @return void
* @param int $idNum The id of the job to find
* @return JobInterface
*/
public function runChild($idNum)
public function job($idNum)
{
// child process
$sysvKey = $this->getSysvKey($idNum);
$q = msg_get_queue($sysvKey);
$items = [];
$job = $this->runner->getJobFromIdNum($idNum);
$period = $job->getCallPeriod();
$lastInvoked = microtime(true);
$batchSize = $job->getBatchSize();
while (true) {
// Fire SIGALRM after 1 second to unblock the blocking call.
pcntl_alarm(1);
if (msg_receive(
$q,
0,
$type,
8192,
$message,
true,
0, // blocking mode
$errorcode
)) {
if ($type === self::$typeDirect) {
$items[] = $message;
} elseif ($type === self::$typeFile) {
$items[] = unserialize(file_get_contents($message));
@unlink($message);
}
}
pcntl_signal_dispatch();
// It runs the job when
// 1. Number of items reaches the batchSize.
// 2-a. Count is >0 and the current time is larger than lastInvoked + period.
// 2-b. Count is >0 and the shutdown flag is true.
if ((count($items) >= $batchSize)
|| (count($items) > 0
&& (microtime(true) > $lastInvoked + $period
|| $this->shutdown))) {
printf(
'Running the job with %d items' . PHP_EOL,
count($items)
);
if (! $job->run($items)) {
$this->handleFailure($idNum, $items);
}
$items = [];
$lastInvoked = microtime(true);
}
gc_collect_cycles();
if ($this->shutdown) {
exit;
}
}
return $this->runner->getJobFromIdNum($idNum);
}
}
3 changes: 0 additions & 3 deletions src/Core/Batch/BatchDaemonTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@
*/
trait BatchDaemonTrait
{
private static $typeDirect = 1;
private static $typeFile = 2;

/**
* Returns whether or not the BatchDaemon is running.
*
Expand Down
Loading

0 comments on commit 4f79a5e

Please sign in to comment.