Skip to content

Commit

Permalink
Common: rewrite check_transfer_queue_status
Browse files Browse the repository at this point in the history
* Rewrite base query with sqla2.0
* Move CASE statements into python logic
* Sort Imports
* Update header
* Update exception statement
* Change gauge to PrometheusPusher
  • Loading branch information
voetberg committed Aug 6, 2024
1 parent a7c0556 commit 4ad7968
Showing 1 changed file with 54 additions and 67 deletions.
121 changes: 54 additions & 67 deletions common/check_transfer_queues_status
Original file line number Diff line number Diff line change
@@ -1,85 +1,72 @@
#!/usr/bin/env python
# Copyright European Organization for Nuclear Research (CERN) 2013
#!/usr/bin/env python3
# Copyright European Organization for Nuclear Research (CERN) since 2012
#
# Licensed under the Apache License, Version 2.0 (the "License");
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# Authors:
# - Mario Lassnig, <[email protected]>, 2013-2021
# - Cedric Serfon, <[email protected]>, 2014
# - Wen Guan, <[email protected]>, 2015
# - Thomas Beermann, <[email protected]>, 2019
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Probe to check the queues of the transfer service
"""
from __future__ import print_function

import sys
import traceback
from sqlalchemy import select, func

from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
from rucio.common.config import config_get
from rucio.db.sqla.session import BASE, get_session
from rucio.db.sqla import session, models
from rucio.db.sqla.session import get_session

from utils.common import probe_metrics
from utils.common import PrometheusPusher

# Exit statuses
OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3

if BASE.metadata.schema:
schema = BASE.metadata.schema + '.'
else:
schema = ''

active_queue = """SELECT
CASE
WHEN state = 'S' THEN 'queues.requests.submitted.' || replace(activity, ' ', '_') || '.' || nvl(replace(substr(external_host, 9, length(external_host)-13), '.', '_'), 'no_fts_host')
WHEN state = 'Q' THEN 'queues.requests.queued.' || replace(activity, ' ', '_') || '.' || nvl(replace(substr(external_host, 9, length(external_host)-13), '.', '_'), 'no_fts_host')
WHEN state = 'F' THEN 'queues.requests.failed.' || replace(activity, ' ', '_') || '.' || nvl(replace(substr(external_host, 9, length(external_host)-13), '.', '_'), 'no_fts_host')
WHEN state = 'D' THEN 'queues.requests.done.' || replace(activity, ' ', '_') || '.' || nvl(replace(substr(external_host, 9, length(external_host)-13), '.', '_'), 'no_fts_host')
WHEN state = 'L' THEN 'queues.requests.lost.' || replace(activity, ' ', '_') || '.' || nvl(replace(substr(external_host, 9, length(external_host)-13), '.', '_'), 'no_fts_host')
WHEN state = 'W' THEN 'queues.requests.waiting.' || replace(activity, ' ', '_') || '.' || nvl(replace(substr(external_host, 9, length(external_host)-13), '.', '_'), 'no_fts_host')
WHEN state = 'M' THEN 'queues.requests.mismatchscheme.' || replace(activity, ' ', '_') || '.' || nvl(replace(substr(external_host, 9, length(external_host)-13), '.', '_'), 'no_fts_host')
WHEN state = 'G' THEN 'queues.requests.submitting.' || replace(activity, ' ', '_') || '.' || nvl(replace(substr(external_host, 9, length(external_host)-13), '.', '_'), 'no_fts_host')
WHEN state = 'N' THEN 'queues.requests.nosources.' || replace(activity, ' ', '_') || '.' || nvl(replace(substr(external_host, 9, length(external_host)-13), '.', '_'), 'no_fts_host')
WHEN state = 'O' THEN 'queues.requests.onlytapesources.' || replace(activity, ' ', '_') || '.' || nvl(replace(substr(external_host, 9, length(external_host)-13), '.', '_'), 'no_fts_host')
WHEN state = 'A' THEN 'queues.requests.submissionfailed.' || replace(activity, ' ', '_') || '.' || nvl(replace(substr(external_host, 9, length(external_host)-13), '.', '_'), 'no_fts_host')
WHEN state = 'U' THEN 'queues.requests.suspend.' || replace(activity, ' ', '_') || '.' || nvl(replace(substr(external_host, 9, length(external_host)-13), '.', '_'), 'no_fts_host')
WHEN state = 'P' THEN 'queues.requests.preparing.' || replace(activity, ' ', '_') || '.' || nvl(replace(substr(external_host, 9, length(external_host)-13), '.', '_'), 'no_fts_host')
ELSE state
END state_desc,
num_rows
FROM
(
select state, count(*) num_rows, activity, external_host
FROM {schema}requests
GROUP BY state, activity, external_host
)""".format(schema=schema)

PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='')
if PROM_SERVERS != '':
PROM_SERVERS = PROM_SERVERS.split(',')

if __name__ == "__main__":
try:
registry = CollectorRegistry()
g = Gauge('conveyor_queues_requests', '', labelnames=('state', 'activity', 'external_host'), registry=registry)
session = get_session()
for k in session.execute(active_queue).fetchall():
print(k[0], k[1], end=" ")
probe_metrics.gauge(name=k[0].replace('-', '_')).set(k[1])
items = k[0].split('.')
state = items[2]
activity = items[3]
external_host = items[4].replace('-', '_')
g.labels(**{'activity': activity, 'state': state, 'external_host': external_host}).set(k[1])
if len(PROM_SERVERS):
for server in PROM_SERVERS:
try:
push_to_gateway(server.strip(), job='check_transfer_queues_status', registry=registry)
except:
continue
except:
statement = select(
func.count(),
models.Request.state,
models.Request.activity,
models.Request.external_host
).group_by(
models.Request.state,
models.Request.activity,
models.Request.external_host
)

with PrometheusPusher() as manager:
for row in session.execute(statement):
activity = row.activity.replace(" ", "_")

# host 1.2.3.45 -> 1_2_3_45, or "no_fts_host" if there is no host
if row.external_host is not None:
start = 9
stop = len(row.external_host) - 13
if stop < start:
external_host = 'no_fts_host'
else:
external_host = row.external_host[start, stop].replace(".", "_")
else:
external_host = 'no_fts_host'

# Get the name of the activity from the str rep. RequestState.STATE -> state
state = f"queues.requests.{str(row.state).split('.')[-1].lower()}"

print(f"{state}.{activity}.{external_host} {row.count}")
(manager.gauge(
"transfer.{state}.{activity}.{external_host}",
documentation="Status of requests by state, activity, external host.")
.labels(activity=activity, state=state, external_host=external_host)
.set(row.count))
except Exception:
print(traceback.format_exc())
sys.exit(UNKNOWN)
sys.exit(OK)
sys.exit(OK)

0 comments on commit 4ad7968

Please sign in to comment.