Skip to content

Commit

Permalink
#420 Add implementation of raw record extractor for FB format.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed Sep 21, 2021
1 parent 5193410 commit 4b4a8d0
Show file tree
Hide file tree
Showing 4 changed files with 268 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import za.co.absa.cobrix.cobol.parser.encoding.{ASCII, EBCDIC}
import za.co.absa.cobrix.cobol.parser.headerparsers.{RecordHeaderParser, RecordHeaderParserFactory}
import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat.{FixedBlock, VariableBlock}
import za.co.absa.cobrix.cobol.parser.{Copybook, CopybookParser}
import za.co.absa.cobrix.cobol.reader.extractors.raw.{RawRecordContext, RawRecordExtractor, RawRecordExtractorFactory, TextRecordExtractor, VarOccursRecordExtractor, VariableBlockVariableRecordExtractor}
import za.co.absa.cobrix.cobol.reader.extractors.raw.{FixedBlockParameters, FixedBlockRawRecordExtractor, RawRecordContext, RawRecordExtractor, RawRecordExtractorFactory, TextRecordExtractor, VarOccursRecordExtractor, VariableBlockVariableRecordExtractor}
import za.co.absa.cobrix.cobol.reader.extractors.record.RecordHandler
import za.co.absa.cobrix.cobol.reader.index.IndexGenerator
import za.co.absa.cobrix.cobol.reader.index.entry.SparseIndexEntry
Expand Down Expand Up @@ -79,7 +79,9 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
case None if readerProperties.isText =>
Some(new TextRecordExtractor(reParams))
case None if readerProperties.recordFormat == FixedBlock =>
Some(new VariableBlockVariableRecordExtractor(reParams)) // ToDo FB record format
val fbParams = FixedBlockParameters(readerProperties.recordLength, bdwOpt.get.blockLength, bdwOpt.get.recordsPerBlock)
FixedBlockParameters.validate(fbParams)
Some(new FixedBlockRawRecordExtractor(reParams, fbParams))
case None if readerProperties.recordFormat == VariableBlock =>
Some(new VariableBlockVariableRecordExtractor(reParams))
case None if readerProperties.variableSizeOccurs &&
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package za.co.absa.cobrix.cobol.reader.extractors.raw

case class FixedBlockParameters(
recordLength: Option[Int],
blockLength: Option[Int],
recordsPerBlock: Option[Int]
)

