webserver_command.py
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Webserver command."""
from __future__ import annotations
import logging
import os
import signal
import subprocess
import sys
import textwrap
import time
from contextlib import suppress
from pathlib import Path
from time import sleep
from typing import NoReturn
import psutil
from lockfile.pidlockfile import read_pid_from_pidfile
from airflow import settings
from airflow.cli.commands.daemon_utils import run_command_with_daemon_option
from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowWebServerTimeout
from airflow.utils import cli as cli_utils
from airflow.utils.cli import setup_locations
from airflow.utils.hashlib_wrapper import md5
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
log = logging.getLogger(__name__)


class GunicornMonitor(LoggingMixin):
    """
    Runs forever.

    Monitoring the child processes of @gunicorn_master_proc and restarting
    workers occasionally or when files in the plug-in directory have been modified.

    Each iteration of the loop traverses one edge of this state transition
    diagram, where each state (node) represents
    [ num_ready_workers_running / num_workers_running ]. We expect most time to
    be spent in [n / n]. `bs` is the setting webserver.worker_refresh_batch_size.

    The horizontal transition at ? happens after the new worker parses all the
    dags (so it could take a while!)

        V ────────────────────────────────────────────────────────────────────────┐
    [n / n] ──TTIN──> [ [n, n+bs) / n + bs ] ────?───> [n + bs / n + bs] ──TTOU─┘
        ^                                  ^───────────────┘
        │
        │      ┌────────────────v
        └──────┴────── [ [0, n) / n ] <─── start

    We change the number of workers by sending TTIN and TTOU to the gunicorn
    master process, which increases and decreases the number of child workers
    respectively. Gunicorn guarantees that on TTOU workers are terminated
    gracefully and that the oldest worker is terminated.

    :param gunicorn_master_pid: PID for the main Gunicorn process
    :param num_workers_expected: Number of workers to run the Gunicorn web server
    :param master_timeout: Number of seconds the webserver waits before killing gunicorn master that
        doesn't respond
    :param worker_refresh_interval: Number of seconds to wait before refreshing a batch of workers.
    :param worker_refresh_batch_size: Number of workers to refresh at a time. When set to 0, worker
        refresh is disabled. When nonzero, airflow periodically refreshes webserver workers by
        bringing up new ones and killing old ones.
    :param reload_on_plugin_change: If set to True, Airflow will track files in the plugins_folder
        directory. When it detects changes, it reloads gunicorn.
    """

    def __init__(
        self,
        gunicorn_master_pid: int,
        num_workers_expected: int,
        master_timeout: int,
        worker_refresh_interval: int,
        worker_refresh_batch_size: int,
        reload_on_plugin_change: bool,
    ):
        super().__init__()
        self.gunicorn_master_proc = psutil.Process(gunicorn_master_pid)
        self.num_workers_expected = num_workers_expected
        self.master_timeout = master_timeout
        self.worker_refresh_interval = worker_refresh_interval
        self.worker_refresh_batch_size = worker_refresh_batch_size
        self.reload_on_plugin_change = reload_on_plugin_change
        self._num_workers_running = 0
        self._num_ready_workers_running = 0
        self._last_refresh_time = time.monotonic() if worker_refresh_interval > 0 else None
        self._last_plugin_state = self._generate_plugin_state() if reload_on_plugin_change else None
        self._restart_on_next_plugin_check = False

    def _generate_plugin_state(self) -> dict[str, str]:
        """
        Get plugin states.

        Generate dict of filenames and MD5 hashes of all files in the settings.PLUGINS_FOLDER
        directory.
        """
        if not settings.PLUGINS_FOLDER:
            return {}

        all_filenames: list[str] = []
        for root, _, filenames in os.walk(settings.PLUGINS_FOLDER):
            all_filenames.extend(os.path.join(root, f) for f in filenames)

        plugin_state = {f: self._get_file_hash(f) for f in sorted(all_filenames)}
        return plugin_state

    @staticmethod
    def _get_file_hash(fname: str):
        """Calculate MD5 hash for file."""
        hash_md5 = md5()
        with open(fname, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()

    def _get_num_ready_workers_running(self) -> int:
        """Return number of ready Gunicorn workers by looking for READY_PREFIX in process name."""
        workers = psutil.Process(self.gunicorn_master_proc.pid).children()

        def ready_prefix_on_cmdline(proc):
            try:
                cmdline = proc.cmdline()
                if cmdline:
                    return settings.GUNICORN_WORKER_READY_PREFIX in cmdline[0]
            except psutil.NoSuchProcess:
                pass
            return False

        nb_ready_workers = sum(1 for proc in workers if ready_prefix_on_cmdline(proc))
        return nb_ready_workers

    def _get_num_workers_running(self) -> int:
        """Return number of running Gunicorn worker processes."""
        workers = psutil.Process(self.gunicorn_master_proc.pid).children()
        return len(workers)

    def _wait_until_true(self, fn, timeout: int = 0) -> None:
        """Sleep until fn is true."""
        start_time = time.monotonic()
        while not fn():
            if 0 < timeout <= time.monotonic() - start_time:
                raise AirflowWebServerTimeout(f"No response from gunicorn master within {timeout} seconds")
            sleep(0.1)

    def _spawn_new_workers(self, count: int) -> None:
        """
        Send signal to spawn new workers.

        :param count: The number of workers to spawn
        """
        excess = 0
        for _ in range(count):
            # TTIN: Increment the number of processes by one
            self.gunicorn_master_proc.send_signal(signal.SIGTTIN)
            excess += 1
            self._wait_until_true(
                lambda: self.num_workers_expected + excess == self._get_num_workers_running(),
                timeout=self.master_timeout,
            )

    def _kill_old_workers(self, count: int) -> None:
        """
        Send signal to kill old workers.

        :param count: The number of workers to kill
        """
        for _ in range(count):
            count -= 1
            # TTOU: Decrement the number of processes by one
            self.gunicorn_master_proc.send_signal(signal.SIGTTOU)
            self._wait_until_true(
                lambda: self.num_workers_expected + count == self._get_num_workers_running(),
                timeout=self.master_timeout,
            )

    def _reload_gunicorn(self) -> None:
        """
        Send signal to reload the gunicorn configuration.

        When gunicorn receives this signal, it reloads its configuration,
        starts new worker processes with the new configuration, and gracefully
        shuts down the older workers.
        """
        # HUP: Reload the configuration.
        self.gunicorn_master_proc.send_signal(signal.SIGHUP)
        sleep(1)
        self._wait_until_true(
            lambda: self.num_workers_expected == self._get_num_workers_running(), timeout=self.master_timeout
        )

    def start(self) -> NoReturn:
        """Start monitoring the webserver."""
        try:
            self._wait_until_true(
                lambda: self.num_workers_expected == self._get_num_workers_running(),
                timeout=self.master_timeout,
            )
            while True:
                if not self.gunicorn_master_proc.is_running():
                    sys.exit(1)
                self._check_workers()
                # Throttle loop
                sleep(1)
        except (AirflowWebServerTimeout, OSError) as err:
            self.log.error(err)
            self.log.error("Shutting down webserver")
            try:
                self.gunicorn_master_proc.terminate()
                self.gunicorn_master_proc.wait()
            finally:
                sys.exit(1)

    def _check_workers(self) -> None:
        num_workers_running = self._get_num_workers_running()
        num_ready_workers_running = self._get_num_ready_workers_running()

        # Whenever some workers are not ready, wait until all workers are ready
        if num_ready_workers_running < num_workers_running:
            self.log.debug(
                "[%d / %d] Some workers are starting up, waiting...",
                num_ready_workers_running,
                num_workers_running,
            )
            sleep(1)
            return

        # If there are too many workers, then kill a worker gracefully by asking gunicorn to reduce
        # the number of workers
        if num_workers_running > self.num_workers_expected:
            excess = min(num_workers_running - self.num_workers_expected, self.worker_refresh_batch_size)
            self.log.debug(
                "[%d / %d] Killing %s workers", num_ready_workers_running, num_workers_running, excess
            )
            self._kill_old_workers(excess)
            return

        # If there are too few workers, start a new worker by asking gunicorn
        # to increase the number of workers
        if num_workers_running < self.num_workers_expected:
            self.log.error(
                "[%d / %d] Some workers seem to have died and gunicorn did not restart them as expected",
                num_ready_workers_running,
                num_workers_running,
            )
            sleep(10)
            num_workers_running = self._get_num_workers_running()
            if num_workers_running < self.num_workers_expected:
                new_worker_count = min(
                    self.num_workers_expected - num_workers_running, self.worker_refresh_batch_size
                )
                # log at info since we are trying to fix an error logged just above
                self.log.info(
                    "[%d / %d] Spawning %d workers",
                    num_ready_workers_running,
                    num_workers_running,
                    new_worker_count,
                )
                self._spawn_new_workers(new_worker_count)
            return

        # Now the number of running and expected workers should be equal.
        # If workers should be restarted periodically and the last refresh
        # was long enough ago, refresh a batch of workers.
        if self.worker_refresh_interval > 0 and self._last_refresh_time:
            last_refresh_diff = time.monotonic() - self._last_refresh_time
            if self.worker_refresh_interval < last_refresh_diff:
                num_new_workers = self.worker_refresh_batch_size
                self.log.debug(
                    "[%d / %d] Starting a refresh. Starting %d workers.",
                    num_ready_workers_running,
                    num_workers_running,
                    num_new_workers,
                )
                self._spawn_new_workers(num_new_workers)
                self._last_refresh_time = time.monotonic()
                return

        # If we should watch the plugins directory,
        if self.reload_on_plugin_change:
            # compare the previous and current contents of the directory.
            new_state = self._generate_plugin_state()
            # If it changed, hold off one check cycle so that the new files are fully written.
            if new_state != self._last_plugin_state:
                self.log.debug(
                    "[%d / %d] Plugins folder changed. The gunicorn will be restarted the next time the "
                    "plugin directory is checked, if there is no change in it.",
                    num_ready_workers_running,
                    num_workers_running,
                )
                self._restart_on_next_plugin_check = True
                self._last_plugin_state = new_state
            elif self._restart_on_next_plugin_check:
                self.log.debug(
                    "[%d / %d] Reloading the gunicorn configuration.",
                    num_ready_workers_running,
                    num_workers_running,
                )
                self._restart_on_next_plugin_check = False
                self._last_refresh_time = time.monotonic()
                self._reload_gunicorn()


@cli_utils.action_cli
@providers_configuration_loaded
def webserver(args):
    """Start Airflow Webserver."""
    print(settings.HEADER)

    # Check for old/insecure config, and fail safe (i.e. don't launch) if the config is wildly insecure.
    if conf.get("webserver", "secret_key") == "temporary_key":
        from rich import print as rich_print

        rich_print(
            "[red][bold]ERROR:[/bold] The `secret_key` setting under the webserver config has an insecure "
            "value - Airflow has failed safe and refuses to start. Please change this value to a new, "
            "per-environment, randomly generated string, for example using this command `[cyan]openssl rand "
            "-hex 30[/cyan]`",
            file=sys.stderr,
        )
        sys.exit(1)

    access_logfile = args.access_logfile or conf.get("webserver", "access_logfile")
    error_logfile = args.error_logfile or conf.get("webserver", "error_logfile")
    access_logformat = args.access_logformat or conf.get("webserver", "access_logformat")
    num_workers = args.workers or conf.get("webserver", "workers")
    worker_timeout = args.worker_timeout or conf.get("webserver", "web_server_worker_timeout")
    ssl_cert = args.ssl_cert or conf.get("webserver", "web_server_ssl_cert")
    ssl_key = args.ssl_key or conf.get("webserver", "web_server_ssl_key")
    if not ssl_cert and ssl_key:
        raise AirflowException("An SSL certificate must also be provided for use with " + ssl_key)
    if ssl_cert and not ssl_key:
        raise AirflowException("An SSL key must also be provided for use with " + ssl_cert)

    from airflow.www.app import create_app

    if args.debug:
        print(f"Starting the web server on port {args.port} and host {args.hostname}.")
        app = create_app(testing=conf.getboolean("core", "unit_test_mode"))
        app.run(
            debug=True,
            use_reloader=not app.config["TESTING"],
            port=args.port,
            host=args.hostname,
            ssl_context=(ssl_cert, ssl_key) if ssl_cert and ssl_key else None,
        )
    else:
        print(
            textwrap.dedent(
                f"""\
                Running the Gunicorn Server with:
                Workers: {num_workers} {args.workerclass}
                Host: {args.hostname}:{args.port}
                Timeout: {worker_timeout}
                Logfiles: {access_logfile} {error_logfile}
                Access Logformat: {access_logformat}
                ================================================================="""
            )
        )

        pid_file, _, _, _ = setup_locations("webserver", pid=args.pid)

        run_args = [
            sys.executable,
            "-m",
            "gunicorn",
            "--workers",
            str(num_workers),
            "--worker-class",
            str(args.workerclass),
            "--timeout",
            str(worker_timeout),
            "--bind",
            args.hostname + ":" + str(args.port),
            "--name",
            "airflow-webserver",
            "--pid",
            pid_file,
            "--config",
            "python:airflow.www.gunicorn_config",
        ]

        if args.access_logfile:
            run_args += ["--access-logfile", str(args.access_logfile)]

        if args.error_logfile:
            run_args += ["--error-logfile", str(args.error_logfile)]

        if args.access_logformat and args.access_logformat.strip():
            run_args += ["--access-logformat", str(args.access_logformat)]

        if args.daemon:
            run_args += ["--daemon"]

        if ssl_cert:
            run_args += ["--certfile", ssl_cert, "--keyfile", ssl_key]

        run_args += ["airflow.www.app:cached_app()"]

        if conf.getboolean("webserver", "reload_on_plugin_change", fallback=False):
            log.warning(
                "Setting reload_on_plugin_change = true prevents running Gunicorn with preloading. "
                "This means the app cannot be loaded before workers are forked, and each worker has a "
                "separate copy of the app. This may cause IntegrityError during webserver startup, and "
                "should be avoided in production."
            )
        else:
            # To prevent different workers creating the web app and
            # all writing to the database at the same time, we use the --preload option.
            run_args += ["--preload"]

        def kill_proc(signum: int, gunicorn_master_proc: psutil.Process | subprocess.Popen) -> NoReturn:
            log.info("Received signal: %s. Closing gunicorn.", signum)
            gunicorn_master_proc.terminate()
            with suppress(TimeoutError):
                gunicorn_master_proc.wait(timeout=30)
            if isinstance(gunicorn_master_proc, subprocess.Popen):
                still_running = gunicorn_master_proc.poll() is not None
            else:
                still_running = gunicorn_master_proc.is_running()
            if still_running:
                gunicorn_master_proc.kill()
            sys.exit(0)

        def monitor_gunicorn(gunicorn_master_proc: psutil.Process | subprocess.Popen) -> NoReturn:
            # Register signal handlers
            signal.signal(signal.SIGINT, lambda signum, _: kill_proc(signum, gunicorn_master_proc))
            signal.signal(signal.SIGTERM, lambda signum, _: kill_proc(signum, gunicorn_master_proc))

            # These run forever until SIG{INT, TERM, KILL, ...} signal is sent
            GunicornMonitor(
                gunicorn_master_pid=gunicorn_master_proc.pid,
                num_workers_expected=num_workers,
                master_timeout=conf.getint("webserver", "web_server_master_timeout"),
                worker_refresh_interval=conf.getint("webserver", "worker_refresh_interval", fallback=30),
                worker_refresh_batch_size=conf.getint("webserver", "worker_refresh_batch_size", fallback=1),
                reload_on_plugin_change=conf.getboolean(
                    "webserver", "reload_on_plugin_change", fallback=False
                ),
            ).start()

        def start_and_monitor_gunicorn(args):
            if args.daemon:
                subprocess.Popen(run_args, close_fds=True)

                # Reading pid of gunicorn master as it will be different from
                # the one of the process spawned above.
                gunicorn_master_proc_pid = None
                while not gunicorn_master_proc_pid:
                    sleep(0.1)
                    gunicorn_master_proc_pid = read_pid_from_pidfile(pid_file)

                # Run Gunicorn monitor
                gunicorn_master_proc = psutil.Process(gunicorn_master_proc_pid)
                monitor_gunicorn(gunicorn_master_proc)
            else:
                with subprocess.Popen(run_args, close_fds=True) as gunicorn_master_proc:
                    monitor_gunicorn(gunicorn_master_proc)

        if args.daemon:
            # This makes sure possible errors are reported before daemonization.
            os.environ["SKIP_DAGS_PARSING"] = "True"
            create_app(None)
            os.environ.pop("SKIP_DAGS_PARSING")

        pid_file_path = Path(pid_file)
        monitor_pid_file = str(pid_file_path.with_name(f"{pid_file_path.stem}-monitor{pid_file_path.suffix}"))
        run_command_with_daemon_option(
            args=args,
            process_name="webserver",
            callback=lambda: start_and_monitor_gunicorn(args),
            should_setup_logging=True,
            pid_file=monitor_pid_file,
        )
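

# Example CLI usage (illustrative, not part of this module): this command is normally
# invoked through the Airflow CLI, which calls webserver(args) above with the parsed
# arguments, e.g.
#
#     airflow webserver --hostname 0.0.0.0 --port 8080 --workers 4 --daemon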