Skip to content

Commit

Permalink
Added configuration spark.streaming.stopSparkContextByDefault to control whether StreamingContext.stop() also stops the underlying SparkContext
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed May 6, 2015
1 parent 9f1f9b1 commit 685fe00
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,15 @@ class StreamingContext private[streaming] (
waiter.waitForStopOrError(timeout)
}

/**
 * Stop the execution of the streams immediately (received data that has not yet
 * been processed is not waited for). Whether the underlying SparkContext is
 * stopped as well is controlled by the SparkConf key
 * "spark.streaming.stopSparkContextByDefault", which defaults to true.
 */
def stop(): Unit = synchronized {
  // Resolve the configured default once, then delegate to the two-arg overload
  // (second argument false selects the non-graceful stop path).
  val shouldStopSparkContext =
    conf.getBoolean("spark.streaming.stopSparkContextByDefault", true)
  stop(shouldStopSparkContext, false)
}

/**
* Stop the execution of the streams immediately (does not wait for all received data
* to be processed).
Expand All @@ -569,7 +578,7 @@ class StreamingContext private[streaming] (
* will be stopped regardless of whether this StreamingContext has been
* started.
*/
def stop(stopSparkContext: Boolean = true): Unit = synchronized {
def stop(stopSparkContext: Boolean): Unit = synchronized {
  // Delegates to the two-argument overload with the second flag hard-coded to
  // false. NOTE(review): that overload's second parameter name is not visible
  // in this diff (presumably a "stop gracefully" flag) — confirm against the
  // full source before relying on this comment.
  stop(stopSparkContext, false)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
assert(ssc.state === ssc.StreamingContextState.Started)
ssc.stop()
assert(ssc.state === ssc.StreamingContextState.Stopped)

// Make sure that the SparkContext is also stopped by default
intercept[Exception] {
ssc.sparkContext.makeRDD(1 to 10)
}
}

test("start multiple times") {
Expand Down Expand Up @@ -154,16 +159,26 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
}

test("stop only streaming context") {
  val conf = new SparkConf().setMaster(master).setAppName(appName)

  // Case 1: the caller explicitly opts out of stopping the SparkContext,
  // so it must remain usable after the StreamingContext is stopped.
  ssc = new StreamingContext(conf, batchDuration)
  sc = ssc.sparkContext
  addInputStream(ssc).register()
  ssc.start()
  ssc.stop(stopSparkContext = false)
  assert(sc.makeRDD(1 to 100).collect().size === 100)
  sc.stop()

  // Case 2: the no-arg stop() honours the configuration flag
  // "spark.streaming.stopSparkContextByDefault" = false, so the
  // SparkContext again survives the stop.
  conf.set("spark.streaming.stopSparkContextByDefault", "false")
  ssc = new StreamingContext(conf, batchDuration)
  sc = ssc.sparkContext
  addInputStream(ssc).register()
  ssc.start()
  ssc.stop()
  assert(sc.makeRDD(1 to 100).collect().size === 100)
  sc.stop()
}

test("stop(stopSparkContext=true) after stop(stopSparkContext=false)") {
Expand Down

0 comments on commit 685fe00

Please sign in to comment.