Merge pull request #254 from AbsaOSS/bugfix/252-support-with-input-file-col

#251 Fix glob support and the divisibility check for a large number of files.
yruslan authored Feb 24, 2020
2 parents fe40a55 + 8782241 commit 3457176
Showing 3 changed files with 95 additions and 13 deletions.
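
For orientation before the diff: the validation below previously allowed 'with_input_file_name_col' only together with 'is_record_sequence' = true. Here is a minimal sketch of a reader invocation that this commit makes valid, assuming the spark-cobol data source is on the classpath; the copybook mirrors the new regression test and the input path is a placeholder:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("input-file-col-example").getOrCreate()

// Copybook borrowed from the regression test added by this commit.
val copybook =
  """      01 R.
              03 A PIC X(1).
              03 B PIC X(2).
    """

// 'with_input_file_name_col' no longer requires 'is_record_sequence' = true:
// 'variable_size_occurs' = true, or setting 'record_length',
// 'file_start_offset' or 'file_end_offset', also satisfies the check.
val df = spark
  .read
  .format("cobol")
  .option("copybook_contents", copybook)
  .option("with_input_file_name_col", "file")
  .option("file_start_offset", "4")
  .option("file_end_offset", "5")
  .load("/path/to/data.dat") // placeholder path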
@@ -438,7 +438,12 @@ object CobolParametersParser {
     */
   private def validateSparkCobolOptions(params: Parameters): Unit = {
     val isRecordSequence = params.getOrElse(PARAM_IS_XCOM, "false").toBoolean ||
-      params.getOrElse(PARAM_IS_RECORD_SEQUENCE, "false").toBoolean
+      params.getOrElse(PARAM_IS_RECORD_SEQUENCE, "false").toBoolean ||
+      params.getOrElse(PARAM_VARIABLE_SIZE_OCCURS, "false").toBoolean ||
+      params.contains(PARAM_FILE_START_OFFSET) ||
+      params.contains(PARAM_FILE_END_OFFSET) ||
+      params.contains(PARAM_RECORD_LENGTH)
+
     val isPedantic = params.getOrElse(PARAM_PEDANTIC, "false").toBoolean
     val keysPassed = params.getMap.keys.toSeq
     val unusedKeys = keysPassed.flatMap(key => {
@@ -448,15 +453,6 @@
         Some(key)
       }
     })
-    if (unusedKeys.nonEmpty) {
-      val unusedKeyStr = unusedKeys.mkString(",")
-      val msg = s"Redundant or unrecognized option(s) to 'spark-cobol': $unusedKeyStr."
-      if (isPedantic) {
-        throw new IllegalArgumentException(msg)
-      } else {
-        logger.error(msg)
-      }
-    }
     val segmentRedefineParents = getSegmentRedefineParents(params)
     if (segmentRedefineParents.nonEmpty) {
       val segmentIdLevels = parseSegmentLevels(params)
@@ -466,7 +462,18 @@
       }
     }
     if (!isRecordSequence && params.contains(PARAM_INPUT_FILE_COLUMN)) {
-      throw new IllegalArgumentException(s"Option '$PARAM_INPUT_FILE_COLUMN' is supported only when '$PARAM_IS_RECORD_SEQUENCE' = true.")
+      val recordSequenceCondition = s"one of this holds: '$PARAM_IS_RECORD_SEQUENCE' = true or '$PARAM_VARIABLE_SIZE_OCCURS' = true" +
+        s" or one of these options is set: '$PARAM_RECORD_LENGTH', '$PARAM_FILE_START_OFFSET', '$PARAM_FILE_END_OFFSET'"
+      throw new IllegalArgumentException(s"Option '$PARAM_INPUT_FILE_COLUMN' is supported only when $recordSequenceCondition")
     }
+    if (unusedKeys.nonEmpty) {
+      val unusedKeyStr = unusedKeys.mkString(",")
+      val msg = s"Redundant or unrecognized option(s) to 'spark-cobol': $unusedKeyStr."
+      if (isPedantic) {
+        throw new IllegalArgumentException(msg)
+      } else {
+        logger.error(msg)
+      }
+    }
   }
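
Note the reordering above: the unused-keys check now runs after the 'with_input_file_name_col' validation, so a misuse of that option is reported first. A sketch of the pedantic path, reusing the spark session and copybook from the earlier example; 'unknown_option' is a made-up key used only to trigger the check:

// With 'pedantic' = true an unrecognized option fails the load instead of
// merely being logged via logger.error.
val failing = spark
  .read
  .format("cobol")
  .option("copybook_contents", copybook)
  .option("pedantic", "true")
  .option("unknown_option", "42") // hypothetical, intentionally bogus key
  .load("/path/to/data.dat")      // placeholder path
// throws IllegalArgumentException:
//   Redundant or unrecognized option(s) to 'spark-cobol': unknown_option.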

@@ -16,8 +16,8 @@

 package za.co.absa.cobrix.spark.cobol.source.integration

-import org.apache.spark.sql.{DataFrame, DataFrameReader}
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.{DataFrame, DataFrameReader}
 import org.scalatest.WordSpec
 import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase

@@ -85,7 +85,7 @@ class Test20InputFileNameSpec extends WordSpec with SparkTestBase {
          .load(inputDataPath)
      }

-      assert(ex.getMessage.contains("'with_input_file_name_col' is supported only when 'is_record_sequence' = true"))
+      assert(ex.getMessage.contains("'with_input_file_name_col' is supported only when one of this holds"))
    }
  }

@@ -0,0 +1,75 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.cobrix.spark.cobol.source.regression

import org.apache.spark.sql.functions.col
import org.scalatest.FunSuite
import org.slf4j.{Logger, LoggerFactory}
import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase
import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture

class Test08InputFileName extends FunSuite with SparkTestBase with BinaryFileFixture {

  private implicit val logger: Logger = LoggerFactory.getLogger(this.getClass)

  private val copybook =
    """      01 R.
                03 A PIC X(1).
                03 B PIC X(2).
      """

  val binFileContents: Array[Byte] = Array[Byte](
    // File offset start
    0x00, 0x00, 0x00, 0x00,
    // Records
    0xF0.toByte, 0xF1.toByte, 0xF2.toByte,
    0xF3.toByte, 0xF4.toByte, 0xF5.toByte,
    0xF6.toByte, 0xF7.toByte, 0xF8.toByte,
    // File offset end
    0x00, 0x00, 0x00, 0x00, 0x00
  )

test("Test input data has the input file column and file offsets") {
withTempBinFile("bin_file", ".dat", binFileContents) { tmpFileName =>
val df = spark
.read
.format("cobol")
.option("copybook_contents", copybook)
.option("with_input_file_name_col", "file")
.option("file_start_offset", "4")
.option("file_end_offset", "5")
.option("schema_retention_policy", "collapse_root")
.load(tmpFileName)
.filter(col("file").contains("bin_file"))

assert(df.count == 3)
}
}

test("Test Cobrix throws an exceptions when conditions for 'schema_retention_policy' are not met ") {
intercept[IllegalArgumentException] {
spark
.read
.format("cobol")
.option("copybook_contents", copybook)
.option("with_input_file_name_col", "file")
.option("schema_retention_policy", "collapse_root")
.load("dummy.dat")
}
}

}
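
A note on the fixture arithmetic: the binary file is 4 + 3 × 3 + 5 = 18 bytes — a 4-byte header, three 3-byte records (A PIC X(1) plus B PIC X(2)), and a 5-byte footer. With 'file_start_offset' = 4 and 'file_end_offset' = 5 the reader skips exactly that padding, which is why the first test expects df.count == 3.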
