Skip to content

Commit

Permalink
Only log the UDF stack when there is a crash there.
Browse files Browse the repository at this point in the history
  • Loading branch information
EmileSonneveld committed Mar 15, 2023
1 parent 5b4a3a9 commit a9a0fe5
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 19 deletions.
39 changes: 38 additions & 1 deletion openeogeotrellis/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

from openeo.internal.process_graph_visitor import ProcessGraphVisitor
from openeo.metadata import TemporalDimension, SpatialDimension, Band, BandDimension
from openeo.util import dict_no_none, rfc3339, deep_get
from openeo.util import dict_no_none, rfc3339, deep_get, repr_truncate
from openeo_driver import backend
from openeo_driver.ProcessGraphDeserializer import ConcreteProcessing, ENV_SAVE_RESULT
from openeo_driver.backend import (ServiceMetadata, BatchJobMetadata, OidcProvider, ErrorSummary, LoadParameters,
Expand Down Expand Up @@ -837,6 +837,43 @@ def summarize_exception(self, error: Exception) -> Union[ErrorSummary, Exception

return error

@staticmethod
def extract_udf_stacktrace(full_stacktrace) -> Optional[str]:
"""
Select all lines a bit under 'run_udf_code'.
This is what interests the user
"""
regex = re.compile(r" in run_udf_code\n.*\n((.|\n)*)", re.MULTILINE)

match = regex.search(full_stacktrace)
if match:
return match.group(1).rstrip()
return None

@staticmethod
def summarize_batch_job_exception(e: Exception, width=1000) -> str:
    """
    Build a short, user-facing summary for a failed batch job.

    Recognizes the YARN "container killed" (memory) failure, unwraps
    SparkException causes from Py4J errors, and prefers the UDF-specific
    part of the stacktrace when one can be extracted.

    :param e: the exception raised while running the batch job.
    :param width: maximum width passed to ``repr_truncate`` for the summary.
    :return: a one-line, truncated, user-facing error message.
    """
    # Memory-kill marker takes precedence over any other classification.
    if "Container killed on request. Exit code is 143" in str(e):
        return "Your batch job failed because workers used too much Python memory. The same task was attempted multiple times. Consider increasing executor-memoryOverhead or contact the developers to investigate."

    if not isinstance(e, Py4JJavaError):
        return f"Your openEO batch job failed: {repr_truncate(e, width=width)}"

    java_exception = e.java_exception
    if "SparkException" not in java_exception.getClass().getName():
        return f"Your openEO batch job failed: {repr_truncate(str(java_exception.getMessage()), width=width)}"

    # Could maybe make this a while loop like in 'summarize_exception'
    cause = java_exception.getCause()
    if cause is not None:
        java_exception = cause

    exception_message = str(java_exception.getMessage())

    # Prefer the UDF frames when present: that is the part the user can act on.
    udf_stacktrace = GeoPySparkBackendImplementation.extract_udf_stacktrace(exception_message)
    if udf_stacktrace:
        exception_message = udf_stacktrace

    return f"Your openEO batch job failed during Spark execution: {repr_truncate(exception_message, width=width)}"

def changelog(self) -> Union[str, Path]:
roots = []
if Path(__file__).parent.parent.name == "openeo-geopyspark-driver":
Expand Down
19 changes: 2 additions & 17 deletions openeogeotrellis/deploy/batch_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,23 +354,8 @@ def run_driver():
run_driver()

except Exception as e:
if isinstance(e, Py4JJavaError):
java_exception = e.java_exception
if "SparkException" in java_exception.getClass().getName():
# TODO: Extract UDF specific stack to show user.
message = f"Your openEO batch job failed during Spark execution: {repr_truncate(str(java_exception.getMessage()), width=5000)}"
if java_exception.getCause() is not None:
message = f"Your openEO batch job failed during Spark execution: {repr_truncate(str(java_exception.getCause().getMessage()), width=5000)}"
else:
message = f"Your openEO batch job failed: {repr_truncate(str(java_exception.getMessage()), width=1000)}"
else:
message = f"Your openEO batch job failed: {repr_truncate(e, width=1000)}"

if "Container killed on request. Exit code is 143" in str(e):
message = "Your batch job failed because workers used too much Python memory. The same task was attempted multiple times. Consider increasing executor-memoryOverhead or contact the developers to investigate."

if message is not None:
user_facing_logger.exception(message)
message = GeoPySparkBackendImplementation.summarize_batch_job_exception(e)
user_facing_logger.exception(message)

raise

Expand Down
34 changes: 33 additions & 1 deletion tests/test_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from openeo_driver.delayed_vector import DelayedVector
from openeo_driver.utils import EvalEnv

from openeogeotrellis.backend import GpsBatchJobs, GpsProcessing
from openeogeotrellis.backend import GpsBatchJobs, GpsProcessing, GeoPySparkBackendImplementation

def test_extract_application_id():
yarn_log = """
Expand Down Expand Up @@ -217,3 +217,35 @@ def test_extra_validation_layer_too_large_geometrycollection(backend_implementat
errors = list(processing.extra_validation(pg, env, None, env_source_constraints))
assert len(errors) == 1
assert errors[0]['code'] == "LayerTooLarge"


def test_extract_udf_stacktrace():
    """extract_udf_stacktrace keeps only the frames below 'run_udf_code' (the user's UDF frames)."""
    # NOTE: the unused `tmp_path` fixture was removed — this test touches no files.
    summarized = GeoPySparkBackendImplementation.extract_udf_stacktrace("""
Traceback (most recent call last):
  File "/opt/spark3_2_0/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/opt/spark3_2_0/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
    serializer.dump_stream(out_iter, outfile)
  File "/opt/spark3_2_0/python/lib/pyspark.zip/pyspark/serializers.py", line 132, in dump_stream
    for obj in iterator:
  File "/opt/spark3_2_0/python/lib/pyspark.zip/pyspark/util.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "/opt/venv/lib64/python3.8/site-packages/openeogeotrellis/utils.py", line 52, in memory_logging_wrapper
    return function(*args, **kwargs)
  File "/opt/venv/lib64/python3.8/site-packages/epsel.py", line 44, in wrapper
    return _FUNCTION_POINTERS[key](*args, **kwargs)
  File "/opt/venv/lib64/python3.8/site-packages/epsel.py", line 37, in first_time
    return f(*args, **kwargs)
  File "/opt/venv/lib64/python3.8/site-packages/openeogeotrellis/geopysparkdatacube.py", line 701, in tile_function
    result_data = run_udf_code(code=udf_code, data=data)
  File "/opt/venv/lib64/python3.8/site-packages/openeo/udf/run_code.py", line 180, in run_udf_code
    func(data)
  File "<string>", line 8, in transform
  File "<string>", line 7, in function_in_transform
  File "<string>", line 4, in function_in_root
Exception: This error message should be visible to user
""")
    assert summarized == """  File "<string>", line 8, in transform
  File "<string>", line 7, in function_in_transform
  File "<string>", line 4, in function_in_root
Exception: This error message should be visible to user"""

0 comments on commit a9a0fe5

Please sign in to comment.