From b95d6f5676eab16d884060843e889dd8279dd752 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Mon, 28 Jun 2021 21:32:12 +0200 Subject: [PATCH 1/3] #394 Add a test showing the failing behavior --- .../integration/Test28MultipartLoadSpec.scala | 71 +++++++++++++++++++ 1 file changed, 71 insertions(+) create mode 100644 spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test28MultipartLoadSpec.scala diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test28MultipartLoadSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test28MultipartLoadSpec.scala new file mode 100644 index 000000000..764654fae --- /dev/null +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test28MultipartLoadSpec.scala @@ -0,0 +1,71 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.spark.cobol.source.integration + +import org.apache.spark.sql.DataFrame +import org.scalatest.WordSpec +import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase +import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture + +//noinspection NameBooleanParameters +class Test28MultipartLoadSpec extends WordSpec with SparkTestBase with BinaryFileFixture { + + private val exampleName = "Test26 (custom record extractor)" + + private val copybook = + """ 01 R. + 03 A PIC X(2). + 03 B PIC X(1). + """ + private val data1 = "AABBBCCDDDEEFFFZYY" + private val data2 = "BAABBBCCDDDEEFFFZY" + + "Multipart path spec" should { + "load avv available copybooks" in { + val expected = """""" + + withTempBinFile("rec_len1", ".dat", data1.getBytes) { tmpFileName1 => + withTempBinFile("rec_len2", ".dat", data2.getBytes) { tmpFileName2 => + + + intercept[IllegalStateException] { + val df = getDataFrame(Seq(tmpFileName1, tmpFileName1)) + + val actual = df.toJSON.collect().mkString("[", ",", "]") + } + + //assert(df.count() == 12) + //assert(actual == expected) + } + } + } + } + + private def getDataFrame(inputPaths: Seq[String], extraOptions: Map[String, String] = Map.empty[String, String]): DataFrame = { + spark + .read + .format("cobol") + .option("copybook_contents", copybook) + .option("encoding", "ascii") + .option("record_length", "2") + .option("schema_retention_policy", "collapse_root") + .options(extraOptions) + .load(inputPaths: _*) + } + + +} From 1dfb52306036efa3c40eae524bb922b13348fcdd Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Tue, 29 Jun 2021 20:54:01 +0200 Subject: [PATCH 2/3] #394 Add a workaround implemetation --- .../reader/parameters/CobolParameters.scala | 2 +- .../parameters/CobolParametersParser.scala | 5 ++- .../spark/cobol/source/CobolRelation.scala | 35 ++++++++++--------- .../spark/cobol/source/DefaultSource.scala | 2 +- .../parameters/CobolParametersValidator.scala | 13 ++++--- .../cobol/source/scanners/CobolScanners.scala | 19 +++++----- .../cobol/source/CobolRelationSpec.scala | 6 ++-- .../integration/Test28MultipartLoadSpec.scala | 28 +++++++-------- 8 files changed, 61 insertions(+), 49 deletions(-) diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala index f0456bd0e..edf15bf80 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala @@ -54,7 +54,7 @@ case class CobolParameters( copybookPath: Option[String], multiCopybookPath: Seq[String], copybookContent: Option[String], - sourcePath: Option[String], + sourcePaths: Seq[String], isText: Boolean, isEbcdic: Boolean, ebcdicCodePage: String, diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala index e5362347f..de24ddaf9 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala @@ -42,6 +42,7 @@ object CobolParametersParser { val PARAM_MULTI_COPYBOOK_PATH = "copybooks" val PARAM_COPYBOOK_CONTENTS = "copybook_contents" val PARAM_SOURCE_PATH = "path" + val PARAM_SOURCE_PATHS = "paths" val PARAM_ENCODING = "encoding" val PARAM_PEDANTIC = "pedantic" val PARAM_RECORD_LENGTH_FIELD = "record_length_field" @@ -208,11 +209,13 @@ object CobolParametersParser { } } + val paths = getParameter(PARAM_SOURCE_PATHS, params).map(_.split(',')).getOrElse(Array(getParameter(PARAM_SOURCE_PATH, params).get)) + val cobolParameters = CobolParameters( getParameter(PARAM_COPYBOOK_PATH, params), params.getOrElse(PARAM_MULTI_COPYBOOK_PATH, "").split(','), getParameter(PARAM_COPYBOOK_CONTENTS, params), - getParameter(PARAM_SOURCE_PATH, params), + paths, params.getOrElse(PARAM_IS_TEXT, "false").toBoolean, isEbcdic, ebcdicCodePageName, diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelation.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelation.scala index 61692a531..806635187 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelation.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelation.scala @@ -37,19 +37,19 @@ import scala.util.control.NonFatal class SerializableConfiguration(@transient var value: Configuration) extends Serializable { - private def writeObject(out: ObjectOutputStream): Unit = + private def writeObject(out: ObjectOutputStream): Unit = try { - out.defaultWriteObject() - value.write(out) + out.defaultWriteObject() + value.write(out) } catch { case NonFatal(e) => throw new IOException(e) } - private def readObject(in: ObjectInputStream): Unit = + private def readObject(in: ObjectInputStream): Unit = try { - value = new Configuration(false) - value.readFields(in) + value = new Configuration(false) + value.readFields(in) } catch { case NonFatal(e) => throw new IOException(e) @@ -63,18 +63,18 @@ class SerializableConfiguration(@transient var value: Configuration) extends Ser * * Its constructor is expected to change after the hierarchy of [[za.co.absa.cobrix.spark.cobol.reader.Reader]] is put in place. */ -class CobolRelation(sourceDir: String, +class CobolRelation(sourceDirs: Seq[String], cobolReader: Reader, localityParams: LocalityParameters, debugIgnoreFileSize: Boolean )(@transient val sqlContext: SQLContext) extends BaseRelation - with Serializable - with TableScan { + with Serializable + with TableScan { private val logger = LoggerFactory.getLogger(this.getClass) - private val filesList = getListFilesWithOrder(sourceDir) + private val filesList = getListFilesWithOrder(sourceDirs) private lazy val indexes: RDD[SparseIndexEntry] = IndexBuilder.buildIndex(filesList, cobolReader, sqlContext)(localityParams) @@ -83,12 +83,11 @@ class CobolRelation(sourceDir: String, } override def buildScan(): RDD[Row] = { - cobolReader match { case blockReader: FixedLenTextReader => - CobolScanners.buildScanForTextFiles(blockReader, sourceDir, parseRecords, sqlContext) + CobolScanners.buildScanForTextFiles(blockReader, sourceDirs, parseRecords, sqlContext) case blockReader: FixedLenReader => - CobolScanners.buildScanForFixedLength(blockReader, sourceDir, parseRecords, debugIgnoreFileSize, sqlContext) + CobolScanners.buildScanForFixedLength(blockReader, sourceDirs, parseRecords, debugIgnoreFileSize, sqlContext) case streamReader: VarLenReader if streamReader.isIndexGenerationNeeded => CobolScanners.buildScanForVarLenIndex(streamReader, indexes, filesList, sqlContext) case streamReader: VarLenReader => @@ -104,13 +103,15 @@ class CobolRelation(sourceDir: String, * * The List contains [[za.co.absa.cobrix.spark.cobol.source.types.FileWithOrder]] instances. */ - private def getListFilesWithOrder(sourceDir: String): Array[FileWithOrder] = { + private def getListFilesWithOrder(sourceDirs: Seq[String]): Array[FileWithOrder] = { + val allFiles = sourceDirs.flatMap(sourceDir => { + FileUtils + .getFiles(sourceDir, sqlContext.sparkContext.hadoopConfiguration, isRecursiveRetrieval) + }).toArray - FileUtils - .getFiles(sourceDir, sqlContext.sparkContext.hadoopConfiguration, isRecursiveRetrieval) + allFiles .zipWithIndex .map(file => FileWithOrder(file._1, file._2)) - .toArray } /** diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala index 8b967c4ac..e30896673 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala @@ -55,7 +55,7 @@ class DefaultSource val cobolParameters = CobolParametersParser.parse(new Parameters(parameters)) CobolParametersValidator.checkSanity(cobolParameters) - new CobolRelation(parameters(PARAM_SOURCE_PATH), + new CobolRelation(cobolParameters.sourcePaths, buildEitherReader(sqlContext.sparkSession, cobolParameters), LocalityParameters.extract(cobolParameters), cobolParameters.debugIgnoreFileSize)(sqlContext) diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidator.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidator.scala index 399c4e84a..7da483168 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidator.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidator.scala @@ -32,8 +32,7 @@ import za.co.absa.cobrix.spark.cobol.utils.FileNameUtils object CobolParametersValidator { def checkSanity(params: CobolParameters) = { - - if (params.sourcePath.isEmpty) { + if (params.sourcePaths.isEmpty) { throw new IllegalArgumentException("Data source path must be specified.") } @@ -53,8 +52,14 @@ object CobolParametersValidator { val copyBookPathFileName = parameters.get(PARAM_COPYBOOK_PATH) val copyBookMultiPathFileNames = parameters.get(PARAM_MULTI_COPYBOOK_PATH) - parameters.getOrElse(PARAM_SOURCE_PATH, throw new IllegalStateException(s"Cannot define path to source files: missing " + - s"parameter: '$PARAM_SOURCE_PATH'")) + if (!parameters.isDefinedAt(PARAM_SOURCE_PATH) && !parameters.isDefinedAt(PARAM_SOURCE_PATHS)) { + throw new IllegalStateException(s"Cannot define path to data files: missing " + + s"parameter: '$PARAM_SOURCE_PATH' or '$PARAM_SOURCE_PATHS'. It is automatically set when you invoke .load()") + } + + if (parameters.isDefinedAt(PARAM_SOURCE_PATH) && parameters.isDefinedAt(PARAM_SOURCE_PATHS)) { + throw new IllegalStateException(s"Only one of '$PARAM_SOURCE_PATH' or '$PARAM_SOURCE_PATHS' should be defined.") + } def validatePath(fileName: String): Unit = { val (isLocalFS, copyBookFileName) = FileNameUtils.getCopyBookFileName(fileName) diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/scanners/CobolScanners.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/scanners/CobolScanners.scala index 776c2d5e3..402749c2c 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/scanners/CobolScanners.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/scanners/CobolScanners.scala @@ -74,7 +74,7 @@ private[source] object CobolScanners { }) } - private[source] def buildScanForFixedLength(reader: FixedLenReader, sourceDir: String, + private[source] def buildScanForFixedLength(reader: FixedLenReader, sourceDirs: Seq[String], recordParser: (FixedLenReader, RDD[Array[Byte]]) => RDD[Row], debugIgnoreFileSize: Boolean, sqlContext: SQLContext): RDD[Row] = { @@ -85,21 +85,24 @@ private[source] object CobolScanners { val recordSize = reader.getRecordSize - if (!debugIgnoreFileSize && areThereNonDivisibleFiles(sourceDir, sqlContext.sparkContext.hadoopConfiguration, recordSize)) { - throw new IllegalArgumentException(s"There are some files in $sourceDir that are NOT DIVISIBLE by the RECORD SIZE calculated from the copybook ($recordSize bytes per record). Check the logs for the names of the files.") - } + sourceDirs.foreach(sourceDir => { + if (!debugIgnoreFileSize && areThereNonDivisibleFiles(sourceDir, sqlContext.sparkContext.hadoopConfiguration, recordSize)) { + throw new IllegalArgumentException(s"There are some files in $sourceDir that are NOT DIVISIBLE by the RECORD SIZE calculated from the copybook ($recordSize bytes per record). Check the logs for the names of the files.") + } + }) - val records = sqlContext.sparkContext.binaryRecords(sourceDir, recordSize, sqlContext.sparkContext.hadoopConfiguration) + val records = sourceDirs.map(sourceDir => sqlContext.sparkContext.binaryRecords(sourceDir, recordSize, sqlContext.sparkContext.hadoopConfiguration)) + .reduce((a ,b) => a.union(b)) recordParser(reader, records) } - private[source] def buildScanForTextFiles(reader: FixedLenReader, sourceDir: String, + private[source] def buildScanForTextFiles(reader: FixedLenReader, sourceDirs: Seq[String], recordParser: (FixedLenReader, RDD[Array[Byte]]) => RDD[Row], sqlContext: SQLContext): RDD[Row] = { sqlContext.read.text() - val rddText = sqlContext.sparkContext - .textFile(sourceDir) + val rddText = sourceDirs.map(sourceDir => sqlContext.sparkContext.textFile(sourceDir)) + .reduce((a,b) => a.union(b)) val records = rddText .filter(str => str.length > 0) diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelationSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelationSpec.scala index 138f33196..d0f032bef 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelationSpec.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/CobolRelationSpec.scala @@ -61,7 +61,7 @@ class CobolRelationSpec extends SparkCobolTestBase with Serializable { it should "return an RDD[Row] if data are correct" in { val testReader: FixedLenReader = new DummyFixedLenReader(sparkSchema, cobolSchema, testData)(() => Unit) - val relation = new CobolRelation(copybookFile.getParentFile.getAbsolutePath, + val relation = new CobolRelation(Seq(copybookFile.getParentFile.getAbsolutePath), testReader, localityParams = localityParams, debugIgnoreFileSize = false)(sqlContext) @@ -85,7 +85,7 @@ class CobolRelationSpec extends SparkCobolTestBase with Serializable { it should "manage exceptions from Reader" in { val exceptionMessage = "exception expected message" val testReader: FixedLenReader = new DummyFixedLenReader(sparkSchema, cobolSchema, testData)(() => throw new Exception(exceptionMessage)) - val relation = new CobolRelation(copybookFile.getParentFile.getAbsolutePath, + val relation = new CobolRelation(Seq(copybookFile.getParentFile.getAbsolutePath), testReader, localityParams = localityParams, debugIgnoreFileSize = false)(sqlContext) @@ -100,7 +100,7 @@ class CobolRelationSpec extends SparkCobolTestBase with Serializable { val absentField = "absentField" val modifiedSparkSchema = sparkSchema.add(StructField(absentField, StringType, false)) val testReader: FixedLenReader = new DummyFixedLenReader(modifiedSparkSchema, cobolSchema, testData)(() => Unit) - val relation = new CobolRelation(copybookFile.getParentFile.getAbsolutePath, + val relation = new CobolRelation(Seq(copybookFile.getParentFile.getAbsolutePath), testReader, localityParams = localityParams, debugIgnoreFileSize = false)(sqlContext) diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test28MultipartLoadSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test28MultipartLoadSpec.scala index 764654fae..112e6b031 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test28MultipartLoadSpec.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test28MultipartLoadSpec.scala @@ -17,6 +17,7 @@ package za.co.absa.cobrix.spark.cobol.source.integration import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.functions.col import org.scalatest.WordSpec import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture @@ -29,27 +30,26 @@ class Test28MultipartLoadSpec extends WordSpec with SparkTestBase with BinaryFil private val copybook = """ 01 R. 03 A PIC X(2). - 03 B PIC X(1). """ - private val data1 = "AABBBCCDDDEEFFFZYY" - private val data2 = "BAABBBCCDDDEEFFFZY" + private val data1 = "010203040506070809" + private val data2 = "101112131415161718" "Multipart path spec" should { "load avv available copybooks" in { - val expected = """""" + val expected = """[{"A":"01"},{"A":"02"},{"A":"03"},{"A":"04"},{"A":"05"},{"A":"06"},{"A":"07"},{"A":"08"},{"A":"09"},{"A":"10"},{"A":"11"},{"A":"12"},{"A":"13"},{"A":"14"},{"A":"15"},{"A":"16"},{"A":"17"},{"A":"18"}]""" withTempBinFile("rec_len1", ".dat", data1.getBytes) { tmpFileName1 => withTempBinFile("rec_len2", ".dat", data2.getBytes) { tmpFileName2 => + val df = getDataFrame(Seq(tmpFileName1, tmpFileName2)) + val actual = df + .orderBy(col("A")) + .toJSON + .collect() + .mkString("[", ",", "]") - intercept[IllegalStateException] { - val df = getDataFrame(Seq(tmpFileName1, tmpFileName1)) - - val actual = df.toJSON.collect().mkString("[", ",", "]") - } - - //assert(df.count() == 12) - //assert(actual == expected) + assert(df.count() == 18) + assert(actual == expected) } } } @@ -61,10 +61,10 @@ class Test28MultipartLoadSpec extends WordSpec with SparkTestBase with BinaryFil .format("cobol") .option("copybook_contents", copybook) .option("encoding", "ascii") - .option("record_length", "2") .option("schema_retention_policy", "collapse_root") + .option("paths", inputPaths.mkString(",")) .options(extraOptions) - .load(inputPaths: _*) + .load() } From 00ac5b590e0167fcb931ebabc70b112656c26899 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Wed, 30 Jun 2021 13:43:16 +0200 Subject: [PATCH 3/3] #394 Add documentation for the added feature --- README.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/README.md b/README.md index b25882040..09bb64ef8 100644 --- a/README.md +++ b/README.md @@ -272,6 +272,17 @@ $ spark-shell --jars spark-cobol-assembly-2.2.2-SNAPSHOT.jar ## Other Features +### Loading several paths +Currently, specifying multiple paths in `load()` is not supported. Use the following syntax: +```scala + spark + .read + .format("cobol") + .option("copybook_contents", copybook) + .option("paths", inputPaths.mkString(",")) + .load() +``` + ### Spark SQL schema extraction This library also provides convenient methods to extract Spark SQL schemas and Cobol layouts from copybooks. @@ -1075,6 +1086,7 @@ Again, the full example is available at | Option (usage example) | Description | | ------------------------------------------ |:----------------------------------------------------------------------------- | +| .option("paths", "/path1,/path2") | Allows loading data from multiple unrelated paths on the same filesystem. | | .option("record_length", "100") | Overrides the length of the record (in bypes). Normally, the size is derived from the copybook. But explicitly specifying record size can be helpful for debugging fixed-record length files. | | .option("file_start_offset", "0") | Specifies the number of bytes to skip at the beginning of each file. | | .option("file_end_offset", "0") | Specifies the number of bytes to skip at the end of each file. |