
Commit

#613 Add the ability for record extractors to have access to the full file stream so
they can use it for extracting headers and footers.
yruslan committed May 4, 2023
1 parent 1f9a377 commit 8ce15a5
Showing 24 changed files with 254 additions and 125 deletions.
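The change threads a second stream, headerStream, through the reader and extractor APIs so that a custom RawRecordExtractor can read file-level headers or footers even when its dataStream starts at a split in the middle of the file. Below is a minimal, hypothetical sketch (not part of this commit) of an extractor using the new context; the 32-byte header and 100-byte record length are made-up values, and only SimpleStream calls that appear in this diff (next, offset, isEndOfStream, close) are used.

import za.co.absa.cobrix.cobol.reader.extractors.raw.{RawRecordContext, RawRecordExtractor}

// Hypothetical extractor: reads a 32-byte file header from headerStream (which is
// always positioned at the start of the file), closes it, then returns fixed
// 100-byte records from dataStream.
class FileHeaderAwareExtractorSketch(ctx: RawRecordContext) extends Serializable with RawRecordExtractor {
  private val fileHeader: Array[Byte] = ctx.headerStream.next(32)
  ctx.headerStream.close()

  override def offset: Long = ctx.dataStream.offset

  override def hasNext: Boolean = !ctx.dataStream.isEndOfStream

  override def next(): Array[Byte] = ctx.dataStream.next(100)
}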
@@ -60,7 +60,8 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
checkInputArgumentsValidity()

def recordExtractor(startingRecordNumber: Long,
binaryData: SimpleStream,
dataStream: SimpleStream,
headerStream: SimpleStream,
copybook: Copybook
): Option[RawRecordExtractor] = {
val rdwParams = RecordHeaderParameters(readerProperties.isRdwBigEndian, readerProperties.rdwAdjustment)
@@ -71,7 +72,7 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
val bdwParamsOpt = bdwOpt.map(bdw => RecordHeaderParameters(bdw.isBigEndian, bdw.adjustment))
val bdwDecoderOpt = bdwParamsOpt.map(bdwParams => new RecordHeaderDecoderBdw(bdwParams))

val reParams = RawRecordContext(startingRecordNumber, binaryData, copybook, rdwDecoder, bdwDecoderOpt.getOrElse(rdwDecoder), readerProperties.reAdditionalInfo)
val reParams = RawRecordContext(startingRecordNumber, dataStream, headerStream, copybook, rdwDecoder, bdwDecoderOpt.getOrElse(rdwDecoder), readerProperties.reAdditionalInfo)

readerProperties.recordExtractor match {
case Some(recordExtractorClass) =>
@@ -104,26 +105,27 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],

override def isRdwBigEndian: Boolean = readerProperties.isRdwBigEndian

override def getRecordIterator(binaryData: SimpleStream,
override def getRecordIterator(dataStream: SimpleStream,
headerStream: SimpleStream,
startingFileOffset: Long,
fileNumber: Int,
startingRecordIndex: Long): Iterator[Seq[Any]] =
if (cobolSchema.copybook.isHierarchical) {
new VarLenHierarchicalIterator(cobolSchema.copybook,
binaryData,
dataStream,
readerProperties,
recordHeaderParser,
recordExtractor(startingRecordIndex, binaryData, cobolSchema.copybook),
recordExtractor(startingRecordIndex, dataStream, headerStream, cobolSchema.copybook),
fileNumber,
startingRecordIndex,
startingFileOffset,
handler)
} else {
new VarLenNestedIterator(cobolSchema.copybook,
binaryData,
dataStream,
readerProperties,
recordHeaderParser,
recordExtractor(startingRecordIndex, binaryData, cobolSchema.copybook),
recordExtractor(startingRecordIndex, dataStream, headerStream, cobolSchema.copybook),
fileNumber,
startingRecordIndex,
startingFileOffset,
@@ -135,12 +137,14 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
* Traverses the data sequentially as fast as possible to generate record index.
* This index will be used to distribute workload of the conversion.
*
* @param binaryData A stream of input binary data
* @param fileNumber A file number uniquely identified a particular file of the data set
* @param dataStream A stream of input binary data
* @param headerStream A stream pointing to the beginning of the file, even if inputStream is pointing
* to a record in the middle.
* @param fileNumber A file number uniquely identified a particular file of the data set
* @return An index of the file
*
*/
override def generateIndex(binaryData: SimpleStream,
override def generateIndex(dataStream: SimpleStream,
headerStream: SimpleStream,
fileNumber: Int,
isRdwBigEndian: Boolean): ArrayBuffer[SparseIndexEntry] = {
val inputSplitSizeRecords: Option[Int] = readerProperties.inputSplitRecords
@@ -173,21 +177,21 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],

segmentIdField match {
case Some(field) => IndexGenerator.sparseIndexGenerator(fileNumber,
binaryData,
dataStream,
readerProperties.fileStartOffset,
recordHeaderParser,
recordExtractor(0L, binaryData, copybook),
recordExtractor(0L, dataStream, headerStream, copybook),
inputSplitSizeRecords,
inputSplitSizeMB,
Some(copybook),
Some(field),
isHierarchical,
segmentIdValue)
case None => IndexGenerator.sparseIndexGenerator(fileNumber,
binaryData,
dataStream,
readerProperties.fileStartOffset,
recordHeaderParser,
recordExtractor(0L, binaryData, copybook),
recordExtractor(0L, dataStream, headerStream, copybook),
inputSplitSizeRecords,
inputSplitSizeMB,
None,
@@ -33,14 +33,17 @@ abstract class VarLenReader extends Reader with Serializable {
/**
* Returns a file iterator between particular offsets. This is for faster traversal of big binary files
*
* @param binaryData A stream positioned at the beginning of the intended file portion to read
* @param dataStream A stream positioned at the beginning of the intended file portion to read
* @param headerStream A stream pointing to the beginning of the file, even if inputStream is pointing
* to a record in the middle.
* @param startingFileOffset An offset of the file where parsing should be started
* @param fileNumber A file number uniquely identified a particular file of the data set
* @param startingRecordIndex A starting record index of the data
* @return An iterator of Spark Row objects
*
*/
def getRecordIterator(binaryData: SimpleStream,
def getRecordIterator(dataStream: SimpleStream,
headerStream: SimpleStream,
startingFileOffset: Long,
fileNumber: Int,
startingRecordIndex: Long): Iterator[Seq[Any]]
@@ -49,12 +52,15 @@ abstract class VarLenReader extends Reader with Serializable {
* Traverses the data sequentially as fast as possible to generate record index.
* This index will be used to distribute workload of the conversion.
*
* @param binaryData A stream of input binary data
* @param fileNumber A file number uniquely identified a particular file of the data set
* @param dataStream A stream of input binary data
* @param headerStream A stream pointing to the beginning of the file, even if inputStream is pointing
* to a record in the middle.
* @param fileNumber A file number uniquely identified a particular file of the data set
* @return An index of the file
*
*/
def generateIndex(binaryData: SimpleStream,
def generateIndex(dataStream: SimpleStream,
headerStream: SimpleStream,
fileNumber: Int,
isRdwBigEndian: Boolean): ArrayBuffer[SparseIndexEntry]
}
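Callers of the updated VarLenReader contract are expected to supply two independently opened streams over the same file: dataStream positioned at the split offset to read records from, and headerStream positioned at offset zero. A hedged usage sketch, where openStreamAt is a hypothetical helper returning a SimpleStream and the other values are placeholders; only the getRecordIterator signature comes from this diff:

// Hypothetical helper and variable names, for illustration only.
val dataStream: SimpleStream   = openStreamAt(filePath, startingFileOffset)
val headerStream: SimpleStream = openStreamAt(filePath, 0L)

val rows = reader.getRecordIterator(dataStream, headerStream, startingFileOffset, fileNumber, startingRecordIndex)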
@@ -19,12 +19,14 @@ package za.co.absa.cobrix.cobol.reader.extractors.raw
import scala.collection.mutable

class FixedBlockRawRecordExtractor(ctx: RawRecordContext, fbParams: FixedBlockParameters) extends Serializable with RawRecordExtractor {
ctx.headerStream.close()

private val recordQueue = new mutable.Queue[Array[Byte]]

private val recordSize = fbParams.recordLength.getOrElse(ctx.copybook.getRecordSize)
private val bdwSize = fbParams.blockLength.orElse(fbParams.recordsPerBlock.map(_ * recordSize))

override def offset: Long = ctx.inputStream.offset
override def offset: Long = ctx.dataStream.offset

override def hasNext: Boolean = {
if (recordQueue.isEmpty) {
@@ -34,17 +36,17 @@ class FixedBlockRawRecordExtractor(ctx: RawRecordContext, fbParams: FixedBlockPa
}

private def readNextBlock(): Unit = {
if (!ctx.inputStream.isEndOfStream) {
var bdwOffset = ctx.inputStream.offset
if (!ctx.dataStream.isEndOfStream) {
var bdwOffset = ctx.dataStream.offset

val nextBlockSize = bdwSize.getOrElse({
val bdw = ctx.inputStream.next(ctx.bdwDecoder.headerSize)
val bdw = ctx.dataStream.next(ctx.bdwDecoder.headerSize)
val blockLength = ctx.bdwDecoder.getRecordLength(bdw, bdwOffset)
bdwOffset += ctx.bdwDecoder.headerSize
blockLength
})

val blockBuffer = ctx.inputStream.next(nextBlockSize)
val blockBuffer = ctx.dataStream.next(nextBlockSize)

var blockIndex = 0

@@ -22,13 +22,18 @@ import za.co.absa.cobrix.cobol.reader.stream.SimpleStream

/**
* @param startingRecordNumber A record number the input stream is pointing to (zero-based).
* @param inputStream An input stream pointing to the beginning of a file or a record in a file.
* @param dataStream An input stream pointing to the beginning of a file or a record in a file. The
* record extractor should close the stream when the end of file is reached.
* @param headerStream A stream pointing to the beginning of the file, even if inputStream is pointing
* to a record in the middle. The record extractor should close the stream when it
* is no longer needed.
* @param copybook A copybook of the input stream.
* @param additionalInfo A string provided by a client for the raw record extractor.
*/
case class RawRecordContext(
startingRecordNumber: Long,
inputStream: SimpleStream,
dataStream: SimpleStream,
headerStream: SimpleStream,
copybook: Copybook,
rdwDecoder: RecordHeaderDecoder,
bdwDecoder: RecordHeaderDecoder,
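For test or integration code that builds the context directly, the widened case class might be constructed along these lines. This is only a sketch: the decoder and stream values are placeholders, and the trailing additionalInfo field name is inferred from the scaladoc above rather than shown in the truncated hunk.

// Placeholder values; field names follow the diff and scaladoc above.
val ctx = RawRecordContext(
  startingRecordNumber = 0L,
  dataStream = dataStream,          // stream positioned at the portion to read
  headerStream = headerStream,      // stream positioned at the start of the file
  copybook = copybook,
  rdwDecoder = rdwDecoder,
  bdwDecoder = bdwDecoder,
  additionalInfo = ""
)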
@@ -30,6 +30,8 @@ import java.util
* Hopefully, comments will help anyone reading this.
*/
class TextFullRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRecordExtractor {
ctx.headerStream.close()

private val recordSize = ctx.copybook.getRecordSize

// Maximum possible record size is the size of the copybook record + maximum size of the delimiter (2 characters for CRLF).
@@ -64,13 +66,13 @@ class TextFullRecordExtractor(ctx: RawRecordContext) extends Serializable with R
fetchNextRecord()
}

override def offset: Long = ctx.inputStream.offset - pendingBytesSize
override def offset: Long = ctx.dataStream.offset - pendingBytesSize

// This method ensures that pendingBytes contains the specified number of bytes read from the input stream
private def ensureBytesRead(numOfBytes: Int): Unit = {
val bytesToRead = numOfBytes - pendingBytesSize
if (bytesToRead > 0) {
val newBytes = ctx.inputStream.next(bytesToRead)
val newBytes = ctx.dataStream.next(bytesToRead)
if (newBytes.length > 0) {
System.arraycopy(newBytes, 0, pendingBytes, pendingBytesSize, newBytes.length)
pendingBytesSize = pendingBytesSize + newBytes.length
@@ -133,7 +135,7 @@ class TextFullRecordExtractor(ctx: RawRecordContext) extends Serializable with R
System.arraycopy(pendingBytes, recordLength + i, pendingBytes, recordLength, size - i)
pendingBytesSize -= i
}
endOfStream = ctx.inputStream.isEndOfStream
endOfStream = ctx.dataStream.isEndOfStream
if (!found && !endOfStream) {
ensureBytesRead(maxRecordSize)
}
@@ -153,7 +155,7 @@ class TextFullRecordExtractor(ctx: RawRecordContext) extends Serializable with R
} else {
// Last record or a record is too large?
// In the latter case
if (pendingBytesSize <= recordSize && ctx.inputStream.isEndOfStream) {
if (pendingBytesSize <= recordSize && ctx.dataStream.isEndOfStream) {
// Last record
curRecordSize = pendingBytesSize
curPayloadSize = pendingBytesSize
@@ -30,6 +30,8 @@ import java.util
* Hopefully, comments will help anyone reading this.
*/
class TextRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRecordExtractor {
ctx.headerStream.close()

private val recordSize = ctx.copybook.getRecordSize

// Maximum possible record size is the size of the copybook record + maximum size of the delimiter (2 characters for CRLF).
@@ -64,13 +66,13 @@ class TextRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRe
fetchNextRecord()
}

override def offset: Long = ctx.inputStream.offset - pendingBytesSize
override def offset: Long = ctx.dataStream.offset - pendingBytesSize

// This method ensures that pendingBytes contains the specified number of bytes read from the input stream
private def ensureBytesRead(numOfBytes: Int): Unit = {
val bytesToRead = numOfBytes - pendingBytesSize
if (bytesToRead > 0) {
val newBytes = ctx.inputStream.next(bytesToRead)
val newBytes = ctx.dataStream.next(bytesToRead)
if (newBytes.length > 0) {
System.arraycopy(newBytes, 0, pendingBytes, pendingBytesSize, newBytes.length)
pendingBytesSize = pendingBytesSize + newBytes.length
@@ -126,7 +128,7 @@ class TextRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRe
} else {
// Last record or a record is too large?
// In the latter case
if (pendingBytesSize <= recordSize && ctx.inputStream.isEndOfStream) {
if (pendingBytesSize <= recordSize && ctx.dataStream.isEndOfStream) {
// Last record
curRecordSize = pendingBytesSize
curPayloadSize = pendingBytesSize
@@ -28,25 +28,27 @@ import za.co.absa.cobrix.cobol.parser.ast.{Group, Primitive, Statement}
* determined neither from the copybook nor from record headers.
*/
class VarOccursRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRecordExtractor {
ctx.headerStream.close()

private val maxRecordSize = ctx.copybook.getRecordSize
private val ast = ctx.copybook.ast
private val hasVarSizeOccurs = copybookHasVarSizedOccurs
private val bytes = new Array[Byte](maxRecordSize)
private var bytesSize = 0

override def hasNext: Boolean = ctx.inputStream.offset < ctx.inputStream.size
override def hasNext: Boolean = ctx.dataStream.offset < ctx.dataStream.size

override def next(): Array[Byte] = {
if (hasVarSizeOccurs) {
bytesSize = 0
util.Arrays.fill(bytes, 0.toByte)
extractVarOccursRecordBytes()
} else {
ctx.inputStream.next(maxRecordSize)
ctx.dataStream.next(maxRecordSize)
}
}

def offset: Long = ctx.inputStream.offset
def offset: Long = ctx.dataStream.offset

private def extractVarOccursRecordBytes(): Array[Byte] = {
val dependFields = scala.collection.mutable.HashMap.empty[String, Either[Int, String]]
@@ -138,7 +140,7 @@ class VarOccursRecordExtractor(ctx: RawRecordContext) extends Serializable with
private def ensureBytesRead(numOfBytes: Int): Unit = {
val bytesToRead = numOfBytes - bytesSize
if (bytesToRead > 0) {
val newBytes = ctx.inputStream.next(bytesToRead)
val newBytes = ctx.dataStream.next(bytesToRead)
if (newBytes.length > 0) {
System.arraycopy(newBytes, 0, bytes, bytesSize, newBytes.length)
bytesSize = numOfBytes
@@ -19,9 +19,11 @@ package za.co.absa.cobrix.cobol.reader.extractors.raw
import scala.collection.mutable

class VariableBlockVariableRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRecordExtractor {
ctx.headerStream.close()

private val recordQueue = new mutable.Queue[Array[Byte]]
private var canSplitAtCurrentOffset = true
private var recordOffset: Long = ctx.inputStream.offset
private var recordOffset: Long = ctx.dataStream.offset

override def offset: Long = recordOffset

@@ -38,12 +40,12 @@ class VariableBlockVariableRecordExtractor(ctx: RawRecordContext) extends Serial
val bdwSize = ctx.bdwDecoder.headerSize
val rdwSize = ctx.rdwDecoder.headerSize

if (!ctx.inputStream.isEndOfStream) {
val bdwOffset = ctx.inputStream.offset
val bdw = ctx.inputStream.next(bdwSize)
if (!ctx.dataStream.isEndOfStream) {
val bdwOffset = ctx.dataStream.offset
val bdw = ctx.dataStream.next(bdwSize)

val blockLength = ctx.bdwDecoder.getRecordLength(bdw, bdwOffset)
val blockBuffer = ctx.inputStream.next(blockLength)
val blockBuffer = ctx.dataStream.next(blockLength)

var blockIndex = 0

@@ -19,14 +19,14 @@ package za.co.absa.cobrix.cobol.mock
import za.co.absa.cobrix.cobol.reader.extractors.raw.{RawRecordContext, RawRecordExtractor}

class RecordExtractorMock(ctx: RawRecordContext) extends Serializable with RawRecordExtractor {
override def offset: Long = ctx.inputStream.offset
override def offset: Long = ctx.dataStream.offset

override def hasNext: Boolean = !ctx.inputStream.isEndOfStream
override def hasNext: Boolean = !ctx.dataStream.isEndOfStream

override def next(): Array[Byte] = {
val header = ctx.inputStream.next(2)
val header = ctx.dataStream.next(2)
if (header.length == 2) {
ctx.inputStream.next(header.head + header(1) * 256)
ctx.dataStream.next(header.head + header(1) * 256)
} else {
Array.empty[Byte]
}
@@ -19,16 +19,16 @@ package za.co.absa.cobrix.cobol.mock
import za.co.absa.cobrix.cobol.reader.extractors.raw.{RawRecordContext, RawRecordExtractor}

class RecordExtractorReadAhaedMock(ctx: RawRecordContext) extends Serializable with RawRecordExtractor {
ctx.inputStream.next(2)
ctx.dataStream.next(2)

override def offset: Long = ctx.inputStream.offset
override def offset: Long = ctx.dataStream.offset

override def hasNext: Boolean = !ctx.inputStream.isEndOfStream
override def hasNext: Boolean = !ctx.dataStream.isEndOfStream

override def next(): Array[Byte] = {
val header = ctx.inputStream.next(2)
val header = ctx.dataStream.next(2)
if (header.length == 2) {
ctx.inputStream.next(header.head + header(1) * 256)
ctx.dataStream.next(header.head + header(1) * 256)
} else {
Array.empty[Byte]
}
