Skip to content

Commit

Permalink
feat(octane): add support for concurrent tasks with FrankenPHP
Browse files Browse the repository at this point in the history
  • Loading branch information
vinceAmstoutz committed Dec 19, 2024
1 parent ee88fe3 commit 3b0e730
Show file tree
Hide file tree
Showing 6 changed files with 470 additions and 0 deletions.
9 changes: 9 additions & 0 deletions src/Concerns/ProvidesConcurrencySupport.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
namespace Laravel\Octane\Concerns;

use Laravel\Octane\Contracts\DispatchesTasks;
use Laravel\Octane\FrankenPhp\FrankenPhpHttpTaskDispatcher;
use Laravel\Octane\FrankenPhp\FrankenPhpTaskDispatcher;
use Laravel\Octane\FrankenPhp\ServerStateFile as FrankenPhpServerStateFile;
use Laravel\Octane\SequentialTaskDispatcher;
use Laravel\Octane\Swoole\ServerStateFile;
use Laravel\Octane\Swoole\SwooleHttpTaskDispatcher;
Expand Down Expand Up @@ -35,6 +38,12 @@ public function tasks()
{
return match (true) {
app()->bound(DispatchesTasks::class) => app(DispatchesTasks::class),
app()->bound(FrankenPhpServerStateFile::class) => new FrankenPhpTaskDispatcher(),
class_exists(FrankenPhpServerStateFile::class) => new FrankenPhpHttpTaskDispatcher(
'127.0.0.1',
'8000',
new SequentialTaskDispatcher
),
app()->bound(Server::class) => new SwooleTaskDispatcher,
class_exists(Server::class) => (fn (array $serverState) => new SwooleHttpTaskDispatcher(
$serverState['state']['host'] ?? '127.0.0.1',
Expand Down
82 changes: 82 additions & 0 deletions src/FrankenPhp/FrankenPhpHttpTaskDispatcher.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
<?php

namespace Laravel\Octane\FrankenPhp;

use Closure;
use Exception;
use Illuminate\Http\Client\ConnectionException;
use Illuminate\Support\Facades\Crypt;
use Illuminate\Support\Facades\Http;
use Laravel\Octane\Contracts\DispatchesTasks;
use Laravel\Octane\Exceptions\TaskExceptionResult;
use Laravel\Octane\Exceptions\TaskTimeoutException;
use Laravel\SerializableClosure\SerializableClosure;

class FrankenPhpHttpTaskDispatcher implements DispatchesTasks
{
public function __construct(
protected string $host,
protected string $port,
protected DispatchesTasks $fallbackDispatcher
) {
}

/**
* Concurrently resolve the given callbacks via background tasks, returning the results.
*
* Results will be keyed by their given keys - if a task did not finish, the tasks value will be "false".
*
*
* @throws \Laravel\Octane\Exceptions\TaskException
* @throws \Laravel\Octane\Exceptions\TaskTimeoutException
*/
public function resolve(array $tasks, int $waitMilliseconds = 3000): array
{
$tasks = collect($tasks)->mapWithKeys(function ($task, $key) {
return [
$key => $task instanceof Closure
? new SerializableClosure($task)
: $task,
];
})->all();

try {
$response = Http::timeout(($waitMilliseconds / 1000) + 5)->post("http://{$this->host}:{$this->port}/octane/resolve-tasks", [
'tasks' => Crypt::encryptString(serialize($tasks)),
'wait' => $waitMilliseconds,
]);

return match ($response->status()) {
200 => unserialize($response),
504 => throw TaskTimeoutException::after($waitMilliseconds),
default => throw TaskExceptionResult::from(
new Exception('Invalid response from task server.'),
)->getOriginal(),
};
} catch (ConnectionException) {
return $this->fallbackDispatcher->resolve($tasks, $waitMilliseconds);
}
}

/**
* Concurrently dispatch the given callbacks via background tasks.
*/
public function dispatch(array $tasks): void
{
$tasks = collect($tasks)->mapWithKeys(function ($task, $key) {
return [
$key => $task instanceof Closure
? new SerializableClosure($task)
: $task,
];
})->all();

try {
Http::post("http://{$this->host}:{$this->port}/octane/dispatch-tasks", [
'tasks' => Crypt::encryptString(serialize($tasks)),
]);
} catch (ConnectionException) {
$this->fallbackDispatcher->dispatch($tasks);
}
}
}
74 changes: 74 additions & 0 deletions src/FrankenPhp/FrankenPhpTaskDispatcher.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
<?php

namespace Laravel\Octane\FrankenPhp;

use Closure;
use InvalidArgumentException;
use Laravel\Octane\Contracts\DispatchesTasks;
use Laravel\Octane\Exceptions\DdException;
use Laravel\Octane\Exceptions\TaskExceptionResult;
use Laravel\Octane\Exceptions\TaskTimeoutException;
use Laravel\SerializableClosure\SerializableClosure;

class FrankenPhpTaskDispatcher implements DispatchesTasks
{
/**
* Concurrently resolve the given callbacks via background tasks, returning the results.
*
* Results will be keyed by their given keys - if a task did not finish, the tasks value will be "false".
*
* @throws \Laravel\Octane\Exceptions\TaskException
* @throws \Laravel\Octane\Exceptions\TaskTimeoutException
* @throws DdException
*/
public function resolve(array $tasks, int $waitMilliseconds = 3000): array
{
if (! app()->bound(ServerStateFile::class)) {
throw new InvalidArgumentException('Tasks can only be dispatched within a FrankenPHP server context.');
}

$results = app(ServerStateFile::class)->taskWaitMulti(collect($tasks)->mapWithKeys(function ($task, $key) {
return [$key => $task instanceof Closure
? new SerializableClosure($task)
: $task, ];
})->all(), $waitMilliseconds / 1000);

if ($results === false) {
throw TaskTimeoutException::after($waitMilliseconds);
}

$i = 0;

foreach ($tasks as $key => $task) {
if (isset($results[$i])) {
if ($results[$i] instanceof TaskExceptionResult) {
throw $results[$i]->getOriginal();
}

$tasks[$key] = $results[$i]->result;
} else {
$tasks[$key] = false;
}

$i++;
}

return $tasks;
}

/**
* Concurrently dispatch the given callbacks via background tasks.
*/
public function dispatch(array $tasks): void
{
if (! app()->bound(ServerStateFile::class)) {
throw new InvalidArgumentException('Tasks can only be dispatched within a FrankenPHP server context.');
}

$server = app(ServerStateFile::class);

collect($tasks)->each(function ($task) use ($server) {
$server->task($task instanceof Closure ? new SerializableClosure($task) : $task);
});
}
}
10 changes: 10 additions & 0 deletions src/FrankenPhp/TaskResult.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

namespace Laravel\Octane\FrankenPhp;

class TaskResult
{
public function __construct(public mixed $result)
{
}
}
175 changes: 175 additions & 0 deletions tests/FrankenPhpHttpTaskDispatcherTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
<?php

namespace Laravel\Octane\Tests;

use Exception;
use Illuminate\Support\Facades\Http;
use Laravel\Octane\Exceptions\DdException;
use Laravel\Octane\Exceptions\TaskException;
use Laravel\Octane\Exceptions\TaskTimeoutException;
use Laravel\Octane\FrankenPhp\FrankenPhpHttpTaskDispatcher;
use Laravel\Octane\SequentialTaskDispatcher;
use Orchestra\Testbench\TestCase;

class FrankenPhpHttpTaskDispatcherTest extends TestCase
{
/**
* @throws TaskTimeoutException
* @throws TaskException
*/
public function test_tasks_can_be_resolved_via_http(): void
{
$dispatcher = new FrankenPhpHttpTaskDispatcher(
'127.0.0.1',
'8000',
new SequentialTaskDispatcher,
);

Http::fake([
'127.0.0.1:8000/octane/resolve-tasks' => Http::response(serialize(['first' => 1, 'second' => 2, 'third' => null])),
]);

$this->assertEquals([
'first' => 1,
'second' => 2,
'third' => null,
], $dispatcher->resolve([
'first' => fn () => 1,
'second' => fn () => 2,
'third' => function () {
},
]));
}

/** @doesNotPerformAssertions @test */
public function test_tasks_can_be_dispatched_via_http(): void
{
$dispatcher = new FrankenPhpHttpTaskDispatcher(
'127.0.0.1',
'8000',
new SequentialTaskDispatcher,
);

Http::fake([
'127.0.0.1:8000/octane/dispatch-tasks' => Http::response(serialize(['first' => 1, 'second' => 2])),
]);

$dispatcher->dispatch([
'first' => fn () => 1,
'second' => fn () => 2,
]);
}

public function test_tasks_can_be_resolved_via_fallback_dispatcher(): void
{
$dispatcher = new FrankenPhpHttpTaskDispatcher(
'127.0.0.1',
'8000',
new SequentialTaskDispatcher,
);

$this->assertEquals([
'first' => 1,
'second' => 2,
], $dispatcher->resolve([
'first' => fn () => 1,
'second' => fn () => 2,
]));
}

/** @doesNotPerformAssertions @test */
public function test_tasks_can_be_dispatched_via_fallback_dispatcher(): void
{
$dispatcher = new FrankenPhpHttpTaskDispatcher(
'127.0.0.1',
'8000',
new SequentialTaskDispatcher,
);

$dispatcher->dispatch([
'first' => fn () => 1,
'second' => fn () => 2,
]);
}

/**
* @throws TaskTimeoutException
*/
public function test_resolving_tasks_propagate_exceptions(): void
{
$dispatcher = new FrankenPhpHttpTaskDispatcher(
'127.0.0.1',
'8000',
new SequentialTaskDispatcher,
);

Http::fake([
'127.0.0.1:8000/octane/resolve-tasks' => Http::response(null, 500),
]);

$this->expectException(TaskException::class);
$this->expectExceptionMessage('Invalid response from task server.');

$dispatcher->resolve(['first' => fn () => throw new Exception('Something went wrong.')]);
}

/**
* @throws TaskTimeoutException
*/
public function test_resolving_tasks_propagate_dd_calls(): void
{
$dispatcher = new FrankenPhpHttpTaskDispatcher(
'127.0.0.1',
'8000',
new SequentialTaskDispatcher,
);

Http::fake([
'127.0.0.1:8000/octane/resolve-tasks' => Http::response(null, 500),
]);

$this->expectException(TaskException::class);
$this->expectExceptionMessage('Invalid response from task server.');

$dispatcher->resolve(['first' => fn () => throw new DdException(['foo' => 'bar'])]);
}

/** @doesNotPerformAssertions @test */
public function test_dispatching_tasks_do_not_propagate_exceptions(): void
{
$dispatcher = new FrankenPhpHttpTaskDispatcher(
'127.0.0.1',
'8000',
new SequentialTaskDispatcher,
);

Http::fake([
'127.0.0.1:8000/octane/dispatch-tasks' => Http::response(null, 500),
]);

$dispatcher->dispatch(['first' => fn () => throw new Exception('Something went wrong.')]);
}

public function test_resolving_tasks_may_timeout(): void
{
$dispatcher = new FrankenPhpHttpTaskDispatcher(
'127.0.0.1',
'8000',
new SequentialTaskDispatcher,
);

Http::fake([
'127.0.0.1:8000/octane/resolve-tasks' => Http::response(null, 504),
]);

$this->expectException(TaskTimeoutException::class);
$this->expectExceptionMessage('Task timed out after 2000 milliseconds.');

$dispatcher->resolve(['first' => fn () => 1], 2000);
}

protected function getPackageProviders($app): array
{
return ['Laravel\Octane\OctaneServiceProvider'];
}
}
Loading

0 comments on commit 3b0e730

Please sign in to comment.