Cleanup logging #370

Merged: 4 commits, Mar 17, 2023
45 changes: 38 additions & 7 deletions openeogeotrellis/backend.py
@@ -35,7 +35,7 @@
 from openeo.internal.process_graph_visitor import ProcessGraphVisitor
 from openeo.metadata import TemporalDimension, SpatialDimension, Band, BandDimension
-from openeo.util import dict_no_none, rfc3339, deep_get
+from openeo.util import dict_no_none, rfc3339, deep_get, repr_truncate
 from openeo_driver import backend
 from openeo_driver.ProcessGraphDeserializer import ConcreteProcessing, ENV_SAVE_RESULT
 from openeo_driver.backend import (ServiceMetadata, BatchJobMetadata, OidcProvider, ErrorSummary, LoadParameters,
@@ -817,25 +817,56 @@ def accept_process_graph(cls, process_graph):
             return SingleNodeUDFProcessGraphVisitor().accept_process_graph(process_graph)
         return GeotrellisTileProcessGraphVisitor().accept_process_graph(process_graph)

-    def summarize_exception(self, error: Exception) -> Union[ErrorSummary, Exception]:
-        if isinstance(error, Py4JJavaError):
+    def summarize_exception(self, error: Exception, width=2000) -> Union[ErrorSummary, Exception]:
+        return self.summarize_exception_static(error, width)
+
+    @staticmethod
+    def summarize_exception_static(error: Exception, width=2000) -> Union[ErrorSummary, Exception]:
+        if "Container killed on request. Exit code is 143" in str(error):
+            is_client_error = False  # Give user the benefit of doubt.
+            summary = "Your batch job failed because workers used too much Python memory. The same task was attempted multiple times. Consider increasing executor-memoryOverhead or contact the developers to investigate."
+
+        elif isinstance(error, Py4JJavaError):
             java_exception = error.java_exception

             while java_exception.getCause() is not None and java_exception != java_exception.getCause():
                 java_exception = java_exception.getCause()

             java_exception_class_name = java_exception.getClass().getName()
-            java_exception_message = java_exception.getMessage()
+            java_exception_message = repr_truncate(java_exception.getMessage(), width=width)

             no_data_found = (java_exception_class_name == 'java.lang.AssertionError'
                              and "Cannot stitch empty collection" in java_exception_message)

             is_client_error = java_exception_class_name == 'java.lang.IllegalArgumentException' or no_data_found
-            summary = "Cannot construct an image because the given boundaries resulted in an empty image collection" if no_data_found else java_exception_message
+            if no_data_found:
+                summary = "Cannot construct an image because the given boundaries resulted in an empty image collection"
+            elif "SparkException" in java_exception_class_name:
+                udf_stacktrace = GeoPySparkBackendImplementation.extract_udf_stacktrace(java_exception_message)
+                if udf_stacktrace:
+                    summary = f"UDF Exception during Spark execution: {udf_stacktrace}"
+                else:
+                    summary = f"Exception during Spark execution: {java_exception_message}"
+            else:
+                summary = java_exception_message
+        else:
+            is_client_error = False  # Give user the benefit of doubt.
+            summary = repr_truncate(error, width=width)

-            return ErrorSummary(error, is_client_error, summary)
+        return ErrorSummary(error, is_client_error, summary)

-        return error
+    @staticmethod
+    def extract_udf_stacktrace(full_stacktrace) -> Optional[str]:
+        """
+        Select all lines a bit under 'run_udf_code'.
+        This is what interests the user
+        """
+        regex = re.compile(r" in run_udf_code\n.*\n((.|\n)*)", re.MULTILINE)
+
+        match = regex.search(full_stacktrace)
+        if match:
+            return match.group(1).rstrip()
+        return None

     def changelog(self) -> Union[str, Path]:
         roots = []
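For reference, the new extract_udf_stacktrace helper can be exercised standalone. A minimal sketch follows; the traceback in it is a toy example with made-up paths and frame names, not output from a real job:

import re
from typing import Optional

# Same pattern as in the PR: skip past the 'run_udf_code' frame and the
# source line printed under it, then capture everything that follows.
UDF_FRAME = re.compile(r" in run_udf_code\n.*\n((.|\n)*)", re.MULTILINE)

def extract_udf_stacktrace(full_stacktrace: str) -> Optional[str]:
    match = UDF_FRAME.search(full_stacktrace)
    return match.group(1).rstrip() if match else None

# Toy traceback (hypothetical paths and frames):
toy_trace = (
    '  File "/hypothetical/openeo/udf/run_code.py", line 180, in run_udf_code\n'
    "    func(data)\n"
    '  File "<string>", line 4, in apply_datacube\n'
    "ValueError: boom\n"
)
assert extract_udf_stacktrace(toy_trace) == (
    '  File "<string>", line 4, in apply_datacube\nValueError: boom'
)
# Tracebacks without a UDF frame yield None, so callers can fall back
# to the generic Spark summary:
assert extract_udf_stacktrace("no UDF frame here") is None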
19 changes: 2 additions & 17 deletions openeogeotrellis/deploy/batch_job.py
@@ -354,23 +354,8 @@ def run_driver():
         run_driver()

     except Exception as e:
-        if isinstance(e, Py4JJavaError):
-            java_exception = e.java_exception
-            if "SparkException" in java_exception.getClass().getName():
-                # TODO: Extract UDF specific stack to show user.
-                message = f"Your openEO batch job failed during Spark execution: {repr_truncate(str(java_exception.getMessage()), width=5000)}"
-                if java_exception.getCause() is not None:
-                    message = f"Your openEO batch job failed during Spark execution: {repr_truncate(str(java_exception.getCause().getMessage()), width=5000)}"
-            else:
-                message = f"Your openEO batch job failed: {repr_truncate(str(java_exception.getMessage()), width=1000)}"
-        else:
-            message = f"Your openEO batch job failed: {repr_truncate(e, width=1000)}"
-
-        if "Container killed on request. Exit code is 143" in str(e):
-            message = "Your batch job failed because workers used too much Python memory. The same task was attempted multiple times. Consider increasing executor-memoryOverhead or contact the developers to investigate."
-
-        if message is not None:
-            user_facing_logger.exception(message)
+        message = GeoPySparkBackendImplementation.summarize_exception(e)
+        user_facing_logger.exception("OpenEO batch job failed: " + message)

         raise
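Side note on the two added lines above: summarize_exception is an instance method and, after this PR, always returns an ErrorSummary rather than a string, so calling it on the class and concatenating the result directly would fail at runtime. A sketch of the pattern that matches the rest of the PR, using the static variant and the summary attribute that ErrorSummary exposes (the same pair the new test_error.py relies on):

    try:
        run_driver()
    except Exception as e:
        # summarize_exception_static(...) returns an ErrorSummary;
        # its .summary field holds the user-facing message string.
        error_summary = GeoPySparkBackendImplementation.summarize_exception_static(e)
        user_facing_logger.exception("OpenEO batch job failed: " + error_summary.summary)
        raise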
3 changes: 3 additions & 0 deletions scripts/batch_job_log4j.properties
@@ -13,3 +13,6 @@ log4j.logger.org.apache.spark=ERROR, openeo, userlogs
 log4j.logger.org.apache.spark.storage.DiskBlockObjectWriter=OFF, openeo, userlogs
 log4j.logger.org.apache.zookeeper=WARN, openeo, userlogs
 log4j.logger.org.apache.curator=WARN, openeo, userlogs
+log4j.logger.org.apache.spark.executor.Executor=OFF
+# TaskSetManager will give a warning when a task fails the first time, and an error when it ran out of retries:
+log4j.logger.org.apache.spark.scheduler.TaskSetManager=WARN
90 changes: 89 additions & 1 deletion tests/test_backend.py
@@ -5,7 +5,7 @@
 from openeo_driver.delayed_vector import DelayedVector
 from openeo_driver.utils import EvalEnv

-from openeogeotrellis.backend import GpsBatchJobs, GpsProcessing
+from openeogeotrellis.backend import GpsBatchJobs, GpsProcessing, GeoPySparkBackendImplementation

 def test_extract_application_id():
     yarn_log = """
@@ -217,3 +217,91 @@ def test_extra_validation_layer_too_large_geometrycollection(backend_implementat
     errors = list(processing.extra_validation(pg, env, None, env_source_constraints))
     assert len(errors) == 1
     assert errors[0]['code'] == "LayerTooLarge"
+
+
+def test_extract_udf_stacktrace_1():
+    summarized = GeoPySparkBackendImplementation.extract_udf_stacktrace("""
+Traceback (most recent call last):
+  File "/opt/spark3_2_0/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
+    process()
+  File "/opt/spark3_2_0/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
+    serializer.dump_stream(out_iter, outfile)
+  File "/opt/spark3_2_0/python/lib/pyspark.zip/pyspark/serializers.py", line 132, in dump_stream
+    for obj in iterator:
+  File "/opt/spark3_2_0/python/lib/pyspark.zip/pyspark/util.py", line 74, in wrapper
+    return f(*args, **kwargs)
+  File "/opt/venv/lib64/python3.8/site-packages/openeogeotrellis/utils.py", line 52, in memory_logging_wrapper
+    return function(*args, **kwargs)
+  File "/opt/venv/lib64/python3.8/site-packages/epsel.py", line 44, in wrapper
+    return _FUNCTION_POINTERS[key](*args, **kwargs)
+  File "/opt/venv/lib64/python3.8/site-packages/epsel.py", line 37, in first_time
+    return f(*args, **kwargs)
+  File "/opt/venv/lib64/python3.8/site-packages/openeogeotrellis/geopysparkdatacube.py", line 701, in tile_function
+    result_data = run_udf_code(code=udf_code, data=data)
+  File "/opt/venv/lib64/python3.8/site-packages/openeo/udf/run_code.py", line 180, in run_udf_code
+    func(data)
+  File "<string>", line 8, in transform
+  File "<string>", line 7, in function_in_transform
+  File "<string>", line 4, in function_in_root
+Exception: This error message should be visible to user
+""")
+    assert summarized == """  File "<string>", line 8, in transform
+  File "<string>", line 7, in function_in_transform
+  File "<string>", line 4, in function_in_root
+Exception: This error message should be visible to user"""
+
+
+def test_extract_udf_stacktrace_2():
+    summarized = GeoPySparkBackendImplementation.extract_udf_stacktrace("""Traceback (most recent call last):
+  File "/opt/spark3_2_0/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
+    process()
+  File "/opt/spark3_2_0/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
+    serializer.dump_stream(out_iter, outfile)
+  File "/opt/spark3_2_0/python/lib/pyspark.zip/pyspark/serializers.py", line 132, in dump_stream
+    for obj in iterator:
+  File "/opt/spark3_2_0/python/lib/pyspark.zip/pyspark/util.py", line 74, in wrapper
+    return f(*args, **kwargs)
+  File "/opt/venv/lib64/python3.8/site-packages/openeogeotrellis/utils.py", line 49, in memory_logging_wrapper
+    return function(*args, **kwargs)
+  File "/opt/venv/lib64/python3.8/site-packages/epsel.py", line 44, in wrapper
+    return _FUNCTION_POINTERS[key](*args, **kwargs)
+  File "/opt/venv/lib64/python3.8/site-packages/epsel.py", line 37, in first_time
+    return f(*args, **kwargs)
+  File "/opt/venv/lib64/python3.8/site-packages/openeogeotrellis/geopysparkdatacube.py", line 519, in tile_function
+    result_data = run_udf_code(code=udf_code, data=data)
+  File "/opt/venv/lib64/python3.8/site-packages/openeo/udf/run_code.py", line 175, in run_udf_code
+    result_cube = func(data.get_datacube_list()[0], data.user_context)
+  File "<string>", line 156, in apply_datacube
+TypeError: inspect() got multiple values for argument 'data'
+""")
+    assert summarized == """  File "<string>", line 156, in apply_datacube
+TypeError: inspect() got multiple values for argument 'data'"""
+
+
+def test_extract_udf_stacktrace_no_udf():
+    summarized = GeoPySparkBackendImplementation.extract_udf_stacktrace("""Traceback (most recent call last):
+  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
+    process()
+  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
+    serializer.dump_stream(out_iter, outfile)
+  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 132, in dump_stream
+    for obj in iterator:
+  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 74, in wrapper
+    return f(*args, **kwargs)
+  File "/opt/openeo/lib/python3.8/site-packages/epsel.py", line 44, in wrapper
+    return _FUNCTION_POINTERS[key](*args, **kwargs)
+  File "/opt/openeo/lib/python3.8/site-packages/epsel.py", line 37, in first_time
+    return f(*args, **kwargs)
+  File "/opt/openeo/lib/python3.8/site-packages/openeo/util.py", line 362, in wrapper
+    return f(*args, **kwargs)
+  File "/opt/openeo/lib/python3.8/site-packages/openeogeotrellis/collections/s1backscatter_orfeo.py", line 794, in process_product
+    dem_dir_context = S1BackscatterOrfeo._get_dem_dir_context(
+  File "/opt/openeo/lib64/python3.8/site-packages/openeogeotrellis/collections/s1backscatter_orfeo.py", line 258, in _get_dem_dir_context
+    dem_dir_context = S1BackscatterOrfeo._creodias_dem_subset_srtm_hgt_unzip(
+  File "/opt/openeo/lib64/python3.8/site-packages/openeogeotrellis/collections/s1backscatter_orfeo.py", line 664, in _creodias_dem_subset_srtm_hgt_unzip
+    with zipfile.ZipFile(zip_filename, 'r') as z:
+  File "/usr/lib64/python3.8/zipfile.py", line 1251, in __init__
+    self.fp = io.open(file, filemode)
+FileNotFoundError: [Errno 2] No such file or directory: '/eodata/auxdata/SRTMGL1/dem/N64E024.SRTMGL1.hgt.zip'
+""")
+    assert summarized is None
56 changes: 56 additions & 0 deletions tests/test_error.py
@@ -0,0 +1,56 @@
+from geopyspark import TiledRasterLayer, Extent
+from openeo_driver.utils import EvalEnv
+from shapely.geometry import MultiPolygon
+
+from openeogeotrellis.backend import GeoPySparkBackendImplementation
+from openeogeotrellis.geopysparkdatacube import GeopysparkDataCube
+
+
+# Note: Ensure that the python environment has all the required modules installed.
+# Numpy should be installed before Jep for off-heap memory tiles to work!
+#
+# Note: In order to run these tests you need to set several environment variables.
+# If you use the virtual environment venv (with JEP and Numpy installed):
+# 1. LD_LIBRARY_PATH = .../venv/lib/python3.6/site-packages/jep
+#    This will look for the shared library 'jep.so'. This is the compiled C code that binds Java and Python objects.
+
+def test_chunk_polygon_exception(imagecollection_with_two_bands_and_three_dates):
+    udf_code = """
+import xarray
+from openeo.udf import XarrayDataCube
+
+def function_in_root():
+    raise Exception("This error message should be visible to user")
+
+def apply_datacube(cube: XarrayDataCube, context: dict) -> XarrayDataCube:
+    function_in_root()
+    array = cube.get_array()
+    return XarrayDataCube(array)
+"""
+    udf_add_to_bands = {
+        "udf_process": {
+            "arguments": {
+                "data": {
+                    "from_argument": "dimension_data"
+                },
+                "udf": udf_code
+            },
+            "process_id": "run_udf",
+            "result": True
+        },
+    }
+    env = EvalEnv()
+
+    polygon1 = Extent(0.0, 0.0, 4.0, 4.0).to_polygon
+    chunks = MultiPolygon([polygon1])
+    cube: GeopysparkDataCube = imagecollection_with_two_bands_and_three_dates
+    try:
+        result_cube: GeopysparkDataCube = cube.chunk_polygon(udf_add_to_bands, chunks=chunks, mask_value=None, env=env)
+        result_layer: TiledRasterLayer = result_cube.pyramid.levels[0]
+        result_layer.to_numpy_rdd().collect()
+    except Exception as e:
+        error_summary = GeoPySparkBackendImplementation.summarize_exception_static(e)
+        print(error_summary.summary)
+        assert "This error message should be visible to user" in error_summary.summary
+    else:
+        raise Exception("There should have been an exception raised in the try clause.")
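The try/except/else assertion above could also be phrased with pytest.raises. A sketch under the assumption that pytest is the repo's test runner; the fixture and the udf_add_to_bands, chunks, and env variables are built exactly as in the test above (setup elided here):

import pytest

def test_chunk_polygon_exception_alt(imagecollection_with_two_bands_and_three_dates):
    # udf_add_to_bands, chunks, env: same setup as test_chunk_polygon_exception.
    cube = imagecollection_with_two_bands_and_three_dates
    with pytest.raises(Exception) as exc_info:
        result_cube = cube.chunk_polygon(udf_add_to_bands, chunks=chunks, mask_value=None, env=env)
        # collect() forces evaluation, so the UDF error surfaces here
        result_cube.pyramid.levels[0].to_numpy_rdd().collect()
    error_summary = GeoPySparkBackendImplementation.summarize_exception_static(exc_info.value)
    assert "This error message should be visible to user" in error_summary.summary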