Add support for serializing Spark's DecimalType (#947)
* Add support for serializing DecimalType in FlintDataType
* Fix checkstyle
* Add documentation on the new serialization behavior
* Fix integ test
* Actually fix integ tests
* Move the decimal and map IT to the base suite instead of the iceberg suite

Signed-off-by: Chase Engelbrecht <[email protected]>
engechas authored Dec 2, 2024
1 parent 3849248 commit 0943f1f
Showing 5 changed files with 87 additions and 0 deletions.
4 changes: 4 additions & 0 deletions docs/index.md
@@ -577,6 +577,10 @@ The following table defines the data type mapping between Flint data type and Spark data type
* Spark data types VarcharType(length) and CharType(length) are both currently mapped to Flint data
type *keyword*, dropping their length property. On the other hand, Flint data type *keyword* only
maps to StringType.
* Spark data type MapType is mapped to an empty OpenSearch object. The inner fields then rely on
dynamic mapping. On the other hand, Flint data type *object* only maps to StructType.
* Spark data type DecimalType is mapped to an OpenSearch double. On the other hand, Flint data type
*double* only maps to DoubleType (see the sketch after this excerpt).

Unsupported Spark data types:
* DecimalType
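A minimal sketch of the DecimalType mapping described above, driven through the FlintDataType.serialize entry point that the unit test further down exercises. The import path, field name, and precision/scale here are assumptions for illustration, not taken from the commit:

import org.apache.spark.sql.types._
// Assumed package; FlintDataType's actual location in the repo may differ.
import org.apache.spark.sql.flint.datatype.FlintDataType

// A DECIMAL(8, 7) column carries precision and scale, but the generated
// OpenSearch mapping is a plain double, so both properties are dropped.
val schema = StructType(StructField("base_score", DecimalType(8, 7), nullable = true) :: Nil)

// Expected (compact) mapping: {"properties":{"base_score":{"type":"double"}}}
println(FlintDataType.serialize(schema))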
1 change: 1 addition & 0 deletions FlintDataType.scala
@@ -142,6 +142,7 @@ object FlintDataType {
    case ByteType => JObject("type" -> JString("byte"))
    case DoubleType => JObject("type" -> JString("double"))
    case FloatType => JObject("type" -> JString("float"))
    case DecimalType() => JObject("type" -> JString("double"))

    // Date
    case TimestampType | _: TimestampNTZType =>
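One note on the added case: DecimalType() is Spark's extractor pattern and matches a decimal of any precision and scale, not only the default one; it appears to be package-private to Spark's sql packages, which is why FlintDataType can use it. A self-contained sketch of the same dispatch using the equivalent public type pattern, assuming only stock Spark:

import org.apache.spark.sql.types._

// Any precision/scale of DecimalType maps to the OpenSearch "double" type,
// mirroring the case added in the diff above.
def flintTypeName(dt: DataType): String = dt match {
  case _: DecimalType => "double" // matches DecimalType(1, 1), DecimalType(38, 18), ...
  case DoubleType => "double"
  case FloatType => "float"
  case other => other.simpleString
}

assert(flintTypeName(DecimalType(1, 1)) == "double")
assert(flintTypeName(DecimalType(38, 18)) == "double")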
14 changes: 14 additions & 0 deletions FlintDataTypeSuite.scala
@@ -143,6 +143,20 @@ class FlintDataTypeSuite extends FlintSuite with Matchers {
|}""".stripMargin)
}

test("spark decimal type serialize") {
val sparkStructType = StructType(
StructField("decimalField", DecimalType(1, 1), true) ::
Nil)

FlintDataType.serialize(sparkStructType) shouldBe compactJson("""{
| "properties": {
| "decimalField": {
| "type": "double"
| }
| }
|}""".stripMargin)
}

test("spark varchar and char type serialize") {
val flintDataType = """{
| "properties": {
40 changes: 40 additions & 0 deletions FlintSparkMaterializedViewSqlITSuite.scala
@@ -523,5 +523,45 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
    }
  }

  test("create materialized view with decimal and map types") {
    val decimalAndMapTable = s"$catalogName.default.mv_test_decimal_map"
    val decimalAndMapMv = s"$catalogName.default.mv_test_decimal_map_ser"
    withTable(decimalAndMapTable) {
      createMapAndDecimalTimeSeriesTable(decimalAndMapTable)

      withTempDir { checkpointDir =>
        sql(s"""
             | CREATE MATERIALIZED VIEW $decimalAndMapMv
             | AS
             | SELECT
             |   base_score, mymap
             | FROM $decimalAndMapTable
             | WITH (
             |   auto_refresh = true,
             |   checkpoint_location = '${checkpointDir.getAbsolutePath}'
             | )
             |""".stripMargin)

        // Wait for the streaming job to complete the current micro batch
        val flintIndex = getFlintIndexName(decimalAndMapMv)
        val job = spark.streams.active.find(_.name == flintIndex)
        job shouldBe defined
        failAfter(streamingTimeout) {
          job.get.processAllAvailable()
        }

        flint.describeIndex(flintIndex) shouldBe defined
        checkAnswer(
          flint.queryIndex(flintIndex).select("base_score", "mymap"),
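          // Decimal values come back as doubles; each map comes back as a struct
          // with one sub-field per dynamically mapped key, null where that row's
          // map did not contain the key.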
          Seq(
            Row(3.1415926, Row(null, null, null, null, "mapvalue1")),
            Row(4.1415926, Row("mapvalue2", null, null, null, null)),
            Row(5.1415926, Row(null, null, "mapvalue3", null, null)),
            Row(6.1415926, Row(null, null, null, "mapvalue4", null)),
            Row(7.1415926, Row(null, "mapvalue5", null, null, null))))
      }
    }
  }

  private def timestamp(ts: String): Timestamp = Timestamp.valueOf(ts)
}
28 changes: 28 additions & 0 deletions FlintSparkSuite.scala
@@ -445,6 +445,34 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite with …
sql(s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 03:00:00', 'E', 15, 'Vancouver')")
}

protected def createMapAndDecimalTimeSeriesTable(testTable: String): Unit = {
// CSV tables do not support MAP types so we use JSON instead
val finalTableType = if (tableType == "CSV") "JSON" else tableType

sql(s"""
| CREATE TABLE $testTable
| (
| time TIMESTAMP,
| name STRING,
| age INT,
| base_score DECIMAL(8, 7),
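         |   -- DECIMAL(8, 7): 8 significant digits with 7 after the decimal
         |   -- point, so scores like 3.1415926 are stored exactly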
         |   mymap MAP<STRING, STRING>
         | )
         | USING $finalTableType $tableOptions
         |""".stripMargin)

    sql(
      s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 00:01:00', 'A', 30, 3.1415926, Map('mapkey1', 'mapvalue1'))")
    sql(
      s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 00:10:00', 'B', 20, 4.1415926, Map('mapkey2', 'mapvalue2'))")
    sql(
      s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 00:15:00', 'C', 35, 5.1415926, Map('mapkey3', 'mapvalue3'))")
    sql(
      s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 01:00:00', 'D', 40, 6.1415926, Map('mapkey4', 'mapvalue4'))")
    sql(
      s"INSERT INTO $testTable VALUES (TIMESTAMP '2023-10-01 03:00:00', 'E', 15, 7.1415926, Map('mapkey5', 'mapvalue5'))")
  }

  protected def createTimeSeriesTransactionTable(testTable: String): Unit = {
    sql(s"""
         | CREATE TABLE $testTable
