Skip to content

Commit

Permalink
Merge pull request #3 from Innmind/next
Browse files Browse the repository at this point in the history
Next major update
  • Loading branch information
Baptouuuu authored Mar 10, 2024
2 parents 16e1726 + c19bb08 commit a6e3865
Show file tree
Hide file tree
Showing 100 changed files with 2,043 additions and 2,182 deletions.
6 changes: 4 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
### Changed

- Requires `innmind/immutable:~5.2`
- Requires `innmind/operating-system:~4.1`
- Requires `innmind/operating-system:~5.0`
- Requires `innmind/filesystem:~7.0`
- Requires `innmind/io:~2.3`
- Requires `innmind/io:~2.6`
- Carried state inside a `Innmind\AMQP\Command` is now wrapped inside a `Innmind\AMQP\Client\State`
- `Innmind\AMQP\Client::of()` now requires an instance of `Innmind\OperatingSystem\Filesystem` as a second argument

## 4.3.0 - 2023-09-23

Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,10 @@ Feel free to look at the `Command` namespace to explore all capabilities.
make benchmark
Publishing 4000 msgs with 1KB of content:
php benchmark/producer.php 4000
0.39038109779358
0.48978996276855
Consuming 4000:
php benchmark/consumer.php
Pid: 701, Count: 4000, Time: 1.6017
Pid: 701, Count: 4000, Time: 2.3580
```

By comparison, the `php-amqplib` produces this result:
Expand Down
1 change: 1 addition & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
ignore:
- benchmark
- fixtures
- .php-cs-fixer.dist.php
4 changes: 2 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
"innmind/math": "~6.0",
"innmind/url": "~4.1",
"ramsey/uuid": "~4.0",
"innmind/operating-system": "~4.1",
"innmind/operating-system": "~5.0",
"innmind/media-type": "~2.0",
"innmind/filesystem": "~7.0",
"innmind/stream": "~4.0",
"innmind/io": "~2.3"
"innmind/io": "~2.6"
},
"autoload": {
"psr-4": {
Expand Down
2 changes: 1 addition & 1 deletion docs/Publish many messages.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ $client = Factory::of($os)->make(/* details */);
$os
->filesystem()
->mount(Path::of('/path/to/some/directory/'))
->get(new Name('leads.csv'))
->get(Name::of('leads.csv'))
->map(
static fn($file) => $file
->content()
Expand Down
68 changes: 31 additions & 37 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@
Transport\Frame\Method,
Model\Channel\Close as CloseChannel,
};
use Innmind\OperatingSystem\CurrentProcess;
use Innmind\Stream\{
Capabilities,
Streams,
use Innmind\OperatingSystem\{
CurrentProcess,
Filesystem,
};
use Innmind\Immutable\{
Either,
Expand All @@ -27,7 +26,7 @@ final class Client
private Maybe $command;
/** @var callable(): Maybe<Connection> */
private $load;
private Capabilities $streams;
private Filesystem $filesystem;
/** @var Maybe<CurrentProcess> */
private Maybe $signals;

Expand All @@ -39,19 +38,19 @@ final class Client
private function __construct(
Maybe $command,
callable $load,
Capabilities $streams,
Filesystem $filesystem,
Maybe $signals,
) {
$this->command = $command;
$this->load = $load;
$this->streams = $streams;
$this->filesystem = $filesystem;
$this->signals = $signals;
}

/**
* @param callable(): Maybe<Connection> $load
*/
public static function of(callable $load, Capabilities $streams = null): self
public static function of(callable $load, Filesystem $filesystem): self
{
/** @var Maybe<Command> */
$command = Maybe::nothing();
Expand All @@ -61,7 +60,7 @@ public static function of(callable $load, Capabilities $streams = null): self
return new self(
$command,
$load,
$streams ?? Streams::fromAmbientAuthority(),
$filesystem,
$signals,
);
}
Expand All @@ -74,22 +73,17 @@ public function with(Command $command): self
->map(static fn($previous) => new Command\Pipe($previous, $command))
->otherwise(static fn() => Maybe::just($command)),
$this->load,
$this->streams,
$this->filesystem,
$this->signals,
);
}

public function listenSignals(CurrentProcess $currentProcess): self
{
// We ask for the current process instead of the signals wrapper directly
// because the user may fork the process between the time this method is
// called and the time the listeners are installed (when run is called).
// This would result on the listeners being installed for the parent
// process instead of the child.
return new self(
$this->command,
$this->load,
$this->streams,
$this->filesystem,
Maybe::just($currentProcess),
);
}
Expand All @@ -108,12 +102,12 @@ public function run(mixed $state): Either
->openChannel()
->flatMap(function($in) use ($command, $state) {
[$connection, $channel] = $in;
$read = MessageReader::of($this->streams);
$read = MessageReader::of($this->filesystem);

return $command($connection, $channel, $read, $state)->flatMap(
fn($clientState) => $this
->close($clientState->connection(), $channel)
->map(static fn(): mixed => $clientState->userState()),
return $command($connection, $channel, $read, Client\State::of($state))->flatMap(
fn($state) => $this
->close($connection, $channel)
->map(static fn(): mixed => $state->unwrap()),
);
}),
static fn() => Either::right($state),
Expand All @@ -134,21 +128,20 @@ private function openChannel(): Either
return ($this->load)()
->either()
->leftMap(static fn() => Failure::toOpenConnection())
->map(static fn($connection) => $connection->send(
static fn($protocol) => $protocol->channel()->open($channel),
))
->map(static fn($continuation) => $continuation->wait(Method::channelOpenOk))
->flatMap(
fn($continuation) => $continuation
->connection()
->map(fn($connection) => $this->signals->match(
fn($connection) => $connection
->request(
static fn($protocol) => $protocol->channel()->open($channel),
Method::channelOpenOk,
)
->map(fn() => $this->signals->match(
static fn($process) => $connection->listenSignals(
$process->signals(),
$channel,
),
static fn() => $connection,
static fn() => null,
))
->map(static fn($connection) => [$connection, $channel])
->map(static fn() => [$connection, $channel])
->leftMap(static fn() => Failure::toOpenChannel()),
);
}
Expand All @@ -160,15 +153,16 @@ private function close(Connection $connection, Channel $channel): Either
{
/** @var Either<Failure, SideEffect> */
return $connection
->send(static fn($protocol) => $protocol->channel()->close(
$channel,
CloseChannel::demand(),
))
->wait(Method::channelCloseOk)
->connection()
->request(
static fn($protocol) => $protocol->channel()->close(
$channel,
CloseChannel::demand(),
),
Method::channelCloseOk,
)
->leftMap(static fn() => Failure::toCloseChannel())
->flatMap(
static fn($connection) => $connection
static fn() => $connection
->close()
->either()
->leftMap(static fn() => Failure::toCloseConnection()),
Expand Down
26 changes: 7 additions & 19 deletions src/Client/State.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,22 @@

namespace Innmind\AMQP\Client;

use Innmind\AMQP\Transport\Connection;

/**
* @internal
*/
final class State
{
private Connection $connection;
private mixed $userState;

private function __construct(Connection $connection, mixed $userState)
{
$this->connection = $connection;
$this->userState = $userState;
}
private mixed $value;

public static function of(Connection $connection, mixed $userState): self
private function __construct(mixed $value)
{
return new self($connection, $userState);
$this->value = $value;
}

public function connection(): Connection
public static function of(mixed $value): self
{
return $this->connection;
return new self($value);
}

public function userState(): mixed
public function unwrap(): mixed
{
return $this->userState;
return $this->value;
}
}
2 changes: 1 addition & 1 deletion src/Command.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ public function __invoke(
Connection $connection,
Channel $channel,
MessageReader $read,
mixed $state,
Client\State $state,
): Either;
}
29 changes: 18 additions & 11 deletions src/Command/Bind.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,15 @@
Failure,
Transport\Connection,
Transport\Connection\MessageReader,
Transport\Protocol,
Transport\Frame\Channel,
Transport\Frame\Method,
Model\Queue\Binding,
};
use Innmind\Immutable\Either;
use Innmind\Immutable\{
Either,
Sequence,
};

final class Bind implements Command
{
Expand All @@ -28,17 +32,20 @@ public function __invoke(
Connection $connection,
Channel $channel,
MessageReader $read,
mixed $state,
State $state,
): Either {
/** @var Either<Failure, State> */
return $connection
->send(fn($protocol) => $protocol->queue()->bind(
$channel,
$this->command,
))
->maybeWait($this->command->shouldWait(), Method::queueBindOk)
->connection()
->map(static fn($connection) => State::of($connection, $state))
$frames = fn(Protocol $protocol): Sequence => $protocol->queue()->bind(
$channel,
$this->command,
);

$sideEffect = match ($this->command->shouldWait()) {
true => $connection->request($frames, Method::queueBindOk),
false => $connection->send($frames),
};

return $sideEffect
->map(static fn() => $state)
->leftMap(fn() => Failure::toBind($this->command));
}

Expand Down
Loading

0 comments on commit a6e3865

Please sign in to comment.