Skip to content

Commit

Permalink
Use a single statement with multiple contexts instead of nested state…
Browse files Browse the repository at this point in the history
…ments in core (#33769)
  • Loading branch information
hussein-awala authored Aug 26, 2023
1 parent c90eec9 commit 60e6847
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 41 deletions.
75 changes: 38 additions & 37 deletions airflow/sensors/bash.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,44 +68,45 @@ def poke(self, context: Context):
"""Execute the bash command in a temporary directory."""
bash_command = self.bash_command
self.log.info("Tmp dir root location: %s", gettempdir())
with TemporaryDirectory(prefix="airflowtmp") as tmp_dir:
with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f:
f.write(bytes(bash_command, "utf_8"))
f.flush()
fname = f.name
script_location = tmp_dir + "/" + fname
self.log.info("Temporary script location: %s", script_location)
self.log.info("Running command: %s", bash_command)
with TemporaryDirectory(prefix="airflowtmp") as tmp_dir, NamedTemporaryFile(
dir=tmp_dir, prefix=self.task_id
) as f:
f.write(bytes(bash_command, "utf_8"))
f.flush()
fname = f.name
script_location = tmp_dir + "/" + fname
self.log.info("Temporary script location: %s", script_location)
self.log.info("Running command: %s", bash_command)

with Popen(
["bash", fname],
stdout=PIPE,
stderr=STDOUT,
close_fds=True,
cwd=tmp_dir,
env=self.env,
preexec_fn=os.setsid,
) as resp:
if resp.stdout:
self.log.info("Output:")
for line in iter(resp.stdout.readline, b""):
self.log.info(line.decode(self.output_encoding).strip())
resp.wait()
self.log.info("Command exited with return code %s", resp.returncode)
with Popen(
["bash", fname],
stdout=PIPE,
stderr=STDOUT,
close_fds=True,
cwd=tmp_dir,
env=self.env,
preexec_fn=os.setsid,
) as resp:
if resp.stdout:
self.log.info("Output:")
for line in iter(resp.stdout.readline, b""):
self.log.info(line.decode(self.output_encoding).strip())
resp.wait()
self.log.info("Command exited with return code %s", resp.returncode)

# zero code means success, the sensor can go green
if resp.returncode == 0:
return True
# zero code means success, the sensor can go green
if resp.returncode == 0:
return True

# we have a retry exit code, sensor retries if return code matches, otherwise error
elif self.retry_exit_code is not None:
if resp.returncode == self.retry_exit_code:
self.log.info("Return code matches retry code, will retry later")
return False
else:
raise AirflowFailException(f"Command exited with return code {resp.returncode}")

# backwards compatibility: sensor retries no matter the error code
else:
self.log.info("Non-zero return code and no retry code set, will retry later")
# we have a retry exit code, sensor retries if return code matches, otherwise error
elif self.retry_exit_code is not None:
if resp.returncode == self.retry_exit_code:
self.log.info("Return code matches retry code, will retry later")
return False
else:
raise AirflowFailException(f"Command exited with return code {resp.returncode}")

# backwards compatibility: sensor retries no matter the error code
else:
self.log.info("Non-zero return code and no retry code set, will retry later")
return False
7 changes: 3 additions & 4 deletions airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -1650,10 +1650,9 @@ def resetdb(session: Session = NEW_SESSION, skip_init: bool = False):

connection = settings.engine.connect()

with create_global_lock(session=session, lock=DBLocks.MIGRATIONS):
with connection.begin():
drop_airflow_models(connection)
drop_airflow_moved_tables(connection)
with create_global_lock(session=session, lock=DBLocks.MIGRATIONS), connection.begin():
drop_airflow_models(connection)
drop_airflow_moved_tables(connection)

if not skip_init:
initdb(session=session)
Expand Down

0 comments on commit 60e6847

Please sign in to comment.