From 3f98312c417dd8afffa963dc36c482750fe10aef Mon Sep 17 00:00:00 2001
From: Ruslan Iushchenko
Date: Tue, 18 Apr 2023 08:09:46 +0200
Subject: [PATCH] #607 Add 'minimum_record_length' and 'maximum_record_length' for ASCII files.

---
 .../reader/iterator/VRLRecordReader.scala     |  6 +-
 .../reader/parameters/CobolParameters.scala   |  4 +
 .../reader/parameters/ReaderParameters.scala  |  8 +-
 .../builder/SparkCobolOptionsBuilder.scala    |  5 +-
 .../parameters/CobolParametersParser.scala    | 37 +++++++-
 .../cobol/reader/FixedLenNestedReader.scala   |  2 +
 .../cobol/reader/FixedLenTextReader.scala     |  2 +
 .../cobrix/spark/cobol/reader/Reader.scala    |  4 +
 .../spark/cobol/reader/VarLenReader.scala     |  4 -
 .../cobol/source/scanners/CobolScanners.scala | 10 +-
 .../base/impl/DummyFixedLenReader.scala       |  3 +
 .../integration/Test27RecordLengthSpec.scala  | 10 ++
 .../source/text/Test01AsciiTextFiles.scala    | 94 ++++++++++++++++++-
 13 files changed, 172 insertions(+), 17 deletions(-)

diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VRLRecordReader.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VRLRecordReader.scala
index 48ae8b15d..6b8a45076 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VRLRecordReader.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VRLRecordReader.scala
@@ -55,6 +55,8 @@ class VRLRecordReader(cobolSchema: Copybook,
   private val segmentIdField = ReaderParametersValidator.getSegmentIdField(readerProperties.multisegment, cobolSchema)
   private val recordLengthAdjustment = readerProperties.rdwAdjustment
   private val useRdw = lengthField.isEmpty && lengthFieldExpr.isEmpty
+  private val minimumRecordLength = readerProperties.minimumRecordLength
+  private val maximumRecordLength = readerProperties.maximumRecordLength

   fetchNext()

@@ -199,14 +201,14 @@ class VRLRecordReader(cobolSchema: Copybook,

           byteIndex += headerBytes.length

+          isValidRecord = recordMetadata.isValid && recordLength >= minimumRecordLength && recordLength <= maximumRecordLength
+
           if (recordLength > 0) {
             recordBytes = dataStream.next(recordLength)
             byteIndex += recordBytes.length
           } else {
             isEndOfFile = true
           }
-
-          isValidRecord = recordMetadata.isValid
         }

         if (!isEndOfFile) {
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala
index b9c169078..3a4f2b0de 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala
@@ -42,6 +42,8 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaReten
 * @param recordStartOffset A number of bytes to skip at the beginning of the record before parsing a record according to a copybook
 * @param recordEndOffset A number of bytes to skip at the end of each record
 * @param recordLength Specifies the length of the record disregarding the copybook record size. Implied the file has fixed record length.
+ * @param minimumRecordLength Minimum record length for which the record is considered valid.
+ * @param maximumRecordLength Maximum record length for which the record is considered valid.
 * @param variableLengthParams VariableLengthParameters containing the specifications for the consumption of variable-length Cobol records.
 * @param variableSizeOccurs If true, OCCURS DEPENDING ON data size will depend on the number of elements
 * @param generateRecordBytes Generate 'record_bytes' field containing raw bytes of the original record
@@ -75,6 +77,8 @@ case class CobolParameters(
                             recordStartOffset: Int,
                             recordEndOffset: Int,
                             recordLength: Option[Int],
+                            minimumRecordLength: Option[Int],
+                            maximumRecordLength: Option[Int],
                             variableLengthParams: Option[VariableLengthParameters],
                             variableSizeOccurs: Boolean,
                             generateRecordBytes: Boolean,
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala
index be74abce1..848751e92 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala
@@ -39,7 +39,9 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaReten
 * @param floatingPointFormat A format of floating-point numbers
 * @param variableSizeOccurs If true, OCCURS DEPENDING ON data size will depend on the number of elements
 * @param recordLength Specifies the length of the record disregarding the copybook record size. Implied the file has fixed record length.
- * @param lengthFieldExpression A name of a field that contains record length. Optional. If not set the copybook record length will be used.
+ * @param minimumRecordLength Minimum record length for which the record is considered valid.
+ * @param maximumRecordLength Maximum record length for which the record is considered valid.
+ * @param lengthFieldExpression A name of a field that contains record length. Optional. If not set the copybook record length will be used.
* @param isRecordSequence Does input files have 4 byte record length headers * @param bdw Block descriptor word (if specified), for FB and VB record formats * @param isRdwPartRecLength Does RDW count itself as part of record length itself @@ -82,7 +84,9 @@ case class ReaderParameters( floatingPointFormat: FloatingPointFormat = FloatingPointFormat.IBM, variableSizeOccurs: Boolean = false, recordLength: Option[Int] = None, - lengthFieldExpression: Option[String] = None, + minimumRecordLength: Int = 1, + maximumRecordLength: Int = Int.MaxValue, + lengthFieldExpression: Option[String] = None, isRecordSequence: Boolean = false, bdw: Option[Bdw] = None, isRdwBigEndian: Boolean = false, diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/builder/SparkCobolOptionsBuilder.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/builder/SparkCobolOptionsBuilder.scala index 80c643972..70734bff0 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/builder/SparkCobolOptionsBuilder.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/builder/SparkCobolOptionsBuilder.scala @@ -93,8 +93,11 @@ class SparkCobolOptionsBuilder(copybookContent: String)(implicit spark: SparkSes val schemaRetentionPolicy = readerParams.schemaPolicy + val minimumRecordLength = readerParams.minimumRecordLength + val maximumRecordLength = readerParams.maximumRecordLength + val rddRow = rdd - .filter(array => array.nonEmpty) + .filter(array => array.nonEmpty && array.length >= minimumRecordLength && array.length <= maximumRecordLength) .map(array => { val record = RecordExtractors.extractRecord[GenericRow](parsedCopybook.ast, array, diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala index abbcafb3c..cd12b9c9a 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala @@ -47,11 +47,20 @@ object CobolParametersParser extends Logging { val PARAM_SOURCE_PATHS_LEGACY = "paths" val PARAM_ENCODING = "encoding" val PARAM_PEDANTIC = "pedantic" + + // Record format options + val PARAM_RECORD_FORMAT = "record_format" + val PARAM_RECORD_LENGTH = "record_length" + val PARAM_MINIMUM_RECORD_LENGTH = "minimum_record_length" + val PARAM_MAXIMUM_RECORD_LENGTH = "maximum_record_length" + val PARAM_IS_RECORD_SEQUENCE = "is_record_sequence" val PARAM_RECORD_LENGTH_FIELD = "record_length_field" val PARAM_RECORD_START_OFFSET = "record_start_offset" val PARAM_RECORD_END_OFFSET = "record_end_offset" val PARAM_FILE_START_OFFSET = "file_start_offset" val PARAM_FILE_END_OFFSET = "file_end_offset" + val PARAM_IS_XCOM = "is_xcom" + val PARAM_IS_TEXT = "is_text" // Schema transformation parameters val PARAM_GENERATE_RECORD_ID = "generate_record_id" @@ -85,11 +94,6 @@ object CobolParametersParser extends Logging { val PARAM_FIELD_CODE_PAGE_PREFIX = "field_code_page:" // Parameters for multisegment variable length files - val PARAM_RECORD_FORMAT = "record_format" - val PARAM_RECORD_LENGTH = "record_length" - val PARAM_IS_XCOM = "is_xcom" - val PARAM_IS_RECORD_SEQUENCE = "is_record_sequence" - val PARAM_IS_TEXT = "is_text" val PARAM_IS_RDW_BIG_ENDIAN = "is_rdw_big_endian" val PARAM_IS_BDW_BIG_ENDIAN = "is_bdw_big_endian" val PARAM_IS_RDW_PART_REC_LENGTH = "is_rdw_part_of_record_length" @@ -250,6 +254,8 @@ 
object CobolParametersParser extends Logging { params.getOrElse(PARAM_RECORD_START_OFFSET, "0").toInt, params.getOrElse(PARAM_RECORD_END_OFFSET, "0").toInt, params.get(PARAM_RECORD_LENGTH).map(_.toInt), + params.get(PARAM_MINIMUM_RECORD_LENGTH).map(_.toInt), + params.get(PARAM_MAXIMUM_RECORD_LENGTH).map(_.toInt), variableLengthParams, params.getOrElse(PARAM_VARIABLE_SIZE_OCCURS, "false").toBoolean, params.getOrElse(PARAM_GENERATE_RECORD_BYTES, "false").toBoolean, @@ -368,6 +374,8 @@ object CobolParametersParser extends Logging { floatingPointFormat = parameters.floatingPointFormat, variableSizeOccurs = parameters.variableSizeOccurs, recordLength = parameters.recordLength, + minimumRecordLength = parameters.minimumRecordLength.getOrElse(1), + maximumRecordLength = parameters.maximumRecordLength.getOrElse(Int.MaxValue), lengthFieldExpression = recordLengthField, isRecordSequence = varLenParams.isRecordSequence, bdw = varLenParams.bdw, @@ -786,6 +794,12 @@ object CobolParametersParser extends Logging { if (params.contains(PARAM_IS_XCOM)) { incorrectParameters += PARAM_IS_XCOM } + if (params.contains(PARAM_MINIMUM_RECORD_LENGTH)) { + incorrectParameters += PARAM_MINIMUM_RECORD_LENGTH + } + if (params.contains(PARAM_MAXIMUM_RECORD_LENGTH)) { + incorrectParameters += PARAM_MAXIMUM_RECORD_LENGTH + } if (params.contains(PARAM_IS_RDW_BIG_ENDIAN)) { incorrectParameters += PARAM_IS_RDW_BIG_ENDIAN } @@ -857,6 +871,19 @@ object CobolParametersParser extends Logging { throw new IllegalArgumentException(s"Option '$PARAM_IS_TEXT' and ${incorrectParameters.mkString(", ")} cannot be used together.") } } + + if (params.contains(PARAM_ENCODING) && params(PARAM_ENCODING).toLowerCase() == "ebcdic") { + if (params.contains(PARAM_ASCII_CHARSET)) { + throw new IllegalArgumentException(s"Option '$PARAM_ASCII_CHARSET' cannot be used when '$PARAM_ENCODING = ebcdic'.") + } + } + + if (params.contains(PARAM_ENCODING) && params(PARAM_ENCODING).toLowerCase() == "ascii") { + if (params.contains(PARAM_EBCDIC_CODE_PAGE)) { + throw new IllegalArgumentException(s"Option '$PARAM_EBCDIC_CODE_PAGE' cannot be used when '$PARAM_ENCODING = ascii'.") + } + } + if (unusedKeys.nonEmpty) { val unusedKeyStr = unusedKeys.mkString(",") val msg = s"Redundant or unrecognized option(s) to 'spark-cobol': $unusedKeyStr." 
diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/FixedLenNestedReader.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/FixedLenNestedReader.scala index 9e2cb8f59..7bf84e9b3 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/FixedLenNestedReader.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/FixedLenNestedReader.scala @@ -66,6 +66,8 @@ final class FixedLenNestedReader(copyBookContents: Seq[String], override def next(): Row = Row.fromSeq(iterator.next()) } + override def getReaderProperties: ReaderParameters = readerProperties + override def getCobolSchema: CobolSchema = CobolSchema.fromBaseReader(cobolSchema) override def getSparkSchema: StructType = getCobolSchema.getSparkSchema diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/FixedLenTextReader.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/FixedLenTextReader.scala index a1a578db3..39c22ba91 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/FixedLenTextReader.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/FixedLenTextReader.scala @@ -67,6 +67,8 @@ final class FixedLenTextReader(copyBookContents: Seq[String], override def next(): Row = Row.fromSeq(iterator.next()) } + override def getReaderProperties: ReaderParameters = readerProperties + override def getCobolSchema: CobolSchema = CobolSchema.fromBaseReader(cobolSchema) override def getSparkSchema: StructType = getCobolSchema.getSparkSchema diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/Reader.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/Reader.scala index 9b0da4102..da54c6ea7 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/Reader.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/Reader.scala @@ -17,10 +17,14 @@ package za.co.absa.cobrix.spark.cobol.reader import org.apache.spark.sql.types.StructType +import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters import za.co.absa.cobrix.cobol.reader.{Reader => CobolReader} /** The abstract class for Cobol all data readers from various sources */ trait Reader extends CobolReader { def getSparkSchema: StructType + + /** All the properties that the reader might need. */ + def getReaderProperties: ReaderParameters } diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/VarLenReader.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/VarLenReader.scala index a2a9f6ca6..332c8b688 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/VarLenReader.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/VarLenReader.scala @@ -18,7 +18,6 @@ package za.co.absa.cobrix.spark.cobol.reader import org.apache.spark.sql.Row import za.co.absa.cobrix.cobol.reader.index.entry.SparseIndexEntry -import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters import za.co.absa.cobrix.cobol.reader.stream.SimpleStream import scala.collection.mutable.ArrayBuffer @@ -33,9 +32,6 @@ trait VarLenReader extends Reader with Serializable { /** Returns true if RDW header of variable length files is big endian */ def isRdwBigEndian: Boolean - /** All the properties that the reader might need. */ - def getReaderProperties: ReaderParameters - /** * Returns a file iterator between particular offsets. 
This is for faster traversal of big binary files * diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/scanners/CobolScanners.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/scanners/CobolScanners.scala index 8ee1d0238..97a92199a 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/scanners/CobolScanners.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/scanners/CobolScanners.scala @@ -113,11 +113,14 @@ private[source] object CobolScanners extends Logging { sourceDirs: Seq[String], recordParser: (FixedLenTextReader, RDD[Array[Byte]]) => RDD[Row], sqlContext: SQLContext): RDD[Row] = { + val minimumRecordLength = reader.getReaderProperties.minimumRecordLength + val maximumRecordLength = reader.getReaderProperties.maximumRecordLength + val rddText = sourceDirs.map(sourceDir => sqlContext.sparkContext.textFile(sourceDir)) .reduce((a, b) => a.union(b)) val records = rddText - .filter(str => str.nonEmpty) + .filter(str => str.nonEmpty && str.length >= minimumRecordLength && str.length <= maximumRecordLength) .map(str => { str.getBytes(StandardCharsets.UTF_8) }) @@ -128,6 +131,9 @@ private[source] object CobolScanners extends Logging { sourceDirs: Seq[String], recordParser: (FixedLenTextReader, RDD[Array[Byte]]) => RDD[Row], sqlContext: SQLContext): RDD[Row] = { + val minimumRecordLength = reader.getReaderProperties.minimumRecordLength + val maximumRecordLength = reader.getReaderProperties.maximumRecordLength + // The ides for the implementation is taken from the following Spark PR: // https://github.com/apache/spark/pull/21287/files val rddText = sourceDirs.map(sourceDir => sqlContext @@ -144,7 +150,7 @@ private[source] object CobolScanners extends Logging { } val records = rddText - .filter(str => str.nonEmpty) + .filter(str => str.nonEmpty && str.length >= minimumRecordLength && str.length <= maximumRecordLength) recordParser(reader, records) } diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/base/impl/DummyFixedLenReader.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/base/impl/DummyFixedLenReader.scala index e7addc1a6..0d78e0732 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/base/impl/DummyFixedLenReader.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/base/impl/DummyFixedLenReader.scala @@ -19,10 +19,13 @@ package za.co.absa.cobrix.spark.cobol.source.base.impl import org.apache.commons.lang3.NotImplementedException import org.apache.spark.sql.Row import org.apache.spark.sql.types.StructType +import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters import za.co.absa.cobrix.spark.cobol.reader.FixedLenReader import za.co.absa.cobrix.spark.cobol.schema.CobolSchema class DummyFixedLenReader(sparkSchema: StructType, cobolSchema: CobolSchema, data: List[Map[String, Option[String]]])(invokeOnTraverse: () => Unit) extends FixedLenReader with Serializable { + override def getReaderProperties: ReaderParameters = null + def getCobolSchema: CobolSchema = cobolSchema def getSparkSchema: StructType = sparkSchema diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test27RecordLengthSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test27RecordLengthSpec.scala index c2fe086ed..326e6b8d8 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test27RecordLengthSpec.scala +++ 
b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test27RecordLengthSpec.scala @@ -94,6 +94,16 @@ class Test27RecordLengthSpec extends AnyWordSpec with SparkTestBase with BinaryF getDataFrame("/dummy", Map("is_xcom" -> "true")) } } + "minimum_record_length" in { + intercept[IllegalArgumentException] { + getDataFrame("/dummy", Map("minimum_record_length" -> "10")) + } + } + "maximum_record_length" in { + intercept[IllegalArgumentException] { + getDataFrame("/dummy", Map("maximum_record_length" -> "10")) + } + } "is_rdw_big_endian" in { intercept[IllegalArgumentException] { getDataFrame("/dummy", Map("is_rdw_big_endian" -> "true")) diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/text/Test01AsciiTextFiles.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/text/Test01AsciiTextFiles.scala index d055786e0..31d272d41 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/text/Test01AsciiTextFiles.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/text/Test01AsciiTextFiles.scala @@ -17,9 +17,9 @@ package za.co.absa.cobrix.spark.cobol.source.text import java.nio.charset.StandardCharsets - import org.scalatest.funsuite.AnyFunSuite import org.slf4j.{Logger, LoggerFactory} +import za.co.absa.cobrix.spark.cobol.Cobrix import za.co.absa.cobrix.spark.cobol.source.base.{SimpleComparisonBase, SparkTestBase} import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture @@ -147,4 +147,96 @@ class Test01AsciiTextFiles extends AnyFunSuite with SparkTestBase with BinaryFil } } + test("Test ASCII files with invalid record length (UTF-8)") { + val copybook = """ 05 A PIC 9(2)V9(2). """ + + val textFileContent = Seq("1234", "1", "23456").mkString("\n") + + withTempTextFile("text_ascii", ".txt", StandardCharsets.UTF_8, textFileContent) { tmpFileName => + val df = spark + .read + .format("cobol") + .option("copybook_contents", copybook) + .option("pedantic", "true") + .option("record_format", "D") + .option("minimum_record_length", 2) + .option("maximum_record_length", 4) + .load(tmpFileName) + + val expected = """[{"A":12.34}]""" + + val actual = df.toJSON.collect().mkString("[", ",", "]") + + assertEqualsMultiline(actual, expected) + } + } + + test("Test ASCII files with invalid record length (us-ascii)") { + val copybook = """ 05 A PIC 9(2)V9(2). """ + + val textFileContent = Seq("1234", "1", "23456").mkString("\n") + + withTempTextFile("text_ascii", ".txt", StandardCharsets.UTF_8, textFileContent) { tmpFileName => + val df = spark + .read + .format("cobol") + .option("copybook_contents", copybook) + .option("pedantic", "true") + .option("record_format", "D") + .option("ascii_charset", "us-ascii") + .option("minimum_record_length", 2) + .option("maximum_record_length", 4) + .load(tmpFileName) + + val expected = """[{"A":12.34}]""" + + val actual = df.toJSON.collect().mkString("[", ",", "]") + + assertEqualsMultiline(actual, expected) + } + } + + test("Test ASCII from RDD with invalid record length") { + val copybook = """ 05 A PIC 9(2)V9(2). 
""" + + val textFileArray = Seq("1234", "1", "23456") + + val rdd = spark.sparkContext.parallelize(textFileArray).map(_.getBytes) + + val df = Cobrix.fromRdd(spark) + .copybookContents(copybook) + .option("pedantic", "true") + .option("encoding", "ascii") + .option("ascii_charset", "us-ascii") + .option("minimum_record_length", "2") + .option("maximum_record_length", "4") + .load(rdd) + + val expected = """[{"A":12.34}]""" + + val actual = df.toJSON.collect().mkString("[", ",", "]") + + assertEqualsMultiline(actual, expected) + } + + test("Test ASCII from Text RDD with invalid record length") { + val copybook = """ 05 A PIC 9(2)V9(2). """ + + val textFileArray = Seq("1234", "1", "23456") + + val rdd = spark.sparkContext.parallelize(textFileArray) + + val df = Cobrix.fromRdd(spark) + .copybookContents(copybook) + .option("pedantic", "true") + .option("minimum_record_length", "2") + .option("maximum_record_length", "4") + .loadText(rdd) + + val expected = """[{"A":12.34}]""" + + val actual = df.toJSON.collect().mkString("[", ",", "]") + + assertEqualsMultiline(actual, expected) + } }