Skip to content

Commit

Permalink
Fixes to parquet schema api
Browse files Browse the repository at this point in the history
  • Loading branch information
manojlds committed Dec 1, 2017
1 parent 9b8cae6 commit 72ae0ee
Showing 1 changed file with 8 additions and 6 deletions.
14 changes: 8 additions & 6 deletions schemer-core/src/main/scala/schemer/ParquetSchema.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ import org.apache.spark.sql.types.StructType
import scala.reflect.runtime.universe._

case class ParquetSchemaBase[T <: SchemaLike: TypeTag](override val options: Map[String, String] = Map())
extends SchemaLikeBase[T] {
extends SchemaLikeBase[ParquetSchema] {
override def infer(paths: String*)(implicit spark: SparkSession) = {
val schema = spark.read.parquet(paths: _*).schema
(typeOf[T] match {
case t if t =:= typeOf[AvroSchema] => AvroSchema(schema)
case t if t =:= typeOf[JSONSchema] => JSONSchema(schema)
case t if t =:= typeOf[CSVSchema] => CSVSchema(schema, options)
}).asInstanceOf[T]
val underlyingSchema = typeOf[T] match {
case t if t =:= typeOf[AvroSchema] => ("avro", AvroSchema(schema))
case t if t =:= typeOf[JSONSchema] => ("json", JSONSchema(schema))
case t if t =:= typeOf[CSVSchema] => ("csv", CSVSchema(schema, options))
}

ParquetSchema(underlyingSchema._2.schema(), underlyingSchema._1)
}
}

Expand Down

0 comments on commit 72ae0ee

Please sign in to comment.