diff --git a/airflow/hooks/subprocess.py b/airflow/hooks/subprocess.py index c814b0528d25a..fa8c706c6963d 100644 --- a/airflow/hooks/subprocess.py +++ b/airflow/hooks/subprocess.py @@ -88,7 +88,7 @@ def pre_exec(): raise RuntimeError("The subprocess should be created here and is None!") if self.sub_process.stdout is not None: for raw_line in iter(self.sub_process.stdout.readline, b''): - line = raw_line.decode(output_encoding).rstrip() + line = raw_line.decode(output_encoding, errors='backslashreplace').rstrip() self.log.info("%s", line) self.sub_process.wait() diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 17c2225631a48..eeecd4aa776cc 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -199,7 +199,8 @@ def follow_logs(since_time: Optional[DateTime] = None) -> Optional[DateTime]: ), ) for line in logs: - timestamp, message = self.parse_log_line(line.decode('utf-8')) + line = line.decode('utf-8', errors="backslashreplace") + timestamp, message = self.parse_log_line(line) self.log.info(message) except BaseHTTPError: # Catches errors like ProtocolError(TimeoutError). self.log.warning( diff --git a/tests/hooks/test_subprocess.py b/tests/hooks/test_subprocess.py index 642b219c0197a..5b7fef0c9cf03 100644 --- a/tests/hooks/test_subprocess.py +++ b/tests/hooks/test_subprocess.py @@ -96,3 +96,9 @@ def test_should_exec_subprocess(self, mock_popen, mock_temporary_directory): stderr=STDOUT, stdout=PIPE, ) + + def test_task_decode(self): + hook = SubprocessHook() + command=['bash', '-c', "source "+__file__[:-2]+'sh'] + result = hook.run_command(command=command) + assert result.exit_code==0 diff --git a/tests/hooks/test_subprocess.sh b/tests/hooks/test_subprocess.sh new file mode 100644 index 0000000000000..1d1265fc9649b --- /dev/null +++ b/tests/hooks/test_subprocess.sh @@ -0,0 +1 @@ +printf 'This will cause a coding error \xb1\xa6\x01\n' # v2.2.3: airflow/hooks/subprocess.py:88: UnicodeDecodeError: 'utf-8' codec can't decode byte 0xb1 in position 31: invalid start byte