[Spark] Fix check to enumerate job IDs #3111
Conversation
I have tested on Spark 2.0. This change is not necessary for Spark 2.0, but it does still work (just one additional HTTP request to the Spark API). So, we could …
(force-pushed from 8023121 to fc05c1d)
Added the conditional so that the new code is only used when configured (on older versions of Spark). Did the following tests: …
@@ -198,6 +201,7 @@ def __init__(self, name, init_config, agentConfig, instances=None):
         AgentCheck.__init__(self, name, init_config, agentConfig, instances)
         self.previous_jobs = {}
         self.previous_stages = {}
+        self._spark_pre_20_mode = False
I don't think we need this as an attribute; it will change from instance to instance, and it's not too expensive to look up in the instance dict (something we'd have to do anyway if you made self._spark_pre_20_mode into a dict). We can probably just look up the value in check() for the relevant code paths.
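A minimal sketch of that alternative (an illustrative fragment; check() already receives the instance dict, so the lookup can live there):

def check(self, instance):
    # Look the flag up per instance instead of caching it on self;
    # each configured instance can carry its own value.
    pre20 = instance.get(SPARK_PRE_20_MODE, False)
    # ... pass pre20 down to the code paths that need it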
@@ -263,6 +267,11 @@ def _get_running_apps(self, instance, tags):
         cluster_mode = SPARK_YARN_MODE

+        if cluster_mode == SPARK_STANDALONE_MODE:
+            # check for PRE-20
+            pre20 = instance.get(SPARK_PRE_20_MODE)
I'd use _is_affirmative() here (you'll have to import it):

pre20 = _is_affirmative(instance.get(SPARK_PRE_20_MODE, False))

By doing this we achieve two things: if the key doesn't exist (i.e. it hasn't been set in the YAML) we'll default to False, and we can also handle values like "yes", "no", etc. if it is set. You won't need to do the validation like you do in the next couple of lines.
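For context, an illustrative re-implementation of the semantics being suggested (this is not the agent's actual helper, just the behavior described above):

def _is_affirmative(value):
    # None (a missing key) maps to False; common truthy spellings
    # such as True, 1, 'yes', 'true' map to True.
    if value is None:
        return False
    if isinstance(value, bool):
        return value
    return str(value).lower() in ('yes', 'true', '1')

assert _is_affirmative(None) is False   # key absent from the YAML
assert _is_affirmative('yes') is True   # string spellings handled
assert _is_affirmative('no') is False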
pre20 = instance.get(SPARK_PRE_20_MODE)
if pre20 is not None:
    if pre20 is True:
        self._spark_pre_20_mode = True
return self._standalone_init(master_address)
We can maybe pass the pre20 argument to _standalone_init() as an optional argument. The prototype would change to:

def _standalone_init(self, spark_master_address, pre20=False):
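A rough sketch of how that could thread through, based on the master payload shown in the PR description (the request helper and the pre-2.0 id resolver below are placeholders, not the check's real methods):

def _standalone_init(self, spark_master_address, pre20=False):
    # Ask the Spark master for its running applications.
    master_json = self._fetch_json(spark_master_address + '/json')  # placeholder helper

    running_apps = {}
    for app in master_json.get('activeapps', []):
        app_id, app_name = app.get('id'), app.get('name')
        if pre20:
            # The pre-2.0 REST API keys applications by a different id,
            # so one extra request is needed to resolve it.
            app_id = self._resolve_pre20_app_id(app_name)  # placeholder helper
        running_apps[app_id] = (app_name, spark_master_address)
    return running_apps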
@@ -624,6 +648,7 @@ def _rest_request(self, address, object_path, service_name, *args, **kwargs):
             url = urljoin(url, '?' + query)

         try:
+            self.log.debug('Spark check URL: %s' % url)
Do we really need this line? I can see it being of value when debugging support cases, but it'll be quite verbose, no? Your call.
It will be quite verbose, but that's why it's debug. It's the first thing we added (and then sent the customer a patch) when the card came in, so it seems worthwhile, since the default (info level) is not to log it.
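For what it's worth, a small standalone illustration of the trade-off (plain Python logging standing in for the agent's logger; names are illustrative): at the default info level, the debug call is simply dropped.

import logging

logging.basicConfig(level=logging.INFO)  # the default level described above
log = logging.getLogger('spark-demo')

url = 'http://host:4040/api/v1/applications'
log.debug('Spark check URL: %s', url)  # suppressed at the info level
log.info('only this line is emitted')

As a side note, passing the URL as a logging argument, log.debug('Spark check URL: %s', url), defers the string formatting until a handler actually emits the record, which makes the suppressed call slightly cheaper than the '%' interpolation in the diff.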
Also, since you have the payload formats, etc., it could be worth it to add a test for this use case here: https://github.com/DataDog/dd-agent/blob/master/tests/checks/mock/test_spark.py
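A sketch of what such a test could look like, patterned on the standalone mocks discussed later in this thread (the harness import, config keys, and metric name are assumptions, not verified against the repo):

import mock

from tests.checks.common import AgentCheckTest  # assumed harness location


class TestSparkPre20(AgentCheckTest):
    CHECK_NAME = 'spark'

    @mock.patch('requests.get', side_effect=standalone_request_pre20_get_mock)
    def test_standalone_pre20(self, mock_requests):
        config = {'instances': [{
            'spark_url': 'http://localhost:8080',          # assumed config keys
            'cluster_name': 'SparkCluster',
            'spark_cluster_mode': 'spark_standalone_mode',
            'spark_pre_20_mode': 'true',                   # the new flag under test
        }]}
        self.run_check(config)
        # Job metrics should still be collected despite the id indirection.
        self.assertMetric('spark.job.count', at_least=1)   # assumed metric name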
(force-pushed from e2e86dc to 30eb58d)
Looks good. Didn't realize the tests hadn't been added.
Let's add a quick test for the change before shipping. Thanks @derekwbrown
(force-pushed from 30eb58d to 222a870)
Add a conditional for the new code path (it's for older versions of Spark, and causes an additional HTTP request that's not necessary on newer versions). Address review feedback. Add a unit test.
(force-pushed from 222a870 to 8535953)
This looks good; I was just wondering if we could maybe remove some of the duplicate code in the mocks? Up to you, I just think it might be easier to maintain moving forward.
@@ -206,6 +212,69 @@ def raise_for_status(self):
         body = f.read()
         return MockStandaloneResponse(body, 200)

+def standalone_requests_pre20_get_mock(*args, **kwargs):
Ideally we could maybe refactor some code out of standalone_requests_pre20_get_mock() and standalone_requests_get_mock() so we'd have less duplicated code? Maybe something like:
class MockStandaloneResponse:
    text = ''

    def __init__(self, json_data, status_code):
        self.json_data = json_data
        self.status_code = status_code
        self.text = json_data

    def json(self):
        return json.loads(self.json_data)

    def raise_for_status(self):
        return True


def standalone_request_get_mock(*args, **kwargs):
    if args[0] == STANDALONE_APP_URL:
        with open(Fixtures.file('spark_standalone_apps'), 'r') as f:
            body = f.read()
        return MockStandaloneResponse(body, 200)
    # REMAINING logic


def standalone_request_pre20_get_mock(*args, **kwargs):
    if args[0] == STANDALONE_SPARK_APP_URL:
        with open(Fixtures.file('spark_apps_pre20'), 'r') as f:
            body = f.read()
        return MockStandaloneResponse(body, 200)
    else:
        # Delegate everything else to the regular mock.
        return standalone_request_get_mock(*args, **kwargs)
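One nice property of delegating in the else branch is that the pre-2.0 mock only needs to special-case the URLs whose payloads actually differ; every other request falls through to the existing mock, so fixtures added later are shared by both code paths automatically.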
What does this PR do?
The Spark integration, at least against Spark 1.5, was using the wrong value of 'id'. It appears there are two values: the generic application id from the master's 8080/json endpoint:
"activeapps" : [ {
"starttime" : 1483727736675,
"id" : "app-20170106103536-0024",
"name" : "PythonPi",
"user" : "db",
"memoryperslave" : 1024,
"submitdate" : "Fri Jan 06 10:35:36 PST 2017",
"state" : "RUNNING",
"duration" : 5357
} ],
However, when you chase down the individual application's URL, in this case 4040/api/v1/applications/, you get:
[ {
"id" : "PythonPi",
"name" : "PythonPi",
"attempts" : [ {
"startTime" : "2017-01-06T18:35:34.758GMT",
"endTime" : "1969-12-31T23:59:59.999GMT",
"sparkUser" : "",
"completed" : false
} ]
} ]
When creating the URLs to retrieve the individual job statistics, the API appears to want the id from the second payload, not the first. The code was using the id from the first, and failing.
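In other words, the check has to resolve the id the REST API expects instead of reusing the master's id. A condensed sketch of that two-step lookup, using the URL shapes from the payloads above (the function name and the absence of error handling are illustrative):

import requests

def resolve_rest_api_ids(master_url, app_ui_url):
    # Step 1: the master's /json payload keys apps by ids like
    # 'app-20170106103536-0024'; the wrong id for per-job URLs.
    active = requests.get(master_url + '/json').json().get('activeapps', [])

    # Step 2: the app UI's REST API reports its own id (here 'PythonPi'),
    # which is what /api/v1/applications/<id>/jobs actually accepts.
    rest_apps = requests.get(app_ui_url + '/api/v1/applications').json()
    rest_ids = {app['name']: app['id'] for app in rest_apps}

    # Build job-statistics URLs from the REST id, not the master's id.
    return {app['name']: rest_ids.get(app['name']) for app in active}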
Motivation
Customer report.
Testing Guidelines
Additional Notes