From 8ce15a52c098fb3fc31159283336b0670c4e52c4 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Thu, 4 May 2023 08:49:07 +0200 Subject: [PATCH] #613 Add the ability for record extractors to have access to the full file stream so they can use it for extracting headers and footers. --- .../cobol/reader/VarLenNestedReader.scala | 34 +++++---- .../cobrix/cobol/reader/VarLenReader.scala | 16 ++-- .../raw/FixedBlockRawRecordExtractor.scala | 12 +-- .../extractors/raw/RawRecordContext.scala | 9 ++- .../raw/TextFullRecordExtractor.scala | 10 ++- .../extractors/raw/TextRecordExtractor.scala | 8 +- .../raw/VarOccursRecordExtractor.scala | 10 ++- ...VariableBlockVariableRecordExtractor.scala | 12 +-- .../cobol/mock/RecordExtractorMock.scala | 8 +- .../mock/RecordExtractorReadAhaedMock.scala | 10 +-- .../cobol/reader/SparseIndexSpecSpec.scala | 73 +++++++++++-------- .../cobol/reader/VarLenNestedReaderSpec.scala | 42 ++++++----- .../FixedBlockRawRecordExtractorSuite.scala | 3 +- .../raw/RawRecordContextFactory.scala | 3 +- ...bleBlockVariableRecordExtractorSuite.scala | 3 +- .../reader/iterator/VRLRecordReaderSpec.scala | 3 +- .../cobol/reader/VarLenNestedReader.scala | 12 +-- .../spark/cobol/reader/VarLenReader.scala | 12 ++- .../cobol/source/scanners/CobolScanners.scala | 7 +- .../mocks/CustomRecordExtractorMock.scala | 10 ++- ...tomRecordExtractorWithFileHeaderMock.scala | 51 +++++++++++++ .../integration/Test21VariableOccurs.scala | 3 +- .../integration/Test25OccursMappings.scala | 3 +- .../Test26CustomRecordExtractor.scala | 25 ++++++- 24 files changed, 254 insertions(+), 125 deletions(-) create mode 100644 spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/mocks/CustomRecordExtractorWithFileHeaderMock.scala diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala index 351ec8e8..21ca3df0 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala @@ -60,7 +60,8 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String], checkInputArgumentsValidity() def recordExtractor(startingRecordNumber: Long, - binaryData: SimpleStream, + dataStream: SimpleStream, + headerStream: SimpleStream, copybook: Copybook ): Option[RawRecordExtractor] = { val rdwParams = RecordHeaderParameters(readerProperties.isRdwBigEndian, readerProperties.rdwAdjustment) @@ -71,7 +72,7 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String], val bdwParamsOpt = bdwOpt.map(bdw => RecordHeaderParameters(bdw.isBigEndian, bdw.adjustment)) val bdwDecoderOpt = bdwParamsOpt.map(bdwParams => new RecordHeaderDecoderBdw(bdwParams)) - val reParams = RawRecordContext(startingRecordNumber, binaryData, copybook, rdwDecoder, bdwDecoderOpt.getOrElse(rdwDecoder), readerProperties.reAdditionalInfo) + val reParams = RawRecordContext(startingRecordNumber, dataStream, headerStream, copybook, rdwDecoder, bdwDecoderOpt.getOrElse(rdwDecoder), readerProperties.reAdditionalInfo) readerProperties.recordExtractor match { case Some(recordExtractorClass) => @@ -104,26 +105,27 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String], override def isRdwBigEndian: Boolean = readerProperties.isRdwBigEndian - override def getRecordIterator(binaryData: SimpleStream, + override def getRecordIterator(dataStream: SimpleStream, + headerStream: SimpleStream,
startingFileOffset: Long, fileNumber: Int, startingRecordIndex: Long): Iterator[Seq[Any]] = if (cobolSchema.copybook.isHierarchical) { new VarLenHierarchicalIterator(cobolSchema.copybook, - binaryData, + dataStream, readerProperties, recordHeaderParser, - recordExtractor(startingRecordIndex, binaryData, cobolSchema.copybook), + recordExtractor(startingRecordIndex, dataStream, headerStream, cobolSchema.copybook), fileNumber, startingRecordIndex, startingFileOffset, handler) } else { new VarLenNestedIterator(cobolSchema.copybook, - binaryData, + dataStream, readerProperties, recordHeaderParser, - recordExtractor(startingRecordIndex, binaryData, cobolSchema.copybook), + recordExtractor(startingRecordIndex, dataStream, headerStream, cobolSchema.copybook), fileNumber, startingRecordIndex, startingFileOffset, @@ -135,12 +137,14 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String], * Traverses the data sequentially as fast as possible to generate record index. * This index will be used to distribute workload of the conversion. * - * @param binaryData A stream of input binary data - * @param fileNumber A file number uniquely identified a particular file of the data set + * @param dataStream A stream of input binary data + * @param headerStream A stream pointing to the beginning of the file, even if inputStream is pointing + * to a record in the middle. + * @param fileNumber A file number uniquely identified a particular file of the data set * @return An index of the file - * */ - override def generateIndex(binaryData: SimpleStream, + override def generateIndex(dataStream: SimpleStream, + headerStream: SimpleStream, fileNumber: Int, isRdwBigEndian: Boolean): ArrayBuffer[SparseIndexEntry] = { val inputSplitSizeRecords: Option[Int] = readerProperties.inputSplitRecords @@ -173,10 +177,10 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String], segmentIdField match { case Some(field) => IndexGenerator.sparseIndexGenerator(fileNumber, - binaryData, + dataStream, readerProperties.fileStartOffset, recordHeaderParser, - recordExtractor(0L, binaryData, copybook), + recordExtractor(0L, dataStream, headerStream, copybook), inputSplitSizeRecords, inputSplitSizeMB, Some(copybook), @@ -184,10 +188,10 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String], isHierarchical, segmentIdValue) case None => IndexGenerator.sparseIndexGenerator(fileNumber, - binaryData, + dataStream, readerProperties.fileStartOffset, recordHeaderParser, - recordExtractor(0L, binaryData, copybook), + recordExtractor(0L, dataStream, headerStream, copybook), inputSplitSizeRecords, inputSplitSizeMB, None, diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenReader.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenReader.scala index 16d65d49..eaeeb74e 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenReader.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenReader.scala @@ -33,14 +33,17 @@ abstract class VarLenReader extends Reader with Serializable { /** * Returns a file iterator between particular offsets. This is for faster traversal of big binary files * - * @param binaryData A stream positioned at the beginning of the intended file portion to read + * @param dataStream A stream positioned at the beginning of the intended file portion to read + * @param headerStream A stream pointing to the beginning of the file, even if inputStream is pointing + * to a record in the middle. 
* @param startingFileOffset An offset of the file where parsing should be started * @param fileNumber A file number uniquely identified a particular file of the data set * @param startingRecordIndex A starting record index of the data * @return An iterator of Spark Row objects * */ - def getRecordIterator(binaryData: SimpleStream, + def getRecordIterator(dataStream: SimpleStream, + headerStream: SimpleStream, startingFileOffset: Long, fileNumber: Int, startingRecordIndex: Long): Iterator[Seq[Any]] @@ -49,12 +52,15 @@ abstract class VarLenReader extends Reader with Serializable { * Traverses the data sequentially as fast as possible to generate record index. * This index will be used to distribute workload of the conversion. * - * @param binaryData A stream of input binary data - * @param fileNumber A file number uniquely identified a particular file of the data set + * @param dataStream A stream of input binary data + * @param headerStream A stream pointing to the beginning of the file, even if inputStream is pointing + * to a record in the middle. + * @param fileNumber A file number uniquely identified a particular file of the data set * @return An index of the file * */ - def generateIndex(binaryData: SimpleStream, + def generateIndex(dataStream: SimpleStream, + headerStream: SimpleStream, fileNumber: Int, isRdwBigEndian: Boolean): ArrayBuffer[SparseIndexEntry] } diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractor.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractor.scala index f8b35554..604ca0b3 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractor.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractor.scala @@ -19,12 +19,14 @@ package za.co.absa.cobrix.cobol.reader.extractors.raw import scala.collection.mutable class FixedBlockRawRecordExtractor(ctx: RawRecordContext, fbParams: FixedBlockParameters) extends Serializable with RawRecordExtractor { + ctx.headerStream.close() + private val recordQueue = new mutable.Queue[Array[Byte]] private val recordSize = fbParams.recordLength.getOrElse(ctx.copybook.getRecordSize) private val bdwSize = fbParams.blockLength.orElse(fbParams.recordsPerBlock.map(_ * recordSize)) - override def offset: Long = ctx.inputStream.offset + override def offset: Long = ctx.dataStream.offset override def hasNext: Boolean = { if (recordQueue.isEmpty) { @@ -34,17 +36,17 @@ class FixedBlockRawRecordExtractor(ctx: RawRecordContext, fbParams: FixedBlockPa } private def readNextBlock(): Unit = { - if (!ctx.inputStream.isEndOfStream) { - var bdwOffset = ctx.inputStream.offset + if (!ctx.dataStream.isEndOfStream) { + var bdwOffset = ctx.dataStream.offset val nextBlockSize = bdwSize.getOrElse({ - val bdw = ctx.inputStream.next(ctx.bdwDecoder.headerSize) + val bdw = ctx.dataStream.next(ctx.bdwDecoder.headerSize) val blockLength = ctx.bdwDecoder.getRecordLength(bdw, bdwOffset) bdwOffset += ctx.bdwDecoder.headerSize blockLength }) - val blockBuffer = ctx.inputStream.next(nextBlockSize) + val blockBuffer = ctx.dataStream.next(nextBlockSize) var blockIndex = 0 diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala index d1e43747..ca014ec0 100644 --- 
a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContext.scala @@ -22,13 +22,18 @@ import za.co.absa.cobrix.cobol.reader.stream.SimpleStream /** * @param startingRecordNumber A record number the input stream is pointing to (zero-based). - * @param inputStream An input stream pointing to the beginning of a file or a record in a file. + * @param dataStream An input stream pointing to the beginning of a file or a record in a file. The + * record extractor should close the stream when the end of file is reached. + * @param headerStream A stream pointing to the beginning of the file, even if inputStream is pointing + * to a record in the middle. The record extractor should close the stream when it + * is no longer needed. * @param copybook A copybook of the input stream. * @param additionalInfo A string provided by a client for the raw record extractor. */ case class RawRecordContext( startingRecordNumber: Long, - inputStream: SimpleStream, + dataStream: SimpleStream, + headerStream: SimpleStream, copybook: Copybook, rdwDecoder: RecordHeaderDecoder, bdwDecoder: RecordHeaderDecoder, diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/TextFullRecordExtractor.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/TextFullRecordExtractor.scala index 67d38226..286925b6 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/TextFullRecordExtractor.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/TextFullRecordExtractor.scala @@ -30,6 +30,8 @@ import java.util * Hopefully, comments will help anyone reading this. */ class TextFullRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRecordExtractor { + ctx.headerStream.close() + private val recordSize = ctx.copybook.getRecordSize // Maximum possible record size is the size of the copybook record + maximum size of the delimiter (2 characters for CRLF). @@ -64,13 +66,13 @@ class TextFullRecordExtractor(ctx: RawRecordContext) extends Serializable with R fetchNextRecord() } - override def offset: Long = ctx.inputStream.offset - pendingBytesSize + override def offset: Long = ctx.dataStream.offset - pendingBytesSize // This method ensures that pendingBytes contains the specified number of bytes read from the input stream private def ensureBytesRead(numOfBytes: Int): Unit = { val bytesToRead = numOfBytes - pendingBytesSize if (bytesToRead > 0) { - val newBytes = ctx.inputStream.next(bytesToRead) + val newBytes = ctx.dataStream.next(bytesToRead) if (newBytes.length > 0) { System.arraycopy(newBytes, 0, pendingBytes, pendingBytesSize, newBytes.length) pendingBytesSize = pendingBytesSize + newBytes.length @@ -133,7 +135,7 @@ class TextFullRecordExtractor(ctx: RawRecordContext) extends Serializable with R System.arraycopy(pendingBytes, recordLength + i, pendingBytes, recordLength, size - i) pendingBytesSize -= i } - endOfStream = ctx.inputStream.isEndOfStream + endOfStream = ctx.dataStream.isEndOfStream if (!found && !endOfStream) { ensureBytesRead(maxRecordSize) } @@ -153,7 +155,7 @@ class TextFullRecordExtractor(ctx: RawRecordContext) extends Serializable with R } else { // Last record or a record is too large? 
// In the latter case - if (pendingBytesSize <= recordSize && ctx.inputStream.isEndOfStream) { + if (pendingBytesSize <= recordSize && ctx.dataStream.isEndOfStream) { // Last record curRecordSize = pendingBytesSize curPayloadSize = pendingBytesSize diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/TextRecordExtractor.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/TextRecordExtractor.scala index 3bd3a7bd..989912de 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/TextRecordExtractor.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/TextRecordExtractor.scala @@ -30,6 +30,8 @@ import java.util * Hopefully, comments will help anyone reading this. */ class TextRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRecordExtractor { + ctx.headerStream.close() + private val recordSize = ctx.copybook.getRecordSize // Maximum possible record size is the size of the copybook record + maximum size of the delimiter (2 characters for CRLF). @@ -64,13 +66,13 @@ class TextRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRe fetchNextRecord() } - override def offset: Long = ctx.inputStream.offset - pendingBytesSize + override def offset: Long = ctx.dataStream.offset - pendingBytesSize // This method ensures that pendingBytes contains the specified number of bytes read from the input stream private def ensureBytesRead(numOfBytes: Int): Unit = { val bytesToRead = numOfBytes - pendingBytesSize if (bytesToRead > 0) { - val newBytes = ctx.inputStream.next(bytesToRead) + val newBytes = ctx.dataStream.next(bytesToRead) if (newBytes.length > 0) { System.arraycopy(newBytes, 0, pendingBytes, pendingBytesSize, newBytes.length) pendingBytesSize = pendingBytesSize + newBytes.length @@ -126,7 +128,7 @@ class TextRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRe } else { // Last record or a record is too large? // In the latter case - if (pendingBytesSize <= recordSize && ctx.inputStream.isEndOfStream) { + if (pendingBytesSize <= recordSize && ctx.dataStream.isEndOfStream) { // Last record curRecordSize = pendingBytesSize curPayloadSize = pendingBytesSize diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VarOccursRecordExtractor.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VarOccursRecordExtractor.scala index dd2db1d1..a4a0f74a 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VarOccursRecordExtractor.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VarOccursRecordExtractor.scala @@ -28,13 +28,15 @@ import za.co.absa.cobrix.cobol.parser.ast.{Group, Primitive, Statement} * determined neither from the copybook nor from record headers. 
*/ class VarOccursRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRecordExtractor { + ctx.headerStream.close() + private val maxRecordSize = ctx.copybook.getRecordSize private val ast = ctx.copybook.ast private val hasVarSizeOccurs = copybookHasVarSizedOccurs private val bytes = new Array[Byte](maxRecordSize) private var bytesSize = 0 - override def hasNext: Boolean = ctx.inputStream.offset < ctx.inputStream.size + override def hasNext: Boolean = ctx.dataStream.offset < ctx.dataStream.size override def next(): Array[Byte] = { if (hasVarSizeOccurs) { @@ -42,11 +44,11 @@ class VarOccursRecordExtractor(ctx: RawRecordContext) extends Serializable with util.Arrays.fill(bytes, 0.toByte) extractVarOccursRecordBytes() } else { - ctx.inputStream.next(maxRecordSize) + ctx.dataStream.next(maxRecordSize) } } - def offset: Long = ctx.inputStream.offset + def offset: Long = ctx.dataStream.offset private def extractVarOccursRecordBytes(): Array[Byte] = { val dependFields = scala.collection.mutable.HashMap.empty[String, Either[Int, String]] @@ -138,7 +140,7 @@ class VarOccursRecordExtractor(ctx: RawRecordContext) extends Serializable with private def ensureBytesRead(numOfBytes: Int): Unit = { val bytesToRead = numOfBytes - bytesSize if (bytesToRead > 0) { - val newBytes = ctx.inputStream.next(bytesToRead) + val newBytes = ctx.dataStream.next(bytesToRead) if (newBytes.length > 0) { System.arraycopy(newBytes, 0, bytes, bytesSize, newBytes.length) bytesSize = numOfBytes diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VariableBlockVariableRecordExtractor.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VariableBlockVariableRecordExtractor.scala index f0189a27..ac26ed40 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VariableBlockVariableRecordExtractor.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VariableBlockVariableRecordExtractor.scala @@ -19,9 +19,11 @@ package za.co.absa.cobrix.cobol.reader.extractors.raw import scala.collection.mutable class VariableBlockVariableRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRecordExtractor { + ctx.headerStream.close() + private val recordQueue = new mutable.Queue[Array[Byte]] private var canSplitAtCurrentOffset = true - private var recordOffset: Long = ctx.inputStream.offset + private var recordOffset: Long = ctx.dataStream.offset override def offset: Long = recordOffset @@ -38,12 +40,12 @@ class VariableBlockVariableRecordExtractor(ctx: RawRecordContext) extends Serial val bdwSize = ctx.bdwDecoder.headerSize val rdwSize = ctx.rdwDecoder.headerSize - if (!ctx.inputStream.isEndOfStream) { - val bdwOffset = ctx.inputStream.offset - val bdw = ctx.inputStream.next(bdwSize) + if (!ctx.dataStream.isEndOfStream) { + val bdwOffset = ctx.dataStream.offset + val bdw = ctx.dataStream.next(bdwSize) val blockLength = ctx.bdwDecoder.getRecordLength(bdw, bdwOffset) - val blockBuffer = ctx.inputStream.next(blockLength) + val blockBuffer = ctx.dataStream.next(blockLength) var blockIndex = 0 diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/mock/RecordExtractorMock.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/mock/RecordExtractorMock.scala index 60e504cf..25f916e9 100644 --- a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/mock/RecordExtractorMock.scala +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/mock/RecordExtractorMock.scala @@ -19,14 +19,14 @@ 
package za.co.absa.cobrix.cobol.mock import za.co.absa.cobrix.cobol.reader.extractors.raw.{RawRecordContext, RawRecordExtractor} class RecordExtractorMock(ctx: RawRecordContext) extends Serializable with RawRecordExtractor { - override def offset: Long = ctx.inputStream.offset + override def offset: Long = ctx.dataStream.offset - override def hasNext: Boolean = !ctx.inputStream.isEndOfStream + override def hasNext: Boolean = !ctx.dataStream.isEndOfStream override def next(): Array[Byte] = { - val header = ctx.inputStream.next(2) + val header = ctx.dataStream.next(2) if (header.length == 2) { - ctx.inputStream.next(header.head + header(1) * 256) + ctx.dataStream.next(header.head + header(1) * 256) } else { Array.empty[Byte] } diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/mock/RecordExtractorReadAhaedMock.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/mock/RecordExtractorReadAhaedMock.scala index cc21c63c..7fbec32b 100644 --- a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/mock/RecordExtractorReadAhaedMock.scala +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/mock/RecordExtractorReadAhaedMock.scala @@ -19,16 +19,16 @@ package za.co.absa.cobrix.cobol.mock import za.co.absa.cobrix.cobol.reader.extractors.raw.{RawRecordContext, RawRecordExtractor} class RecordExtractorReadAhaedMock(ctx: RawRecordContext) extends Serializable with RawRecordExtractor { - ctx.inputStream.next(2) + ctx.dataStream.next(2) - override def offset: Long = ctx.inputStream.offset + override def offset: Long = ctx.dataStream.offset - override def hasNext: Boolean = !ctx.inputStream.isEndOfStream + override def hasNext: Boolean = !ctx.dataStream.isEndOfStream override def next(): Array[Byte] = { - val header = ctx.inputStream.next(2) + val header = ctx.dataStream.next(2) if (header.length == 2) { - ctx.inputStream.next(header.head + header(1) * 256) + ctx.dataStream.next(header.head + header(1) * 256) } else { Array.empty[Byte] } diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/SparseIndexSpecSpec.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/SparseIndexSpecSpec.scala index e79474d8..15f98f93 100644 --- a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/SparseIndexSpecSpec.scala +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/SparseIndexSpecSpec.scala @@ -58,13 +58,14 @@ class SparseIndexSpecSpec extends AnyWordSpec { val segmentIdRootValue = "1" "Generate a sparse index for ASCII text data with partial records allowed" in { - val stream = new TestStringStream(textFileContent) + val dataStream = new TestStringStream(textFileContent) + val headerStream = new TestStringStream(textFileContent) val recordHeaderParser = RecordHeaderParserFactory.createRecordHeaderParser(Constants.RhRdwLittleEndian, 0, 0, 0, 0) - val recordExtractor = new TextRecordExtractor(RawRecordContext(0L, stream, copybook, null, null, "")) + val recordExtractor = new TextRecordExtractor(RawRecordContext(0L, dataStream, headerStream, copybook, null, null, "")) - val indexes = IndexGenerator.sparseIndexGenerator(0, stream, 0L, + val indexes = IndexGenerator.sparseIndexGenerator(0, dataStream, 0L, recordHeaderParser = recordHeaderParser, recordExtractor = Some(recordExtractor), recordsPerIndexEntry = Some(2), sizePerIndexEntryMB = None, copybook = Some(copybook), segmentField = Some(segmentIdField), isHierarchical = true, rootSegmentId = segmentIdRootValue) assert(indexes.length == 4) @@ -79,13 +80,14 @@ class SparseIndexSpecSpec extends 
AnyWordSpec { } "Generate a sparse index for ASCII text data with partial records not allowed" in { - val stream = new TestStringStream(textFileContent) + val dataStream = new TestStringStream(textFileContent) + val headerStream = new TestStringStream(textFileContent) val recordHeaderParser = RecordHeaderParserFactory.createRecordHeaderParser(Constants.RhRdwLittleEndian, 0, 0, 0, 0) - val recordExtractor = new TextFullRecordExtractor(RawRecordContext(0L, stream, copybook, null, null, "")) + val recordExtractor = new TextFullRecordExtractor(RawRecordContext(0L, dataStream, headerStream, copybook, null, null, "")) - val indexes = IndexGenerator.sparseIndexGenerator(0, stream, 0L, + val indexes = IndexGenerator.sparseIndexGenerator(0, dataStream, 0L, recordHeaderParser = recordHeaderParser, recordExtractor = Some(recordExtractor), recordsPerIndexEntry = Some(2), sizePerIndexEntryMB = None, copybook = Some(copybook), segmentField = Some(segmentIdField), isHierarchical = true, rootSegmentId = segmentIdRootValue) assert(indexes.length == 4) @@ -100,11 +102,12 @@ class SparseIndexSpecSpec extends AnyWordSpec { } "Generate a sparse index for a empty file and record extractor" in { - val stream = new TestByteStream(Array.empty[Byte]) + val dataStream = new TestStringStream(textFileContent) + val headerStream = new TestStringStream(textFileContent) - val recordExtractor = new RecordExtractorMock(RawRecordContext(0L, stream, copybook, null, null, "")) + val recordExtractor = new RecordExtractorMock(RawRecordContext(0L, dataStream, headerStream, copybook, null, null, "")) - val indexes = IndexGenerator.sparseIndexGenerator(0, stream, 0L, + val indexes = IndexGenerator.sparseIndexGenerator(0, dataStream, 0L, recordHeaderParser = null, recordExtractor = Some(recordExtractor), recordsPerIndexEntry = Some(1), sizePerIndexEntryMB = None, copybook = Some(copybook), segmentField = None, isHierarchical = false) assert(indexes.length == 1) @@ -137,15 +140,18 @@ class SparseIndexSpecSpec extends AnyWordSpec { } "Generate a sparse index for a data with Custom record parser" in { - val stream = new TestByteStream(Array(0x02, 0x00, 0xF1, 0xF2, // record 0 - 0x01, 0x00, 0xF3, // record 1 - 0x02, 0x00, 0xF4, 0xF5 // record 2 - ).map(_.toByte) ) + val data = Array(0x02, 0x00, 0xF1, 0xF2, // record 0 + 0x01, 0x00, 0xF3, // record 1 + 0x02, 0x00, 0xF4, 0xF5 // record 2 + ).map(_.toByte) + + val dataStream = new TestByteStream(data) + val headerStream = new TestByteStream(data) - val recordExtractor = new RecordExtractorMock(RawRecordContext(0L, stream, copybook, null, null, "")) + val recordExtractor = new RecordExtractorMock(RawRecordContext(0L, dataStream, headerStream, copybook, null, null, "")) recordExtractor.onReceiveAdditionalInfo("dummy") - val indexes = IndexGenerator.sparseIndexGenerator(0, stream, 0L, + val indexes = IndexGenerator.sparseIndexGenerator(0, dataStream, 0L, recordHeaderParser = null, recordExtractor = Some(recordExtractor), recordsPerIndexEntry = Some(1), sizePerIndexEntryMB = None, copybook = Some(copybook), segmentField = None, isHierarchical = false) assert(indexes.length == 2) @@ -158,21 +164,24 @@ class SparseIndexSpecSpec extends AnyWordSpec { } "Generate a sparse index for a data with Custom record parser with a file start offset" in { - val stream = new TestByteStream(Array(0xF0, 0xF0, // header to skip - 0x02, 0x00, 0xF1, 0xF2, // record 0 - 0x01, 0x00, 0xF3, // record 1 - 0x02, 0x00, 0xF4, 0xF5, // record 2 - 0x02, 0x00, 0xF6, 0xF7, // record 3 - 0x01 // Invalid header - 
).map(_.toByte)) + val data = Array(0xF0, 0xF0, // header to skip + 0x02, 0x00, 0xF1, 0xF2, // record 0 + 0x01, 0x00, 0xF3, // record 1 + 0x02, 0x00, 0xF4, 0xF5, // record 2 + 0x02, 0x00, 0xF6, 0xF7, // record 3 + 0x01 // Invalid header + ).map(_.toByte) + + val dataStream = new TestByteStream(data) + val headerStream = new TestByteStream(data) // Skip the first 2 bytes to the file offset - stream.next(2) + dataStream.next(2) - val recordExtractor = new RecordExtractorMock(RawRecordContext(0L, stream, copybook, null, null, "")) + val recordExtractor = new RecordExtractorMock(RawRecordContext(0L, dataStream, headerStream, copybook, null, null, "")) - val indexes = IndexGenerator.sparseIndexGenerator(0, stream, 2L, + val indexes = IndexGenerator.sparseIndexGenerator(0, dataStream, 2L, recordHeaderParser = null, recordExtractor = Some(recordExtractor), recordsPerIndexEntry = Some(1), sizePerIndexEntryMB = None, copybook = Some(copybook), segmentField = None, isHierarchical = false) assert(indexes.length == 4) @@ -191,15 +200,18 @@ class SparseIndexSpecSpec extends AnyWordSpec { } "Throws an exception if the record extractor reads data in constructor" in { - val stream = new TestByteStream(Array(0xF0, 0xF0, // header to skip - 0x02, 0x00, 0xF1, 0xF2 // record 0 - ).map(_.toByte)) + val data = Array(0xF0, 0xF0, // header to skip + 0x02, 0x00, 0xF1, 0xF2 // record 0 + ).map(_.toByte) - val recordExtractor = new RecordExtractorReadAhaedMock(RawRecordContext(0L, stream, copybook, null, null, "")) + val dataStream = new TestByteStream(data) + val headerStream = new TestByteStream(data) + + val recordExtractor = new RecordExtractorReadAhaedMock(RawRecordContext(0L, dataStream, headerStream, copybook, null, null, "")) val ex = intercept[IllegalStateException] { - IndexGenerator.sparseIndexGenerator(0, stream, 0L, + IndexGenerator.sparseIndexGenerator(0, dataStream, 0L, recordHeaderParser = null, recordExtractor = Some(recordExtractor), recordsPerIndexEntry = Some(1), sizePerIndexEntryMB = None, copybook = Some(copybook), segmentField = None, isHierarchical = false) } @@ -208,5 +220,4 @@ class SparseIndexSpecSpec extends AnyWordSpec { } - } diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReaderSpec.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReaderSpec.scala index 8bd71fdb..c1727971 100644 --- a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReaderSpec.scala +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReaderSpec.scala @@ -51,9 +51,9 @@ class VarLenNestedReaderSpec extends AnyWordSpec { "generateIndex" should { "work for fixed length flat files" in { - val (reader, stream) = getFlatUseCase(fixedLengthDataExample) + val (reader, dataStream, headerStream) = getFlatUseCase(fixedLengthDataExample) - val index = reader.generateIndex(stream, 0, isRdwBigEndian = false) + val index = reader.generateIndex(dataStream, headerStream, 0, isRdwBigEndian = false) assert(index.length == 3) assert(index(0).offsetFrom == 0) @@ -69,9 +69,9 @@ class VarLenNestedReaderSpec extends AnyWordSpec { } "work for fixed multi-copybook length flat files" in { - val (reader, stream) = getFlatUseCase(fixedLengthDataExample, multiCopyBook = true) + val (reader, dataStream, headerStream) = getFlatUseCase(fixedLengthDataExample, multiCopyBook = true) - val index = reader.generateIndex(stream, 0, isRdwBigEndian = false) + val index = reader.generateIndex(dataStream, headerStream, 0, isRdwBigEndian = false) 
assert(index.length == 7) assert(index(0).offsetFrom == 0) @@ -83,9 +83,9 @@ class VarLenNestedReaderSpec extends AnyWordSpec { } "work for files with little-endian RDWs" in { - val (reader, stream) = getFlatUseCase(rdwLittleEndianExample, hasRDW = true) + val (reader, dataStream, headerStream) = getFlatUseCase(rdwLittleEndianExample, hasRDW = true) - val index = reader.generateIndex(stream, 0, isRdwBigEndian = false) + val index = reader.generateIndex(dataStream, headerStream, 0, isRdwBigEndian = false) assert(index.length == 2) assert(index(0).offsetFrom == 0) @@ -97,9 +97,9 @@ class VarLenNestedReaderSpec extends AnyWordSpec { } "work for files with big-endian RDWs" in { - val (reader, stream) = getFlatUseCase(rdwBigEndianExample, hasRDW = true, isRdwBigEndian = true) + val (reader, dataStream, headerStream) = getFlatUseCase(rdwBigEndianExample, hasRDW = true, isRdwBigEndian = true) - val index = reader.generateIndex(stream, 0, isRdwBigEndian = true) + val index = reader.generateIndex(dataStream, headerStream, 0, isRdwBigEndian = true) assert(index.length == 2) assert(index(0).offsetFrom == 0) @@ -121,9 +121,9 @@ class VarLenNestedReaderSpec extends AnyWordSpec { 0x04, 0x00, // RDW header 0xF7, 0xF8, 0xF9, 0xF0 // record 2 ).map(_.toByte) - val (reader, stream) = getFlatUseCase(data, recordHeaderParser = recordHeaderParserClass) + val (reader, dataStream, headerStream) = getFlatUseCase(data, recordHeaderParser = recordHeaderParserClass) - val index = reader.generateIndex(stream, 0, isRdwBigEndian = true) + val index = reader.generateIndex(dataStream, headerStream, 0, isRdwBigEndian = true) assert(index.length == 2) assert(index(0).offsetFrom == 0) @@ -135,9 +135,9 @@ class VarLenNestedReaderSpec extends AnyWordSpec { } "work for hierarchical files" in { - val (reader, stream) = getHierarchicalUseCase + val (reader, dataStream, headerStream) = getHierarchicalUseCase - val index = reader.generateIndex(stream, 0, isRdwBigEndian = false) + val index = reader.generateIndex(dataStream, headerStream, 0, isRdwBigEndian = false) assert(index.length == 2) assert(index(0).offsetFrom == 0) @@ -151,17 +151,17 @@ class VarLenNestedReaderSpec extends AnyWordSpec { "getRecordIterator" should { "work for nested records" in { - val (reader, stream) = getFlatUseCase(fixedLengthDataExample) + val (reader, dataStream, headerStream) = getFlatUseCase(fixedLengthDataExample) - val it = reader.getRecordIterator(stream, 0, 0, 0) + val it = reader.getRecordIterator(dataStream, headerStream, 0, 0, 0) assert(it.isInstanceOf[VarLenNestedIterator[scala.Array[Any]]]) } "work for hierarchical records" in { - val (reader, stream) = getHierarchicalUseCase + val (reader, dataStream, headerStream) = getHierarchicalUseCase - val it = reader.getRecordIterator(stream, 0, 0, 0) + val it = reader.getRecordIterator(dataStream, headerStream, 0, 0, 0) assert(it.isInstanceOf[VarLenHierarchicalIterator[scala.Array[Any]]]) } @@ -173,7 +173,7 @@ class VarLenNestedReaderSpec extends AnyWordSpec { hasRDW: Boolean = false, isRdwBigEndian: Boolean = false, multiCopyBook: Boolean = false - ): (VarLenNestedReader[scala.Array[Any]], SimpleStream) = { + ): (VarLenNestedReader[scala.Array[Any]], SimpleStream, SimpleStream) = { val copybookContents: String = """ 01 RECORD. | 05 A PIC X(2). 
@@ -206,14 +206,15 @@ class VarLenNestedReaderSpec extends AnyWordSpec { } val dataStream = new TestByteStream(data) + val headerStream = new TestByteStream(data) val reader = new VarLenNestedReader[scala.Array[Any]]( copybooks, readerProperties, new SimpleRecordHandler) - (reader, dataStream) + (reader, dataStream, headerStream) } - def getHierarchicalUseCase: (VarLenNestedReader[scala.Array[Any]], SimpleStream) = { + def getHierarchicalUseCase: (VarLenNestedReader[scala.Array[Any]], SimpleStream, SimpleStream) = { val copybookContents: String = """ 01 RECORD. | 05 SEGMENT PIC X(1). @@ -243,11 +244,12 @@ class VarLenNestedReaderSpec extends AnyWordSpec { val data = "P123\nP456\nC78\nC90\nP876\n" val dataStream = new TestStringStream(data) + val headerStream = new TestStringStream(data) val reader = new VarLenNestedReader[scala.Array[Any]]( Seq(copybookContents), readerProperties, new SimpleRecordHandler) - (reader, dataStream) + (reader, dataStream, headerStream) } } diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractorSuite.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractorSuite.scala index 43aa494f..1bdcf016 100644 --- a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractorSuite.scala +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractorSuite.scala @@ -167,11 +167,12 @@ class FixedBlockRawRecordExtractorSuite extends AnyWordSpec { private def getRawRecordContext(bytes: Array[Byte]): RawRecordContext = { val ibs = new TestByteStream(bytes) + val hbs = new TestByteStream(bytes) val bdwDecoder = new RecordHeaderDecoderBdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(isBigEndian = true, 0)) val rdwDecoder = new RecordHeaderDecoderRdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(isBigEndian = true, 0)) - RawRecordContext(0, ibs, copybook, rdwDecoder, bdwDecoder, "") + RawRecordContext(0, ibs, hbs, copybook, rdwDecoder, bdwDecoder, "") } } diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContextFactory.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContextFactory.scala index 00361477..3290c012 100644 --- a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContextFactory.scala +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/RawRecordContextFactory.scala @@ -31,12 +31,13 @@ object RawRecordContextFactory { def getDummyRawRecordContext( startingRecordNumber: Long = 0L, inputStream: SimpleStream = new TestStringStream("A1\nB2\n"), + headerStream: SimpleStream = new TestStringStream("A1\nB2\n"), copybook: Copybook = copybook, rdwDecoder: RecordHeaderDecoder = new RecordHeaderDecoderBdw(RecordHeaderParameters(isBigEndian = false, 0)), bdwDecoder: RecordHeaderDecoder = new RecordHeaderDecoderRdw(RecordHeaderParameters(isBigEndian = false, 0)), additionalInfo: String = "" ): RawRecordContext = { RawRecordContext(startingRecordNumber, inputStream, headerStream, copybook, rdwDecoder, bdwDecoder, additionalInfo) } } diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VariableBlockVariableRecordExtractorSuite.scala
b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VariableBlockVariableRecordExtractorSuite.scala index 1e3677f2..3a047678 100644 --- a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VariableBlockVariableRecordExtractorSuite.scala +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/VariableBlockVariableRecordExtractorSuite.scala @@ -146,11 +146,12 @@ class VariableBlockVariableRecordExtractorSuite extends AnyWordSpec { rdwAdjustment: Int ): RawRecordContext = { val ibs = new TestByteStream(bytes) + val hbs = new TestByteStream(bytes) val bdwDecoder = new RecordHeaderDecoderBdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(bdwBigEndian, bdwAdjustment)) val rdwDecoder = new RecordHeaderDecoderRdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(rdwBigEndian, rdwAdjustment)) - RawRecordContext(0, ibs, copybook, rdwDecoder, bdwDecoder, "") + RawRecordContext(0, ibs, hbs, copybook, rdwDecoder, bdwDecoder, "") } } diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/iterator/VRLRecordReaderSpec.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/iterator/VRLRecordReaderSpec.scala index 56b48954..a25af1c5 100644 --- a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/iterator/VRLRecordReaderSpec.scala +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/iterator/VRLRecordReaderSpec.scala @@ -88,7 +88,8 @@ class VRLRecordReaderSpec extends AnyWordSpec { } "work for custom record extractor" in { - val context = RawRecordContext(0, new ByteStreamMock(customHeaderRecords), null, null, null, "") + val stream = new ByteStreamMock(customHeaderRecords) + val context = RawRecordContext(0, stream, stream, null, null, null, "") val reader = getUseCase( records = customHeaderRecords, diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/VarLenNestedReader.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/VarLenNestedReader.scala index d152b532..463d140e 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/VarLenNestedReader.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/VarLenNestedReader.scala @@ -49,18 +49,18 @@ final class VarLenNestedReader(copybookContents: Seq[String], override def getSparkSchema: StructType = getCobolSchema.getSparkSchema - - override def getRowIterator(binaryData: SimpleStream, + override def getRowIterator(dataStream: SimpleStream, + headerStream: SimpleStream, startingFileOffset: Long, fileNumber: Int, startingRecordIndex: Long): Iterator[Row] = if (cobolSchema.copybook.isHierarchical) { new RowIterator( new VarLenHierarchicalIterator(cobolSchema.copybook, - binaryData, + dataStream, getReaderProperties, recordHeaderParser, - recordExtractor(startingRecordIndex, binaryData, cobolSchema.copybook), + recordExtractor(startingRecordIndex, dataStream, headerStream, cobolSchema.copybook), fileNumber, startingRecordIndex, startingFileOffset, @@ -69,10 +69,10 @@ final class VarLenNestedReader(copybookContents: Seq[String], } else { new RowIterator( new VarLenNestedIterator(cobolSchema.copybook, - binaryData, + dataStream, getReaderProperties, recordHeaderParser, - recordExtractor(startingRecordIndex, binaryData, cobolSchema.copybook), + recordExtractor(startingRecordIndex, dataStream, headerStream, cobolSchema.copybook), fileNumber, startingRecordIndex, startingFileOffset, diff --git 
a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/VarLenReader.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/VarLenReader.scala index 332c8b68..81f5cecd 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/VarLenReader.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/VarLenReader.scala @@ -36,6 +36,8 @@ trait VarLenReader extends Reader with Serializable { * Returns a file iterator between particular offsets. This is for faster traversal of big binary files * * @param binaryData A stream positioned at the beginning of the intended file portion to read + * @param headerStream A stream pointing to the beginning of the file, even if inputStream is pointing + * to a record in the middle. * @param startingFileOffset An offset of the file where parsing should be started * @param fileNumber A file number uniquely identified a particular file of the data set * @param startingRecordIndex A starting record index of the data @@ -43,6 +45,7 @@ trait VarLenReader extends Reader with Serializable { * */ @throws(classOf[Exception]) def getRowIterator(binaryData: SimpleStream, + headerStream: SimpleStream, startingFileOffset: Long, fileNumber: Int, startingRecordIndex: Long): Iterator[Row] @@ -51,12 +54,15 @@ trait VarLenReader extends Reader with Serializable { * Traverses the data sequentially as fast as possible to generate record index. * This index will be used to distribute workload of the conversion. * - * @param binaryData A stream of input binary data - * @param fileNumber A file number uniquely identified a particular file of the data set + * @param dataStream A stream of input binary data + * @param headerStream A stream pointing to the beginning of the file, even if inputStream is pointing + * to a record in the middle. 
+ * @param fileNumber A file number uniquely identified a particular file of the data set * @return An index of the file * */ - @throws(classOf[Exception]) def generateIndex(binaryData: SimpleStream, + @throws(classOf[Exception]) def generateIndex(dataStream: SimpleStream, + headerStream: SimpleStream, fileNumber: Int, isRdwBigEndian: Boolean): ArrayBuffer[SparseIndexEntry] } diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/scanners/CobolScanners.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/scanners/CobolScanners.scala index 97a92199..05027208 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/scanners/CobolScanners.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/scanners/CobolScanners.scala @@ -50,7 +50,8 @@ private[source] object CobolScanners extends Logging { logger.info(s"Going to process offsets ${indexEntry.offsetFrom}...${indexEntry.offsetTo} ($numOfBytesMsg) of $fileName") val dataStream = new FileStreamer(filePathName, fileSystem, indexEntry.offsetFrom, numOfBytes) - reader.getRowIterator(dataStream, indexEntry.offsetFrom, indexEntry.fileId, indexEntry.recordIndex) + val headerStream = new FileStreamer(filePathName, fileSystem) + reader.getRowIterator(dataStream, headerStream, indexEntry.offsetFrom, indexEntry.fileId, indexEntry.recordIndex) }) } @@ -68,7 +69,9 @@ private[source] object CobolScanners extends Logging { val fileSystem = path.getFileSystem(sconf.value) logger.info(s"Going to parse file: $filePath") - reader.getRowIterator(new FileStreamer(filePath, fileSystem), 0L, fileOrder, 0L) + val dataStream = new FileStreamer(filePath, fileSystem) + val headerStream = new FileStreamer(filePath, fileSystem) + reader.getRowIterator(dataStream, headerStream, 0L, fileOrder, 0L) } ) }) diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/mocks/CustomRecordExtractorMock.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/mocks/CustomRecordExtractorMock.scala index aebd98c7..896ef2c6 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/mocks/CustomRecordExtractorMock.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/mocks/CustomRecordExtractorMock.scala @@ -25,12 +25,13 @@ import za.co.absa.cobrix.cobol.reader.extractors.raw.{RawRecordContext, RawRecor */ class CustomRecordExtractorMock(ctx: RawRecordContext) extends Serializable with RawRecordExtractor { CustomRecordExtractorMock.additionalInfo = ctx.additionalInfo + CustomRecordExtractorMock.catchContext = ctx private var recordNumber = ctx.startingRecordNumber - override def offset: Long = ctx.inputStream.offset + override def offset: Long = ctx.dataStream.offset - override def hasNext: Boolean = !ctx.inputStream.isEndOfStream + override def hasNext: Boolean = !ctx.dataStream.isEndOfStream @throws[NoSuchElementException] override def next(): Array[Byte] = { @@ -39,9 +40,9 @@ class CustomRecordExtractorMock(ctx: RawRecordContext) extends Serializable with } val rawRecord = if (recordNumber % 2 == 0) { - ctx.inputStream.next(2) + ctx.dataStream.next(2) } else { - ctx.inputStream.next(3) + ctx.dataStream.next(3) } recordNumber += 1 @@ -52,4 +53,5 @@ class CustomRecordExtractorMock(ctx: RawRecordContext) extends Serializable with object CustomRecordExtractorMock { var additionalInfo: String = "" + var catchContext: RawRecordContext = _ } diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/mocks/CustomRecordExtractorWithFileHeaderMock.scala 
b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/mocks/CustomRecordExtractorWithFileHeaderMock.scala new file mode 100644 index 00000000..e4c5341f --- /dev/null +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/mocks/CustomRecordExtractorWithFileHeaderMock.scala @@ -0,0 +1,51 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.spark.cobol.mocks + +import za.co.absa.cobrix.cobol.reader.extractors.raw.{RawRecordContext, RawRecordExtractor} + +/** + * This is a record extractor for a binary file where + * the first byte of the file specifies the record size. + */ +class CustomRecordExtractorWithFileHeaderMock(ctx: RawRecordContext) extends Serializable with RawRecordExtractor { + private var recordNumber = ctx.startingRecordNumber + private val recordSize = ctx.headerStream.next(1).head + + ctx.headerStream.close() + + override def offset: Long = ctx.dataStream.offset + + override def hasNext: Boolean = !ctx.dataStream.isEndOfStream + + @throws[NoSuchElementException] + override def next(): Array[Byte] = { + if (!hasNext) { + throw new NoSuchElementException + } + + val rawRecord = ctx.dataStream.next(recordSize) + + if (rawRecord.length != recordSize || ctx.dataStream.isEndOfStream) { + ctx.dataStream.close() + } + + recordNumber += 1 + + rawRecord + } +} diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test21VariableOccurs.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test21VariableOccurs.scala index 91c8c82d..ec790672 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test21VariableOccurs.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test21VariableOccurs.scala @@ -41,9 +41,10 @@ class Test21VariableOccurs extends AnyFunSuite with SparkTestBase { test("Test VarLenReader properly splits a file into records") { val inputStream = new FSStream(s"$inputDataPath/data.dat") + val headerStream = new FSStream(s"$inputDataPath/data.dat") val copybookContents = Files.readAllLines(Paths.get("../data/test21_copybook.cob"), StandardCharsets.ISO_8859_1).toArray.mkString("\n") val copybook = CopybookParser.parse(copybookContents, ASCII) - val recordExtractor = new VarOccursRecordExtractor(RawRecordContext(0L, inputStream, copybook, null, null, "")) + val recordExtractor = new VarOccursRecordExtractor(RawRecordContext(0L, inputStream, headerStream, copybook, null, null, "")) val expectedRecords = ListBuffer(Array(48.toByte), Array(49.toByte, 48.toByte), diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test25OccursMappings.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test25OccursMappings.scala index 5bcaa613..1c9a195b 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test25OccursMappings.scala +++ 
b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test25OccursMappings.scala @@ -52,6 +52,7 @@ class Test25OccursMappings extends AnyFunSuite with SparkTestBase { test("Test Occurs Mappings on VarOccursRecordExtractor") { val inputStream = new FSStream(s"$inputDataPath/data.dat") + val headerStream = new FSStream(s"$inputDataPath/data.dat") val copybookContents = Files.readAllLines(Paths.get("../data/test25_copybook.cob"), StandardCharsets.ISO_8859_1).toArray.mkString("\n") val occursMapping: Map[String, Map[String, Int]] = Map( @@ -66,7 +67,7 @@ class Test25OccursMappings extends AnyFunSuite with SparkTestBase { ) val copybook = CopybookParser.parse(copybookContents, ASCII, occursHandlers = occursMapping) - val recordExtractor = new VarOccursRecordExtractor(RawRecordContext(0L, inputStream, copybook, null, null, "")) + val recordExtractor = new VarOccursRecordExtractor(RawRecordContext(0L, inputStream, headerStream, copybook, null, null, "")) val expectedRecords = ListBuffer( "1AX".getBytes, diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test26CustomRecordExtractor.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test26CustomRecordExtractor.scala index efef5610..3a51debb 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test26CustomRecordExtractor.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test26CustomRecordExtractor.scala @@ -22,7 +22,6 @@ import za.co.absa.cobrix.spark.cobol.mocks.CustomRecordExtractorMock import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture -//noinspection NameBooleanParameters class Test26CustomRecordExtractor extends AnyWordSpec with SparkTestBase with BinaryFileFixture { private val exampleName = "Test26 (custom record extractor)" @@ -44,6 +43,7 @@ class Test26CustomRecordExtractor extends AnyWordSpec with SparkTestBase with Bi assert(actual == expected) assert(CustomRecordExtractorMock.additionalInfo == "re info") + assert(CustomRecordExtractorMock.catchContext.headerStream != CustomRecordExtractorMock.catchContext.dataStream) } } @@ -57,6 +57,7 @@ class Test26CustomRecordExtractor extends AnyWordSpec with SparkTestBase with Bi assert(actual == expected) assert(CustomRecordExtractorMock.additionalInfo == "re info") + assert(CustomRecordExtractorMock.catchContext.headerStream != CustomRecordExtractorMock.catchContext.dataStream) } } @@ -70,6 +71,28 @@ class Test26CustomRecordExtractor extends AnyWordSpec with SparkTestBase with Bi assert(actual == expected) assert(CustomRecordExtractorMock.additionalInfo == "re info") + assert(CustomRecordExtractorMock.catchContext.headerStream != CustomRecordExtractorMock.catchContext.dataStream) + } + } + + "support file headers" in { + val expected = """[{"A":"012"},{"A":"345"},{"A":"6"}]""" + + val binData = Array(0x03, 0xF0, 0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6).map(_.toByte) + + withTempBinFile("custom_re", ".dat", binData) { tmpFileName => + val df = spark + .read + .format("cobol") + .option("copybook_contents", copybook) + .option("file_start_offset", 1) + .option("record_extractor", "za.co.absa.cobrix.spark.cobol.mocks.CustomRecordExtractorWithFileHeaderMock") + .option("pedantic", "true") + .load(tmpFileName) + + val actual = df.toJSON.collect().mkString("[", ",", "]") + + assert(actual == expected) } } }
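
For illustration, below is a minimal sketch (not part of the patch) of a user-defined record extractor that uses the new headerStream field, modeled on the CustomRecordExtractorWithFileHeaderMock added above. The package and class names are hypothetical, and the assumed file layout is a one-byte header holding the record length, followed by fixed-size records.

package com.example.extractors // hypothetical package

import za.co.absa.cobrix.cobol.reader.extractors.raw.{RawRecordContext, RawRecordExtractor}

// A sketch only: reads the record length from the file header via headerStream,
// then serves fixed-size records from dataStream.
class FileHeaderRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRecordExtractor {
  // headerStream always points to the beginning of the file, even when dataStream
  // starts at a record in the middle, so the file header can be read here.
  // The extractor is responsible for closing headerStream once it is no longer needed.
  private val recordSize: Int = ctx.headerStream.next(1).head.toInt
  ctx.headerStream.close()

  override def offset: Long = ctx.dataStream.offset

  override def hasNext: Boolean = !ctx.dataStream.isEndOfStream

  override def next(): Array[Byte] = {
    if (!hasNext) throw new NoSuchElementException
    ctx.dataStream.next(recordSize)
  }
}

Such an extractor would be registered the same way as in the new Test26 case, e.g. .option("record_extractor", "com.example.extractors.FileHeaderRecordExtractor") together with .option("file_start_offset", 1), so that dataStream skips the one-byte header when records are read.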