diff --git a/schemer-core/src/main/scala/schemer/CSVSchema.scala b/schemer-core/src/main/scala/schemer/CSVSchema.scala
index 0941802..275ebc5 100644
--- a/schemer-core/src/main/scala/schemer/CSVSchema.scala
+++ b/schemer-core/src/main/scala/schemer/CSVSchema.scala
@@ -1,5 +1,6 @@
 package schemer
 
+import com.fasterxml.jackson.annotation.JsonProperty
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import schemer.utils.JSONUtil
@@ -29,8 +30,8 @@ case class CSVSchemaBase(csvOptions: CSVOptions) extends SchemaLikeBase[CSVSchem
 }
 
 case class CSVSchema(
-    fields: List[CSVField],
-    options: CSVOptions
+    @JsonProperty(required = true) fields: List[CSVField],
+    options: CSVOptions = CSVOptions()
 ) extends SchemaLike {
 
   override def validate: List[String] =
diff --git a/schemer-core/src/main/scala/schemer/Schemer.scala b/schemer-core/src/main/scala/schemer/Schemer.scala
index b6da6a1..700ec3f 100644
--- a/schemer-core/src/main/scala/schemer/Schemer.scala
+++ b/schemer-core/src/main/scala/schemer/Schemer.scala
@@ -1,3 +1,38 @@
 package schemer
 
-object Schemer {}
+sealed trait SchemaType {
+  val `type`: String
+}
+
+object SchemaType {
+  case object Avro extends SchemaType {
+    override val `type`: String = "avro"
+  }
+  case object Csv extends SchemaType {
+    override val `type`: String = "csv"
+  }
+  case object Json extends SchemaType {
+    override val `type`: String = "json"
+  }
+  case object ParquetAvro extends SchemaType {
+    override val `type`: String = "parquet_avro"
+  }
+  case object ParquetCsv extends SchemaType {
+    override val `type`: String = "parquet_csv"
+  }
+  case object ParquetJson extends SchemaType {
+    override val `type`: String = "parquet_json"
+  }
+  val supportedTypes = List(Avro, Csv, Json, ParquetAvro, ParquetCsv, ParquetJson)
+}
+
+object Schemer {
+  def from(`type`: String, config: String): SchemaLike = `type` match {
+    case "avro"         => AvroSchema(config)
+    case "csv"          => CSVSchema(config)
+    case "json"         => JSONSchema(config)
+    case "parquet_avro" => ParquetSchema(config, ParquetSchemaType.Avro)
+    case "parquet_csv"  => ParquetSchema(config, ParquetSchemaType.Csv)
+    case "parquet_json" => ParquetSchema(config, ParquetSchemaType.Json)
+  }
+}
diff --git a/schemer-core/src/main/scala/schemer/utils/JSONUtil.scala b/schemer-core/src/main/scala/schemer/utils/JSONUtil.scala
index 8aff255..d299ee2 100644
--- a/schemer-core/src/main/scala/schemer/utils/JSONUtil.scala
+++ b/schemer-core/src/main/scala/schemer/utils/JSONUtil.scala
@@ -1,6 +1,6 @@
 package schemer.utils
 
-import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature}
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper, SerializationFeature}
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import com.fasterxml.jackson.annotation.JsonInclude.Include
 
diff --git a/schemer-core/src/test/scala/schemer/CSVSchemaSpec.scala b/schemer-core/src/test/scala/schemer/CSVSchemaSpec.scala
index 56ec41b..16deac7 100644
--- a/schemer-core/src/test/scala/schemer/CSVSchemaSpec.scala
+++ b/schemer-core/src/test/scala/schemer/CSVSchemaSpec.scala
@@ -5,6 +5,8 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
 import org.scalatest._
 
+import scala.util.Try
+
 class CSVSchemaSpec extends FlatSpec with Matchers {
   implicit val spark: SparkSession = SparkSession.builder
     .config(new SparkConf())
@@ -83,4 +85,18 @@
       )
     )
   }
+
+  it should "handle empty fields" in {
+    val schema = CSVSchema(
+      "{\"fields\":[], \"options\": {}}"
+    )
+
+    schema.sparkSchema() should be(
+      StructType(List())
+    )
+  }
+
+  it should "handle error parsing json" in {
+    Try(CSVSchema("{}")).failed.get.getMessage should startWith("Missing required creator property 'fields'")
+  }
 }
diff --git a/schemer-registry/src/main/scala/schemer/registry/graphql/GraphQLService.scala b/schemer-registry/src/main/scala/schemer/registry/graphql/GraphQLService.scala
index 84acc8d..a77b08a 100644
--- a/schemer-registry/src/main/scala/schemer/registry/graphql/GraphQLService.scala
+++ b/schemer-registry/src/main/scala/schemer/registry/graphql/GraphQLService.scala
@@ -1,27 +1,25 @@
 package schemer.registry.graphql
 
-import java.nio.charset.StandardCharsets
-import java.util.{Base64, UUID}
+import java.util.UUID
 
 import akka.actor.{ActorRef, ActorSystem}
 import akka.pattern.{ask, AskTimeoutException}
 import akka.util.Timeout
+import com.github.mauricio.async.db.postgresql.exceptions.GenericDatabaseException
 import org.apache.spark.sql.SparkSession
 import sangria.macros.derive.GraphQLField
 import schemer._
+import schemer.registry.Cursor
 import schemer.registry.actors._
 import schemer.registry.dao.{PaginatedFilter, SchemaDao}
-import schemer.registry.models._
-import schemer.registry.utils.Clock
-import com.github.mauricio.async.db.postgresql.exceptions.GenericDatabaseException
-import org.joda.time.DateTime
-import schemer.registry.Cursor
 import schemer.registry.exceptions.{
   SchemerException,
   SchemerInferenceException,
   SchemerSchemaCreationException,
   SchemerSchemaVersionCreationException
 }
+import schemer.registry.models._
+import schemer.registry.utils.Clock
 
 import scala.concurrent.{ExecutionContext, Future}
 import scala.language.postfixOps
@@ -64,7 +62,16 @@ class GraphQLService(
       .find(schemaId)
       .flatMap {
         case Some(schema) =>
-          schemaDao.createVersion(SchemaVersion(null, schema.id, version, schemaConfig, clock.nowUtc, user))
+          val errors = Schemer.from(schema.`type`, schemaConfig).validate
+          if (errors.isEmpty) {
+            schemaDao.createVersion(SchemaVersion(null, schema.id, version, schemaConfig, clock.nowUtc, user))
+          } else {
+            Future.failed(
+              SchemerSchemaVersionCreationException(
+                s"Error(s) validating schema config - ${errors.mkString("[", ", ", "]")}"
+              )
+            )
+          }
         case None => Future.failed(SchemerSchemaVersionCreationException(s"Schema with id $schemaId not found"))
       }
       .recoverWith {
diff --git a/schemer-registry/src/main/scala/schemer/registry/graphql/schema/SchemaType.scala b/schemer-registry/src/main/scala/schemer/registry/graphql/schema/SchemaType.scala
index 7901908..3499548 100644
--- a/schemer-registry/src/main/scala/schemer/registry/graphql/schema/SchemaType.scala
+++ b/schemer-registry/src/main/scala/schemer/registry/graphql/schema/SchemaType.scala
@@ -2,6 +2,7 @@ package schemer.registry.graphql.schema
 
 import sangria.macros.derive.deriveObjectType
 import sangria.schema.{Field, ObjectType, _}
+import schemer.{SchemaType => SSchemaType}
 import schemer.registry.graphql.{SchemaVersionLatestDeferred, SchemaVersionsDeferred}
 import schemer.registry.graphql.schema.SchemaDefinition.constantComplexity
 import schemer.registry.models.{
@@ -9,8 +10,7 @@ import schemer.registry.models.{
   SchemaSchemaVersionConnection,
   SchemaSchemaVersionEdge,
   SchemaVersion,
-  Schema => SSchema,
-  SchemaType => SSchemaType
+  Schema => SSchema
 }
 
 trait SchemaType extends GraphQLCustomTypes {
@@ -21,7 +21,9 @@ trait SchemaType extends GraphQLCustomTypes {
       EnumValue("Avro", value = SSchemaType.Avro),
       EnumValue("Csv", value = SSchemaType.Csv),
       EnumValue("Json", value = SSchemaType.Json),
-      EnumValue("Parquet", value = SSchemaType.Parquet)
+      EnumValue("ParquetAvro", value = SSchemaType.ParquetAvro),
+      EnumValue("ParquetCsv", value = SSchemaType.ParquetCsv),
+      EnumValue("ParquetJson", value = SSchemaType.ParquetJson)
     )
   )
   lazy implicit val FirstArg = Argument("first", OptionInputType(IntType))
diff --git a/schemer-registry/src/main/scala/schemer/registry/models/Schema.scala b/schemer-registry/src/main/scala/schemer/registry/models/Schema.scala
index 4b8e82d..a7de53a 100644
--- a/schemer-registry/src/main/scala/schemer/registry/models/Schema.scala
+++ b/schemer-registry/src/main/scala/schemer/registry/models/Schema.scala
@@ -4,27 +4,6 @@ import java.util.UUID
 
 import org.joda.time.DateTime
 
-sealed trait SchemaType {
-  val `type`: String
-}
-
-object SchemaType {
-  case object Avro extends SchemaType {
-    override val `type`: String = "avro"
-  }
-  case object Csv extends SchemaType {
-    override val `type`: String = "csv"
-  }
-  case object Json extends SchemaType {
-    override val `type`: String = "json"
-  }
-  case object Parquet extends SchemaType {
-    override val `type`: String = "parquet"
-  }
-
-  val supportedTypes = List(Avro, Csv, Json, Parquet)
-}
-
 case class Schema(
     id: UUID,
     name: String,
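
A minimal usage sketch, not part of the patch above: it exercises the new Schemer.from entry point and the validate check that createSchemaVersion now performs, assuming schemer-core is on the classpath. The object name SchemerFromExample is hypothetical, and the sample config string is borrowed from the "handle empty fields" test case.

// Hedged sketch, not part of this patch: resolve a schema from its type tag and
// validate a config, mirroring the check added to createSchemaVersion above.
import schemer.{SchemaType, Schemer}

object SchemerFromExample { // hypothetical object name, for illustration only
  def main(args: Array[String]): Unit = {
    // Type tags accepted by Schemer.from, as listed in SchemaType.supportedTypes
    println(SchemaType.supportedTypes.map(_.`type`).mkString(", "))

    // Sample CSV config, taken from the "handle empty fields" test case above
    val config = "{\"fields\":[], \"options\": {}}"

    // Schemer.from returns a SchemaLike; validate yields an empty list when the config is valid
    val errors = Schemer.from(SchemaType.Csv.`type`, config).validate
    if (errors.isEmpty) println("schema config is valid")
    else println(s"invalid schema config - ${errors.mkString("[", ", ", "]")}")
  }
}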