From 7c7b6a2e5675af681ce47fafb803f42d91bed55f Mon Sep 17 00:00:00 2001
From: Ofek Lev
Date: Thu, 11 Jul 2019 12:07:01 -0400
Subject: [PATCH] Use the new RequestsWrapper for connecting to services
 (#4094)

* Use the new RequestsWrapper for connecting to services

* Apply suggestions from code review

Co-Authored-By: Pierre Guceski

* indent
---
 .../mapreduce/data/conf.yaml.example          | 261 ++++++++++++++----
 .../datadog_checks/mapreduce/mapreduce.py     |  56 ++--
 mapreduce/requirements.in                     |   2 -
 mapreduce/tests/conftest.py                   |   2 +-
 mapreduce/tests/test_integration.py           |   1 +
 mapreduce/tests/test_mapreduce.py             |  10 +-
 6 files changed, 239 insertions(+), 93 deletions(-)

diff --git a/mapreduce/datadog_checks/mapreduce/data/conf.yaml.example b/mapreduce/datadog_checks/mapreduce/data/conf.yaml.example
index 88e6df5acfbe8..1ab044ad59cb6 100644
--- a/mapreduce/datadog_checks/mapreduce/data/conf.yaml.example
+++ b/mapreduce/datadog_checks/mapreduce/data/conf.yaml.example
@@ -1,3 +1,76 @@
+init_config:
+
+    ## @param general_counters - list of objects - optional
+    ##
+    ## `general_counters` are job-agnostic metrics that create a metric for each specified counter.
+    ## Create an object with the following layout:
+    ##
+    ## - counter_group_name: '<COUNTER_GROUP_NAME>'
+    ##   counters:
+    ##     - counter_name: 'MAP_INPUT_RECORDS'
+    ##     - counter_name: 'MAP_OUTPUT_RECORDS'
+    ##     - counter_name: 'REDUCE_INPUT_RECORDS'
+    ##     - counter_name: 'REDUCE_OUTPUT_RECORDS'
+    ##
+    ## For more information on counters, visit the MapReduce documentation page:
+    ## https://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapredAppMasterRest.html#Job_Counters_API #noqa
+    #
+    # general_counters:
+    #   - counter_group_name: 'org.apache.hadoop.mapreduce.FileSystemCounter'
+    #     counters:
+    #       - counter_name: 'HDFS_BYTES_READ'
+
+    ## @param job_specific_counters - list of objects - optional
+    ## `job_specific_counters` are metrics that are specific to a particular job.
+    ## Create an object with the following layout:
+    ##
+    ## - job_name: <JOB_NAME>
+    ##   metrics:
+    ##     - counter_group_name: <COUNTER_GROUP_NAME>
+    ##       counters:
+    ##         - counter_name: <COUNTER_NAME>
+    ##
+    ## For more information on counters, visit the MapReduce documentation page:
+    ## https://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapredAppMasterRest.html#Job_Counters_API #noqa
+    #
+    # job_specific_counters:
+    #   - job_name: '<JOB_NAME>'
+    #     metrics:
+    #       - counter_group_name: 'org.apache.hadoop.mapreduce.FileSystemCounter'
+    #         counters:
+    #           - counter_name: 'FILE_BYTES_WRITTEN'
+    #           - counter_name: 'HDFS_BYTES_WRITTEN'
+    #       - counter_group_name: 'org.apache.hadoop.mapreduce.FileSystemCounter'
+    #         counters:
+    #           - counter_name: 'HDFS_BYTES_READ'
+
+    ## @param proxy - object - optional
+    ## Set HTTP or HTTPS proxies for all instances. Use the `no_proxy` list
+    ## to specify hosts that must bypass proxies.
+    ##
+    ## The SOCKS protocol is also supported like so:
+    ##
+    ##   socks5://user:pass@host:port
+    ##
+    ## Using the scheme `socks5` causes the DNS resolution to happen on the
+    ## client, rather than on the proxy server. This is in line with `curl`,
+    ## which uses the scheme to decide whether to do the DNS resolution on
+    ## the client or proxy. If you want to resolve the domains on the proxy
+    ## server, use `socks5h` as the scheme.
+    #
+    # proxy:
+    #   http: http://<PROXY_SERVER_FOR_HTTP>:<PORT>
+    #   https: https://<PROXY_SERVER_FOR_HTTPS>:<PORT>
+    #   no_proxy:
+    #     - <HOSTNAME_1>
+    #     - <HOSTNAME_2>
+
+    ## @param skip_proxy - boolean - optional - default: false
+    ## If set to `true`, this makes the check bypass any enabled proxy
+    ## settings and attempt to reach services directly.
+    #
+    # skip_proxy: false
+
 instances:

     ## @param resourcemanager_uri - string - required
@@ -33,63 +106,151 @@ instances:
     #   - <KEY_1>:<VALUE_1>
     #   - <KEY_2>:<VALUE_2>

+    ## @param proxy - object - optional
+    ## This overrides the `proxy` setting in `init_config`.
+    ##
+    ## Set HTTP or HTTPS proxies. Use the `no_proxy` list
+    ## to specify hosts that must bypass proxies.
+    ##
+    ## The SOCKS protocol is also supported like so:
+    ##
+    ##   socks5://user:pass@host:port
+    ##
+    ## Using the scheme `socks5` causes the DNS resolution to happen on the
+    ## client, rather than on the proxy server. This is in line with `curl`,
+    ## which uses the scheme to decide whether to do the DNS resolution on
+    ## the client or proxy. If you want to resolve the domains on the proxy
+    ## server, use `socks5h` as the scheme.
+    #
+    # proxy:
+    #   http: http://<PROXY_SERVER_FOR_HTTP>:<PORT>
+    #   https: https://<PROXY_SERVER_FOR_HTTPS>:<PORT>
+    #   no_proxy:
+    #     - <HOSTNAME_1>
+    #     - <HOSTNAME_2>
+
+    ## @param skip_proxy - boolean - optional - default: false
+    ## This overrides the `skip_proxy` setting in `init_config`.
+    ##
+    ## If set to `true`, this makes the check bypass any enabled proxy
+    ## settings and attempt to reach services directly.
+    #
+    # skip_proxy: false
+
     ## @param username - string - optional
-    ## If your service uses basic HTTP authentication, set here the username required.
+    ## The username to use if services are behind basic auth.
+    #
+    # username: <USERNAME>
+
+    ## @param ntlm_domain - string - optional
+    ## If your service uses NTLM authentication, you can
+    ## specify a domain that is used in the check. For NTLM auth,
+    ## append the username to the domain rather than setting the `username` parameter.
+    ## Example: <NTLM_DOMAIN>/<USERNAME>
     #
-    # user:
+    # ntlm_domain: <NTLM_DOMAIN>

     ## @param password - string - optional
-    ## If your service uses basic HTTP authentication, set here the password required.
+    ## The password to use if services are behind basic or NTLM auth.
     #
     # password: <PASSWORD>

-    ## @param ssl_verify - boolean - optional - default: true
-    ## Instruct the check to validate SSL certificates when connecting to "resourcemanager_uri"
+    ## @param kerberos_auth - string - optional - default: disabled
+    ## If your service uses Kerberos authentication, you can specify the Kerberos
+    ## strategy to use between:
+    ##   * required
+    ##   * optional
+    ##   * disabled
+    ##
+    ## See https://github.com/requests/requests-kerberos#mutual-authentication
     #
-    # ssl_verify: true
+    # kerberos_auth: disabled

-init_config:
+    ## @param kerberos_delegate - boolean - optional - default: false
+    ## Set to `true` to enable Kerberos delegation of credentials to a server that requests delegation.
+    ## See https://github.com/requests/requests-kerberos#delegation
+    #
+    # kerberos_delegate: false
+
+    ## @param kerberos_force_initiate - boolean - optional - default: false
+    ## Set to `true` to preemptively initiate the Kerberos GSS exchange and present a Kerberos ticket on the initial
+    ## request (and all subsequent ones).
+    ## See https://github.com/requests/requests-kerberos#preemptive-authentication
+    #
+    # kerberos_force_initiate: false
+
+    ## @param kerberos_hostname - string - optional
+    ## Override the hostname used for the Kerberos GSS exchange if its DNS name doesn't match its Kerberos
+    ## hostname (e.g., behind a content switch or load balancer).
+    ## See https://github.com/requests/requests-kerberos#hostname-override
+    #
+    # kerberos_hostname: null
+
+    ## @param kerberos_principal - string - optional
+    ## Set an explicit principal to force Kerberos to look for a matching credential cache for the named user.
+    ## See https://github.com/requests/requests-kerberos#explicit-principal
+    #
+    # kerberos_principal: null
+
+    ## @param kerberos_keytab - string - optional
+    ## Set the path to your Kerberos keytab file.
+    #
+    # kerberos_keytab: <KEYTAB_FILE_PATH>
+
+    ## @param tls_verify - boolean - optional - default: true
+    ## Instructs the check to validate the TLS certificate of services.
+    #
+    # tls_verify: true
+
+    ## @param tls_ignore_warning - boolean - optional - default: false
+    ## If `tls_verify` is disabled, security warnings are logged by the check.
+    ## Disable those by setting `tls_ignore_warning` to true.
+    #
+    # tls_ignore_warning: false

-    ## @param general_counters - list of objects - optional
-    ##
-    ## general_counters are job agnostic metrics that create a metric for each specified counter
-    ## Create a an object with the following layout:
-    ##
-    ## - counter_group_name: '<COUNTER_GROUP_NAME>'
-    ##   counters:
-    ##     - counter_name: 'MAP_INPUT_RECORDS'
-    ##     - counter_name: 'MAP_OUTPUT_RECORDS'
-    ##     - counter_name: 'REDUCE_INPUT_RECORDS'
-    ##     - counter_name: 'REDUCE_OUTPUT_RECORDS'
-    ##
-    ## For more information on counters visit the MapReduce documentation page:
-    ## https://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapredAppMasterRest.html#Job_Counters_API #noqa
-    #
-    # general_counters:
-    #   - counter_group_name: 'org.apache.hadoop.mapreduce.FileSystemCounter'
-    #     counters:
-    #       - counter_name: 'HDFS_BYTES_READ'
-
-    ## @param job_specific_counters - list of objects - optional
-    ## job_specific_counters are metrics that are specific to a particular job.
-    ## Create an object with the following layout:
-    ##
-    ## - job_name: <JOB_NAME>
-    ##   metrics:
-    ##     - counter_group_name: <COUNTER_GROUP_NAME>
-    ##       counters:
-    ##         - counter_name: <COUNTER_NAME>
-    ##
-    ## For more information on counters visit the MapReduce documentation page:
-    ## https://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapredAppMasterRest.html#Job_Counters_API #noqa
-    #
-    # job_specific_counters:
-    #   - job_name: '<JOB_NAME>'
-    #     metrics:
-    #       - counter_group_name: 'org.apache.hadoop.mapreduce.FileSystemCounter'
-    #         counters:
-    #           - counter_name: 'FILE_BYTES_WRITTEN'
-    #           - counter_name: 'HDFS_BYTES_WRITTEN'
-    #       - counter_group_name: 'org.apache.hadoop.mapreduce.FileSystemCounter'
-    #         counters:
-    #           - counter_name: 'HDFS_BYTES_READ'
+    ## @param tls_cert - string - optional
+    ## The path to a single file in PEM format containing a certificate as well as any
+    ## number of CA certificates needed to establish the certificate's authenticity for
+    ## use when connecting to services. It may also contain an unencrypted private key to use.
+    #
+    # tls_cert: <CERT_PATH>
+
+    ## @param tls_private_key - string - optional
+    ## The unencrypted private key to use for `tls_cert` when connecting to services. This is
+    ## required if `tls_cert` is set and it does not already contain a private key.
+    #
+    # tls_private_key: <PRIVATE_KEY_PATH>
+
+    ## @param tls_ca_cert - string - optional
+    ## The path to a file of concatenated CA certificates in PEM format or a directory
+    ## containing several CA certificates in PEM format. If a directory, the directory
+    ## must have been processed using the c_rehash utility supplied with OpenSSL. See:
+    ## https://www.openssl.org/docs/manmaster/man3/SSL_CTX_load_verify_locations.html
+    #
+    # tls_ca_cert: <CA_CERT_PATH>
+
+    ## @param headers - list of key:value elements - optional
+    ## The headers parameter allows you to send specific headers with every request.
+    ## You can use it to explicitly specify the host header or to add headers for
+    ## authorization purposes.
+    ##
+    ## This overrides any default headers.
+    #
+    # headers:
+    #   Host: <HOSTNAME>
+    #   X-Auth-Token: <TOKEN>
+
+    ## @param timeout - integer - optional - default: 10
+    ## The timeout for connecting to services.
+    #
+    # timeout: 10
+
+    ## @param log_requests - boolean - optional - default: false
+    ## Whether or not to debug-log the HTTP(S) requests made, including the method and URL.
+    #
+    # log_requests: false
+
+    ## @param persist_connections - boolean - optional - default: false
+    ## Whether or not to persist cookies and use connection pooling for increased performance.
+    #
+    # persist_connections: false
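The options above (proxies, basic/NTLM/Kerberos auth, TLS, headers, timeouts) are no longer parsed inside each check; the shared RequestsWrapper resolves them once and applies them to every request. A minimal sketch of that idea, assuming only the `requests` library (the `MinimalHTTPWrapper` class is hypothetical, handles just a few of the options, and is not the actual datadog_checks implementation):

import requests


class MinimalHTTPWrapper(object):
    """Illustrative stand-in for the RequestsWrapper: resolve instance-level
    HTTP options once, then reuse them for every request the check makes."""

    def __init__(self, instance):
        username, password = instance.get('username'), instance.get('password')
        self.options = {
            # Basic auth only; the real wrapper also handles NTLM and Kerberos.
            'auth': (username, password) if username and password else None,
            'verify': instance.get('tls_verify', True),
            'headers': instance.get('headers'),
            'timeout': instance.get('timeout', 10),
        }

    def get(self, url, **options):
        # Per-call options take precedence over the instance-level defaults.
        return requests.get(url, **dict(self.options, **options))


# Usage sketch against a hypothetical ResourceManager endpoint:
# http = MinimalHTTPWrapper({'tls_verify': False, 'timeout': 5})
# apps = http.get('http://localhost:8088/ws/v1/cluster/apps').json()

This is why the mapreduce.py changes below are almost entirely deletions: the per-call `auth` and `ssl_verify` plumbing disappears, and request call sites shrink to a single argument.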
diff --git a/mapreduce/datadog_checks/mapreduce/mapreduce.py b/mapreduce/datadog_checks/mapreduce/mapreduce.py
index 6eb2e5cf49ea6..3962c9c5b2b29 100644
--- a/mapreduce/datadog_checks/mapreduce/mapreduce.py
+++ b/mapreduce/datadog_checks/mapreduce/mapreduce.py
@@ -41,17 +41,18 @@
 """

-import requests
 from requests.exceptions import ConnectionError, HTTPError, InvalidURL, Timeout
 from simplejson import JSONDecodeError
 from six import iteritems, itervalues
 from six.moves.urllib.parse import urljoin, urlsplit, urlunsplit

-from datadog_checks.checks import AgentCheck
-from datadog_checks.config import _is_affirmative
+from datadog_checks.base import AgentCheck, is_affirmative


 class MapReduceCheck(AgentCheck):
+
+    HTTP_CONFIG_REMAPPER = {'ssl_verify': {'name': 'tls_verify'}}
+
     # Default Settings
     DEFAULT_CLUSTER_NAME = 'default_cluster'
@@ -104,8 +105,8 @@ class MapReduceCheck(AgentCheck):

     MAPREDUCE_REDUCE_TASK_METRICS = {'elapsedTime': ('mapreduce.job.reduce.task.elapsed_time', HISTOGRAM)}

-    def __init__(self, name, init_config, agentConfig, instances=None):
-        AgentCheck.__init__(self, name, init_config, agentConfig, instances)
+    def __init__(self, name, init_config, instances):
+        super(MapReduceCheck, self).__init__(name, init_config, instances)

         # Parse job specific counters
         self.general_counters = self._parse_general_counters(init_config)
@@ -119,16 +120,7 @@ def check(self, instance):
         if rm_address is None:
             raise Exception("The ResourceManager URL must be specified in the instance configuration")

-        collect_task_metrics = _is_affirmative(instance.get('collect_task_metrics', False))
-
-        # Authenticate our connection to endpoint if required
-        username = instance.get('username')
-        password = instance.get('password')
-        auth = None
-        if username is not None and password is not None:
-            auth = (username, password)
-
-        ssl_verify = _is_affirmative(instance.get('ssl_verify', True))
+        collect_task_metrics = is_affirmative(instance.get('collect_task_metrics', False))

         # Get additional tags from the conf file
         custom_tags = instance.get("tags", [])
@@ -146,7 +138,7 @@ def check(self, instance):
         tags.append('cluster_name:{}'.format(cluster_name))

         # Get the running MR applications from YARN
-        running_apps = self._get_running_app_ids(rm_address, auth, ssl_verify)
+        running_apps = self._get_running_app_ids(rm_address)

         # Report success after gathering all metrics from ResourceManager
         self.service_check(
@@ -157,14 +149,14 @@ def check(self, instance):
         )

         # Get the applications from the application master
-        running_jobs = self._mapreduce_job_metrics(running_apps, auth, ssl_verify, tags)
+        running_jobs = self._mapreduce_job_metrics(running_apps, tags)

         # Get job counter metrics
-        self._mapreduce_job_counters_metrics(running_jobs, auth, ssl_verify, tags)
+        self._mapreduce_job_counters_metrics(running_jobs, tags)

         # Get task metrics
         if collect_task_metrics:
-            self._mapreduce_task_metrics(running_jobs, auth, ssl_verify, tags)
+            self._mapreduce_task_metrics(running_jobs, tags)

         # Report success after gathering all metrics from Application Master
         if running_jobs:
@@ -272,14 +264,12 @@ def _parse_job_specific_counters(self, init_config):

         return job_counter

-    def _get_running_app_ids(self, rm_address, auth, ssl_verify):
+    def _get_running_app_ids(self, rm_address):
         """
         Return a dictionary of {app_id: (app_name, tracking_url)} for the running MapReduce applications
         """
         metrics_json = self._rest_request_to_json(
             rm_address,
-            auth,
-            ssl_verify,
             self.YARN_APPS_PATH,
             self.YARN_SERVICE_CHECK,
             states=self.YARN_APPLICATION_STATES,
@@ -301,7 +291,7 @@ def _get_running_app_ids(self, rm_address, auth, ssl_verify):

         return running_apps

-    def _mapreduce_job_metrics(self, running_apps, auth, ssl_verify, addl_tags):
+    def _mapreduce_job_metrics(self, running_apps, addl_tags):
         """
         Get metrics for each MapReduce job.
         Return a dictionary for each MapReduce job
@@ -318,7 +308,7 @@ def _mapreduce_job_metrics(self, running_apps, auth, ssl_verify, addl_tags):

         for app_name, tracking_url in itervalues(running_apps):
             metrics_json = self._rest_request_to_json(
-                tracking_url, auth, ssl_verify, self.MAPREDUCE_JOBS_PATH, self.MAPREDUCE_SERVICE_CHECK
+                tracking_url, self.MAPREDUCE_JOBS_PATH, self.MAPREDUCE_SERVICE_CHECK
             )

             if metrics_json.get('jobs'):
@@ -351,7 +341,7 @@ def _mapreduce_job_metrics(self, running_apps, auth, ssl_verify, addl_tags):

         return running_jobs

-    def _mapreduce_job_counters_metrics(self, running_jobs, auth, ssl_verify, addl_tags):
+    def _mapreduce_job_counters_metrics(self, running_jobs, addl_tags):
         """
         Get custom metrics specified for each counter
         """
@@ -363,12 +353,7 @@ def _mapreduce_job_counters_metrics(self, running_jobs, auth, ssl_verify, addl_t
                 job_specific_metrics = self.job_specific_counters.get(job_name)

                 metrics_json = self._rest_request_to_json(
-                    job_metrics['tracking_url'],
-                    auth,
-                    ssl_verify,
-                    'counters',
-                    self.MAPREDUCE_SERVICE_CHECK,
-                    tags=addl_tags,
+                    job_metrics['tracking_url'], 'counters', self.MAPREDUCE_SERVICE_CHECK, tags=addl_tags
                 )

                 if metrics_json.get('jobCounters'):
@@ -410,7 +395,7 @@ def _mapreduce_job_counters_metrics(self, running_jobs, auth, ssl_verify, addl_t
                             counter, self.MAPREDUCE_JOB_COUNTER_METRICS, tags
                         )

-    def _mapreduce_task_metrics(self, running_jobs, auth, ssl_verify, addl_tags):
+    def _mapreduce_task_metrics(self, running_jobs, addl_tags):
         """
         Get metrics for each MapReduce task
         Return a dictionary of {task_id: 'tracking_url'} for each MapReduce task
@@ -418,7 +403,7 @@ def _mapreduce_task_metrics(self, running_jobs, auth, ssl_verify, addl_tags):

         for job_stats in itervalues(running_jobs):
             metrics_json = self._rest_request_to_json(
-                job_stats['tracking_url'], auth, ssl_verify, 'tasks', self.MAPREDUCE_SERVICE_CHECK, tags=addl_tags
+                job_stats['tracking_url'], 'tasks', self.MAPREDUCE_SERVICE_CHECK, tags=addl_tags
             )

             if metrics_json.get('tasks'):
@@ -464,11 +449,10 @@ def _set_metric(self, metric_name, metric_type, value, tags=None, device_name=No
         else:
             self.log.error('Metric type "{}" unknown'.format(metric_type))

-    def _rest_request_to_json(self, address, auth, ssl_verify, object_path, service_name, tags=None, *args, **kwargs):
+    def _rest_request_to_json(self, address, object_path, service_name, tags=None, *args, **kwargs):
         """
         Query the given URL and return the JSON response
         """
-        response_json = None
         tags = [] if tags is None else tags

         service_check_tags = ['url:{}'.format(self._get_url_base(address))] + tags
@@ -491,7 +475,7 @@ def _rest_request_to_json(self, address, auth, ssl_verify, object_path, service_
             url = urljoin(url, '?' + query)

         try:
-            response = requests.get(url, auth=auth, verify=ssl_verify, timeout=self.default_integration_http_timeout)
+            response = self.http.get(url)
             response.raise_for_status()
             response_json = response.json()
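The single `HTTP_CONFIG_REMAPPER` entry above is what keeps existing configs working: the legacy `ssl_verify` option is renamed to `tls_verify` before the wrapper reads it. A minimal sketch of the mechanism (the `remap_instance` helper is hypothetical; the real remapping lives in the datadog_checks base package and supports more than a plain rename):

def remap_instance(instance, remapper):
    """Rename legacy option names to their new equivalents, without
    clobbering a new-style option the user already set."""
    remapped = dict(instance)
    for legacy_name, spec in remapper.items():
        if legacy_name in remapped and spec['name'] not in remapped:
            remapped[spec['name']] = remapped.pop(legacy_name)
    return remapped


HTTP_CONFIG_REMAPPER = {'ssl_verify': {'name': 'tls_verify'}}

# An instance still using the pre-RequestsWrapper option name:
legacy = {'resourcemanager_uri': 'http://localhost:8088', 'ssl_verify': False}
print(remap_instance(legacy, HTTP_CONFIG_REMAPPER))
# {'resourcemanager_uri': 'http://localhost:8088', 'tls_verify': False}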
diff --git a/mapreduce/requirements.in b/mapreduce/requirements.in
index 139597f9cb07c..e69de29bb2d1d 100644
--- a/mapreduce/requirements.in
+++ b/mapreduce/requirements.in
@@ -1,2 +0,0 @@
-
-
diff --git a/mapreduce/tests/conftest.py b/mapreduce/tests/conftest.py
index 80721f210939d..16fcb474f0d61 100644
--- a/mapreduce/tests/conftest.py
+++ b/mapreduce/tests/conftest.py
@@ -41,7 +41,7 @@ def dd_environment():

 @pytest.fixture
 def check():
-    return MapReduceCheck('mapreduce', {}, {})
+    return lambda instance: MapReduceCheck('mapreduce', {}, [instance])


 @pytest.fixture
diff --git a/mapreduce/tests/test_integration.py b/mapreduce/tests/test_integration.py
index d4916170a83a8..c03e63f9469fe 100644
--- a/mapreduce/tests/test_integration.py
+++ b/mapreduce/tests/test_integration.py
@@ -10,6 +10,7 @@

 @pytest.mark.usefixtures("dd_environment")
 def test_check(aggregator, check, instance):
+    check = check(instance)
     check.check(instance)

     for metric in common.EXPECTED_METRICS:
diff --git a/mapreduce/tests/test_mapreduce.py b/mapreduce/tests/test_mapreduce.py
index 15c7fdb4a214d..cd02b538a3708 100644
--- a/mapreduce/tests/test_mapreduce.py
+++ b/mapreduce/tests/test_mapreduce.py
@@ -29,12 +29,13 @@ def test_check(aggregator, mocked_request):
     """
     Test that we get all the metrics we're supposed to get
     """
+    instance = MR_CONFIG['instances'][0]

     # Instantiate the check
-    mapreduce = MapReduceCheck("mapreduce", INIT_CONFIG, {})
+    mapreduce = MapReduceCheck('mapreduce', INIT_CONFIG, [instance])

     # Run the check once
-    mapreduce.check(MR_CONFIG["instances"][0])
+    mapreduce.check(instance)

     # Check the MapReduce job metrics
     for metric, value in iteritems(MAPREDUCE_JOB_METRIC_VALUES):
@@ -91,12 +92,13 @@ def test_auth(aggregator, mocked_auth_request):
     """
     Test that we get all the metrics we're supposed to get
     """
+    instance = MR_AUTH_CONFIG['instances'][0]

     # Instantiate the check
-    mapreduce = MapReduceCheck("mapreduce", INIT_CONFIG, {})
+    mapreduce = MapReduceCheck('mapreduce', INIT_CONFIG, [instance])

     # Run the check once
-    mapreduce.check(MR_AUTH_CONFIG["instances"][0])
+    mapreduce.check(instance)

     # Check the service tests
     service_check_tags = ["url:{}".format(RM_URI)] + CUSTOM_TAGS
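Because instances must now be supplied at construction time, the `check` fixture above becomes a factory. A hedged sketch of the resulting test pattern, assuming the integration and its base package are installed and using a placeholder ResourceManager URI:

import pytest

from datadog_checks.mapreduce import MapReduceCheck


@pytest.fixture
def check():
    # The new base class signature is (name, init_config, instances), so the
    # fixture returns a factory and each test supplies its own instance.
    return lambda instance: MapReduceCheck('mapreduce', {}, [instance])


def test_check_construction(check):
    instance = {'resourcemanager_uri': 'http://localhost:8088'}
    mapreduce = check(instance)
    # The RequestsWrapper is exposed as `self.http`, replacing the manual
    # auth/ssl_verify plumbing removed in this patch.
    assert mapreduce.http is not None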