-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathmain.py
82 lines (68 loc) · 2.67 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import os
import logging
import time
import uuid
from metrics.metrics import PrefectMetrics
from metrics.healthz import PrefectHealthz
from prometheus_client import start_http_server, REGISTRY
def _env_flag(name: str, default: str) -> bool:
    """Return True when environment variable *name* holds the word "true".

    The comparison ignores letter case and surrounding whitespace, so
    ``true``, ``TRUE`` and ``True`` are all accepted. *default* is the
    string used when the variable is unset.
    """
    return os.getenv(name, default).strip().lower() == "true"


def metrics() -> None:
    """
    Main entry point for the PrefectMetrics exporter.

    Reads its configuration from environment variables, calls the Prefect
    API health check, registers a PrefectMetrics instance with the default
    Prometheus registry, starts the metrics HTTP server and then blocks
    forever.

    Environment variables (defaults in parentheses): LOG_LEVEL (INFO),
    MAX_RETRIES (3), METRICS_ADDR (0.0.0.0), METRICS_PORT (8000),
    OFFSET_MINUTES (3), PREFECT_API_URL (http://localhost:4200/api),
    PREFECT_API_KEY (empty), SCRAPE_INTERVAL_SECONDS (30),
    PAGINATION_ENABLED (True), PAGINATION_LIMIT (200),
    PREFECT_CSRF_ENABLED (False).

    Raises:
        ValueError: if a numeric variable is not a valid integer or
            LOG_LEVEL is not a level name known to ``logging``.
    """
    # Get environment variables or use default values
    # Level names are upper-case in ``logging``; normalise so that a value
    # such as "info" is accepted instead of raising ValueError.
    loglevel = os.getenv("LOG_LEVEL", "INFO").strip().upper()
    max_retries = int(os.getenv("MAX_RETRIES", "3"))
    metrics_addr = os.getenv("METRICS_ADDR", "0.0.0.0")
    metrics_port = int(os.getenv("METRICS_PORT", "8000"))
    offset_minutes = int(os.getenv("OFFSET_MINUTES", "3"))
    url = os.getenv("PREFECT_API_URL", "http://localhost:4200/api")
    api_key = os.getenv("PREFECT_API_KEY", "")
    # A fresh random client id is generated on every start of the exporter.
    csrf_client_id = str(uuid.uuid4())
    scrape_interval_seconds = int(os.getenv("SCRAPE_INTERVAL_SECONDS", "30"))

    # Configure logging
    logging.basicConfig(
        level=loglevel, format="%(asctime)s - %(name)s - [%(levelname)s] %(message)s"
    )
    logger = logging.getLogger("prometheus-prefect-exporter")

    # Configure headers for HTTP requests
    headers = {"accept": "application/json", "Content-Type": "application/json"}
    if api_key:
        # Only send an Authorization header when a key was actually provided.
        headers["Authorization"] = f"Bearer {api_key}"

    # check endpoint
    # NOTE(review): what happens when the check fails is decided inside
    # PrefectHealthz, which is not visible from this file.
    PrefectHealthz(
        url=url, headers=headers, max_retries=max_retries, logger=logger
    ).get_health_check()

    ##
    # NOTIFY IF PAGINATION IS ENABLED
    #
    enable_pagination = _env_flag("PAGINATION_ENABLED", "True")
    pagination_limit = int(os.getenv("PAGINATION_LIMIT", "200"))
    if enable_pagination:
        logger.info("Pagination is enabled")
        logger.info(f"Pagination limit is {pagination_limit}")
    else:
        logger.info("Pagination is disabled")

    # Create an instance of the PrefectMetrics class
    # (named ``collector`` so it does not shadow this function's own name)
    collector = PrefectMetrics(
        url=url,
        headers=headers,
        offset_minutes=offset_minutes,
        max_retries=max_retries,
        client_id=csrf_client_id,
        csrf_enabled=_env_flag("PREFECT_CSRF_ENABLED", "False"),
        logger=logger,
        # Enable pagination if not specified to avoid breaking existing deployments
        enable_pagination=enable_pagination,
        pagination_limit=pagination_limit,
    )

    # Register the metrics with Prometheus
    logger.info("Initializing metrics...")
    REGISTRY.register(collector)

    # Start the HTTP server to expose Prometheus metrics
    start_http_server(metrics_port, metrics_addr)
    logger.info(f"Exporter listening on {metrics_addr}:{metrics_port}")

    # Keep the main thread alive. Nothing is collected in this loop itself;
    # it only sleeps, and scrape_interval_seconds sets the length of each
    # sleep. prometheus_client serves requests from a daemon thread, so
    # returning from here would shut the exporter down.
    while True:
        time.sleep(scrape_interval_seconds)
# Script entry point: start the exporter only when this file is executed
# directly, so importing the module has no side effects.
if __name__ == "__main__":
    metrics()