Skip to content

Commit

Permalink
Eco-CI brought back to main repo
Browse files Browse the repository at this point in the history
  • Loading branch information
ArneTR committed Nov 23, 2024
1 parent 8540952 commit 5474f6b
Show file tree
Hide file tree
Showing 8 changed files with 1,219 additions and 7 deletions.
8 changes: 8 additions & 0 deletions api/api_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -658,3 +658,11 @@ def authenticate(authentication_token=Depends(header_scheme), request: Request =
except UserAuthenticationError:
raise HTTPException(status_code=401, detail="Invalid token") from UserAuthenticationError
return user

def get_connecting_ip(request):
connecting_ip = request.headers.get("x-forwarded-for")

if connecting_ip:
return connecting_ip.split(",")[0]

return request.client.host
267 changes: 267 additions & 0 deletions api/eco_ci.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
from datetime import date

from fastapi import APIRouter
from fastapi import Request, Response, Depends
from fastapi.responses import ORJSONResponse

from api.api_helpers import authenticate, html_escape_multi, get_connecting_ip, rescale_energy_value
from api.object_specifications import CI_Measurement_Old, CI_Measurement

import anybadge

from xml.sax.saxutils import escape as xml_escape

from lib import error_helpers
from lib.user import User
from lib.db import DB

router = APIRouter()


@router.post('/v1/ci/measurement/add')
async def post_ci_measurement_add_deprecated(
request: Request,
measurement: CI_Measurement_Old,
user: User = Depends(authenticate) # pylint: disable=unused-argument
):

measurement = html_escape_multi(measurement)

used_client_ip = get_connecting_ip(request)

co2i_transformed = int(measurement.co2i) if measurement.co2i else None

co2eq_transformed = int(float(measurement.co2eq)*1000000) if measurement.co2eq else None

query = '''
INSERT INTO
ci_measurements (energy_uj,
repo,
branch,
workflow_id,
run_id,
label,
source,
cpu,
commit_hash,
duration_us,
cpu_util_avg,
workflow_name,
lat,
lon,
city,
carbon_intensity_g,
carbon_ug,
filter_type,
filter_project,
filter_machine,
filter_tags,
user_id,
ip_address
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
'''

params = ( measurement.energy_value*1000, measurement.repo, measurement.branch,
measurement.workflow, measurement.run_id, measurement.label, measurement.source, measurement.cpu,
measurement.commit_hash, measurement.duration*1000000, measurement.cpu_util_avg, measurement.workflow_name,
measurement.lat, measurement.lon, measurement.city, co2i_transformed, co2eq_transformed,
'machine.ci', 'CI/CD', 'unknown', [],
user._id, used_client_ip)


DB().query(query=query, params=params)

if measurement.energy_value <= 1 or (measurement.co2eq and co2eq_transformed <= 1):
error_helpers.log_error(
'Extremely small energy budget was submitted to old Eco-CI API',
measurement=measurement
)

return Response(status_code=204)


@router.post('/v2/ci/measurement/add')
async def post_ci_measurement_add(
request: Request,
measurement: CI_Measurement,
user: User = Depends(authenticate) # pylint: disable=unused-argument
):

measurement = html_escape_multi(measurement)

params = [measurement.energy_uj, measurement.repo, measurement.branch,
measurement.workflow, measurement.run_id, measurement.label, measurement.source, measurement.cpu,
measurement.commit_hash, measurement.duration_us, measurement.cpu_util_avg, measurement.workflow_name,
measurement.lat, measurement.lon, measurement.city, measurement.carbon_intensity_g, measurement.carbon_ug,
measurement.filter_type, measurement.filter_project, measurement.filter_machine]

tags_replacer = ' ARRAY[]::text[] '
if measurement.filter_tags:
tags_replacer = f" ARRAY[{','.join(['%s']*len(measurement.filter_tags))}] "
params = params + measurement.filter_tags

used_client_ip = measurement.ip # If an ip has been given with the data. We prioritize that
if used_client_ip is None:
used_client_ip = get_connecting_ip(request)

params.append(used_client_ip)
params.append(user._id)

query = f"""
INSERT INTO
ci_measurements (energy_uj,
repo,
branch,
workflow_id,
run_id,
label,
source,
cpu,
commit_hash,
duration_us,
cpu_util_avg,
workflow_name,
lat,
lon,
city,
carbon_intensity_g,
carbon_ug,
filter_type,
filter_project,
filter_machine,
filter_tags,
ip_address,
user_id
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
{tags_replacer},
%s, %s)
"""

DB().query(query=query, params=params)

if measurement.energy_uj <= 1 or (measurement.carbon_ug and measurement.carbon_ug <= 1):
error_helpers.log_error(
'Extremely small energy budget was submitted to Eco-CI API',
measurement=measurement
)

return Response(status_code=204)

@router.get('/v1/ci/measurements')
async def get_ci_measurements(repo: str, branch: str, workflow: str, start_date: date, end_date: date):

query = """
SELECT energy_uj, run_id, created_at, label, cpu, commit_hash, duration_us, source, cpu_util_avg,
(SELECT workflow_name FROM ci_measurements AS latest_workflow
WHERE latest_workflow.repo = ci_measurements.repo
AND latest_workflow.branch = ci_measurements.branch
AND latest_workflow.workflow_id = ci_measurements.workflow_id
ORDER BY latest_workflow.created_at DESC
LIMIT 1) AS workflow_name,
lat, lon, city, carbon_intensity_g, carbon_ug
FROM ci_measurements
WHERE
repo = %s AND branch = %s AND workflow_id = %s
AND DATE(created_at) >= TO_DATE(%s, 'YYYY-MM-DD')
AND DATE(created_at) <= TO_DATE(%s, 'YYYY-MM-DD')
ORDER BY run_id ASC, created_at ASC
"""
params = (repo, branch, workflow, str(start_date), str(end_date))
data = DB().fetch_all(query, params=params)

if data is None or data == []:
return Response(status_code=204) # No-Content

return ORJSONResponse({'success': True, 'data': data})

@router.get('/v1/ci/repositories')
async def get_ci_repositories(repo: str | None = None, sort_by: str = 'name'):

params = []
query = """
SELECT repo, source, MAX(created_at) as last_run
FROM ci_measurements
WHERE 1=1
"""

if repo: # filter is currently not used, but may be a feature in the future
query = f"{query} AND ci_measurements.repo = %s \n"
params.append(repo)

query = f"{query} GROUP BY repo, source"

if sort_by == 'date':
query = f"{query} ORDER BY last_run DESC"
else:
query = f"{query} ORDER BY repo ASC"

data = DB().fetch_all(query, params=tuple(params))
if data is None or data == []:
return Response(status_code=204) # No-Content

return ORJSONResponse({'success': True, 'data': data}) # no escaping needed, as it happend on ingest


@router.get('/v1/ci/runs')
async def get_ci_runs(repo: str, sort_by: str = 'name'):

params = []
query = """
SELECT repo, branch, workflow_id, source, MAX(created_at) as last_run,
(SELECT workflow_name FROM ci_measurements AS latest_workflow
WHERE latest_workflow.repo = ci_measurements.repo
AND latest_workflow.branch = ci_measurements.branch
AND latest_workflow.workflow_id = ci_measurements.workflow_id
ORDER BY latest_workflow.created_at DESC
LIMIT 1) AS workflow_name
FROM ci_measurements
WHERE 1=1
"""

query = f"{query} AND ci_measurements.repo = %s \n"
params.append(repo)
query = f"{query} GROUP BY repo, branch, workflow_id, source"

if sort_by == 'date':
query = f"{query} ORDER BY last_run DESC"
else:
query = f"{query} ORDER BY repo ASC"

data = DB().fetch_all(query, params=tuple(params))
if data is None or data == []:
return Response(status_code=204) # No-Content

return ORJSONResponse({'success': True, 'data': data}) # no escaping needed, as it happend on ingest

@router.get('/v1/ci/badge/get')
async def get_ci_badge_get(repo: str, branch: str, workflow:str):
query = """
SELECT SUM(energy_uj), MAX(run_id)
FROM ci_measurements
WHERE repo = %s AND branch = %s AND workflow_id = %s
GROUP BY run_id
ORDER BY MAX(created_at) DESC
LIMIT 1
"""

params = (repo, branch, workflow)
data = DB().fetch_one(query, params=params)

if data is None or data == [] or data[1] is None: # special check for data[1] as this is aggregate query which always returns result
return Response(status_code=204) # No-Content

energy_value = data[0]

[energy_value, energy_unit] = rescale_energy_value(energy_value, 'uJ')
badge_value= f"{energy_value:.2f} {energy_unit}"

badge = anybadge.Badge(
label='Energy Used',
value=xml_escape(badge_value),
num_value_padding_chars=1,
default_color='green')
return Response(content=str(badge), media_type="image/svg+xml")
6 changes: 4 additions & 2 deletions api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import anybadge

from api import eco_ci
from api.object_specifications import Software
from api.api_helpers import (ORJSONResponseObjKeep, add_phase_stats_statistics, determine_comparison_case,
html_escape_multi, get_phase_stats, get_phase_stats_object,
Expand Down Expand Up @@ -731,11 +732,12 @@ async def robots_txt():
async def read_authentication_token(user: User = Depends(authenticate)):
return ORJSONResponse({'success': True, 'data': user.to_dict()})

app.include_router(eco_ci.router)

# include enterprise functionality if activated
if GlobalConfig().config.get('ee_token', False):
from ee.api import carbondb, eco_ci, power_hog
from ee.api import carbondb, power_hog
app.include_router(carbondb.router)
app.include_router(eco_ci.router)
app.include_router(power_hog.router)

if __name__ == '__main__':
Expand Down
Loading

0 comments on commit 5474f6b

Please sign in to comment.