Skip to content

Commit

Permalink
[dogstatsd] Recover gracefully from serialization errors
Browse files Browse the repository at this point in the history
- try to skip errors so that we can serialize what we can
- add a metric on serialization status
  • Loading branch information
Remi Hakim authored and olivielpeau committed Jan 4, 2016
1 parent 5ce6347 commit d24809f
Showing 1 changed file with 44 additions and 3 deletions.
47 changes: 44 additions & 3 deletions dogstatsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
# project
from aggregator import get_formatter, MetricsBucketAggregator
from checks.check_status import DogstatsdStatus
from checks.metric_types import MetricTypes
from config import get_config, get_version
from daemon import AgentSupervisor, Daemon
from util import chunks, get_hostname, get_uuid, plural
Expand Down Expand Up @@ -61,9 +62,48 @@
EVENT_CHUNK_SIZE = 50
COMPRESS_THRESHOLD = 1024

def add_serialization_status_metric(status, hostname):
interval = 10.0
value = 1
return {
'tags': ["status:{0}".format(status)],
'metric': 'datadog.dogstatsd.serialization_status',
'interval': interval,
'device_name': None,
'host': hostname,
'points': [(time(), value / interval)],
'type': MetricTypes.RATE,
}

def unicode_metrics(metrics):
for i, metric in enumerate(metrics):
for key, value in metric.items():
if isinstance(value, basestring):
metric[key] = unicode(value, errors='replace')
elif isinstance(value, tuple) or isinstance(value, list):
value_list = list(value)
for j, value_element in enumerate(value_list):
if isinstance(value_element, basestring):
value_list[j] = unicode(value_element, errors='replace')
metric[key] = tuple(value_list)
metrics[i] = metric
return metrics


def serialize_metrics(metrics, hostname):
try:
metrics.append(add_serialization_status_metric("success", hostname))
serialized = json.dumps({"series": metrics})
except (UnicodeDecodeError, Exception) as e:
log.exception("Unable to serialize payload. Trying to replace bad characters. %s", e)
metrics.append(add_serialization_status_metric("failure", hostname))
try:
log.error(metrics)
serialized = json.dumps({"series": unicode_metrics(metrics)})
except Exception as e:
log.exception("Unable to serialize payload. Giving up. %s", e)
serialized = json.dumps({"series": [add_serialization_status_metric("permanent_failure", hostname)]})

def serialize_metrics(metrics):
serialized = json.dumps({"series": metrics})
if len(serialized) > COMPRESS_THRESHOLD:
headers = {'Content-Type': 'application/json',
'Content-Encoding': 'deflate'}
Expand Down Expand Up @@ -91,6 +131,7 @@ def __init__(self, interval, metrics_aggregator, api_host, api_key=None,
self.metrics_aggregator = metrics_aggregator
self.flush_count = 0
self.log_count = 0
self.hostname = get_hostname()

self.watchdog = None
if use_watchdog:
Expand Down Expand Up @@ -174,7 +215,7 @@ def flush(self):
log.exception("Error flushing metrics")

def submit(self, metrics):
body, headers = serialize_metrics(metrics)
body, headers = serialize_metrics(metrics, self.hostname)
params = {}
if self.api_key:
params['api_key'] = self.api_key
Expand Down

0 comments on commit d24809f

Please sign in to comment.