From 817c3fad1dd997c8e7dfcce2f13c39bc7320a38f Mon Sep 17 00:00:00 2001 From: Ayush Dattagupta Date: Thu, 23 Sep 2021 16:43:30 -0700 Subject: [PATCH] Fix logic while parsing the sum statistic for numerical orc columns (#9183) Fixes #9182. In cases where the `sum` statistic was not present in the orc file for int and float columns, the values would be incorrectly interpreted as 0 because of protobuf's [default](https://developers.google.com/protocol-buffers/docs/proto#optional) values when fields are missing. This PR adds a check for field presence before assignment. Authors: - Ayush Dattagupta (https://github.com/ayushdg) Approvers: - Sheilah Kirui (https://github.com/skirui-source) - Vukasin Milovanovic (https://github.com/vuule) - Marlene (https://github.com/marlenezw) URL: https://github.com/rapidsai/cudf/pull/9183 --- python/cudf/cudf/io/orc.py | 86 ++++++++++++++++++++++++------ python/cudf/cudf/tests/test_orc.py | 71 ++++++++++++++++++++++++ 2 files changed, 141 insertions(+), 16 deletions(-) diff --git a/python/cudf/cudf/io/orc.py b/python/cudf/cudf/io/orc.py index cc5e1909d67..3aa672223c9 100644 --- a/python/cudf/cudf/io/orc.py +++ b/python/cudf/cudf/io/orc.py @@ -41,37 +41,90 @@ def _parse_column_statistics(cs, column_statistics_blob): column_statistics["number_of_values"] = cs.numberOfValues if cs.HasField("hasNull"): column_statistics["has_null"] = cs.hasNull + if cs.HasField("intStatistics"): - column_statistics["minimum"] = cs.intStatistics.minimum - column_statistics["maximum"] = cs.intStatistics.maximum - column_statistics["sum"] = cs.intStatistics.sum + column_statistics["minimum"] = ( + cs.intStatistics.minimum + if cs.intStatistics.HasField("minimum") + else None + ) + column_statistics["maximum"] = ( + cs.intStatistics.maximum + if cs.intStatistics.HasField("maximum") + else None + ) + column_statistics["sum"] = ( + cs.intStatistics.sum if cs.intStatistics.HasField("sum") else None + ) + elif cs.HasField("doubleStatistics"): - column_statistics["minimum"] = cs.doubleStatistics.minimum - column_statistics["maximum"] = cs.doubleStatistics.maximum - column_statistics["sum"] = cs.doubleStatistics.sum + column_statistics["minimum"] = ( + cs.doubleStatistics.minimum + if cs.doubleStatistics.HasField("minimum") + else None + ) + column_statistics["maximum"] = ( + cs.doubleStatistics.maximum + if cs.doubleStatistics.HasField("maximum") + else None + ) + column_statistics["sum"] = ( + cs.doubleStatistics.sum + if cs.doubleStatistics.HasField("sum") + else None + ) + elif cs.HasField("stringStatistics"): - column_statistics["minimum"] = cs.stringStatistics.minimum - column_statistics["maximum"] = cs.stringStatistics.maximum + column_statistics["minimum"] = ( + cs.stringStatistics.minimum + if cs.stringStatistics.HasField("minimum") + else None + ) + column_statistics["maximum"] = ( + cs.stringStatistics.maximum + if cs.stringStatistics.HasField("maximum") + else None + ) column_statistics["sum"] = cs.stringStatistics.sum + elif cs.HasField("bucketStatistics"): column_statistics["true_count"] = cs.bucketStatistics.count[0] column_statistics["false_count"] = ( column_statistics["number_of_values"] - column_statistics["true_count"] ) + elif cs.HasField("decimalStatistics"): - column_statistics["minimum"] = cs.decimalStatistics.minimum - column_statistics["maximum"] = cs.decimalStatistics.maximum + column_statistics["minimum"] = ( + cs.decimalStatistics.minimum + if cs.decimalStatistics.HasField("minimum") + else None + ) + column_statistics["maximum"] = ( + cs.decimalStatistics.maximum + if cs.decimalStatistics.HasField("maximum") + else None + ) column_statistics["sum"] = cs.decimalStatistics.sum + elif cs.HasField("dateStatistics"): - column_statistics["minimum"] = datetime.datetime.fromtimestamp( - datetime.timedelta(cs.dateStatistics.minimum).total_seconds(), - datetime.timezone.utc, + column_statistics["minimum"] = ( + datetime.datetime.fromtimestamp( + datetime.timedelta(cs.dateStatistics.minimum).total_seconds(), + datetime.timezone.utc, + ) + if cs.dateStatistics.HasField("minimum") + else None ) - column_statistics["maximum"] = datetime.datetime.fromtimestamp( - datetime.timedelta(cs.dateStatistics.maximum).total_seconds(), - datetime.timezone.utc, + column_statistics["maximum"] = ( + datetime.datetime.fromtimestamp( + datetime.timedelta(cs.dateStatistics.maximum).total_seconds(), + datetime.timezone.utc, + ) + if cs.dateStatistics.HasField("maximum") + else None ) + elif cs.HasField("timestampStatistics"): # Before ORC-135, the local timezone offset was included and they were # stored as minimum and maximum. After ORC-135, the timestamp is @@ -87,6 +140,7 @@ def _parse_column_statistics(cs, column_statistics_blob): column_statistics["maximum"] = datetime.datetime.fromtimestamp( cs.timestampStatistics.maximumUtc / 1000, datetime.timezone.utc ) + elif cs.HasField("binaryStatistics"): column_statistics["sum"] = cs.binaryStatistics.sum diff --git a/python/cudf/cudf/tests/test_orc.py b/python/cudf/cudf/tests/test_orc.py index 61c2ff5ed36..1230b4b35f3 100644 --- a/python/cudf/cudf/tests/test_orc.py +++ b/python/cudf/cudf/tests/test_orc.py @@ -5,6 +5,7 @@ import os import random from io import BytesIO +from string import ascii_lowercase import numpy as np import pandas as pd @@ -1421,3 +1422,73 @@ def test_orc_writer_lists_empty_rg(data): pdf_out = pa.orc.ORCFile(buffer).read().to_pandas() assert_eq(pdf_in, pdf_out) + + +def test_statistics_sum_overflow(): + maxint64 = np.iinfo(np.int64).max + minint64 = np.iinfo(np.int64).min + + buff = BytesIO() + with po.Writer( + buff, po.Struct(a=po.BigInt(), b=po.BigInt(), c=po.BigInt()) + ) as writer: + writer.write((maxint64, minint64, minint64)) + writer.write((1, -1, 1)) + + file_stats, stripe_stats = cudf.io.orc.read_orc_statistics([buff]) + assert file_stats[0]["a"].get("sum") is None + assert file_stats[0]["b"].get("sum") is None + assert file_stats[0]["c"].get("sum") == minint64 + 1 + + assert stripe_stats[0]["a"].get("sum") is None + assert stripe_stats[0]["b"].get("sum") is None + assert stripe_stats[0]["c"].get("sum") == minint64 + 1 + + +def test_empty_statistics(): + buff = BytesIO() + orc_schema = po.Struct( + a=po.BigInt(), + b=po.Double(), + c=po.String(), + d=po.Decimal(11, 2), + e=po.Date(), + f=po.Timestamp(), + g=po.Boolean(), + h=po.Binary(), + i=po.BigInt(), + # One column with non null value, else cudf/pyorc readers crash + ) + data = tuple([None] * (len(orc_schema.fields) - 1) + [1]) + with po.Writer(buff, orc_schema) as writer: + writer.write(data) + + got = cudf.io.orc.read_orc_statistics([buff]) + + # Check for both file and stripe stats + for stats in got: + # Similar expected stats for the first 6 columns in this case + for col_name in ascii_lowercase[:6]: + assert stats[0][col_name].get("number_of_values") == 0 + assert stats[0][col_name].get("has_null") is True + assert stats[0][col_name].get("minimum") is None + assert stats[0][col_name].get("maximum") is None + for col_name in ascii_lowercase[:3]: + assert stats[0][col_name].get("sum") == 0 + # Sum for decimal column is a string + assert stats[0]["d"].get("sum") == "0" + + assert stats[0]["g"].get("number_of_values") == 0 + assert stats[0]["g"].get("has_null") is True + assert stats[0]["g"].get("true_count") == 0 + assert stats[0]["g"].get("false_count") == 0 + + assert stats[0]["h"].get("number_of_values") == 0 + assert stats[0]["h"].get("has_null") is True + assert stats[0]["h"].get("sum") == 0 + + assert stats[0]["i"].get("number_of_values") == 1 + assert stats[0]["i"].get("has_null") is False + assert stats[0]["i"].get("minimum") == 1 + assert stats[0]["i"].get("maximum") == 1 + assert stats[0]["i"].get("sum") == 1