Skip to content

Commit

Permalink
Merge branch 'main' into feature/workflow-cloud
Browse files Browse the repository at this point in the history
gplanchat authored Nov 21, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
2 parents 48490a7 + d8f8820 commit 7f43d16
Showing 15 changed files with 3,777 additions and 4,594 deletions.
42 changes: 22 additions & 20 deletions composer.json
Original file line number Diff line number Diff line change
@@ -20,46 +20,48 @@
"ext-json": "*",
"psr/log": "^3.0",
"symfony/console": "^6.0",
"symfony/process": "^6.0",
"symfony/config": "^6.0",
"symfony/yaml": "^6.0",
"symfony/dependency-injection": "^6.0",
"nikic/php-parser": "^4.10",
"symfony/http-client": "^6.0",
"nikic/php-parser": "^4.15",
"nyholm/psr7": "^1.5",
"psr/http-client": "^1.0",
"react/promise": "^2.9",
"symfony/dotenv": "^6.0",
"php-etl/packaging": "*",
"php-etl/configurator-contracts": "0.8.*",
"php-etl/satellite-toolbox": "*",
"php-etl/pipeline-console-runtime": "*",
"php-etl/gyroscops-api-client": "*",
"php-etl/dockerfile": "*",
"php-etl/pipeline": "*",
"php-etl/workflow-console-runtime": "*",
"composer/composer": "*"
"composer/composer": "*",
"symfony/deprecation-contracts": "*",
"react/child-process": "^0.7",
"react/async": "^4.1",
"react/promise-timer": "^1.10"
},
"require-dev": {
"phpunit/phpunit": "^10.0",
"phpspec/phpspec": "^7.3",
"phpstan/phpstan": "^1.10",
"infection/infection": "^0.26.18",
"infection/infection": "^0.26",
"friends-of-phpspec/phpspec-code-coverage": "*",
"fakerphp/faker": "^1.9",
"justinrainbow/json-schema": "^5.2",
"rector/rector": "^0.15",
"php-etl/phpunit-extension": "*",
"php-etl/akeneo-expression-language": "*",
"php-etl/array-expression-language": "*",
"php-etl/string-expression-language": "*",
"php-etl/fast-map-plugin": "*",
"php-etl/akeneo-plugin": "*",
"php-etl/csv-plugin": "*",
"php-etl/spreadsheet-plugin": "*",
"php-etl/sql-plugin": "*",
"php-etl/sylius-plugin": "*",
"mikey179/vfsstream": "^1.6"
"php-etl/phpunit-extension": "0.7.*",
"mikey179/vfsstream": "^1.6",
"symfony/http-client": "^6.3",
"friendsofphp/php-cs-fixer": "^3.38"
},
"suggest": {
"php-etl/array-expression-language": "A set of function for arrays manipulation",
"php-etl/string-expression-language": "A set of function for string manipulation",
"php-etl/fast-map-plugin": "A Gyroscops plugin for data mapping",
"php-etl/akeneo-plugin": "A Gyroscops plugin for Akeneo API connectivity",
"php-etl/csv-plugin": "A Gyroscops plugin for CSV format",
"php-etl/spreadsheet-plugin": "A Gyroscops plugin for Excel and Open Document formats",
"php-etl/sql-plugin": "A Gyroscops plugin for SQL connectivity using PDO",
"php-etl/sylius-plugin": "A Gyroscops plugin for Sylius API connectivity"
},
"autoload": {
"psr-4": {
@@ -87,7 +89,7 @@
"bin": ["bin/satellite", "bin/cloud"],
"extra": {
"branch-alias": {
"dev-main": "0.6.x-dev"
"dev-main": "0.7.x-dev"
},
"gyroscops": {
"adapters": [
7,542 changes: 3,245 additions & 4,297 deletions composer.lock

Large diffs are not rendered by default.

56 changes: 28 additions & 28 deletions src/Adapter/Composer.php
Original file line number Diff line number Diff line change
@@ -6,7 +6,10 @@

use Psr\Log\AbstractLogger;
use Psr\Log\LoggerInterface;
use Symfony\Component\Process\Process;
use React\ChildProcess\Process;
use React\Promise\Deferred;
use function React\Async\await;
use function React\Promise\Timer\timeout;

final class Composer
{
@@ -21,52 +24,49 @@ public function log($level, $message, array $context = []): void
};
}

private function execute(Process $process): void
private function execute(Process $process, float $timeout = 300): void
{
$process->run(function ($type, $buffer): void {
if (Process::ERR === $type) {
$this->logger->info($buffer);
} else {
$this->logger->debug($buffer);
}
$process->start();

$process->stdout->on('data', function ($chunk) {
$this->logger->debug($chunk);
});
$process->stderr->on('data', function ($chunk) {
$this->logger->info($chunk);
});

if (0 !== $process->getExitCode()) {
throw new ComposerFailureException($process->getCommandLine(), sprintf('Process exited unexpectedly with output: %s', $process->getErrorOutput()), $process->getExitCode());
}
}
$deferred = new Deferred();

private function command(string ...$command): void
{
$process = new Process($command);
$process->setWorkingDirectory($this->workdir);
$process->on('exit', function () use ($deferred) {
$deferred->resolve();
});

$process->setTimeout(300);
$this->logger->debug(sprintf('Starting process "%s".', $process->getCommand()));

$this->execute($process);
await(timeout($deferred->promise(), $timeout));

if (0 !== $process->getExitCode()) {
throw new ComposerFailureException($process->getCommand(), sprintf('Process exited unexpectedly with output: %s', $process->getExitCode()), $process->getExitCode());
}
}

private function pipe(Process ...$processes): void
private function command(string ...$command): void
{
$process = Process::fromShellCommandline(implode('|', array_map(fn (Process $process) => $process->getCommandLine(), $processes)));
$process->setWorkingDirectory($this->workdir);

$process->setTimeout(300);
$process = new Process(
implode (' ', array_map(fn ($part) => escapeshellarg($part), $command)),
$this->workdir,
);

$this->execute($process);
}

private function subcommand(string ...$command): Process
{
return new Process($command);
}

public function require(string ...$packages): void
{
$this->command(
'composer',
'require',
'--with-dependencies',
'--with-all-dependencies',
'--prefer-dist',
'--no-progress',
'--prefer-stable',
57 changes: 44 additions & 13 deletions src/Adapter/Docker/Satellite.php
Original file line number Diff line number Diff line change
@@ -6,10 +6,15 @@

use Kiboko\Component\Dockerfile;
use Kiboko\Component\Packaging\TarArchive;
use Kiboko\Component\Satellite\Adapter\ComposerFailureException;
use Kiboko\Contract\Configurator;
use Kiboko\Contract\Packaging;
use Psr\Log\LoggerInterface;
use Symfony\Component\Process\Process;
use React\ChildProcess\Process;
use React\Promise\Deferred;
use React\Stream\ReadableResourceStream;
use function React\Async\await;
use function React\Promise\Timer\timeout;

final class Satellite implements Configurator\SatelliteInterface
{
@@ -65,24 +70,50 @@ public function build(
}
};

$process = new Process([
'docker', 'build', '--rm', '-', ...iterator_to_array($iterator($this->imageTags)),
]);
$command = ['docker', 'build', '--rm', '-', ...iterator_to_array($iterator($this->imageTags))];

$process->setInput($archive->asResource());
$process = new Process(
implode (' ', array_map(fn ($part) => escapeshellarg($part), $command)),
$this->workdir,
);

$process->setTimeout(300);
$process->start();

$process->run(function ($type, $buffer) use ($logger): void {
if (Process::ERR === $type) {
$logger->info($buffer);
} else {
$logger->debug($buffer);
}
});
$input = new ReadableResourceStream($archive->asResource());

$input->pipe($process->stdin);

$this->execute($logger, $process);

if (0 !== $process->getExitCode()) {
throw new \RuntimeException('Process exited unexpectedly.');
}
}

private function execute(
LoggerInterface $logger,
Process $process,
float $timeout = 300
): void {
$process->stdout->on('data', function ($chunk) use ($logger) {
$logger->debug($chunk);
});
$process->stderr->on('data', function ($chunk) use ($logger) {
$logger->info($chunk);
});

$deferred = new Deferred();

$process->on('exit', function () use ($deferred) {
$deferred->resolve();
});

$logger->debug(sprintf('Starting process "%s".', $process->getCommand()));

await(timeout($deferred->promise(), $timeout));

if (0 !== $process->getExitCode()) {
throw new ComposerFailureException($process->getCommand(), sprintf('Process exited unexpectedly with output: %s', $process->getExitCode()), $process->getExitCode());
}
}
}
6 changes: 6 additions & 0 deletions src/Builder/Pipeline.php
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@ public function __construct(
) {}

public function addExtractor(
Node\Expr|Builder $code,
Node\Expr|Builder $extractor,
Node\Expr|Builder $rejection,
Node\Expr|Builder $state,
@@ -24,6 +25,7 @@ public function addExtractor(
var: $runtime,
name: new Node\Identifier('extract'),
args: [
new Node\Arg($code instanceof Builder ? $code->getNode() : $code),
new Node\Arg($extractor instanceof Builder ? $extractor->getNode() : $extractor),
new Node\Arg($rejection instanceof Builder ? $rejection->getNode() : $rejection),
new Node\Arg($state instanceof Builder ? $state->getNode() : $state),
@@ -34,6 +36,7 @@ public function addExtractor(
}

public function addTransformer(
Node\Expr|Builder $code,
Node\Expr|Builder $transformer,
Node\Expr|Builder $rejection,
Node\Expr|Builder $state,
@@ -42,6 +45,7 @@ public function addTransformer(
var: $runtime,
name: new Node\Identifier('transform'),
args: [
new Node\Arg($code instanceof Builder ? $code->getNode() : $code),
new Node\Arg($transformer instanceof Builder ? $transformer->getNode() : $transformer),
new Node\Arg($rejection instanceof Builder ? $rejection->getNode() : $rejection),
new Node\Arg($state instanceof Builder ? $state->getNode() : $state),
@@ -52,6 +56,7 @@ public function addTransformer(
}

public function addLoader(
Node\Expr|Builder $code,
Node\Expr|Builder $loader,
Node\Expr|Builder $rejection,
Node\Expr|Builder $state,
@@ -60,6 +65,7 @@ public function addLoader(
var: $runtime,
name: new Node\Identifier('load'),
args: [
new Node\Arg($code instanceof Builder ? $code->getNode() : $code),
new Node\Arg($loader instanceof Builder ? $loader->getNode() : $loader),
new Node\Arg($rejection instanceof Builder ? $rejection->getNode() : $rejection),
new Node\Arg($state instanceof Builder ? $state->getNode() : $state),
42 changes: 38 additions & 4 deletions src/Builder/Workflow.php
Original file line number Diff line number Diff line change
@@ -15,18 +15,34 @@ public function __construct(
private readonly Node\Expr $runtime
) {}

public function addPipeline(
string $pipelineFilename,
): self {
public function addPipeline(string $code, string $pipelineFilename): self {
$this->jobs[] = fn (Node\Expr $runtime) => new Node\Expr\MethodCall(
var: $runtime,
name: new Node\Identifier('job'),
args: [
new Node\Arg(
new Node\Expr\StaticCall(
new Node\Name\FullyQualified(\Kiboko\Component\Workflow\JobCode::class),
new Node\Identifier('fromString'),
[
new Node\Arg(new Node\Scalar\String_($code)),
],
),
),
new Node\Arg(
new Node\Expr\MethodCall(
var: new Node\Expr\Variable('runtime'),
name: 'loadPipeline',
args: [
new Node\Arg(
new Node\Expr\StaticCall(
new Node\Name\FullyQualified(\Kiboko\Component\Workflow\JobCode::class),
new Node\Identifier('fromString'),
[
new Node\Arg(new Node\Scalar\String_($code)),
],
),
),
new Node\Arg(
value: new Node\Expr\BinaryOp\Concat(
left: new Node\Scalar\MagicConst\Dir(),
@@ -47,17 +63,35 @@ public function addPipeline(
return $this;
}

public function addAction(string $pipelineFilename): self
public function addAction(string $code, string $pipelineFilename): self
{
$this->jobs[] = fn (Node\Expr $runtime) => new Node\Expr\MethodCall(
var: $runtime,
name: new Node\Identifier('job'),
args: [
new Node\Arg(
new Node\Expr\StaticCall(
new Node\Name\FullyQualified(\Kiboko\Component\Workflow\JobCode::class),
new Node\Identifier('fromString'),
[
new Node\Arg(new Node\Scalar\String_($code)),
],
),
),
new Node\Arg(
new Node\Expr\MethodCall(
var: new Node\Expr\Variable('runtime'),
name: 'loadAction',
args: [
new Node\Arg(
new Node\Expr\StaticCall(
new Node\Name\FullyQualified(\Kiboko\Component\Workflow\JobCode::class),
new Node\Identifier('fromString'),
[
new Node\Arg(new Node\Scalar\String_($code)),
],
),
),
new Node\Arg(
value: new Node\Expr\BinaryOp\Concat(
left: new Node\Scalar\MagicConst\Dir(),
28 changes: 25 additions & 3 deletions src/Console/Command/ApiRunCommand.php
Original file line number Diff line number Diff line change
@@ -4,8 +4,30 @@

namespace Kiboko\Component\Satellite\Console\Command;

final class ApiRunCommand extends HookRunCommand
use Symfony\Component\Console;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

#[Console\Attribute\AsCommand('run:api', 'Run an HTTP API satellite.', hidden: true)]
final class ApiRunCommand extends RunCommand
{
protected static $defaultName = 'run:api';
protected static $defaultDescription = 'Run the api.';
protected function configure(): void
{
$this->addArgument('path', Console\Input\InputArgument::REQUIRED);
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$style = new Console\Style\SymfonyStyle(
$input,
$output,
);

$style->warning([
'The command "run:hook is deprecated and will be removed in future releases.',
'Please use the "run" command as a replacement.'
]);

return parent::execute($input, $output);
}
}
31 changes: 7 additions & 24 deletions src/Console/Command/HookRunCommand.php
Original file line number Diff line number Diff line change
@@ -7,13 +7,10 @@
use Symfony\Component\Console;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Process\Process;

class HookRunCommand extends Console\Command\Command
#[Console\Attribute\AsCommand('run:hook', 'Run an HTTP hook satellite.', hidden: true)]
final class HookRunCommand extends RunCommand
{
protected static $defaultName = 'run:hook';
protected static $defaultDescription = 'Run the hook.';

protected function configure(): void
{
$this->addArgument('path', Console\Input\InputArgument::REQUIRED);
@@ -26,25 +23,11 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$output,
);

$style->writeln(sprintf('<fg=cyan>Starting server in %s</>', $input->getArgument('path')));

if (!file_exists($input->getArgument('path').'/vendor/autoload.php')) {
$style->error('Nothing is compiled at the provided path');

return \Symfony\Component\Console\Command\Command::FAILURE;
}

$cwd = getcwd();
chdir($input->getArgument('path'));

$process = new Process(['php', '-S', 'localhost:8000', 'main.php']);
$process->setTimeout(null);
$process->run(function ($type, $buffer): void {
echo $buffer;
});

chdir($cwd);
$style->warning([
'The command "run:hook is deprecated and will be removed in future releases.',
'Please use the "run" command as a replacement.'
]);

return \Symfony\Component\Console\Command\Command::SUCCESS;
return parent::execute($input, $output);
}
}
105 changes: 6 additions & 99 deletions src/Console/Command/PipelineRunCommand.php
Original file line number Diff line number Diff line change
@@ -4,18 +4,13 @@

namespace Kiboko\Component\Satellite\Console\Command;

use Composer\Autoload\ClassLoader;
use Kiboko\Component\Runtime\Pipeline\Console as PipelineConsoleRuntime;
use Symfony\Component\Console;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Dotenv\Dotenv;

final class PipelineRunCommand extends Console\Command\Command
#[Console\Attribute\AsCommand('run:pipeline', 'Run a data flow satellite (pipeline or workflow).', hidden: true)]
final class PipelineRunCommand extends RunCommand
{
protected static $defaultName = 'run:pipeline';
protected static $defaultDescription = 'Run the pipeline satellite.';

protected function configure(): void
{
$this->addArgument('path', Console\Input\InputArgument::REQUIRED);
@@ -28,99 +23,11 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$output,
);

$style->writeln(sprintf('<fg=cyan>Running pipeline in %s</>', $input->getArgument('path')));

if (!file_exists($input->getArgument('path').'/vendor/autoload.php')) {
$style->error('There is no compiled pipeline at the provided path');

return \Symfony\Component\Console\Command\Command::FAILURE;
}

$cwd = getcwd();
chdir($input->getArgument('path'));

$dotenv = new Dotenv();
$dotenv->usePutenv();

if (file_exists($file = $cwd.'/.env')) {
$dotenv->loadEnv($file);
}
if (file_exists($file = $cwd.'/'.$input->getArgument('path').'/.env')) {
$dotenv->loadEnv($file);
}

/** @var ClassLoader $autoload */
$autoload = include 'vendor/autoload.php';
$autoload->addClassMap([
/* @phpstan-ignore-next-line */
\ProjectServiceContainer::class => 'container.php',
$style->warning([
'The command "run:pipeline is deprecated and will be removed in future releases.',
'Please use the "run" command as a replacement.'
]);
$autoload->register();

$runtime = new PipelineConsoleRuntime(
$output,
new \Kiboko\Component\Pipeline\Pipeline(
new \Kiboko\Component\Pipeline\PipelineRunner(
new \Psr\Log\NullLogger()
)
),
);

if (!file_exists('pipeline.php')) {
$style->error('The provided path does not contain one single pipeline, did you mean to run "run:workflow"?');

return \Symfony\Component\Console\Command\Command::FAILURE;
}
/** @var callable(runtime: PipelineRuntimeInterface): \Runtime $pipeline */
$pipeline = include 'pipeline.php';

$start = microtime(true);
$pipeline($runtime);
$runtime->run();
$end = microtime(true);

$autoload->unregister();

$style->writeln(sprintf('time: %s', $this->formatTime($end - $start)));

chdir($cwd);

return \Symfony\Component\Console\Command\Command::SUCCESS;
}

private function formatTime(float $time): string
{
if ($time < .00001) {
return sprintf('<fg=cyan>%sµs</>', number_format($time * 1_000_000, 2));
}
if ($time < .0001) {
return sprintf('<fg=cyan>%sµs</>', number_format($time * 1_000_000, 1));
}
if ($time < .001) {
return sprintf('<fg=cyan>%sµs</>', number_format($time * 1_000_000));
}
if ($time < .01) {
return sprintf('<fg=cyan>%sms</>', number_format($time * 1000, 2));
}
if ($time < .1) {
return sprintf('<fg=cyan>%sms</>', number_format($time * 1000, 1));
}
if ($time < 1) {
return sprintf('<fg=cyan>%sms</>', number_format($time * 1000));
}
if ($time < 10) {
return sprintf('<fg=cyan>%ss</>', number_format($time, 2));
}
if ($time < 3600) {
$minutes = floor($time / 60);
$seconds = $time - (60 * $minutes);

return sprintf('<fg=cyan>%smin</> <fg=cyan>%ss</>', number_format($minutes), number_format($seconds, 2));
}
$hours = floor($time / 3600);
$minutes = floor(($time - (3600 * $hours)) / 60);
$seconds = $time - (3600 * $hours) - (60 * $minutes);

return sprintf('<fg=cyan>%sh</> <fg=cyan>%smin</> <fg=cyan>%ss</>', number_format($hours), number_format($minutes), number_format($seconds, 2));
return parent::execute($input, $output);
}
}
280 changes: 280 additions & 0 deletions src/Console/Command/RunCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
<?php

declare(strict_types=1);

namespace Kiboko\Component\Satellite\Console\Command;

use React\ChildProcess\Process;
use React\Promise\Deferred;
use React\Stream\ReadableResourceStream;
use Symfony\Component\Console;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use function React\Async\await;

#[Console\Attribute\AsCommand('run', 'Run a data flow satellite (pipeline or workflow).')]
class RunCommand extends Console\Command\Command
{
protected function configure(): void
{
$this->addArgument('path', Console\Input\InputArgument::REQUIRED);
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$style = new Console\Style\SymfonyStyle(
$input,
$output,
);

if (!file_exists($input->getArgument('path').'/vendor/autoload.php')) {
$style->error('There is no compiled satellite at the provided path.');

return Console\Command\Command::FAILURE;
}

$cwd = getcwd();
chdir($input->getArgument('path'));

if (file_exists('pipeline.php')) {
$style->writeln(sprintf('<fg=cyan>Running pipeline in %s</>', $input->getArgument('path')));

$process = $this->pipelineWorker($style, $cwd, $input->getArgument('path'), 'pipeline.php');
} else if (file_exists('workflow.php')) {
$style->writeln(sprintf('<fg=cyan>Running workflow in %s</>', $input->getArgument('path')));

$process = $this->workflowWorker($style, $cwd, $input->getArgument('path'), 'workflow.php');
} else if (file_exists('main.php')) {
$style->writeln(sprintf('<fg=cyan>Running API in %s</>', $input->getArgument('path')));

$process = $this->httpWorker($style, $cwd, $input->getArgument('path'), 'main.php');
} else {
$style->error('The provided path does not contain either a workflow or a pipeline satellite, did you mean to run "run:api"?');
return Console\Command\Command::FAILURE;
}

$start = microtime(true);

if (!$this->executeWorker($style, $process)) {
return Console\Command\Command::FAILURE;
}

$end = microtime(true);

$style->writeln(sprintf('time: %s', $this->formatTime($end - $start)));

return Console\Command\Command::SUCCESS;
}

private function pipelineWorker(Console\Style\SymfonyStyle $style, string $cwd, string $path, string $entrypoint): Process
{
$source =<<<PHP
<?php
declare(strict_types=1);
/** @var ClassLoader \$autoload */
\$autoload = include '{$cwd}/{$path}/vendor/autoload.php';
\$autoload->addClassMap([
/* @phpstan-ignore-next-line */
\ProjectServiceContainer::class => 'container.php',
]);
\$autoload->register();
\$dotenv = new \Symfony\Component\Dotenv\Dotenv();
\$dotenv->usePutenv();
if (file_exists(\$file = '{$cwd}/.env')) {
\$dotenv->loadEnv(\$file);
}
if (file_exists(\$file = '{$cwd}/{$path}/.env')) {
\$dotenv->loadEnv(\$file);
}
\$runtime = new \Kiboko\Component\Runtime\Pipeline\Console(
new \Symfony\Component\Console\Output\ConsoleOutput(),
new \Kiboko\Component\Pipeline\Pipeline(
new \Kiboko\Component\Pipeline\PipelineRunner(
new \Psr\Log\NullLogger()
),
new \Kiboko\Contract\Pipeline\NullState(),
),
);
\$satellite = include '{$cwd}/{$path}/$entrypoint';
\$satellite(\$runtime);
\$runtime->run();
\$autoload->unregister();
PHP;

$stream = fopen('php://temp', 'r+');
fwrite($stream, $source);
fseek($stream, 0, SEEK_SET);

$input = new ReadableResourceStream($stream);

chdir($cwd);

$command = ['php'];

$style->note($source);

$command = implode(' ', array_map(fn ($part) => escapeshellarg($part), $command));
$style->note($command);
$process = new Process($command, $cwd);

$process->start();

$process->stdout->on('data', function ($chunk) use ($style) {
$style->text($chunk);
});
$process->stderr->on('data', function ($chunk) use ($style) {
$style->info($chunk);
});

$input->pipe($process->stdin);

return $process;
}

private function workflowWorker(Console\Style\SymfonyStyle $style, string $cwd, string $path, string $entrypoint): Process
{
$source =<<<PHP
<?php
declare(strict_types=1);
/** @var ClassLoader \$autoload */
\$autoload = include '{$cwd}/{$path}/vendor/autoload.php';
\$autoload->addClassMap([
/* @phpstan-ignore-next-line */
\ProjectServiceContainer::class => 'container.php',
]);
\$autoload->register();
\$dotenv = new \Symfony\Component\Dotenv\Dotenv();
\$dotenv->usePutenv();
if (file_exists(\$file = '{$cwd}/.env')) {
\$dotenv->loadEnv(\$file);
}
if (file_exists(\$file = '{$cwd}/{$path}/.env')) {
\$dotenv->loadEnv(\$file);
}
\$runtime = new \Kiboko\Component\Runtime\Workflow\Console(
new \Symfony\Component\Console\Output\ConsoleOutput(),
new \Kiboko\Component\Pipeline\PipelineRunner(
new \Psr\Log\NullLogger()
),
);
\$satellite = include '{$cwd}/{$path}/$entrypoint';
\$satellite(\$runtime);
\$runtime->run();
\$autoload->unregister();
PHP;

$stream = fopen('php://temp', 'r+');
fwrite($stream, $source);
fseek($stream, 0, SEEK_SET);

$input = new ReadableResourceStream($stream);

chdir($cwd);

$command = ['php'];

$style->note($source);

$command = implode(' ', array_map(fn ($part) => escapeshellarg($part), $command));
$style->note($command);
$process = new Process($command, $cwd);

$process->start();

$process->stdout->on('data', function ($chunk) use ($style) {
$style->text($chunk);
});
$process->stderr->on('data', function ($chunk) use ($style) {
$style->info($chunk);
});

$input->pipe($process->stdin);

return $process;
}

private function httpWorker(Console\Style\SymfonyStyle $style, string $cwd, string $path, string $entrypoint): Process
{
chdir($cwd);

$command = ['php', '-S', 'localhost:8000', $entrypoint];

$process = new Process(implode (' ', array_map(fn ($part) => escapeshellarg($part), $command)), $cwd.'/'.$path);

$process->start();

return $process;
}

private function formatTime(float $time): string
{
if ($time < .00001) {
return sprintf('<fg=cyan>%sµs</>', number_format($time * 1_000_000, 2));
}
if ($time < .0001) {
return sprintf('<fg=cyan>%sµs</>', number_format($time * 1_000_000, 1));
}
if ($time < .001) {
return sprintf('<fg=cyan>%sµs</>', number_format($time * 1_000_000));
}
if ($time < .01) {
return sprintf('<fg=cyan>%sms</>', number_format($time * 1000, 2));
}
if ($time < .1) {
return sprintf('<fg=cyan>%sms</>', number_format($time * 1000, 1));
}
if ($time < 1) {
return sprintf('<fg=cyan>%sms</>', number_format($time * 1000));
}
if ($time < 10) {
return sprintf('<fg=cyan>%ss</>', number_format($time, 2));
}
if ($time < 3600) {
$minutes = floor($time / 60);
$seconds = $time - (60 * $minutes);

return sprintf('<fg=cyan>%smin</> <fg=cyan>%ss</>', number_format($minutes), number_format($seconds, 2));
}
$hours = floor($time / 3600);
$minutes = floor(($time - (3600 * $hours)) / 60);
$seconds = $time - (3600 * $hours) - (60 * $minutes);

return sprintf('<fg=cyan>%sh</> <fg=cyan>%smin</> <fg=cyan>%ss</>', number_format($hours), number_format($minutes), number_format($seconds, 2));
}

private function executeWorker(
Console\Style\SymfonyStyle $style,
Process $process
): bool {
$deferred = new Deferred();

$process->on('exit', function () use ($deferred) {
$deferred->resolve();
});

$style->info(sprintf('Starting process "%s".', $process->getCommand()));

await($deferred->promise());

if (0 !== $process->getExitCode()) {
$style->error(sprintf('Process exited unexpectedly with exit code %d', $process->getExitCode()));
return false;
}

return true;
}
}
98 changes: 5 additions & 93 deletions src/Console/Command/WorkflowRunCommand.php
Original file line number Diff line number Diff line change
@@ -4,18 +4,13 @@

namespace Kiboko\Component\Satellite\Console\Command;

use Composer\Autoload\ClassLoader;
use Kiboko\Component\Runtime\Workflow\Console as WorkflowConsoleRuntime;
use Symfony\Component\Console;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Dotenv\Dotenv;

#[Console\Attribute\AsCommand('run:workflow', 'Run a data flow satellite (pipeline or workflow).', hidden: true)]
final class WorkflowRunCommand extends Console\Command\Command
{
protected static $defaultName = 'run:workflow';
protected static $defaultDescription = 'Run the workflow satellite.';

protected function configure(): void
{
$this->addArgument('path', Console\Input\InputArgument::REQUIRED);
@@ -28,94 +23,11 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$output,
);

$style->writeln(sprintf('<fg=cyan>Running workflow in %s</>', $input->getArgument('path')));

if (!file_exists($input->getArgument('path').'/vendor/autoload.php')) {
$style->error('There is no compiled workflow at the provided path');

return \Symfony\Component\Console\Command\Command::FAILURE;
}

$cwd = getcwd();
chdir($input->getArgument('path'));

$dotenv = new Dotenv();
$dotenv->usePutenv();
if (file_exists($file = \dirname($cwd).'/.env')) {
$dotenv->loadEnv($file);
}
if (file_exists($file = $cwd.'/'.$input->getArgument('path').'/.env')) {
$dotenv->loadEnv($file);
}

/** @var ClassLoader $autoload */
$autoload = include 'vendor/autoload.php';
$autoload->addClassMap([
/* @phpstan-ignore-next-line */
\ProjectServiceContainer::class => 'container.php',
$style->warning([
'The command "run:pipeline is deprecated and will be removed in future releases.',
'Please use the "run" command as a replacement.'
]);
$autoload->register();

$runtime = new WorkflowConsoleRuntime(
$output,
new \Kiboko\Component\Pipeline\PipelineRunner(new \Psr\Log\NullLogger()),
);

if (!file_exists('workflow.php')) {
$style->error('The provided path does not contain a workflow, did you mean to run "run:pipeline"?');

return \Symfony\Component\Console\Command\Command::FAILURE;
}
/** @var callable(runtime: WorkflowRuntimeInterface): \Runtime $workflow */
$workflow = include 'workflow.php';

$start = microtime(true);
$workflow($runtime);
$runtime->run();
$end = microtime(true);

$autoload->unregister();

$style->writeln(sprintf('time: %s', $this->formatTime($end - $start)));

chdir($cwd);

return \Symfony\Component\Console\Command\Command::SUCCESS;
}

private function formatTime(float $time): string
{
if ($time < .00001) {
return sprintf('<fg=cyan>%sµs</>', number_format($time * 1_000_000, 2));
}
if ($time < .0001) {
return sprintf('<fg=cyan>%sµs</>', number_format($time * 1_000_000, 1));
}
if ($time < .001) {
return sprintf('<fg=cyan>%sµs</>', number_format($time * 1_000_000));
}
if ($time < .01) {
return sprintf('<fg=cyan>%sms</>', number_format($time * 1000, 2));
}
if ($time < .1) {
return sprintf('<fg=cyan>%sms</>', number_format($time * 1000, 1));
}
if ($time < 1) {
return sprintf('<fg=cyan>%sms</>', number_format($time * 1000));
}
if ($time < 10) {
return sprintf('<fg=cyan>%ss</>', number_format($time, 2));
}
if ($time < 3600) {
$minutes = floor($time / 60);
$seconds = $time - (60 * $minutes);

return sprintf('<fg=cyan>%smin</> <fg=cyan>%ss</>', number_format($minutes), number_format($seconds, 2));
}
$hours = floor($time / 3600);
$minutes = floor(($time - (3600 * $hours)) / 60);
$seconds = $time - (3600 * $hours) - (60 * $minutes);

return sprintf('<fg=cyan>%sh</> <fg=cyan>%smin</> <fg=cyan>%ss</>', number_format($hours), number_format($minutes), number_format($seconds, 2));
return parent::execute($input, $output);
}
}
24 changes: 21 additions & 3 deletions src/Pipeline/Extractor.php
Original file line number Diff line number Diff line change
@@ -15,7 +15,12 @@

final readonly class Extractor implements StepInterface
{
public function __construct(private ?string $plugin, private ?string $key, private ExpressionLanguage $interpreter = new Satellite\ExpressionLanguage()) {}
public function __construct(
private ?string $plugin,
private ?string $key,
private ExpressionLanguage $interpreter = new Satellite\ExpressionLanguage()
) {
}

public function __invoke(array $config, Pipeline $pipeline, StepRepositoryInterface $repository): void
{
@@ -45,7 +50,7 @@ public function __invoke(array $config, Pipeline $pipeline, StepRepositoryInterf
$rejection = $compiled->getBuilder()->getNode();
} else {
$rejection = new Node\Expr\New_(
new Node\Name\FullyQualified(\Kiboko\Contract\Pipeline\NullRejection::class),
new Node\Name\FullyQualified(\Kiboko\Contract\Pipeline\NullStepRejection::class),
);
}

@@ -57,11 +62,24 @@ public function __invoke(array $config, Pipeline $pipeline, StepRepositoryInterf
$state = $compiled->getBuilder()->getNode();
} else {
$state = new Node\Expr\New_(
new Node\Name\FullyQualified(\Kiboko\Contract\Pipeline\NullState::class),
new Node\Name\FullyQualified(\Kiboko\Contract\Pipeline\NullStepState::class),
);
}

if (array_key_exists('code', $config)) {
$code = $config['code'];
} else {
$code = sprintf('%s.%s', $this->plugin, $this->key);
}

$pipeline->addExtractor(
new Node\Expr\StaticCall(
new Node\Name\FullyQualified('Kiboko\\Component\\Pipeline\\StepCode'),
new Node\Identifier('fromString'),
[
new Node\Arg(new Node\Scalar\String_($code))
]
),
$repository->getBuilder()
->withLogger($logger)
->withRejection($rejection)
24 changes: 21 additions & 3 deletions src/Pipeline/Loader.php
Original file line number Diff line number Diff line change
@@ -15,7 +15,12 @@

final readonly class Loader implements StepInterface
{
public function __construct(private ?string $plugin, private ?string $key, private ExpressionLanguage $interpreter = new Satellite\ExpressionLanguage()) {}
public function __construct(
private ?string $plugin,
private ?string $key,
private ExpressionLanguage $interpreter = new Satellite\ExpressionLanguage()
) {
}

public function __invoke(array $config, Pipeline $pipeline, StepRepositoryInterface $repository): void
{
@@ -45,7 +50,7 @@ public function __invoke(array $config, Pipeline $pipeline, StepRepositoryInterf
$rejection = $compiled->getBuilder()->getNode();
} else {
$rejection = new Node\Expr\New_(
new Node\Name\FullyQualified(\Kiboko\Contract\Pipeline\NullRejection::class),
new Node\Name\FullyQualified(\Kiboko\Contract\Pipeline\NullStepRejection::class),
);
}

@@ -57,11 +62,24 @@ public function __invoke(array $config, Pipeline $pipeline, StepRepositoryInterf
$state = $compiled->getBuilder()->getNode();
} else {
$state = new Node\Expr\New_(
new Node\Name\FullyQualified(\Kiboko\Contract\Pipeline\NullState::class),
new Node\Name\FullyQualified(\Kiboko\Contract\Pipeline\NullStepState::class),
);
}

if (array_key_exists('code', $config)) {
$code = $config['code'];
} else {
$code = sprintf('%s.%s', $this->plugin, $this->key);
}

$pipeline->addLoader(
new Node\Expr\StaticCall(
new Node\Name\FullyQualified('Kiboko\\Component\\Pipeline\\StepCode'),
new Node\Identifier('fromString'),
[
new Node\Arg(new Node\Scalar\String_($code))
]
),
$repository->getBuilder()
->withLogger($logger)
->withRejection($rejection)
24 changes: 21 additions & 3 deletions src/Pipeline/Transformer.php
Original file line number Diff line number Diff line change
@@ -15,7 +15,12 @@

final readonly class Transformer implements StepInterface
{
public function __construct(private ?string $plugin, private ?string $key, private ExpressionLanguage $interpreter = new Satellite\ExpressionLanguage()) {}
public function __construct(
private ?string $plugin,
private ?string $key,
private ExpressionLanguage $interpreter = new Satellite\ExpressionLanguage()
) {
}

public function __invoke(array $config, Pipeline $pipeline, StepRepositoryInterface $repository): void
{
@@ -45,7 +50,7 @@ public function __invoke(array $config, Pipeline $pipeline, StepRepositoryInterf
$rejection = $compiled->getBuilder()->getNode();
} else {
$rejection = new Node\Expr\New_(
new Node\Name\FullyQualified(\Kiboko\Contract\Pipeline\NullRejection::class),
new Node\Name\FullyQualified(\Kiboko\Contract\Pipeline\NullStepRejection::class),
);
}

@@ -57,11 +62,24 @@ public function __invoke(array $config, Pipeline $pipeline, StepRepositoryInterf
$state = $compiled->getBuilder()->getNode();
} else {
$state = new Node\Expr\New_(
new Node\Name\FullyQualified(\Kiboko\Contract\Pipeline\NullState::class),
new Node\Name\FullyQualified(\Kiboko\Contract\Pipeline\NullStepState::class),
);
}

if (array_key_exists('code', $config)) {
$code = $config['code'];
} else {
$code = sprintf('%s.%s', $this->plugin, $this->key);
}

$pipeline->addTransformer(
new Node\Expr\StaticCall(
new Node\Name\FullyQualified('Kiboko\\Component\\Pipeline\\StepCode'),
new Node\Identifier('fromString'),
[
new Node\Arg(new Node\Scalar\String_($code))
]
),
$repository->getBuilder()
->withLogger($logger)
->withRejection($rejection)
12 changes: 8 additions & 4 deletions src/Service.php
Original file line number Diff line number Diff line change
@@ -234,6 +234,9 @@ private function compileWorkflow(array $config): Satellite\Builder\Repository\Wo
$repository = new Satellite\Builder\Repository\Workflow($workflow);

$repository->addPackages(
'php-etl/satellite-contracts:>=0.1.1 <0.2',
'php-etl/pipeline-contracts:>=0.5.1 <0.6',
'php-etl/action-contracts:>=0.2.0 <0.3',
'php-etl/workflow:*',
);

@@ -273,7 +276,7 @@ private function compileWorkflow(array $config): Satellite\Builder\Repository\Wo
)
);

foreach ($config['workflow']['jobs'] as $job) {
foreach ($config['workflow']['jobs'] as $code => $job) {
if (\array_key_exists('pipeline', $job)) {
$pipeline = $this->compilePipelineJob($job);
$pipelineFilename = sprintf('%s.php', uniqid('pipeline'));
@@ -290,7 +293,7 @@ private function compileWorkflow(array $config): Satellite\Builder\Repository\Wo
)
);

$workflow->addPipeline($pipelineFilename);
$workflow->addPipeline($code, $pipelineFilename);
} elseif (\array_key_exists('action', $job)) {
$action = $this->compileActionJob($job);
$actionFilename = sprintf('%s.php', uniqid('action'));
@@ -307,7 +310,7 @@ private function compileWorkflow(array $config): Satellite\Builder\Repository\Wo
)
);

$workflow->addAction($actionFilename);
$workflow->addAction($code, $actionFilename);
} else {
throw new \LogicException('Not implemented');
}
@@ -325,14 +328,15 @@ private function compilePipelineJob(array $config): Satellite\Builder\Repository
$repository = new Satellite\Builder\Repository\Pipeline($pipeline);

$repository->addPackages(
'php-etl/pipeline-contracts:0.4.*',
'php-etl/pipeline-contracts:>=0.5.1 <0.6',
'php-etl/pipeline:*',
'php-etl/console-state:*',
'php-etl/pipeline-console-runtime:*',
'php-etl/workflow-console-runtime:*',
'psr/log:*',
'monolog/monolog:*',
'symfony/console:^6.0',
'symfony/dotenv:^6.0',
'symfony/dependency-injection:^6.0',
);

0 comments on commit 7f43d16

Please sign in to comment.