Skip to content

Commit

Permalink
[SPARK-30199][DSTREAM] Recover spark.(ui|blockManager).port from ch…
Browse files Browse the repository at this point in the history
…eckpoint

### What changes were proposed in this pull request?

This PR aims to recover `spark.ui.port` and `spark.blockManager.port` from checkpoint like `spark.driver.port`.

### Why are the changes needed?

When the user configures these values, we can respect them.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Pass the Jenkins with the newly added test cases.

Closes #26827 from dongjoon-hyun/SPARK-30199.

Authored-by: Aaruna <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
aaruna authored and dongjoon-hyun committed Apr 23, 2020
1 parent 5183984 commit 0c14b27
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
"spark.driver.bindAddress",
"spark.driver.port",
"spark.master",
"spark.ui.port",
"spark.blockManager.port",
"spark.kubernetes.driver.pod.name",
"spark.kubernetes.executor.podNamePrefix",
"spark.yarn.jars",
Expand All @@ -66,6 +68,8 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
.remove("spark.driver.host")
.remove("spark.driver.bindAddress")
.remove("spark.driver.port")
.remove("spark.ui.port")
.remove("spark.blockManager.port")
.remove("spark.kubernetes.driver.pod.name")
.remove("spark.kubernetes.executor.podNamePrefix")
val newReloadConf = new SparkConf(loadDefaults = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,33 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
assert(restoredConf1.get("spark.driver.port") !== "9999")
}

test("SPARK-30199 get ui port and blockmanager port") {
val conf = Map("spark.ui.port" -> "30001", "spark.blockManager.port" -> "30002")
conf.foreach { case (k, v) => System.setProperty(k, v) }
ssc = new StreamingContext(master, framework, batchDuration)
conf.foreach { case (k, v) => assert(ssc.conf.get(k) === v) }

val cp = new Checkpoint(ssc, Time(1000))
ssc.stop()

// Serialize/deserialize to simulate write to storage and reading it back
val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))

val newCpConf = newCp.createSparkConf()
conf.foreach { case (k, v) => assert(newCpConf.contains(k) && newCpConf.get(k) === v) }

// Check if all the parameters have been restored
ssc = new StreamingContext(null, newCp, null)
conf.foreach { case (k, v) => assert(ssc.conf.get(k) === v) }
ssc.stop()

// If port numbers are not set in system property, these parameters should not be presented
// in the newly recovered conf.
conf.foreach(kv => System.clearProperty(kv._1))
val newCpConf1 = newCp.createSparkConf()
conf.foreach { case (k, _) => assert(!newCpConf1.contains(k)) }
}

// This tests whether the system can recover from a master failure with simple
// non-stateful operations. This assumes as reliable, replayable input
// source - TestInputDStream.
Expand Down

0 comments on commit 0c14b27

Please sign in to comment.