From 2f37e0782f40f6e70b501a1390c712bf155eabd4 Mon Sep 17 00:00:00 2001 From: tanlocnguyen Date: Wed, 20 Mar 2024 05:00:25 +0700 Subject: [PATCH 1/5] feat: Enable Arrow-based columnar data transfers Signed-off-by: tanlocnguyen --- docs/reference/offline-stores/spark.md | 2 ++ .../feast/templates/spark/feature_repo/feature_store.yaml | 2 ++ 2 files changed, 4 insertions(+) diff --git a/docs/reference/offline-stores/spark.md b/docs/reference/offline-stores/spark.md index 3cca2aab1af..2e2facba64a 100644 --- a/docs/reference/offline-stores/spark.md +++ b/docs/reference/offline-stores/spark.md @@ -30,6 +30,8 @@ offline_store: spark.sql.catalogImplementation: "hive" spark.sql.parser.quotedRegexColumnNames: "true" spark.sql.session.timeZone: "UTC" + spark.sql.execution.arrow.fallback.enabled: "true" + spark.sql.execution.arrow.pyspark.enabled: "true" online_store: path: data/online_store.db ``` diff --git a/sdk/python/feast/templates/spark/feature_repo/feature_store.yaml b/sdk/python/feast/templates/spark/feature_repo/feature_store.yaml index f72c7c65f4b..08383a29e13 100644 --- a/sdk/python/feast/templates/spark/feature_repo/feature_store.yaml +++ b/sdk/python/feast/templates/spark/feature_repo/feature_store.yaml @@ -12,6 +12,8 @@ offline_store: spark.sql.catalogImplementation: "hive" spark.sql.parser.quotedRegexColumnNames: "true" spark.sql.session.timeZone: "UTC" + spark.sql.execution.arrow.fallback.enabled: "true" + spark.sql.execution.arrow.pyspark.enabled: "true" online_store: path: data/online_store.db entity_key_serialization_version: 2 From c1c99901148d789a6fa21b48edea36d26e24dfb8 Mon Sep 17 00:00:00 2001 From: tanlocnguyen Date: Thu, 21 Mar 2024 22:19:27 +0700 Subject: [PATCH 2/5] fix: Add __eq__, __hash__ to SparkSource for comparison Signed-off-by: tanlocnguyen --- .../contrib/spark_offline_store/spark_source.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py 
b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py index 8cd392ce5df..d3568adac4f 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py @@ -185,6 +185,21 @@ def get_table_query_string(self) -> str: return f"`{tmp_table_name}`" + def __eq__(self, other): + base_eq = super().__eq__(other) + if not base_eq: + return False + if self.table != other.table: + return False + if self.query != other.query: + return False + if self.path != other.path: + return False + return True + + def __hash__(self): + return super().__hash__() + class SparkOptions: allowed_formats = [format.value for format in SparkSourceFormat] From add7f3a472297c9b0e8794bbf5091b53dc9759e9 Mon Sep 17 00:00:00 2001 From: tanlocnguyen Date: Fri, 22 Mar 2024 01:23:16 +0700 Subject: [PATCH 3/5] chore: simplify the logic Signed-off-by: tanlocnguyen --- .../contrib/spark_offline_store/spark_source.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py index d3568adac4f..0809043a010 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py @@ -189,13 +189,11 @@ def __eq__(self, other): base_eq = super().__eq__(other) if not base_eq: return False - if self.table != other.table: - return False - if self.query != other.query: - return False - if self.path != other.path: - return False - return True + return ( + self.table == other.table + and self.query == other.query + and self.path == other.path + ) def __hash__(self): return super().__hash__() From 5bc1ec61f8c6e46999765faf4501b708409b8fbf Mon Sep 17 00:00:00 2001 From: tanlocnguyen Date: 
Mon, 8 Apr 2024 21:35:50 +0700 Subject: [PATCH 4/5] feat: Add decimal datatype to spark mapping to feast data type Signed-off-by: tanlocnguyen --- sdk/python/feast/type_map.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/type_map.py b/sdk/python/feast/type_map.py index ad3e273d37b..23710051c1c 100644 --- a/sdk/python/feast/type_map.py +++ b/sdk/python/feast/type_map.py @@ -752,7 +752,7 @@ def _non_empty_value(value: Any) -> bool: def spark_to_feast_value_type(spark_type_as_str: str) -> ValueType: # TODO not all spark types are convertible - # Current non-convertible types: interval, map, struct, structfield, decimal, binary + # Current non-convertible types: interval, map, struct, structfield, binary type_map: Dict[str, ValueType] = { "null": ValueType.UNKNOWN, "byte": ValueType.BYTES, @@ -762,6 +762,7 @@ def spark_to_feast_value_type(spark_type_as_str: str) -> ValueType: "bigint": ValueType.INT64, "long": ValueType.INT64, "double": ValueType.DOUBLE, + "decimal": ValueType.DOUBLE, "float": ValueType.FLOAT, "boolean": ValueType.BOOL, "timestamp": ValueType.UNIX_TIMESTAMP, @@ -774,6 +775,8 @@ def spark_to_feast_value_type(spark_type_as_str: str) -> ValueType: "array": ValueType.BOOL_LIST, "array": ValueType.UNIX_TIMESTAMP_LIST, } + if spark_type_as_str.startswith("decimal"): + spark_type_as_str = "decimal" # TODO: Find better way of doing this. 
if not isinstance(spark_type_as_str, str) or spark_type_as_str not in type_map: return ValueType.NULL From 8dc98b3471aedded4dcce760563cc8edb4caad4d Mon Sep 17 00:00:00 2001 From: tanlocnguyen Date: Wed, 10 Apr 2024 11:37:11 +0700 Subject: [PATCH 5/5] feat: add array decimal Signed-off-by: tanlocnguyen --- sdk/python/feast/type_map.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdk/python/feast/type_map.py b/sdk/python/feast/type_map.py index 23710051c1c..e7fdf971209 100644 --- a/sdk/python/feast/type_map.py +++ b/sdk/python/feast/type_map.py @@ -771,12 +771,15 @@ def spark_to_feast_value_type(spark_type_as_str: str) -> ValueType: "array": ValueType.INT32_LIST, "array": ValueType.INT64_LIST, "array": ValueType.DOUBLE_LIST, + "array": ValueType.DOUBLE_LIST, "array": ValueType.FLOAT_LIST, "array": ValueType.BOOL_LIST, "array": ValueType.UNIX_TIMESTAMP_LIST, } if spark_type_as_str.startswith("decimal"): spark_type_as_str = "decimal" + if spark_type_as_str.startswith("array