[core] Submitting metrics to the api endpoint #3180

Merged
merged 8 commits into from
Feb 28, 2017
Changes from 7 commits
67 changes: 51 additions & 16 deletions emitter.py
@@ -72,13 +72,10 @@ def sanitize_payload(item, log, sanitize_func):

     return item

-def http_emitter(message, log, agentConfig, endpoint):
-    "Send payload"
-    url = agentConfig['dd_url']
+def post_payload(url, message, agentConfig, log):

     log.debug('http_emitter: attempting postback to ' + url)

     # Post back the data
     try:
         try:
             payload = json.dumps(message)
@@ -107,14 +104,8 @@ def http_emitter(message, log, agentConfig, endpoint):
     log.debug("payload_size=%d, compressed_size=%d, compression_ratio=%.3f"
               % (len(payload), len(zipped), float(len(payload))/float(len(zipped))))

-    apiKey = message.get('apiKey', None)
-    if not apiKey:
-        raise Exception("The http emitter requires an api key")
-
-    url = "{0}/intake/{1}?api_key={2}".format(url, endpoint, apiKey)
-
     try:
-        headers = post_headers(agentConfig, zipped)
+        headers = get_post_headers(agentConfig, zipped)
         r = requests.post(url, data=zipped, timeout=5, headers=headers)

         r.raise_for_status()
@@ -124,13 +115,57 @@ def http_emitter(message, log, agentConfig, endpoint):

     except Exception:
         log.exception("Unable to post payload.")
         try:
             log.error("Received status code: {0}".format(r.status_code))
         except Exception:
             pass


-def post_headers(agentConfig, payload):
+def split_payload(legacy_payload):
+    metrics = list(legacy_payload['metrics'])
Member

Nitpick: the function only reads from metrics, so there is no need to make a copy of it.
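
To illustrate the nitpick, here is a minimal sketch (not the code that was merged) showing that the list stays usable after its key is removed from the dict, so no copy is needed. The helper name `take_metrics` and the sample payload are hypothetical.

```python
def take_metrics(payload):
    """Remove the 'metrics' key and return its list without copying it."""
    # dict.pop drops the key but hands back the very same list object,
    # so the caller can keep iterating over it after the key is gone.
    return payload.pop('metrics')


# Hypothetical sample payload, shaped like the legacy format in the diff.
payload = {'apiKey': 'hypothetical-key',
           'metrics': [['system.load.1', 1488240000, 0.5]]}
original_list = payload['metrics']

metrics = take_metrics(payload)
```

Using `del` after binding the list to a local name would behave the same way; `pop` just does both steps at once.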

+    del legacy_payload['metrics']
+
+    metrics_payload = {"series": []}
+
+    # See https://github.com/DataDog/dd-agent/blob/5.11.1/checks/__init__.py#L905-L926 for format
+    for ts in metrics:
+        sample = {
+            "metric": ts[0],
+            "points": [[ts[1], ts[2]]]
Member

nit: could be a tuple instead of a list

Author

indeed.
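
As a quick check of the tuple suggestion, the sketch below uses the standard-library `json` module (an assumption; the emitter may import a different JSON library) to show that a tuple and a list serialize to the same JSON array, so the wire format is unchanged. The sample values are made up.

```python
import json

# Same sample, once with a list point and once with a tuple point.
as_list = {"metric": "system.load.1", "points": [[1488240000, 0.5]]}
as_tuple = {"metric": "system.load.1", "points": [(1488240000, 0.5)]}

# Tuples are encoded as JSON arrays, exactly like lists.
same_wire_format = (json.dumps(as_list, sort_keys=True)
                    == json.dumps(as_tuple, sort_keys=True))

# Decoding always yields lists, whichever type was encoded.
decoded_points = json.loads(json.dumps(as_tuple))["points"]
```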

+        }
+
+        if len(ts) >= 4:
+            if ts[3].get('type'):
+                sample['type'] = ts[3]['type']
+            if ts[3].get('hostname'):
+                sample['host'] = ts[3]['hostname']
+            if ts[3].get('tags'):
+                sample['tags'] = ts[3]['tags']
+            if ts[3].get('device_name'):
+                sample['device'] = ts[3]['device_name']
Member (@yannmh, Feb 27, 2017)

We could change those keys directly in the MetricAggregator formatter to speed things up.

sample.update(ts[3])

Member

Should device_name be changed to device? Dogstatsd uses device_name in its API payload (https://github.com/DataDog/dd-agent/blob/5.11.3/aggregator.py#L991)

Author

@yann I didn't want to change the formatter because I thought it was used by the legacy checks. It turns out that's not the case. I'll update this.

@olivielpeau What do you mean? Legacy payload processing will use device_name; the new payload expects device, hence my change.

Author

Actually, I want to keep this logic isolated, as it's a hack. Better to keep it in a single place so the code stays cleaner. Agent 6 will get rid of all of this anyway.
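
The difference between the explicit key renaming in the diff and the suggested `sample.update(ts[3])` can be sketched as follows. This is an illustration only: `KEY_MAP` and `build_sample` are hypothetical names, and the mapping is simply the four renames visible in the diff.

```python
# Legacy attribute name -> name used in the series payload (from the diff).
KEY_MAP = {
    'type': 'type',
    'hostname': 'host',
    'tags': 'tags',
    'device_name': 'device',
}


def build_sample(ts):
    """Build one series sample from a legacy metric entry, renaming keys."""
    sample = {'metric': ts[0], 'points': [(ts[1], ts[2])]}
    if len(ts) >= 4:
        for legacy_key, series_key in KEY_MAP.items():
            value = ts[3].get(legacy_key)
            if value:  # skip missing or empty attributes, as the diff does
                sample[series_key] = value
    return sample


# Hypothetical legacy metric entry.
ts = ['system.disk.used', 1488240000, 42.0,
      {'hostname': 'web-1', 'device_name': 'sda1', 'type': 'gauge'}]

renamed = build_sample(ts)

# A plain update() copies the legacy key names over unchanged.
copied = {'metric': ts[0], 'points': [(ts[1], ts[2])]}
copied.update(ts[3])
```

With `update`, the sample keeps `hostname` and `device_name`, so that shortcut only works if the formatter already emits the new key names.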


Member

We are not handling any exceptions in this block.
Considering that metrics have a strict format, I believe it is "safe" to accept this and avoid the overhead of a try ... except ... continue.

@olivielpeau any thoughts?

Member

Not handling exceptions here looks safe to me: the format is well-defined and the code is safe.

Author

Yes this code is safe.

+        metrics_payload["series"].append(sample)
+
+    return legacy_payload, metrics_payload
Member

Naming nitpick: legacy_payload refers to both the original input and the modified/stripped output. Shall we use two different names?

Author

It doesn't matter; it's still a legacy payload.


+def http_emitter(message, log, agentConfig, endpoint):
+    api_key = message.get('apiKey')
+
+    if not api_key:
+        raise Exception("The http emitter requires an api key")
+
+    # For perf reason. We now want to send the metrics to the api endpoint. So we are extracting them
+    # from the payload here, transform them into the expected format and send them (via the forwarder)
+
+    legacy_url = "{0}/intake/{1}?api_key={2}".format(agentConfig['dd_url'], endpoint, api_key)
+    metrics_endpoint = "{0}/api/v1/series?api_key={1}".format(agentConfig['dd_url'], api_key)
+
+    legacy_payload, metrics_payload = split_payload(message)
+
+    # Post legacy payload
+    post_payload(legacy_url, legacy_payload, agentConfig, log)
+
+    # Post metrics payload
+    post_payload(metrics_endpoint, metrics_payload, agentConfig, log)
+
+
+def get_post_headers(agentConfig, payload):
     return {
         'User-Agent': 'Datadog Agent/%s' % agentConfig['version'],
         'Content-Type': 'application/json',
1 change: 1 addition & 0 deletions tests/core/fixtures/payloads/legacy_payload.json

Large diffs are not rendered by default.

31 changes: 31 additions & 0 deletions tests/core/test_emitter.py
@@ -2,17 +2,48 @@
 # 3p
 from mock import Mock
 import unittest
+import simplejson as json

 # project
 from emitter import (
     remove_control_chars,
     remove_undecodable_chars,
     sanitize_payload,
+    split_payload
 )

+import os
+
+FIXTURE_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'fixtures', 'payloads')
+
 class TestEmitter(unittest.TestCase):

+
+    def test_payload_splitter(self):
+        with open(FIXTURE_PATH + '/legacy_payload.json') as f:
+            legacy_payload = json.load(f)
+
+        legacy_payload_split, metrics_payload = split_payload(legacy_payload)
+        series = metrics_payload['series']
+        legacy_payload_split['metrics'] = []
+
+        for s in series:
+            attributes = {}
+
+            if s.get('type'):
+                attributes['type'] = s['type']
+            if s.get('host'):
+                attributes['hostname'] = s['host']
+            if s.get('tags'):
+                attributes['tags'] = s['tags']
+            if s.get('device'):
+                attributes['device_name'] = s['device']
+
+            formatted_sample = [s['metric'], s['points'][0][0], s['points'][0][1], attributes]
+            legacy_payload_split['metrics'].append(formatted_sample)
+
+        self.assertEqual(legacy_payload, legacy_payload_split)
Member

This will always be true: split_payload modifies the original payload in place and returns that same object. In other words:

# &legacy_payload_split == &legacy_payload
legacy_payload_split, metrics_payload = split_payload(legacy_payload)

Member

Yes. To make things clearer, split_payload should either return a legacy_payload_split without mutating the legacy_payload passed as argument, or mutate the legacy_payload without returning it (which probably makes more sense here).

Author

good catch 🤦‍♂️

Author

I'll just pass a copy of the payload in the test, for 2 reasons:

  1. Better to be explicit: if the function mutates the dict, returning it is better than mutating it silently.
  2. We could make a copy of the dict inside the split_payload function, but this would come with a memory cost.

So I'll just make a copy of the payload in the test function.
Good catch
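
The aliasing problem raised in this thread, and the copy-in-the-test fix, can be sketched as follows. The `split_payload` here is a simplified stand-in written for this illustration, not the function from the diff, and the sample payload is hypothetical.

```python
import copy


def split_payload(legacy_payload):
    # Simplified stand-in: like the function in the diff, it mutates its
    # argument and returns that same dict alongside the metrics payload.
    metrics = legacy_payload.pop('metrics')
    series = [{'metric': m[0], 'points': [(m[1], m[2])]} for m in metrics]
    return legacy_payload, {'series': series}


def make_payload():
    # Hypothetical legacy payload with a single metric.
    return {'apiKey': 'hypothetical-key',
            'metrics': [['system.load.1', 1488240000, 0.5]]}


# Without a copy: the returned dict IS the input dict, so comparing the two
# can never fail, whatever split_payload does.
original = make_payload()
aliased, _ = split_payload(original)
is_same_object = aliased is original

# With a copy taken before the call: the expected value is independent of
# the mutation, so a comparison against it is meaningful.
fresh = make_payload()
expected = copy.deepcopy(fresh)
stripped, metrics_payload = split_payload(fresh)
```

`copy.deepcopy` is used rather than `dict.copy` because the nested `metrics` list would otherwise still be shared between the two dicts.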


     def test_remove_control_chars(self):
         messages = [
             (u'#és9df\x7fELF\x02\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00>\x00\x01\x00\x00\x00\x06@\x00\x00\x00\x00\x00@\x00\x00\x00\x00\x00\x00´wer0sf®ré', u'#és9dfELF>@@´wer0sf®ré'),