
celery executer with SQS and predefined_queues #11225

Closed
sudarshan2906 opened this issue Oct 1, 2020 · 13 comments
Labels
area:Scheduler including HA (high availability) scheduler, kind:bug This is clearly a bug, Stale Bug Report

Comments

@sudarshan2906

Hi, I am using Airflow 1.10.12 with the Celery executor and SQS. When configuring predefined_queues in Celery as mentioned here, I get the error below:

[2020-10-01 17:37:54,498: CRITICAL/MainProcess] Unrecoverable error: AttributeError("'str' object has no attribute 'items'")
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/virtual/base.py", line 921, in create_channel
    return self._avail_channels.pop()
IndexError: pop from empty list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/celery/worker/worker.py", line 208, in start
    self.blueprint.start(self)
  File "/usr/local/lib/python3.7/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/usr/local/lib/python3.7/site-packages/celery/bootsteps.py", line 369, in start
    return self.obj.start()
  File "/usr/local/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 318, in start
    blueprint.start(self)
  File "/usr/local/lib/python3.7/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/usr/local/lib/python3.7/site-packages/celery/worker/consumer/connection.py", line 23, in start
    c.connection = c.connect()
  File "/usr/local/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 405, in connect
    conn = self.connection_for_read(heartbeat=self.amqheartbeat)
  File "/usr/local/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 412, in connection_for_read
    self.app.connection_for_read(heartbeat=heartbeat))
  File "/usr/local/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 439, in ensure_connected
    callback=maybe_shutdown,
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 389, in ensure_connection
    self._ensure_connection(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 445, in _ensure_connection
    callback, timeout=timeout
  File "/usr/local/lib/python3.7/site-packages/kombu/utils/functional.py", line 344, in retry_over_time
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 874, in _connection_factory
    self._connection = self._establish_connection()
  File "/usr/local/lib/python3.7/site-packages/kombu/connection.py", line 809, in _establish_connection
    conn = self.transport.establish_connection()
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/virtual/base.py", line 941, in establish_connection
    self._avail_channels.append(self.create_channel(self))
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/virtual/base.py", line 923, in create_channel
    channel = self.Channel(connection)
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/SQS.py", line 134, in __init__
    self._update_queue_cache(self.queue_name_prefix)
  File "/usr/local/lib/python3.7/site-packages/kombu/transport/SQS.py", line 140, in _update_queue_cache
    for queue_name, q in self.predefined_queues.items():
AttributeError: 'str' object has no attribute 'items'

airflow.cfg:

[celery_broker_transport_options]
'predefined_queues': = { 'my-q': {
            'url': 'https://ap-southeast-2.queue.amazonaws.com/123456/my-q',
            'access_key_id': 'xxx',
            'secret_access_key': 'xxx',
        }
    }
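The failure mode can be reproduced outside Airflow. Plain values in an airflow.cfg section are read as raw strings, so predefined_queues reaches the SQS transport as a str rather than a dict, and the transport's queue-cache loop calls .items() on it. A minimal sketch (simplified stand-in for kombu's SQS.Channel._update_queue_cache, not the actual kombu code):

```python
def update_queue_cache(predefined_queues):
    # Simplified stand-in for kombu.transport.SQS.Channel._update_queue_cache:
    # it iterates over predefined_queues expecting a dict of queue configs.
    cache = {}
    for queue_name, q in predefined_queues.items():
        cache[queue_name] = q['url']
    return cache

# A real dict works as intended:
ok = update_queue_cache({'my-q': {'url': 'https://example/queue/my-q'}})

# A raw string (what a plain airflow.cfg option yields) fails exactly like
# the traceback above:
try:
    update_queue_cache("{'my-q': {'url': 'https://example/queue/my-q'}}")
    error = None
except AttributeError as e:
    error = str(e)
```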
@sudarshan2906 sudarshan2906 added the kind:bug label Oct 1, 2020
@mik-laj
Member

mik-laj commented Oct 1, 2020

This section only accepts strings as values. You need to use "celery_config_options" to set up other types. See:

if conf.has_option('celery', 'celery_config_options'):
    celery_configuration = conf.getimport('celery', 'celery_config_options')
else:
    celery_configuration = DEFAULT_CELERY_CONFIG

@mik-laj mik-laj changed the title celery executer with SQS celery executer with SQS and predefined_queues Oct 1, 2020
@mik-laj mik-laj added the area:Scheduler including HA (high availability) scheduler label Oct 1, 2020
@sudarshan2906
Author

Thanks. I am using my celery configuration file for now.

But do you think we could add predefined_queues to DEFAULT_CELERY_CONFIG, so that when predefined_queues is passed in the Celery config it is converted from JSON? It's quite an important config for Celery when using SQS as the queue.

@mik-laj
Member

mik-laj commented Oct 5, 2020

I have no experience with SQS, but it sounds like a good solution.

@sudarshan2906
Author

Well, predefined_queues is supported in Celery version 5.0.0, but Airflow doesn't support that version. I can't find any ticket created for that. Will this be upgraded in Airflow 2.0.0?

@mik-laj
Member

mik-laj commented Oct 5, 2020

@auvipy can you share some details about Celery 5? What is its status? Should we think about migrating? Why is it worth migrating?

@chris-french

@sudarshan2906 @mik-laj -- did you find a solution for this?

It seems like you would need to make a small change to setup.py to install SQS support for Celery in Airflow:

    'celery[sqs]~=4.4.2',

source: https://docs.celeryproject.org/en/4.4.2/getting-started/brokers/sqs.html?highlight=sqs

@sudarshan2906
Author

sudarshan2906 commented Oct 6, 2020

@chris-french Yup, I got it working with Celery version 4.4.2. But the predefined_queues property was not supported by Airflow, as it's a dict. I used a celery_config file and changed the celery_config location in airflow.cfg:

celery_config.py

from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
broker_transport_options = {'visibility_timeout': 21600,
                            'predefined_queues': {
                                'sqs_queue_name': {
                                    'url': 'sqs_queue_url'}}}

DEFAULT_CELERY_CONFIG['broker_transport_options'] = broker_transport_options
CELERY_CONFIG = DEFAULT_CELERY_CONFIG
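For completeness, the corresponding pointer in airflow.cfg would look something like this (assuming celery_config.py sits on the PYTHONPATH and exposes CELERY_CONFIG as above):

```
[celery]
celery_config_options = celery_config.CELERY_CONFIG
```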

@charliegriefer

charliegriefer commented Aug 19, 2021

@sudarshan2906 - can you show the celery_config_options line in your airflow.cfg? I'm doing something similar, but getting an error on the scheduler saying:

airflow.exceptions.AirflowConfigException: The object could not be loaded. Please check "celery_config_options" key in "celery" section. Current value: "celery_config.MY_CELERY_CONFIG".

My celery_config is in my AIRFLOW_HOME, and currently it's pretty basic:

from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG

MY_CELERY_CONFIG = {
    **DEFAULT_CELERY_CONFIG
}

EDIT to add, an exception higher up in the stack trace: [2021-08-19 09:19:06,303] {configuration.py:468} ERROR - No module named 'celery_config', so it's just not finding the file.

The specific line in my airflow.cfg: celery_config_options = celery_config.MY_CELERY_CONFIG

Thanks!

@nicnguyen3103

For anyone who is still struggling with the import path of the Celery config, here is how to import it:

  1. Create a celery_config_extend.py and put it in the AIRFLOW_HOME folder. By default, Airflow adds this path to sys.path: https://airflow.apache.org/docs/apache-airflow/stable/modules_management.html.
  2. Set up the script like @charliegriefer or the other comments above. If you want to keep the original config and extend it with some extra settings, you can write the config like this; it keeps the original DEFAULT_CELERY_CONFIG and adds/replaces key/value pairs where applicable:
from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG

CELERY_CONFIG = dict(
    DEFAULT_CELERY_CONFIG,
    **{
        'key1': value1,  # add a new key/value pair, or replace it if the key is already in DEFAULT_CELERY_CONFIG
        ....
    }
)
  3. Set the environment variable AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS=celery_config_extend.CELERY_CONFIG to import your new config.

Airflow should now pick up your new Celery config and apply it to the workers.
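The merge pattern in step 2 relies on dict(base, **overrides) returning a new dict in which the override keys win, while the base dict is left untouched. A small self-contained illustration (generic keys, not real Celery settings):

```python
base = {'broker_url': 'sqs://', 'task_acks_late': True}
overrides = {'task_acks_late': False, 'worker_concurrency': 16}

# dict(base, **overrides): start from a copy of base, then apply
# overrides on top; existing keys are replaced, new keys are added.
merged = dict(base, **overrides)
```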

@auvipy
Contributor

auvipy commented Dec 20, 2021

can anyone check this if it is helpful celery/kombu#1450 ?

@github-actions

github-actions bot commented Jul 9, 2023

This issue has been automatically marked as stale because it has been open for 365 days without any activity. There have been several Airflow releases since the last activity on this issue. Kindly recheck the report against the latest Airflow version and let us know if the issue is reproducible. The issue will be closed in the next 30 days if no further activity occurs from the issue author.

@github-actions

github-actions bot commented Aug 8, 2023

This issue has been closed because it has not received response from the issue author.

@github-actions github-actions bot closed this as not planned Aug 8, 2023
@nagaran1

nagaran1 commented Nov 30, 2023

(Quoting @charliegriefer's earlier comment about the celery_config_options line in airflow.cfg and the "No module named 'celery_config'" error.)

The custom Celery config file should be kept on the PYTHONPATH. Here are the built-in PYTHONPATH entries:

  1. The dags folder: configured with the dags_folder option in the [core] section.
  2. The config folder: configured by setting the AIRFLOW_HOME variable ({AIRFLOW_HOME}/config by default).
  3. The plugins folder: configured with the plugins_folder option in the [core] section.

I faced a similar issue, but after placing the custom config file in {AIRFLOW_HOME}/config it worked fine. More details here
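The PYTHONPATH requirement above is easy to verify in isolation: any folder on sys.path makes the modules inside it importable by bare name, which is what Airflow does for the dags, config, and plugins folders. A self-contained sketch using a throwaway temporary folder (the file and variable names here are made up for illustration):

```python
import importlib
import os
import sys
import tempfile

# Create a throwaway folder containing a config module.
tmp = tempfile.mkdtemp()
with open(os.path.join(tmp, "my_celery_config.py"), "w") as f:
    f.write(
        "CELERY_CONFIG = {'broker_transport_options': "
        "{'visibility_timeout': 21600}}\n"
    )

# Putting the folder on sys.path is the equivalent of placing the file
# in {AIRFLOW_HOME}/config: the bare module name now resolves.
sys.path.insert(0, tmp)
mod = importlib.import_module("my_celery_config")
```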
