Skip to content

Commit

Permalink
Pre-create Celery db result tables before running Celery worker (#9719)
Browse files Browse the repository at this point in the history
Otherwise at large scale this can end up with some tasks failing as they
try to create the result table at the same time.

This was always possible before, just exceedingly rare, but in large
scale performance testing where I create a lot of tasks quickly
(especially in my HA testing) I hit this a few times.

This is also only a problem for fresh installs/clean DBs, as once these
tables exist the possible race goes away.

This is the same fix from #8909, just for runtime, not test time.

GitOrigin-RevId: dcdc7c1fa92d2bddfe388eae5b411065c9f73b61
  • Loading branch information
ashb authored and Cloud Composer Team committed Sep 12, 2024
1 parent 2da5e02 commit ec98acb
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 0 deletions.
17 changes: 17 additions & 0 deletions airflow/cli/commands/celery_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import daemon
import psutil
import sqlalchemy.exc
from celery import maybe_patch_concurrency
from celery.bin import worker as worker_bin
from daemon.pidfile import TimeoutPIDLockFile
Expand Down Expand Up @@ -112,6 +113,22 @@ def worker(args):
log=args.log_file,
)

if hasattr(celery_app.backend, 'ResultSession'):
# Pre-create the database tables now, otherwise SQLA via Celery has a
# race condition where one of the subprocesses can die with "Table
# already exists" error, because SQLA checks for which tables exist,
# then issues a CREATE TABLE, rather than doing CREATE TABLE IF NOT
# EXISTS
try:
session = celery_app.backend.ResultSession()
session.close()
except sqlalchemy.exc.IntegrityError:
# At least on postgres, trying to create a table that already exist
# gives a unique constraint violation or the
# "pg_type_typname_nsp_index" table. If this happens we can ignore
# it, we raced to create the tables and lost.
pass

# Setup Celery worker
worker_instance = worker_bin.worker(app=celery_app)
options = {
Expand Down
3 changes: 3 additions & 0 deletions tests/cli/commands/test_celery_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def test_validate_session_dbapi_exception(self, mock_session):

@pytest.mark.integration("redis")
@pytest.mark.integration("rabbitmq")
@pytest.mark.backend("mysql", "postgres")
class TestWorkerServeLogs(unittest.TestCase):

@classmethod
Expand Down Expand Up @@ -91,6 +92,7 @@ def test_skip_serve_logs_on_worker_start(self, mock_worker):
mock_popen.assert_not_called()


@pytest.mark.backend("mysql", "postgres")
class TestCeleryStopCommand(unittest.TestCase):
@classmethod
def setUpClass(cls):
Expand Down Expand Up @@ -144,6 +146,7 @@ def test_same_pid_file_is_used_in_start_and_stop(
mock_read_pid_from_pidfile.assert_called_once_with(pid_file)


@pytest.mark.backend("mysql", "postgres")
class TestWorkerStart(unittest.TestCase):
@classmethod
def setUpClass(cls):
Expand Down

0 comments on commit ec98acb

Please sign in to comment.