Skip to content

Commit

Permalink
Refactor schema types handling
Browse files Browse the repository at this point in the history
  • Loading branch information
manojlds committed Feb 18, 2018
1 parent 733abef commit db04882
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 36 deletions.
5 changes: 3 additions & 2 deletions schemer-core/src/main/scala/schemer/CSVSchema.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package schemer

import com.fasterxml.jackson.annotation.JsonProperty
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, SparkSession}
import schemer.utils.JSONUtil
Expand Down Expand Up @@ -29,8 +30,8 @@ case class CSVSchemaBase(csvOptions: CSVOptions) extends SchemaLikeBase[CSVSchem
}

case class CSVSchema(
fields: List[CSVField],
options: CSVOptions
@JsonProperty(required = true) fields: List[CSVField],
options: CSVOptions = CSVOptions()
) extends SchemaLike {

override def validate: List[String] =
Expand Down
37 changes: 36 additions & 1 deletion schemer-core/src/main/scala/schemer/Schemer.scala
Original file line number Diff line number Diff line change
@@ -1,3 +1,38 @@
package schemer

object Schemer {}
// Marker for a schema format supported by Schemer. `type` is the canonical
// string identifier (e.g. "avro", "parquet_csv") matched by Schemer.from.
// Sealed so the set of formats is closed and exhaustively checkable.
sealed trait SchemaType {
val `type`: String
}

/** Companion enumerating every supported schema format. Each case object's
  * `type` string corresponds to a case handled in `Schemer.from`.
  */
object SchemaType {
case object Avro extends SchemaType {
override val `type`: String = "avro"
}
case object Csv extends SchemaType {
override val `type`: String = "csv"
}
case object Json extends SchemaType {
override val `type`: String = "json"
}
case object ParquetAvro extends SchemaType {
override val `type`: String = "parquet_avro"
}
case object ParquetCsv extends SchemaType {
override val `type`: String = "parquet_csv"
}
case object ParquetJson extends SchemaType {
override val `type`: String = "parquet_json"
}
// Complete list of the case objects above; useful for validation/lookup.
val supportedTypes = List(Avro, Csv, Json, ParquetAvro, ParquetCsv, ParquetJson)
}

object Schemer {

  /**
    * Builds a concrete [[SchemaLike]] from its serialized configuration.
    *
    * @param `type`  canonical schema type identifier; must be one of
    *                [[SchemaType.supportedTypes]] (e.g. "avro", "parquet_csv")
    * @param config  serialized schema configuration understood by that type
    * @return the deserialized schema
    * @throws IllegalArgumentException if `type` is not a supported schema type
    */
  def from(`type`: String, config: String): SchemaLike = `type` match {
    case "avro"         => AvroSchema(config)
    case "csv"          => CSVSchema(config)
    case "json"         => JSONSchema(config)
    case "parquet_avro" => ParquetSchema(config, ParquetSchemaType.Avro)
    case "parquet_csv"  => ParquetSchema(config, ParquetSchemaType.Csv)
    case "parquet_json" => ParquetSchema(config, ParquetSchemaType.Json)
    case other =>
      // Fail with a descriptive error instead of an opaque scala.MatchError
      // when callers pass an unrecognized type string (e.g. bad user input).
      throw new IllegalArgumentException(
        s"Unsupported schema type '$other'. Supported types: ${SchemaType.supportedTypes.map(_.`type`).mkString(", ")}"
      )
  }
}
2 changes: 1 addition & 1 deletion schemer-core/src/main/scala/schemer/utils/JSONUtil.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package schemer.utils

import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature}
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper, SerializationFeature}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.annotation.JsonInclude.Include

Expand Down
16 changes: 16 additions & 0 deletions schemer-core/src/test/scala/schemer/CSVSchemaSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.scalatest._

import scala.util.Try

class CSVSchemaSpec extends FlatSpec with Matchers {
implicit val spark: SparkSession = SparkSession.builder
.config(new SparkConf())
Expand Down Expand Up @@ -83,4 +85,18 @@ class CSVSchemaSpec extends FlatSpec with Matchers {
)
)
}

it should "handle empty fields" in {
  // A config with an empty field list must deserialize to an empty Spark schema.
  val emptyFieldsSchema = CSVSchema("""{"fields":[], "options": {}}""")

  emptyFieldsSchema.sparkSchema() shouldBe StructType(Nil)
}

