From 4b4a8d0fd1786879f826864b28bea308405ea11d Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Thu, 16 Sep 2021 13:59:26 +0200 Subject: [PATCH] #420 Add implementation of raw record extractor for FB format. --- .../cobol/reader/VarLenNestedReader.scala | 6 +- .../extractors/raw/FixedBlockParameters.scala | 21 ++ .../raw/FixedBlockRawRecordExtractor.scala | 63 ++++++ .../FixedBlockRawRecordExtractorSuite.scala | 180 ++++++++++++++++++ 4 files changed, 268 insertions(+), 2 deletions(-) create mode 100644 cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockParameters.scala create mode 100644 cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractor.scala create mode 100644 cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractorSuite.scala diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala index 94456091..7ec5a61d 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala @@ -24,7 +24,7 @@ import za.co.absa.cobrix.cobol.parser.encoding.{ASCII, EBCDIC} import za.co.absa.cobrix.cobol.parser.headerparsers.{RecordHeaderParser, RecordHeaderParserFactory} import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat.{FixedBlock, VariableBlock} import za.co.absa.cobrix.cobol.parser.{Copybook, CopybookParser} -import za.co.absa.cobrix.cobol.reader.extractors.raw.{RawRecordContext, RawRecordExtractor, RawRecordExtractorFactory, TextRecordExtractor, VarOccursRecordExtractor, VariableBlockVariableRecordExtractor} +import za.co.absa.cobrix.cobol.reader.extractors.raw.{FixedBlockParameters, FixedBlockRawRecordExtractor, RawRecordContext, RawRecordExtractor, RawRecordExtractorFactory, TextRecordExtractor, VarOccursRecordExtractor, VariableBlockVariableRecordExtractor} import za.co.absa.cobrix.cobol.reader.extractors.record.RecordHandler import za.co.absa.cobrix.cobol.reader.index.IndexGenerator import za.co.absa.cobrix.cobol.reader.index.entry.SparseIndexEntry @@ -79,7 +79,9 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String], case None if readerProperties.isText => Some(new TextRecordExtractor(reParams)) case None if readerProperties.recordFormat == FixedBlock => - Some(new VariableBlockVariableRecordExtractor(reParams)) // ToDo FB record format + val fbParams = FixedBlockParameters(readerProperties.recordLength, bdwOpt.get.blockLength, bdwOpt.get.recordsPerBlock) + FixedBlockParameters.validate(fbParams) + Some(new FixedBlockRawRecordExtractor(reParams, fbParams)) case None if readerProperties.recordFormat == VariableBlock => Some(new VariableBlockVariableRecordExtractor(reParams)) case None if readerProperties.variableSizeOccurs && diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockParameters.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockParameters.scala new file mode 100644 index 00000000..b348267e --- /dev/null +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockParameters.scala @@ -0,0 +1,21 @@ +package za.co.absa.cobrix.cobol.reader.extractors.raw + +case class FixedBlockParameters( + recordLength: Option[Int], + blockLength: Option[Int], + recordsPerBlock: Option[Int] + ) + +object FixedBlockParameters { + def validate(params: FixedBlockParameters): Unit = { + if (params.blockLength.isEmpty && params.recordsPerBlock.isEmpty) { + throw new IllegalArgumentException("FB record format requires block length or number records per block to be specified.") + } + if (params.blockLength.nonEmpty && params.recordsPerBlock.nonEmpty) { + throw new IllegalArgumentException("FB record format requires either block length or number records per block to be specified, but not both.") + } + params.recordLength.foreach(x => if (x < 1) throw new IllegalArgumentException(s"Record length should be positive. Got $x.")) + params.blockLength.foreach(x => if (x < 1) throw new IllegalArgumentException(s"Block length should be positive. Got $x.")) + params.recordsPerBlock.foreach(x => if (x < 1) throw new IllegalArgumentException(s"Records per block should be positive. Got $x.")) + } +} \ No newline at end of file diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractor.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractor.scala new file mode 100644 index 00000000..fa7c43f6 --- /dev/null +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractor.scala @@ -0,0 +1,63 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.cobol.reader.extractors.raw + +import scala.collection.mutable + +class FixedBlockRawRecordExtractor(ctx: RawRecordContext, fbParams: FixedBlockParameters) extends Serializable with RawRecordExtractor { + private val recordQueue = new mutable.Queue[Array[Byte]] + + private val recordSize = fbParams.recordLength.getOrElse(ctx.copybook.getRecordSize) + private val bdwSize = fbParams.blockLength.getOrElse(fbParams.recordsPerBlock.get * recordSize) + + override def offset: Long = ctx.inputStream.offset + + override def hasNext: Boolean = { + if (recordQueue.isEmpty) { + readNextBlock() + } + recordQueue.nonEmpty + } + + private def readNextBlock(): Unit = { + if (!ctx.inputStream.isEndOfStream) { + val bdwOffset = ctx.inputStream.offset + val blockBuffer = ctx.inputStream.next(bdwSize) + + var blockIndex = 0 + + while (blockIndex < blockBuffer.length) { + val rdwOffset = bdwOffset + blockIndex + + val payload = blockBuffer.slice(blockIndex, blockIndex + recordSize) + if (payload.length > 0) { + recordQueue.enqueue(payload) + } + blockIndex += recordSize + } + } + } + + + @throws[NoSuchElementException] + override def next(): Array[Byte] = { + if (!hasNext) { + throw new NoSuchElementException + } + recordQueue.dequeue() + } +} diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractorSuite.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractorSuite.scala new file mode 100644 index 00000000..a68694dd --- /dev/null +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractorSuite.scala @@ -0,0 +1,180 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.cobol.reader.extractors.raw + +import org.scalatest.WordSpec +import za.co.absa.cobrix.cobol.parser.CopybookParser +import za.co.absa.cobrix.cobol.reader.memorystream.TestByteStream +import za.co.absa.cobrix.cobol.reader.recordheader.{RecordHeaderDecoderBdw, RecordHeaderDecoderRdw, RecordHeaderParametersFactory} + +class FixedBlockRawRecordExtractorSuite extends WordSpec { + private val copybookContent = + """ 01 RECORD. + 02 X PIC X(2). + """ + private val copybook = CopybookParser.parseTree(copybookContent) + + private val fbParams = FixedBlockParameters(None, Some(2), None) + + "fixed block fixed length records" should { + "be able to read a FB file that has no data" in { + val rc = getRawRecordContext(Array[Byte]()) + + val extractor = new FixedBlockRawRecordExtractor(rc, fbParams) + + assert(!extractor.hasNext) + + intercept[NoSuchElementException] { + extractor.next() + } + } + + "be able to read a FB file that has an incomplete record" in { + val rc = getRawRecordContext(Array[Byte](0xF0.toByte)) + + val extractor = new FixedBlockRawRecordExtractor(rc, fbParams) + + assert(extractor.hasNext) + + val r0 = extractor.next() + + assert(r0.length == 1) + assert(r0.head == 0xF0.toByte) + + intercept[NoSuchElementException] { + extractor.next() + } + } + + "be able to read a FB file that has one record per block" in { + val rc = getRawRecordContext(1) + + val extractor = new FixedBlockRawRecordExtractor(rc, FixedBlockParameters(Some(2), None, Some(1))) + + assert(extractor.hasNext) + + val r0 = extractor.next() + assert(r0.length == 2) + assert(r0.head == 0xF0.toByte) + assert(r0(1) == 0xF0.toByte) + + assert(extractor.next().head == 0xF1.toByte) + assert(extractor.next().head == 0xF2.toByte) + assert(!extractor.hasNext) + } + + "be able to read a VBVR file that has multiple records per block" in { + val rc = getRawRecordContext(3) + + val extractor = new FixedBlockRawRecordExtractor(rc, FixedBlockParameters(None, None, Some(3))) + + assert(extractor.hasNext) + + val r0 = extractor.next() + assert(r0.length == 2) + assert(r0.head == 0xF0.toByte) + assert(r0(1) == 0xF0.toByte) + + assert(extractor.next().head == 0xF1.toByte) + assert(extractor.next().head == 0xF2.toByte) + assert(extractor.next().head == 0xF3.toByte) + assert(extractor.next().head == 0xF4.toByte) + assert(extractor.next().head == 0xF5.toByte) + assert(extractor.next().head == 0xF6.toByte) + assert(extractor.next().head == 0xF7.toByte) + assert(extractor.next().head == 0xF8.toByte) + assert(!extractor.hasNext) + } + } + + "failures" should { + "throw an exception when neither block length nor records per block is specified" in { + val fb = FixedBlockParameters(Some(1), None, None) + + val ex = intercept[IllegalArgumentException] { + FixedBlockParameters.validate(fb) + } + + assert(ex.getMessage.contains("FB record format requires block length or number records per block to be specified.")) + } + + "throw an exception when both block length and records per block are specified" in { + val fb = FixedBlockParameters(Some(1), Some(1), Some(1)) + + val ex = intercept[IllegalArgumentException] { + FixedBlockParameters.validate(fb) + } + + assert(ex.getMessage.contains("FB record format requires either block length or number records per block to be specified, but not both.")) + } + + "throw an exception when record length is zero" in { + val fb = FixedBlockParameters(Some(0), Some(1), None) + + val ex = intercept[IllegalArgumentException] { + FixedBlockParameters.validate(fb) + } + + assert(ex.getMessage.contains("Record length should be positive. Got 0.")) + } + + "throw an exception when block size is zero" in { + val fb = FixedBlockParameters(Some(1), Some(0), None) + + val ex = intercept[IllegalArgumentException] { + FixedBlockParameters.validate(fb) + } + + assert(ex.getMessage.contains("Block length should be positive. Got 0.")) + } + + "throw an exception when records per block is zero" in { + val fb = FixedBlockParameters(Some(1), None, Some(0)) + + val ex = intercept[IllegalArgumentException] { + FixedBlockParameters.validate(fb) + } + + assert(ex.getMessage.contains("Records per block should be positive. Got 0.")) + } + } + + private def getRawRecordContext(recordsPerBlock: Int): RawRecordContext = { + val numOfBlocks = 3 + + val bytes = Range(0, numOfBlocks) + .flatMap(i => { + Range(0, recordsPerBlock).flatMap(j => { + val num = (i * recordsPerBlock + j) % 10 + val v = (0xF0 + num).toByte + Array[Byte](v, v) + }) + }).toArray[Byte] + + getRawRecordContext(bytes) + } + + private def getRawRecordContext(bytes: Array[Byte]): RawRecordContext = { + val ibs = new TestByteStream(bytes) + + val bdwDecoder = new RecordHeaderDecoderBdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(isBigEndian = true, 0)) + val rdwDecoder = new RecordHeaderDecoderRdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(isBigEndian = true, 0)) + + RawRecordContext(0, ibs, copybook, rdwDecoder, bdwDecoder, "") + } + +}