Revert "Merge pull request apache#23 from Shopify/profile-refactor"
This reverts commit 11dfaa6, reversing
changes made to f54ccf8.
airhorns committed Feb 24, 2015
1 parent bd71ee3 commit 2a9d540
Showing 8 changed files with 60 additions and 198 deletions.
3 changes: 0 additions & 3 deletions docs/configuration.md
@@ -244,9 +244,6 @@ Apart from these, the following properties are also available, and may be useful
or it will be displayed before the driver exits. It can also be dumped to disk with
`sc.dump_profiles(path)`. If some of the profile results have been displayed manually,
they will not be displayed automatically before the driver exits.

By default the `pyspark.profiler.BasicProfiler` will be used, but this can be overridden by
passing a profiler class in as a parameter to the `SparkContext` constructor.
</td>
</tr>
<tr>
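
For reference, a minimal sketch of the driver-side workflow described in the configuration docs above. `spark.python.profile` must be set before the SparkContext is created; the app name and dump path here are illustrative:

```python
from pyspark import SparkConf, SparkContext

# Profiling of Python workers is opt-in and must be enabled up front.
conf = SparkConf().set("spark.python.profile", "true")
sc = SparkContext("local[4]", "profiling-demo", conf=conf)

sc.parallelize(range(1000)).map(lambda x: x * x).count()

sc.show_profiles()                          # print collected cProfile stats to stdout
sc.dump_profiles("/tmp/pyspark_profiles")   # or write one rdd_<id>.pstats file per RDD
```
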
2 changes: 0 additions & 2 deletions python/pyspark/__init__.py
@@ -45,13 +45,11 @@
from pyspark.accumulators import Accumulator, AccumulatorParam
from pyspark.broadcast import Broadcast
from pyspark.serializers import MarshalSerializer, PickleSerializer
from pyspark.profiler import BasicProfiler

# for back compatibility
from pyspark.sql import SQLContext, HiveContext, SchemaRDD, Row

__all__ = [
"SparkConf", "SparkContext", "SparkFiles", "RDD", "StorageLevel", "Broadcast",
"Accumulator", "AccumulatorParam", "MarshalSerializer", "PickleSerializer",
"BasicProfiler",
]
50 changes: 37 additions & 13 deletions python/pyspark/context.py
@@ -20,6 +20,7 @@
import sys
from threading import Lock
from tempfile import NamedTemporaryFile
import atexit

from pyspark import accumulators
from pyspark.accumulators import Accumulator
@@ -32,7 +33,6 @@
from pyspark.storagelevel import StorageLevel
from pyspark.rdd import RDD
from pyspark.traceback_utils import CallSite, first_spark_call
from pyspark.profiler import ProfilerCollector

from py4j.java_collections import ListConverter

@@ -66,7 +66,7 @@ class SparkContext(object):

def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
environment=None, batchSize=0, serializer=PickleSerializer(), conf=None,
gateway=None, jsc=None, profiler=None):
gateway=None, jsc=None):
"""
Create a new SparkContext. At least the master and app name should be set,
either through the named parameters here or through C{conf}.
@@ -102,14 +102,14 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
SparkContext._ensure_initialized(self, gateway=gateway)
try:
self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
conf, jsc, profiler)
conf, jsc)
except:
# If an error occurs, clean up in order to allow future SparkContext creation:
self.stop()
raise

def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
conf, jsc, profiler):
conf, jsc):
self.environment = environment or {}
self._conf = conf or SparkConf(_jvm=self._jvm)
self._batchSize = batchSize # -1 represents an unlimited batch size
@@ -192,11 +192,7 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath()

# profiling stats collected for each PythonRDD
if self._conf.get("spark.python.profile", "false") == "true":
self.profiler_collector = ProfilerCollector(profiler)
self.profiler_collector.profiles_dump_path = self._conf.get("spark.python.profile.dump", None)
else:
self.profiler_collector = None
self._profile_stats = []

def _initialize_context(self, jconf):
"""
@@ -822,11 +818,39 @@ def runJob(self, rdd, partitionFunc, partitions=None, allowLocal=False):
it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
return list(mappedRDD._collect_iterator_through_file(it))

def show_profiles(self):
self.profiler_collector.show_profiles()
def _add_profile(self, id, profileAcc):
if not self._profile_stats:
dump_path = self._conf.get("spark.python.profile.dump")
if dump_path:
atexit.register(self.dump_profiles, dump_path)
else:
atexit.register(self.show_profiles)

def dump_profiles(self):
self.profiler_collector.dump_profiles()
self._profile_stats.append([id, profileAcc, False])

def show_profiles(self):
""" Print the profile stats to stdout """
for i, (id, acc, showed) in enumerate(self._profile_stats):
stats = acc.value
if not showed and stats:
print "=" * 60
print "Profile of RDD<id=%d>" % id
print "=" * 60
stats.sort_stats("time", "cumulative").print_stats()
# mark it as showed
self._profile_stats[i][2] = True

def dump_profiles(self, path):
""" Dump the profile stats into directory `path`
"""
if not os.path.exists(path):
os.makedirs(path)
for id, acc, _ in self._profile_stats:
stats = acc.value
if stats:
p = os.path.join(path, "rdd_%d.pstats" % id)
stats.dump_stats(p)
self._profile_stats = []


def _test():
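
A self-contained sketch of the `atexit` hand-off that `_add_profile` restores above: the reporting hook is registered exactly once, the first time a profile arrives, and fires automatically when the driver process exits. Names here are illustrative, not PySpark APIs:

```python
import atexit

collected = []  # (rdd_id, stats) pairs gathered over the lifetime of the driver

def show_collected():
    for rdd_id, stats in collected:
        print("profile collected for RDD %d: %r" % (rdd_id, stats))

def record_profile(rdd_id, stats):
    # On the first profile, decide once how results will be reported at exit.
    if not collected:
        atexit.register(show_collected)
    collected.append((rdd_id, stats))

record_profile(7, {"calls": 42})   # show_collected runs automatically at interpreter exit
```
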
136 changes: 0 additions & 136 deletions python/pyspark/profiler.py

This file was deleted.

15 changes: 6 additions & 9 deletions python/pyspark/rdd.py
@@ -31,6 +31,7 @@
import random
from math import sqrt, log, isinf, isnan

from pyspark.accumulators import PStatsParam
from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
PickleSerializer, pack_long, AutoBatchedSerializer
@@ -2105,13 +2106,9 @@ def _jrdd(self):
return self._jrdd_val
if self._bypass_serializer:
self._jrdd_deserializer = NoOpSerializer()

if self.ctx.profiler_collector:
profiler = self.ctx.profiler_collector.new_profiler(self.ctx)
else:
profiler = None

command = (self.func, profiler, self._prev_jrdd_deserializer,
enable_profile = self.ctx._conf.get("spark.python.profile", "false") == "true"
profileStats = self.ctx.accumulator(None, PStatsParam) if enable_profile else None
command = (self.func, profileStats, self._prev_jrdd_deserializer,
self._jrdd_deserializer)
# the serialized command will be compressed by broadcast
ser = CloudPickleSerializer()
@@ -2134,9 +2131,9 @@ def _jrdd(self):
broadcast_vars, self.ctx._javaAccumulator)
self._jrdd_val = python_rdd.asJavaRDD()

if profiler:
if enable_profile:
self._id = self._jrdd_val.id()
self.ctx.profiler_collector.add_profiler(self._id, profiler)
self.ctx._add_profile(self._id, profileStats)
return self._jrdd_val

def id(self):
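
The `PStatsParam` imported above lives in `pyspark/accumulators.py` and is not part of this diff. A plausible minimal definition, assuming the standard `AccumulatorParam` contract of `zero` plus `addInPlace`, merges `pstats.Stats` objects as they arrive from workers:

```python
from pyspark.accumulators import AccumulatorParam

class PStatsParam(AccumulatorParam):
    """Accumulator parameter that merges pstats.Stats objects from workers."""

    @staticmethod
    def zero(value):
        return None

    @staticmethod
    def addInPlace(value1, value2):
        if value1 is None:
            return value2
        if value2 is None:
            return value1
        value1.add(value2)   # pstats.Stats.add merges another run into this one
        return value1
```
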
38 changes: 8 additions & 30 deletions python/pyspark/tests.py
@@ -53,7 +53,6 @@
from pyspark.sql import SQLContext, IntegerType, Row, ArrayType, StructType, StructField, \
UserDefinedType, DoubleType
from pyspark import shuffle
from pyspark.profiler import BasicProfiler

_have_scipy = False
_have_numpy = False
@@ -715,13 +714,16 @@ def setUp(self):
conf = SparkConf().set("spark.python.profile", "true")
self.sc = SparkContext('local[4]', class_name, conf=conf)


def test_profiler(self):
self.do_computation()

profilers = self.sc.profiler_collector.profilers
self.assertEqual(1, len(profilers))
id, acc, _ = profilers[0]
def heavy_foo(x):
for i in range(1 << 20):
x = 1
rdd = self.sc.parallelize(range(100))
rdd.foreach(heavy_foo)
profiles = self.sc._profile_stats
self.assertEqual(1, len(profiles))
id, acc, _ = profiles[0]
stats = acc.value
self.assertTrue(stats is not None)
width, stat_list = stats.get_print_list([])
@@ -733,30 +735,6 @@ def test_profiler(self):
self.sc.dump_profiles(d)
self.assertTrue("rdd_%d.pstats" % id in os.listdir(d))

def test_custom_profiler(self):
class TestCustomProfiler(BasicProfiler):
def show_profiles(self, profilers):
return "Custom formatting"

self.sc.profiler_collector.profiler = TestCustomProfiler

self.do_computation()

profilers = self.sc.profiler_collector.profilers
self.assertEqual(1, len(profilers))
id, profiler, _ = profilers[0]
self.assertTrue(isinstance(profiler, TestCustomProfiler))

self.assertEqual("Custom formatting", self.sc.show_profiles())

def do_computation(self):
def heavy_foo(x):
for i in range(1 << 20):
x = 1

rdd = self.sc.parallelize(range(100))
rdd.foreach(heavy_foo)


class ExamplePointUDT(UserDefinedType):
"""
13 changes: 9 additions & 4 deletions python/pyspark/worker.py
@@ -23,6 +23,8 @@
import time
import socket
import traceback
import cProfile
import pstats

from pyspark.accumulators import _accumulatorRegistry
from pyspark.broadcast import Broadcast, _broadcastRegistry
@@ -88,16 +90,19 @@ def main(infile, outfile):
command = pickleSer._read_with_length(infile)
if isinstance(command, Broadcast):
command = pickleSer.loads(command.value)
(func, profiler, deserializer, serializer) = command
(func, stats, deserializer, serializer) = command
init_time = time.time()

def process():
iterator = deserializer.load_stream(infile)
serializer.dump_stream(func(split_index, iterator), outfile)

if profiler:
st = profiler.profile(process)
profiler.add(st)
if stats:
p = cProfile.Profile()
p.runcall(process)
st = pstats.Stats(p)
st.stream = None # make it picklable
stats.add(st.strip_dirs())
else:
process()
except Exception:
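
Outside of the worker, the same cProfile/pstats pattern used above can be exercised standalone: profile a callable, drop the output stream and strip directory prefixes so the Stats object pickles cleanly, then merge runs into a running total. A sketch; the helper name is illustrative:

```python
import cProfile
import pstats

def profile_call(func, aggregate=None):
    p = cProfile.Profile()
    p.runcall(func)                 # run func under the profiler
    st = pstats.Stats(p)
    st.stream = None                # drop the stream reference so the object is picklable
    st = st.strip_dirs()            # keep only basenames, shrinking the stats
    if aggregate is None:
        return st
    aggregate.add(st)               # merge this run into the running total
    return aggregate

if __name__ == "__main__":
    stats = profile_call(lambda: sum(i * i for i in range(100000)))
    stats.sort_stats("time", "cumulative").print_stats(5)
```
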
1 change: 0 additions & 1 deletion python/run-tests
@@ -57,7 +57,6 @@ function run_core_tests() {
PYSPARK_DOC_TEST=1 run_test "pyspark/broadcast.py"
PYSPARK_DOC_TEST=1 run_test "pyspark/accumulators.py"
run_test "pyspark/serializers.py"
PYSPARK_DOC_TEST=1 run_test "pyspark/profiler.py"
run_test "pyspark/shuffle.py"
run_test "pyspark/tests.py"
}
