Skip to content

Commit

Permalink
Replace backend_type with is_background_worker
Browse files Browse the repository at this point in the history
* This commit attenpts to solve issue : dalibo#130 [1].
* PoWA has a bug where backend_type is not correctly filled [2]. it
  was fixed but it's not in the stable version yet.
* Since [3], we queried for the `backend_type` and filtered
  `parallel workers` in the Python code. The test is now done in
  SQL,  which avoids the problems all together.
* The `parallel worker` detection was not working for pg 11+ because
  it uses the 'parallel worker' backend_type. It's now fixed.

[1] dalibo#130
[2] powa-team/powa-archivist#20
[3] dalibo@6a180d8
  • Loading branch information
blogh committed Apr 9, 2020
1 parent 7b9d9f4 commit 4b3548a
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 10 deletions.
50 changes: 43 additions & 7 deletions pgactivity/Data.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,42 @@ def pg_get_activities(self, duration_mode=1):
"""
Get activity from pg_stat_activity view.
"""
if self.pg_num_version >= 100000:
# PostgreSQL 10 and more
if self.pg_num_version >= 110000:
# PostgreSQL 11 and more
query = """
SELECT
pg_stat_activity.pid AS pid,
pg_stat_activity.application_name AS application_name,
CASE WHEN LENGTH(pg_stat_activity.datname) > 16
THEN SUBSTRING(pg_stat_activity.datname FROM 0 FOR 6)||'...'||SUBSTRING(pg_stat_activity.datname FROM '........$')
ELSE pg_stat_activity.datname
END
AS database,
CASE WHEN pg_stat_activity.client_addr IS NULL
THEN 'local'
ELSE pg_stat_activity.client_addr::TEXT
END
AS client,
EXTRACT(epoch FROM (NOW() - pg_stat_activity.query_start)) AS duration,
CASE WHEN pg_stat_activity.wait_event_type IN ('LWLock', 'Lock', 'BufferPin') THEN true ELSE false END AS wait,
pg_stat_activity.usename AS user,
pg_stat_activity.state AS state,
pg_stat_activity.query AS query,
pg_stat_activity.backend_type = 'parallel worker' AS is_parallel_worker
FROM
pg_stat_activity
WHERE
state <> 'idle'
AND pid <> pg_backend_pid()
AND CASE WHEN %(min_duration)s = 0 THEN true
ELSE extract(epoch from now() - {duration_column}) > %(min_duration)s
END
ORDER BY
EXTRACT(epoch FROM (NOW() - pg_stat_activity.query_start)) DESC
"""
elif self.pg_num_version >= 100000:
# PostgreSQL 10
# We assume a background_worker with a not null query is a parallel worker.
query = """
SELECT
pg_stat_activity.pid AS pid,
Expand All @@ -355,7 +389,7 @@ def pg_get_activities(self, duration_mode=1):
pg_stat_activity.usename AS user,
pg_stat_activity.state AS state,
pg_stat_activity.query AS query,
pg_stat_activity.backend_type AS backend_type
(pg_stat_activity.backend_type = 'background worker' AND pg_stat_activity.query IS NOT NULL) AS is_parallel_worker
FROM
pg_stat_activity
WHERE
Expand All @@ -369,6 +403,7 @@ def pg_get_activities(self, duration_mode=1):
"""
elif self.pg_num_version >= 90600:
# PostgreSQL prior to 10.0 and >= 9.6.0
# There is no way to see parallel workers
query = """
SELECT
pg_stat_activity.pid AS pid,
Expand All @@ -388,7 +423,7 @@ def pg_get_activities(self, duration_mode=1):
pg_stat_activity.usename AS user,
pg_stat_activity.state AS state,
pg_stat_activity.query AS query,
null AS backend_type
false AS is_parallel_worker
FROM
pg_stat_activity
WHERE
Expand Down Expand Up @@ -421,7 +456,7 @@ def pg_get_activities(self, duration_mode=1):
pg_stat_activity.usename AS user,
pg_stat_activity.state AS state,
pg_stat_activity.query AS query,
null AS backend_type
false AS is_parallel_worker
FROM
pg_stat_activity
WHERE
Expand Down Expand Up @@ -454,7 +489,7 @@ def pg_get_activities(self, duration_mode=1):
pg_stat_activity.waiting AS wait,
pg_stat_activity.usename AS user,
pg_stat_activity.current_query AS query,
null AS backend_type
false AS is_parallel_worker
FROM
pg_stat_activity
WHERE
Expand All @@ -473,6 +508,7 @@ def pg_get_activities(self, duration_mode=1):
cur = self.pg_conn.cursor()
cur.execute(query, {'min_duration': self.min_duration})
ret = cur.fetchall()

return ret

def pg_get_waiting(self, duration_mode=1):
Expand Down Expand Up @@ -823,7 +859,7 @@ def sys_get_proc(self, queries, is_local):
process.set_extra('io_wait',
self.__sys_get_iow_status(psproc.status_iow()))
process.set_extra('psutil_proc', psproc)
process.set_extra('backend_type', query['backend_type'])
process.set_extra('is_parallel_worker', query['is_parallel_worker'])
process.set_extra('appname', query['application_name'])

processes[process.pid] = process
Expand Down
6 changes: 3 additions & 3 deletions pgactivity/UI.py
Original file line number Diff line number Diff line change
Expand Up @@ -1411,7 +1411,7 @@ def __poll_activities(self, interval, flag, indent, process = None, \
'duration': self.data.get_duration(proc.duration),
'wait': proc.wait,
'io_wait': proc.get_extra('io_wait'),
'backend_type': proc.get_extra('backend_type')
'is_parallel_worker': proc.get_extra('is_parallel_worker')
})

except psutil.NoSuchProcess:
Expand Down Expand Up @@ -2235,8 +2235,8 @@ def __refresh_line(self, process, flag, indent, \
dif = self.maxx - len(indent) - 1

query = ''
if 'backend_type' in process and \
process['backend_type'] == 'background worker':
if 'is_parallel_worker' in process and \
process['is_parallel_worker']:

query += '\_ '
if self.verbose_mode == PGTOP_TRUNCATE:
Expand Down

0 comments on commit 4b3548a

Please sign in to comment.