Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added test coverage for CSVToAvro util #50

Merged
merged 5 commits into from
Aug 13, 2018
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions utils/src/main/scala/com/salesforce/op/test/TestCommon.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ import scala.reflect.{ClassTag, _}
* Trait with test commons such as Spec and Resource functions
*/
trait TestCommon extends Matchers with Assertions {
private val _resourceDir = "src/test/resources"

/**
* Returns the resource directory path
*/
protected def resourceDir = _resourceDir
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to create a private val _resourceDir = "src/test/resources" simply do:

protected def resourceDir: String ="src/test/resources"


/**
* Set logging level for
Expand Down Expand Up @@ -91,7 +97,7 @@ trait TestCommon extends Matchers with Assertions {
* @param name resource name
* @return resource file
*/
def resourceFile(parent: String = "src/test/resources", name: String): File = {
def resourceFile(parent: String = resourceDir, name: String): File = {
val file = new File(parent, name)
if (!file.canRead) throw new IllegalStateException(s"File $file unreadable")
file
Expand All @@ -106,7 +112,7 @@ trait TestCommon extends Matchers with Assertions {
* @return resource file
*/
@deprecated("Use loadResource", "3.2.3")
def resourceString(parent: String = "src/test/resources", noSpaces: Boolean = true, name: String): String = {
def resourceString(parent: String = resourceDir, noSpaces: Boolean = true, name: String): String = {
val file = resourceFile(parent = parent, name = name)
val contents = Source.fromFile(file, "UTF-8").mkString
if (noSpaces) contents.replaceAll("\\s", "") else contents
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@
package com.salesforce.op.utils.avro

import org.apache.avro.generic.GenericRecord
import scala.collection.JavaConverters._


import scala.util.Try

object RichGenericRecord {

Expand All @@ -55,7 +51,7 @@ object RichGenericRecord {
def getValue[T](fieldName: String): Option[T] = {
// Check that field exists in schema
require(Option(r.getSchema.getField(fieldName)).isDefined,
s"${fieldName} is not found in Avro schema!")
s"$fieldName is not found in Avro schema!")

val field = Option(r.get(fieldName))
(field map {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
1,false,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S
2,fail,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",female,38,1,0,PC 17599,71.2833,C85,C
3,true,3,"Heikkinen, Miss. Laina",female,26,0,0,STON/O2. 3101282,7.925,,S
4,true,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35,1,0,113803,53.1,C123,S
5,false,3,"Allen, Mr. William Henry",male,35,0,0,373450,8.05,,S
5 changes: 5 additions & 0 deletions utils/src/test/resources/PassengerDataModifiedDataTypes.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
1,false,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S
2,true,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",female,38,1,0,PC 17599,71.2833,C85,C
3,true,3,"Heikkinen, Miss. Laina",female,26,0,0,STON/O2. 3101282,7.925,,S
4,true,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35,1,0,113803,53.1,C123,S
5,false,3,"Allen, Mr. William Henry",male,35,0,0,373450,8.05,,S
46 changes: 46 additions & 0 deletions utils/src/test/resources/PassengerSchemaInvalidField.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
{
"type" : "record",
"name" : "Passenger",
"namespace" : "com.salesforce.app.schema",
"fields" : [ {
"name" : "PassengerId",
"type" : [ "int", "null" ]
}, {
"name" : "Survived",
"type" : "boolean",
"default": false
}, {
"name" : "Pclass",
"type" : [ "int", "null" ]
}, {
"name" : "Name",
"type" : [ "string", "null" ]
}, {
"name" : "Sex",
"type" : [ "string", "null" ]
}, {
"name" : "Age",
"type" : [ "double", "null" ]
}, {
"name" : "SibSp",
"type" : [ "int", "null" ]
}, {
"name" : "Parch",
"type" : [ "long", "null" ]
}, {
"name" : "Ticket",
"type" : [ "string", "null" ]
}, {
"name" : "Fare",
"type" : [ "float", "null" ]
}, {
"name" : "Cabin",
"type" : [ "string", "null" ]
}, {
"name" : "Embarked",
"type" : [ "string", "null" ]
}, {
"name" : "FailTest",
"type" : [ "string", "null" ]
} ]
}
43 changes: 43 additions & 0 deletions utils/src/test/resources/PassengerSchemaModifiedDataTypes.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
{
"type" : "record",
"name" : "Passenger",
"namespace" : "com.salesforce.app.schema",
"fields" : [ {
"name" : "PassengerId",
"type" : [ "int", "null" ]
}, {
"name" : "Survived",
"type" : "boolean",
"default": false
}, {
"name" : "Pclass",
"type" : [ "int", "null" ]
}, {
"name" : "Name",
"type" : [ "string", "null" ]
}, {
"name" : "Sex",
"type" : [ "string", "null" ]
}, {
"name" : "Age",
"type" : [ "double", "null" ]
}, {
"name" : "SibSp",
"type" : [ "int", "null" ]
}, {
"name" : "Parch",
"type" : [ "long", "null" ]
}, {
"name" : "Ticket",
"type" : [ "string", "null" ]
}, {
"name" : "Fare",
"type" : [ "float", "null" ]
}, {
"name" : "Cabin",
"type" : [ "string", "null" ]
}, {
"name" : "Embarked",
"type" : [ "string", "null" ]
} ]
}
47 changes: 47 additions & 0 deletions utils/src/test/resources/PassengerSchemaNestedTypeCSV.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
{
"type" : "record",
"name" : "Passenger",
"namespace" : "com.salesforce.app.schema",
"fields" : [ {
"name" : "PassengerId",
"type" : [ "int", "null" ]
}, {
"name" : "Survived",
"type" : "boolean",
"default": false
}, {
"name" : "Pclass",
"type" : [ "int", "null" ]
}, {
"name" : "Name",
"type" : [ "string", "null" ]
}, {
"name" : "Sex",
"type" : {
"name": "Sex",
"type": "enum",
"symbols": [ "male", "female" ]
}
}, {
"name" : "Age",
"type" : [ "double", "null" ]
}, {
"name" : "SibSp",
"type" : [ "int", "null" ]
}, {
"name" : "Parch",
"type" : [ "int", "null" ]
}, {
"name" : "Ticket",
"type" : [ "string", "null" ]
}, {
"name" : "Fare",
"type" : [ "double", "null" ]
}, {
"name" : "Cabin",
"type" : [ "string", "null" ]
}, {
"name" : "Embarked",
"type" : [ "string", "null" ]
} ]
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ import com.salesforce.op.test.{TestCommon, TestSparkContext}
import com.salesforce.op.utils.io.avro.AvroInOut
import org.apache.avro.generic.GenericRecord
import org.junit.runner.RunWith
import org.scalatest.{FlatSpec, Matchers}
import org.scalatest.junit.JUnitRunner
import org.scalatest.{FlatSpec, Matchers}


@RunWith(classOf[JUnitRunner])
Expand All @@ -51,33 +51,42 @@ class RichGenericRecordTest extends FlatSpec
val firstRow = passengerData.first

Spec[RichGenericRecord] should "get value of Int" in {

val id = firstRow.getValue[Int]("passengerId")
id shouldBe Some(1)
}

it should "get value of Double" in {
val survived = firstRow.getValue[Double]("survived")
survived shouldBe Some(0.0)
}

it should "get value of Long" in {
val height = firstRow.getValue[Long]("height")
height shouldBe Some(168L)
}

it should "get value of String" in {
val gender = firstRow.getValue[String]("gender")
gender shouldBe Some("Female")
}

it should "get value of Char" in {
val gender = firstRow.getValue[Char]("gender")
gender shouldBe Some("Female")
}

it should "get value of Float" in {
val age = firstRow.getValue[Float]("age")
age shouldBe Some(32.0)
}

it should "get value of Short" in {
val weight = firstRow.getValue[Short]("weight")
weight shouldBe Some(67)
}

it should "throw error for invalid field" in {
val error = intercept[IllegalArgumentException](firstRow.getValue[Short]("invalidField"))
error.getMessage shouldBe "requirement failed: invalidField is not found in Avro schema!"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright (c) 2017, Salesforce.com, Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* * Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package com.salesforce.op.utils.io.csv

import com.salesforce.op.test.{Passenger, TestSparkContext}
import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.junit.JUnitRunner

@RunWith(classOf[JUnitRunner])
class CSVToAvroTest extends FlatSpec with TestSparkContext {
val avroSchema: String = loadFile(s"$resourceDir/PassengerSchemaModifiedDataTypes.avsc")
val csvReader: CSVInOut = new CSVInOut(CSVOptions(header = true))
lazy val csvRDD: RDD[Seq[String]] = csvReader.readRDD(s"$resourceDir/PassengerDataModifiedDataTypes.csv")
lazy val csvFileRecordCount: Long = csvRDD.count

Spec(CSVToAvro.getClass) should "convert RDD[Seq[String]] to RDD[GenericRecord]" in {
val res = CSVToAvro.toAvro(csvRDD, avroSchema)
res shouldBe a[RDD[_]]
res.count shouldBe csvFileRecordCount
}

it should "convert RDD[Seq[String]] to RDD[T]" in {
val res = CSVToAvro.toAvroTyped[Passenger](csvRDD, avroSchema)
res shouldBe a[RDD[_]]
res.count shouldBe csvFileRecordCount
}

it should "throw an error for nested schema" in {
val invalidAvroSchema = loadFile(s"$resourceDir/PassengerSchemaNestedTypeCSV.avsc")
val exceptionMsg = "CSV should be a flat file and not have nested records (unsupported column(Sex schemaType=ENUM)"
val error = intercept[SparkException](CSVToAvro.toAvro(csvRDD, invalidAvroSchema).count())
error.getCause.getMessage shouldBe exceptionMsg
}

it should "throw an error for mis-matching schema fields" in {
val invalidAvroSchema = loadFile(s"$resourceDir/PassengerSchemaInvalidField.avsc")
val error = intercept[SparkException](CSVToAvro.toAvro(csvRDD, invalidAvroSchema).count())
error.getCause.getMessage shouldBe "Mismatch number of fields in csv record and avro schema"
}

it should "throw an error for bad data" in {
val invalidDataRDD = csvReader.readRDD(s"$resourceDir/PassengerDataContentTypeMisMatch.csv")
val error = intercept[SparkException](CSVToAvro.toAvro(invalidDataRDD, avroSchema).count())
error.getCause.getMessage shouldBe "Boolean column not actually a boolean. Invalid value: 'fail'"
}
}