[Spark] Fix check to enumerate job IDs
Add a conditional around the new code path, since it is only needed for
older versions of Spark and issues an additional HTTP request that is
unnecessary on newer versions.

Addressed review feedback.
derekwbrown committed Jan 19, 2017
1 parent 92747a4 commit 30eb58d
Showing 2 changed files with 33 additions and 7 deletions.
36 changes: 29 additions & 7 deletions checks.d/spark.py
@@ -82,6 +82,7 @@

# Project
from checks import AgentCheck
+from config import _is_affirmative

# Identifier for cluster master address in `spark.yaml`
MASTER_ADDRESS = 'spark_url'
@@ -94,6 +94,9 @@
SPARK_STANDALONE_MODE = 'spark_standalone_mode'
SPARK_MESOS_MODE = 'spark_mesos_mode'

+# Option enabling compatibility mode for Spark versions < 2.0
+SPARK_PRE_20_MODE = 'spark_pre_20_mode'
+
# Service Checks
SPARK_STANDALONE_SERVICE_CHECK = 'spark.standalone_master.can_connect'
YARN_SERVICE_CHECK = 'spark.resource_manager.can_connect'
@@ -263,7 +267,9 @@ def _get_running_apps(self, instance, tags):
            cluster_mode = SPARK_YARN_MODE

        if cluster_mode == SPARK_STANDALONE_MODE:
-            return self._standalone_init(master_address)
+            # Check whether pre-2.0 compatibility mode is enabled
+            pre20 = _is_affirmative(instance.get(SPARK_PRE_20_MODE, False))
+            return self._standalone_init(master_address, pre20)

        elif cluster_mode == SPARK_MESOS_MODE:
            running_apps = self._mesos_init(master_address)
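
For context, _is_affirmative is the agent helper used above to read boolean-ish instance options. A minimal sketch of its assumed semantics (the real implementation lives in the agent's config module):

    def _is_affirmative(value):
        # Sketch of the assumed behavior: booleans, ints, and common
        # truthy strings from YAML ('yes', 'true', '1') count as enabled.
        if value is None:
            return False
        return str(value).lower() in ('yes', 'true', '1')
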
@@ -278,7 +284,7 @@ def _get_running_apps(self, instance, tags):
            raise Exception('Invalid setting for %s. Received %s.' % (SPARK_CLUSTER_MODE,
                            cluster_mode))

-    def _standalone_init(self, spark_master_address):
+    def _standalone_init(self, spark_master_address, pre_20_mode):
        '''
        Return a dictionary of {app_id: (app_name, tracking_url)} for the running Spark applications
        '''
@@ -294,17 +300,32 @@ def _standalone_init(self, spark_master_address):
            app_name = app.get('name')

            # Parse through the HTML to grab the application driver's link
-            app_url = self._get_standalone_app_url(app_id, spark_master_address)
-
-            if app_id and app_name and app_url:
-                running_apps[app_id] = (app_name, app_url)
+            try:
+                app_url = self._get_standalone_app_url(app_id, spark_master_address)
+
+                if app_id and app_name and app_url:
+                    if pre_20_mode:
+                        self.log.debug('Getting application list in pre-2.0 mode')
+                        applist = self._rest_request_to_json(app_url,
+                                                             SPARK_APPS_PATH,
+                                                             SPARK_STANDALONE_SERVICE_CHECK)
+                        for appl in applist:
+                            aid = appl.get('id')
+                            aname = appl.get('name')
+                            running_apps[aid] = (aname, app_url)
+                    else:
+                        running_apps[app_id] = (app_name, app_url)
+            except Exception:
+                # The request can fail if the job completed after we fetched
+                # the list of apps; just skip this application and continue.
+                pass

        # Report success after gathering metrics from Spark master
        self.service_check(SPARK_STANDALONE_SERVICE_CHECK,
                           AgentCheck.OK,
                           tags=['url:%s' % spark_master_address],
                           message='Connection to Spark master "%s" was successful' % spark_master_address)

+        self.log.info("Returning running apps %s" % running_apps)
        return running_apps

    def _mesos_init(self, master_address):
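
The pre-2.0 branch above is what costs the extra HTTP request per application: the check asks each driver's REST API for its application list rather than trusting the IDs scraped from the master UI. A rough standalone sketch of that round trip, assuming SPARK_APPS_PATH holds the usual api/v1/applications path (the constant's value is not shown in this diff) and using plain requests in place of _rest_request_to_json:

    import requests

    # Assumed value of the module constant; not shown in this diff.
    SPARK_APPS_PATH = 'api/v1/applications'

    def enumerate_apps_pre_20(app_url):
        # Ask the application driver itself for its id/name, because on
        # Spark < 2.0 the IDs scraped from the master UI don't match the
        # IDs the driver's REST API reports.
        response = requests.get('%s/%s' % (app_url.rstrip('/'), SPARK_APPS_PATH))
        response.raise_for_status()
        return dict((app.get('id'), (app.get('name'), app_url))
                    for app in response.json())
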
@@ -624,6 +645,7 @@ def _rest_request(self, address, object_path, service_name, *args, **kwargs):
            url = urljoin(url, '?' + query)

        try:
+            self.log.debug('Spark check URL: %s' % url)
            response = requests.get(url)
            response.raise_for_status()
4 changes: 4 additions & 0 deletions conf.d/spark.yaml.example
@@ -28,6 +28,10 @@ instances:
    # spark_cluster_mode: spark_standalone_mode
    # spark_cluster_mode: spark_mesos_mode

+    # To monitor an older (pre-2.0) standalone Spark cluster,
+    # 'spark_pre_20_mode' must be set.
+    # spark_pre_20_mode: true
+
    # A required friendly name for the cluster.
    # cluster_name: MySparkCluster
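
Putting the new option together with the surrounding settings, a pre-2.0 standalone instance would look roughly like this (localhost:8080 is a placeholder for the standalone master's web UI address):

    instances:
      - spark_url: http://localhost:8080   # standalone master web UI (placeholder)
        spark_cluster_mode: spark_standalone_mode
        spark_pre_20_mode: true            # only needed for Spark < 2.0
        cluster_name: MySparkCluster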

