From aee8a7f8f1fe76eb5c9ac969604610f11597b878 Mon Sep 17 00:00:00 2001 From: Munish Date: Thu, 10 Aug 2023 20:12:55 +0530 Subject: [PATCH 01/21] getting offset from getNextStreamCut for readlimit Signed-off-by: Munish --- .../spark/PravegaMicroBatchStream.scala | 38 ++++++++++++++++- .../spark/PravegaSourceProvider.scala | 7 ++++ .../spark/PravegaMicroBatchSourceSuite.scala | 42 +++++++++++++++++++ 3 files changed, 85 insertions(+), 2 deletions(-) diff --git a/src/main/scala/io/pravega/connectors/spark/PravegaMicroBatchStream.scala b/src/main/scala/io/pravega/connectors/spark/PravegaMicroBatchStream.scala index 1b8b525..c85dd3d 100644 --- a/src/main/scala/io/pravega/connectors/spark/PravegaMicroBatchStream.scala +++ b/src/main/scala/io/pravega/connectors/spark/PravegaMicroBatchStream.scala @@ -22,7 +22,7 @@ import io.pravega.client.stream.{Stream, StreamCut} import io.pravega.client.{BatchClientFactory, ClientConfig} import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap -import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset} +import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset, ReadAllAvailable, ReadLimit, ReadMaxRows, SupportsAdmissionControl} import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory} import resource.managed @@ -43,12 +43,13 @@ class PravegaMicroBatchStream( startStreamCut: PravegaStreamCut, endStreamCut: PravegaStreamCut ) - extends MicroBatchStream with Logging { + extends MicroBatchStream with SupportsAdmissionControl with Logging { protected var batchStartStreamCut: StreamCut = _ protected var batchEndStreamCut: StreamCut = _ log.info(s"Initializing micro-batch stream: ${this}") + private val batchClientFactory = BatchClientFactory.withScope(scopeName, clientConfig) private val streamManager = StreamManager.create(clientConfig) @@ -76,7 +77,10 @@ class PravegaMicroBatchStream( } override def latestOffset(): Offset = { + //TODO Confirm if we can throw exception like below how it is implemented in kafka connector or we can keep existing implementation PravegaSourceOffset(PravegaUtils.getStreamInfo(streamManager, scopeName, streamName).getTailStreamCut) + /*throw new UnsupportedOperationException( + "latestOffset(Offset, ReadLimit) should be called instead of this method")*/ } /** @@ -129,4 +133,34 @@ class PravegaMicroBatchStream( s"PravegaMicroBatchStream{clientConfig=${clientConfig}, scopeName=${scopeName}, streamName=${streamName}" + s" startStreamCut=${startStreamCut}, endStreamCut=${endStreamCut}}" } + + override def latestOffset(start: Offset, readLimit: ReadLimit): Offset = + { + val startOffset = Option(start) + .map(_.asInstanceOf[PravegaSourceOffset].streamCut).get + val limits: Seq[ReadLimit] = readLimit match + { + case rows => Seq(rows) + } + val nextStreamCut = if (limits.exists(_.isInstanceOf[ReadAllAvailable])) { + PravegaSourceOffset(PravegaUtils.getStreamInfo(streamManager, scopeName, streamName).getTailStreamCut) + } else + { + val upperLimit = limits.find(_.isInstanceOf[ReadMaxRows]).map(_.asInstanceOf[ReadMaxRows]) + PravegaSourceOffset(batchClientFactory.getNextStreamCut(startOffset, upperLimit.get.maxRows())) + } + log.info(s"nextStreamCut = ${nextStreamCut.streamCut}") + nextStreamCut + } + + override def getDefaultReadLimit: ReadLimit = { + val maxOffsetsPerTrigger = Option(options.get( + PravegaSourceProvider.MAX_OFFSET_PER_TRIGGER)).get.map(_.toLong) + if ( maxOffsetsPerTrigger.isDefined) { + 
ReadLimit.maxRows(maxOffsetsPerTrigger.get) + } else { + // TODO (SPARK-37973) Directly call super.getDefaultReadLimit when scala issue 12523 is fixed + maxOffsetsPerTrigger.map(ReadLimit.maxRows).getOrElse(ReadLimit.allAvailable()) + } + } } diff --git a/src/main/scala/io/pravega/connectors/spark/PravegaSourceProvider.scala b/src/main/scala/io/pravega/connectors/spark/PravegaSourceProvider.scala index 9ea246e..b6ccf62 100755 --- a/src/main/scala/io/pravega/connectors/spark/PravegaSourceProvider.scala +++ b/src/main/scala/io/pravega/connectors/spark/PravegaSourceProvider.scala @@ -96,6 +96,7 @@ object PravegaSourceProvider extends Logging { private[spark] val DEFAULT_BATCH_TRANSACTION_TIMEOUT_MS: Long = 2 * 60 * 1000 // 2 minutes (maximum allowed by default server) private[spark] val DEFAULT_TRANSACTION_STATUS_POLL_INTERVAL_MS: Long = 50 + private[spark] val MAX_OFFSET_PER_TRIGGER = "maxoffsetspertrigger" def buildStreamConfig(caseInsensitiveParams: Map[String, String]): StreamConfiguration = { var streamConfig = StreamConfiguration.builder @@ -201,6 +202,12 @@ object PravegaSourceProvider extends Logging { if (retentionMilliseconds.isDefined && (Try(retentionMilliseconds.get.toInt).isFailure || retentionMilliseconds.get.toInt < 0)) { throw new IllegalArgumentException(s"Retention time should be an integer morethan or equal to zero milliseconds") } + + val maxOffsetPerTrigger = caseInsensitiveParams.get(PravegaSourceProvider.MAX_OFFSET_PER_TRIGGER) + if (maxOffsetPerTrigger.isDefined && (Try(maxOffsetPerTrigger.get.toInt).isFailure || maxOffsetPerTrigger.get.toInt < 1)) + { + throw new IllegalArgumentException(s"Max events per trigger should be an integer more than or equal to one") + } } def buildClientConfig(caseInsensitiveParams: Map[String, String]): ClientConfig = { diff --git a/src/test/scala/io/pravega/connectors/spark/PravegaMicroBatchSourceSuite.scala b/src/test/scala/io/pravega/connectors/spark/PravegaMicroBatchSourceSuite.scala index 8ee8f28..4757f29 100644 --- a/src/test/scala/io/pravega/connectors/spark/PravegaMicroBatchSourceSuite.scala +++ b/src/test/scala/io/pravega/connectors/spark/PravegaMicroBatchSourceSuite.scala @@ -149,6 +149,33 @@ abstract class PravegaSourceSuiteBase extends PravegaSourceTest { addSegments = false) } + test(s"read from latest stream cut with maxoffset 1") + { + val streamName = newStreamName() + testFromLatestStreamCut( + streamName, + addSegments = false, + (MAX_OFFSET_PER_TRIGGER, "1")) + } + + test(s"read from latest stream cut with maxoffset 1000") + { + val streamName = newStreamName() + testFromLatestStreamCut( + streamName, + addSegments = false, + (MAX_OFFSET_PER_TRIGGER, "1000")) + } + + test(s"read from latest stream cut with maxoffset 10") + { + val streamName = newStreamName() + testFromLatestStreamCut( + streamName, + addSegments = false, + (MAX_OFFSET_PER_TRIGGER, "10")) + } + test(s"read from earliest stream cut") { val streamName = newStreamName() testFromEarliestStreamCut( @@ -163,6 +190,15 @@ abstract class PravegaSourceSuiteBase extends PravegaSourceTest { addSegments = false) } + test(s"read from specific stream cut with maxoffset 10") + { + val streamName = newStreamName() + testFromSpecificStreamCut( + streamName, + addSegments = false, + (MAX_OFFSET_PER_TRIGGER, "10")) + } + test(s"read from earliest stream cut, add new segments") { val streamName = newStreamName() testFromEarliestStreamCut( @@ -250,6 +286,7 @@ abstract class PravegaSourceSuiteBase extends PravegaSourceTest { .option(SCOPE_OPTION_KEY, testUtils.scope) 
.option(STREAM_OPTION_KEY, streamName) .option(START_STREAM_CUT_OPTION_KEY, STREAM_CUT_EARLIEST) + .option(MAX_OFFSET_PER_TRIGGER, "1") options.foreach { case (k, v) => reader.option(k, v) } val dataset = reader.load() .selectExpr("CAST(event AS STRING)") @@ -307,6 +344,7 @@ abstract class PravegaSourceSuiteBase extends PravegaSourceTest { .option(SCOPE_OPTION_KEY, testUtils.scope) .option(STREAM_OPTION_KEY, streamName) .option(START_STREAM_CUT_OPTION_KEY, tail1) + .option(MAX_OFFSET_PER_TRIGGER, "1") options.foreach { case (k, v) => reader.option(k, v) } val dataset = reader.load() .selectExpr("CAST(event AS STRING)") @@ -338,6 +376,7 @@ abstract class PravegaSourceSuiteBase extends PravegaSourceTest { .option(SCOPE_OPTION_KEY, testUtils.scope) .option(STREAM_OPTION_KEY, streamName) .option(START_STREAM_CUT_OPTION_KEY, STREAM_CUT_EARLIEST) + .option(MAX_OFFSET_PER_TRIGGER, "1") .load() val query = df @@ -374,6 +413,7 @@ abstract class PravegaMicroBatchSourceSuiteBase extends PravegaSourceSuiteBase { .option(CONTROLLER_OPTION_KEY, testUtils.controllerUri) .option(SCOPE_OPTION_KEY, testUtils.scope) .option(STREAM_OPTION_KEY, streamName) + .option(MAX_OFFSET_PER_TRIGGER, "1") testStream(reader.load)( makeSureGetOffsetCalled, @@ -393,6 +433,7 @@ abstract class PravegaMicroBatchSourceSuiteBase extends PravegaSourceSuiteBase { .option(CONTROLLER_OPTION_KEY, testUtils.controllerUri) .option(SCOPE_OPTION_KEY, testUtils.scope) .option(STREAM_OPTION_KEY, streamName) + .option(MAX_OFFSET_PER_TRIGGER, "1") .load() .selectExpr("CAST(event AS STRING)") .as[String] @@ -420,6 +461,7 @@ abstract class PravegaMicroBatchSourceSuiteBase extends PravegaSourceSuiteBase { .option(CONTROLLER_OPTION_KEY, testUtils.controllerUri) .option(SCOPE_OPTION_KEY, testUtils.scope) .option(STREAM_OPTION_KEY, streamName) + .option(MAX_OFFSET_PER_TRIGGER, "1") .load() val values = df From 9322cc3f45e0bb1c998cb97a155e48a28a981892 Mon Sep 17 00:00:00 2001 From: Munish Date: Thu, 10 Aug 2023 20:40:03 +0530 Subject: [PATCH 02/21] using getDefaultReadLimit Signed-off-by: Munish --- .../io/pravega/connectors/spark/PravegaMicroBatchStream.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/scala/io/pravega/connectors/spark/PravegaMicroBatchStream.scala b/src/main/scala/io/pravega/connectors/spark/PravegaMicroBatchStream.scala index c85dd3d..a04464e 100644 --- a/src/main/scala/io/pravega/connectors/spark/PravegaMicroBatchStream.scala +++ b/src/main/scala/io/pravega/connectors/spark/PravegaMicroBatchStream.scala @@ -159,8 +159,7 @@ class PravegaMicroBatchStream( if ( maxOffsetsPerTrigger.isDefined) { ReadLimit.maxRows(maxOffsetsPerTrigger.get) } else { - // TODO (SPARK-37973) Directly call super.getDefaultReadLimit when scala issue 12523 is fixed - maxOffsetsPerTrigger.map(ReadLimit.maxRows).getOrElse(ReadLimit.allAvailable()) + super.getDefaultReadLimit } } } From df5a7e04c9f2d175b94e69ad9e57f51285549883 Mon Sep 17 00:00:00 2001 From: Munish Date: Fri, 11 Aug 2023 10:48:40 +0530 Subject: [PATCH 03/21] Updated indentation Signed-off-by: Munish --- .../connectors/spark/PravegaMicroBatchStream.scala | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/src/main/scala/io/pravega/connectors/spark/PravegaMicroBatchStream.scala b/src/main/scala/io/pravega/connectors/spark/PravegaMicroBatchStream.scala index a04464e..788c812 100644 --- a/src/main/scala/io/pravega/connectors/spark/PravegaMicroBatchStream.scala +++ b/src/main/scala/io/pravega/connectors/spark/PravegaMicroBatchStream.scala 
@@ -77,10 +77,7 @@ class PravegaMicroBatchStream( } override def latestOffset(): Offset = { - //TODO Confirm if we can throw exception like below how it is implemented in kafka connector or we can keep existing implementation PravegaSourceOffset(PravegaUtils.getStreamInfo(streamManager, scopeName, streamName).getTailStreamCut) - /*throw new UnsupportedOperationException( - "latestOffset(Offset, ReadLimit) should be called instead of this method")*/ } /** @@ -134,18 +131,15 @@ class PravegaMicroBatchStream( s" startStreamCut=${startStreamCut}, endStreamCut=${endStreamCut}}" } - override def latestOffset(start: Offset, readLimit: ReadLimit): Offset = - { + override def latestOffset(start: Offset, readLimit: ReadLimit): Offset = { val startOffset = Option(start) .map(_.asInstanceOf[PravegaSourceOffset].streamCut).get - val limits: Seq[ReadLimit] = readLimit match - { + val limits: Seq[ReadLimit] = readLimit match { case rows => Seq(rows) } val nextStreamCut = if (limits.exists(_.isInstanceOf[ReadAllAvailable])) { PravegaSourceOffset(PravegaUtils.getStreamInfo(streamManager, scopeName, streamName).getTailStreamCut) - } else - { + } else { val upperLimit = limits.find(_.isInstanceOf[ReadMaxRows]).map(_.asInstanceOf[ReadMaxRows]) PravegaSourceOffset(batchClientFactory.getNextStreamCut(startOffset, upperLimit.get.maxRows())) } From 77b93822d0594286263f853e25d5391505a9cdde Mon Sep 17 00:00:00 2001 From: Munish Date: Fri, 11 Aug 2023 13:03:23 +0530 Subject: [PATCH 04/21] Updated validation for offset Signed-off-by: Munish --- .../io/pravega/connectors/spark/PravegaSourceProvider.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/scala/io/pravega/connectors/spark/PravegaSourceProvider.scala b/src/main/scala/io/pravega/connectors/spark/PravegaSourceProvider.scala index b6ccf62..37c98e0 100755 --- a/src/main/scala/io/pravega/connectors/spark/PravegaSourceProvider.scala +++ b/src/main/scala/io/pravega/connectors/spark/PravegaSourceProvider.scala @@ -204,9 +204,9 @@ object PravegaSourceProvider extends Logging { } val maxOffsetPerTrigger = caseInsensitiveParams.get(PravegaSourceProvider.MAX_OFFSET_PER_TRIGGER) - if (maxOffsetPerTrigger.isDefined && (Try(maxOffsetPerTrigger.get.toInt).isFailure || maxOffsetPerTrigger.get.toInt < 1)) + if (maxOffsetPerTrigger.isDefined && (Try(maxOffsetPerTrigger.get.toLong).isFailure || maxOffsetPerTrigger.get.toLong < 1)) { - throw new IllegalArgumentException(s"Max events per trigger should be an integer more than or equal to one") + throw new IllegalArgumentException(s"Max offset should be of type long more than or equal to one") } } From 37afbf2fb6197e40bf47accb44cda0240808bee2 Mon Sep 17 00:00:00 2001 From: Munish Date: Sun, 13 Aug 2023 13:26:38 +0530 Subject: [PATCH 05/21] Integration test for checking batch data Signed-off-by: Munish --- .../spark/PravegaMicroBatchSourceSuite.scala | 178 +++++++++++++++++- 1 file changed, 168 insertions(+), 10 deletions(-) diff --git a/src/test/scala/io/pravega/connectors/spark/PravegaMicroBatchSourceSuite.scala b/src/test/scala/io/pravega/connectors/spark/PravegaMicroBatchSourceSuite.scala index 4757f29..97722a8 100644 --- a/src/test/scala/io/pravega/connectors/spark/PravegaMicroBatchSourceSuite.scala +++ b/src/test/scala/io/pravega/connectors/spark/PravegaMicroBatchSourceSuite.scala @@ -11,14 +11,16 @@ package io.pravega.connectors.spark import java.nio.charset.StandardCharsets.UTF_8 import java.util.concurrent.atomic.AtomicInteger - import io.pravega.client.stream.StreamCut import 
io.pravega.connectors.spark.PravegaSourceProvider._ +import org.apache.spark.sql.Dataset import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, SparkDataStream} import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.streaming.util.StreamManualClock import org.apache.spark.sql.streaming.{StreamTest, Trigger} import org.apache.spark.sql.test.SharedSparkSession +import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.time.Span import org.scalatest.time.SpanSugar._ @@ -119,6 +121,19 @@ abstract class PravegaSourceSuiteBase extends PravegaSourceTest { import testImplicits._ + private def waitUntilBatchProcessed(clock: StreamManualClock) = AssertOnQuery { + q => + eventually(Timeout(streamingTimeout)) { + if (!q.exception.isDefined) { + assert(clock.isStreamWaitingAt(clock.getTimeMillis())) + } + } + if (q.exception.isDefined) { + throw q.exception.get + } + true + } + test("stop stream before reading anything") { val streamName = newStreamName() testUtils.createTestStream(streamName, numSegments = 5) @@ -149,8 +164,7 @@ abstract class PravegaSourceSuiteBase extends PravegaSourceTest { addSegments = false) } - test(s"read from latest stream cut with maxoffset 1") - { + test(s"read from latest stream cut with maxoffset 1") { val streamName = newStreamName() testFromLatestStreamCut( streamName, @@ -158,8 +172,7 @@ abstract class PravegaSourceSuiteBase extends PravegaSourceTest { (MAX_OFFSET_PER_TRIGGER, "1")) } - test(s"read from latest stream cut with maxoffset 1000") - { + test(s"read from latest stream cut with maxoffset 1000") { val streamName = newStreamName() testFromLatestStreamCut( streamName, @@ -167,8 +180,7 @@ abstract class PravegaSourceSuiteBase extends PravegaSourceTest { (MAX_OFFSET_PER_TRIGGER, "1000")) } - test(s"read from latest stream cut with maxoffset 10") - { + test(s"read from latest stream cut with maxoffset 10") { val streamName = newStreamName() testFromLatestStreamCut( streamName, @@ -183,6 +195,26 @@ abstract class PravegaSourceSuiteBase extends PravegaSourceTest { addSegments = false) } + test(s"read from earliest stream cut with maxoffset 10") { + val streamName = newStreamName() + testFromEarliestStreamCut( + streamName, + addSegments = false, + 5, + Map("MAX_OFFSET_PER_TRIGGE" -> "10") + ) + } + + test(s"read from earliest stream cut with maxoffset 1000") { + val streamName = newStreamName() + testFromEarliestStreamCut( + streamName, + addSegments = false, + 5, + Map("MAX_OFFSET_PER_TRIGGE" -> "1000") + ) + } + test(s"read from specific stream cut") { val streamName = newStreamName() testFromSpecificStreamCut( @@ -190,8 +222,7 @@ abstract class PravegaSourceSuiteBase extends PravegaSourceTest { addSegments = false) } - test(s"read from specific stream cut with maxoffset 10") - { + test(s"read from specific stream cut with maxoffset 10") { val streamName = newStreamName() testFromSpecificStreamCut( streamName, @@ -225,6 +256,134 @@ abstract class PravegaSourceSuiteBase extends PravegaSourceTest { } } + test(s"read in batches of maxoffset 10") { + val streamName = newStreamName() + testForBatchSizeMinimum(getDataSet( + streamName, + addSegments = false, + numSegments = 3, + (MAX_OFFSET_PER_TRIGGER, "10"))) + } + + // ApproxDistance per segemnt(1/3=0.3) next StreamCut is fetched i.e 1 which is less then each event length i.e 10 . 
batch 1 will yield one event each + test(s"read in batches of maxoffset 1") + { + val streamName = newStreamName() + testForBatchSizeMinimum(getDataSet( + streamName, + addSegments = false, + numSegments = 3, + (MAX_OFFSET_PER_TRIGGER, "1"))) + } + + // ApproxDistance per segemnt(30/3=10) equal to each event length i.e 10 . batch 1 will yield one event each + test(s"read in batches of maxoffset 30") { + val streamName = newStreamName() + testForBatchSizeMinimum(getDataSet( + streamName, + addSegments = false, + numSegments = 3, + (MAX_OFFSET_PER_TRIGGER, "30"))) + } + + // ApproxDistance per segemnt(33/3=11) greater then each event length i.e 10 . batch 1 will yield two events each + // nextStreamCut = scope/stream0:0=20, 1=20, 2=20 Tail stream cut = scope/stream0:0=30, 1=30, 2=30 + test(s"read in batches of maxoffset 33") { + val streamName = newStreamName() + testForBatchSizeCustom(getDataSet( + streamName, + addSegments = false, + numSegments = 3, + (MAX_OFFSET_PER_TRIGGER, "33"))) + } + // ApproxDistance per segemnt(100/3=33) greater then each event length of all 3 events i.e 30 . batch 1 will yield all events till tail. + //nextStreamCut = scope/stream0:0=30, 1=30, 2=30 Tail stream cut = scope/stream0:0=30, 1=30, 2=30 + test(s"read in batches of maxoffset 100") { + val streamName = newStreamName() + testForBatchSizeTooLarge(getDataSet( + streamName, + addSegments = false, + numSegments = 3, + (MAX_OFFSET_PER_TRIGGER, "100"))) + } + + private def getDataSet( + streamName: String, + addSegments: Boolean, + numSegments: Int, + options: (String, String)*): Dataset[Int] = { + testUtils.createTestStream(streamName, numSegments = numSegments) + testUtils.sendMessages(streamName, Array(10, 40, 70).map(_.toString), Some(0)) // appends 10,40,70 to segment 0 each length is 10 + testUtils.sendMessages(streamName, Array(20, 50, 80).map(_.toString), Some(1)) // appends 20,50,80 to segment 0 each length is 10 + testUtils.sendMessages(streamName, Array(30, 60, 90).map(_.toString), Some(2)) // appends 30,60,90 to segment 0 each length is 10 + require(testUtils.getLatestStreamCut(Set(streamName)).asImpl().getPositions.size === numSegments) + + val reader = spark.readStream + reader + .format(SOURCE_PROVIDER_NAME) + .option(CONTROLLER_OPTION_KEY, testUtils.controllerUri) + .option(SCOPE_OPTION_KEY, testUtils.scope) + .option(STREAM_OPTION_KEY, streamName) + .option(START_STREAM_CUT_OPTION_KEY, STREAM_CUT_EARLIEST) + options.foreach + { case (k, v) => reader.option(k, v) } + val dataset = reader.load() + .selectExpr("CAST(event AS STRING)") + .as[String] + dataset.map(e => e.toInt + 1) + } + + /* + * Batch 1: nextStreamCut = scope/stream0:0=10, 1=10, 2=10 Tail stream cut = scope/stream0:0=30, 1=30, 2=30 + * Batch 2: nextStreamCut = scope/stream0:0=20, 1=20, 2=20 Tail stream cut = scope/stream0:0=30, 1=30, 2=30 + * Batch 3: nextStreamCut = scope/stream0:0=30, 1=30, 2=30 Tail stream cut = scope/stream0:0=30, 1=30, 2=30 + */ + private def testForBatchSizeMinimum( mapped: Dataset[Int]): Unit = { + val clock = new StreamManualClock + testStream(mapped)( + StartStream(Trigger.ProcessingTime(100), clock), + waitUntilBatchProcessed(clock), + CheckAnswer(11, 21, 31), + AdvanceManualClock(100), + waitUntilBatchProcessed(clock), + CheckAnswer(11, 21, 31, 41, 51, 61), + AdvanceManualClock(100), + waitUntilBatchProcessed(clock), + CheckAnswer(11, 21, 31, 41, 51, 61, 71, 81, 91) + ) + } + + /* + * Batch 1: nextStreamCut = scope/stream0:0=20, 1=20, 2=20 Tail stream cut = scope/stream0:0=30, 1=30, 2=30 + * Batch 2: 
nextStreamCut = scope/stream0:0=30, 1=30, 2=30 Tail stream cut = scope/stream0:0=30, 1=30, 2=30 + */ + private def testForBatchSizeCustom(mapped: Dataset[Int]): Unit = { + val clock = new StreamManualClock + testStream(mapped)( + StartStream(Trigger.ProcessingTime(100), clock), + waitUntilBatchProcessed(clock), + CheckAnswer(11, 21, 31, 41, 51, 61), + AdvanceManualClock(100), + waitUntilBatchProcessed(clock), + CheckAnswer(11, 21, 31, 41, 51, 61, 71, 81, 91) + ) + } + + /* + * Batch 1: nextStreamCut = scope/stream0:0=30, 1=30, 2=30 Tail stream cut = scope/stream0:0=30, 1=30, 2=30 + */ + private def testForBatchSizeTooLarge(mapped: Dataset[Int]): Unit = { + val clock = new StreamManualClock + testStream(mapped)( + StartStream(Trigger.ProcessingTime(100), clock), + waitUntilBatchProcessed(clock), + CheckAnswer(11, 21, 31, 41, 51, 61, 71, 81, 91), + AdvanceManualClock(100), + waitUntilBatchProcessed(clock), + CheckAnswer(11, 21, 31, 41, 51, 61, 71, 81, 91) + ) + } + private def testFromLatestStreamCut( streamName: String, addSegments: Boolean, @@ -286,7 +445,6 @@ abstract class PravegaSourceSuiteBase extends PravegaSourceTest { .option(SCOPE_OPTION_KEY, testUtils.scope) .option(STREAM_OPTION_KEY, streamName) .option(START_STREAM_CUT_OPTION_KEY, STREAM_CUT_EARLIEST) - .option(MAX_OFFSET_PER_TRIGGER, "1") options.foreach { case (k, v) => reader.option(k, v) } val dataset = reader.load() .selectExpr("CAST(event AS STRING)") From 7130d97c509751f3277f5d581d1c12190701d565 Mon Sep 17 00:00:00 2001 From: Munish Date: Sun, 13 Aug 2023 13:45:51 +0530 Subject: [PATCH 06/21] Integration test for checking batch data Signed-off-by: Munish --- .../connectors/spark/PravegaMicroBatchSourceSuite.scala | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/test/scala/io/pravega/connectors/spark/PravegaMicroBatchSourceSuite.scala b/src/test/scala/io/pravega/connectors/spark/PravegaMicroBatchSourceSuite.scala index 97722a8..7e9d349 100644 --- a/src/test/scala/io/pravega/connectors/spark/PravegaMicroBatchSourceSuite.scala +++ b/src/test/scala/io/pravega/connectors/spark/PravegaMicroBatchSourceSuite.scala @@ -502,7 +502,6 @@ abstract class PravegaSourceSuiteBase extends PravegaSourceTest { .option(SCOPE_OPTION_KEY, testUtils.scope) .option(STREAM_OPTION_KEY, streamName) .option(START_STREAM_CUT_OPTION_KEY, tail1) - .option(MAX_OFFSET_PER_TRIGGER, "1") options.foreach { case (k, v) => reader.option(k, v) } val dataset = reader.load() .selectExpr("CAST(event AS STRING)") @@ -534,7 +533,6 @@ abstract class PravegaSourceSuiteBase extends PravegaSourceTest { .option(SCOPE_OPTION_KEY, testUtils.scope) .option(STREAM_OPTION_KEY, streamName) .option(START_STREAM_CUT_OPTION_KEY, STREAM_CUT_EARLIEST) - .option(MAX_OFFSET_PER_TRIGGER, "1") .load() val query = df @@ -571,7 +569,6 @@ abstract class PravegaMicroBatchSourceSuiteBase extends PravegaSourceSuiteBase { .option(CONTROLLER_OPTION_KEY, testUtils.controllerUri) .option(SCOPE_OPTION_KEY, testUtils.scope) .option(STREAM_OPTION_KEY, streamName) - .option(MAX_OFFSET_PER_TRIGGER, "1") testStream(reader.load)( makeSureGetOffsetCalled, @@ -591,7 +588,6 @@ abstract class PravegaMicroBatchSourceSuiteBase extends PravegaSourceSuiteBase { .option(CONTROLLER_OPTION_KEY, testUtils.controllerUri) .option(SCOPE_OPTION_KEY, testUtils.scope) .option(STREAM_OPTION_KEY, streamName) - .option(MAX_OFFSET_PER_TRIGGER, "1") .load() .selectExpr("CAST(event AS STRING)") .as[String] @@ -619,7 +615,6 @@ abstract class PravegaMicroBatchSourceSuiteBase extends PravegaSourceSuiteBase { 
.option(CONTROLLER_OPTION_KEY, testUtils.controllerUri) .option(SCOPE_OPTION_KEY, testUtils.scope) .option(STREAM_OPTION_KEY, streamName) - .option(MAX_OFFSET_PER_TRIGGER, "1") .load() val values = df From af8a23ad2a73c3791ab263b46f5be931ebf2bc61 Mon Sep 17 00:00:00 2001 From: Munish Date: Mon, 14 Aug 2023 12:13:59 +0530 Subject: [PATCH 07/21] Comments added Signed-off-by: Munish --- .../connectors/spark/PravegaMicroBatchSourceSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/scala/io/pravega/connectors/spark/PravegaMicroBatchSourceSuite.scala b/src/test/scala/io/pravega/connectors/spark/PravegaMicroBatchSourceSuite.scala index 7e9d349..98e48db 100644 --- a/src/test/scala/io/pravega/connectors/spark/PravegaMicroBatchSourceSuite.scala +++ b/src/test/scala/io/pravega/connectors/spark/PravegaMicroBatchSourceSuite.scala @@ -314,8 +314,8 @@ abstract class PravegaSourceSuiteBase extends PravegaSourceTest { options: (String, String)*): Dataset[Int] = { testUtils.createTestStream(streamName, numSegments = numSegments) testUtils.sendMessages(streamName, Array(10, 40, 70).map(_.toString), Some(0)) // appends 10,40,70 to segment 0 each length is 10 - testUtils.sendMessages(streamName, Array(20, 50, 80).map(_.toString), Some(1)) // appends 20,50,80 to segment 0 each length is 10 - testUtils.sendMessages(streamName, Array(30, 60, 90).map(_.toString), Some(2)) // appends 30,60,90 to segment 0 each length is 10 + testUtils.sendMessages(streamName, Array(20, 50, 80).map(_.toString), Some(1)) // appends 20,50,80 to segment 1 each length is 10 + testUtils.sendMessages(streamName, Array(30, 60, 90).map(_.toString), Some(2)) // appends 30,60,90 to segment 2 each length is 10 require(testUtils.getLatestStreamCut(Set(streamName)).asImpl().getPositions.size === numSegments) val reader = spark.readStream From c73d4237d8387040b2c1b17048cca36a6bf7fb8a Mon Sep 17 00:00:00 2001 From: Munish Date: Mon, 14 Aug 2023 16:11:34 +0530 Subject: [PATCH 08/21] Long max as offset test case added Signed-off-by: Munish --- .../spark/PravegaSourceProvider.scala | 2 +- .../spark/PravegaMicroBatchSourceSuite.scala | 22 ++++++++++++++----- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/src/main/scala/io/pravega/connectors/spark/PravegaSourceProvider.scala b/src/main/scala/io/pravega/connectors/spark/PravegaSourceProvider.scala index 37c98e0..b4a0627 100755 --- a/src/main/scala/io/pravega/connectors/spark/PravegaSourceProvider.scala +++ b/src/main/scala/io/pravega/connectors/spark/PravegaSourceProvider.scala @@ -96,7 +96,7 @@ object PravegaSourceProvider extends Logging { private[spark] val DEFAULT_BATCH_TRANSACTION_TIMEOUT_MS: Long = 2 * 60 * 1000 // 2 minutes (maximum allowed by default server) private[spark] val DEFAULT_TRANSACTION_STATUS_POLL_INTERVAL_MS: Long = 50 - private[spark] val MAX_OFFSET_PER_TRIGGER = "maxoffsetspertrigger" + private[spark] val MAX_OFFSET_PER_TRIGGER = "max_offsets_per_trigger" def buildStreamConfig(caseInsensitiveParams: Map[String, String]): StreamConfiguration = { var streamConfig = StreamConfiguration.builder diff --git a/src/test/scala/io/pravega/connectors/spark/PravegaMicroBatchSourceSuite.scala b/src/test/scala/io/pravega/connectors/spark/PravegaMicroBatchSourceSuite.scala index 98e48db..ea67613 100644 --- a/src/test/scala/io/pravega/connectors/spark/PravegaMicroBatchSourceSuite.scala +++ b/src/test/scala/io/pravega/connectors/spark/PravegaMicroBatchSourceSuite.scala @@ -262,7 +262,7 @@ abstract class PravegaSourceSuiteBase extends 
PravegaSourceTest { streamName, addSegments = false, numSegments = 3, - (MAX_OFFSET_PER_TRIGGER, "10"))) + (MAX_OFFSET_PER_TRIGGER, 10))) } // ApproxDistance per segemnt(1/3=0.3) next StreamCut is fetched i.e 1 which is less then each event length i.e 10 . batch 1 will yield one event each @@ -273,7 +273,7 @@ abstract class PravegaSourceSuiteBase extends PravegaSourceTest { streamName, addSegments = false, numSegments = 3, - (MAX_OFFSET_PER_TRIGGER, "1"))) + (MAX_OFFSET_PER_TRIGGER, 1))) } // ApproxDistance per segemnt(30/3=10) equal to each event length i.e 10 . batch 1 will yield one event each @@ -283,7 +283,7 @@ abstract class PravegaSourceSuiteBase extends PravegaSourceTest { streamName, addSegments = false, numSegments = 3, - (MAX_OFFSET_PER_TRIGGER, "30"))) + (MAX_OFFSET_PER_TRIGGER, 30))) } // ApproxDistance per segemnt(33/3=11) greater then each event length i.e 10 . batch 1 will yield two events each @@ -294,7 +294,7 @@ abstract class PravegaSourceSuiteBase extends PravegaSourceTest { streamName, addSegments = false, numSegments = 3, - (MAX_OFFSET_PER_TRIGGER, "33"))) + (MAX_OFFSET_PER_TRIGGER, 33))) } // ApproxDistance per segemnt(100/3=33) greater then each event length of all 3 events i.e 30 . batch 1 will yield all events till tail. //nextStreamCut = scope/stream0:0=30, 1=30, 2=30 Tail stream cut = scope/stream0:0=30, 1=30, 2=30 @@ -304,14 +304,24 @@ abstract class PravegaSourceSuiteBase extends PravegaSourceTest { streamName, addSegments = false, numSegments = 3, - (MAX_OFFSET_PER_TRIGGER, "100"))) + (MAX_OFFSET_PER_TRIGGER, 100))) + } + + test(s"read in batches of maxoffset Long max") + { + val streamName = newStreamName() + testForBatchSizeTooLarge(getDataSet( + streamName, + addSegments = false, + numSegments = 3, + (MAX_OFFSET_PER_TRIGGER, Long.MaxValue))) } private def getDataSet( streamName: String, addSegments: Boolean, numSegments: Int, - options: (String, String)*): Dataset[Int] = { + options: (String, Long)*): Dataset[Int] = { testUtils.createTestStream(streamName, numSegments = numSegments) testUtils.sendMessages(streamName, Array(10, 40, 70).map(_.toString), Some(0)) // appends 10,40,70 to segment 0 each length is 10 testUtils.sendMessages(streamName, Array(20, 50, 80).map(_.toString), Some(1)) // appends 20,50,80 to segment 1 each length is 10 From 66250fce80c2012f923f9588b52f79c2318a015f Mon Sep 17 00:00:00 2001 From: Munish Date: Mon, 21 Aug 2023 12:32:14 +0530 Subject: [PATCH 09/21] Updated name maxOffset to approxBytes Signed-off-by: Munish --- .../spark/PravegaMicroBatchStream.scala | 8 ++-- .../spark/PravegaSourceProvider.scala | 6 +-- .../spark/PravegaMicroBatchSourceSuite.scala | 44 +++++++++---------- 3 files changed, 29 insertions(+), 29 deletions(-) diff --git a/src/main/scala/io/pravega/connectors/spark/PravegaMicroBatchStream.scala b/src/main/scala/io/pravega/connectors/spark/PravegaMicroBatchStream.scala index 788c812..4ecdca9 100644 --- a/src/main/scala/io/pravega/connectors/spark/PravegaMicroBatchStream.scala +++ b/src/main/scala/io/pravega/connectors/spark/PravegaMicroBatchStream.scala @@ -148,10 +148,10 @@ class PravegaMicroBatchStream( } override def getDefaultReadLimit: ReadLimit = { - val maxOffsetsPerTrigger = Option(options.get( - PravegaSourceProvider.MAX_OFFSET_PER_TRIGGER)).get.map(_.toLong) - if ( maxOffsetsPerTrigger.isDefined) { - ReadLimit.maxRows(maxOffsetsPerTrigger.get) + val approxBytesPerTrigger = Option(options.get( + PravegaSourceProvider.APPROX_BYTES_PER_TRIGGER)).get.map(_.toLong) + if ( approxBytesPerTrigger.isDefined) { + 
ReadLimit.maxRows(approxBytesPerTrigger.get) } else { super.getDefaultReadLimit } diff --git a/src/main/scala/io/pravega/connectors/spark/PravegaSourceProvider.scala b/src/main/scala/io/pravega/connectors/spark/PravegaSourceProvider.scala index b4a0627..44b92bc 100755 --- a/src/main/scala/io/pravega/connectors/spark/PravegaSourceProvider.scala +++ b/src/main/scala/io/pravega/connectors/spark/PravegaSourceProvider.scala @@ -96,7 +96,7 @@ object PravegaSourceProvider extends Logging { private[spark] val DEFAULT_BATCH_TRANSACTION_TIMEOUT_MS: Long = 2 * 60 * 1000 // 2 minutes (maximum allowed by default server) private[spark] val DEFAULT_TRANSACTION_STATUS_POLL_INTERVAL_MS: Long = 50 - private[spark] val MAX_OFFSET_PER_TRIGGER = "max_offsets_per_trigger" + private[spark] val APPROX_BYTES_PER_TRIGGER = "approx_bytes_per_trigger" def buildStreamConfig(caseInsensitiveParams: Map[String, String]): StreamConfiguration = { var streamConfig = StreamConfiguration.builder @@ -203,8 +203,8 @@ object PravegaSourceProvider extends Logging { throw new IllegalArgumentException(s"Retention time should be an integer morethan or equal to zero milliseconds") } - val maxOffsetPerTrigger = caseInsensitiveParams.get(PravegaSourceProvider.MAX_OFFSET_PER_TRIGGER) - if (maxOffsetPerTrigger.isDefined && (Try(maxOffsetPerTrigger.get.toLong).isFailure || maxOffsetPerTrigger.get.toLong < 1)) + val approxBytesPerTrigger = caseInsensitiveParams.get(PravegaSourceProvider.APPROX_BYTES_PER_TRIGGER) + if (approxBytesPerTrigger.isDefined && (Try(approxBytesPerTrigger.get.toLong).isFailure || approxBytesPerTrigger.get.toLong < 1)) { throw new IllegalArgumentException(s"Max offset should be of type long more than or equal to one") } diff --git a/src/test/scala/io/pravega/connectors/spark/PravegaMicroBatchSourceSuite.scala b/src/test/scala/io/pravega/connectors/spark/PravegaMicroBatchSourceSuite.scala index ea67613..4be2c12 100644 --- a/src/test/scala/io/pravega/connectors/spark/PravegaMicroBatchSourceSuite.scala +++ b/src/test/scala/io/pravega/connectors/spark/PravegaMicroBatchSourceSuite.scala @@ -164,28 +164,28 @@ abstract class PravegaSourceSuiteBase extends PravegaSourceTest { addSegments = false) } - test(s"read from latest stream cut with maxoffset 1") { + test(s"read from latest stream cut with approxBytes 1") { val streamName = newStreamName() testFromLatestStreamCut( streamName, addSegments = false, - (MAX_OFFSET_PER_TRIGGER, "1")) + (APPROX_BYTES_PER_TRIGGER, "1")) } - test(s"read from latest stream cut with maxoffset 1000") { + test(s"read from latest stream cut with approxBytes 1000") { val streamName = newStreamName() testFromLatestStreamCut( streamName, addSegments = false, - (MAX_OFFSET_PER_TRIGGER, "1000")) + (APPROX_BYTES_PER_TRIGGER, "1000")) } - test(s"read from latest stream cut with maxoffset 10") { + test(s"read from latest stream cut with approxBytes 10") { val streamName = newStreamName() testFromLatestStreamCut( streamName, addSegments = false, - (MAX_OFFSET_PER_TRIGGER, "10")) + (APPROX_BYTES_PER_TRIGGER, "10")) } test(s"read from earliest stream cut") { @@ -195,7 +195,7 @@ abstract class PravegaSourceSuiteBase extends PravegaSourceTest { addSegments = false) } - test(s"read from earliest stream cut with maxoffset 10") { + test(s"read from earliest stream cut with approxBytes 10") { val streamName = newStreamName() testFromEarliestStreamCut( streamName, @@ -205,7 +205,7 @@ abstract class PravegaSourceSuiteBase extends PravegaSourceTest { ) } - test(s"read from earliest stream cut with maxoffset 1000") { 
+ test(s"read from earliest stream cut with approxBytes 1000") { val streamName = newStreamName() testFromEarliestStreamCut( streamName, @@ -222,12 +222,12 @@ abstract class PravegaSourceSuiteBase extends PravegaSourceTest { addSegments = false) } - test(s"read from specific stream cut with maxoffset 10") { + test(s"read from specific stream cut with approxBytes 10") { val streamName = newStreamName() testFromSpecificStreamCut( streamName, addSegments = false, - (MAX_OFFSET_PER_TRIGGER, "10")) + (APPROX_BYTES_PER_TRIGGER, "10")) } test(s"read from earliest stream cut, add new segments") { @@ -256,65 +256,65 @@ abstract class PravegaSourceSuiteBase extends PravegaSourceTest { } } - test(s"read in batches of maxoffset 10") { + test(s"read in batches of approxBytes 10") { val streamName = newStreamName() testForBatchSizeMinimum(getDataSet( streamName, addSegments = false, numSegments = 3, - (MAX_OFFSET_PER_TRIGGER, 10))) + (APPROX_BYTES_PER_TRIGGER, 10))) } // ApproxDistance per segemnt(1/3=0.3) next StreamCut is fetched i.e 1 which is less then each event length i.e 10 . batch 1 will yield one event each - test(s"read in batches of maxoffset 1") + test(s"read in batches of approxBytes 1") { val streamName = newStreamName() testForBatchSizeMinimum(getDataSet( streamName, addSegments = false, numSegments = 3, - (MAX_OFFSET_PER_TRIGGER, 1))) + (APPROX_BYTES_PER_TRIGGER, 1))) } // ApproxDistance per segemnt(30/3=10) equal to each event length i.e 10 . batch 1 will yield one event each - test(s"read in batches of maxoffset 30") { + test(s"read in batches of approxBytes 30") { val streamName = newStreamName() testForBatchSizeMinimum(getDataSet( streamName, addSegments = false, numSegments = 3, - (MAX_OFFSET_PER_TRIGGER, 30))) + (APPROX_BYTES_PER_TRIGGER, 30))) } // ApproxDistance per segemnt(33/3=11) greater then each event length i.e 10 . batch 1 will yield two events each // nextStreamCut = scope/stream0:0=20, 1=20, 2=20 Tail stream cut = scope/stream0:0=30, 1=30, 2=30 - test(s"read in batches of maxoffset 33") { + test(s"read in batches of approxBytes 33") { val streamName = newStreamName() testForBatchSizeCustom(getDataSet( streamName, addSegments = false, numSegments = 3, - (MAX_OFFSET_PER_TRIGGER, 33))) + (APPROX_BYTES_PER_TRIGGER, 33))) } // ApproxDistance per segemnt(100/3=33) greater then each event length of all 3 events i.e 30 . batch 1 will yield all events till tail. 
//nextStreamCut = scope/stream0:0=30, 1=30, 2=30 Tail stream cut = scope/stream0:0=30, 1=30, 2=30 - test(s"read in batches of maxoffset 100") { + test(s"read in batches of approxBytes 100") { val streamName = newStreamName() testForBatchSizeTooLarge(getDataSet( streamName, addSegments = false, numSegments = 3, - (MAX_OFFSET_PER_TRIGGER, 100))) + (APPROX_BYTES_PER_TRIGGER, 100))) } - test(s"read in batches of maxoffset Long max") + test(s"read in batches of approxBytes Long max") { val streamName = newStreamName() testForBatchSizeTooLarge(getDataSet( streamName, addSegments = false, numSegments = 3, - (MAX_OFFSET_PER_TRIGGER, Long.MaxValue))) + (APPROX_BYTES_PER_TRIGGER, Long.MaxValue))) } private def getDataSet( From 93f1404713bf69d4789afd9648b65575ce896856 Mon Sep 17 00:00:00 2001 From: Munish Date: Fri, 1 Sep 2023 12:36:04 +0530 Subject: [PATCH 10/21] Updated a test case Signed-off-by: Munish --- .../connectors/spark/PravegaMicroBatchSourceSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/scala/io/pravega/connectors/spark/PravegaMicroBatchSourceSuite.scala b/src/test/scala/io/pravega/connectors/spark/PravegaMicroBatchSourceSuite.scala index 4be2c12..e0ef48c 100644 --- a/src/test/scala/io/pravega/connectors/spark/PravegaMicroBatchSourceSuite.scala +++ b/src/test/scala/io/pravega/connectors/spark/PravegaMicroBatchSourceSuite.scala @@ -276,14 +276,14 @@ abstract class PravegaSourceSuiteBase extends PravegaSourceTest { (APPROX_BYTES_PER_TRIGGER, 1))) } - // ApproxDistance per segemnt(30/3=10) equal to each event length i.e 10 . batch 1 will yield one event each + // ApproxDistance per segemnt(20/3=6.6) next StreamCut is fetched i.e 1 which is less then each event length i.e 10 . batch 1 will yield one event each test(s"read in batches of approxBytes 30") { val streamName = newStreamName() testForBatchSizeMinimum(getDataSet( streamName, addSegments = false, numSegments = 3, - (APPROX_BYTES_PER_TRIGGER, 30))) + (APPROX_BYTES_PER_TRIGGER, 20))) } // ApproxDistance per segemnt(33/3=11) greater then each event length i.e 10 . 
batch 1 will yield two events each From d7eb3ae3e6d8f183a474238717b1d247eb6dee38 Mon Sep 17 00:00:00 2001 From: Munish Date: Tue, 12 Sep 2023 21:10:52 +0530 Subject: [PATCH 11/21] Exception handelled at connector side Signed-off-by: Munish --- .../spark/PravegaBatchPartitionReader.scala | 23 +++++++++++++------ .../spark/PravegaMicroBatchStream.scala | 8 +++++++ 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/src/main/scala/io/pravega/connectors/spark/PravegaBatchPartitionReader.scala b/src/main/scala/io/pravega/connectors/spark/PravegaBatchPartitionReader.scala index 25f615f..c113e0f 100644 --- a/src/main/scala/io/pravega/connectors/spark/PravegaBatchPartitionReader.scala +++ b/src/main/scala/io/pravega/connectors/spark/PravegaBatchPartitionReader.scala @@ -18,6 +18,7 @@ package io.pravega.connectors.spark import io.pravega.client.batch.SegmentRange +import io.pravega.client.stream.TruncatedDataException import io.pravega.client.stream.impl.ByteBufferSerializer import io.pravega.client.{BatchClientFactory, ClientConfig} import org.apache.spark.internal.Logging @@ -52,13 +53,21 @@ case class PravegaBatchPartitionReader( override def next(): Boolean = { if (iterator.hasNext) { val offset = iterator.getOffset - val event = iterator.next() - nextRow = converter.toUnsafeRow( - event.array, - segmentRange.getScope, - segmentRange.getStreamName, - segmentRange.getSegmentId, - offset) + + try + { + val event = iterator.next() + nextRow = converter.toUnsafeRow( + event.array, + segmentRange.getScope, + segmentRange.getStreamName, + segmentRange.getSegmentId, + offset) + } + catch + { + case e: TruncatedDataException => log.trace("next: TruncatedDataException while reading data", e) + } true } else false } diff --git a/src/main/scala/io/pravega/connectors/spark/PravegaMicroBatchStream.scala b/src/main/scala/io/pravega/connectors/spark/PravegaMicroBatchStream.scala index 4ecdca9..523349a 100644 --- a/src/main/scala/io/pravega/connectors/spark/PravegaMicroBatchStream.scala +++ b/src/main/scala/io/pravega/connectors/spark/PravegaMicroBatchStream.scala @@ -18,6 +18,7 @@ package io.pravega.connectors.spark import io.pravega.client.admin.StreamManager +import io.pravega.client.segment.impl.SegmentTruncatedException import io.pravega.client.stream.{Stream, StreamCut} import io.pravega.client.{BatchClientFactory, ClientConfig} import org.apache.spark.internal.Logging @@ -142,6 +143,13 @@ class PravegaMicroBatchStream( } else { val upperLimit = limits.find(_.isInstanceOf[ReadMaxRows]).map(_.asInstanceOf[ReadMaxRows]) PravegaSourceOffset(batchClientFactory.getNextStreamCut(startOffset, upperLimit.get.maxRows())) + try { + PravegaSourceOffset(batchClientFactory.getNextStreamCut(startOffset, upperLimit.get.maxRows())) + } + catch + { + case e: SegmentTruncatedException => PravegaSourceOffset(PravegaUtils.getStreamInfo(streamManager, scopeName, streamName).getTailStreamCut) + } } log.info(s"nextStreamCut = ${nextStreamCut.streamCut}") nextStreamCut From 6fc9d0cb3490f04bc36ec08463fed9052f833256 Mon Sep 17 00:00:00 2001 From: Munish Date: Thu, 14 Sep 2023 19:13:16 +0530 Subject: [PATCH 12/21] Exception handelled Signed-off-by: Munish --- .../connectors/spark/PravegaBatchPartitionReader.scala | 4 +++- .../pravega/connectors/spark/PravegaMicroBatchStream.scala | 2 +- .../connectors/spark/PravegaMicroBatchSourceSuite.scala | 7 ++++--- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/main/scala/io/pravega/connectors/spark/PravegaBatchPartitionReader.scala 
b/src/main/scala/io/pravega/connectors/spark/PravegaBatchPartitionReader.scala index c113e0f..46e324b 100644 --- a/src/main/scala/io/pravega/connectors/spark/PravegaBatchPartitionReader.scala +++ b/src/main/scala/io/pravega/connectors/spark/PravegaBatchPartitionReader.scala @@ -66,7 +66,9 @@ case class PravegaBatchPartitionReader( } catch { - case e: TruncatedDataException => log.trace("next: TruncatedDataException while reading data", e) + case e: TruncatedDataException => + log.warn("next: TruncatedDataException while reading data", e) + return false } true } else false diff --git a/src/main/scala/io/pravega/connectors/spark/PravegaMicroBatchStream.scala b/src/main/scala/io/pravega/connectors/spark/PravegaMicroBatchStream.scala index 523349a..a33a95f 100644 --- a/src/main/scala/io/pravega/connectors/spark/PravegaMicroBatchStream.scala +++ b/src/main/scala/io/pravega/connectors/spark/PravegaMicroBatchStream.scala @@ -148,7 +148,7 @@ class PravegaMicroBatchStream( } catch { - case e: SegmentTruncatedException => PravegaSourceOffset(PravegaUtils.getStreamInfo(streamManager, scopeName, streamName).getTailStreamCut) + case e: SegmentTruncatedException => PravegaSourceOffset(PravegaUtils.getStreamInfo(streamManager, scopeName, streamName).getHeadStreamCut) } } log.info(s"nextStreamCut = ${nextStreamCut.streamCut}") diff --git a/src/test/scala/io/pravega/connectors/spark/PravegaMicroBatchSourceSuite.scala b/src/test/scala/io/pravega/connectors/spark/PravegaMicroBatchSourceSuite.scala index e0ef48c..f00d564 100644 --- a/src/test/scala/io/pravega/connectors/spark/PravegaMicroBatchSourceSuite.scala +++ b/src/test/scala/io/pravega/connectors/spark/PravegaMicroBatchSourceSuite.scala @@ -347,16 +347,16 @@ abstract class PravegaSourceSuiteBase extends PravegaSourceTest { * Batch 1: nextStreamCut = scope/stream0:0=10, 1=10, 2=10 Tail stream cut = scope/stream0:0=30, 1=30, 2=30 * Batch 2: nextStreamCut = scope/stream0:0=20, 1=20, 2=20 Tail stream cut = scope/stream0:0=30, 1=30, 2=30 * Batch 3: nextStreamCut = scope/stream0:0=30, 1=30, 2=30 Tail stream cut = scope/stream0:0=30, 1=30, 2=30 + * We have a probabilistic mechanism to get nextOffset so can not guarantee each batch will have consistent data + * but assert that eventually it must fetch all data. */ private def testForBatchSizeMinimum( mapped: Dataset[Int]): Unit = { val clock = new StreamManualClock testStream(mapped)( StartStream(Trigger.ProcessingTime(100), clock), waitUntilBatchProcessed(clock), - CheckAnswer(11, 21, 31), AdvanceManualClock(100), waitUntilBatchProcessed(clock), - CheckAnswer(11, 21, 31, 41, 51, 61), AdvanceManualClock(100), waitUntilBatchProcessed(clock), CheckAnswer(11, 21, 31, 41, 51, 61, 71, 81, 91) @@ -366,13 +366,14 @@ abstract class PravegaSourceSuiteBase extends PravegaSourceTest { /* * Batch 1: nextStreamCut = scope/stream0:0=20, 1=20, 2=20 Tail stream cut = scope/stream0:0=30, 1=30, 2=30 * Batch 2: nextStreamCut = scope/stream0:0=30, 1=30, 2=30 Tail stream cut = scope/stream0:0=30, 1=30, 2=30 + * We have a probabilistic mechanism to get nextOffset so can not guarantee each batch will have consistent data + * but assert that eventually it must fetch all data. 
   */
   private def testForBatchSizeCustom(mapped: Dataset[Int]): Unit = {
     val clock = new StreamManualClock
     testStream(mapped)(
       StartStream(Trigger.ProcessingTime(100), clock),
       waitUntilBatchProcessed(clock),
-      CheckAnswer(11, 21, 31, 41, 51, 61),
       AdvanceManualClock(100),
       waitUntilBatchProcessed(clock),
       CheckAnswer(11, 21, 31, 41, 51, 61, 71, 81, 91)

From 76d18b5192773525f317af252ab740719550e76c Mon Sep 17 00:00:00 2001
From: Munish
Date: Fri, 15 Sep 2023 20:01:33 +0530
Subject: [PATCH 13/21] Test case added for TruncatedDataException

Signed-off-by: Munish
---
 .../spark/PravegaMicroBatchSourceSuite.scala | 47 +++++++++++++++++++
 .../connectors/spark/PravegaTestUtils.scala  |  4 ++
 2 files changed, 51 insertions(+)

diff --git a/src/test/scala/io/pravega/connectors/spark/PravegaMicroBatchSourceSuite.scala b/src/test/scala/io/pravega/connectors/spark/PravegaMicroBatchSourceSuite.scala
index f00d564..daed8cc 100644
--- a/src/test/scala/io/pravega/connectors/spark/PravegaMicroBatchSourceSuite.scala
+++ b/src/test/scala/io/pravega/connectors/spark/PravegaMicroBatchSourceSuite.scala
@@ -134,6 +134,53 @@ abstract class PravegaSourceSuiteBase extends PravegaSourceTest {
     true
   }
 
+  // Test TruncatedDataException scenario.
+  test(s"read from some earlier stream cut in a truncated stream.")
+  {
+    val streamName = newStreamName()
+    testUtils.createTestStream(streamName, numSegments = 5)
+
+    // The following messages will be truncated.
+    testUtils.sendMessages(streamName, Array(-10, -11, -12).map(_.toString), Some(1))
+    testUtils.sendMessages(streamName, Array(10).map(_.toString), Some(3))
+    testUtils.sendMessages(streamName, Array(20, 21).map(_.toString), Some(4))
+
+    val streamInfo1 = testUtils.getStreamInfo(streamName)
+    log.info(s"testFromSpecificStreamCut: streamInfo1=${streamInfo1}")
+    val head1 = streamInfo1.getHeadStreamCut.asText
+    val tail1 = streamInfo1.getTailStreamCut.asText
+    assert(head1 != tail1)
+
+    // Truncating the stream at the tail stream cut to mimic the TruncatedDataException scenario while reading from an earlier stream cut.
+    assert(testUtils.truncateStream(streamName, streamInfo1.getTailStreamCut))
+
+    // Passing headStreamCut to the reader to read from the truncated part of the stream.
+ val reader = spark.readStream + reader + .format(SOURCE_PROVIDER_NAME) + .option(CONTROLLER_OPTION_KEY, testUtils.controllerUri) + .option(SCOPE_OPTION_KEY, testUtils.scope) + .option(STREAM_OPTION_KEY, streamName) + .option(START_STREAM_CUT_OPTION_KEY, head1) + .option(APPROX_BYTES_PER_TRIGGER, 10) + + val dataset = reader.load() + .selectExpr("CAST(event AS STRING)") + .as[String] + val mapped = dataset.map(e => e.toInt) + testStream(mapped)( + makeSureGetOffsetCalled, + CheckAnswer(), + AddPravegaData(Set(streamName), 28, 29), + CheckAnswer(28, 29), + StopStream, + StartStream(), + AddPravegaData(Set(streamName), 30, 31, 32, 33, 34), + CheckAnswer(28, 29, 30, 31, 32, 33, 34), + StopStream + ) + } + test("stop stream before reading anything") { val streamName = newStreamName() testUtils.createTestStream(streamName, numSegments = 5) diff --git a/src/test/scala/io/pravega/connectors/spark/PravegaTestUtils.scala b/src/test/scala/io/pravega/connectors/spark/PravegaTestUtils.scala index eeeab9f..340da3b 100644 --- a/src/test/scala/io/pravega/connectors/spark/PravegaTestUtils.scala +++ b/src/test/scala/io/pravega/connectors/spark/PravegaTestUtils.scala @@ -145,6 +145,10 @@ class PravegaTestUtils extends Logging { } } } + + def truncateStream(streamName: String, streamCut : StreamCut): Boolean = { + SETUP_UTILS.getController.truncateStream(scope, streamName, streamCut).get() + } } object PravegaTestUtils { From 45252d3b758bb201f8e00614ed292a7091e72925 Mon Sep 17 00:00:00 2001 From: Munish Date: Mon, 18 Sep 2023 12:37:31 +0530 Subject: [PATCH 14/21] Testing pravega version Signed-off-by: Munish --- gradle.properties | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index db40738..9b8ef06 100644 --- a/gradle.properties +++ b/gradle.properties @@ -27,7 +27,9 @@ sparkVersion=3.4.0 # Version and base tags can be overridden at build time. connectorVersion=0.13.0-SNAPSHOT -pravegaVersion=0.13.0-3142.2542966-SNAPSHOT +#pravegaVersion=0.13.0-3142.2542966-SNAPSHOT +pravegaVersion=0.14.0-3255.11b61cf67-master-a2a4964-suse + # flag to indicate if Pravega sub-module should be used instead of the version defined in 'pravegaVersion' usePravegaVersionSubModule=false From c9dd4ab3a3655752b5c6ad5e890a7542597472fa Mon Sep 17 00:00:00 2001 From: Munish Date: Mon, 18 Sep 2023 16:39:24 +0530 Subject: [PATCH 15/21] Testing pravega version Signed-off-by: Munish --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index 9b8ef06..ba715b0 100644 --- a/gradle.properties +++ b/gradle.properties @@ -28,7 +28,7 @@ sparkVersion=3.4.0 # Version and base tags can be overridden at build time. 
 connectorVersion=0.13.0-SNAPSHOT
 #pravegaVersion=0.13.0-3142.2542966-SNAPSHOT
-pravegaVersion=0.14.0-3255.11b61cf67-master-a2a4964-suse
+pravegaVersion=0.14.0-3257.fd53bec-SNAPSHOT
 
 # flag to indicate if Pravega sub-module should be used instead of the version defined in 'pravegaVersion'

From 715f259876438e58f4010a5d7e692818d0c1b321 Mon Sep 17 00:00:00 2001
From: Munish
Date: Fri, 22 Sep 2023 11:38:17 +0530
Subject: [PATCH 16/21] Added new logs and handled TruncatedDataException

Signed-off-by: Munish
---
 gradle.properties                                              | 5 ++---
 .../connectors/spark/PravegaBatchPartitionReader.scala         | 2 +-
 .../pravega/connectors/spark/PravegaMicroBatchStream.scala     | 4 +++-
 3 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/gradle.properties b/gradle.properties
index ba715b0..d39e1b6 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -26,9 +26,8 @@ slf4jApiVersion=1.7.25
 sparkVersion=3.4.0
 
 # Version and base tags can be overridden at build time.
-connectorVersion=0.13.0-SNAPSHOT
-#pravegaVersion=0.13.0-3142.2542966-SNAPSHOT
-pravegaVersion=0.14.0-3257.fd53bec-SNAPSHOT
+connectorVersion=0.14.0-SNAPSHOT
+pravegaVersion=0.14.0-3259.b1d51d5-SNAPSHOT
 
 # flag to indicate if Pravega sub-module should be used instead of the version defined in 'pravegaVersion'

diff --git a/src/main/scala/io/pravega/connectors/spark/PravegaBatchPartitionReader.scala b/src/main/scala/io/pravega/connectors/spark/PravegaBatchPartitionReader.scala
index 46e324b..d9a0d98 100644
--- a/src/main/scala/io/pravega/connectors/spark/PravegaBatchPartitionReader.scala
+++ b/src/main/scala/io/pravega/connectors/spark/PravegaBatchPartitionReader.scala
@@ -68,7 +68,7 @@ case class PravegaBatchPartitionReader(
       {
         case e: TruncatedDataException =>
           log.warn("next: TruncatedDataException while reading data", e)
-          return false
+          return next()
       }
       true
     } else false

diff --git a/src/main/scala/io/pravega/connectors/spark/PravegaMicroBatchStream.scala b/src/main/scala/io/pravega/connectors/spark/PravegaMicroBatchStream.scala
index 523349a..1429858 100644
--- a/src/main/scala/io/pravega/connectors/spark/PravegaMicroBatchStream.scala
+++ b/src/main/scala/io/pravega/connectors/spark/PravegaMicroBatchStream.scala
@@ -151,7 +151,7 @@ class PravegaMicroBatchStream(
         case e: SegmentTruncatedException => PravegaSourceOffset(PravegaUtils.getStreamInfo(streamManager, scopeName, streamName).getHeadStreamCut)
       }
     }
-    log.info(s"nextStreamCut = ${nextStreamCut.streamCut}")
+    log.info(s"nextStreamCut = ${nextStreamCut.streamCut} , limits : ${limits}")
     nextStreamCut
   }
@@ -159,8 +159,10 @@ class PravegaMicroBatchStream(
     val approxBytesPerTrigger = Option(options.get(
       PravegaSourceProvider.APPROX_BYTES_PER_TRIGGER)).get.map(_.toLong)
     if ( approxBytesPerTrigger.isDefined) {
+      log.info(s"approxBytesPerTrigger : = ${approxBytesPerTrigger}")
       ReadLimit.maxRows(approxBytesPerTrigger.get)
     } else {
+      log.info("approxBytesPerTrigger is not configured")
       super.getDefaultReadLimit
     }
   }

From d29e6b50224aa7acf53023181d76eb17efc848d2 Mon Sep 17 00:00:00 2001
From: Munish
Date: Fri, 22 Sep 2023 12:09:55 +0530
Subject: [PATCH 17/21] Removed extra space.
Signed-off-by: Munish --- gradle.properties | 1 - 1 file changed, 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index d39e1b6..1f8906d 100644 --- a/gradle.properties +++ b/gradle.properties @@ -29,7 +29,6 @@ sparkVersion=3.4.0 connectorVersion=0.14.0-SNAPSHOT pravegaVersion=0.14.0-3259.b1d51d5-SNAPSHOT - # flag to indicate if Pravega sub-module should be used instead of the version defined in 'pravegaVersion' usePravegaVersionSubModule=false From 403c5b16058d2fe5829db5c031a80ed5a86ab5b1 Mon Sep 17 00:00:00 2001 From: Munish Date: Fri, 22 Sep 2023 19:19:36 +0530 Subject: [PATCH 18/21] Removed redundent line Signed-off-by: Munish --- .../io/pravega/connectors/spark/PravegaMicroBatchStream.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/scala/io/pravega/connectors/spark/PravegaMicroBatchStream.scala b/src/main/scala/io/pravega/connectors/spark/PravegaMicroBatchStream.scala index 1429858..e72b761 100644 --- a/src/main/scala/io/pravega/connectors/spark/PravegaMicroBatchStream.scala +++ b/src/main/scala/io/pravega/connectors/spark/PravegaMicroBatchStream.scala @@ -142,7 +142,6 @@ class PravegaMicroBatchStream( PravegaSourceOffset(PravegaUtils.getStreamInfo(streamManager, scopeName, streamName).getTailStreamCut) } else { val upperLimit = limits.find(_.isInstanceOf[ReadMaxRows]).map(_.asInstanceOf[ReadMaxRows]) - PravegaSourceOffset(batchClientFactory.getNextStreamCut(startOffset, upperLimit.get.maxRows())) try { PravegaSourceOffset(batchClientFactory.getNextStreamCut(startOffset, upperLimit.get.maxRows())) } From 06dc9b91bbbe91ad9bceb9f9a3190b242c4531f2 Mon Sep 17 00:00:00 2001 From: Munish Date: Fri, 22 Sep 2023 19:38:18 +0530 Subject: [PATCH 19/21] retrigger build/empty commit Signed-off-by: Munish From 6a40698effee6346dc083e4415651ca426c451ca Mon Sep 17 00:00:00 2001 From: Munish Date: Fri, 22 Sep 2023 21:55:43 +0530 Subject: [PATCH 20/21] revert readme Signed-off-by: Munish --- README.md | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 5e2e967..0860078 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 --> -# Pravega Spark Connectors [![Build Status](https://travis-ci.org/pravega/spark-connectors.svg?branch=master)](https://travis-ci.org/pravega/spark-connectors) [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://www.apache.org/licenses/LICENSE-2.0) +# Pravega Spark Connectors [![Build Status](https://github.com/pravega/spark-connectors/actions/workflows/build.yml/badge.svg)](https://github.com/pravega/spark-connectors/actions/workflows/build.yml?query=branch%3Amaster) [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://www.apache.org/licenses/LICENSE-2.0) This repository implements connectors to read and write [Pravega](http://pravega.io/) Streams with [Apache Spark](http://spark.apache.org/), a high-performance analytics engine for batch and streaming data. @@ -37,14 +37,10 @@ To learn more about Pravega, visit https://pravega.io The [master](https://github.com/pravega/spark-connectors) branch will always have the most recent supported versions of Spark and Pravega. 
-| Spark Version | Pravega Version | Java Version To Build Connector | Java Version To Run Connector | Git Branch |
-|---------------|-----------------|---------------------------------|-------------------------------|-----------------------------------------------------------------------------------|
-| 3.1+ | 0.13 | Java 11 | Java 8 or 11 | [master](https://github.com/pravega/spark-connectors) |
-| 2.4 | 0.13 | Java 8 | Java 8 | [r0.13-spark2.4](https://github.com/pravega/spark-connectors/tree/r0.13-spark2.4) |
-| 3.1+ | 0.12 | Java 11 | Java 8 or 11 | [r0.12](https://github.com/pravega/spark-connectors/tree/r0.12) |
-| 2.4 | 0.12 | Java 8 | Java 8 | [r0.12-spark2.4](https://github.com/pravega/spark-connectors/tree/r0.12-spark2.4) |
-| 3.1+ | 0.11 | Java 11 | Java 8 or 11 | [r0.11](https://github.com/pravega/spark-connectors/tree/r0.11) |
-| 2.4 | 0.11 | Java 8 | Java 8 | [r0.11-spark2.4](https://github.com/pravega/spark-connectors/tree/r0.11-spark2.4) |
+| Spark Version | Pravega Version | Java Version To Build Connector | Java Version To Run Connector | Git Branch |
+|---------------|-----------------|---------------------------------|-------------------------------|-----------------------------------------------------------------|
+| 3.4 | 0.14 | Java 11 | Java 8 or 11 | [master](https://github.com/pravega/spark-connectors) |
+| 3.4 | 0.13 | Java 11 | Java 8 or 11 | [r0.13](https://github.com/pravega/spark-connectors/tree/r0.13) |
 
 ## Support

From 1f153ba37bec6bcffca55fecbb4bc2063547376a Mon Sep 17 00:00:00 2001
From: Munish
Date: Mon, 25 Sep 2023 10:26:50 +0530
Subject: [PATCH 21/21] Update log statement

Signed-off-by: Munish
---
 .../pravega/connectors/spark/PravegaBatchPartitionReader.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/main/scala/io/pravega/connectors/spark/PravegaBatchPartitionReader.scala b/src/main/scala/io/pravega/connectors/spark/PravegaBatchPartitionReader.scala
index d9a0d98..50475de 100644
--- a/src/main/scala/io/pravega/connectors/spark/PravegaBatchPartitionReader.scala
+++ b/src/main/scala/io/pravega/connectors/spark/PravegaBatchPartitionReader.scala
@@ -67,7 +67,7 @@ case class PravegaBatchPartitionReader(
       catch
       {
         case e: TruncatedDataException =>
-          log.warn("next: TruncatedDataException while reading data", e)
+          log.warn("next: TruncatedDataException while reading data. Data was deleted, skipping over missing entries.", e)
           return next()
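
For anyone trying out this series, a minimal usage sketch of the admission-control option it introduces follows. This is an illustration, not part of the patches above: the key "approx_bytes_per_trigger" comes from the APPROX_BYTES_PER_TRIGGER constant changed in patch 09, while the "controller", "scope", and "stream" keys are assumed values for the connector's CONTROLLER_OPTION_KEY, SCOPE_OPTION_KEY, and STREAM_OPTION_KEY constants; the endpoint, scope, stream, and checkpoint names are placeholders.

```scala
// Sketch only: option keys other than "approx_bytes_per_trigger" are assumptions,
// and the controller URI, scope, stream, and checkpoint path are placeholders.
import org.apache.spark.sql.SparkSession

object ApproxBytesPerTriggerExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("pravega-approx-bytes-per-trigger-example")
      .getOrCreate()

    // Each micro-batch ends at the stream cut returned by getNextStreamCut for the
    // configured byte budget, so batches are roughly (not exactly) this size.
    val events = spark.readStream
      .format("pravega")
      .option("controller", "tcp://localhost:9090")  // placeholder controller URI
      .option("scope", "examples")                   // placeholder scope
      .option("stream", "sensordata")                // placeholder stream
      .option("approx_bytes_per_trigger", "1048576") // about 1 MiB per micro-batch
      .load()
      .selectExpr("CAST(event AS STRING) AS event")

    events.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/pravega-example-checkpoint") // placeholder path
      .start()
      .awaitTermination()
  }
}
```

Without the option, getDefaultReadLimit falls back to super.getDefaultReadLimit, i.e. ReadLimit.allAvailable(), which keeps the earlier behaviour of reading up to the tail stream cut in every micro-batch.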