Skip to content

Commit

Permalink
[Dashboard] add RAY_PROMETHEUS_HEADERS env for carrying additional he…
Browse files Browse the repository at this point in the history
…aders to Prometheus (ray-project#49353)

Signed-off-by: Rueian <[email protected]>
  • Loading branch information
rueian authored and Roshan Kathawate committed Jan 7, 2025
1 parent cfcabc1 commit 756d0ea
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 12 deletions.
12 changes: 11 additions & 1 deletion python/ray/dashboard/modules/data/data_head.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@
import ray.dashboard.optional_utils as optional_utils
import ray.dashboard.utils as dashboard_utils
from ray.dashboard.modules.metrics.metrics_head import (
DEFAULT_PROMETHEUS_HEADERS,
DEFAULT_PROMETHEUS_HOST,
PROMETHEUS_HEADERS_ENV_VAR,
PROMETHEUS_HOST_ENV_VAR,
PrometheusQueryError,
parse_prom_headers,
)

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -52,6 +55,12 @@ def __init__(self, dashboard_head):
self.prometheus_host = os.environ.get(
PROMETHEUS_HOST_ENV_VAR, DEFAULT_PROMETHEUS_HOST
)
self.prometheus_headers = parse_prom_headers(
os.environ.get(
PROMETHEUS_HEADERS_ENV_VAR,
DEFAULT_PROMETHEUS_HEADERS,
)
)

@optional_utils.DashboardHeadRouteTable.get("/api/data/datasets/{job_id}")
@optional_utils.init_ray_and_catch_exceptions()
Expand Down Expand Up @@ -148,7 +157,8 @@ def is_minimal_module():

async def _query_prometheus(self, query):
async with self.http_session.get(
f"{self.prometheus_host}/api/v1/query?query={quote(query)}"
f"{self.prometheus_host}/api/v1/query?query={quote(query)}",
headers=self.prometheus_headers,
) as resp:
if resp.status == 200:
prom_data = await resp.json()
Expand Down
58 changes: 55 additions & 3 deletions python/ray/dashboard/modules/metrics/metrics_head.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import json
import logging
import os
import shutil
Expand Down Expand Up @@ -45,6 +46,8 @@

DEFAULT_PROMETHEUS_HOST = "http://localhost:9090"
PROMETHEUS_HOST_ENV_VAR = "RAY_PROMETHEUS_HOST"
DEFAULT_PROMETHEUS_HEADERS = "{}"
PROMETHEUS_HEADERS_ENV_VAR = "RAY_PROMETHEUS_HEADERS"
DEFAULT_PROMETHEUS_NAME = "Prometheus"
PROMETHEUS_NAME_ENV_VAR = "RAY_PROMETHEUS_NAME"
PROMETHEUS_HEALTHCHECK_PATH = "-/healthy"
Expand All @@ -57,6 +60,26 @@
GRAFANA_HEALTHCHECK_PATH = "api/health"


# parse_prom_headers will make sure the input is in one of the following formats:
# 1. {"H1": "V1", "H2": "V2"}
# 2. [["H1", "V1"], ["H2", "V2"], ["H2", "V3"]]
def parse_prom_headers(prometheus_headers):
parsed = json.loads(prometheus_headers)
if isinstance(parsed, dict):
if all(isinstance(k, str) and isinstance(v, str) for k, v in parsed.items()):
return parsed
if isinstance(parsed, list):
if all(len(e) == 2 and all(isinstance(v, str) for v in e) for e in parsed):
return parsed
raise ValueError(
f"{PROMETHEUS_HEADERS_ENV_VAR} should be a JSON string in one of the formats:\n"
+ "1) An object with string keys and string values.\n"
+ "2) an array of string arrays with 2 string elements each.\n"
+ 'For example, {"H1": "V1", "H2": "V2"} and\n'
+ '[["H1", "V1"], ["H2", "V2"], ["H2", "V3"]] are valid.'
)


class PrometheusQueryError(Exception):
def __init__(self, status, message):
self.message = (
Expand All @@ -73,6 +96,12 @@ def __init__(self, dashboard_head):
self.prometheus_host = os.environ.get(
PROMETHEUS_HOST_ENV_VAR, DEFAULT_PROMETHEUS_HOST
)
self.prometheus_headers = parse_prom_headers(
os.environ.get(
PROMETHEUS_HEADERS_ENV_VAR,
DEFAULT_PROMETHEUS_HEADERS,
)
)
default_metrics_root = os.path.join(self._dashboard_head.session_dir, "metrics")
session_latest_metrics_root = os.path.join(
self._dashboard_head.temp_dir, SESSION_LATEST, "metrics"
Expand Down Expand Up @@ -166,7 +195,9 @@ async def prometheus_health(self, req):
try:
path = f"{self.prometheus_host}/{PROMETHEUS_HEALTHCHECK_PATH}"

async with self.http_session.get(path) as resp:
async with self.http_session.get(
path, headers=self.prometheus_headers
) as resp:
if resp.status != 200:
return dashboard_optional_utils.rest_response(
success=False,
Expand Down Expand Up @@ -244,6 +275,18 @@ def _create_default_grafana_configs(self):
prometheus_host = os.environ.get(
PROMETHEUS_HOST_ENV_VAR, DEFAULT_PROMETHEUS_HOST
)
prometheus_headers = parse_prom_headers(
os.environ.get(PROMETHEUS_HEADERS_ENV_VAR, DEFAULT_PROMETHEUS_HEADERS)
)
# parse_prom_headers will make sure the prometheus_headers is either format of:
# 1. {"H1": "V1", "H2": "V2"} or
# 2. [["H1", "V1"], ["H2", "V2"], ["H2", "V3"]]
prometheus_header_pairs = []
if isinstance(prometheus_headers, list):
prometheus_header_pairs = prometheus_headers
elif isinstance(prometheus_headers, dict):
prometheus_header_pairs = [(k, v) for k, v in prometheus_headers.items()]

data_sources_path = os.path.join(grafana_provisioning_folder, "datasources")
os.makedirs(
data_sources_path,
Expand All @@ -261,9 +304,17 @@ def _create_default_grafana_configs(self):
"w",
) as f:
f.write(
GRAFANA_DATASOURCE_TEMPLATE.format(
GRAFANA_DATASOURCE_TEMPLATE(
prometheus_host=prometheus_host,
prometheus_name=self._prometheus_name,
jsonData={
f"httpHeaderName{i+1}": header
for i, (header, _) in enumerate(prometheus_header_pairs)
},
secureJsonData={
f"httpHeaderValue{i+1}": value
for i, (_, value) in enumerate(prometheus_header_pairs)
},
)
)
with open(
Expand Down Expand Up @@ -391,7 +442,8 @@ def on_new_lag(lag_s):

async def _query_prometheus(self, query):
async with self.http_session.get(
f"{self.prometheus_host}/api/v1/query?query={quote(query)}"
f"{self.prometheus_host}/api/v1/query?query={quote(query)}",
headers=self.prometheus_headers,
) as resp:
if resp.status == 200:
prom_data = await resp.json()
Expand Down
30 changes: 22 additions & 8 deletions python/ray/dashboard/modules/metrics/templates.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import yaml

GRAFANA_INI_TEMPLATE = """
[security]
allow_embedding = true
Expand All @@ -22,15 +24,27 @@
path: {dashboard_output_folder}
"""

GRAFANA_DATASOURCE_TEMPLATE = """apiVersion: 1

datasources:
- name: {prometheus_name}
url: {prometheus_host}
type: prometheus
isDefault: true
access: proxy
"""
def GRAFANA_DATASOURCE_TEMPLATE(
prometheus_name, prometheus_host, jsonData, secureJsonData
):
return yaml.safe_dump(
{
"apiVersion": 1,
"datasources": [
{
"name": prometheus_name,
"url": prometheus_host,
"type": "prometheus",
"isDefault": True,
"access": "proxy",
"jsonData": jsonData,
"secureJsonData": secureJsonData,
}
],
}
)


PROMETHEUS_YML_TEMPLATE = """# my global config
global:
Expand Down
51 changes: 51 additions & 0 deletions python/ray/dashboard/tests/test_dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -1187,6 +1187,57 @@ def test_dashboard_module_load(tmpdir):
assert loaded_modules_actual == loaded_modules_expected


@pytest.mark.skipif(
os.environ.get("RAY_MINIMAL") == "1",
reason="This test is not supposed to work for minimal installation.",
)
def test_extra_prom_headers_validation(tmpdir, monkeypatch):
from ray.dashboard.modules.metrics.metrics_head import PROMETHEUS_HEADERS_ENV_VAR

"""Test the extra Prometheus headers validation in DashboardHead."""
head = DashboardHead(
http_host="127.0.0.1",
http_port=8265,
http_port_retries=1,
node_ip_address="127.0.0.1",
gcs_address="127.0.0.1:6379",
cluster_id_hex=ray.ClusterID.from_random().hex(),
grpc_port=0,
log_dir=str(tmpdir),
temp_dir=str(tmpdir),
session_dir=str(tmpdir),
minimal=False,
serve_frontend=True,
)
loaded_modules_expected = {"MetricsHead", "DataHead"}

# Test the base case.
head._load_modules(modules_to_load=loaded_modules_expected)

# Test the supported case.
monkeypatch.setenv(PROMETHEUS_HEADERS_ENV_VAR, '{"H1": "V1", "H2": "V2"}')
head._load_modules(modules_to_load=loaded_modules_expected)

# Test the supported case.
monkeypatch.setenv(
PROMETHEUS_HEADERS_ENV_VAR,
'[["H1", "V1"], ["H2", "V2"], ["H2", "V3"]]',
)
head._load_modules(modules_to_load=loaded_modules_expected)

# Test the unsupported case.
with pytest.raises(ValueError):
monkeypatch.setenv(
PROMETHEUS_HEADERS_ENV_VAR, '{"H1": "V1", "H2": ["V1", "V2"]}'
)
head._load_modules(modules_to_load=loaded_modules_expected)

# Test the unsupported case.
with pytest.raises(ValueError):
monkeypatch.setenv(PROMETHEUS_HEADERS_ENV_VAR, "not_json")
head._load_modules(modules_to_load=loaded_modules_expected)


@pytest.mark.skipif(
sys.version_info >= (3, 10, 0),
reason=(
Expand Down

0 comments on commit 756d0ea

Please sign in to comment.