Skip to content

Commit

Permalink
Added test io.Avro object/class + minor refactor (#48)
Browse files Browse the repository at this point in the history
  • Loading branch information
ajayborra authored and tovbinm committed Aug 11, 2018
1 parent 3bd0494 commit dc46866
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 14 deletions.
Binary file added test-data/PassengerDataAll.avro
Binary file not shown.
18 changes: 18 additions & 0 deletions utils/src/main/scala/com/salesforce/op/test/TestCommon.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import org.scalatest._

import scala.collection.JavaConverters._
import scala.io.Source
import scala.language.postfixOps
import scala.reflect.{ClassTag, _}

/**
Expand Down Expand Up @@ -66,6 +67,23 @@ trait TestCommon extends Matchers with Assertions {
def apply(klazz: Class[_]): String = klazz.getSimpleName.stripSuffix("$")
}

/**
* Test data directory
* @return directory path
*/
/**
 * Test data directory: prefers a "test-data" directory in the current
 * working directory, otherwise falls back to the sibling "../test-data" path.
 *
 * @return directory path
 */
def testDataDir: String = {
  val local = new File("test-data")
  val dir = if (local.isDirectory) local else new File("../test-data")
  dir.getPath
}

/**
* Load a file as string
* @param path absolute or relative path of a file
* @return the whole content of resource file as a string
*/
/**
 * Load a file as string
 *
 * @param path absolute or relative path of a file
 * @return the whole content of the file as a string
 */
def loadFile(path: String): String = {
  // Source.fromFile opens a file handle; close it even if mkString throws,
  // otherwise repeated calls leak descriptors
  val source = Source.fromFile(path)
  try source.mkString finally source.close()
}

/**
* Load a test resource file
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ object AvroInOut {
case ex: Exception => throw new IllegalArgumentException(s"Bad path $firstPath: ${ex.getMessage}")
}
val found = paths.filter(p => fs.exists(new Path(p)))
if (found.isEmpty) throw new IllegalArgumentException("No valid directory found in the list of paths <<$path>>")
if (found.isEmpty) throw new IllegalArgumentException(s"No valid directory found in the list of paths <<$path>>")
found.mkString(",")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,69 @@

package com.salesforce.op.utils.io.avro

import java.io.{File, FileWriter}
import java.io.{File, FileNotFoundException, FileWriter}

import com.salesforce.op.test.TestSparkContext
import com.salesforce.op.utils.io.avro.AvroInOut._
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.junit.JUnitRunner

@RunWith(classOf[JUnitRunner])
class AvroInOutTest extends FlatSpec with TestSparkContext {
// Schema and data fixtures shipped under the shared test-data directory
val avroSchemaPath = s"$testDataDir/PassengerDataAll.avsc"
val avroFilePath = s"$testDataDir/PassengerDataAll.avro"
// Expected record count of PassengerDataAll.avro, asserted by the read tests below
val avroFileRecordCount = 891
// Hadoop filesystem bound to the test Spark context's configuration
val hdfs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
// Lazy so the temp path is resolved only when a test actually writes to it
lazy val avroTemp: String = tempDir + "/avro-inout-test"

"AvroInOut" should "readPathSeq" in {
// TODO
// Reading a single avro file yields an RDD holding every record in the file.
// Test name fixed for grammar/consistency with the sibling tests: "should create ..."
Spec(AvroInOut.getClass) should "create RDD from an avro file" in {
  val res = readPathSeq(avroFilePath, withCount = true, deepCopy = true, persist = false)
  res shouldBe a[RDD[_]]
  res.count shouldBe avroFileRecordCount
}

"AvroWriter" should "read" in {
// TODO
// Comma-separated path lists are supported; each file contributes its records.
// Test name fixed for grammar/consistency ("should create ..."); spacing per scalastyle.
it should "create RDD from a sequence of avro files" in {
  val res = readPathSeq(s"$avroFilePath,$avroFilePath")
  res.count shouldBe avroFileRecordCount * 2
}

// Invalid entries in the path list are silently skipped;
// only the two valid avro files contribute records
it should "create RDD from a mixed sequence of valid and invalid avro files" in {
  val mixedPaths = s"badfile/path1,$avroFilePath,badfile/path2,$avroFilePath,badfile/path3"
  readPathSeq(mixedPaths).count shouldBe avroFileRecordCount * 2
}

// When no path in the list is readable, readPathSeq fails fast
// with a descriptive IllegalArgumentException
it should "throw an error if passed in avro files are invalid" in {
  val badPaths = "badfile/path1,badfile/path2"
  val error = intercept[IllegalArgumentException](readPathSeq(badPaths))
  error.getMessage shouldBe "No valid directory found in the list of paths <<badfile/path1,badfile/path2>>"
}

// read() wraps the result in Option: Some(RDD) for a readable avro file.
// Test name fixed for grammar/consistency with "create None ..." below.
it should "create Some(RDD) from an avro file" in {
  val res = read(avroFilePath)
  res.size shouldBe 1
  res.get shouldBe an[RDD[_]]
  res.get.count shouldBe avroFileRecordCount
}

// read() returns None rather than throwing when the path is unreadable
it should "create None from an invalid avro file" in {
  read("badfile/path") shouldBe None
}

// Round-trip test: write avro records with AvroWriter, then read them back
// and confirm no records were lost
Spec[AvroWriter[_]] should "writeAvro to filesystem" in {
val avroData = readPathSeq(avroFilePath).asInstanceOf[RDD[GenericRecord]]
val avroSchema = loadFile(avroSchemaPath)

// Precondition: the destination must not exist yet, so the write below
// creates it (listStatus on a missing path throws FileNotFoundException)
val error = intercept[FileNotFoundException](hdfs.listStatus(new Path(avroTemp)))
error.getMessage shouldBe s"File $avroTemp does not exist"

AvroWriter(avroData).writeAvro(avroTemp, avroSchema)
// Collect only the "part" output files (one per partition), ignoring
// marker files such as _SUCCESS, and read them all back as one RDD
val hdfsFiles = hdfs.listStatus(new Path(avroTemp)) filter (x => x.getPath.getName.contains("part"))
val res = readPathSeq((for { x <- hdfsFiles } yield avroTemp + "/" + x.getPath.getName).mkString(","))
res.count shouldBe avroFileRecordCount
}

it should "checkPathsExist" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,17 @@

package com.salesforce.op.utils.io.csv

import java.io.File

import com.salesforce.op.test.TestSparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, DataFrame}
import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.junit.JUnitRunner

import scala.language.postfixOps

@RunWith(classOf[JUnitRunner])
class CSVInOutTest extends FlatSpec with TestSparkContext {
private val testDataDirPath: String = {
Some(new File("test-data")) filter (_.isDirectory) getOrElse new File("../test-data") getPath
}
private val csvReader = new CSVInOut(CSVOptions(header = true))
private val csvFile = s"$testDataDirPath/PassengerDataAllWithHeader.csv"
private val csvFile = s"$testDataDir/PassengerDataAllWithHeader.csv"

Spec[CSVInOut] should "throw error for bad file paths with DataFrame" in {
val error = intercept[AnalysisException](csvReader.readDataFrame("/bad/file/path/read/dataframe"))
Expand Down

0 comments on commit dc46866

Please sign in to comment.