#607 Add 'minimum_record_length' and 'maximum_record_length' for ASCII files.
yruslan committed Apr 21, 2023
1 parent fb61b66 commit 3f98312
Showing 13 changed files with 172 additions and 17 deletions.
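For reference, a usage sketch based on the tests added in this commit; the data path and the chosen bounds are placeholders. ASCII records whose length falls outside the configured bounds are treated as invalid and skipped.

    // Usage sketch (option names follow the tests added below); path and bounds are placeholders.
    val df = spark.read
      .format("cobol")
      .option("copybook_contents", copybook)   // copybook string defined elsewhere
      .option("record_format", "D")            // ASCII text records, as in the tests below
      .option("minimum_record_length", "2")
      .option("maximum_record_length", "100")
      .load("/path/to/ascii/data")             // placeholder path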
@@ -55,6 +55,8 @@ class VRLRecordReader(cobolSchema: Copybook,
private val segmentIdField = ReaderParametersValidator.getSegmentIdField(readerProperties.multisegment, cobolSchema)
private val recordLengthAdjustment = readerProperties.rdwAdjustment
private val useRdw = lengthField.isEmpty && lengthFieldExpr.isEmpty
private val minimumRecordLength = readerProperties.minimumRecordLength
private val maximumRecordLength = readerProperties.maximumRecordLength

fetchNext()

@@ -199,14 +201,14 @@ class VRLRecordReader(cobolSchema: Copybook,

byteIndex += headerBytes.length

isValidRecord = recordMetadata.isValid && recordLength >= minimumRecordLength && recordLength <= maximumRecordLength

if (recordLength > 0) {
recordBytes = dataStream.next(recordLength)
byteIndex += recordBytes.length
} else {
isEndOfFile = true
}

isValidRecord = recordMetadata.isValid
}

if (!isEndOfFile) {
@@ -42,6 +42,8 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaReten
* @param recordStartOffset A number of bytes to skip at the beginning of the record before parsing a record according to a copybook
* @param recordEndOffset A number of bytes to skip at the end of each record
* @param recordLength Specifies the length of the record disregarding the copybook record size. Implies the file has a fixed record length.
* @param minimumRecordLength Minimum record length for which the record is considered valid.
* @param maximumRecordLength Maximum record length for which the record is considered valid.
* @param variableLengthParams VariableLengthParameters containing the specifications for the consumption of variable-length Cobol records.
* @param variableSizeOccurs If true, OCCURS DEPENDING ON data size will depend on the number of elements
* @param generateRecordBytes Generate 'record_bytes' field containing raw bytes of the original record
@@ -75,6 +77,8 @@ case class CobolParameters(
recordStartOffset: Int,
recordEndOffset: Int,
recordLength: Option[Int],
minimumRecordLength: Option[Int],
maximumRecordLength: Option[Int],
variableLengthParams: Option[VariableLengthParameters],
variableSizeOccurs: Boolean,
generateRecordBytes: Boolean,
@@ -39,7 +39,9 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaReten
* @param floatingPointFormat A format of floating-point numbers
* @param variableSizeOccurs If true, OCCURS DEPENDING ON data size will depend on the number of elements
* @param recordLength Specifies the length of the record disregarding the copybook record size. Implies the file has a fixed record length.
* @param lengthFieldExpression A name of a field that contains record length. Optional. If not set the copybook record length will be used.
* @param minimumRecordLength Minimum record length for which the record is considered valid.
* @param maximumRecordLength Maximum record length for which the record is considered valid.
* @param lengthFieldExpression A name of a field that contains record length. Optional. If not set the copybook record length will be used.
* @param isRecordSequence Does input files have 4 byte record length headers
* @param bdw Block descriptor word (if specified), for FB and VB record formats
* @param isRdwPartRecLength Does RDW count itself as part of record length itself
@@ -82,7 +84,9 @@ case class ReaderParameters(
floatingPointFormat: FloatingPointFormat = FloatingPointFormat.IBM,
variableSizeOccurs: Boolean = false,
recordLength: Option[Int] = None,
lengthFieldExpression: Option[String] = None,
minimumRecordLength: Int = 1,
maximumRecordLength: Int = Int.MaxValue,
lengthFieldExpression: Option[String] = None,
isRecordSequence: Boolean = false,
bdw: Option[Bdw] = None,
isRdwBigEndian: Boolean = false,
@@ -93,8 +93,11 @@ class SparkCobolOptionsBuilder(copybookContent: String)(implicit spark: SparkSes

val schemaRetentionPolicy = readerParams.schemaPolicy

val minimumRecordLength = readerParams.minimumRecordLength
val maximumRecordLength = readerParams.maximumRecordLength

val rddRow = rdd
.filter(array => array.nonEmpty)
.filter(array => array.nonEmpty && array.length >= minimumRecordLength && array.length <= maximumRecordLength)
.map(array => {
val record = RecordExtractors.extractRecord[GenericRow](parsedCopybook.ast,
array,
@@ -47,11 +47,20 @@ object CobolParametersParser extends Logging {
val PARAM_SOURCE_PATHS_LEGACY = "paths"
val PARAM_ENCODING = "encoding"
val PARAM_PEDANTIC = "pedantic"

// Record format options
val PARAM_RECORD_FORMAT = "record_format"
val PARAM_RECORD_LENGTH = "record_length"
val PARAM_MINIMUM_RECORD_LENGTH = "minimum_record_length"
val PARAM_MAXIMUM_RECORD_LENGTH = "maximum_record_length"
val PARAM_IS_RECORD_SEQUENCE = "is_record_sequence"
val PARAM_RECORD_LENGTH_FIELD = "record_length_field"
val PARAM_RECORD_START_OFFSET = "record_start_offset"
val PARAM_RECORD_END_OFFSET = "record_end_offset"
val PARAM_FILE_START_OFFSET = "file_start_offset"
val PARAM_FILE_END_OFFSET = "file_end_offset"
val PARAM_IS_XCOM = "is_xcom"
val PARAM_IS_TEXT = "is_text"

// Schema transformation parameters
val PARAM_GENERATE_RECORD_ID = "generate_record_id"
@@ -85,11 +94,6 @@ object CobolParametersParser extends Logging {
val PARAM_FIELD_CODE_PAGE_PREFIX = "field_code_page:"

// Parameters for multisegment variable length files
val PARAM_RECORD_FORMAT = "record_format"
val PARAM_RECORD_LENGTH = "record_length"
val PARAM_IS_XCOM = "is_xcom"
val PARAM_IS_RECORD_SEQUENCE = "is_record_sequence"
val PARAM_IS_TEXT = "is_text"
val PARAM_IS_RDW_BIG_ENDIAN = "is_rdw_big_endian"
val PARAM_IS_BDW_BIG_ENDIAN = "is_bdw_big_endian"
val PARAM_IS_RDW_PART_REC_LENGTH = "is_rdw_part_of_record_length"
@@ -250,6 +254,8 @@ object CobolParametersParser extends Logging {
params.getOrElse(PARAM_RECORD_START_OFFSET, "0").toInt,
params.getOrElse(PARAM_RECORD_END_OFFSET, "0").toInt,
params.get(PARAM_RECORD_LENGTH).map(_.toInt),
params.get(PARAM_MINIMUM_RECORD_LENGTH).map(_.toInt),
params.get(PARAM_MAXIMUM_RECORD_LENGTH).map(_.toInt),
variableLengthParams,
params.getOrElse(PARAM_VARIABLE_SIZE_OCCURS, "false").toBoolean,
params.getOrElse(PARAM_GENERATE_RECORD_BYTES, "false").toBoolean,
@@ -368,6 +374,8 @@ object CobolParametersParser extends Logging {
floatingPointFormat = parameters.floatingPointFormat,
variableSizeOccurs = parameters.variableSizeOccurs,
recordLength = parameters.recordLength,
minimumRecordLength = parameters.minimumRecordLength.getOrElse(1),
maximumRecordLength = parameters.maximumRecordLength.getOrElse(Int.MaxValue),
lengthFieldExpression = recordLengthField,
isRecordSequence = varLenParams.isRecordSequence,
bdw = varLenParams.bdw,
@@ -786,6 +794,12 @@ object CobolParametersParser extends Logging {
if (params.contains(PARAM_IS_XCOM)) {
incorrectParameters += PARAM_IS_XCOM
}
if (params.contains(PARAM_MINIMUM_RECORD_LENGTH)) {
incorrectParameters += PARAM_MINIMUM_RECORD_LENGTH
}
if (params.contains(PARAM_MAXIMUM_RECORD_LENGTH)) {
incorrectParameters += PARAM_MAXIMUM_RECORD_LENGTH
}
if (params.contains(PARAM_IS_RDW_BIG_ENDIAN)) {
incorrectParameters += PARAM_IS_RDW_BIG_ENDIAN
}
@@ -857,6 +871,19 @@ object CobolParametersParser extends Logging {
throw new IllegalArgumentException(s"Option '$PARAM_IS_TEXT' and ${incorrectParameters.mkString(", ")} cannot be used together.")
}
}

if (params.contains(PARAM_ENCODING) && params(PARAM_ENCODING).toLowerCase() == "ebcdic") {
if (params.contains(PARAM_ASCII_CHARSET)) {
throw new IllegalArgumentException(s"Option '$PARAM_ASCII_CHARSET' cannot be used when '$PARAM_ENCODING = ebcdic'.")
}
}

if (params.contains(PARAM_ENCODING) && params(PARAM_ENCODING).toLowerCase() == "ascii") {
if (params.contains(PARAM_EBCDIC_CODE_PAGE)) {
throw new IllegalArgumentException(s"Option '$PARAM_EBCDIC_CODE_PAGE' cannot be used when '$PARAM_ENCODING = ascii'.")
}
}

if (unusedKeys.nonEmpty) {
val unusedKeyStr = unusedKeys.mkString(",")
val msg = s"Redundant or unrecognized option(s) to 'spark-cobol': $unusedKeyStr."
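To illustrate the encoding checks added in this file, a sketch of an option combination that is now rejected; the copybook contents and data path are placeholders.

    // Sketch only: 'ascii_charset' is rejected when the encoding is EBCDIC, and an EBCDIC
    // code page option is rejected when the encoding is ASCII.
    spark.read
      .format("cobol")
      .option("copybook_contents", copybook)   // copybook string defined elsewhere
      .option("encoding", "ebcdic")
      .option("ascii_charset", "us-ascii")     // throws IllegalArgumentException
      .load("/path/to/data")                   // placeholder path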
@@ -66,6 +66,8 @@ final class FixedLenNestedReader(copyBookContents: Seq[String],
override def next(): Row = Row.fromSeq(iterator.next())
}

override def getReaderProperties: ReaderParameters = readerProperties

override def getCobolSchema: CobolSchema = CobolSchema.fromBaseReader(cobolSchema)

override def getSparkSchema: StructType = getCobolSchema.getSparkSchema
@@ -67,6 +67,8 @@ final class FixedLenTextReader(copyBookContents: Seq[String],
override def next(): Row = Row.fromSeq(iterator.next())
}

override def getReaderProperties: ReaderParameters = readerProperties

override def getCobolSchema: CobolSchema = CobolSchema.fromBaseReader(cobolSchema)

override def getSparkSchema: StructType = getCobolSchema.getSparkSchema
@@ -17,10 +17,14 @@
package za.co.absa.cobrix.spark.cobol.reader

import org.apache.spark.sql.types.StructType
import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
import za.co.absa.cobrix.cobol.reader.{Reader => CobolReader}


/** The abstract class for Cobol all data readers from various sources */
trait Reader extends CobolReader {
def getSparkSchema: StructType

/** All the properties that the reader might need. */
def getReaderProperties: ReaderParameters
}
@@ -18,7 +18,6 @@ package za.co.absa.cobrix.spark.cobol.reader

import org.apache.spark.sql.Row
import za.co.absa.cobrix.cobol.reader.index.entry.SparseIndexEntry
import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
import za.co.absa.cobrix.cobol.reader.stream.SimpleStream

import scala.collection.mutable.ArrayBuffer
@@ -33,9 +32,6 @@ trait VarLenReader extends Reader with Serializable {
/** Returns true if RDW header of variable length files is big endian */
def isRdwBigEndian: Boolean

/** All the properties that the reader might need. */
def getReaderProperties: ReaderParameters

/**
* Returns a file iterator between particular offsets. This is for faster traversal of big binary files
*
@@ -113,11 +113,14 @@ private[source] object CobolScanners extends Logging {
sourceDirs: Seq[String],
recordParser: (FixedLenTextReader, RDD[Array[Byte]]) => RDD[Row],
sqlContext: SQLContext): RDD[Row] = {
val minimumRecordLength = reader.getReaderProperties.minimumRecordLength
val maximumRecordLength = reader.getReaderProperties.maximumRecordLength

val rddText = sourceDirs.map(sourceDir => sqlContext.sparkContext.textFile(sourceDir))
.reduce((a, b) => a.union(b))

val records = rddText
.filter(str => str.nonEmpty)
.filter(str => str.nonEmpty && str.length >= minimumRecordLength && str.length <= maximumRecordLength)
.map(str => {
str.getBytes(StandardCharsets.UTF_8)
})
@@ -128,6 +131,9 @@
sourceDirs: Seq[String],
recordParser: (FixedLenTextReader, RDD[Array[Byte]]) => RDD[Row],
sqlContext: SQLContext): RDD[Row] = {
val minimumRecordLength = reader.getReaderProperties.minimumRecordLength
val maximumRecordLength = reader.getReaderProperties.maximumRecordLength

// The idea for the implementation is taken from the following Spark PR:
// https://github.com/apache/spark/pull/21287/files
val rddText = sourceDirs.map(sourceDir => sqlContext
@@ -144,7 +150,7 @@
}

val records = rddText
.filter(str => str.nonEmpty)
.filter(str => str.nonEmpty && str.length >= minimumRecordLength && str.length <= maximumRecordLength)

recordParser(reader, records)
}
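For clarity, a minimal worked sketch of the length filter above, using the same sample records and bounds as the tests added in this commit.

    // Minimal sketch; the bounds and sample records mirror the tests added below.
    val minimumRecordLength = 2
    val maximumRecordLength = 4
    val sampleRecords = Seq("1234", "1", "23456")
    val kept = sampleRecords.filter(str =>
      str.nonEmpty && str.length >= minimumRecordLength && str.length <= maximumRecordLength)
    // kept == Seq("1234"): only the 4-character record survives and parses as A = 12.34 with the test copybook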
@@ -19,10 +19,13 @@ package za.co.absa.cobrix.spark.cobol.source.base.impl
import org.apache.commons.lang3.NotImplementedException
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
import za.co.absa.cobrix.spark.cobol.reader.FixedLenReader
import za.co.absa.cobrix.spark.cobol.schema.CobolSchema

class DummyFixedLenReader(sparkSchema: StructType, cobolSchema: CobolSchema, data: List[Map[String, Option[String]]])(invokeOnTraverse: () => Unit) extends FixedLenReader with Serializable {
override def getReaderProperties: ReaderParameters = null

def getCobolSchema: CobolSchema = cobolSchema

def getSparkSchema: StructType = sparkSchema
@@ -94,6 +94,16 @@ class Test27RecordLengthSpec extends AnyWordSpec with SparkTestBase with BinaryF
getDataFrame("/dummy", Map("is_xcom" -> "true"))
}
}
"minimum_record_length" in {
intercept[IllegalArgumentException] {
getDataFrame("/dummy", Map("minimum_record_length" -> "10"))
}
}
"maximum_record_length" in {
intercept[IllegalArgumentException] {
getDataFrame("/dummy", Map("maximum_record_length" -> "10"))
}
}
"is_rdw_big_endian" in {
intercept[IllegalArgumentException] {
getDataFrame("/dummy", Map("is_rdw_big_endian" -> "true"))
@@ -17,9 +17,9 @@
package za.co.absa.cobrix.spark.cobol.source.text

import java.nio.charset.StandardCharsets

import org.scalatest.funsuite.AnyFunSuite
import org.slf4j.{Logger, LoggerFactory}
import za.co.absa.cobrix.spark.cobol.Cobrix
import za.co.absa.cobrix.spark.cobol.source.base.{SimpleComparisonBase, SparkTestBase}
import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture

@@ -147,4 +147,96 @@ class Test01AsciiTextFiles extends AnyFunSuite with SparkTestBase with BinaryFil
}
}

test("Test ASCII files with invalid record length (UTF-8)") {
val copybook = """ 05 A PIC 9(2)V9(2). """

val textFileContent = Seq("1234", "1", "23456").mkString("\n")

withTempTextFile("text_ascii", ".txt", StandardCharsets.UTF_8, textFileContent) { tmpFileName =>
val df = spark
.read
.format("cobol")
.option("copybook_contents", copybook)
.option("pedantic", "true")
.option("record_format", "D")
.option("minimum_record_length", 2)
.option("maximum_record_length", 4)
.load(tmpFileName)

val expected = """[{"A":12.34}]"""

val actual = df.toJSON.collect().mkString("[", ",", "]")

assertEqualsMultiline(actual, expected)
}
}

test("Test ASCII files with invalid record length (us-ascii)") {
val copybook = """ 05 A PIC 9(2)V9(2). """

val textFileContent = Seq("1234", "1", "23456").mkString("\n")

withTempTextFile("text_ascii", ".txt", StandardCharsets.UTF_8, textFileContent) { tmpFileName =>
val df = spark
.read
.format("cobol")
.option("copybook_contents", copybook)
.option("pedantic", "true")
.option("record_format", "D")
.option("ascii_charset", "us-ascii")
.option("minimum_record_length", 2)
.option("maximum_record_length", 4)
.load(tmpFileName)

val expected = """[{"A":12.34}]"""

val actual = df.toJSON.collect().mkString("[", ",", "]")

assertEqualsMultiline(actual, expected)
}
}

test("Test ASCII from RDD with invalid record length") {
val copybook = """ 05 A PIC 9(2)V9(2). """

val textFileArray = Seq("1234", "1", "23456")

val rdd = spark.sparkContext.parallelize(textFileArray).map(_.getBytes)

val df = Cobrix.fromRdd(spark)
.copybookContents(copybook)
.option("pedantic", "true")
.option("encoding", "ascii")
.option("ascii_charset", "us-ascii")
.option("minimum_record_length", "2")
.option("maximum_record_length", "4")
.load(rdd)

val expected = """[{"A":12.34}]"""

val actual = df.toJSON.collect().mkString("[", ",", "]")

assertEqualsMultiline(actual, expected)
}

test("Test ASCII from Text RDD with invalid record length") {
val copybook = """ 05 A PIC 9(2)V9(2). """

val textFileArray = Seq("1234", "1", "23456")

val rdd = spark.sparkContext.parallelize(textFileArray)

val df = Cobrix.fromRdd(spark)
.copybookContents(copybook)
.option("pedantic", "true")
.option("minimum_record_length", "2")
.option("maximum_record_length", "4")
.loadText(rdd)

val expected = """[{"A":12.34}]"""

val actual = df.toJSON.collect().mkString("[", ",", "]")

assertEqualsMultiline(actual, expected)
}
}
