-
Notifications
You must be signed in to change notification settings - Fork 9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Add ArrayType and explode function #25
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
package org.virtuslab.iskra.functions | ||
|
||
import org.apache.spark.sql | ||
import org.virtuslab.iskra.Column | ||
import org.virtuslab.iskra.types.{ ArrayOptType, DataType } | ||
|
||
/** Typed wrapper around Spark's `explode` for array columns. | ||
 * | ||
 * @tparam T element type of the array column | ||
 * @param c  column holding arrays of `T` | ||
 * @return   a column of `T` built from Spark's `explode` applied to the untyped column | ||
 */ | ||
def explode[T <: DataType](c: Column[ArrayOptType[T]]): Column[T] = | ||
  val exploded = sql.functions.explode(c.untyped) | ||
  Column(exploded) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -86,6 +86,14 @@ object Encoder: | |
type ColumnType = DoubleOptType | ||
def catalystType = sql.types.DoubleType | ||
|
||
inline given arrayFromMirror[A](using encoder: Encoder[A]): (Encoder[Seq[A]] { type ColumnType = ArrayOptType[encoder.ColumnType] }) = | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Adding basic support for arrays probably deserves a separate PR of its own, as it's slightly more complex: we should reuse the encoders of the element types and support both nullable and non-nullable arrays. I have some drafts of the implementation mixed with other changes locally, but I'll try to extract them and get them merged to main. |
||
new Encoder[Seq[A]]: | ||
  // The column type is an array whose elements use the element encoder's column type. | ||
  override type ColumnType = ArrayOptType[encoder.ColumnType] | ||
  // A null sequence is encoded as an empty array; every element goes through the element encoder. | ||
  override def encode(value: Seq[A]): Any = if (value == null) Seq() else value.map(encoder.encode) | ||
  // Decode each element with the element encoder. The previous body, `Seq(encoder.decode)`, | ||
  // wrapped the decode *function* in a one-element Seq and never looked at `value`. | ||
  // A null array decodes to an empty Seq, mirroring `encode`. | ||
  // NOTE(review): assumes Spark hands array values back as a scala.collection.Seq - confirm. | ||
  override def decode(value: Any): Any = value match | ||
    case null => Seq.empty | ||
    case elems: scala.collection.Seq[?] => elems.map(encoder.decode).toSeq | ||
  override def catalystType = sql.types.ArrayType(encoder.catalystType) | ||
  override def isNullable = true | ||
|
||
export StructEncoder.{fromMirror, optFromMirror} | ||
|
||
trait StructEncoder[-A] extends Encoder[A]: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
package org.virtuslab.iskra.test | ||
|
||
class ExplodeTest extends SparkUnitTest: | ||
import org.virtuslab.iskra.api.* | ||
import functions.explode | ||
|
||
case class Foo(ints: Seq[Int]) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We should consider more cases here: Not only |
||
|
||
val foos = Seq( | ||
Foo(Seq(1)), | ||
Foo(Seq(2)), | ||
Foo(Seq()), | ||
Foo(null), | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. For maximal type safety optional values should be represented by |
||
Foo(Seq(3,4)) | ||
).toTypedDF | ||
|
||
test("explode") { | ||
  // The fixture also contains an empty and a null sequence; neither is expected to yield a row. | ||
  val exploded = foos.select(explode($.ints).as("int")).collectAs[Int] | ||
  exploded shouldEqual Seq(1,2,3,4) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should have a way to prevent users from using `explode` more than once in the same `select` clause, as that would result in a runtime error. This constraint doesn't seem to be easy to express in the current model of iskra. However, I'm in the middle of a major redesign of the library's model, so I'll try to take this use case into account.