#69 Add an option to add debug columns to the output schema
yruslan committed Mar 18, 2020
1 parent 046f78a commit 2b7aac2
Showing 9 changed files with 62 additions and 14 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -1043,6 +1043,7 @@ Again, the full example is available at
| .option("non_terminals", "GROUP1,GROUP2") | Specifies groups to also be added to the schema as string fields. When this option is specified, the reader will add one extra data field after each matching group containing the string data for the group. |
| .option("generate_record_id", false) | Generate autoincremental 'File_Id' and 'Record_Id' fields. This is used for processing record order dependent data. |
| .option("with_input_file_name_col", "file_name") | Generates a column containing input file name for each record (Similar to Spark SQL `input_file_name()` function). The column name is specified by the value of the option. This option only works for variable record length files. For fixed record length files use `input_file_name()`. |
| .option("debug", "false") | If true, each primitive field will be accompanied by a debug field specifying raw bytes in hexadecimal. |

##### Variable record length files options

@@ -67,6 +67,7 @@ object CopybookParser
* @param isUtf16BigEndian If true UTF-16 strings are considered big-endian.
* @param floatingPointFormat A format of floating-point numbers (IBM/IEEE754)
* @param nonTerminals A list of non-terminals that should be extracted as strings
* @param isDebug If true, additional debug fields will be added alongside all non-redefined primitives
* @return Seq[Group] where a group is a record inside the copybook
*/
def parseTree(copyBookContents: String,
@@ -79,7 +80,8 @@
asciiCharset: Charset = StandardCharsets.US_ASCII,
isUtf16BigEndian: Boolean = true,
floatingPointFormat: FloatingPointFormat = FloatingPointFormat.IBM,
nonTerminals: Seq[String] = Nil): Copybook = {
nonTerminals: Seq[String] = Nil,
isDebug: Boolean = false): Copybook = {
parseTree(EBCDIC,
copyBookContents,
dropGroupFillers,
@@ -91,7 +93,8 @@
asciiCharset,
isUtf16BigEndian,
floatingPointFormat,
nonTerminals)
nonTerminals,
isDebug)
}
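
For illustration (not part of the commit): a hedged call of this overload, assuming the parameters elided from the hunk keep their defaults, as the visible ones do.

```scala
// Parse a copybook with debug fields enabled; all other options default.
val copybook = CopybookParser.parseTree(copybookText, isDebug = true)
```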

/**
@@ -110,6 +113,7 @@
* @param isUtf16BigEndian If true UTF-16 strings are considered big-endian.
* @param floatingPointFormat A format of floating-point numbers (IBM/IEEE754)
* @param nonTerminals A list of non-terminals that should be extracted as strings
* @param isDebug If true, additional debug fields will be added alongside all non-redefined primitives
* @return Seq[Group] where a group is a record inside the copybook
*/
@throws(classOf[SyntaxErrorException])
@@ -124,7 +128,8 @@
asciiCharset: Charset,
isUtf16BigEndian: Boolean,
floatingPointFormat: FloatingPointFormat,
nonTerminals: Seq[String]): Copybook = {
nonTerminals: Seq[String],
isDebug: Boolean): Copybook = {

val schemaANTLR: CopybookAST = ANTLRParser.parse(copyBookContents, enc, stringTrimmingPolicy, commentPolicy, ebcdicCodePage, asciiCharset, isUtf16BigEndian, floatingPointFormat)

@@ -137,13 +142,33 @@

new Copybook(
if (dropGroupFillers) {
calculateNonFillerSizes(setSegmentParents(markSegmentRedefines(processGroupFillers(markDependeeFields(
addNonTerminals(calculateBinaryProperties(schemaANTLR), nonTerms, enc, stringTrimmingPolicy, ebcdicCodePage, asciiCharset, isUtf16BigEndian, floatingPointFormat)
)), segmentRedefines), correctedFieldParentMap))
addDebugFields(
calculateNonFillerSizes(
setSegmentParents(
markSegmentRedefines(
processGroupFillers(
markDependeeFields(
addNonTerminals(
calculateBinaryProperties(schemaANTLR), nonTerms, enc, stringTrimmingPolicy, ebcdicCodePage, asciiCharset, isUtf16BigEndian, floatingPointFormat)
)
), segmentRedefines), correctedFieldParentMap
)
), isDebug
)
} else {
calculateNonFillerSizes(setSegmentParents(markSegmentRedefines(renameGroupFillers(markDependeeFields(
addNonTerminals(calculateBinaryProperties(schemaANTLR), nonTerms, enc, stringTrimmingPolicy, ebcdicCodePage, asciiCharset, isUtf16BigEndian, floatingPointFormat)
)), segmentRedefines), correctedFieldParentMap))
addDebugFields(
calculateNonFillerSizes(
setSegmentParents(
markSegmentRedefines(
renameGroupFillers(
markDependeeFields(
addNonTerminals(
calculateBinaryProperties(schemaANTLR), nonTerms, enc, stringTrimmingPolicy, ebcdicCodePage, asciiCharset, isUtf16BigEndian, floatingPointFormat)
)
), segmentRedefines), correctedFieldParentMap
)
), isDebug
)
}
)
}
@@ -723,6 +748,17 @@ object CopybookParser
newSchema
}

/**
* Add debugging fields if debug mode is enabled
*
* @param ast An AST as a set of copybook records
* @param addDebuggingFields If true, debugging fields will be added
* @return The same AST with debugging fields added
*/
private def addDebugFields(ast: CopybookAST, addDebuggingFields: Boolean): CopybookAST = {
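  // Not implemented yet: the flag is accepted but the AST is returned unchanged for now.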
ast
}
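
For illustration only (not part of this commit, which leaves the method as a pass-through): one way such a pass could look, sketched with simplified stand-in types rather than Cobrix's real AST classes. The `_debug` naming and the hex rendering are assumptions based on the README entry above.

```scala
// Stand-in schema nodes; Cobrix's real Primitive/Group types are richer.
object DebugFieldsSketch {
  sealed trait SchemaNode
  final case class SchemaLeaf(name: String) extends SchemaNode
  final case class SchemaGroup(name: String, children: Seq[SchemaNode]) extends SchemaNode

  // Recursively pairs every leaf with a hypothetical "<name>_debug" sibling
  // that would carry the field's raw bytes rendered as a hex string.
  def withDebugFields(node: SchemaNode, enabled: Boolean): SchemaNode =
    if (!enabled) node
    else node match {
      case leaf: SchemaLeaf => leaf
      case SchemaGroup(name, children) =>
        val paired = children.flatMap {
          case leaf: SchemaLeaf => Seq(leaf, SchemaLeaf(leaf.name + "_debug"))
          case grp: SchemaGroup => Seq(withDebugFields(grp, enabled))
        }
        SchemaGroup(name, paired)
    }
}
```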

/**
* For each group calculates the number of non-filler items
*
@@ -104,7 +104,8 @@ final class FixedLenNestedReader(copyBookContents: Seq[String],
asciiCharset,
readerProperties.isUtf16BigEndian,
floatingPointFormat,
nonTerminals)
nonTerminals,
readerProperties.isDebug)
else
Copybook.merge(
copyBookContents.map(
@@ -119,7 +120,8 @@
asciiCharset,
readerProperties.isUtf16BigEndian,
floatingPointFormat,
nonTerminals)
nonTerminals,
readerProperties.isDebug)
)
)
new CobolSchema(schema, schemaRetentionPolicy, "",false)
@@ -53,6 +53,7 @@ import za.co.absa.cobrix.spark.cobol.schema.SchemaRetentionPolicy.SchemaRetentio
* @param commentPolicy A comment truncation policy
* @param dropGroupFillers If true the parser will drop all FILLER fields, even GROUP FILLERS that have non-FILLER nested fields
* @param nonTerminals A list of non-terminals (GROUPS) to combine and parse as primitive fields
* @param isDebug If true, additional debugging fields will be added
* @param recordHeaderParser A parser used to parse data field record headers
* @param rhpAdditionalInfo An optional additional option string passed to a custom record header parser
* @param inputFileNameColumn A column name to add to the dataframe. The column will contain input file name for each record similar to 'input_file_name()' function
@@ -85,6 +86,7 @@ case class ReaderParameters(
commentPolicy: CommentPolicy = CommentPolicy(),
dropGroupFillers: Boolean = false,
nonTerminals: Seq[String] = Nil,
isDebug: Boolean = false,
recordHeaderParser: Option[String] = None,
rhpAdditionalInfo: Option[String] = None,
inputFileNameColumn: String = ""
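
A hedged construction sketch (not in the diff), assuming every `ReaderParameters` field carries a default, as the visible ones do:

```scala
// Reader configuration requesting debug fields; everything else defaults.
val readerParams = ReaderParameters(isDebug = true)
```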
@@ -157,7 +157,8 @@ final class VarLenNestedReader(copybookContents: Seq[String],
asciiCharset,
readerProperties.isUtf16BigEndian,
readerProperties.floatingPointFormat,
readerProperties.nonTerminals)
readerProperties.nonTerminals,
readerProperties.isDebug)
else
Copybook.merge(copyBookContents.map(
CopybookParser.parseTree(encoding,
@@ -171,7 +172,8 @@
asciiCharset,
readerProperties.isUtf16BigEndian,
readerProperties.floatingPointFormat,
nonTerminals = readerProperties.nonTerminals)
nonTerminals = readerProperties.nonTerminals,
readerProperties.isDebug)
))
val segIdFieldCount = readerProperties.multisegment.map(p => p.segmentLevelIds.size).getOrElse(0)
val segmentIdPrefix = readerProperties.multisegment.map(p => p.segmentIdPrefix).getOrElse("")
@@ -168,6 +168,7 @@ class DefaultSource
parameters.commentPolicy,
parameters.dropGroupFillers,
parameters.nonTerminals,
parameters.isDebug,
varLenParams.recordHeaderParser,
varLenParams.rhpAdditionalInfo,
varLenParams.inputFileNameColumn
@@ -44,6 +44,7 @@ import za.co.absa.cobrix.spark.cobol.schema.SchemaRetentionPolicy.SchemaRetentio
* @param commentPolicy A comment truncation policy
* @param dropGroupFillers If true the parser will drop all FILLER fields, even GROUP FILLERS that have non-FILLER nested fields
* @param nonTerminals A list of non-terminals (GROUPS) to combine and parse as primitive fields
* @param isDebug If true, additional debugging fields will be added
* @param debugIgnoreFileSize If true the fixed length file reader won't check file size divisibility. Useful for debugging binary file / copybook mismatches.
*/
case class CobolParameters(
@@ -66,5 +67,6 @@
commentPolicy: CommentPolicy,
dropGroupFillers: Boolean,
nonTerminals: Seq[String],
isDebug: Boolean,
debugIgnoreFileSize: Boolean
)
@@ -54,6 +54,7 @@ object CobolParametersParser {
val PARAM_SCHEMA_RETENTION_POLICY = "schema_retention_policy"
val PARAM_GROUP_FILLERS = "drop_group_fillers"
val PARAM_GROUP_NOT_TERMINALS = "non_terminals"
val PARAM_DEBUG = "debug"

// General parsing parameters
val PARAM_TRUNCATE_COMMENTS = "truncate_comments"
@@ -206,6 +207,7 @@
parseCommentTruncationPolicy(params),
params.getOrElse(PARAM_GROUP_FILLERS, "false").toBoolean,
params.getOrElse(PARAM_GROUP_NOT_TERMINALS, "").split(','),
params.getOrElse(PARAM_DEBUG, "false").toBoolean,
params.getOrElse(PARAM_DEBUG_IGNORE_FILE_SIZE, "false").toBoolean
)
validateSparkCobolOptions(params)
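
An illustrative note (not from the commit) on the `toBoolean` coercion used above: Scala's `String.toBoolean` accepts only "true" or "false" (case-insensitive) and throws otherwise.

```scala
// How the "debug" option string is coerced; same pattern as above.
val params = Map("debug" -> "true")
val isDebug = params.getOrElse("debug", "false").toBoolean // true
// A value such as "yes" would make toBoolean throw IllegalArgumentException.
```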
@@ -48,7 +48,7 @@ class Test21VariableOccurs extends FunSuite with SparkTestBase {
val inputStream = new FSStream(s"$inputDataPath/data.dat")
val copybookContents = Files.readAllLines(Paths.get("../data/test21_copybook.cob"), StandardCharsets.ISO_8859_1).toArray.mkString("\n")
val copybook = CopybookParser.parseTree(ASCII, copybookContents, true, Nil, HashMap[String, String](), StringTrimmingPolicy.TrimBoth,
CommentPolicy(), new CodePageCommon, StandardCharsets.US_ASCII, true, FloatingPointFormat.IBM, Nil)
CommentPolicy(), new CodePageCommon, StandardCharsets.US_ASCII, true, FloatingPointFormat.IBM, Nil, false)
val recordExtractor = new VarOccursRecordExtractor(inputStream, copybook)

val expectedRecords = ListBuffer(Array(48.toByte),
