fix the error when num of partitions increased #500

Merged: 11 commits, Oct 6, 2020

@tilumi tilumi commented Apr 11, 2020

Dedicated Event Hubs supports increasing the number of partitions, but the Spark job then fails because the checkpoint doesn't contain a sequence number for the new partitions:

Caused by: java.lang.IllegalStateException: {"ehName":"hygieneraw","partitionId":63} doesn't have a fromSeqNo
at org.apache.spark.sql.eventhubs.EventHubsSource$$anonfun$7$$anonfun$8.apply(EventHubsSource.scala:283)
at org.apache.spark.sql.eventhubs.EventHubsSource$$anonfun$7$$anonfun$8.apply(EventHubsSource.scala:283)
at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
at scala.collection.AbstractMap.getOrElse(Map.scala:59)
at org.apache.spark.sql.eventhubs.EventHubsSource$$anonfun$7.apply(EventHubsSource.scala:281)
at org.apache.spark.sql.eventhubs.EventHubsSource$$anonfun$7.apply(EventHubsSource.scala:280)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)

This PR contains the following changes:

  1. Get the number of partitions for every batch.
  2. Introduce the parameter "StartingPositionForNewPartitions", which specifies the starting position for new partitions; the default is fromStartOfStream.
  3. In EventHubsSource, check whether the number of partitions has increased. If so, translate the starting positions of the new partitions into sequence numbers and combine them with the sequence numbers of the previous batch to form the sequence numbers for the new batch.
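
The merge described in step 3 can be sketched in plain Scala. This is only an illustration under stated assumptions: `PartitionKey`, `StartPosition`, `NewPartitionOffsets.extend`, and the `resolve` callback are hypothetical stand-ins, not the connector's actual types or methods, and the real translation from a starting position to a sequence number requires a call to the service.

```scala
// Hypothetical stand-ins for illustration; not the connector's real classes.
final case class PartitionKey(ehName: String, partitionId: Int)

sealed trait StartPosition
case object FromStartOfStream extends StartPosition
case object FromEndOfStream extends StartPosition

object NewPartitionOffsets {

  /** Extends the previous batch's sequence numbers so that they also cover
    * partitions that did not exist when the checkpoint was written.
    *
    * @param previous              sequence numbers recorded for the previous batch
    * @param currentPartitionCount partition count reported by the hub right now
    * @param startForNew           starting position to use for new partitions
    * @param resolve               assumed callback that turns a starting position
    *                              into a concrete sequence number for a partition
    */
  def extend(
      ehName: String,
      previous: Map[PartitionKey, Long],
      currentPartitionCount: Int,
      startForNew: StartPosition,
      resolve: (PartitionKey, StartPosition) => Long
  ): Map[PartitionKey, Long] = {
    val all = (0 until currentPartitionCount).map(PartitionKey(ehName, _))
    // Only partitions missing from the checkpoint need a starting sequence number.
    val added = all.filterNot(previous.contains)
    previous ++ added.map(p => p -> resolve(p, startForNew))
  }
}
```

With this shape, partitions already present in the checkpoint keep their recorded sequence numbers, and only the added partitions are resolved from the configured starting position.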

@tilumi tilumi changed the title from "add changes for increasing partitions scenario" to "fix the error when num of partitions increased" Apr 12, 2020

@nyaghma nyaghma left a comment

The proposed change updates the common path of a batch execution by getting the number of partitions for every single batch (partitionCount is not a lazy val anymore and this affects the execution of earliestAndLatest for every batch).
This is an overhead added to every batch execution for a scenario that would happen rarely. Therefore, instead of changing the common case scenario in batch execution we can add an exception handler which handles this scenario when needed.
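
For readers less familiar with the distinction the review relies on, the following minimal sketch shows why replacing a `lazy val` with a `def` adds work to every batch. `Client`, `cachedPartitionCount`, `freshPartitionCount`, and `fetch` are hypothetical names used only for this illustration; `fetch` stands in for a remote metadata call.

```scala
object LazyVsDef {
  // `fetch` stands in for a remote call that returns the hub's partition count.
  final class Client(fetch: () => Int) {
    // Evaluated once, on first access, and then reused for the object's lifetime.
    lazy val cachedPartitionCount: Int = fetch()

    // Evaluated on every access, so each batch would pay for one remote call.
    def freshPartitionCount: Int = fetch()
  }
}
```

The cached form is cheap on the common path but never notices a partition increase, which is the trade-off discussed in this thread.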

tilumi commented Aug 9, 2020

The proposed change updates the common path of a batch execution by getting the number of partitions for every single batch (partitionCount is not a lazy val anymore and this affects the execution of earliestAndLatest for every batch).
This is an overhead added to every batch execution for a scenario that would happen rarely. Therefore, instead of changing the common case scenario in batch execution we can add an exception handler which handles this scenario when needed.

I removed the part that gets the number of partitions for each batch, so users will need to restart their jobs every time the number of partitions increases.

@nyaghma nyaghma left a comment

Could you add exception handling so that, when the exception happens, the source gets the new partition count and adds the new partitions? That way, users don't have to restart the job when they add a partition.
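
One possible shape for such a handler is sketched below. It is not the code that was merged: `PartitionCountFallback`, `CachedPartitionCount`, and `planBatch` are hypothetical names, and the sketch assumes that batch planning signals a stale partition count by throwing `IllegalStateException`, as in the stack trace reported in this PR.

```scala
object PartitionCountFallback {

  /** Holds a cached partition count and re-fetches it only on request. */
  final class CachedPartitionCount(fetch: () => Int) {
    private var cached: Int = fetch()
    def get: Int = cached
    def refresh(): Int = { cached = fetch(); cached }
  }

  /** Plans a batch with the cached count; if planning fails because the
    * count is stale, refreshes the count once and retries.
    */
  def planBatch[A](count: CachedPartitionCount)(plan: Int => A): A =
    try plan(count.get)
    catch {
      // Assumption: a stale count surfaces as IllegalStateException.
      case _: IllegalStateException => plan(count.refresh())
    }
}
```

The common path makes no extra remote call; the partition count is re-fetched only after a failure, and a second failure propagates to the caller.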

@tilumi tilumi requested a review from nyaghma August 12, 2020 15:06
@tilumi tilumi requested a review from sjkwak August 25, 2020 18:49

sjkwak commented Sep 22, 2020

Hi @tilumi - have we done upgrade testing with the change and verified that the latest iteration works as expected?

@sjkwak sjkwak merged commit 3591789 into Azure:master Oct 6, 2020
nyaghma pushed a commit to nyaghma/azure-event-hubs-spark that referenced this pull request Dec 11, 2020
nyaghma pushed a commit to nyaghma/azure-event-hubs-spark that referenced this pull request May 19, 2021