Skip to content

Commit

Permalink
fix: MapType does not use the frameless injection properly
Browse files Browse the repository at this point in the history
  • Loading branch information
ukby1234 committed Jan 23, 2025
1 parent 7ec79fc commit 3e9715b
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ trait FromCatalystHelpers {
input,
(in: Expression) => singleFieldValueFromCatalyst(mapEntryCmp, keyDesc, in),
(in: Expression) => singleFieldValueFromCatalyst(mapEntryCmp, valDesc, in),
ProtoSQL.dataTypeFor(fd).asInstanceOf[MapType],
protoSql.dataTypeFor(fd).asInstanceOf[MapType],
classOf[Vector[(Any, Any)]]
)
val objs = MyCatalystToExternalMap(urobjs)
Expand Down
4 changes: 4 additions & 0 deletions sparksql-scalapb/src/test/protobuf/customizations.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,7 @@ message BothTimestampTypes {
google.protobuf.Timestamp google_ts = 1;
google.protobuf.Timestamp google_ts_as_sql_ts = 2 [(scalapb.field).type = "java.sql.Timestamp"];
}

// Message with a map field whose values use the SQLTimestampFromGoogleTimestamp
// customization (google.protobuf.Timestamp mapped to java.sql.Timestamp).
// Added to exercise timestamp conversion for values nested inside a MapType —
// TODO confirm: presumably covers the frameless-injection fix for map fields.
message TimestampTypesMap {
map<string, SQLTimestampFromGoogleTimestamp> map_field = 1;
}
4 changes: 2 additions & 2 deletions sparksql-scalapb/src/test/scala/PersonSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ class PersonSpec extends AnyFlatSpec with Matchers with BeforeAndAfterAll {
}

"UDFs that returns protos" should "work when reading local files" in {
val df = spark.read.json("./sparksql-scalapb/src/test/assets/address.json")
val df = spark.read.json(getClass.getResource("/address.json").toURI.toString)

val returnAddress = ProtoSQL.udf { s: String => Address() }

Expand Down Expand Up @@ -349,7 +349,7 @@ class PersonSpec extends AnyFlatSpec with Matchers with BeforeAndAfterAll {
"parsing null repeated from json" should "work" in {
spark.read
.schema(ProtoSQL.schemaFor[Person].asInstanceOf[types.StructType])
.json("./sparksql-scalapb/src/test/assets/person_null_repeated.json")
.json(getClass.getResource("/person_null_repeated.json").toURI.toString)
.as[Person]
.collect() must contain theSameElementsAs Seq(
Person().withTags(Seq("foo", "bar")),
Expand Down
18 changes: 17 additions & 1 deletion sparksql-scalapb/src/test/scala/TimestampSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import org.scalatest.matchers.must.Matchers
import scalapb.spark.test3.customizations.{
BothTimestampTypes,
SQLTimestampFromGoogleTimestamp,
StructFromGoogleTimestamp
StructFromGoogleTimestamp,
TimestampTypesMap
}

import java.sql.{Timestamp => SQLTimestamp}
Expand Down Expand Up @@ -158,6 +159,21 @@ class TimestampSpec extends AnyFlatSpec with Matchers with BeforeAndAfterAll {
)
}

// Round-trips a message whose map values carry a java.sql.Timestamp-typed
// google timestamp through createDataset/collect, checking that the
// timestamp survives the MapType conversion unchanged.
"spark.createDataset from proto messages with spark timestamp in map" should "be able to convert items with correct timestamp values" in {
import ProtoSQL.withSparkTimestamps.implicits._

// Single map entry holding the microsecond-precision SQL timestamp fixture.
val entry =
"a" -> SQLTimestampFromGoogleTimestamp(googleTsAsSqlTs = Some(sqlTimestampMicrosPrecision))
val original = TimestampTypesMap(mapField = Map(entry))

val ds: Dataset[TimestampTypesMap] = spark.createDataset(Seq(original))

// Collecting must yield exactly the message we serialized.
ds.collect() must contain theSameElementsAs Seq(original)
}

"df with case class timestamp as well as both types of google timestamp" should "not have StructType for timestamps" in {
import ProtoSQL.withSparkTimestamps.implicits._

Expand Down

0 comments on commit 3e9715b

Please sign in to comment.