Adding enterprise submodule for custom additional components (#996)
* Added optional enterprise submodule

* Added checked out version of enterprise submodule

* frontend file for CarbonDB now as symlink

* Removing enterprise functionality and adding modularity to Dashboard and install process

* Using events API to send token validation

* ee token must be in double quoted string

* Eco-CI brought back to main repo

* Eco-CI now always active in frontend

* Updated ee submodule

* Moved ee tests to ee folder; Updated ee repo

* test-config now created dynamically

* Removing test-config from git

* Ignoring test-config.yml

* Do not checkout submodules by default

* missing .sh

* Refactored create_test_config_file

* Updated README with enterprise test instructions

* CarbonDB and PowerHOG must be actively deactivated
ArneTR authored Nov 24, 2024
1 parent 4936f62 commit c3636f8
Showing 43 changed files with 444 additions and 3,764 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests-bare-metal-main.yml
@@ -35,7 +35,7 @@ jobs:
         uses: actions/checkout@v4
         with:
           ref: 'main'
-          submodules: 'true'
+          submodules: 'false'
 
       - name: Eco CI Energy Estimation - Initialize
         uses: green-coding-solutions/eco-ci-energy-estimation@main
2 changes: 1 addition & 1 deletion .github/workflows/tests-eco-ci-energy-estimation.yaml
@@ -14,7 +14,7 @@ jobs:
         uses: actions/checkout@v4
         with:
           ref: 'main'
-          submodules: 'true'
+          submodules: 'false'
 
       - name: Eco CI Energy Estimation - Initialize
         uses: green-coding-solutions/eco-ci-energy-estimation@testing
2 changes: 1 addition & 1 deletion .github/workflows/tests-vm-mac.yml
@@ -21,7 +21,7 @@ jobs:
         uses: actions/checkout@v4
         with:
           ref: ${{ github.ref }}
-          submodules: 'true'
+          submodules: 'false'
 
       - if: ${{ github.event_name == 'workflow_dispatch' || steps.check-date.outputs.should_run == 'true'}}
         name: 'Setup, Run, and Teardown Tests'
2 changes: 1 addition & 1 deletion .github/workflows/tests-vm-main.yml
@@ -36,7 +36,7 @@ jobs:
         uses: actions/checkout@v4
         with:
           ref: 'main'
-          submodules: 'true'
+          submodules: 'false'
 
       - if: ${{ github.event_name == 'workflow_dispatch' || steps.check-date.outputs.should_run == 'true'}}
         name: 'Setup, Run, and Teardown Tests'
2 changes: 1 addition & 1 deletion .github/workflows/tests-vm-pr.yml
@@ -23,7 +23,7 @@ jobs:
         uses: actions/checkout@v4
         with:
           ref: ${{ github.ref }}
-          submodules: 'true'
+          submodules: 'false'
 
 
       - name: 'Setup, Run, and Teardown Tests'
1 change: 1 addition & 0 deletions .gitignore
@@ -24,3 +24,4 @@ static-binary
 /docker/test-compose.yml
 /tests/structure.sql
 tools/sgx_enable
+/tests/test-config.yml
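Note: the new ignore entry exists because the test suite now writes tests/test-config.yml at run time (see the "test-config now created dynamically" and "Refactored create_test_config_file" notes above). A hypothetical sketch of that idea follows -- the function name is taken from the commit message, but the signature, paths, and the ee_token key are invented here purely for illustration and are not part of this diff:

    # Hypothetical sketch only -- the real create_test_config_file() lives in
    # the (not shown) test helpers; signature, paths and the ee_token key are
    # illustrative, not taken from this diff.
    from pathlib import Path

    def create_test_config_file(ee_token=None):
        config = Path('config.yml.example').read_text(encoding='utf-8')
        if ee_token is not None:
            # per the commit message, the ee token must be a double-quoted string
            config += f'\nee_token: "{ee_token}"\n'
        Path('tests/test-config.yml').write_text(config, encoding='utf-8')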
3 changes: 3 additions & 0 deletions .gitmodules
@@ -4,3 +4,6 @@
 [submodule "metric_providers/psu/energy/ac/xgboost/machine/model"]
 	path = metric_providers/psu/energy/ac/xgboost/machine/model
 	url = https://github.com/green-coding-solutions/spec-power-model
+[submodule "ee"]
+	path = ee
+	url = git@github.com:green-coding-solutions/gmt-enterprise.git
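Note: because the ee submodule is optional and the CI checkouts above now run with submodules: 'false', the enterprise code is only present in a working tree after being fetched explicitly, e.g. with git submodule update --init ee.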
268 changes: 25 additions & 243 deletions api/api_helpers.py
@@ -2,26 +2,27 @@
 import faulthandler
 faulthandler.enable(file=sys.__stderr__) # will catch segfaults and write to stderr
 
+from urllib.parse import urlparse
+
 from functools import cache
 from html import escape as html_escape
-import re
-import math
 import typing
-import ipaddress
-import json
 import uuid
 
 from starlette.background import BackgroundTask
 from fastapi.responses import ORJSONResponse
+from fastapi import Depends, Request, HTTPException
+from fastapi.security import APIKeyHeader
 import numpy as np
-import requests
 import scipy.stats
 from pydantic import BaseModel
 
 
 from lib.global_config import GlobalConfig
 from lib.db import DB
 from lib import error_helpers
+from lib.user import User, UserAuthenticationError
+from lib.secure_variable import SecureVariable
 
 import redis
 from enum import Enum
@@ -632,191 +633,33 @@ def __init__(
         self.content = content
         super().__init__(content, status_code, headers, media_type, background)
 
-# The decorator will not work between requests, so we are not prone to stale data over time
-@cache
-def get_geo(ip):
-
-    ip_obj = ipaddress.ip_address(ip) # may raise a ValueError
-    if ip_obj.is_private:
-        error_helpers.log_error(f"Private IP was submitted to get_geo {ip}. This is normal in development, but should not happen in production.")
-        return('52.53721666833642', '13.424863870661927')
-
-    query = "SELECT ip_address, data FROM ip_data WHERE created_at > NOW() - INTERVAL '24 hours' AND ip_address=%s;"
-    db_data = DB().fetch_all(query, (ip,))
-
-    if db_data is not None and len(db_data) != 0:
-        return (db_data[0][1].get('latitude'), db_data[0][1].get('longitude'))
-
-    latitude, longitude = get_geo_ip_api_com(ip)
-
-    if not latitude:
-        latitude, longitude = get_geo_ipapi_co(ip)
-        if not latitude:
-            latitude, longitude = get_geo_ip_ipinfo(ip)
-            if not latitude:
-                raise RuntimeError(f"Could not get Geo-IP for {ip} after 3 tries")
-
-    return (latitude, longitude)
-
-
-def get_geo_ipapi_co(ip):
-
-    print(f"Accessing https://ipapi.co/{ip}/json/")
-    try:
-        response = requests.get(f"https://ipapi.co/{ip}/json/", timeout=10)
-    except Exception as exc: #pylint: disable=broad-exception-caught
-        error_helpers.log_error('API request to ipapi.co failed ...', exception=exc)
-        return (False, False)
-
-    if response.status_code == 200:
-        resp_data = response.json()
-
-        if 'error' in resp_data or 'latitude' not in resp_data or 'longitude' not in resp_data:
-            return (None, None)
-
-        resp_data['source'] = 'ipapi.co'
-
-        query = "INSERT INTO ip_data (ip_address, data) VALUES (%s, %s)"
-        DB().query(query=query, params=(ip, json.dumps(resp_data)))
-
-        return (resp_data.get('latitude'), resp_data.get('longitude'))
-
-    error_helpers.log_error(f"Could not get Geo-IP from ipapi.co for {ip}. Trying next ...", response=response)
-
-    return (False, False)
-
-def get_geo_ip_api_com(ip):
-
-    print(f"Accessing http://ip-api.com/json/{ip}")
-    try:
-        response = requests.get(f"http://ip-api.com/json/{ip}", timeout=10)
-    except Exception as exc: #pylint: disable=broad-exception-caught
-        error_helpers.log_error('API request to ip-api.com failed ...', exception=exc)
-        return (False, False)
-
-    if response.status_code == 200:
-        resp_data = response.json()
-
-        if ('status' in resp_data and resp_data.get('status') == 'fail') or 'lat' not in resp_data or 'lon' not in resp_data:
-            return (None, None)
-
-        resp_data['latitude'] = resp_data.get('lat')
-        resp_data['longitude'] = resp_data.get('lon')
-        resp_data['source'] = 'ip-api.com'
-
-        query = "INSERT INTO ip_data (ip_address, data) VALUES (%s, %s)"
-        DB().query(query=query, params=(ip, json.dumps(resp_data)))
-
-        return (resp_data.get('latitude'), resp_data.get('longitude'))
-
-    error_helpers.log_error(f"Could not get Geo-IP from ip-api.com for {ip}. Trying next ...", response=response)
-
-    return (False, False)
-
-def get_geo_ip_ipinfo(ip):
-
-    print(f"Accessing https://ipinfo.io/{ip}/json")
-    try:
-        response = requests.get(f"https://ipinfo.io/{ip}/json", timeout=10)
-    except Exception as exc: #pylint: disable=broad-exception-caught
-        error_helpers.log_error('API request to ipinfo.io failed ...', exception=exc)
-        return (False, False)
-
-    if response.status_code == 200:
-        resp_data = response.json()
-
-        if 'bogon' in resp_data or 'loc' not in resp_data:
-            return (None, None)
-
-        lat_lng = resp_data.get('loc').split(',')
-
-        resp_data['latitude'] = lat_lng[0]
-        resp_data['longitude'] = lat_lng[1]
-        resp_data['source'] = 'ipinfo.io'
-
-        query = "INSERT INTO ip_data (ip_address, data) VALUES (%s, %s)"
-        DB().query(query=query, params=(ip, json.dumps(resp_data)))
-
-        return (resp_data.get('latitude'), resp_data.get('longitude'))
-
-    error_helpers.log_error(f"Could not get Geo-IP from ipinfo.io for {ip}. Trying next ...", response=response)
-
-    return (False, False)
-
-# The decorator will not work between requests, so we are not prone to stale data over time
-@cache
-def get_carbon_intensity(latitude, longitude):
-
-    if latitude is None or longitude is None:
-        return None
-
-    query = "SELECT latitude, longitude, data FROM carbon_intensity WHERE created_at > NOW() - INTERVAL '1 hours' AND latitude=%s AND longitude=%s;"
-    db_data = DB().fetch_all(query, (latitude, longitude))
-
-    if db_data is not None and len(db_data) != 0:
-        return db_data[0][2].get('carbonIntensity')
-
-    if not (electricitymaps_token := GlobalConfig().config.get('electricity_maps_token')):
-        raise ValueError('You need to specify an electricitymap token in the config!')
-
-    if electricitymaps_token == 'testing':
-        # If we are running tests we always return 1000
-        return 1000
-
-    headers = {'auth-token': electricitymaps_token }
-    params = {'lat': latitude, 'lon': longitude }
-
-    response = requests.get('https://api.electricitymap.org/v3/carbon-intensity/latest', params=params, headers=headers, timeout=10)
-    print(f"Accessing electricitymap with {latitude} {longitude}")
-    if response.status_code == 200:
-        resp_data = response.json()
-        query = "INSERT INTO carbon_intensity (latitude, longitude, data) VALUES (%s, %s, %s)"
-        DB().query(query=query, params=(latitude, longitude, json.dumps(resp_data)))
-
-        return resp_data.get('carbonIntensity')
-
-    error_helpers.log_error(f"Could not get carbon intensity from Electricitymaps.org for {params}", response=response)
-
-    return None
-
-def carbondb_add(connecting_ip, data, source, user_id):
-
-    query = '''
-        INSERT INTO carbondb_data_raw
-            ("type", "project", "machine", "source", "tags","time","energy_kwh","carbon_kg","carbon_intensity_g","latitude","longitude","ip_address","user_id","created_at")
-        VALUES
-            (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())
-    '''
-
-    used_client_ip = data.get('ip', None) # An ip has been given with the data. We prioritize that
-    if used_client_ip is None:
-        used_client_ip = connecting_ip
-
-    carbon_intensity_g_per_kWh = data.get('carbon_intensity_g', None)
-
-    if carbon_intensity_g_per_kWh is not None: # we need this check explicitely as we want to allow 0 as possible value
-        latitude = None # no use to derive if we get supplied data. We rather indicate with NULL that user supplied
-        longitude = None # no use to derive if we get supplied data. We rather indicate with NULL that user supplied
-    else:
-        latitude, longitude = get_geo(used_client_ip) # cached
-        carbon_intensity_g_per_kWh = get_carbon_intensity(latitude, longitude) # cached
-
-    energy_J = float(data['energy_uj']) / 1e6
-    energy_kWh = energy_J / (3_600*1_000)
-    carbon_kg = (energy_kWh * carbon_intensity_g_per_kWh)/1_000
-
-    DB().query(
-        query=query,
-        params=(
-            data['type'],
-            data['project'], data['machine'], source, data['tags'], data['time'], energy_kWh, carbon_kg, carbon_intensity_g_per_kWh, latitude, longitude, used_client_ip, user_id))
-
-
-def validate_carbondb_params(param, elements: list):
-    for el in elements:
-        if not re.fullmatch(r'[A-Za-z0-9\._-]+', el):
-            raise ValueError(f"Parameter for '{param}' may only contain A-Za-z0-9._- characters and no spaces. Was: {el}")
+header_scheme = APIKeyHeader(
+    name='X-Authentication',
+    scheme_name='Header',
+    description='Authentication key - See https://docs.green-coding.io/authentication',
+    auto_error=False
+)
+
+def authenticate(authentication_token=Depends(header_scheme), request: Request = None):
+    parsed_url = urlparse(str(request.url))
+    try:
+        if not authentication_token or authentication_token.strip() == '': # Note that if no token is supplied this will authenticate as the DEFAULT user, which in FOSS systems has full capabilities
+            authentication_token = 'DEFAULT'
+
+        user = User.authenticate(SecureVariable(authentication_token))
+
+        if not user.can_use_route(parsed_url.path):
+            raise HTTPException(status_code=401, detail="Route not allowed") from UserAuthenticationError
+
+        if not user.has_api_quota(parsed_url.path):
+            raise HTTPException(status_code=401, detail="Quota exceeded") from UserAuthenticationError
+
+        user.deduct_api_quota(parsed_url.path, 1)
+
+    except UserAuthenticationError:
+        raise HTTPException(status_code=401, detail="Invalid token") from UserAuthenticationError
+    return user
 
 def get_connecting_ip(request):
     connecting_ip = request.headers.get("x-forwarded-for")
@@ -825,64 +668,3 @@ def get_connecting_ip(request):
         return connecting_ip.split(",")[0]
 
     return request.client.host
-
-
-def replace_nan_with_zero(obj):
-    if isinstance(obj, dict):
-        for k, v in obj.items():
-            if isinstance(v, (dict, list)):
-                replace_nan_with_zero(v)
-            elif isinstance(v, float) and math.isnan(v):
-                obj[k] = 0
-    elif isinstance(obj, list):
-        for i, item in enumerate(obj):
-            if isinstance(item, (dict, list)):
-                replace_nan_with_zero(item)
-            elif isinstance(item, float) and math.isnan(item):
-                obj[i] = 0
-    return obj
-
-# Refactor have this in the Pydantic model?
-# https://github.com/green-coding-solutions/green-metrics-tool/issues/907
-def validate_hog_measurement_data(data):
-    required_top_level_fields = [
-        'coalitions', 'all_tasks', 'elapsed_ns', 'processor', 'thermal_pressure'
-    ]
-    for field in required_top_level_fields:
-        if field not in data:
-            raise ValueError(f"Missing required field: {field}")
-
-    # Validate 'coalitions' structure
-    if not isinstance(data['coalitions'], list):
-        raise ValueError("Expected 'coalitions' to be a list")
-
-    for coalition in data['coalitions']:
-        required_coalition_fields = [
-            'name', 'tasks', 'energy_impact_per_s', 'cputime_ms_per_s',
-            'diskio_bytesread', 'diskio_byteswritten', 'intr_wakeups', 'idle_wakeups'
-        ]
-        for field in required_coalition_fields:
-            if field not in coalition:
-                raise ValueError(f"Missing required coalition field: {field}")
-            if field == 'tasks' and not isinstance(coalition['tasks'], list):
-                raise ValueError(f"Expected 'tasks' to be a list in coalition: {coalition['name']}")
-
-    # Validate 'all_tasks' structure
-    if 'energy_impact_per_s' not in data['all_tasks']:
-        raise ValueError("Missing 'energy_impact_per_s' in 'all_tasks'")
-
-    # Validate 'processor' structure based on the processor type
-    processor_fields = data['processor'].keys()
-    if 'ane_energy' in processor_fields:
-        required_processor_fields = ['combined_power', 'cpu_energy', 'gpu_energy', 'ane_energy']
-    elif 'package_joules' in processor_fields:
-        required_processor_fields = ['package_joules', 'cpu_joules', 'igpu_watts']
-    else:
-        raise ValueError("Unknown processor type")
-
-    for field in required_processor_fields:
-        if field not in processor_fields:
-            raise ValueError(f"Missing required processor field: {field}")
-
-    # All checks passed
-    return True
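Note: the new authenticate function added above is a standard FastAPI dependency. It reads the X-Authentication header via header_scheme, resolves the user (falling back to the DEFAULT user when no token is supplied), enforces per-route permissions and API quota, and returns the User object. A minimal usage sketch follows -- the route path and response body are invented for illustration and are not part of this commit:

    # Minimal usage sketch, assuming the repo layout above (api/api_helpers.py).
    # The route path and response body are illustrative only.
    from fastapi import Depends, FastAPI

    from api.api_helpers import authenticate

    app = FastAPI()

    @app.get('/v1/example')  # illustrative path
    async def example(user=Depends(authenticate)):
        # authenticate() has already validated the X-Authentication header,
        # checked route permission and quota, and deducted one quota unit.
        return {'success': True}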