From b43cae17ce3694305686d545650aab76cb21b5bf Mon Sep 17 00:00:00 2001 From: Olivier Vielpeau Date: Fri, 14 Jul 2017 16:06:03 -0400 Subject: [PATCH 1/2] [collector][emitter] Split metric payloads bigger than 2MB --- emitter.py | 115 ++++++++++++++++++++++++++++--------- tests/core/test_emitter.py | 44 ++++++++++++-- 2 files changed, 128 insertions(+), 31 deletions(-) diff --git a/emitter.py b/emitter.py index 7dc0d6c800..4cf19b505e 100644 --- a/emitter.py +++ b/emitter.py @@ -29,6 +29,11 @@ control_char_re = re.compile('[%s]' % re.escape(control_chars)) +# Only enforced for the metrics API on our end, for now +MAX_COMPRESSED_SIZE = 2 << 20 # 2MB, the backend should accept up to 3MB but let's be conservative here +MAX_SPLIT_DEPTH = 3 # maximum depth of recursive calls to payload splitting function + + def remove_control_chars(s, log): if isinstance(s, str): sanitized = control_char_re.sub('', s) @@ -72,49 +77,106 @@ def sanitize_payload(item, log, sanitize_func): return item -def post_payload(url, message, agentConfig, log): +def post_payload(url, message, serialize_func, agentConfig, log): log.debug('http_emitter: attempting postback to ' + url) try: - try: - payload = json.dumps(message) - except UnicodeDecodeError: - newmessage = sanitize_payload(message, log, remove_control_chars) - try: - payload = json.dumps(newmessage) - except UnicodeDecodeError: - log.info('Removing undecodable characters from payload') - newmessage = sanitize_payload(newmessage, log, remove_undecodable_chars) - payload = json.dumps(newmessage) + payloads = serialize_func(message, MAX_COMPRESSED_SIZE, 0, log) except UnicodeDecodeError as ude: - log.error('http_emitter: Unable to convert message to json %s', ude) + log.exception('http_emitter: Unable to convert message to json') # early return as we can't actually process the message return except RuntimeError as rte: - log.error('http_emitter: runtime error dumping message to json %s', rte) + log.exception('http_emitter: runtime error dumping message to json') # early return as we can't actually process the message return except Exception as e: - log.error('http_emitter: unknown exception processing message %s', e) + log.exception('http_emitter: unknown exception processing message') return - zipped = zlib.compress(payload) + for payload in payloads: + try: + headers = get_post_headers(agentConfig, payload) + r = requests.post(url, data=payload, timeout=5, headers=headers) - log.debug("payload_size=%d, compressed_size=%d, compression_ratio=%.3f" - % (len(payload), len(zipped), float(len(payload))/float(len(zipped)))) + r.raise_for_status() + + if r.status_code >= 200 and r.status_code < 205: + log.debug("Payload accepted") + except Exception: + log.exception("Unable to post payload.") + + +def serialize_payload(message, log): + payload = "" try: - headers = get_post_headers(agentConfig, zipped) - r = requests.post(url, data=zipped, timeout=5, headers=headers) + payload = json.dumps(message) + except UnicodeDecodeError: + newmessage = sanitize_payload(message, log, remove_control_chars) + try: + payload = json.dumps(newmessage) + except UnicodeDecodeError: + log.info('Removing undecodable characters from payload') + newmessage = sanitize_payload(newmessage, log, remove_undecodable_chars) + payload = json.dumps(newmessage) - r.raise_for_status() + return payload + + +def serialize_and_compress_legacy_payload(legacy_payload, max_compressed_size, depth, log): + """ + Serialize and compress the legacy payload + """ + serialized_payload = 
serialize_payload(legacy_payload, log) + zipped = zlib.compress(serialized_payload) + log.debug("payload_size=%d, compressed_size=%d, compression_ratio=%.3f" + % (len(serialized_payload), len(zipped), float(len(serialized_payload))/float(len(zipped)))) - if r.status_code >= 200 and r.status_code < 205: - log.debug("Payload accepted") - except Exception: - log.exception("Unable to post payload.") + compressed_payloads = [zipped] + + if len(zipped) > max_compressed_size: + # let's just log a warning for now, splitting the legacy payload is tricky + log.warning("collector payload is above the limit of %dKB compressed", max_compressed_size/(1<<10)) + + return compressed_payloads + + +def serialize_and_compress_metrics_payload(metrics_payload, max_compressed_size, depth, log): + """ + Serialize and compress the metrics payload + If the compressed payload is too big, we attempt to split it into smaller payloads + """ + compressed_payloads = [] + + serialized_payload = serialize_payload(metrics_payload, log) + zipped = zlib.compress(serialized_payload) + compression_ratio = float(len(serialized_payload))/float(len(zipped)) + log.debug("payload_size=%d, compressed_size=%d, compression_ratio=%.3f" + % (len(serialized_payload), len(zipped), compression_ratio)) + + if len(zipped) < max_compressed_size: + compressed_payloads.append(zipped) + else: + series = metrics_payload["series"] + + if depth > MAX_SPLIT_DEPTH: + log.error("Maximum depth of payload splitting reached, dropping the %d metrics in this chunk", len(series)) + return compressed_payloads + + nb_chunks = len(zipped)/max_compressed_size + 1 + int(compression_ratio/2) # try to account for the compression + log.debug("payload is too big (%d bytes), splitting it in %d chunks", len(zipped), nb_chunks) + + series_per_chunk = len(series)/nb_chunks + 1 + + for i in range(nb_chunks): + compressed_payloads.extend( + serialize_and_compress_metrics_payload({"series": series[i*series_per_chunk:(i+1)*series_per_chunk]}, max_compressed_size, depth+1, log) + ) + + return compressed_payloads def split_payload(legacy_payload): @@ -149,6 +211,7 @@ def split_payload(legacy_payload): return legacy_payload, metrics_payload + def http_emitter(message, log, agentConfig, endpoint): api_key = message.get('apiKey') @@ -164,10 +227,10 @@ def http_emitter(message, log, agentConfig, endpoint): legacy_payload, metrics_payload = split_payload(message) # Post legacy payload - post_payload(legacy_url, legacy_payload, agentConfig, log) + post_payload(legacy_url, legacy_payload, serialize_and_compress_legacy_payload, agentConfig, log) # Post metrics payload - post_payload(metrics_endpoint, metrics_payload, agentConfig, log) + post_payload(metrics_endpoint, metrics_payload, serialize_and_compress_metrics_payload, agentConfig, log) def get_post_headers(agentConfig, payload): diff --git a/tests/core/test_emitter.py b/tests/core/test_emitter.py index 09b9c4f147..5d07389736 100644 --- a/tests/core/test_emitter.py +++ b/tests/core/test_emitter.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # 3p -from mock import Mock +import mock import unittest import simplejson as json @@ -9,7 +9,9 @@ remove_control_chars, remove_undecodable_chars, sanitize_payload, - split_payload + serialize_and_compress_metrics_payload, + serialize_payload, + split_payload, ) import os @@ -51,7 +53,7 @@ def test_remove_control_chars(self): (u'_e{2,19}:t4|♬ †øU †øU ¥ºu T0µ ♪', u'_e{2,19}:t4|♬ †øU †øU ¥ºu T0µ ♪') ] - log = Mock() + log = mock.Mock() for bad, good in messages: 
self.assertTrue(remove_control_chars(bad, log) == good, (bad,good)) @@ -70,7 +72,7 @@ def test_remove_control_chars_from_payload(self): {"processes":[1234,[[u'db🖫', 0, 2.2,12,34,u'☢compiz☢',1]]]} ] - log = Mock() + log = mock.Mock() def is_converted_same(msg): new_msg = sanitize_payload(msg, log, remove_control_chars) @@ -92,6 +94,38 @@ def test_remove_undecodable_characters(self): ] for bad, good, log_called in messages: - log = Mock() + log = mock.Mock() self.assertEqual(good, remove_undecodable_chars(bad, log)) self.assertEqual(log_called, log.warning.called) + + # Make compression a no-op for the tests + @mock.patch('zlib.compress', side_effect=lambda x: x) + def test_metrics_payload_chunks(self, compress_mock): + log = mock.Mock() + nb_series = 10000 + max_compressed_size = 1 << 10 + + metrics_payload = {"series": [ + { + "metric": "%d" % i, # use an integer so that it's easy to find the metric afterwards + "points": [(i, i)], + "source_type_name": "System", + } for i in xrange(nb_series) + ]} + + compressed_payloads = serialize_and_compress_metrics_payload(metrics_payload, max_compressed_size, 0, log) + + # check that all the payloads are smaller than the max size + for compressed_payload in compressed_payloads: + self.assertLess(len(compressed_payload), max_compressed_size) + + # check that all the series are there (correct number + correct metric names) + series_after = [] + for compressed_payload in compressed_payloads: + series_after.extend(json.loads(compressed_payload)["series"]) + + self.assertEqual(nb_series, len(series_after)) + + metrics_sorted = sorted([int(metric["metric"]) for metric in series_after]) + for i, metric_name in enumerate(metrics_sorted): + self.assertEqual(i, metric_name) From b4bf33bb21247f335e1d9cc0edc96e5369292378 Mon Sep 17 00:00:00 2001 From: Olivier Vielpeau Date: Thu, 27 Jul 2017 20:46:19 -0400 Subject: [PATCH 2/2] Address review comments --- emitter.py | 34 +++++++++++++++++++++++----------- tests/core/test_emitter.py | 3 +-- 2 files changed, 24 insertions(+), 13 deletions(-) diff --git a/emitter.py b/emitter.py index 4cf19b505e..1ff0b897a3 100644 --- a/emitter.py +++ b/emitter.py @@ -31,7 +31,9 @@ # Only enforced for the metrics API on our end, for now MAX_COMPRESSED_SIZE = 2 << 20 # 2MB, the backend should accept up to 3MB but let's be conservative here -MAX_SPLIT_DEPTH = 3 # maximum depth of recursive calls to payload splitting function +# maximum depth of recursive calls to payload splitting function, a bit arbitrary (we don't want too much +# depth in the recursive calls, but we want to give up only when some metrics are clearly too large) +MAX_SPLIT_DEPTH = 2 def remove_control_chars(s, log): @@ -83,15 +85,15 @@ def post_payload(url, message, serialize_func, agentConfig, log): try: payloads = serialize_func(message, MAX_COMPRESSED_SIZE, 0, log) - except UnicodeDecodeError as ude: + except UnicodeDecodeError: log.exception('http_emitter: Unable to convert message to json') # early return as we can't actually process the message return - except RuntimeError as rte: + except RuntimeError: log.exception('http_emitter: runtime error dumping message to json') # early return as we can't actually process the message return - except Exception as e: + except Exception: log.exception('http_emitter: unknown exception processing message') return @@ -139,7 +141,7 @@ def serialize_and_compress_legacy_payload(legacy_payload, max_compressed_size, d if len(zipped) > max_compressed_size: # let's just log a warning for now, splitting the legacy payload is tricky 
- log.warning("collector payload is above the limit of %dKB compressed", max_compressed_size/(1<<10)) + log.warning("collector payload is above the limit of %dKB compressed", max_compressed_size/(1 << 10)) return compressed_payloads @@ -147,7 +149,8 @@ def serialize_and_compress_legacy_payload(legacy_payload, max_compressed_size, d def serialize_and_compress_metrics_payload(metrics_payload, max_compressed_size, depth, log): """ Serialize and compress the metrics payload - If the compressed payload is too big, we attempt to split it into smaller payloads + If the compressed payload is too big, we attempt to split it into smaller payloads and call this + function recursively on each smaller payload """ compressed_payloads = [] @@ -166,14 +169,23 @@ def serialize_and_compress_metrics_payload(metrics_payload, max_compressed_size, log.error("Maximum depth of payload splitting reached, dropping the %d metrics in this chunk", len(series)) return compressed_payloads - nb_chunks = len(zipped)/max_compressed_size + 1 + int(compression_ratio/2) # try to account for the compression - log.debug("payload is too big (%d bytes), splitting it in %d chunks", len(zipped), nb_chunks) + # Try to account for the compression when estimating the number of chunks needed to get small-enough chunks + n_chunks = len(zipped)/max_compressed_size + 1 + int(compression_ratio/2) + log.debug("payload is too big (%d bytes), splitting it in %d chunks", len(zipped), n_chunks) - series_per_chunk = len(series)/nb_chunks + 1 + series_per_chunk = len(series)/n_chunks + 1 - for i in range(nb_chunks): + for i in range(n_chunks): + # Create each chunk and make them go through this function recursively ; increment the `depth` of the recursive call compressed_payloads.extend( - serialize_and_compress_metrics_payload({"series": series[i*series_per_chunk:(i+1)*series_per_chunk]}, max_compressed_size, depth+1, log) + serialize_and_compress_metrics_payload( + { + "series": series[i*series_per_chunk:(i+1)*series_per_chunk] + }, + max_compressed_size, + depth+1, + log + ) ) return compressed_payloads diff --git a/tests/core/test_emitter.py b/tests/core/test_emitter.py index 5d07389736..ded20bcda9 100644 --- a/tests/core/test_emitter.py +++ b/tests/core/test_emitter.py @@ -10,7 +10,6 @@ remove_undecodable_chars, sanitize_payload, serialize_and_compress_metrics_payload, - serialize_payload, split_payload, ) @@ -103,7 +102,7 @@ def test_remove_undecodable_characters(self): def test_metrics_payload_chunks(self, compress_mock): log = mock.Mock() nb_series = 10000 - max_compressed_size = 1 << 10 + max_compressed_size = 1 << 10 # 1KB, well below the original size of our payload of 10000 metrics metrics_payload = {"series": [ {
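
A note on the splitting strategy, for readers who want to see it outside the diff context: the sketch below is a simplified, standalone rendering of what serialize_and_compress_metrics_payload does after this patch series, not the agent's actual code. The function name compress_metrics and the module-level constants here are made up for illustration, and the sketch uses Python 3 syntax (integer division with //, range, explicit bytes encoding), whereas the patched agent targets Python 2.

import json
import zlib

MAX_COMPRESSED_SIZE = 2 << 20   # 2 MiB, the same limit the patch enforces
MAX_SPLIT_DEPTH = 2             # give up splitting past this depth, as in PATCH 2/2

def compress_metrics(payload, max_size=MAX_COMPRESSED_SIZE, depth=0):
    """Return a list of zlib-compressed JSON chunks of the metrics payload."""
    serialized = json.dumps(payload).encode("utf-8")
    zipped = zlib.compress(serialized)
    ratio = float(len(serialized)) / len(zipped)

    if len(zipped) < max_size:
        return [zipped]

    series = payload["series"]
    if depth > MAX_SPLIT_DEPTH:
        # Same trade-off as the patch: drop the chunk rather than recurse forever.
        return []

    # Estimate the number of chunks; the "+ int(ratio / 2)" term pads the estimate
    # because smaller payloads tend to compress less well than the full payload did.
    n_chunks = len(zipped) // max_size + 1 + int(ratio / 2)
    series_per_chunk = len(series) // n_chunks + 1

    chunks = []
    for i in range(n_chunks):
        sub = {"series": series[i * series_per_chunk:(i + 1) * series_per_chunk]}
        chunks.extend(compress_metrics(sub, max_size, depth + 1))
    return chunks

Exercised the way test_metrics_payload_chunks exercises the real function (a payload of 10000 tiny metrics against a 1 KiB limit), this sketch should return several compressed chunks, each decompressing to a valid {"series": [...]} document, with the concatenation of the chunks' series reconstructing the original series.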