diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index 3f455a3e06072..5d6740893ada5 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -16,6 +16,7 @@
 #
 
 import sys
+import time
 from signal import signal, SIGTERM, SIGINT
 
 from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
@@ -102,11 +103,12 @@ def start(self):
     def awaitTermination(self, timeout=None):
         """
         Wait for the execution to stop.
+        timeout is milliseconds
         """
         if timeout is None:
             self._jssc.awaitTermination()
         else:
-            self._jssc.awaitTermination(timeout)
+            time.sleep(timeout/1000)
 
     #TODO: add storageLevel
     def socketTextStream(self, hostname, port):
diff --git a/python/pyspark/streaming_tests.py b/python/pyspark/streaming_tests.py
index 8396c4f960e81..2964107f2d92e 100644
--- a/python/pyspark/streaming_tests.py
+++ b/python/pyspark/streaming_tests.py
@@ -48,7 +48,7 @@ def tearDown(self):
         self.ssc._sc.stop()
         # Why does it long time to terminate StremaingContext and SparkContext?
         # Should we change the sleep time if this depends on machine spec?
-        time.sleep(10)
+        time.sleep(1)
 
     @classmethod
     def tearDownClass(cls):
@@ -436,7 +436,8 @@ def _run_stream(self, test_input, test_func, expected_output, numSlices=None):
             # Check time out.
             if (current_time - start_time) > self.timeout:
                 break
-            self.ssc.awaitTermination(50)
+            #self.ssc.awaitTermination(50)
+            time.sleep(0.05)
             # Check if the output is the same length of expexted output.
             if len(expected_output) == len(result):
                 break
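
The notable change in `context.py` is that `awaitTermination` no longer forwards a timeout to the Py4J call `self._jssc.awaitTermination(timeout)`; it sleeps on the Python side instead, keeping the documented millisecond unit. A minimal standalone sketch of that shim is below (the class skeleton and the `_jssc` stand-in are illustrative, not the real PySpark class). One caveat the patch does not address: under Python 2, an integer `timeout` makes `timeout/1000` floor-divide, so `awaitTermination(50)` would sleep 0 seconds; the sketch divides by `1000.0` to avoid that.

```python
import time


class StreamingContextSketch(object):
    """Illustrative skeleton only; the real StreamingContext wraps a
    JVM-side JavaStreamingContext behind the Py4J handle `_jssc`."""

    def __init__(self, jssc=None):
        self._jssc = jssc  # stand-in for the Py4J gateway object

    def awaitTermination(self, timeout=None):
        """Wait for the execution to stop. `timeout` is in milliseconds."""
        if timeout is None:
            # No timeout: block on the JVM-side call as before.
            self._jssc.awaitTermination()
        else:
            # Workaround from the patch: sleep on the Python side instead
            # of calling into the JVM. Divide by 1000.0 so an integer
            # timeout does not floor-divide to 0 under Python 2.
            time.sleep(timeout / 1000.0)
```

With this sketch, `StreamingContextSketch().awaitTermination(50)` sleeps for 0.05 s, which is exactly the value the test substitutes directly via `time.sleep(0.05)`.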
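
The `streaming_tests.py` change makes the structure of `_run_stream` explicit: with `awaitTermination(50)` commented out in favor of a direct 50 ms sleep, the loop is a plain poll-with-deadline pattern. A self-contained sketch of that pattern follows; `poll_until`, `POLL_INTERVAL`, and the sample values are hypothetical, only the 50 ms interval and the deadline check mirror the test.

```python
import time

POLL_INTERVAL = 0.05  # seconds; mirrors the 50 ms interval in the test


def poll_until(condition, timeout):
    """Call `condition` every POLL_INTERVAL seconds until it returns True
    or `timeout` seconds elapse; return the last condition value."""
    start_time = time.time()
    while not condition():
        # Check time out, as _run_stream does.
        if (time.time() - start_time) > timeout:
            break
        time.sleep(POLL_INTERVAL)
    return condition()


# Usage in the spirit of _run_stream: wait until the collected output
# reaches the expected length (values here are made up for illustration).
result = []
expected_output = [[1], [2], [3]]
ok = poll_until(lambda: len(result) == len(expected_output), timeout=1.0)
print(ok)  # False here, since nothing ever appends to `result`
```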