diff --git a/src/Core/AbstractResult.php b/src/Core/AbstractResult.php new file mode 100644 index 00000000..edfd61f5 --- /dev/null +++ b/src/Core/AbstractResult.php @@ -0,0 +1,75 @@ +result = $result; + $this->connection = $connection; + $this->profileKey = $profileKey; + } + + /** + * Receive by defer + * + * @param bool $defer + * @param bool $release + * + * @return mixed + */ + protected function recv(bool $defer = false, bool $release = true) + { + if ($this->connection instanceof ConnectionInterface) { + $result = $this->connection->receive(); + $this->release($release); + + return $result; + } + + $result = $this->connection->recv(); + if ($defer) { + $this->connection->setDefer(false); + } + + return $result; + } + + /** + * @param bool $release + */ + protected function release(bool $release = true) + { + if ($this->connection instanceof ConnectionInterface && $release) { + $this->connection->release(); + } + } +} \ No newline at end of file diff --git a/src/Core/Coroutine.php b/src/Core/Coroutine.php index 6bef82f3..0c0e6bb7 100644 --- a/src/Core/Coroutine.php +++ b/src/Core/Coroutine.php @@ -14,7 +14,7 @@ class Coroutine /** * @var int */ - private static $tid; + private static $tid = -1; /** * Coroutine id mapping @@ -53,7 +53,6 @@ public static function id() public static function tid() { $id = self::id(); - return self::$idMap[$id] ?? $id; } @@ -122,14 +121,4 @@ public static function shouldWrapCoroutine() { return App::isWorkerStatus() && swoole_version() >= '2.0.11'; } - - /** - * Init tid - */ - public static function initTid() - { - $time = time(); - $rand = mt_rand(1, 100); - self::$tid = (int)($time . $rand); - } } diff --git a/src/Event/AppEvent.php b/src/Event/AppEvent.php index 28d9cc66..da10f2a9 100644 --- a/src/Event/AppEvent.php +++ b/src/Event/AppEvent.php @@ -3,12 +3,12 @@ namespace Swoft\Event; /** - * 所有事件名称 + * Application event */ class AppEvent { /** - * 应用初始化加载监听器 + * Application loader event */ const APPLICATION_LOADER = "applicationLoader"; @@ -23,9 +23,9 @@ class AppEvent const RESOURCE_RELEASE = 'resourceRelease'; /** - * Resource transaction event behind application + * Before resource release */ - const TRANSACTION_RELEASE = 'transactionRelease'; + const RESOURCE_RELEASE_BEFORE = 'resourceReleaseBefore'; /** * Worker start event diff --git a/src/Event/Events/TransactionReleaseEvent.php b/src/Event/Events/TransactionReleaseEvent.php deleted file mode 100644 index 71b28544..00000000 --- a/src/Event/Events/TransactionReleaseEvent.php +++ /dev/null @@ -1,44 +0,0 @@ -tsStacks = $tsStacks; - $this->connections = $connections; - } - - /** - * @return \SplStack[] - */ - public function getTsStacks(): array - { - return $this->tsStacks; - } - - /** - * @return mixed - */ - public function getConnections() - { - return $this->connections; - } -} \ No newline at end of file diff --git a/src/Event/Listeners/ResourceReleaseListener.php b/src/Event/Listeners/ResourceReleaseListener.php index b27369c4..bf5fb289 100644 --- a/src/Event/Listeners/ResourceReleaseListener.php +++ b/src/Event/Listeners/ResourceReleaseListener.php @@ -4,11 +4,11 @@ use Swoft\App; use Swoft\Bean\Annotation\Listener; +use Swoft\Core\Coroutine; use Swoft\Core\RequestContext; use Swoft\Event\AppEvent; use Swoft\Event\EventHandlerInterface; use Swoft\Event\EventInterface; -use Swoft\Event\Events\TransactionReleaseEvent; use Swoft\Helper\PoolHelper; use Swoft\Log\Log; @@ -25,30 +25,23 @@ class ResourceReleaseListener implements EventHandlerInterface */ public function handle(EventInterface $event) { - $contextTsKey = PoolHelper::getContextTsKey(); + // Release system resources + App::trigger(AppEvent::RESOURCE_RELEASE_BEFORE); + $connectionKey = PoolHelper::getContextCntKey(); - $tsStacks = RequestContext::getContextDataByKey($contextTsKey, []); $connections = RequestContext::getContextDataByKey($connectionKey, []); - if (empty($connections)) { return; } /* @var \Swoft\Pool\ConnectionInterface $connection */ - foreach ($connections as $connection){ - if (App::isCoContext() && !$connection->isRecv()) { + foreach ($connections as $connectionId => $connection) { + if (!$connection->isRecv()) { + Log::error(sprintf('%s connection is not received ,forget to getResult()', get_class($connection))); $connection->receive(); } - } - if (!empty($tsStacks)) { - $event = new TransactionReleaseEvent(AppEvent::TRANSACTION_RELEASE, $tsStacks, $connections); - App::trigger($event); - } - - /* @var \Swoft\Pool\ConnectionInterface $connection */ - foreach ($connections as $connectionId => $connection) { - Log::error(sprintf('%s connection is not released ,forget to getResult() or em->close', get_class($connection))); + Log::error(sprintf('%s connection is not released ,forget to getResult()', get_class($connection))); $connection->release(true); } } diff --git a/src/Helper/PoolHelper.php b/src/Helper/PoolHelper.php index 472c35a3..804d883b 100644 --- a/src/Helper/PoolHelper.php +++ b/src/Helper/PoolHelper.php @@ -2,8 +2,6 @@ namespace Swoft\Helper; -use Swoft\Core\Coroutine; - /** * PoolHelper */ @@ -16,24 +14,4 @@ public static function getContextCntKey(): string { return sprintf('connectioins'); } - - /** - * @return string - */ - public static function getContextTsKey(): string - { - return sprintf('transactions'); - } - - /** - * @param string $group - * - * @return string - */ - public static function getGroupKey(string $group):string - { - $cid = Coroutine::id(); - - return sprintf('%d-%s', $cid, $group); - } } \ No newline at end of file diff --git a/src/Pool/AbstractConnection.php b/src/Pool/AbstractConnection.php index 728f87d3..f72e0b52 100644 --- a/src/Pool/AbstractConnection.php +++ b/src/Pool/AbstractConnection.php @@ -28,9 +28,11 @@ abstract class AbstractConnection implements ConnectionInterface protected $autoRelease = true; /** + * Whether or not the package has been recv,default true + * * @var bool */ - protected $recv = false; + protected $recv = true; /** * AbstractConnection constructor. diff --git a/src/Pool/ConnectionPool.php b/src/Pool/ConnectionPool.php index 169166cf..b0bd7903 100644 --- a/src/Pool/ConnectionPool.php +++ b/src/Pool/ConnectionPool.php @@ -85,7 +85,7 @@ public function release(ConnectionInterface $connection) { $connectionId = $connection->getConnectionId(); $connection->updateLastTime(); - $connection->setRecv(false); + $connection->setRecv(true); $connection->setAutoRelease(true); if (App::isCoContext()) { @@ -234,14 +234,17 @@ private function getConnectionByChannel(): ConnectionInterface } $maxWait = $this->poolConfig->getMaxWait(); - if ($maxWait != 0 && $stats['consumer_num'] >= $maxWait) { - throw new ConnectionException('Connection pool waiting queue is full'); + throw new ConnectionException(sprintf('Connection pool waiting queue is full, maxActive=%d,maxWait=%d,currentCount=%d', $maxActive, $maxWait, $this->currentCount)); + } + + $maxWaitTime = $this->poolConfig->getMaxWaitTime(); + if ($maxWaitTime == 0) { + return $this->channel->pop(); } $writes = []; $reads = [$this->channel]; - $maxWaitTime = $this->poolConfig->getMaxWaitTime(); $result = $this->channel->select($reads, $writes, $maxWaitTime); if ($result === false || empty($reads)) {