it should "handle error parsing json" in {
  // Deserializing a config that lacks the mandatory "fields" key must fail
  // with Jackson's missing-required-property error.
  val failure = Try(CSVSchema("{}")).failed.get
  failure.getMessage should startWith("Missing required creator property 'fields'")
}
}
Original file line number Diff line number Diff line change
@@ -1,27 +1,25 @@
package schemer.registry.graphql

import java.nio.charset.StandardCharsets
import java.util.{Base64, UUID}
import java.util.UUID

import akka.actor.{ActorRef, ActorSystem}
import akka.pattern.{ask, AskTimeoutException}
import akka.util.Timeout
import com.github.mauricio.async.db.postgresql.exceptions.GenericDatabaseException
import org.apache.spark.sql.SparkSession
import sangria.macros.derive.GraphQLField
import schemer._
import schemer.registry.Cursor
import schemer.registry.actors._
import schemer.registry.dao.{PaginatedFilter, SchemaDao}
import schemer.registry.models._
import schemer.registry.utils.Clock
import com.github.mauricio.async.db.postgresql.exceptions.GenericDatabaseException
import org.joda.time.DateTime
import schemer.registry.Cursor
import schemer.registry.exceptions.{
SchemerException,
SchemerInferenceException,
SchemerSchemaCreationException,
SchemerSchemaVersionCreationException
}
import schemer.registry.models._
import schemer.registry.utils.Clock

import scala.concurrent.{ExecutionContext, Future}
import scala.language.postfixOps
Expand Down Expand Up @@ -64,7 +62,16 @@ class GraphQLService(
.find(schemaId)
.flatMap {
case Some(schema) =>
schemaDao.createVersion(SchemaVersion(null, schema.id, version, schemaConfig, clock.nowUtc, user))
val errors = Schemer.from(schema.`type`, schemaConfig).validate
if (errors.isEmpty) {
schemaDao.createVersion(SchemaVersion(null, schema.id, version, schemaConfig, clock.nowUtc, user))
} else {
Future.failed(
SchemerSchemaVersionCreationException(
s"Error(s) validating schema config - ${errors.mkString("[", ", ", "]")}"
)
)
}
case None => Future.failed(SchemerSchemaVersionCreationException(s"Schema with id $schemaId not found"))
}
.recoverWith {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ package schemer.registry.graphql.schema

import sangria.macros.derive.deriveObjectType
import sangria.schema.{Field, ObjectType, _}
import schemer.{SchemaType => SSchemaType}
import schemer.registry.graphql.{SchemaVersionLatestDeferred, SchemaVersionsDeferred}
import schemer.registry.graphql.schema.SchemaDefinition.constantComplexity
import schemer.registry.models.{
PageInfo,
SchemaSchemaVersionConnection,
SchemaSchemaVersionEdge,
SchemaVersion,
Schema => SSchema,
SchemaType => SSchemaType
Schema => SSchema
}

trait SchemaType extends GraphQLCustomTypes {
Expand All @@ -21,7 +21,9 @@ trait SchemaType extends GraphQLCustomTypes {
EnumValue("Avro", value = SSchemaType.Avro),
EnumValue("Csv", value = SSchemaType.Csv),
EnumValue("Json", value = SSchemaType.Json),
EnumValue("Parquet", value = SSchemaType.Parquet)
EnumValue("ParquetAvro", value = SSchemaType.ParquetAvro),
EnumValue("ParquetCsv", value = SSchemaType.ParquetCsv),
EnumValue("ParquetJson", value = SSchemaType.ParquetJson)
)
)
lazy implicit val FirstArg = Argument("first", OptionInputType(IntType))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,6 @@ import java.util.UUID

import org.joda.time.DateTime

// Pre-refactor schema type marker (registry-local version removed by this
// commit in favor of the one in schemer-core). `type` is the canonical
// string identifier for the format.
sealed trait SchemaType {
val `type`: String
}

/** Pre-refactor companion (removed by this commit). Note it exposed a single
  * generic "parquet" type, whereas the replacement in schemer-core splits it
  * into parquet_avro / parquet_csv / parquet_json.
  */
object SchemaType {
case object Avro extends SchemaType {
override val `type`: String = "avro"
}
case object Csv extends SchemaType {
override val `type`: String = "csv"
}
case object Json extends SchemaType {
override val `type`: String = "json"
}
case object Parquet extends SchemaType {
override val `type`: String = "parquet"
}

// Complete list of the case objects above.
val supportedTypes = List(Avro, Csv, Json, Parquet)
}

case class Schema(
id: UUID,
name: String,
Expand Down

0 comments on commit db04882

Please sign in to comment.