How Workers Work
Version 2.0
- Internally, workers are just special plugins built on top of the Task API.
- As with Tasks, you pass in a Closure, Callback, or Object. A container (implementing the Mediator design pattern) is created for your worker.
- As with Plugins, you also supply an alias for your worker. Afterwards your worker is available at that alias:
$this->MyAliasHere->someWorkerMethod()
- The Mediator implements all of the Worker functionality -- adding a few relevant event listeners to the Daemon and using the Task API to perform the lowest-level process forking functionality.
- Like plugins, after you create the Worker you'll need to continue setting it up. You can see the Worker API documentation for specifics.
- The Mediator will use several runtime factors to determine a strategy for forking the background processes. If your application is running in a tight event loop with little idle time, for example, all of the worker processes you specify will be forked during startup. In other strategies, processes are forked one at a time, on-demand.
- Background worker processes themselves will respond to signals and write to your application's log file. They also will keep track of their uptime and the number of calls they've processed and will automatically recycle themselves at appropriate thresholds.
- The Mediator communicates with background worker processes using an Operating System API, the SysV Message Queue. When you call a worker, a Call message is passed to the worker pool, and the next available worker will run it. If all workers are busy, the call is buffered and will be called FIFO. When a background process begins running a call, it will send a "running" ack message to the Mediator; when it returns, it will ack "complete."
- Your worker calls are converted by the Mediator into a stdClass based Call Struct. This is passed to and from background processes and to your onReturn and onTimeout callbacks.
- When a background worker process experiences a fatal error, it will attempt to log the error and then exit without affecting the Mediator or your application as a whole. If it was processing a call at the time, that call will appear to have timed out once its timeout is reached. Your onTimeout callback will be called and you'll have the ability to retry the call.
- When your application shuts down, it will attempt to gracefully shut down its workers and, when all work is complete, the application will exit. During shutdown, any Lock you're using will be released while the daemon waits for workers to finish, which allows you to start a new instance of the daemon while the workers finish up.
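To make the hand-off described in the list above more concrete, here is a minimal sketch of a pool that gives calls to idle workers and buffers the rest in FIFO order. It is not the library's Mediator: the class `SketchPool`, its methods, and the trace strings are all hypothetical, and the real implementation talks to forked processes over a message queue rather than tracking them in an array.

```php
<?php
// Hypothetical sketch, not the library's Mediator: calls go to an idle
// worker when one exists, otherwise they wait in a FIFO buffer.
final class SketchPool
{
    /** @var array worker id => id of the call it is running, or null when idle */
    private array $workers = [];
    private SplQueue $buffer;
    /** @var string[] human-readable record of what happened */
    public array $trace = [];

    public function __construct(int $workerCount)
    {
        for ($i = 1; $i <= $workerCount; $i++) {
            $this->workers[$i] = null;
        }
        $this->buffer = new SplQueue();
    }

    // Invoked when the application calls a worker method.
    public function call(int $callId): void
    {
        $idle = array_search(null, $this->workers, true);
        if ($idle === false) {
            $this->buffer->enqueue($callId); // all workers busy: buffer it
            $this->trace[] = "call $callId buffered";
            return;
        }
        $this->run($idle, $callId);
    }

    // Invoked when a worker acks "complete"; the oldest buffered call runs next.
    public function complete(int $workerId): void
    {
        $callId = $this->workers[$workerId];
        $this->workers[$workerId] = null;
        $this->trace[] = "worker $workerId ack complete for call $callId";
        if (!$this->buffer->isEmpty()) {
            $this->run($workerId, $this->buffer->dequeue());
        }
    }

    private function run(int $workerId, int $callId): void
    {
        $this->workers[$workerId] = $callId;
        $this->trace[] = "worker $workerId ack running for call $callId";
    }
}

$pool = new SketchPool(2);
$pool->call(1);
$pool->call(2);
$pool->call(3);     // both workers are busy, so this one is buffered
$pool->complete(1); // worker 1 frees up and picks up call 3
```

Printing `$pool->trace` after this run shows the two "running" acks, the buffered call, the "complete" ack, and finally call 3 starting on the freed worker.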
During daemon startup, by the time the Core_Daemon::init() method is called, the daemon has all the information needed to initialize the workers you defined in your setup_workers() method: the number of processes, the daemon's loop interval, etc.
This information is used to choose a forking strategy for the worker. Forking is an expensive operation, and the last thing you want is your application going unresponsive for a few seconds at a critical time while the Mediator forks 5 worker processes that aren't even needed at that moment. You can expect forking strategies to become more sophisticated and possibly more numerous in future releases. As of 2.0, one of 3 possible strategies is chosen at startup. From the PHP DocBlock:
* @example self::LAZY
* Daemon Startup: No processes are forked
* Worker Method Call: If existing process(es) are busy, fork another worker process for this call, up to the workers() limit.
* In Lazy forking, processes are only forked as-needed
*
* @example self::MIXED
* Daemon Startup: No processes are forked
* Worker Method Call: Fork maximum number of worker processes (as set via workers())
* In Mixed forking, nothing is forked until the first method call but all forks are done simultaneously.
*
* @example self::AGGRESSIVE
* Daemon Startup: All processes are forked up front
* Worker Method Call: Processes are forked as-needed to maintain the max number of available workers
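How the Mediator weighs its runtime factors is not spelled out here, so the following is only a sketch of what such a decision could look like. The function name, the constants, and both thresholds (a loop interval under one second counting as "tight", a pool of two or fewer counting as "small") are assumptions made for illustration, not values taken from the library.

```php
<?php
// Hypothetical stand-ins for the self::LAZY / self::MIXED / self::AGGRESSIVE constants.
const SKETCH_LAZY = 'lazy';
const SKETCH_MIXED = 'mixed';
const SKETCH_AGGRESSIVE = 'aggressive';

/**
 * Pick a forking strategy from the loop interval (seconds) and pool size.
 * ASSUMPTION: both thresholds below are invented for this sketch.
 */
function sketchChooseForkingStrategy(float $loopInterval, int $workerCount): string
{
    // A tight loop has little idle time to absorb a fork later on,
    // so pay the whole cost during startup.
    if ($loopInterval < 1.0) {
        return SKETCH_AGGRESSIVE;
    }
    // A small pool is cheap to fork in one go on the first call.
    if ($workerCount <= 2) {
        return SKETCH_MIXED;
    }
    // Otherwise fork one process at a time, only when needed.
    return SKETCH_LAZY;
}

$tight = sketchChooseForkingStrategy(0.1, 8);
$small = sketchChooseForkingStrategy(5.0, 2);
$large = sketchChooseForkingStrategy(5.0, 8);
```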
When you call a worker method, the Mediator intercepts the call and serializes it into a simple stdClass-based call struct. It contains the method being called, any arguments passed, information about the current state, what time it was called, and eventually the return value.
The Call Struct is a state machine. Available States:
const UNCALLED = 0;
const CALLED = 1;
const RUNNING = 2;
const RETURNED = 3;
const CANCELLED = 4;
const TIMEOUT = 10;
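As a rough illustration of the interception and the state machine, the sketch below builds a stdClass struct inside `__call` and guards status changes with a transition table. The status constants are the ones listed above; everything else is an assumption: the class name `SketchCallMediator`, the property names on the struct, the choice to return the call ID, and in particular the table of legal transitions.

```php
<?php
// Hypothetical sketch of call interception; not the library's Mediator.
final class SketchCallMediator
{
    const UNCALLED = 0;
    const CALLED = 1;
    const RUNNING = 2;
    const RETURNED = 3;
    const CANCELLED = 4;
    const TIMEOUT = 10;

    // ASSUMPTION: which transitions are legal is a guess made for this sketch.
    private const ALLOWED = [
        self::UNCALLED  => [self::CALLED],
        self::CALLED    => [self::RUNNING, self::CANCELLED, self::TIMEOUT],
        self::RUNNING   => [self::RETURNED, self::TIMEOUT],
        self::TIMEOUT   => [self::CALLED], // a retry puts the call back in line
        self::RETURNED  => [],
        self::CANCELLED => [],
    ];

    /** @var stdClass[] local copies of every call struct, keyed by call ID */
    public array $calls = [];

    // Any method invoked on the mediator becomes a call struct.
    public function __call($method, $args)
    {
        $call = new stdClass();
        $call->id = count($this->calls) + 1;
        $call->method = $method;
        $call->args = $args;
        $call->status = self::UNCALLED;
        $call->time = [self::UNCALLED => microtime(true)];
        $call->return = null;
        $call->retries = 0;
        $this->calls[$call->id] = $call;
        $this->transition($call, self::CALLED);
        return $call->id; // returns immediately, without a result
    }

    // Move a struct to a new status and record when that happened.
    public function transition(stdClass $call, int $status): void
    {
        if (!in_array($status, self::ALLOWED[$call->status], true)) {
            throw new LogicException("Illegal transition {$call->status} -> {$status}");
        }
        $call->status = $status;
        $call->time[$status] = microtime(true);
    }
}

$mediator = new SketchCallMediator();
$callId = $mediator->resizeImage('/tmp/a.png', 200); // hypothetical worker method
$struct = $mediator->calls[$callId];
$mediator->transition($struct, SketchCallMediator::RUNNING);
$mediator->transition($struct, SketchCallMediator::RETURNED);
```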
The life cycle of a method call is fairly straightforward and can be described with a flow chart. Until I come back and create one, I'll describe it here:
- Any calls to a worker are intercepted by the Mediator's __invoke or __call methods. A call struct is created.
- The call struct is passed to the call() method. This will encode the struct as a message for the background process and cache a copy of the struct locally.
- The background worker process receives the new call message. It will decode the message, read in any arguments, and perform the method call. When the call is complete, the return value will be serialized into the call struct and sent back on the "wire" to the Mediator.
- If the Mediator receives a "complete" ack, it will save the updated call struct and pass it to your onReturn callback.
- If no "complete" ack is received before the timeout specified for that worker, the Mediator will enforce the timeout: the worker process will be killed, the local copy of the call struct will be updated with the timeout status, and it will be passed to your onTimeout callback.
- The onTimeout callback can use the retries property of the call struct to decide whether the call should be retried. If so, the struct is passed to the retry() method, which will increment the retries count and pass the Call Struct back to step 2 (the call() method).
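The last step of the life cycle can be sketched as a small decision function. This is not the library's callback signature: `sketchOnTimeout`, the `$retry` closure standing in for the Mediator's retry() method, and the `$maxRetries` limit are hypothetical. Only the `retries` property and the idea of handing the struct back for another attempt come from the description above.

```php
<?php
/**
 * Decide whether a timed-out call deserves another attempt.
 * $retry is a stand-in for the Mediator's retry() method (ASSUMPTION).
 * Returns true when the call was handed back for a retry.
 */
function sketchOnTimeout(stdClass $call, callable $retry, int $maxRetries = 3): bool
{
    if ($call->retries >= $maxRetries) {
        return false; // give up; the call stays in the timeout state
    }
    $retry($call);
    return true;
}

// Stand-in for retry(): bump the counter and mark the call as CALLED (1) again.
$retry = function (stdClass $call): void {
    $call->retries++;
    $call->status = 1;
};

$timedOut = new stdClass();
$timedOut->id = 7;
$timedOut->status = 10; // TIMEOUT
$timedOut->retries = 0;

// Pretend every attempt times out again, to show where the retries stop.
$attempts = 0;
while (sketchOnTimeout($timedOut, $retry, 2)) {
    $attempts++;
}
```

Capping retries matters because a call that kills its worker every time would otherwise be resubmitted forever.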
After work is complete and all of your specified callbacks have been called, the struct will be marked for garbage collection by the mediator. The next time the mediator garbage collector runs, it will remove the arguments and return values (which are potential memory hogs) and keep the trimmed-back call struct in memory.
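A garbage collection pass along those lines might look like the sketch below. The `gc` flag and the `args` and `return` property names are assumptions about how the struct is laid out, and `sketchCollectGarbage` is a hypothetical helper, not a library method.

```php
<?php
/**
 * Strip the heavy fields from every struct that has been marked for collection.
 * ASSUMPTION: a boolean `gc` property marks a struct as collectable.
 * Returns the number of structs that were trimmed.
 */
function sketchCollectGarbage(array $calls): int
{
    $trimmed = 0;
    foreach ($calls as $call) {
        if (empty($call->gc)) {
            continue; // still in flight, or already trimmed
        }
        // Arguments and return values are the potential memory hogs.
        unset($call->args, $call->return);
        $call->gc = false;
        $trimmed++;
    }
    return $trimmed;
}

$done = (object) ['id' => 1, 'status' => 3, 'args' => [str_repeat('x', 1024)], 'return' => 'ok', 'gc' => true];
$busy = (object) ['id' => 2, 'status' => 2, 'args' => ['small'], 'return' => null, 'gc' => false];

$trimmedCount = sketchCollectGarbage([$done, $busy]);
```

Because objects are passed by handle, the structs held elsewhere in the application see the trimmed version too; the ID, status, and timing information remain available.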
A design goal of the Worker API has been to allow you to call worker methods transparently as if you were calling actual methods in that same process. Obviously worker methods will return instantly without a result, but aside from the asynchronous nature, we wanted worker calls to appear familiar and transparent and easily understandable.
Behind the scenes, this is made possible by the inter-process communication (IPC) that is handled by the Mediator. At the most basic level, the Mediator uses 2 features (shm and msg) bundled with the PHP Semaphore Functions.
When a Call Struct is encoded to be sent to the background worker, a 2-step process is initiated behind the scenes:
- The call struct is serialized and written to shared memory. The ID of the call struct is used as the memory address.
- After we confirm the struct was written to memory, we write a message on a queue unique to this worker pool. The message has several relevant details: the ID of the call struct, the status, the status timestamp.
The hand-off of work is complete when the worker process reads the message off the queue and then reads the serialized call struct from the referenced address.
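The two-step hand-off can be sketched with the Semaphore functions themselves (`shm_put_var`, `shm_get_var`, `msg_send`, `msg_receive`), which require the sysvshm and sysvmsg extensions. The helper names, the message layout, the message type of 1, and the random IPC keys are assumptions made for this sketch; for simplicity both ends run in a single process here instead of across a fork.

```php
<?php
// Step 1 and 2 on the sending side: struct into shared memory, then a small message.
function sketchSendCall($shm, $queue, stdClass $call): bool
{
    // shm_put_var() serializes the value; the call ID is used as the variable key.
    if (!shm_put_var($shm, $call->id, $call) || !shm_has_var($shm, $call->id)) {
        return false;
    }
    // ASSUMPTION: this message layout is illustrative only.
    $message = ['call_id' => $call->id, 'status' => $call->status, 'microtime' => microtime(true)];
    return msg_send($queue, 1, $message, true, false);
}

// Receiving side: read the message, then fetch the struct it refers to.
function sketchReceiveCall($shm, $queue): ?stdClass
{
    $type = null;
    $message = null;
    // MSG_IPC_NOWAIT keeps the sketch from blocking when the queue is empty.
    if (!msg_receive($queue, 1, $type, 1024, $message, true, MSG_IPC_NOWAIT)) {
        return null;
    }
    $call = shm_get_var($shm, $message['call_id']);
    return $call instanceof stdClass ? $call : null;
}

$received = null;
$sketchIpcAvailable = extension_loaded('sysvshm') && extension_loaded('sysvmsg');
if ($sketchIpcAvailable) {
    // A real daemon would derive stable keys (for example with ftok()).
    $shm = shm_attach(random_int(0x1000, 0x7FFFFFF));
    $queue = msg_get_queue(random_int(0x1000, 0x7FFFFFF));
    if ($shm === false || $queue === false) {
        $sketchIpcAvailable = false; // IPC not permitted in this environment
    } else {
        $call = (object) ['id' => 42, 'method' => 'resize', 'args' => [200], 'status' => 1];
        if (sketchSendCall($shm, $queue, $call)) {
            $received = sketchReceiveCall($shm, $queue);
        }
        // Clean up the kernel resources created for the demo.
        shm_remove($shm);
        msg_remove_queue($queue);
    }
}
```

Keeping the queue message small and putting the payload in shared memory means the queue only carries IDs and status changes, which is why large arguments stress the shared memory segment rather than the queue.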
The background worker processes themselves are little daemon applications. They are forked by the Mediator using the underlying Core_Daemon Task API. The Mediator, which implements the Core_ITask interface, passes itself to the task() method. If you're familiar with the Task API you'll know that after forking into this new background process, the Core_Worker_Mediator::setup() method will be called, then the start() method.
The Core_Worker_Mediator::start() method enters its own event loop and blocks waiting for a Call message to be passed from the Mediator. When a message is received it processes it, including sending messages back to the Mediator when appropriate, and then blocks again waiting for the next Call message.
After a preset interval the event loop will exit and the worker process will be recycled. Whether or not a new worker process is immediately forked in its place depends on the forking strategy and the load factor.
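The shape of that loop, including the recycle check, can be sketched in a single process. This is a simplification under stated assumptions: `sketchWorkerLoop` is hypothetical, an SplQueue stands in for the message queue, the acks are collected in an array instead of being sent to the Mediator, and the loop returns on an empty inbox where a real worker would block.

```php
<?php
/**
 * Process calls until a recycle threshold is reached.
 * ASSUMPTION: thresholds on call count and uptime (seconds) are illustrative.
 * Returns the acks that would have been sent to the Mediator.
 */
function sketchWorkerLoop(SplQueue $inbox, callable $handler, int $maxCalls, float $maxUptime): array
{
    $started = microtime(true);
    $processed = 0;
    $acks = [];
    while ($processed < $maxCalls && (microtime(true) - $started) < $maxUptime) {
        if ($inbox->isEmpty()) {
            break; // a real worker would block here waiting for the next message
        }
        $call = $inbox->dequeue();
        $acks[] = [$call->id, 'running'];
        $call->return = $handler(...$call->args);
        $acks[] = [$call->id, 'complete'];
        $processed++;
    }
    return $acks; // loop exited: the process would now exit and be recycled
}

$inbox = new SplQueue();
$queued = [];
foreach ([2, 3, 4] as $i => $n) {
    $queued[$i] = (object) ['id' => $i + 1, 'args' => [$n], 'return' => null];
    $inbox->enqueue($queued[$i]);
}

$square = function (int $n): int {
    return $n * $n;
};

// Recycle after 2 calls or 60 seconds, whichever comes first.
$acks = sketchWorkerLoop($inbox, $square, 2, 60.0);
```

After this run the third call is still in the inbox, waiting for whichever process the forking strategy provides next.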
On the roadmap for an upcoming point release is to create an underlying storage class, giving you the ability to swap-out the storage mechanism used during IPC. Using the SystemV SHM storage API has revealed weaknesses during development that weren't anticipated during the feature design. The problems reveal themselves as periodic corruption and performance issues when multi-megabyte values are being passed as args or returned from the worker.
The current outline calls for several storage drivers to be created that can be selected automatically by the daemon or specified by the developer. This could include Memcached, Redis, and perhaps just listening on a socket for serialized call data.