Task API
The Task API in v2.0 makes it easy to create ad-hoc background processes in your application.
You can run code in a background process with a single line of code: a call to the Core_Daemon::task() method. Its first argument is the callable or object you want to run in the parallel process. When you pass a callable, any additional arguments passed to task() are forwarded to it when it is invoked in the background process.
In PHP, a callable can take three primary forms:
- A string "callback" containing the name of a function, such as $callable = 'strpos'
- An array "callback" containing an object reference and the name of a method, such as $callable = array(new myObject, 'myMethod')
- A closure, such as $callable = function() { .. };
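The three forms above can be sketched in plain PHP as follows. The Greeter class is hypothetical and exists only to illustrate the array form. Nothing here touches the daemon library: the callables are invoked directly with call_user_func() rather than in a background process.

```php
<?php
// Hypothetical class, used only to illustrate the array form of a callable.
class Greeter
{
    public function greet($name)
    {
        return 'Hello, ' . $name;
    }
}

// 1. String callback: the name of a function.
$by_name = 'strtoupper';

// 2. Array callback: an object reference plus a method name.
$by_method = array(new Greeter, 'greet');

// 3. Closure.
$by_closure = function ($a, $b) {
    return $a + $b;
};

// All three satisfy is_callable(), which is the property the Task API relies on.
$all_callable = is_callable($by_name)
    && is_callable($by_method)
    && is_callable($by_closure);

// Each form is invoked the same way.
$upper    = call_user_func($by_name, 'task');
$greeting = call_user_func($by_method, 'daemon');
$sum      = call_user_func($by_closure, 2, 3);
```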
When you pass a callable to task(), the API collects the remaining arguments and passes them to the function, method or closure. This is the easiest way to use the Task API, and it makes it straightforward to modify an existing single-process application so that certain types of work run in the background.
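To make the argument forwarding concrete, here is a minimal sketch. The run_inline() function is a hypothetical stand-in for task(): it runs the callable in the current process instead of forking, so it shows only how the extra arguments reach the callable, not how the library implements it.

```php
<?php
// Hypothetical stand-in for task(): runs the callable inline instead of
// forking, so only the argument forwarding is demonstrated.
function run_inline($callable /*, $arg1, $arg2, ... */)
{
    $args = func_get_args();
    array_shift($args); // drop the callable, keep the remaining arguments
    return call_user_func_array($callable, $args);
}

// Equivalent to calling strpos('background', 'ground').
$found = run_inline('strpos', 'background', 'ground');

// The same forwarding works for a closure with any number of arguments.
$joined = run_inline(function ($a, $b, $c) {
    return $a . '-' . $b . '-' . $c;
}, 'x', 'y', 'z');
```

Because a real task runs in a separate process, it is safest to assume that a return value like the ones captured here is not available to the caller of task().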
In more complicated scenarios, you may want to encapsulate the functionality that runs in the background in its own object. Doing so gives you:
- Encapsulation of the code that you want to run in the background as its own subroutine of your application.
- Structured setup() and teardown() methods that are invoked automatically, letting you keep your boilerplate code separate.
- A project organization that makes it easy to identify code that will be run in the background.
The Core_ITask interface defines a few simple methods, including a single "entry point" that is invoked automatically in the background process. That start() method is called right after the setup() method returns.
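A task object along these lines might look like the sketch below. Example_ITask is a hypothetical stand-in for Core_ITask that declares only the three methods named above, and the real interface may require more. The run_task_lifecycle() driver is also hypothetical. It runs inline and assumes that teardown() follows start(), which the text above does not state explicitly.

```php
<?php
// Hypothetical stand-in for Core_ITask, limited to the three methods named in
// the text. The real interface may declare additional methods.
interface Example_ITask
{
    public function setup();
    public function start();
    public function teardown();
}

class Example_BackupTask implements Example_ITask
{
    public $log = array();

    public function setup()
    {
        // Boilerplate: open connections, load configuration, and so on.
        $this->log[] = 'setup';
    }

    public function start()
    {
        // Entry point: the actual background work goes here.
        $this->log[] = 'start';
    }

    public function teardown()
    {
        // Boilerplate: release whatever setup() acquired.
        $this->log[] = 'teardown';
    }
}

// Hypothetical driver showing the assumed call order. In the library this
// would happen inside the forked process; here it runs inline.
function run_task_lifecycle(Example_ITask $task)
{
    $task->setup();
    $task->start();
    $task->teardown();
}

$task = new Example_BackupTask;
run_task_lifecycle($task);
```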
PHP does not expose the ability to use threads in our applications. Remember that every time you create a new task, a background process is forked. Forking has a meaningful overhead on the OS, and you can degrade the performance of the server if you fork too many tasks too quickly. Unlike the Worker API, the Task API does not have any throttling or intelligence. It is entirely the developer's responsibility to use the Task API in a way that maintains the stability of the application and of the server itself.
If you have a Task that is being called often, and certainly if you're regularly calling a Task while the previous task(s) are still running, you should consider turning the Task into a Worker. The Worker API lets you define a maximum number of simultaneous tasks, and once that limit is reached, subsequent calls are buffered until a worker process is available. By passing multiple jobs to an existing, persistent worker process, it spreads the cost of forking that process across multiple calls.
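If a Worker is more than you need, one way to take on that responsibility yourself is a small rate limiter in front of task(). The sketch below is an assumption about how you might do it. Example_TaskThrottle is hypothetical and not part of the library. It only counts launches inside a sliding time window and does not know when a forked process actually exits.

```php
<?php
// Hypothetical helper, not part of the library: allows at most $max launches
// within any sliding window of $window seconds.
class Example_TaskThrottle
{
    private $max;
    private $window;
    private $launches = array();

    public function __construct($max, $window)
    {
        $this->max    = $max;
        $this->window = $window;
    }

    // Returns true and records the launch if another task may start at $now.
    public function allow($now)
    {
        // Forget launches that have fallen out of the window.
        $cutoff = $now - $this->window;
        $this->launches = array_values(array_filter(
            $this->launches,
            function ($t) use ($cutoff) {
                return $t > $cutoff;
            }
        ));

        if (count($this->launches) >= $this->max) {
            return false;
        }
        $this->launches[] = $now;
        return true;
    }
}

// At most 2 launches per 10 seconds. Timestamps are passed in explicitly so
// the behaviour is easy to follow; real code would pass time().
$throttle = new Example_TaskThrottle(2, 10);
$first  = $throttle->allow(100);
$second = $throttle->allow(101);
$third  = $throttle->allow(102); // refused: two launches already in the window
$later  = $throttle->allow(111); // allowed: both earlier launches have expired
```

In a daemon you would guard the call, for example by invoking $this->task($callback) only when $throttle->allow(time()) returns true, and decide separately what to do with the work that was refused.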
For more examples, see the applications that use the Task API in the /Examples directory.
Since version 2.0
In this example we're passing a closure to the task() method. It doesn't use any existing state, we don't need to pass in any arguments, and we don't have any setup() or teardown() code to be run.
Note, though, that in PHP 5.3 you cannot use $this inside a closure, and you cannot import it into the closure's lexical scope with use(). To reach the daemon object from a forked closure, use a $that = $this hack and pass $that to the closure in a use($that) clause. You can read more about this in the PHP documentation.
$this->task(function() {
    $ftp = new Lib_Ftp;
    $ftp->open('ftp://backup/');
    $ftp->put(BASE_PATH . '/../summary.dat');
    $ftp->close();
    @unlink(BASE_PATH . '/../summary.dat');
});
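The $that = $this workaround mentioned above can be sketched as follows. Example_Reporter and its methods are hypothetical. The closure is invoked directly here, whereas in a daemon you would pass it to task() instead.

```php
<?php
// Hypothetical class used only to illustrate the PHP 5.3 workaround.
class Example_Reporter
{
    private $prefix = 'report';

    public function label($name)
    {
        return $this->prefix . ':' . $name;
    }

    public function make_task()
    {
        // PHP 5.3 closures cannot reference $this, so copy it to a local
        // variable and import that variable with use().
        $that = $this;
        return function ($name) use ($that) {
            // Only public members are reachable through $that in PHP 5.3.
            return $that->label($name);
        };
    }
}

$reporter = new Example_Reporter;
$closure  = $reporter->make_task();
$result   = $closure('summary');
```

From PHP 5.4 onward, closures defined inside a class method can use $this directly, so the workaround is needed only when you must support 5.3.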
Since version 1.1
There are several ways to define a callback in PHP, and all of them will work with the Task API. Essentially, anything for which is_callable() returns true will work just fine.
In this example we used MailChimp to send a "Congratulations" email when somebody won an auction. This is not a copy/paste, but rather a distilled real-world example. In this case, we passed a callback to a method in the $chimp object.
// Loop Interval at 1 second
protected function execute() {
    // Load the Watcher and attach the auctions this daemon is currently responsible for
    $auction_watcher = new Auction_Watcher;
    $auction_watcher->watch( $this->auctions );

    // Look for auctions that are ended but not closed, and close them.
    // We want to queue all events so we can handle them later.
    $has_queued_events = false;
    foreach($auction_watcher->auction as $auction) {
        if($auction->ended == true && $auction->closed == false) {
            $auction->close(AW_QUEUE_EVENTS);
            $has_queued_events = true;
        }
    }

    // If any events are queued, send them out via mailchimp.
    // This will take several seconds per notification. Run each in its own process.
    if($has_queued_events)
    {
        $chimp = MailChimp::getInstance();
        $callback = array($chimp, "sendNotification");
        foreach($auction_watcher->queue->find(array("type"=>"notification")) as $email) {
            $this->task($callback, $email->event->id, $email->header(), $email->body());
        }
    }
}
It's also a very common use-case to pass a method on the daemon class itself as a task. It gives you the ability to call a method on your object, with all the shared state, in a parallel and non-blocking way. You could even imagine instances where sometimes a method is called directly in-process, and other times that same method is called in the background using the Task API.
// Loop Interval at 1 second
protected function execute() {
    // every minute we want to save the execution state to the database
    if ($this->runtime() % 60 == 0) {
        $callback = array($this, "save_state");
        // Each row in $this->processed_events contains array of event_id, event_type, input_json, output_json, ip
        // Since we are calling a method on this same object, we don't need to pass the data as an argument.
        if ($this->task($callback)) {
            // If task() returns true, it means that the process was forked successfully. We have no idea if
            // the save_state() method was successful, but we know that it was, at the very least, called.
            // It will dump its data to the log if it fails, so we can be confident the data is preserved either way.
            // Clear our buffer:
            $this->processed_events = array();
        }
    }
}

protected function save_state() {
    // Connect to the Database -- For clarity we could also implement this functionality as a Core_ITask object
    // with the DB connection code in the `setup()`
    $this->db = Mysql_Database::getInstance();
    $this->db->connect('production');
    if(!$this->db || !$this->db->connected()) {
        throw new DB_Exception('No Database Connection');
    }

    $events = $this->processed_events;

    // If the update fails, dump the data to the log so at least we have a copy of it
    if (!$this->db->insertBatch($events)) {
        $this->log('Database save_state Failed. Message: ' . $this->db->error);
        $this->log(json_encode($events));
    }
}