[Spark] Fix check to enumerate job IDs
Add a conditional around the new code path, since it is only needed for
older versions of Spark and causes an additional HTTP request that is
unnecessary on newer versions.

Address review feedback

Add unit test
derekwbrown committed Jan 27, 2017
1 parent 92747a4 commit 222a870
Showing 4 changed files with 184 additions and 7 deletions.
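
In short, the standalone path now reads an opt-in flag from the instance configuration and only takes the extra per-application request when that flag is set. Below is a minimal sketch of the guarded branch using the names introduced by this diff; the agent's _is_affirmative helper is stubbed so the snippet stands alone, and choose_standalone_path is a hypothetical wrapper rather than a method of the check.

SPARK_PRE_20_MODE = 'spark_pre_20_mode'

def _is_affirmative(value):
    # Stand-in for the agent's config helper: treat common truthy strings as True.
    return str(value).strip().lower() in ('1', 'true', 'yes')

def choose_standalone_path(instance, master_address, standalone_init):
    # Pre-2.0 mode triggers the additional per-application HTTP request;
    # newer clusters skip it because the flag defaults to False.
    pre20 = _is_affirmative(instance.get(SPARK_PRE_20_MODE, False))
    return standalone_init(master_address, pre20)
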
36 changes: 29 additions & 7 deletions checks.d/spark.py
@@ -82,6 +82,7 @@

# Project
from checks import AgentCheck
from config import _is_affirmative

# Identifier for cluster master address in `spark.yaml`
MASTER_ADDRESS = 'spark_url'
@@ -94,6 +95,9 @@
SPARK_STANDALONE_MODE = 'spark_standalone_mode'
SPARK_MESOS_MODE = 'spark_mesos_mode'

# Option to enable compatibility mode for Spark versions < 2.0
SPARK_PRE_20_MODE = 'spark_pre_20_mode'

# Service Checks
SPARK_STANDALONE_SERVICE_CHECK = 'spark.standalone_master.can_connect'
YARN_SERVICE_CHECK = 'spark.resource_manager.can_connect'
@@ -263,7 +267,9 @@ def _get_running_apps(self, instance, tags):
cluster_mode = SPARK_YARN_MODE

if cluster_mode == SPARK_STANDALONE_MODE:
return self._standalone_init(master_address)
# Check whether pre-2.0 compatibility mode is enabled
pre20 = _is_affirmative(instance.get(SPARK_PRE_20_MODE, False))
return self._standalone_init(master_address, pre20)

elif cluster_mode == SPARK_MESOS_MODE:
running_apps = self._mesos_init(master_address)
@@ -278,7 +284,7 @@ def _get_running_apps(self, instance, tags):
raise Exception('Invalid setting for %s. Received %s.' % (SPARK_CLUSTER_MODE,
cluster_mode))

def _standalone_init(self, spark_master_address):
def _standalone_init(self, spark_master_address, pre_20_mode):
'''
Return a dictionary of {app_id: (app_name, tracking_url)} for the running Spark applications
'''
@@ -294,17 +300,32 @@ def _standalone_init(self, spark_master_address):
app_name = app.get('name')

# Parse through the HTML to grab the application driver's link
app_url = self._get_standalone_app_url(app_id, spark_master_address)

if app_id and app_name and app_url:
running_apps[app_id] = (app_name, app_url)
try:
app_url = self._get_standalone_app_url(app_id, spark_master_address)

if app_id and app_name and app_url:
if pre_20_mode:
self.log.debug('Getting application list in pre-20 mode')
applist = self._rest_request_to_json(app_url,
SPARK_APPS_PATH,
SPARK_STANDALONE_SERVICE_CHECK)
for appl in applist:
aid = appl.get('id')
aname = appl.get('name')
running_apps[aid] = (aname, app_url)
else:
running_apps[app_id] = (app_name, app_url)
except Exception:
# It's possible for the request to fail if the job completed after we
# fetched the list of applications; just skip it and continue.
pass

# Report success after gathering metrics from Spark master
self.service_check(SPARK_STANDALONE_SERVICE_CHECK,
AgentCheck.OK,
tags=['url:%s' % spark_master_address],
message='Connection to Spark master "%s" was successful' % spark_master_address)

self.log.info("Returning running apps %s" % running_apps)
return running_apps

def _mesos_init(self, master_address):
@@ -624,6 +645,7 @@ def _rest_request(self, address, object_path, service_name, *args, **kwargs):
url = urljoin(url, '?' + query)

try:
self.log.debug('Spark check URL: %s' % url)
response = requests.get(url)
response.raise_for_status()

4 changes: 4 additions & 0 deletions conf.d/spark.yaml.example
@@ -28,6 +28,10 @@ instances:
# spark_cluster_mode: spark_standalone_mode
# spark_cluster_mode: spark_mesos_mode

# To monitor an older standalone Spark cluster (versions prior to 2.0),
# 'spark_pre_20_mode' must be set to true:
# spark_pre_20_mode: true

# A required friendly name for the cluster.
# cluster_name: MySparkCluster

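
For reference, a complete minimal spark.yaml with the compatibility mode enabled might look like the following; the URL, port, and cluster name are placeholders rather than values required by the check.

init_config:

instances:
  - spark_url: http://localhost:8080
    cluster_name: MySparkCluster
    spark_cluster_mode: spark_standalone_mode
    spark_pre_20_mode: true
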
14 changes: 14 additions & 0 deletions tests/checks/fixtures/spark/spark_apps_pre20
@@ -0,0 +1,14 @@
[
{
"id": "PySparkShell",
"name": "PySparkShell",
"attempts": [
{
"startTime": "2016-04-12T12:48:17.576GMT",
"endTime": "1969-12-31T23:59:59.999GMT",
"sparkUser": "",
"completed": false
}
]
}
]
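
This fixture mirrors the application listing (fetched via SPARK_APPS_PATH) that a pre-2.0 driver returns; the check's new branch walks that list and keys each entry by its id. Below is a rough, self-contained illustration of that mapping; collect_running_apps is a hypothetical helper, and the fixture body is inlined so the snippet runs on its own.

import json

FIXTURE = '[{"id": "PySparkShell", "name": "PySparkShell", "attempts": [{"completed": false}]}]'

def collect_running_apps(app_url, apps_json):
    running_apps = {}
    for app in json.loads(apps_json):
        # Each application is keyed by its id and points back at the driver
        # URL that served the application listing.
        running_apps[app.get('id')] = (app.get('name'), app_url)
    return running_apps

print(collect_running_apps('http://localhost:4040', FIXTURE))
# {'PySparkShell': ('PySparkShell', 'http://localhost:4040')}
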
137 changes: 137 additions & 0 deletions tests/checks/mock/test_spark.py
@@ -68,6 +68,12 @@ def join_url_dir(url, *args):
STANDALONE_SPARK_EXECUTOR_URL = join_url_dir(SPARK_APP_URL, SPARK_REST_PATH, SPARK_APP_ID, 'executors')
STANDALONE_SPARK_RDD_URL = join_url_dir(SPARK_APP_URL, SPARK_REST_PATH, SPARK_APP_ID, 'storage/rdd')

STANDALONE_SPARK_JOB_URL_PRE20 = join_url_dir(SPARK_APP_URL, SPARK_REST_PATH, APP_NAME, 'jobs')
STANDALONE_SPARK_STAGE_URL_PRE20 = join_url_dir(SPARK_APP_URL, SPARK_REST_PATH, APP_NAME, 'stages')
STANDALONE_SPARK_EXECUTOR_URL_PRE20 = join_url_dir(SPARK_APP_URL, SPARK_REST_PATH, APP_NAME, 'executors')
STANDALONE_SPARK_RDD_URL_PRE20 = join_url_dir(SPARK_APP_URL, SPARK_REST_PATH, APP_NAME, 'storage/rdd')



def yarn_requests_get_mock(*args, **kwargs):

@@ -206,6 +212,69 @@ def raise_for_status(self):
body = f.read()
return MockStandaloneResponse(body, 200)

def standalone_requests_pre20_get_mock(*args, **kwargs):

class MockStandaloneResponse:
text = ''

def __init__(self, json_data, status_code):
self.json_data = json_data
self.status_code = status_code
self.text = json_data

def json(self):
return json.loads(self.json_data)

def raise_for_status(self):
return True

if args[0] == STANDALONE_APP_URL:
with open(Fixtures.file('spark_standalone_apps'), 'r') as f:
body = f.read()
return MockStandaloneResponse(body, 200)

elif args[0] == STANDALONE_APP_HTML_URL:
with open(Fixtures.file('spark_standalone_app'), 'r') as f:
body = f.read()
return MockStandaloneResponse(body, 200)

elif args[0] == STANDALONE_SPARK_APP_URL:
with open(Fixtures.file('spark_apps_pre20'), 'r') as f:
body = f.read()
return MockStandaloneResponse(body, 200)

elif args[0] == STANDALONE_SPARK_JOB_URL:
return MockStandaloneResponse("{}", 404)

elif args[0] == STANDALONE_SPARK_STAGE_URL:
return MockStandaloneResponse("{}", 404)

elif args[0] == STANDALONE_SPARK_EXECUTOR_URL:
return MockStandaloneResponse("{}", 404)

elif args[0] == STANDALONE_SPARK_RDD_URL:
return MockStandaloneResponse("{}", 404)

elif args[0] == STANDALONE_SPARK_JOB_URL_PRE20:
with open(Fixtures.file('job_metrics'), 'r') as f:
body = f.read()
return MockStandaloneResponse(body, 200)

elif args[0] == STANDALONE_SPARK_STAGE_URL_PRE20:
with open(Fixtures.file('stage_metrics'), 'r') as f:
body = f.read()
return MockStandaloneResponse(body, 200)

elif args[0] == STANDALONE_SPARK_EXECUTOR_URL_PRE20:
with open(Fixtures.file('executor_metrics'), 'r') as f:
body = f.read()
return MockStandaloneResponse(body, 200)

elif args[0] == STANDALONE_SPARK_RDD_URL_PRE20:
with open(Fixtures.file('rdd_metrics'), 'r') as f:
body = f.read()
return MockStandaloneResponse(body, 200)

class SparkCheck(AgentCheckTest):
CHECK_NAME = 'spark'

@@ -226,6 +295,12 @@ class SparkCheck(AgentCheckTest):
'cluster_name': CLUSTER_NAME,
'spark_cluster_mode': 'spark_standalone_mode'
}
STANDALONE_CONFIG_PRE_20 = {
'spark_url': 'http://localhost:8080',
'cluster_name': CLUSTER_NAME,
'spark_cluster_mode': 'spark_standalone_mode',
'spark_pre_20_mode': 'true'
}

SPARK_JOB_RUNNING_METRIC_VALUES = {
'spark.job.count': 2,
@@ -539,3 +614,65 @@ def test_standalone(self, mock_requests):
tags=['url:http://localhost:8080'])
self.assertServiceCheckOK(SPARK_SERVICE_CHECK,
tags=['url:http://localhost:4040'])

@mock.patch('requests.get', side_effect=standalone_requests_pre20_get_mock)
def test_standalone_pre20(self, mock_requests):
config = {
'instances': [self.STANDALONE_CONFIG_PRE_20],
}

self.run_check(config)

# Check the running job metrics
for metric, value in self.SPARK_JOB_RUNNING_METRIC_VALUES.iteritems():
self.assertMetric(metric,
value=value,
tags=self.SPARK_JOB_RUNNING_METRIC_TAGS)

# Check the succeeded job metrics
for metric, value in self.SPARK_JOB_SUCCEEDED_METRIC_VALUES.iteritems():
self.assertMetric(metric,
value=value,
tags=self.SPARK_JOB_SUCCEEDED_METRIC_TAGS)

# Check the running stage metrics
for metric, value in self.SPARK_STAGE_RUNNING_METRIC_VALUES.iteritems():
self.assertMetric(metric,
value=value,
tags=self.SPARK_STAGE_RUNNING_METRIC_TAGS)

# Check the complete stage metrics
for metric, value in self.SPARK_STAGE_COMPLETE_METRIC_VALUES.iteritems():
self.assertMetric(metric,
value=value,
tags=self.SPARK_STAGE_COMPLETE_METRIC_TAGS)

# Check the driver metrics
for metric, value in self.SPARK_DRIVER_METRIC_VALUES.iteritems():
self.assertMetric(metric,
value=value,
tags=self.SPARK_METRIC_TAGS)

# Check the executor metrics
for metric, value in self.SPARK_EXECUTOR_METRIC_VALUES.iteritems():
self.assertMetric(metric,
value=value,
tags=self.SPARK_METRIC_TAGS)

# Check the RDD metrics
for metric, value in self.SPARK_RDD_METRIC_VALUES.iteritems():
self.assertMetric(metric,
value=value,
tags=self.SPARK_METRIC_TAGS)

# Check the service checks
self.assertServiceCheckOK(STANDALONE_SERVICE_CHECK,
tags=['url:http://localhost:8080'])
self.assertServiceCheckOK(SPARK_SERVICE_CHECK,
tags=['url:http://localhost:4040'])
