From 72ae0ee4c3f5311f00e208f80ee55fb814ff76e8 Mon Sep 17 00:00:00 2001 From: Manoj Date: Fri, 1 Dec 2017 14:16:45 +0530 Subject: [PATCH] Fixes to parquet schema api --- .../src/main/scala/schemer/ParquetSchema.scala | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/schemer-core/src/main/scala/schemer/ParquetSchema.scala b/schemer-core/src/main/scala/schemer/ParquetSchema.scala index 7ab372d..96c5525 100644 --- a/schemer-core/src/main/scala/schemer/ParquetSchema.scala +++ b/schemer-core/src/main/scala/schemer/ParquetSchema.scala @@ -5,14 +5,16 @@ import org.apache.spark.sql.types.StructType import scala.reflect.runtime.universe._ case class ParquetSchemaBase[T <: SchemaLike: TypeTag](override val options: Map[String, String] = Map()) - extends SchemaLikeBase[T] { + extends SchemaLikeBase[ParquetSchema] { override def infer(paths: String*)(implicit spark: SparkSession) = { val schema = spark.read.parquet(paths: _*).schema - (typeOf[T] match { - case t if t =:= typeOf[AvroSchema] => AvroSchema(schema) - case t if t =:= typeOf[JSONSchema] => JSONSchema(schema) - case t if t =:= typeOf[CSVSchema] => CSVSchema(schema, options) - }).asInstanceOf[T] + val underlyingSchema = typeOf[T] match { + case t if t =:= typeOf[AvroSchema] => ("avro", AvroSchema(schema)) + case t if t =:= typeOf[JSONSchema] => ("json", JSONSchema(schema)) + case t if t =:= typeOf[CSVSchema] => ("csv", CSVSchema(schema, options)) + } + + ParquetSchema(underlyingSchema._2.schema(), underlyingSchema._1) } }