Merge pull request #370 from Open-EO/cleanup-logging
For testing on dev. Revert this if it raises any issues.
EmileSonneveld authored Mar 17, 2023
2 parents 5b4a3a9 + 076a679 commit c934f17
Showing 5 changed files with 188 additions and 25 deletions.
45 changes: 38 additions & 7 deletions openeogeotrellis/backend.py
@@ -35,7 +35,7 @@
 
 from openeo.internal.process_graph_visitor import ProcessGraphVisitor
 from openeo.metadata import TemporalDimension, SpatialDimension, Band, BandDimension
-from openeo.util import dict_no_none, rfc3339, deep_get
+from openeo.util import dict_no_none, rfc3339, deep_get, repr_truncate
 from openeo_driver import backend
 from openeo_driver.ProcessGraphDeserializer import ConcreteProcessing, ENV_SAVE_RESULT
 from openeo_driver.backend import (ServiceMetadata, BatchJobMetadata, OidcProvider, ErrorSummary, LoadParameters,
@@ -817,25 +817,56 @@ def accept_process_graph(cls, process_graph):
             return SingleNodeUDFProcessGraphVisitor().accept_process_graph(process_graph)
         return GeotrellisTileProcessGraphVisitor().accept_process_graph(process_graph)
 
-    def summarize_exception(self, error: Exception) -> Union[ErrorSummary, Exception]:
-        if isinstance(error, Py4JJavaError):
+    def summarize_exception(self, error: Exception, width=2000) -> Union[ErrorSummary, Exception]:
+        return self.summarize_exception_static(error, width)
+
+    @staticmethod
+    def summarize_exception_static(error: Exception, width=2000) -> Union[ErrorSummary, Exception]:
+        if "Container killed on request. Exit code is 143" in str(error):
+            is_client_error = False  # Give user the benefit of doubt.
+            summary = "Your batch job failed because workers used too much Python memory. The same task was attempted multiple times. Consider increasing executor-memoryOverhead or contact the developers to investigate."
+
+        elif isinstance(error, Py4JJavaError):
             java_exception = error.java_exception
 
             while java_exception.getCause() is not None and java_exception != java_exception.getCause():
                 java_exception = java_exception.getCause()
 
             java_exception_class_name = java_exception.getClass().getName()
-            java_exception_message = java_exception.getMessage()
+            java_exception_message = repr_truncate(java_exception.getMessage(), width=width)
 
             no_data_found = (java_exception_class_name == 'java.lang.AssertionError'
                              and "Cannot stitch empty collection" in java_exception_message)
 
             is_client_error = java_exception_class_name == 'java.lang.IllegalArgumentException' or no_data_found
-            summary = "Cannot construct an image because the given boundaries resulted in an empty image collection" if no_data_found else java_exception_message
+            if no_data_found:
+                summary = "Cannot construct an image because the given boundaries resulted in an empty image collection"
+            elif "SparkException" in java_exception_class_name:
+                udf_stacktrace = GeoPySparkBackendImplementation.extract_udf_stacktrace(java_exception_message)
+                if udf_stacktrace:
+                    summary = f"UDF Exception during Spark execution: {udf_stacktrace}"
+                else:
+                    summary = f"Exception during Spark execution: {java_exception_message}"
+            else:
+                summary = java_exception_message
+        else:
+            is_client_error = False  # Give user the benefit of doubt.
+            summary = repr_truncate(error, width=width)
 
-            return ErrorSummary(error, is_client_error, summary)
+        return ErrorSummary(error, is_client_error, summary)
 
-        return error
+    @staticmethod
+    def extract_udf_stacktrace(full_stacktrace) -> Optional[str]:
+        """
+        Select the lines a bit below 'run_udf_code'.
+        This is what interests the user.
+        """
+        regex = re.compile(r" in run_udf_code\n.*\n((.|\n)*)", re.MULTILINE)
+
+        match = regex.search(full_stacktrace)
+        if match:
+            return match.group(1).rstrip()
+        return None
 
     def changelog(self) -> Union[str, Path]:
         roots = []
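
The new extract_udf_stacktrace helper is the heart of this cleanup: it drops the Spark and driver frames and keeps only the part of a traceback that originates in the user's UDF. Below is a minimal standalone sketch of the same regex; the UDF_STACKTRACE_RE name and the traceback text are illustrative, not taken from the commit.

import re

# Same pattern as extract_udf_stacktrace: match the 'run_udf_code' frame and
# its source line, then capture everything below it (the user's UDF frames).
UDF_STACKTRACE_RE = re.compile(r" in run_udf_code\n.*\n((.|\n)*)", re.MULTILINE)

# Illustrative traceback: only the last two lines originate in the UDF.
full_stacktrace = (
    'Traceback (most recent call last):\n'
    '  File "/opt/venv/site-packages/openeo/udf/run_code.py", line 180, in run_udf_code\n'
    '    func(data)\n'
    '  File "<string>", line 8, in transform\n'
    'Exception: This error message should be visible to user\n'
)

match = UDF_STACKTRACE_RE.search(full_stacktrace)
print(match.group(1).rstrip() if match else None)
#   File "<string>", line 8, in transform
# Exception: This error message should be visible to user

If 'run_udf_code' never appears in the traceback, the search returns None and summarize_exception_static falls back to the generic "Exception during Spark execution" summary.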
19 changes: 2 additions & 17 deletions openeogeotrellis/deploy/batch_job.py
@@ -354,23 +354,8 @@ def run_driver():
         run_driver()
 
     except Exception as e:
-        if isinstance(e, Py4JJavaError):
-            java_exception = e.java_exception
-            if "SparkException" in java_exception.getClass().getName():
-                # TODO: Extract UDF specific stack to show user.
-                message = f"Your openEO batch job failed during Spark execution: {repr_truncate(str(java_exception.getMessage()), width=5000)}"
-                if java_exception.getCause() is not None:
-                    message = f"Your openEO batch job failed during Spark execution: {repr_truncate(str(java_exception.getCause().getMessage()), width=5000)}"
-            else:
-                message = f"Your openEO batch job failed: {repr_truncate(str(java_exception.getMessage()), width=1000)}"
-        else:
-            message = f"Your openEO batch job failed: {repr_truncate(e, width=1000)}"
-
-        if "Container killed on request. Exit code is 143" in str(e):
-            message = "Your batch job failed because workers used too much Python memory. The same task was attempted multiple times. Consider increasing executor-memoryOverhead or contact the developers to investigate."
-
-        if message is not None:
-            user_facing_logger.exception(message)
+        message = GeoPySparkBackendImplementation.summarize_exception_static(e).summary
+        user_facing_logger.exception("OpenEO batch job failed: " + message)
 
         raise
 
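The batch job entry point now just delegates to summarize_exception_static and logs the resulting summary. As a rough sanity check of the new memory-kill branch (a sketch, assuming ErrorSummary also exposes its is_client_error constructor argument as an attribute; the new tests only exercise .summary):

from openeogeotrellis.backend import GeoPySparkBackendImplementation

# A plain Exception suffices: the exit-code-143 branch matches on message text
# alone, before any Py4JJavaError handling, so no Spark context is needed.
error = Exception("Container killed on request. Exit code is 143")
error_summary = GeoPySparkBackendImplementation.summarize_exception_static(error)

assert "executor-memoryOverhead" in error_summary.summary
assert error_summary.is_client_error is False  # assumed attribute name; user gets the benefit of doubt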
3 changes: 3 additions & 0 deletions scripts/batch_job_log4j.properties
@@ -13,3 +13,6 @@ log4j.logger.org.apache.spark=ERROR, openeo, userlogs
 log4j.logger.org.apache.spark.storage.DiskBlockObjectWriter=OFF, openeo, userlogs
 log4j.logger.org.apache.zookeeper=WARN, openeo, userlogs
 log4j.logger.org.apache.curator=WARN, openeo, userlogs
+log4j.logger.org.apache.spark.executor.Executor=OFF
+# TaskSetManager gives a warning when a task fails for the first time, and an error once it has run out of retries:
+log4j.logger.org.apache.spark.scheduler.TaskSetManager=WARN
90 changes: 89 additions & 1 deletion tests/test_backend.py
@@ -5,7 +5,7 @@
 from openeo_driver.delayed_vector import DelayedVector
 from openeo_driver.utils import EvalEnv
 
-from openeogeotrellis.backend import GpsBatchJobs, GpsProcessing
+from openeogeotrellis.backend import GpsBatchJobs, GpsProcessing, GeoPySparkBackendImplementation
 
 def test_extract_application_id():
     yarn_log = """
@@ -217,3 +217,91 @@ def test_extra_validation_layer_too_large_geometrycollection(backend_implementat
     errors = list(processing.extra_validation(pg, env, None, env_source_constraints))
     assert len(errors) == 1
     assert errors[0]['code'] == "LayerTooLarge"
+
+
+def test_extract_udf_stacktrace_1():
+    summarized = GeoPySparkBackendImplementation.extract_udf_stacktrace("""
+Traceback (most recent call last):
+  File "/opt/spark3_2_0/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
+    process()
+  File "/opt/spark3_2_0/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
+    serializer.dump_stream(out_iter, outfile)
+  File "/opt/spark3_2_0/python/lib/pyspark.zip/pyspark/serializers.py", line 132, in dump_stream
+    for obj in iterator:
+  File "/opt/spark3_2_0/python/lib/pyspark.zip/pyspark/util.py", line 74, in wrapper
+    return f(*args, **kwargs)
+  File "/opt/venv/lib64/python3.8/site-packages/openeogeotrellis/utils.py", line 52, in memory_logging_wrapper
+    return function(*args, **kwargs)
+  File "/opt/venv/lib64/python3.8/site-packages/epsel.py", line 44, in wrapper
+    return _FUNCTION_POINTERS[key](*args, **kwargs)
+  File "/opt/venv/lib64/python3.8/site-packages/epsel.py", line 37, in first_time
+    return f(*args, **kwargs)
+  File "/opt/venv/lib64/python3.8/site-packages/openeogeotrellis/geopysparkdatacube.py", line 701, in tile_function
+    result_data = run_udf_code(code=udf_code, data=data)
+  File "/opt/venv/lib64/python3.8/site-packages/openeo/udf/run_code.py", line 180, in run_udf_code
+    func(data)
+  File "<string>", line 8, in transform
+  File "<string>", line 7, in function_in_transform
+  File "<string>", line 4, in function_in_root
+Exception: This error message should be visible to user
+""")
+    assert summarized == """  File "<string>", line 8, in transform
+  File "<string>", line 7, in function_in_transform
+  File "<string>", line 4, in function_in_root
+Exception: This error message should be visible to user"""
+
+
+def test_extract_udf_stacktrace_2():
+    summarized = GeoPySparkBackendImplementation.extract_udf_stacktrace("""Traceback (most recent call last):
+  File "/opt/spark3_2_0/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
+    process()
+  File "/opt/spark3_2_0/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
+    serializer.dump_stream(out_iter, outfile)
+  File "/opt/spark3_2_0/python/lib/pyspark.zip/pyspark/serializers.py", line 132, in dump_stream
+    for obj in iterator:
+  File "/opt/spark3_2_0/python/lib/pyspark.zip/pyspark/util.py", line 74, in wrapper
+    return f(*args, **kwargs)
+  File "/opt/venv/lib64/python3.8/site-packages/openeogeotrellis/utils.py", line 49, in memory_logging_wrapper
+    return function(*args, **kwargs)
+  File "/opt/venv/lib64/python3.8/site-packages/epsel.py", line 44, in wrapper
+    return _FUNCTION_POINTERS[key](*args, **kwargs)
+  File "/opt/venv/lib64/python3.8/site-packages/epsel.py", line 37, in first_time
+    return f(*args, **kwargs)
+  File "/opt/venv/lib64/python3.8/site-packages/openeogeotrellis/geopysparkdatacube.py", line 519, in tile_function
+    result_data = run_udf_code(code=udf_code, data=data)
+  File "/opt/venv/lib64/python3.8/site-packages/openeo/udf/run_code.py", line 175, in run_udf_code
+    result_cube = func(data.get_datacube_list()[0], data.user_context)
+  File "<string>", line 156, in apply_datacube
+TypeError: inspect() got multiple values for argument 'data'
+""")
+    assert summarized == """  File "<string>", line 156, in apply_datacube
+TypeError: inspect() got multiple values for argument 'data'"""
+
+
+def test_extract_udf_stacktrace_no_udf():
+    summarized = GeoPySparkBackendImplementation.extract_udf_stacktrace("""Traceback (most recent call last):
+  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
+    process()
+  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
+    serializer.dump_stream(out_iter, outfile)
+  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 132, in dump_stream
+    for obj in iterator:
+  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 74, in wrapper
+    return f(*args, **kwargs)
+  File "/opt/openeo/lib/python3.8/site-packages/epsel.py", line 44, in wrapper
+    return _FUNCTION_POINTERS[key](*args, **kwargs)
+  File "/opt/openeo/lib/python3.8/site-packages/epsel.py", line 37, in first_time
+    return f(*args, **kwargs)
+  File "/opt/openeo/lib/python3.8/site-packages/openeo/util.py", line 362, in wrapper
+    return f(*args, **kwargs)
+  File "/opt/openeo/lib/python3.8/site-packages/openeogeotrellis/collections/s1backscatter_orfeo.py", line 794, in process_product
+    dem_dir_context = S1BackscatterOrfeo._get_dem_dir_context(
+  File "/opt/openeo/lib64/python3.8/site-packages/openeogeotrellis/collections/s1backscatter_orfeo.py", line 258, in _get_dem_dir_context
+    dem_dir_context = S1BackscatterOrfeo._creodias_dem_subset_srtm_hgt_unzip(
+  File "/opt/openeo/lib64/python3.8/site-packages/openeogeotrellis/collections/s1backscatter_orfeo.py", line 664, in _creodias_dem_subset_srtm_hgt_unzip
+    with zipfile.ZipFile(zip_filename, 'r') as z:
+  File "/usr/lib64/python3.8/zipfile.py", line 1251, in __init__
+    self.fp = io.open(file, filemode)
+FileNotFoundError: [Errno 2] No such file or directory: '/eodata/auxdata/SRTMGL1/dem/N64E024.SRTMGL1.hgt.zip'
+""")
+    assert summarized is None
56 changes: 56 additions & 0 deletions tests/test_error.py
@@ -0,0 +1,56 @@
+from geopyspark import TiledRasterLayer, Extent
+from openeo_driver.utils import EvalEnv
+from shapely.geometry import MultiPolygon
+
+from openeogeotrellis.backend import GeoPySparkBackendImplementation
+from openeogeotrellis.geopysparkdatacube import GeopysparkDataCube
+
+
+# Note: Ensure that the Python environment has all the required modules installed.
+# Numpy should be installed before Jep for off-heap memory tiles to work!
+#
+# Note: In order to run these tests you need to set several environment variables.
+# If you use the virtual environment venv (with JEP and Numpy installed):
+# 1. LD_LIBRARY_PATH = .../venv/lib/python3.6/site-packages/jep
+#    This will look for the shared library 'jep.so'. This is the compiled C code that binds Java and Python objects.
+
+def test_chunk_polygon_exception(imagecollection_with_two_bands_and_three_dates):
+    udf_code = """
+import xarray
+from openeo.udf import XarrayDataCube
+def function_in_root():
+    raise Exception("This error message should be visible to user")
+def apply_datacube(cube: XarrayDataCube, context: dict) -> XarrayDataCube:
+    function_in_root()
+    array = cube.get_array()
+    return XarrayDataCube(array)
+"""
+    udf_add_to_bands = {
+        "udf_process": {
+            "arguments": {
+                "data": {
+                    "from_argument": "dimension_data"
+                },
+                "udf": udf_code
+            },
+            "process_id": "run_udf",
+            "result": True
+        },
+    }
+    env = EvalEnv()
+
+    polygon1 = Extent(0.0, 0.0, 4.0, 4.0).to_polygon
+    chunks = MultiPolygon([polygon1])
+    cube: GeopysparkDataCube = imagecollection_with_two_bands_and_three_dates
+    try:
+        result_cube: GeopysparkDataCube = cube.chunk_polygon(udf_add_to_bands, chunks=chunks, mask_value=None, env=env)
+        result_layer: TiledRasterLayer = result_cube.pyramid.levels[0]
+        result_layer.to_numpy_rdd().collect()
+    except Exception as e:
+        error_summary = GeoPySparkBackendImplementation.summarize_exception_static(e)
+        print(error_summary.summary)
+        assert "This error message should be visible to user" in error_summary.summary
+    else:
+        raise Exception("There should have been an exception raised in the try clause.")
