Add an ability to specify multiple paths to load data from #399

Merged · 3 commits · Jul 8, 2021

12 changes: 12 additions & 0 deletions README.md
@@ -272,6 +272,17 @@ $ spark-shell --jars spark-cobol-assembly-2.2.2-SNAPSHOT.jar

## Other Features

### Loading several paths
Passing multiple paths directly to `load()` is currently not supported. Instead, pass them as a comma-separated list via the `paths` option:
```scala
spark
.read
.format("cobol")
.option("copybook_contents", copybook)
.option("paths", inputPaths.mkString(","))
.load()
```
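Here `inputPaths` is a `Seq[String]` of input directories; `mkString(",")` joins them into the comma-separated list that the `paths` option expects. All paths must reside on the same filesystem.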

### Spark SQL schema extraction
This library also provides convenient methods to extract Spark SQL schemas and Cobol layouts from copybooks.
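For instance, once a DataFrame is loaded, the Spark SQL schema derived from the copybook can be inspected directly. A minimal sketch, where `copybook` holds the copybook contents and the input path is a placeholder:
```scala
// Minimal sketch: `copybook` holds copybook contents, "/path/to/data" is a placeholder path.
val df = spark
  .read
  .format("cobol")
  .option("copybook_contents", copybook)
  .load("/path/to/data")

df.printSchema() // prints the Spark SQL schema derived from the copybook
```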

@@ -1075,6 +1086,7 @@ Again, the full example is available at

| Option (usage example) | Description |
| ------------------------------------------ |:----------------------------------------------------------------------------- |
| .option("paths", "/path1,/path2") | Allows loading data from multiple unrelated paths on the same filesystem. |
| .option("record_length", "100") | Overrides the length of the record (in bytes). Normally, the size is derived from the copybook. But explicitly specifying the record size can be helpful for debugging fixed record length files. |
| .option("file_start_offset", "0") | Specifies the number of bytes to skip at the beginning of each file. |
| .option("file_end_offset", "0") | Specifies the number of bytes to skip at the end of each file. |
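
For example, a fixed record length file with a per-file header and trailer could be read as follows. This is a sketch; the paths, offsets, and record length are illustrative values only:
```scala
// Sketch with illustrative values: forces a 100-byte record length and
// skips a 100-byte header and a 50-byte trailer in each input file.
val df = spark
  .read
  .format("cobol")
  .option("copybook_contents", copybook)
  .option("record_length", "100")
  .option("file_start_offset", "100")
  .option("file_end_offset", "50")
  .option("paths", "/path1,/path2")
  .load()
```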
@@ -54,7 +54,7 @@ case class CobolParameters(
copybookPath: Option[String],
multiCopybookPath: Seq[String],
copybookContent: Option[String],
sourcePath: Option[String],
sourcePaths: Seq[String],
isText: Boolean,
isEbcdic: Boolean,
ebcdicCodePage: String,
@@ -42,6 +42,7 @@ object CobolParametersParser {
val PARAM_MULTI_COPYBOOK_PATH = "copybooks"
val PARAM_COPYBOOK_CONTENTS = "copybook_contents"
val PARAM_SOURCE_PATH = "path"
val PARAM_SOURCE_PATHS = "paths"
val PARAM_ENCODING = "encoding"
val PARAM_PEDANTIC = "pedantic"
val PARAM_RECORD_LENGTH_FIELD = "record_length_field"
@@ -208,11 +209,13 @@ object CobolParametersParser {
}
}

val paths = getParameter(PARAM_SOURCE_PATHS, params).map(_.split(',')).getOrElse(Array(getParameter(PARAM_SOURCE_PATH, params).get))

val cobolParameters = CobolParameters(
getParameter(PARAM_COPYBOOK_PATH, params),
params.getOrElse(PARAM_MULTI_COPYBOOK_PATH, "").split(','),
getParameter(PARAM_COPYBOOK_CONTENTS, params),
getParameter(PARAM_SOURCE_PATH, params),
paths,
params.getOrElse(PARAM_IS_TEXT, "false").toBoolean,
isEbcdic,
ebcdicCodePageName,
@@ -37,19 +37,19 @@ import scala.util.control.NonFatal


class SerializableConfiguration(@transient var value: Configuration) extends Serializable {
private def writeObject(out: ObjectOutputStream): Unit =
private def writeObject(out: ObjectOutputStream): Unit =
try {
out.defaultWriteObject()
value.write(out)
out.defaultWriteObject()
value.write(out)
} catch {
case NonFatal(e) =>
throw new IOException(e)
}

private def readObject(in: ObjectInputStream): Unit =
private def readObject(in: ObjectInputStream): Unit =
try {
value = new Configuration(false)
value.readFields(in)
value = new Configuration(false)
value.readFields(in)
} catch {
case NonFatal(e) =>
throw new IOException(e)
@@ -63,18 +63,18 @@ class SerializableConfiguration(@transient var value: Configuration) extends Ser
*
* Its constructor is expected to change after the hierarchy of [[za.co.absa.cobrix.spark.cobol.reader.Reader]] is put in place.
*/
class CobolRelation(sourceDir: String,
class CobolRelation(sourceDirs: Seq[String],
cobolReader: Reader,
localityParams: LocalityParameters,
debugIgnoreFileSize: Boolean
)(@transient val sqlContext: SQLContext)
extends BaseRelation
with Serializable
with TableScan {
with Serializable
with TableScan {

private val logger = LoggerFactory.getLogger(this.getClass)

private val filesList = getListFilesWithOrder(sourceDir)
private val filesList = getListFilesWithOrder(sourceDirs)

private lazy val indexes: RDD[SparseIndexEntry] = IndexBuilder.buildIndex(filesList, cobolReader, sqlContext)(localityParams)

@@ -83,12 +83,11 @@ class CobolRelation(sourceDir: String,
}

override def buildScan(): RDD[Row] = {

cobolReader match {
case blockReader: FixedLenTextReader =>
CobolScanners.buildScanForTextFiles(blockReader, sourceDir, parseRecords, sqlContext)
CobolScanners.buildScanForTextFiles(blockReader, sourceDirs, parseRecords, sqlContext)
case blockReader: FixedLenReader =>
CobolScanners.buildScanForFixedLength(blockReader, sourceDir, parseRecords, debugIgnoreFileSize, sqlContext)
CobolScanners.buildScanForFixedLength(blockReader, sourceDirs, parseRecords, debugIgnoreFileSize, sqlContext)
case streamReader: VarLenReader if streamReader.isIndexGenerationNeeded =>
CobolScanners.buildScanForVarLenIndex(streamReader, indexes, filesList, sqlContext)
case streamReader: VarLenReader =>
@@ -104,13 +103,15 @@
*
* The List contains [[za.co.absa.cobrix.spark.cobol.source.types.FileWithOrder]] instances.
*/
private def getListFilesWithOrder(sourceDir: String): Array[FileWithOrder] = {
private def getListFilesWithOrder(sourceDirs: Seq[String]): Array[FileWithOrder] = {
val allFiles = sourceDirs.flatMap(sourceDir => {
FileUtils
.getFiles(sourceDir, sqlContext.sparkContext.hadoopConfiguration, isRecursiveRetrieval)
}).toArray

FileUtils
.getFiles(sourceDir, sqlContext.sparkContext.hadoopConfiguration, isRecursiveRetrieval)
allFiles
.zipWithIndex
.map(file => FileWithOrder(file._1, file._2))
.toArray
}

/**
@@ -55,7 +55,7 @@ class DefaultSource
val cobolParameters = CobolParametersParser.parse(new Parameters(parameters))
CobolParametersValidator.checkSanity(cobolParameters)

new CobolRelation(parameters(PARAM_SOURCE_PATH),
new CobolRelation(cobolParameters.sourcePaths,
buildEitherReader(sqlContext.sparkSession, cobolParameters),
LocalityParameters.extract(cobolParameters),
cobolParameters.debugIgnoreFileSize)(sqlContext)
@@ -32,8 +32,7 @@ import za.co.absa.cobrix.spark.cobol.utils.FileNameUtils
object CobolParametersValidator {

def checkSanity(params: CobolParameters) = {

if (params.sourcePath.isEmpty) {
if (params.sourcePaths.isEmpty) {
throw new IllegalArgumentException("Data source path must be specified.")
}

@@ -53,8 +52,14 @@
val copyBookPathFileName = parameters.get(PARAM_COPYBOOK_PATH)
val copyBookMultiPathFileNames = parameters.get(PARAM_MULTI_COPYBOOK_PATH)

parameters.getOrElse(PARAM_SOURCE_PATH, throw new IllegalStateException(s"Cannot define path to source files: missing " +
s"parameter: '$PARAM_SOURCE_PATH'"))
if (!parameters.isDefinedAt(PARAM_SOURCE_PATH) && !parameters.isDefinedAt(PARAM_SOURCE_PATHS)) {
throw new IllegalStateException(s"Cannot define path to data files: missing " +
s"parameter: '$PARAM_SOURCE_PATH' or '$PARAM_SOURCE_PATHS'. It is automatically set when you invoke .load()")
}

if (parameters.isDefinedAt(PARAM_SOURCE_PATH) && parameters.isDefinedAt(PARAM_SOURCE_PATHS)) {
throw new IllegalStateException(s"Only one of '$PARAM_SOURCE_PATH' or '$PARAM_SOURCE_PATHS' should be defined.")
}

def validatePath(fileName: String): Unit = {
val (isLocalFS, copyBookFileName) = FileNameUtils.getCopyBookFileName(fileName)
@@ -74,7 +74,7 @@ private[source] object CobolScanners {
})
}

private[source] def buildScanForFixedLength(reader: FixedLenReader, sourceDir: String,
private[source] def buildScanForFixedLength(reader: FixedLenReader, sourceDirs: Seq[String],
recordParser: (FixedLenReader, RDD[Array[Byte]]) => RDD[Row],
debugIgnoreFileSize: Boolean,
sqlContext: SQLContext): RDD[Row] = {
@@ -85,21 +85,24 @@

val recordSize = reader.getRecordSize

if (!debugIgnoreFileSize && areThereNonDivisibleFiles(sourceDir, sqlContext.sparkContext.hadoopConfiguration, recordSize)) {
throw new IllegalArgumentException(s"There are some files in $sourceDir that are NOT DIVISIBLE by the RECORD SIZE calculated from the copybook ($recordSize bytes per record). Check the logs for the names of the files.")
}
sourceDirs.foreach(sourceDir => {
if (!debugIgnoreFileSize && areThereNonDivisibleFiles(sourceDir, sqlContext.sparkContext.hadoopConfiguration, recordSize)) {
throw new IllegalArgumentException(s"There are some files in $sourceDir that are NOT DIVISIBLE by the RECORD SIZE calculated from the copybook ($recordSize bytes per record). Check the logs for the names of the files.")
}
})

val records = sqlContext.sparkContext.binaryRecords(sourceDir, recordSize, sqlContext.sparkContext.hadoopConfiguration)
val records = sourceDirs.map(sourceDir => sqlContext.sparkContext.binaryRecords(sourceDir, recordSize, sqlContext.sparkContext.hadoopConfiguration))
.reduce((a ,b) => a.union(b))
recordParser(reader, records)
}

private[source] def buildScanForTextFiles(reader: FixedLenReader, sourceDir: String,
private[source] def buildScanForTextFiles(reader: FixedLenReader, sourceDirs: Seq[String],
recordParser: (FixedLenReader, RDD[Array[Byte]]) => RDD[Row],
sqlContext: SQLContext): RDD[Row] = {
sqlContext.read.text()

val rddText = sqlContext.sparkContext
.textFile(sourceDir)
val rddText = sourceDirs.map(sourceDir => sqlContext.sparkContext.textFile(sourceDir))
.reduce((a,b) => a.union(b))

val records = rddText
.filter(str => str.length > 0)
@@ -61,7 +61,7 @@ class CobolRelationSpec extends SparkCobolTestBase with Serializable {

it should "return an RDD[Row] if data are correct" in {
val testReader: FixedLenReader = new DummyFixedLenReader(sparkSchema, cobolSchema, testData)(() => Unit)
val relation = new CobolRelation(copybookFile.getParentFile.getAbsolutePath,
val relation = new CobolRelation(Seq(copybookFile.getParentFile.getAbsolutePath),
testReader,
localityParams = localityParams,
debugIgnoreFileSize = false)(sqlContext)
@@ -85,7 +85,7 @@
it should "manage exceptions from Reader" in {
val exceptionMessage = "exception expected message"
val testReader: FixedLenReader = new DummyFixedLenReader(sparkSchema, cobolSchema, testData)(() => throw new Exception(exceptionMessage))
val relation = new CobolRelation(copybookFile.getParentFile.getAbsolutePath,
val relation = new CobolRelation(Seq(copybookFile.getParentFile.getAbsolutePath),
testReader,
localityParams = localityParams,
debugIgnoreFileSize = false)(sqlContext)
@@ -100,7 +100,7 @@
val absentField = "absentField"
val modifiedSparkSchema = sparkSchema.add(StructField(absentField, StringType, false))
val testReader: FixedLenReader = new DummyFixedLenReader(modifiedSparkSchema, cobolSchema, testData)(() => Unit)
val relation = new CobolRelation(copybookFile.getParentFile.getAbsolutePath,
val relation = new CobolRelation(Seq(copybookFile.getParentFile.getAbsolutePath),
testReader,
localityParams = localityParams,
debugIgnoreFileSize = false)(sqlContext)
@@ -0,0 +1,71 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.cobrix.spark.cobol.source.integration

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.scalatest.WordSpec
import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase
import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture

//noinspection NameBooleanParameters
class Test28MultipartLoadSpec extends WordSpec with SparkTestBase with BinaryFileFixture {

private val exampleName = "Test26 (custom record extractor)"

private val copybook =
""" 01 R.
03 A PIC X(2).
"""
private val data1 = "010203040506070809"
private val data2 = "101112131415161718"

"Multipart path spec" should {
"load avv available copybooks" in {
val expected = """[{"A":"01"},{"A":"02"},{"A":"03"},{"A":"04"},{"A":"05"},{"A":"06"},{"A":"07"},{"A":"08"},{"A":"09"},{"A":"10"},{"A":"11"},{"A":"12"},{"A":"13"},{"A":"14"},{"A":"15"},{"A":"16"},{"A":"17"},{"A":"18"}]"""

withTempBinFile("rec_len1", ".dat", data1.getBytes) { tmpFileName1 =>
withTempBinFile("rec_len2", ".dat", data2.getBytes) { tmpFileName2 =>
val df = getDataFrame(Seq(tmpFileName1, tmpFileName2))

val actual = df
.orderBy(col("A"))
.toJSON
.collect()
.mkString("[", ",", "]")

assert(df.count() == 18)
assert(actual == expected)
}
}
}
}

private def getDataFrame(inputPaths: Seq[String], extraOptions: Map[String, String] = Map.empty[String, String]): DataFrame = {
spark
.read
.format("cobol")
.option("copybook_contents", copybook)
.option("encoding", "ascii")
.option("schema_retention_policy", "collapse_root")
.option("paths", inputPaths.mkString(","))
.options(extraOptions)
.load()
}


}