Laravel Octane with Rabbitmq - broken pipe or closed connection error and CHANNEL_ERROR - expected 'channel.open'(60, 40) #460

Closed
TahsinAbrar opened this issue Jan 12, 2022 · 15 comments · Fixed by #531

@TahsinAbrar

  • Laravel/Lumen version: ^8.54
  • Octane version: "^1.0"
  • RabbitMQ version: "3.8.6"
  • Package version: "^11.3"

Describe the bug

While pushing event data to RabbitMQ with this package, I get these two errors:

  • CHANNEL_ERROR - expected 'channel.open'(60, 40) -- getting this error mostly
  • broken pipe or closed connection -- sometimes this error pops up

Steps To Reproduce

This does not happen all the time. The errors start appearing some time after the app is deployed; I am not sure exactly how long it takes.

Current behavior

Oddly, the error occurs on every other attempt. And when a push appears to succeed, without any error, the queued data is lost: it never shows up in the RabbitMQ queue.

Additional context

I have also checked the RabbitMQ memory usage and it is 5% of total memory.

@mfiyalka

mfiyalka commented Jul 1, 2022

@TahsinAbrar Did you manage to solve the problem? I am getting the same error. How did you solve it?

@TahsinAbrar
Author

> @TahsinAbrar You managed to solve the problem? I have the same mistake. How did you solve the problem?

The issue occurs because this package cannot reconnect after RabbitMQ automatically closes the channel due to inactivity.

With Laravel Octane the plan is to keep connections pooled across requests, so the package cannot reconnect once the error has occurred.

As a quick fix, I had to connect to AMQP manually. The code that resolves the issue can be found here:
https://github.com/TahsinAbrar/plain-amqp-manager

I still plan to find a solution within this package, which is why I have not structured that code as a package yet. It should give you the idea if you want a workaround.

@khepin
Collaborator

khepin commented Feb 4, 2023

Since the issue was marked as fixed in the previous PRs, I am closing it. Re-open it if you are still experiencing the issue.

@khepin khepin closed this as completed Feb 4, 2023
@johnabil

johnabil commented Feb 8, 2023

  • Laravel version 9
  • Octane version 1
  • Rabbitmq version 3.11.8
  • Package version 13.1

@khepin I was still hitting the same issue, but I found a solution and wanted to share it here as a reference for anyone else who runs into it.

Describe Bug
The bug comes from how the RabbitMQ connection is handled under Octane. Laravel Octane serves requests from a stateful, long-lived process, unlike PHP-FPM (used by Laravel without Octane), which is stateless. When the Octane server starts, it creates the original app container; after that, each incoming request is served by a clone of that original container. Because the Octane server stays up, when the RabbitMQ connection goes down the original container still holds the dead connection and cannot create a new one. The result is this error: the server cannot open a channel because the connection was closed.
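
The failure mode described above can be illustrated without Octane or RabbitMQ. The sketch below is a simplified model, not Octane's actual cloning mechanism: `FakeBrokerConnection`, `FakeContainer`, and `handleRequest` are hypothetical names made up for this example. It only shows that a shallow clone of a long-lived container shares the same connection object, so once that connection is dead every later request fails.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for an AMQP connection; not part of any real library.
final class FakeBrokerConnection
{
    private bool $open = true;

    public function publish(string $message): string
    {
        if (! $this->open) {
            throw new RuntimeException('broken pipe or closed connection');
        }

        return "published: {$message}";
    }

    // Simulates the broker dropping an idle connection.
    public function dropByBroker(): void
    {
        $this->open = false;
    }
}

// Hypothetical container that resolves the connection once and keeps it.
final class FakeContainer
{
    public function __construct(public FakeBrokerConnection $connection) {}
}

// Each "request" runs against a shallow clone: the container object is new,
// but the connection it holds is the same object as in the original.
function handleRequest(FakeContainer $original, string $message): string
{
    $sandbox = clone $original;

    try {
        return $sandbox->connection->publish($message);
    } catch (RuntimeException $e) {
        return 'failed: ' . $e->getMessage();
    }
}

// Boot once, as a long-lived worker process would.
$original = new FakeContainer(new FakeBrokerConnection());

$first = handleRequest($original, 'a');   // connection still open
$original->connection->dropByBroker();    // broker closes the idle connection
$second = handleRequest($original, 'b');  // every later request now fails
$third = handleRequest($original, 'c');
```

In this model, only replacing the connection held by the original container (or restarting the process) would make later requests succeed again.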

Solution

  • First, make sure that every clone of the original app container that handles a request in Octane creates a new connection to RabbitMQ, so that a fresh connection exists even if the previous one is gone. To do this, we override the QueueServiceProvider.php class that ships with Laravel. Here is my copy of it:
<?php

namespace App\Providers;

use Aws\DynamoDb\DynamoDbClient;
use Closure;
use Illuminate\Contracts\Debug\ExceptionHandler;
use Illuminate\Contracts\Support\DeferrableProvider;
use Illuminate\Queue\Connectors\BeanstalkdConnector;
use Illuminate\Queue\Connectors\DatabaseConnector;
use Illuminate\Queue\Connectors\NullConnector;
use Illuminate\Queue\Connectors\RedisConnector;
use Illuminate\Queue\Connectors\SqsConnector;
use Illuminate\Queue\Connectors\SyncConnector;
use Illuminate\Queue\Failed\DatabaseFailedJobProvider;
use Illuminate\Queue\Failed\DatabaseUuidFailedJobProvider;
use Illuminate\Queue\Failed\DynamoDbFailedJobProvider;
use Illuminate\Queue\Failed\NullFailedJobProvider;
use Illuminate\Queue\Listener;
use Illuminate\Queue\QueueManager;
use Illuminate\Queue\SerializesAndRestoresModelIdentifiers;
use Illuminate\Queue\Worker;
use Illuminate\Support\Arr;
use Illuminate\Support\Facades\Facade;
use Illuminate\Support\ServiceProvider;
use Laravel\SerializableClosure\SerializableClosure;
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Connectors\RabbitMQConnector;

class QueueServiceProvider extends ServiceProvider implements DeferrableProvider
{
    use SerializesAndRestoresModelIdentifiers;

    /**
     * Register the service provider.
     *
     * @return void
     */
    public function register()
    {
        $this->configureSerializableClosureUses();

        $this->registerManager();
        $this->registerConnection();
        $this->registerWorker();
        $this->registerListener();
        $this->registerFailedJobServices();
    }

    /**
     * Configure serializable closures uses.
     *
     * @return void
     */
    protected function configureSerializableClosureUses()
    {
        SerializableClosure::transformUseVariablesUsing(function ($data) {
            foreach ($data as $key => $value) {
                $data[$key] = $this->getSerializedPropertyValue($value);
            }

            return $data;
        });

        SerializableClosure::resolveUseVariablesUsing(function ($data) {
            foreach ($data as $key => $value) {
                $data[$key] = $this->getRestoredPropertyValue($value);
            }

            return $data;
        });
    }

    /**
     * Register the queue manager.
     *
     * @return void
     */
    protected function registerManager()
    {
        $this->app->singleton('queue', function ($app) {
            // Once we have an instance of the queue manager, we will register the various
            // resolvers for the queue connectors. These connectors are responsible for
            // creating the classes that accept queue configs and instantiate queues.
            return tap(new QueueManager($app), function ($manager) {
                $this->registerConnectors($manager);
            });
        });
    }

    /**
     * Register the default queue connection binding.
     *
     * @return void
     */
    protected function registerConnection()
    {
        $this->app->singleton('queue.connection', function ($app) {
            return $app['queue']->connection();
        });
    }

    /**
     * Register the connectors on the queue manager.
     *
     * @param \Illuminate\Queue\QueueManager $manager
     * @return void
     */
    public function registerConnectors($manager)
    {
        foreach (['Null', 'Sync', 'Database', 'Redis', 'Beanstalkd', 'Sqs', 'Rabbitmq'] as $connector) {
            $this->{"register{$connector}Connector"}($manager);
        }
    }

    /**
     * Register the RabbitMQ queue connector.
     *
     * @param \Illuminate\Queue\QueueManager $manager
     * @return void
     */
    protected function registerRabbitmqConnector($manager)
    {
        $manager->addConnector('rabbitmq', function () {
            return new RabbitMQConnector($this->app['events']);
        });
    }

    /**
     * Register the Null queue connector.
     *
     * @param \Illuminate\Queue\QueueManager $manager
     * @return void
     */
    protected function registerNullConnector($manager)
    {
        $manager->addConnector('null', function () {
            return new NullConnector;
        });
    }

    /**
     * Register the Sync queue connector.
     *
     * @param \Illuminate\Queue\QueueManager $manager
     * @return void
     */
    protected function registerSyncConnector($manager)
    {
        $manager->addConnector('sync', function () {
            return new SyncConnector;
        });
    }

    /**
     * Register the database queue connector.
     *
     * @param \Illuminate\Queue\QueueManager $manager
     * @return void
     */
    protected function registerDatabaseConnector($manager)
    {
        $manager->addConnector('database', function () {
            return new DatabaseConnector($this->app['db']);
        });
    }

    /**
     * Register the Redis queue connector.
     *
     * @param \Illuminate\Queue\QueueManager $manager
     * @return void
     */
    protected function registerRedisConnector($manager)
    {
        $manager->addConnector('redis', function () {
            return new RedisConnector($this->app['redis']);
        });
    }

    /**
     * Register the Beanstalkd queue connector.
     *
     * @param \Illuminate\Queue\QueueManager $manager
     * @return void
     */
    protected function registerBeanstalkdConnector($manager)
    {
        $manager->addConnector('beanstalkd', function () {
            return new BeanstalkdConnector;
        });
    }

    /**
     * Register the Amazon SQS queue connector.
     *
     * @param \Illuminate\Queue\QueueManager $manager
     * @return void
     */
    protected function registerSqsConnector($manager)
    {
        $manager->addConnector('sqs', function () {
            return new SqsConnector;
        });
    }

    /**
     * Register the queue worker.
     *
     * @return void
     */
    protected function registerWorker()
    {
        $this->app->singleton('queue.worker', function ($app) {
            $isDownForMaintenance = function () {
                return $this->app->isDownForMaintenance();
            };

            $resetScope = function () use ($app) {
                if (method_exists($app['log']->driver(), 'withoutContext')) {
                    $app['log']->withoutContext();
                }

                if (method_exists($app['db'], 'getConnections')) {
                    foreach ($app['db']->getConnections() as $connection) {
                        $connection->resetTotalQueryDuration();
                        $connection->allowQueryDurationHandlersToRunAgain();
                    }
                }

                $app->forgetScopedInstances();

                return Facade::clearResolvedInstances();
            };

            return new Worker(
                $app['queue'],
                $app['events'],
                $app[ExceptionHandler::class],
                $isDownForMaintenance,
                $resetScope
            );
        });
    }

    /**
     * Register the queue listener.
     *
     * @return void
     */
    protected function registerListener()
    {
        $this->app->singleton('queue.listener', function ($app) {
            return new Listener($app->basePath());
        });
    }

    /**
     * Register the failed job services.
     *
     * @return void
     */
    protected function registerFailedJobServices()
    {
        $this->app->singleton('queue.failer', function ($app) {
            $config = $app['config']['queue.failed'];

            if (array_key_exists('driver', $config) &&
                (is_null($config['driver']) || $config['driver'] === 'null')) {
                return new NullFailedJobProvider;
            }

            if (isset($config['driver']) && $config['driver'] === 'dynamodb') {
                return $this->dynamoFailedJobProvider($config);
            } elseif (isset($config['driver']) && $config['driver'] === 'database-uuids') {
                return $this->databaseUuidFailedJobProvider($config);
            } elseif (isset($config['table'])) {
                return $this->databaseFailedJobProvider($config);
            } else {
                return new NullFailedJobProvider;
            }
        });
    }

    /**
     * Create a new database failed job provider.
     *
     * @param array $config
     * @return \Illuminate\Queue\Failed\DatabaseFailedJobProvider
     */
    protected function databaseFailedJobProvider($config)
    {
        return new DatabaseFailedJobProvider(
            $this->app['db'], $config['database'], $config['table']
        );
    }

    /**
     * Create a new database failed job provider that uses UUIDs as IDs.
     *
     * @param array $config
     * @return \Illuminate\Queue\Failed\DatabaseUuidFailedJobProvider
     */
    protected function databaseUuidFailedJobProvider($config)
    {
        return new DatabaseUuidFailedJobProvider(
            $this->app['db'], $config['database'], $config['table']
        );
    }

    /**
     * Create a new DynamoDb failed job provider.
     *
     * @param array $config
     * @return \Illuminate\Queue\Failed\DynamoDbFailedJobProvider
     */
    protected function dynamoFailedJobProvider($config)
    {
        $dynamoConfig = [
            'region' => $config['region'],
            'version' => 'latest',
            'endpoint' => $config['endpoint'] ?? null,
        ];

        if (!empty($config['key']) && !empty($config['secret'])) {
            $dynamoConfig['credentials'] = Arr::only(
                $config, ['key', 'secret', 'token']
            );
        }

        return new DynamoDbFailedJobProvider(
            new DynamoDbClient($dynamoConfig),
            $this->app['config']['app.name'],
            $config['table']
        );
    }

    /**
     * Get the services provided by the provider.
     *
     * @return array
     */
    public function provides()
    {
        return [
            'queue',
            'queue.connection',
            'queue.failer',
            'queue.listener',
            'queue.worker',
        ];
    }
}

Here you can see that I registered rabbitmq as one of Laravel's queue connectors, so that it is resolved through the queue singleton and a new connection is created every time a clone is created to serve a request.

  • Second, add the QueueServiceProvider.php class to app.php, under the application service providers section:
'providers' => [
        /*
         * Application Service Providers...
         */
        App\Providers\AppServiceProvider::class,
        App\Providers\QueueServiceProvider::class,
        App\Providers\AuthServiceProvider::class,
        // App\Providers\BroadcastServiceProvider::class,
        App\Providers\EventServiceProvider::class,
//        App\Providers\HorizonServiceProvider::class,
        App\Providers\RouteServiceProvider::class,
//        App\Providers\TelescopeServiceProvider::class,
]
  • Finally, close the clone's connection after each request terminates. To do this, add a DisconnectedFromRabbit.php listener class, register it in the listeners array in octane.php, and add queue to the flush array in octane.php:
<?php

namespace App\Listeners;

class DisconnectedFromRabbit
{
    /**
     * Handle the event.
     *
     * @param object $event
     * @return void
     */
    public function handle($event)
    {
        $event->app->make('queue')->connection()->close();
    }
}
'listeners' => [
    RequestTerminated::class => [
        // FlushUploadedFiles::class,
        App\Listeners\DisconnectedFromRabbit::class,
    ],

    TaskReceived::class => [
        ...Octane::prepareApplicationForNextOperation(),
        //
    ],

    TaskTerminated::class => [
        App\Listeners\DisconnectedFromRabbit::class,
    ],
],

'flush' => [
    'queue',
],

@khepin khepin reopened this Feb 8, 2023
@khepin
Collaborator

khepin commented Feb 8, 2023

@johnabil so the connection close is happening on the publish side, right? Not on consumers?

While constantly re-creating connections would work, I'd rather we explore a different approach. The great thing about Octane is not having to constantly rebuild expensive resources, and an SSL connection is about as expensive as it gets to create.

I would prefer we try establishing a new connection only when necessary.

@johnabil

johnabil commented Feb 9, 2023

@khepin yes, it mainly happens on the publish side; I don't think it happens on the consumers.

The consumers have their own connection when you run queue:work or rabbitmq:consume, so they do not hit the broken pipe error: those connections do not time out and are not closed unexpectedly.

On the publish side, publishing a message creates a connection to the RabbitMQ server, and the connection is not closed after the message is published. It can, however, be closed unexpectedly, depending on the server specs and configuration. With Octane, the original container creates a connection to publish a message and leaves it open; after a while the RabbitMQ server shuts the connection because of inactivity, or perhaps for other reasons. From then on, every new clone of the original container that tries to publish a message gets the broken pipe error. Even if you catch the exception and create another connection, you still get the same error, because you are creating the new connection on the cloned app, not on the original one. That is the core of the problem. To make the Octane app re-establish a connection, you have to restart the server so that the original container is rebuilt with a fresh connection.

Solution

From what I understand, the approach you want to try is to detect when the connection is down in the original container, and only then create a new connection on the original container.
The first part is easy: detecting that the connection is down on the original container. The problem is the second part, because as far as I know Octane does not support overwriting the original container's connection.

Suggestion

The approach I took constantly creates SSL connections. Its downside is that creating an SSL connection every time a cloned app connects is expensive, but it has a small upside: you will have at most 2 connections on the RabbitMQ server. One is from the original container and the other is from the cloned app created to serve each request, and the cloned app's connection is closed after the request terminates.
When I tried this on a high-demand API, the difference between reusing the connection and creating a new connection to RabbitMQ on every request was very small:

  • with only a single connection, the API took at most 200ms.
  • creating a new connection each time, the API took at most 220ms.

@khepin
Collaborator

khepin commented Feb 9, 2023

This is just a proof of concept right now, but I believe something like this would work: https://github.com/vyuldashev/laravel-queue-rabbitmq/compare/octane?expand=1
Tested it on Octane and it reconnects properly when needed. I think that's the rough direction we'd want to go.

@johnabil

That's great, I hope this can be merged soon. It is a much better approach than constantly creating connections, and I will see if I can help improve this merge request so it can be released soon.

@adm-bome
Collaborator

@khepin @johnabil
Isn't this already possible, by setting your own worker class that extends RabbitMQQueue::class?

Looking at the octane branch, I only see an extended RabbitMQQueue::class with 2 extra functions so the worker won't die.

As an alternative we could (just like Horizon) support Octane by adding the extra class.
When developers choose the worker 'octane' in the config, the correct class is created.

@johnabil

@adm-bome I think this is possible in theory, but we should look into what happens after the timeout, because once the connection times out or is closed for any reason, it cannot re-establish a new connection.

@adm-bome
Collaborator

<?php

namespace App\Queue\Octane;

use PhpAmqpLib\Exception\AMQPChannelClosedException;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue as BaseRabbitMQQueue;

class RabbitMQQueue extends BaseRabbitMQQueue
{
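    // Re-open the underlying connection and replace the stale channel.
    private function reconnect()
    {
        $this->connection->reconnect();
        $this->channel = $this->connection->channel();
    }

    // Run $callback; if the connection or channel was closed, reconnect once and
    // retry with $exceptionCallback when given, otherwise with $callback again.
    private function withReconnect(callable $callback, ?callable $exceptionCallback = null)
    {
        try {
            return $callback();
        } catch (AMQPConnectionClosedException|AMQPChannelClosedException|AMQPProtocolChannelException $exception) {
            $this->reconnect();

            return $exceptionCallback ? $exceptionCallback() : $callback();
        }
    }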

    /**
     * {@inheritdoc}
     */
    public function pushRaw($payload, $queue = null, array $options = []): int|string|null
    {
        return $this->withReconnect(fn () => parent::pushRaw($payload, $queue, $options));
    }

    /**
     * {@inheritdoc}
     */
    public function laterRaw($delay, string $payload, $queue = null, int $attempts = 0): int|string|null
    {
        return $this->withReconnect(fn () => parent::laterRaw($delay, $payload, $queue, $attempts));
    }

    /**
     * {@inheritdoc}
     */
    public function bulk($jobs, $data = '', $queue = null): void
    {
        $this->withReconnect(fn () => parent::bulk($jobs, $data, $queue), fn () => $this->getChannel()->publish_batch());
    }
}

with config

'connections' => [
    // ...

    'rabbitmq-octane' => [
        // ...

       'worker' => \App\Queue\Octane\RabbitMQQueue::class
    ],

    // ...    
],
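
To see how a reconnect-and-retry-once wrapper like the one above behaves without a broker, here is a minimal stand-alone sketch. `FakeConnectionClosed` and `FakePublisher` are hypothetical classes invented for this example; they stand in for the php-amqplib exceptions and the package's queue class and are not part of either library.

```php
<?php

declare(strict_types=1);

// Hypothetical exception standing in for the AMQP connection/channel exceptions.
final class FakeConnectionClosed extends RuntimeException {}

// Hypothetical publisher whose connection starts out dead, as after a broker timeout.
final class FakePublisher
{
    public int $reconnects = 0;

    /** @var string[] */
    public array $sent = [];

    private bool $connected = false;

    public function reconnect(): void
    {
        $this->connected = true;
        $this->reconnects++;
    }

    public function rawPublish(string $payload): int
    {
        if (! $this->connected) {
            throw new FakeConnectionClosed('broken pipe or closed connection');
        }

        $this->sent[] = $payload;

        return count($this->sent);
    }

    // Run $callback; on a closed connection, reconnect once and retry.
    // A failure during the retry is not caught and propagates to the caller.
    public function withReconnect(callable $callback, ?callable $retry = null): mixed
    {
        try {
            return $callback();
        } catch (FakeConnectionClosed) {
            $this->reconnect();

            return $retry ? $retry() : $callback();
        }
    }

    public function publish(string $payload): int
    {
        return $this->withReconnect(fn () => $this->rawPublish($payload));
    }
}

$publisher = new FakePublisher();
$firstId = $publisher->publish('job-1');  // first attempt fails, retry succeeds
$secondId = $publisher->publish('job-2'); // connection is live, no reconnect
```

The wrapper retries exactly once, so the cost of a new connection is only paid when the existing one has actually failed, which matches the "only when necessary" direction discussed earlier in the thread.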

@johnabil

johnabil commented Mar 15, 2023

@adm-bome we should add the rabbitmq connection to the warm array in the octane.php config file, so that it is resolved only once, when the app first starts.

'warm' => [
        ...Octane::defaultServicesToWarm(),
        'rabbitmq',
],

IMPORTANT NOTE

As mentioned with the code in the documentation, you must create your own RabbitMQ worker class that calls the reconnect method in every function that needs it, so that if the RabbitMQ connection is closed at any time, it can reconnect without causing errors.

Example Class

<?php

namespace App\Queue;

use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Exception\AMQPChannelClosedException;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue as BaseRabbitMQQueue;

class RabbitMQQueue extends BaseRabbitMQQueue
{

    protected function publishBasic($msg, $exchange = '', $destination = '', $mandatory = false, $immediate = false, $ticket = null): void
    {
        try {
            parent::publishBasic($msg, $exchange, $destination, $mandatory, $immediate, $ticket);
        } catch (AMQPConnectionClosedException|AMQPChannelClosedException) {
            $this->reconnect();
            parent::publishBasic($msg, $exchange, $destination, $mandatory, $immediate, $ticket);
        }
    }

    protected function publishBatch($jobs, $data = '', $queue = null): void
    {
        try {
            parent::publishBatch($jobs, $data, $queue);
        } catch (AMQPConnectionClosedException|AMQPChannelClosedException) {
            $this->reconnect();
            parent::publishBatch($jobs, $data, $queue);
        }
    }

    protected function createChannel(): AMQPChannel
    {
        try {
            return parent::createChannel();
        } catch (AMQPConnectionClosedException) {
            $this->reconnect();
            return parent::createChannel();
        }
    }
}

After this, update the worker setting to use your own class:

'connections' => [
    // ...

    'rabbitmq' => [
        // ...

        /* Set to a class if you wish to use your own. */
       'worker' => \App\Queue\RabbitMQQueue::class,
    ],

    // ...    
],

@adm-bome adm-bome linked a pull request Mar 16, 2023 that will close this issue
adm-bome added a commit that referenced this issue Mar 28, 2023
 - issue #460
 - rework based on [comment](#531 (comment))
@adm-bome
Collaborator

adm-bome commented Apr 11, 2023

@johnabil master was updated (not yet released): #528

Could you test it a bit and see what it brings for your use cases?

I'm not really familiar with Octane, or with when the class is cached and cloned.

But in the new version the connection class is created without opening a real connection and channel until an action (such as push, pop, or size) is executed. So the cached class should not hold an open connection that is already dead; it should connect and open a channel normally each time.
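
The lazy-connection idea can be sketched in isolation as follows. This is only an illustration of the general pattern, not the package's implementation: `FakeSocket`, `LazyQueueClient`, and `simulateBrokerTimeout` are hypothetical names, and the liveness check before each action is an assumption of this example (a real client cannot always tell that a socket is dead before using it).

```php
<?php

declare(strict_types=1);

// Hypothetical socket-like resource; each instance represents one opened connection.
final class FakeSocket
{
    public bool $alive = true;
}

// Hypothetical queue client that opens its connection lazily.
final class LazyQueueClient
{
    public int $opened = 0;

    private ?FakeSocket $socket = null;

    // Constructing the client does not open any connection.
    public function __construct(private string $host) {}

    private function socket(): FakeSocket
    {
        // Open on first use, or reopen if the previous socket has died.
        if ($this->socket === null || ! $this->socket->alive) {
            $this->socket = new FakeSocket();
            $this->opened++;
        }

        return $this->socket;
    }

    public function push(string $payload): string
    {
        $this->socket(); // make sure a live connection exists before acting

        return "{$this->host} <- {$payload}";
    }

    // Test hook simulating the broker closing an idle connection.
    public function simulateBrokerTimeout(): void
    {
        if ($this->socket !== null) {
            $this->socket->alive = false;
        }
    }
}

$client = new LazyQueueClient('rabbit.example');
$openedAtBoot = $client->opened;          // nothing opened at construction

$client->push('a');
$client->push('b');
$openedAfterTwoPushes = $client->opened;  // both pushes share one connection

$client->simulateBrokerTimeout();
$result = $client->push('c');             // dead socket is replaced on demand
$openedAfterTimeout = $client->opened;
```

Because the object cached by a long-lived process holds no open connection at construction time, cloning it cannot propagate a connection that was already dead when the process booted.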

@adm-bome
Collaborator

#531 (comment)

@adm-bome
Collaborator

!! UPDATE !!
As of version 13.3.0, the code in #460 (comment) above won't work because of the refactor in #528.

See the README for the correct example.

<?php

namespace App\Queue;

use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Exception\AMQPChannelClosedException;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
use VladimirYuldashev\LaravelQueueRabbitMQ\Queue\RabbitMQQueue as BaseRabbitMQQueue;

class RabbitMQQueue extends BaseRabbitMQQueue
{
    protected function publishBasic($msg, $exchange = '', $destination = '', $mandatory = false, $immediate = false, $ticket = null): void
    {
        try {
            parent::publishBasic($msg, $exchange, $destination, $mandatory, $immediate, $ticket);
        } catch (AMQPConnectionClosedException|AMQPChannelClosedException) {
            $this->reconnect();
            parent::publishBasic($msg, $exchange, $destination, $mandatory, $immediate, $ticket);
        }
    }

    protected function publishBatch($jobs, $data = '', $queue = null): void
    {
        try {
            parent::publishBatch($jobs, $data, $queue);
        } catch (AMQPConnectionClosedException|AMQPChannelClosedException) {
            $this->reconnect();
            parent::publishBatch($jobs, $data, $queue);
        }
    }

    protected function createChannel(): AMQPChannel
    {
        try {
            return parent::createChannel();
        } catch (AMQPConnectionClosedException) {
            $this->reconnect();
            return parent::createChannel();
        }
    }
}

with config

'connections' => [
    // ...

    'rabbitmq-with-reconnect' => [
        // ...

       'worker' => \App\Queue\RabbitMQQueue::class
    ],

    // ...    
],


6 participants