object FixedBlockParameters {
def validate(params: FixedBlockParameters): Unit = {
if (params.blockLength.isEmpty && params.recordsPerBlock.isEmpty) {
throw new IllegalArgumentException("FB record format requires block length or number records per block to be specified.")
}
if (params.blockLength.nonEmpty && params.recordsPerBlock.nonEmpty) {
throw new IllegalArgumentException("FB record format requires either block length or number records per block to be specified, but not both.")
}
params.recordLength.foreach(x => if (x < 1) throw new IllegalArgumentException(s"Record length should be positive. Got $x."))
params.blockLength.foreach(x => if (x < 1) throw new IllegalArgumentException(s"Block length should be positive. Got $x."))
params.recordsPerBlock.foreach(x => if (x < 1) throw new IllegalArgumentException(s"Records per block should be positive. Got $x."))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.cobrix.cobol.reader.extractors.raw

import scala.collection.mutable

class FixedBlockRawRecordExtractor(ctx: RawRecordContext, fbParams: FixedBlockParameters) extends Serializable with RawRecordExtractor {
private val recordQueue = new mutable.Queue[Array[Byte]]

private val recordSize = fbParams.recordLength.getOrElse(ctx.copybook.getRecordSize)
private val bdwSize = fbParams.blockLength.getOrElse(fbParams.recordsPerBlock.get * recordSize)

override def offset: Long = ctx.inputStream.offset

override def hasNext: Boolean = {
if (recordQueue.isEmpty) {
readNextBlock()
}
recordQueue.nonEmpty
}

private def readNextBlock(): Unit = {
if (!ctx.inputStream.isEndOfStream) {
val bdwOffset = ctx.inputStream.offset
val blockBuffer = ctx.inputStream.next(bdwSize)

var blockIndex = 0

while (blockIndex < blockBuffer.length) {
val rdwOffset = bdwOffset + blockIndex

val payload = blockBuffer.slice(blockIndex, blockIndex + recordSize)
if (payload.length > 0) {
recordQueue.enqueue(payload)
}
blockIndex += recordSize
}
}
}


@throws[NoSuchElementException]
override def next(): Array[Byte] = {
if (!hasNext) {
throw new NoSuchElementException
}
recordQueue.dequeue()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.cobrix.cobol.reader.extractors.raw

import org.scalatest.WordSpec
import za.co.absa.cobrix.cobol.parser.CopybookParser
import za.co.absa.cobrix.cobol.reader.memorystream.TestByteStream
import za.co.absa.cobrix.cobol.reader.recordheader.{RecordHeaderDecoderBdw, RecordHeaderDecoderRdw, RecordHeaderParametersFactory}

class FixedBlockRawRecordExtractorSuite extends WordSpec {
private val copybookContent =
""" 01 RECORD.
02 X PIC X(2).
"""
private val copybook = CopybookParser.parseTree(copybookContent)

private val fbParams = FixedBlockParameters(None, Some(2), None)

"fixed block fixed length records" should {
"be able to read a FB file that has no data" in {
val rc = getRawRecordContext(Array[Byte]())

val extractor = new FixedBlockRawRecordExtractor(rc, fbParams)

assert(!extractor.hasNext)

intercept[NoSuchElementException] {
extractor.next()
}
}

"be able to read a FB file that has an incomplete record" in {
val rc = getRawRecordContext(Array[Byte](0xF0.toByte))

val extractor = new FixedBlockRawRecordExtractor(rc, fbParams)

assert(extractor.hasNext)

val r0 = extractor.next()

assert(r0.length == 1)
assert(r0.head == 0xF0.toByte)

intercept[NoSuchElementException] {
extractor.next()
}
}

"be able to read a FB file that has one record per block" in {
val rc = getRawRecordContext(1)

val extractor = new FixedBlockRawRecordExtractor(rc, FixedBlockParameters(Some(2), None, Some(1)))

assert(extractor.hasNext)

val r0 = extractor.next()
assert(r0.length == 2)
assert(r0.head == 0xF0.toByte)
assert(r0(1) == 0xF0.toByte)

assert(extractor.next().head == 0xF1.toByte)
assert(extractor.next().head == 0xF2.toByte)
assert(!extractor.hasNext)
}

"be able to read a VBVR file that has multiple records per block" in {
val rc = getRawRecordContext(3)

val extractor = new FixedBlockRawRecordExtractor(rc, FixedBlockParameters(None, None, Some(3)))

assert(extractor.hasNext)

val r0 = extractor.next()
assert(r0.length == 2)
assert(r0.head == 0xF0.toByte)
assert(r0(1) == 0xF0.toByte)

assert(extractor.next().head == 0xF1.toByte)
assert(extractor.next().head == 0xF2.toByte)
assert(extractor.next().head == 0xF3.toByte)
assert(extractor.next().head == 0xF4.toByte)
assert(extractor.next().head == 0xF5.toByte)
assert(extractor.next().head == 0xF6.toByte)
assert(extractor.next().head == 0xF7.toByte)
assert(extractor.next().head == 0xF8.toByte)
assert(!extractor.hasNext)
}
}

"failures" should {
"throw an exception when neither block length nor records per block is specified" in {
val fb = FixedBlockParameters(Some(1), None, None)

val ex = intercept[IllegalArgumentException] {
FixedBlockParameters.validate(fb)
}

assert(ex.getMessage.contains("FB record format requires block length or number records per block to be specified."))
}

"throw an exception when both block length and records per block are specified" in {
val fb = FixedBlockParameters(Some(1), Some(1), Some(1))

val ex = intercept[IllegalArgumentException] {
FixedBlockParameters.validate(fb)
}

assert(ex.getMessage.contains("FB record format requires either block length or number records per block to be specified, but not both."))
}

"throw an exception when record length is zero" in {
val fb = FixedBlockParameters(Some(0), Some(1), None)

val ex = intercept[IllegalArgumentException] {
FixedBlockParameters.validate(fb)
}

assert(ex.getMessage.contains("Record length should be positive. Got 0."))
}

"throw an exception when block size is zero" in {
val fb = FixedBlockParameters(Some(1), Some(0), None)

val ex = intercept[IllegalArgumentException] {
FixedBlockParameters.validate(fb)
}

assert(ex.getMessage.contains("Block length should be positive. Got 0."))
}

"throw an exception when records per block is zero" in {
val fb = FixedBlockParameters(Some(1), None, Some(0))

val ex = intercept[IllegalArgumentException] {
FixedBlockParameters.validate(fb)
}

assert(ex.getMessage.contains("Records per block should be positive. Got 0."))
}
}

private def getRawRecordContext(recordsPerBlock: Int): RawRecordContext = {
val numOfBlocks = 3

val bytes = Range(0, numOfBlocks)
.flatMap(i => {
Range(0, recordsPerBlock).flatMap(j => {
val num = (i * recordsPerBlock + j) % 10
val v = (0xF0 + num).toByte
Array[Byte](v, v)
})
}).toArray[Byte]

getRawRecordContext(bytes)
}

private def getRawRecordContext(bytes: Array[Byte]): RawRecordContext = {
val ibs = new TestByteStream(bytes)

val bdwDecoder = new RecordHeaderDecoderBdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(isBigEndian = true, 0))
val rdwDecoder = new RecordHeaderDecoderRdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(isBigEndian = true, 0))

RawRecordContext(0, ibs, copybook, rdwDecoder, bdwDecoder, "")
}

}

0 comments on commit 4b4a8d0

Please sign in to comment.