Skip to content

Commit

Permalink
[postgres] Allow to use psycopg2 (#2782)
Browse files Browse the repository at this point in the history
Fix #2755
  • Loading branch information
Remi Hakim authored Aug 26, 2016
1 parent d339d26 commit 1312bd5
Show file tree
Hide file tree
Showing 3 changed files with 251 additions and 14 deletions.
46 changes: 33 additions & 13 deletions checks.d/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,12 @@
import socket

# 3rd party
import pg8000 as pg
from pg8000 import InterfaceError, ProgrammingError
try:
import psycopg2
except ImportError:
psycopg2 = None

import pg8000

# project
from checks import AgentCheck, CheckException
Expand All @@ -20,6 +24,10 @@
MAX_CUSTOM_RESULTS = 100
TABLE_COUNT_LIMIT = 200

def psycopg2_connect(*args, **kwargs):
    """Adapter that lets psycopg2 be used through the pg8000-style call sites.

    pg8000's ``connect`` accepts an ``ssl`` keyword that psycopg2 does not,
    so it is stripped before delegating.  ``pop`` with a default is used
    (instead of ``del``) because not every call site passes ``ssl`` -- the
    ident-auth path connects with a single DSN string and no keyword
    arguments, and ``del kwargs['ssl']`` would raise ``KeyError`` there.

    Returns the psycopg2 connection object from ``psycopg2.connect``.
    """
    # psycopg2 has no `ssl` kwarg (its equivalent is `sslmode`); drop it
    # quietly whether or not it was supplied.
    kwargs.pop('ssl', None)
    return psycopg2.connect(*args, **kwargs)


class ShouldRestartException(Exception):
    """Signals that the cached DB connection hit a driver/socket error and
    the caller should reconnect (bypassing the cache) and retry collection.
    """
    pass
Expand Down Expand Up @@ -300,6 +308,16 @@ def __init__(self, name, init_config, agentConfig, instances=None):
self.replication_metrics = {}
self.custom_metrics = {}

def _get_pg_attrs(self, instance):
    """Pick the DB driver for this instance.

    Returns a ``(connect_function, InterfaceError, ProgrammingError)``
    triple: psycopg2's when the instance opts in via ``use_psycopg2`` and
    the module imported successfully, pg8000's otherwise.
    """
    wants_psycopg2 = _is_affirmative(instance.get('use_psycopg2', False))

    if wants_psycopg2 and psycopg2 is not None:
        return psycopg2_connect, psycopg2.InterfaceError, psycopg2.ProgrammingError

    if wants_psycopg2:
        # Opted in but the import failed at module load time.
        self.log.error("Unable to import psycopg2, falling back to pg8000")

    # Default driver: pg8000
    return pg8000.connect, pg8000.InterfaceError, pg8000.ProgrammingError

def _get_version(self, key, db):
if key not in self.versions:
cursor = db.cursor()
Expand Down Expand Up @@ -416,7 +434,7 @@ def _build_relations_config(self, yamlconfig):
self.log.warn('Failed to parse config element=%s, check syntax' % str(element))
return config

def _collect_stats(self, key, db, instance_tags, relations, custom_metrics, function_metrics, count_metrics):
def _collect_stats(self, key, db, instance_tags, relations, custom_metrics, function_metrics, count_metrics, interface_error, programming_error):
"""Query pg_stat_* for various metrics
If relations is not an empty list, gather per-relation metrics
on top of that.
Expand Down Expand Up @@ -491,7 +509,7 @@ def _collect_stats(self, key, db, instance_tags, relations, custom_metrics, func
cursor.execute(query.replace(r'%', r'%%'))

results = cursor.fetchall()
except ProgrammingError as e:
except programming_error as e:
log_func("Not all metrics may be available: %s" % str(e))
continue

Expand Down Expand Up @@ -556,7 +574,7 @@ def _collect_stats(self, key, db, instance_tags, relations, custom_metrics, func
v[0][1](self, v[0][0], v[1], tags=tags)

cursor.close()
except InterfaceError as e:
except interface_error as e:
self.log.error("Connection error: %s" % str(e))
raise ShouldRestartException
except socket.error as e:
Expand All @@ -571,7 +589,7 @@ def _get_service_check_tags(self, host, port, dbname):
]
return service_check_tags

def get_connection(self, key, host, port, user, password, dbname, ssl, use_cached=True):
def get_connection(self, key, host, port, user, password, dbname, ssl, connect_fct, use_cached=True):
"Get and memoize connections to instances"
if key in self.dbs and use_cached:
return self.dbs[key]
Expand All @@ -580,12 +598,12 @@ def get_connection(self, key, host, port, user, password, dbname, ssl, use_cache
try:
if host == 'localhost' and password == '':
# Use ident method
connection = pg.connect("user=%s dbname=%s" % (user, dbname))
connection = connect_fct("user=%s dbname=%s" % (user, dbname))
elif port != '':
connection = pg.connect(host=host, port=port, user=user,
connection = connect_fct(host=host, port=port, user=user,
password=password, database=dbname, ssl=ssl)
else:
connection = pg.connect(host=host, user=user, password=password,
connection = connect_fct(host=host, user=user, password=password,
database=dbname, ssl=ssl)
except Exception as e:
message = u'Error establishing postgres connection: %s' % (str(e))
Expand Down Expand Up @@ -667,17 +685,19 @@ def check(self, instance):
# preset tags to the database name
db = None

connect_fct, interface_error, programming_error = self._get_pg_attrs(instance)

# Collect metrics
try:
# Check version
db = self.get_connection(key, host, port, user, password, dbname, ssl)
db = self.get_connection(key, host, port, user, password, dbname, ssl, connect_fct)
version = self._get_version(key, db)
self.log.debug("Running check against version %s" % version)
self._collect_stats(key, db, tags, relations, custom_metrics, function_metrics, count_metrics)
self._collect_stats(key, db, tags, relations, custom_metrics, function_metrics, count_metrics, interface_error, programming_error)
except ShouldRestartException:
self.log.info("Resetting the connection")
db = self.get_connection(key, host, port, user, password, dbname, ssl, use_cached=False)
self._collect_stats(key, db, tags, relations, custom_metrics, function_metrics, count_metrics)
db = self.get_connection(key, host, port, user, password, dbname, ssl, connect_fct, use_cached=False)
self._collect_stats(key, db, tags, relations, custom_metrics, function_metrics, count_metrics, interface_error, programming_error)

if db is not None:
service_check_tags = self._get_service_check_tags(host, port, dbname)
Expand Down
1 change: 1 addition & 0 deletions conf.d/postgres.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ instances:
# password: my_password
# dbname: db_name
# ssl: False
# use_psycopg2: False # Force using psycopg2 instead of pg8000 to connect. WARNING: psycopg2 doesn't support ssl mode.
# tags:
# - optional_tag1
# - optional_tag2
Expand Down
218 changes: 217 additions & 1 deletion tests/checks/integration/test_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def test_checks(self):
}
]

self.run_check_twice(dict(instances=instances))
self.run_check_twice(dict(instances=instances), force_reload=True)

# Useful to get server version
# FIXME: Not great, should have a function like that available
Expand Down Expand Up @@ -217,3 +217,219 @@ def test_checks(self):
self.assertServiceMetadata(['version'], count=2)

self.coverage_report()
from pg8000.core import Connection
self.assertTrue(type(self.check.dbs[key]) == Connection)
self.check.dbs[key].close()

def test_psycopg2(self):
    """Run the postgres check twice with `use_psycopg2` enabled and assert
    the same metric/service-check coverage as the pg8000 path, then verify
    the cached connection object really is a psycopg2 connection.

    NOTE(review): the inner loops over database names used to reuse the
    name `db`, shadowing the connection fetched below; the later
    `_is_9_2_or_above(key, db)` calls then received the string 'dogs' and
    only worked because the version was already cached under `key`.  The
    loop variable is renamed `db_name` to remove that latent bug.
    """
    host = 'localhost'
    port = 15432
    dbname = 'datadog_test'

    instances = [
        {
            'host': host,
            'port': port,
            'username': 'datadog',
            'password': 'datadog',
            'use_psycopg2': 'yes',
            'dbname': dbname,
            'relations': ['persons'],
            'custom_metrics': [{
                'descriptors': [('datname', 'customdb')],
                'metrics': {
                    'numbackends': ['custom.numbackends', 'Gauge'],
                },
                'query': "SELECT datname, %s FROM pg_stat_database WHERE datname = 'datadog_test' LIMIT(1)",
                'relation': False,
            }]
        },
        {
            'host': host,
            'port': port,
            'username': 'datadog',
            'password': 'datadog',
            'dbname': 'dogs',
            'relations': ['breed', 'kennel']
        }
    ]

    self.run_check_twice(dict(instances=instances), force_reload=True)

    # Useful to get server version
    # FIXME: Not great, should have a function like that available
    key = (host, port, dbname)
    db = self.check.dbs[key]

    # Testing DB_METRICS scope
    COMMON_METRICS = [
        'postgresql.connections',
        'postgresql.commits',
        'postgresql.rollbacks',
        'postgresql.disk_read',
        'postgresql.buffer_hit',
        'postgresql.rows_returned',
        'postgresql.rows_fetched',
        'postgresql.rows_inserted',
        'postgresql.rows_updated',
        'postgresql.rows_deleted',
        'postgresql.database_size',
    ]

    for mname in COMMON_METRICS:
        for db_name in ('datadog_test', 'dogs'):
            self.assertMetric(mname, count=1, tags=['db:%s' % db_name])

    NEWER_92_METRICS = [
        'postgresql.deadlocks',
        'postgresql.temp_bytes',
        'postgresql.temp_files',
    ]

    if self.check._is_9_2_or_above(key, db):
        for mname in NEWER_92_METRICS:
            for db_name in ('datadog_test', 'dogs'):
                self.assertMetric(mname, count=1, tags=['db:%s' % db_name])

    # Testing BGW_METRICS scope
    COMMON_BGW_METRICS = [
        'postgresql.bgwriter.checkpoints_timed',
        'postgresql.bgwriter.checkpoints_requested',
        'postgresql.bgwriter.buffers_checkpoint',
        'postgresql.bgwriter.buffers_clean',
        'postgresql.bgwriter.maxwritten_clean',
        'postgresql.bgwriter.buffers_backend',
        'postgresql.bgwriter.buffers_alloc',
    ]

    for mname in COMMON_BGW_METRICS:
        self.assertMetric(mname, count=1)

    NEWER_91_BGW_METRICS = [
        'postgresql.bgwriter.buffers_backend_fsync',
    ]

    if self.check._is_9_1_or_above(key, db):
        for mname in NEWER_91_BGW_METRICS:
            self.assertMetric(mname, count=1)

    NEWER_92_BGW_METRICS = [
        'postgresql.bgwriter.write_time',
        'postgresql.bgwriter.sync_time',
    ]

    if self.check._is_9_2_or_above(key, db):
        for mname in NEWER_92_BGW_METRICS:
            self.assertMetric(mname, count=1)

    # FIXME: Test postgresql.locks

    # Relation specific metrics
    RELATION_METRICS = [
        'postgresql.seq_scans',
        'postgresql.seq_rows_read',
        'postgresql.index_scans',
        'postgresql.index_rows_fetched',
        'postgresql.rows_inserted',
        'postgresql.rows_updated',
        'postgresql.rows_deleted',
        'postgresql.rows_hot_updated',
        'postgresql.live_rows',
        'postgresql.dead_rows',
    ]

    SIZE_METRICS = [
        'postgresql.table_size',
        'postgresql.index_size',
        'postgresql.total_size',
    ]

    STATIO_METRICS = [
        'postgresql.heap_blocks_read',
        'postgresql.heap_blocks_hit',
        'postgresql.index_blocks_read',
        'postgresql.index_blocks_hit',
        'postgresql.toast_blocks_read',
        'postgresql.toast_blocks_hit',
        'postgresql.toast_index_blocks_read',
        'postgresql.toast_index_blocks_hit',
    ]

    for inst in instances:
        for rel in inst.get('relations', []):
            expected_tags = ['db:%s' % inst['dbname'], 'table:%s' % rel]
            expected_rel_tags = ['db:%s' % inst['dbname'], 'table:%s' % rel, 'schema:public']
            for mname in RELATION_METRICS:
                count = 1
                # We only build a test index and stimulate it on breed
                # in the dogs DB, so the other index metrics shouldn't be
                # here.
                if 'index' in mname and rel != 'breed':
                    count = 0
                self.assertMetric(mname, count=count, tags=expected_rel_tags)

            for mname in SIZE_METRICS:
                self.assertMetric(mname, count=1, tags=expected_tags)

            for mname in STATIO_METRICS:
                at_least = None
                count = 1
                if '.index' in mname and rel != 'breed':
                    count = 0
                # FIXME: toast are not reliable, need to do some more setup
                # to get some values here I guess
                if 'toast' in mname:
                    at_least = 0  # how to set easily a flaky metric, w/o impacting coverage
                    count = None
                self.assertMetric(mname, count=count, at_least=at_least, tags=expected_rel_tags)

    # Index metrics
    IDX_METRICS = [
        'postgresql.index_scans',
        'postgresql.index_rows_read',
        'postgresql.index_rows_fetched',
    ]

    # we have a single index defined!
    expected_tags = ['db:dogs', 'table:breed', 'index:breed_names', 'schema:public']
    for mname in IDX_METRICS:
        self.assertMetric(mname, count=1, tags=expected_tags)

    # instance connection metrics
    CONNECTION_METRICS = [
        'postgresql.max_connections',
        'postgresql.percent_usage_connections',
    ]
    for mname in CONNECTION_METRICS:
        self.assertMetric(mname, count=1)

    # db level connections
    for inst in instances:
        expected_tags = ['db:%s' % inst['dbname']]
        self.assertMetric('postgresql.connections', count=1, tags=expected_tags)

    # By schema metrics
    self.assertMetric('postgresql.table.count', value=2, count=1, tags=['schema:public'])
    self.assertMetric('postgresql.db.count', value=2, count=1)

    # Our custom metric
    self.assertMetric('custom.numbackends', value=1, tags=['customdb:datadog_test'])

    # Test service checks
    self.assertServiceCheck('postgres.can_connect',
        count=1, status=AgentCheck.OK,
        tags=['host:localhost', 'port:15432', 'db:datadog_test']
    )
    self.assertServiceCheck('postgres.can_connect',
        count=1, status=AgentCheck.OK,
        tags=['host:localhost', 'port:15432', 'db:dogs']
    )

    # Assert service metadata
    self.assertServiceMetadata(['version'], count=2)

    self.coverage_report()

    # The instance that set `use_psycopg2` must have been served by psycopg2.
    from psycopg2.extensions import connection
    self.assertTrue(type(self.check.dbs[key]) == connection)
    self.check.dbs[key].close()

0 comments on commit 1312bd5

Please sign in to comment.