diff --git a/api/api_helpers.py b/api/api_helpers.py index 92fa4a555..c957e6ac1 100644 --- a/api/api_helpers.py +++ b/api/api_helpers.py @@ -608,13 +608,14 @@ def __init__( self.content = content super().__init__(content, status_code, headers, media_type, background) +# The decorator will not work between requests, so we are not prone to stale data over time +@cache def get_geo(ip): - try: - ip_obj = ipaddress.ip_address(ip) - if ip_obj.is_private: - return('52.53721666833642', '13.424863870661927') - except ValueError: - return (None, None) + + ip_obj = ipaddress.ip_address(ip) # may raise a ValueError + if ip_obj.is_private: + error_helpers.log_error(f"Private IP was submitted to get_geo {ip}. This is normal in development, but should not happen in production.") + return('52.53721666833642', '13.424863870661927') query = "SELECT ip_address, data FROM ip_data WHERE created_at > NOW() - INTERVAL '24 hours' AND ip_address=%s;" db_data = DB().fetch_all(query, (ip,)) @@ -624,19 +625,20 @@ def get_geo(ip): latitude, longitude = get_geo_ipapi_co(ip) - if latitude is False: + if not latitude: latitude, longitude = get_geo_ip_api_com(ip) - if latitude is False: + if not latitude: latitude, longitude = get_geo_ip_ipinfo(ip) + if not latitude: + raise RuntimeError(f"Could not get Geo-IP for {ip} after 3 tries") - #If all 3 fail there is something bigger wrong return (latitude, longitude) def get_geo_ipapi_co(ip): response = requests.get(f"https://ipapi.co/{ip}/json/", timeout=10) - print(f"Accessing https://ipapi.co/{ip}/json/") + if response.status_code == 200: resp_data = response.json() @@ -650,6 +652,8 @@ def get_geo_ipapi_co(ip): return (resp_data.get('latitude'), resp_data.get('longitude')) + error_helpers.log_error(f"Could not get Geo-IP from ipapi.co for {ip}. Trying next ...", response=response) + return (False, False) def get_geo_ip_api_com(ip): @@ -671,6 +675,8 @@ def get_geo_ip_api_com(ip): return (resp_data.get('latitude'), resp_data.get('longitude')) + error_helpers.log_error(f"Could not get Geo-IP from ip-api.com for {ip}. Trying next ...", response=response) + return (False, False) def get_geo_ip_ipinfo(ip): @@ -694,8 +700,12 @@ def get_geo_ip_ipinfo(ip): return (resp_data.get('latitude'), resp_data.get('longitude')) + error_helpers.log_error(f"Could not get Geo-IP from ipinfo.io for {ip}. Trying next ...", response=response) + return (False, False) +# The decorator will not work between requests, so we are not prone to stale data over time +@cache def get_carbon_intensity(latitude, longitude): if latitude is None or longitude is None: @@ -726,12 +736,11 @@ def get_carbon_intensity(latitude, longitude): return resp_data.get('carbonIntensity') - return None + error_helpers.log_error(f"Could not get carbon intensity from Electricitymaps.org for {params}", response=response) -def carbondb_add(client_ip, energydatas): + return None - latitude, longitude = get_geo(client_ip) - carbon_intensity = get_carbon_intensity(latitude, longitude) +def carbondb_add(client_ip, energydatas, user_id): data_rows = [] @@ -752,10 +761,12 @@ def carbondb_add(client_ip, energydatas): if field_value is None or str(field_value).strip() == '': raise RequestValidationError(f"{field_name.capitalize()} is empty. Ignoring everything!") - if 'ip' in e: - # An ip has been given with the data. Let's use this: - latitude, longitude = get_geo(e['ip']) - carbon_intensity = get_carbon_intensity(latitude, longitude) + if 'ip' in e: # An ip has been given with the data. 
We prioritize that + latitude, longitude = get_geo(e['ip']) # cached + carbon_intensity = get_carbon_intensity(latitude, longitude) # cached + else: + latitude, longitude = get_geo(client_ip) # cached + carbon_intensity = get_carbon_intensity(latitude, longitude) # cached energy_kwh = float(e['energy_value']) * 2.77778e-7 # kWh co2_value = energy_kwh * carbon_intensity # results in g @@ -764,12 +775,12 @@ def carbondb_add(client_ip, energydatas): project_uuid = e['project'] if e['project'] is not None else '' tags_clean = "{" + ",".join([f'"{tag.strip()}"' for tag in e['tags'].split(',') if e['tags']]) + "}" if e['tags'] is not None else '' - row = f"{e['type']}|{company_uuid}|{e['machine']}|{project_uuid}|{tags_clean}|{int(e['time_stamp'])}|{e['energy_value']}|{co2_value}|{carbon_intensity}|{latitude}|{longitude}|{client_ip}" + row = f"{e['type']}|{company_uuid}|{e['machine']}|{project_uuid}|{tags_clean}|{int(e['time_stamp'])}|{e['energy_value']}|{co2_value}|{carbon_intensity}|{latitude}|{longitude}|{client_ip}|{user_id}" data_rows.append(row) data_str = "\n".join(data_rows) data_file = io.StringIO(data_str) - columns = ['type', 'company', 'machine', 'project', 'tags', 'time_stamp', 'energy_value', 'co2_value', 'carbon_intensity', 'latitude', 'longitude', 'ip_address'] + columns = ['type', 'company', 'machine', 'project', 'tags', 'time_stamp', 'energy_value', 'co2_value', 'carbon_intensity', 'latitude', 'longitude', 'ip_address', 'user_id'] DB().copy_from(file=data_file, table='carbondb_energy_data', columns=columns, sep='|') diff --git a/api/main.py b/api/main.py index 226c2d3a5..e2e672188 100644 --- a/api/main.py +++ b/api/main.py @@ -11,15 +11,20 @@ from typing import List from xml.sax.saxutils import escape as xml_escape import math -from fastapi import FastAPI, Request, Response +from urllib.parse import urlparse + +from fastapi import FastAPI, Request, Response, Depends, HTTPException from fastapi.responses import ORJSONResponse from fastapi.encoders import jsonable_encoder from fastapi.exceptions import RequestValidationError from fastapi.middleware.cors import CORSMiddleware +from fastapi.security import APIKeyHeader + from datetime import date from starlette.responses import RedirectResponse from starlette.exceptions import HTTPException as StarletteHTTPException +from starlette.datastructures import Headers as StarletteHeaders from pydantic import BaseModel, ValidationError, field_validator from typing import Optional @@ -38,6 +43,8 @@ from lib.diff import get_diffable_row, diff_rows from lib import error_helpers from lib.job.base import Job +from lib.user import User, UserAuthenticationError +from lib.secure_variable import SecureVariable from tools.timeline_projects import TimelineProject from enum import Enum @@ -53,7 +60,7 @@ async def validation_exception_handler(request: Request, exc: RequestValidationE url=request.url, query_params=request.query_params, client=request.client, - headers=request.headers, + headers=obfuscate_authentication_token(request.headers), body=exc.body, details=exc.errors(), exception=exc @@ -71,7 +78,7 @@ async def http_exception_handler(request, exc): url=request.url, query_params=request.query_params, client=request.client, - headers=request.headers, + headers=obfuscate_authentication_token(request.headers), body=body, details=exc.detail, exception=exc @@ -84,6 +91,7 @@ async def http_exception_handler(request, exc): async def catch_exceptions_middleware(request: Request, call_next): #pylint: disable=broad-except body = None + try: body = await 
request.body() return await call_next(request) @@ -93,7 +101,7 @@ async def catch_exceptions_middleware(request: Request, call_next): url=request.url, query_params=request.query_params, client=request.client, - headers=request.headers, + headers=obfuscate_authentication_token(request.headers), body=body, exception=exc ) @@ -117,6 +125,40 @@ async def catch_exceptions_middleware(request: Request, call_next): allow_headers=['*'], ) +header_scheme = APIKeyHeader( + name='X-Authentication', + scheme_name='Header', + description='Authentication key - See https://docs.green-coding.io/authentication', + auto_error=False +) + +def obfuscate_authentication_token(headers: StarletteHeaders): + headers_mut = headers.mutablecopy() + if 'X-Authentication' in headers_mut: + headers_mut['X-Authentication'] = '****OBFUSCATED****' + return headers_mut + +def authenticate(authentication_token=Depends(header_scheme), request: Request = None): + parsed_url = urlparse(str(request.url)) + try: + + if not authentication_token: # Note that if no token is supplied this will authenticate as the DEFAULT user, which in FOSS systems has full capabilities + authentication_token = 'DEFAULT' + + user = User.authenticate(SecureVariable(authentication_token)) + + if not user.can_use_route(parsed_url.path): + raise HTTPException(status_code=401, detail="Route not allowed") + + if not user.has_api_quota(parsed_url.path): + raise HTTPException(status_code=401, detail="Quota exceeded") + + user.deduct_api_quota(parsed_url.path, 1) + + except UserAuthenticationError as exc: + raise HTTPException(status_code=401, detail="Invalid token") from exc + return user + @app.get('/') async def home(): @@ -215,6 +257,7 @@ async def get_repositories(uri: str | None = None, branch: str | None = None, ma return ORJSONResponse({'success': True, 'data': escaped_data}) + # A route to return all of the available entries in our catalog. 
@app.get('/v1/runs') async def get_runs(uri: str | None = None, branch: str | None = None, machine_id: int | None = None, machine: str | None = None, filename: str | None = None, limit: int | None = None, uri_mode = 'none'): @@ -590,7 +633,6 @@ async def get_jobs(machine_id: int | None = None, state: str | None = None): return ORJSONResponse({'success': True, 'data': data}) -#### class HogMeasurement(BaseModel): time: int @@ -658,7 +700,10 @@ def validate_measurement_data(data): return True @app.post('/v1/hog/add') -async def hog_add(measurements: List[HogMeasurement]): +async def hog_add( + measurements: List[HogMeasurement], + user: User = Depends(authenticate), # pylint: disable=unused-argument + ): for measurement in measurements: decoded_data = base64.b64decode(measurement.data) @@ -735,8 +780,9 @@ async def hog_add(measurements: List[HogMeasurement]): ane_energy, energy_impact, thermal_pressure, - settings) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + settings, + user_id) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING id """ params = ( @@ -750,6 +796,7 @@ async def hog_add(measurements: List[HogMeasurement]): cpu_energy_data['energy_impact'], measurement_data['thermal_pressure'], measurement.settings, + user._id, ) measurement_db_id = DB().fetch_one(query=query, params=params)[0] @@ -1018,9 +1065,6 @@ async def hog_get_task_details(machine_uuid: str, measurements_id_start: int, me return ORJSONResponse({'success': True, 'tasks_data': tasks_data, 'coalitions_data': coalitions_data}) - -#### - class Software(BaseModel): name: str url: str @@ -1031,7 +1075,7 @@ class Software(BaseModel): schedule_mode: str @app.post('/v1/software/add') -async def software_add(software: Software): +async def software_add(software: Software, user: User = Depends(authenticate)): software = html_escape_multi(software) @@ -1057,22 +1101,26 @@ async def software_add(software: Software): if not DB().fetch_one('SELECT id FROM machines WHERE id=%s AND available=TRUE', params=(software.machine_id,)): raise RequestValidationError('Machine does not exist') + if not user.can_use_machine(software.machine_id): + raise RequestValidationError('Your user does not have the permissions to use that machine.') if software.schedule_mode not in ['one-off', 'daily', 'weekly', 'commit', 'variance']: raise RequestValidationError(f"Please select a valid measurement interval. 
({software.schedule_mode}) is unknown.") - # notify admin of new add - if notification_email := GlobalConfig().config['admin']['notification_email']: - Job.insert('email', name='New run added from Web Interface', message=str(software), email=notification_email) - + if not user.can_schedule_job(software.schedule_mode): + raise RequestValidationError('Your user does not have the permissions to use that schedule mode.') if software.schedule_mode in ['daily', 'weekly', 'commit']: - TimelineProject.insert(software.name, software.url, software.branch, software.filename, software.machine_id, software.schedule_mode) + TimelineProject.insert(name=software.name, url=software.url, branch=software.branch, filename=software.filename, machine_id=software.machine_id, user_id=user._id, schedule_mode=software.schedule_mode) # even for timeline projects we do at least one run amount = 10 if software.schedule_mode == 'variance' else 1 for _ in range(0,amount): - Job.insert('run', name=software.name, url=software.url, email=software.email, branch=software.branch, filename=software.filename, machine_id=software.machine_id) + Job.insert('run', user_id=user._id, name=software.name, url=software.url, email=software.email, branch=software.branch, filename=software.filename, machine_id=software.machine_id) + + # notify admin of new add + if notification_email := GlobalConfig().config['admin']['notification_email']: + Job.insert('email', user_id=user._id, name='New run added from Web Interface', message=str(software), email=notification_email) return ORJSONResponse({'success': True}, status_code=202) @@ -1165,7 +1213,11 @@ class CI_Measurement(BaseModel): co2eq: Optional[str] = '' @app.post('/v1/ci/measurement/add') -async def post_ci_measurement_add(request: Request, measurement: CI_Measurement): +async def post_ci_measurement_add( + request: Request, + measurement: CI_Measurement, + user: User = Depends(authenticate) # pylint: disable=unused-argument + ): for key, value in measurement.model_dump().items(): match key: case 'unit': @@ -1206,14 +1258,15 @@ async def post_ci_measurement_add(request: Request, measurement: CI_Measurement) lon, city, co2i, - co2eq + co2eq, + user_id ) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ params = (measurement.energy_value, measurement.energy_unit, measurement.repo, measurement.branch, measurement.workflow, measurement.run_id, measurement.label, measurement.source, measurement.cpu, measurement.commit_hash, measurement.duration, measurement.cpu_util_avg, measurement.workflow_name, - measurement.lat, measurement.lon, measurement.city, measurement.co2i, measurement.co2eq) + measurement.lat, measurement.lon, measurement.city, measurement.co2i, measurement.co2eq, user._id) DB().query(query=query, params=params) @@ -1239,8 +1292,16 @@ async def post_ci_measurement_add(request: Request, measurement: CI_Measurement) 'tags': f"{measurement.label},{measurement.repo},{measurement.branch},{measurement.workflow}" } - # If there is an error the function will raise an Error - carbondb_add(client_ip, [energydata]) + try: + carbondb_add(client_ip, [energydata], user._id) + #pylint: disable=broad-except + except Exception as exc: + error_helpers.log_error('CI Measurement was successfully added, but CarbonDB did failed', exception=exc) + return ORJSONResponse({ + 'success': False, + 'err': f"CI Measurement was successfully added, but CarbonDB did respond with exception: 
{str(exc)}"}, + status_code=207 + ) return ORJSONResponse({'success': True}, status_code=201) @@ -1377,7 +1438,11 @@ def empty_str_to_none(cls, values, _): return values @app.post('/v1/carbondb/add') -async def add_carbondb(request: Request, energydatas: List[EnergyData]): +async def add_carbondb( + request: Request, + energydatas: List[EnergyData], + user: User = Depends(authenticate) # pylint: disable=unused-argument + ): client_ip = request.headers.get("x-forwarded-for") if client_ip: @@ -1385,7 +1450,7 @@ async def add_carbondb(request: Request, energydatas: List[EnergyData]): else: client_ip = request.client.host - carbondb_add(client_ip, energydatas) + carbondb_add(client_ip, energydatas, user._id) return Response(status_code=204) @@ -1441,5 +1506,19 @@ async def carbondb_get_company_project_details(cptype: str, uuid: str): return ORJSONResponse({'success': True, 'data': data}) +# @app.get('/v1/authentication/new') +# This will fail if the DB insert fails but still report 'success': True +# Must be reworked if we want to allow API based token generation +# async def get_authentication_token(name: str = None): +# if name is not None and name.strip() == '': +# name = None +# return ORJSONResponse({'success': True, 'data': User.get_new(name)}) + +@app.get('/v1/authentication/data') +async def read_authentication_token(user: User = Depends(authenticate)): + return ORJSONResponse({'success': True, 'data': user.__dict__}) + + + if __name__ == '__main__': app.run() # pylint: disable=no-member diff --git a/config.yml.example b/config.yml.example index 02937615d..20075f87d 100644 --- a/config.yml.example +++ b/config.yml.example @@ -26,7 +26,6 @@ admin: email_bcc: False - cluster: api_url: __API_URL__ metrics_url: __METRICS_URL__ @@ -65,8 +64,6 @@ measurement: pre-test-sleep: 5 idle-duration: 5 baseline-duration: 5 - flow-process-duration: 1800 # half hour - total-duration: 3600 # one hour post-test-sleep: 5 phase-transition-time: 1 boot: diff --git a/docker/structure.sql b/docker/structure.sql index 65ecdb477..da4e67684 100644 --- a/docker/structure.sql +++ b/docker/structure.sql @@ -1,9 +1,35 @@ CREATE DATABASE "green-coding"; \c green-coding; +CREATE SCHEMA IF NOT EXISTS "public"; + CREATE EXTENSION "uuid-ossp"; CREATE EXTENSION "moddatetime"; +CREATE TABLE users ( + id SERIAL PRIMARY KEY, + name text, + token text NOT NULL, + capabilities JSONB NOT NULL, + created_at timestamp with time zone DEFAULT now(), + updated_at timestamp with time zone +); + +CREATE UNIQUE INDEX name_unique ON users(name text_ops); +CREATE UNIQUE INDEX token_unique ON users(token text_ops); + +CREATE TRIGGER users_moddatetime + BEFORE UPDATE ON users + FOR EACH ROW + EXECUTE PROCEDURE moddatetime (updated_at); + +-- Default password for authentication is DEFAULT +INSERT INTO "public"."users"("name","token","capabilities","created_at","updated_at") +VALUES 
+(E'DEFAULT',E'89dbf71048801678ca4abfbaa3ea8f7c651aae193357a3e23d68e21512cd07f5',E'{"api":{"quotas":{},"routes":["/v1/carbondb/add","/v1/ci/measurement/add","/v1/software/add","/v1/hog/add","/v1/authentication/data"]},"data":{"runs":{"retention":2678400},"hog_tasks":{"retention":2678400},"measurements":{"retention":2678400},"hog_coalitions":{"retention":2678400},"ci_measurements":{"retention":2678400},"hog_measurements":{"retention":2678400}},"jobs":{"schedule_modes":["one-off","daily","weekly","commit","variance"]},"machines":[1],"measurement":{"quotas":{},"settings":{"total-duration":86400,"flow-process-duration":86400}},"optimizations":["container_memory_utilization","container_cpu_utilization","message_optimization","container_build_time","container_boot_time","container_image_size"]}',E'2024-08-22 11:28:24.937262+00',NULL); + + + CREATE TABLE machines ( id SERIAL PRIMARY KEY, description text, @@ -24,6 +50,12 @@ CREATE TRIGGER machines_moddatetime FOR EACH ROW EXECUTE PROCEDURE moddatetime (updated_at); +-- Default machine so a fresh installation is usable right away +INSERT INTO "public"."machines"("description", "available") +VALUES +(E'Local machine', true); + + CREATE TABLE jobs ( id SERIAL PRIMARY KEY, type text, @@ -36,6 +68,7 @@ CREATE TABLE jobs ( categories int[], machine_id int REFERENCES machines(id) ON DELETE SET NULL ON UPDATE CASCADE, message text, + user_id integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE, created_at timestamp with time zone DEFAULT now(), updated_at timestamp with time zone ); @@ -67,6 +100,7 @@ CREATE TABLE runs ( logs text, invalid_run text, failed boolean DEFAULT false, + user_id integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE, created_at timestamp with time zone DEFAULT now(), updated_at timestamp with time zone ); @@ -183,6 +217,7 @@ CREATE TABLE ci_measurements ( city text, co2i text, co2eq text, + user_id integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE, created_at timestamp with time zone DEFAULT now(), updated_at timestamp with time zone ); @@ -217,6 +252,7 @@ CREATE TABLE timeline_projects ( machine_id integer REFERENCES machines(id) ON DELETE RESTRICT ON UPDATE CASCADE NOT NULL, schedule_mode text NOT NULL, last_scheduled timestamp with time zone, + user_id integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE, created_at timestamp with time zone DEFAULT now(), updated_at timestamp with time zone ); @@ -238,6 +274,7 @@ CREATE TABLE hog_measurements ( thermal_pressure text, settings jsonb, data jsonb, + user_id integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE, created_at timestamp with time zone DEFAULT now(), updated_at timestamp with time zone ); @@ -287,7 +324,6 @@ CREATE TABLE hog_tasks ( diskio_byteswritten bigint, intr_wakeups bigint, idle_wakeups bigint, - data jsonb, created_at timestamp with time zone DEFAULT now(), updated_at timestamp with time zone @@ -336,6 +372,7 @@ CREATE TABLE carbondb_energy_data ( latitude DOUBLE PRECISION, longitude DOUBLE PRECISION, ip_address INET, + user_id integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE, created_at timestamp with time zone DEFAULT now(), updated_at timestamp with time zone ); @@ -365,6 +402,7 @@ CREATE TABLE carbondb_energy_data_day ( co2_sum FLOAT, carbon_intensity_avg FLOAT, record_count INT, + user_id integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE, created_at timestamp with time zone DEFAULT now(), updated_at timestamp with time zone );
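Note on the token column above: it does not store the plaintext token. Per User.authenticate() in lib/user.py later in this diff, clients send the plaintext in the X-Authentication header and only its hex SHA-256 is stored and compared. A minimal sketch of that relationship for the DEFAULT row (the "Default password for authentication is DEFAULT" comment above states the plaintext):

```python
# Sketch, not part of the patch: reproduce the stored token hash for the
# DEFAULT user. User.authenticate() (lib/user.py below) hashes the supplied
# plaintext token the same way before the DB lookup.
import hashlib

print(hashlib.sha256('DEFAULT'.encode('UTF-8')).hexdigest())
# Per the INSERT above this should print:
# 89dbf71048801678ca4abfbaa3ea8f7c651aae193357a3e23d68e21512cd07f5
```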
diff --git a/frontend/authentication.html b/frontend/authentication.html new file mode 100644 index 000000000..9d5384285 --- /dev/null +++ b/frontend/authentication.html @@ -0,0 +1,95 @@
+[HTML markup stripped in this rendering. The new page consists of: the standard Green Metrics Tool head (page title "Green Metrics Tool"), menu and script includes; a headline "Authentication in GMT"; the copy "The Green Metrics Tool supports restricting certain functionalities for fair-use or premium access.", "In case you already have a token you can input it here to use it with all API calls in this Dashboard and / or see your current capabilities and quotas." and "If you want to acquire a new authentication token to access certain premium features that are not distributed with the open-source version contact us at info@green-coding.io"; and a form "Your authentication token" with an input field, a save button and a container that displays the returned token details (wired up by frontend/js/authentication.js below).]
\ No newline at end of file
diff --git a/frontend/carbondb-details.html b/frontend/carbondb-details.html index 8dde252a1..df387732b 100644 --- a/frontend/carbondb-details.html +++ b/frontend/carbondb-details.html @@ -49,7 +49,7 @@

 [markup stripped — the "Sum CO2eq (g)" label keeps its text; only its element attributes change, presumably adding the new no-transform class (see the green-coding.css hunk below)]
@@ -57,7 +57,7 @@
 [markup stripped — same attribute-only change on the "Avg. Carbon Intensity (gCO2e/kWh)" label]
diff --git a/frontend/carbondb-lists.html b/frontend/carbondb-lists.html index 44a9044ba..45418c557 100644 --- a/frontend/carbondb-lists.html +++ b/frontend/carbondb-lists.html @@ -49,7 +49,7 @@
 [markup stripped — same attribute-only change on the "Sum CO2eq (g)" label]
diff --git a/frontend/css/green-coding.css b/frontend/css/green-coding.css index 67ef2a311..4d599a6ae 100644 --- a/frontend/css/green-coding.css +++ b/frontend/css/green-coding.css @@ -22,7 +22,7 @@ Thanks to https://css-tricks.com/transitions-only-after-page-load/ */ margin-top: 14px; } -.no-transform-statistics .statistic .value { +.no-transform { text-transform: none !important; } diff --git a/frontend/js/authentication.js b/frontend/js/authentication.js new file mode 100644 index 000000000..6f3b01493 --- /dev/null +++ b/frontend/js/authentication.js @@ -0,0 +1,42 @@ +(() => { + $(window).on('load', function() { + const authentication_token = localStorage.getItem('authentication_token'); + + if (authentication_token != null) { + $("#authentication-token").val(authentication_token); + } + }) + + // $('#create-authentication-token').on('click', async function(){ + // try { + // $('#new-token-message').hide(); + // var new_authentication_token = await makeAPICall(`/v1/authentication/new?name=${$("#new-token-name").val()}`); + // $('#new-token-message').show(); + // $('#new-token').text(new_authentication_token.data); + // } catch (err) { + // showNotification('Could not create new authentication token', err); + // } + // }) + + $('#save-authentication-token').on('click', async function(){ + + const authentication_token = $("#authentication-token").val().trim(); + if (authentication_token == '') { + showNotification('Please enter a non-empty authentication token'); + return false; + } + try { + $('#token-details-message').hide(); + const user_data = await makeAPICall('/v1/authentication/data', null, authentication_token); + + localStorage.setItem('authentication_token', authentication_token); + + $('#token-details-message').show(); + $('#token-details').text(JSON.stringify(user_data.data, null, 2)); + + } catch (err) { + showNotification('Could not read authentication token data', err); + } + }) + +})(); diff --git a/frontend/js/helpers/main.js b/frontend/js/helpers/main.js index c32b62b49..7685c8dc8 100644 --- a/frontend/js/helpers/main.js +++ b/frontend/js/helpers/main.js @@ -25,15 +25,18 @@ class GMTMenu extends HTMLElement { [menu markup stripped — within the GMTMenu template the "Status" link appears to move from directly after "Eco-CI" to after "CarbonDB", and a new "Authentication" link is added before "Settings"] @@ -138,7 +141,8 @@ const escapeString = (string) =>{ return my_string.replace(reg, (match) => map[match]); } -async function makeAPICall(path, values=null) { +async function makeAPICall(path, values=null, force_authentication_token=null) { + if(values != null ) { var options = { @@ -149,9 +153,11 @@ } } } else { - var options = { method: 'GET' } + var options = { method: 'GET', headers: {} } } + options.headers['X-Authentication'] = (force_authentication_token == null) ? localStorage.getItem('authentication_token'): force_authentication_token; + let json_response = null; if(localStorage.getItem('remove_idle') == 'true') path += "?remove_idle=true" await fetch(API_URL + path, options) diff --git a/frontend/js/helpers/metric-boxes.js b/frontend/js/helpers/metric-boxes.js index 37eec6b16..10cecc3fb 100644 --- a/frontend/js/helpers/metric-boxes.js +++ b/frontend/js/helpers/metric-boxes.js @@ -6,7 +6,7 @@ class PhaseMetrics extends HTMLElement { connectedCallback() { this.innerHTML = `

Key metrics
 [template markup stripped — inside the PhaseMetrics innerHTML template the wrapper element above the "Phase Duration" metric changes only in its attributes, presumably picking up the new no-transform class]
 Phase Duration
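Since makeAPICall() above now sends the X-Authentication header on every request, the same round trip can be exercised against the API directly. A minimal sketch in Python — the base URL is an assumption for a local setup:

```python
# Sketch, not part of the patch: call an authenticated route the same way the
# frontend does. Omitting the X-Authentication header (or sending the literal
# token 'DEFAULT') authenticates as the built-in DEFAULT user.
import requests

API_URL = 'http://localhost:9142'  # hypothetical local API endpoint -- adjust to your setup

response = requests.get(
    f"{API_URL}/v1/authentication/data",
    headers={'X-Authentication': 'DEFAULT'},
    timeout=10,
)
print(response.status_code, response.json())  # 200 plus the user's capabilities on success
```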
diff --git a/frontend/request.html b/frontend/request.html index e114ead6a..1e8cc42bc 100644 --- a/frontend/request.html +++ b/frontend/request.html @@ -54,15 +54,16 @@

  • Only public repositories are supported
 [list markup stripped in this rendering. Removed bullets: "Only one run per day per repository is supported" and "For a premium account with no restrictions please contact us at info@green-coding.io". Added bullets: "Limits on runs per day, measurement minutes and data retention ...", "For a premium account with no restrictions please see our comparison table" and "If you already have a premium plan please enter your authentication token on the settings page". The "Submit software for measurement" form line changes only in its attributes.]
@@ -84,6 +85,7 @@
 [markup stripped — one line is added in the machine-selection form and the lines around the two "Find the specifications of the machines in our documentation" hints are adjusted; the exact markup is not recoverable from this rendering.]
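Because the /v1/authentication/new route further up in api/main.py is commented out, tokens for such premium users currently have to be created server-side. A minimal sketch using User.get_new() from lib/user.py (added later in this diff; assumes a configured database connection and a hypothetical user name):

```python
# Sketch, not part of the patch: create a new user row server-side and hand out
# the plaintext token. Only the SHA-256 of the token is stored in the DB, so the
# plaintext cannot be recovered afterwards.
from lib.user import User

token = User.get_new(name='my-premium-user')  # inserts the row with default capabilities
print('Token (store it now, it cannot be recovered later):', token)
```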
diff --git a/lib/db.py b/lib/db.py index 9736063e4..76d254d16 100644 --- a/lib/db.py +++ b/lib/db.py @@ -1,5 +1,4 @@ #pylint: disable=consider-using-enumerate - from psycopg_pool import ConnectionPool import psycopg.rows @@ -68,6 +67,19 @@ def fetch_one(self, query, params=None, fetch_mode=None): def fetch_all(self, query, params=None, fetch_mode=None): return self.__query(query, params=params, return_type='all', fetch_mode=fetch_mode) + def import_csv(self, filename): + raise NotImplementedError('Code still flakes on ; in data. Please rework') + # pylint: disable=unreachable + with self._pool.connection() as conn: + conn.autocommit = True + cur = conn.cursor() + with open(filename, 'r', encoding='utf-8') as sql_file: + sql_script = sql_file.read() + for statement in sql_script.split(';'): + if statement.strip(): + cur.execute(statement) + conn.autocommit = False + def copy_from(self, file, table, columns, sep=','): with self._pool.connection() as conn: conn.autocommit = False # is implicit default diff --git a/lib/error_helpers.py b/lib/error_helpers.py index 0085e76c4..b67e0f212 100644 --- a/lib/error_helpers.py +++ b/lib/error_helpers.py @@ -13,7 +13,9 @@ def end_error(*messages, **kwargs): def format_error(*messages, **kwargs): err = '\n'.join(messages) err += '\n\n' - err += '\n'.join([f"{key.capitalize()} ({value.__class__}): {value}" for key, value in kwargs.items()]) + err += '\n'.join([f"{key.capitalize()} ({value.__class__.__name__}): {value}" for key, value in kwargs.items()]) + if 'run_id' in kwargs and kwargs['run_id']: + err += f"\nRun-ID Link: {GlobalConfig().config['cluster']['metrics_url']}/stats.html?id={kwargs['run_id']}" error_string = f""" \n<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< 0_o >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n @@ -39,4 +41,4 @@ def log_error(*messages, **kwargs): print(TerminalColors.FAIL, err, TerminalColors.ENDC, file=sys.stderr) if error_email := GlobalConfig().config['admin']['error_email']: - Job.insert('email', email=error_email, name='Green Metrics Tool Error', message=err) + Job.insert('email', user_id=None, email=error_email, name='Green Metrics Tool Error', message=err) diff --git a/lib/job/base.py b/lib/job/base.py index dee5c2702..79f0cf2ad 100644 --- a/lib/job/base.py +++ b/lib/job/base.py @@ -23,7 +23,7 @@ """ class Job(ABC): - def __init__(self, state, name, email, url, branch, filename, machine_id, run_id, job_id, machine_description, message): + def __init__(self, *, state, name, email, url, branch, filename, machine_id, user_id, run_id, job_id, machine_description, message): self._id = job_id self._state = state self._name = name @@ -32,6 +32,7 @@ def __init__(self, state, name, email, url, branch, filename, machine_id, run_i self._branch = branch self._filename = filename self._machine_id = machine_id + self._user_id = user_id self._machine_description = machine_description self._run_id = run_id self._message = message @@ -71,18 +72,18 @@ def _process(self, **kwargs): pass @classmethod - def insert(cls, job_type, *, name=None, url=None, email=None, branch=None, filename=None, machine_id=None, message=None): + def insert(cls, job_type, *, user_id, name=None, url=None, email=None, branch=None, filename=None, machine_id=None, message=None): if job_type == 'run' and (not branch or not url or not filename or not machine_id): raise RuntimeError('For adding runs branch, url, filename and machine_id must be set') query = """ INSERT INTO - jobs (type, name, url, email, branch, filename, machine_id, message, state, created_at) + jobs (type, name, url, 
email, branch, filename, machine_id, user_id, message, state, created_at) VALUES - (%s, %s, %s, %s, %s, %s, %s, %s, 'WAITING', NOW()) RETURNING id; + (%s, %s, %s, %s, %s, %s, %s, %s, %s, 'WAITING', NOW()) RETURNING id; """ - params = (job_type, name, url, email, branch, filename, machine_id, message) + params = (job_type, name, url, email, branch, filename, machine_id, user_id, message) return DB().fetch_one(query, params=params)[0] # A static method to get a job object @@ -93,7 +94,7 @@ def get_job(cls, job_type): query = ''' SELECT j.id, j.state, j.name, j.email, j.url, j.branch, - j.filename, j.machine_id, m.description, j.message, r.id as run_id + j.filename, j.machine_id, j.user_id, m.description, j.message, r.id as run_id FROM jobs as j LEFT JOIN machines as m on m.id = j.machine_id LEFT JOIN runs as r on r.job_id = j.id @@ -132,9 +133,10 @@ def get_job(cls, job_type): branch=job[5], filename=job[6], machine_id=job[7], - machine_description=job[8], - message=job[9], - run_id=job[10], + user_id=job[8], + machine_description=job[9], + message=job[10], + run_id=job[11], ) @classmethod diff --git a/lib/job/run.py b/lib/job/run.py index ceb72afae..584f77eb9 100644 --- a/lib/job/run.py +++ b/lib/job/run.py @@ -11,6 +11,7 @@ from lib.job.base import Job from lib.global_config import GlobalConfig from lib.db import DB +from lib.user import User from lib.terminal_colors import TerminalColors from lib.system_checks import ConfigurationCheckError from tools.phase_stats import build_and_store_phase_stats @@ -27,6 +28,14 @@ def check_job_running(self): #pylint: disable=arguments-differ def _process(self, skip_system_checks=False, docker_prune=False, full_docker_prune=False): + user = User(self._user_id) + + if not user.can_use_machine(self._machine_id): + raise RuntimeError(f"Your user does not have the permissions to use the selected machine. Machine ID: {self._machine_id}") + + if not user.has_measurement_quota(self._machine_id): + raise RuntimeError(f"Your user does not have enough measurement quota to run a job on the selected machine. Machine ID: {self._machine_id}") + runner = Runner( name=self._name, uri=self._url, @@ -38,10 +47,15 @@ def _process(self, skip_system_checks=False, docker_prune=False, full_docker_pru full_docker_prune=full_docker_prune, docker_prune=docker_prune, job_id=self._id, + user_id=self._user_id, + measurement_flow_process_duration=user._capabilities['measurement']['settings']['flow-process-duration'], + measurement_total_duration=user._capabilities['measurement']['settings']['total-duration'], ) try: # Start main code. Only URL is allowed for cron jobs self._run_id = runner.run() + user.deduct_measurement_quota(self._machine_id, int(runner._last_measurement_duration/1_000_000)) # duration in runner is in microseconds. 
We need seconds + build_and_store_phase_stats(self._run_id, runner._sci) # We need to import this here as we need the correct config file @@ -53,6 +67,7 @@ def _process(self, skip_system_checks=False, docker_prune=False, full_docker_pru if self._email: Job.insert( 'email', + user_id=self._user_id, email=self._email, name='Measurement Job successfully processed on Green Metrics Tool Cluster', message=f"Your report is now accessible under the URL: {GlobalConfig().config['cluster']['metrics_url']}/stats.html?id={self._run_id}" @@ -64,6 +79,7 @@ def _process(self, skip_system_checks=False, docker_prune=False, full_docker_pru Job.insert( 'email', + user_id=self._user_id, email=self._email, name='Measurement Job on Green Metrics Tool Cluster failed', message=f"Run-ID: {self._run_id}\nName: {self._name}\n\nDetails can also be found in the log under: {GlobalConfig().config['cluster']['metrics_url']}/stats.html?id={self._run_id}\n\nError message: {exc}\n" diff --git a/lib/secure_variable.py b/lib/secure_variable.py new file mode 100644 index 000000000..b205042a2 --- /dev/null +++ b/lib/secure_variable.py @@ -0,0 +1,32 @@ +import json + +class SecureVariable: + def __init__(self, value): + self._value = value + + def __repr__(self): + return '****OBFUSCATED****' + + def __str__(self): + return self.__repr__() + + def get_value(self): + return self._value + +class SecureVariableEncoder(json.JSONEncoder): + def default(self, o): + if isinstance(o, SecureVariable): + return repr(o) + return super().default(o) + + +if __name__ == '__main__': + import argparse + + parser = argparse.ArgumentParser() + parser.add_argument('variable', help='Please supply a variable') + + args = parser.parse_args() # script will exit if arguments not present + + variable = SecureVariable(args.variable) + print("Variable print output looks like this:", variable) diff --git a/lib/user.py b/lib/user.py new file mode 100644 index 000000000..918738f5a --- /dev/null +++ b/lib/user.py @@ -0,0 +1,175 @@ +import json +import hashlib +import uuid + +from lib.secure_variable import SecureVariable +from lib.db import DB + +class User(): + + def __init__(self, user_id: int): + user = DB().fetch_one(""" + SELECT id, name, capabilities + FROM users + WHERE id = %s + """, params=(user_id, )) + if not user: + raise RuntimeError(f"User with id {user_id} not found in database") + + self._id = user[0] + self._name = user[1] + self._capabilities = user[2] + + + def __repr__(self): + values = self.__dict__.copy() + del values['_id'] + return str(values) + + def update(self): + DB().query(""" + UPDATE users + SET capabilities = %s + WHERE id = %s + """, params=(json.dumps(self._capabilities), self._id, )) + + def can_use_machine(self, machine_id: int): + return machine_id in self._capabilities['machines'] + + def can_use_route(self, route: str): + return route in self._capabilities['api']['routes'] + + def can_schedule_job(self, schedule_mode: str): + return schedule_mode in self._capabilities['jobs']['schedule_modes'] + + + def has_api_quota(self, route: str): + if route in self._capabilities['api']['quotas']: + return self._capabilities['api']['quotas'][route] > 0 + return True # None means infinite amounts + + def deduct_api_quota(self, route: str, amount: int): + if route in self._capabilities['api']['quotas']: + self._capabilities['api']['quotas'][route] -= amount + self.update() + + def has_measurement_quota(self, machine_id: int): + machine_id = str(machine_id) # json does not support integer keys + if machine_id in 
self._capabilities['measurement']['quotas']: + return self._capabilities['measurement']['quotas'][machine_id] > 0 + return True # None means infinite amounts + + def deduct_measurement_quota(self, machine_id: int, amount: int): + machine_id = str(machine_id) # json does not support integer keys + if machine_id in self._capabilities['measurement']['quotas']: + self._capabilities['measurement']['quotas'][machine_id] -= amount + self.update() + + @classmethod + def authenticate(cls, token: SecureVariable | None, silent=False): + sha256_hash = hashlib.sha256() + sha256_hash.update(token.get_value().encode('UTF-8')) + + user = DB().fetch_one(""" + SELECT id, name + FROM users + WHERE token = %s + """, params=((sha256_hash.hexdigest()), )) + if not user: + raise UserAuthenticationError('User with corresponding token not found') # never output the token itself as it might land in logs + + return cls(user[0]) + + @staticmethod + def get_new(name=None): + + token = str(uuid.uuid4()).upper() + sha256_hash = hashlib.sha256() + sha256_hash.update(token.encode('UTF-8')) + + default_capabilities = { + "api": { + "quotas": { # An empty dictionary here means that no quotas apply + }, + "routes": [ # This will be dynamically loaded from the current main.py for all applicable routes + "/v1/carbondb/add", + "/v1/ci/measurement/add", + "/v1/software/add", + "/v1/hog/add", + "/v1/authentication/data", + ] + }, + "jobs": { + "schedule_modes": [ + "one-off", + "daily", + "weekly", + "commit", + "variance", + ], + }, + "measurement": { + "settings": { + "flow-process-duration": 3600, + "total-duration": 3600, + }, + "quotas": { # An empty dictionary here means that no quotas apply + "default": 10_000 + } + }, + "data": { + "runs": { + "retention": 3600, + }, + "measurements": { + "retention": 3600, + }, + "ci_measurements": { + "retention": 3600, + }, + "hog_measurements": { + "retention": 3600, + }, + "hog_coalitions": { + "retention": 3600, + }, + "hog_tasks": { + "retention": 3600, + }, + }, + "machines": [ # This will be dynamically loaded from the current database + 1, + ], + "optimizations": [ # This will be dynamically loaded from the current filesystem + "container_memory_utilization", + "container_cpu_utilization", + "message_optimization", + "container_build_time", + "container_boot_time", + "container_image_size", + ], + } + + DB().query(""" + INSERT INTO users + (name, token, capabilities) + VALUES + (%s, %s, %s) + """, params=((name, sha256_hash.hexdigest(), json.dumps(default_capabilities), ))) + + return token + +class UserAuthenticationError(Exception): + pass + + +if __name__ == '__main__': + import argparse + + parser = argparse.ArgumentParser() + parser.add_argument('token', help='Please supply a token to get the user') + + args = parser.parse_args() # script will exit if arguments not present + + authenticated_user = User.authenticate(SecureVariable(args.token)) + print("User is", authenticated_user) diff --git a/migrations/2024_08_22_authentication.sql b/migrations/2024_08_22_authentication.sql new file mode 100644 index 000000000..1ab87f2c5 --- /dev/null +++ b/migrations/2024_08_22_authentication.sql @@ -0,0 +1,34 @@ +CREATE TABLE users ( + id SERIAL PRIMARY KEY, + name text, + token text NOT NULL, + capabilities JSONB NOT NULL, + created_at timestamp with time zone DEFAULT now(), + updated_at timestamp with time zone +); + +ALTER TABLE "jobs" ADD COLUMN "user_id" integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE; +ALTER TABLE "timeline_projects" ADD 
COLUMN "user_id" integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE; +ALTER TABLE "runs" ADD COLUMN "user_id" integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE; +ALTER TABLE "ci_measurements" ADD COLUMN "user_id" integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE; +ALTER TABLE "hog_measurements" ADD COLUMN "user_id" integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE; +ALTER TABLE "carbondb_energy_data" ADD COLUMN "user_id" integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE; +ALTER TABLE "carbondb_energy_data_day" ADD COLUMN "user_id" integer REFERENCES users(id) ON DELETE SET NULL ON UPDATE CASCADE; + + +CREATE UNIQUE INDEX name_unique ON users(name text_ops); +CREATE UNIQUE INDEX token_unique ON users(token text_ops); + +INSERT INTO "users"("id","name","token","capabilities","created_at","updated_at") +VALUES +(1,E'DEFAULT',E'89dbf71048801678ca4abfbaa3ea8f7c651aae193357a3e23d68e21512cd07f5',E'{"api":{"quotas":{},"routes":["/v1/carbondb/add","/v1/ci/measurement/add","/v1/software/add","/v1/hog/add","/v1/authentication/data"]},"data":{"runs":{"retention":2678400},"hog_tasks":{"retention":2678400},"measurements":{"retention":2678400},"hog_coalitions":{"retention":2678400},"ci_measurements":{"retention":2678400},"hog_measurements":{"retention":2678400}},"jobs":{"schedule_modes":["one-off","daily","weekly","commit","variance"]},"machines":[1],"measurement":{"quotas":{},"settings":{"total-duration":86400,"flow-process-duration":86400}},"optimizations":["container_memory_utilization","container_cpu_utilization","message_optimization","container_build_time","container_boot_time","container_image_size"]}',E'2024-08-22 11:28:24.937262+00',NULL); + +CREATE TRIGGER users_moddatetime + BEFORE UPDATE ON users + FOR EACH ROW + EXECUTE PROCEDURE moddatetime (updated_at); + +-- Default password for authentication is DEFAULT +INSERT INTO "public"."machines"("description", "available") +VALUES +(E'Local machine', true); \ No newline at end of file diff --git a/optimization_providers/durations/container.py b/optimization_providers/durations/container.py index 3d1fd0a62..cb3940962 100644 --- a/optimization_providers/durations/container.py +++ b/optimization_providers/durations/container.py @@ -8,7 +8,7 @@ MAX_BOOT_DURATION = 5 # 5 seconds # pylint: disable=unused-argument -@register_reporter('container-build-time', Criticality.INFO, REPORTER_NAME, REPORTER_ICON, req_providers =[]) +@register_reporter('container_build_time', Criticality.INFO, REPORTER_NAME, REPORTER_ICON, req_providers =[]) def container_build_time(self, run, measurements, repo_path, network, notes, phases): installation_phase = run['phases'][1] @@ -25,7 +25,7 @@ def container_build_time(self, run, measurements, repo_path, network, notes, pha ) # pylint: disable=unused-argument -@register_reporter('container-boot-time', Criticality.INFO, REPORTER_NAME, REPORTER_ICON, req_providers =[]) +@register_reporter('container_boot_time', Criticality.INFO, REPORTER_NAME, REPORTER_ICON, req_providers =[]) def container_boot_time(self, run, measurements, repo_path, network, notes, phases): boot_phase = run['phases'][2] diff --git a/runner.py b/runner.py index 17b0411ff..2dd6c0f5b 100755 --- a/runner.py +++ b/runner.py @@ -52,7 +52,8 @@ def __init__(self, debug_mode=False, allow_unsafe=False, skip_system_checks=False, skip_unsafe=False, verbose_provider_boot=False, full_docker_prune=False, dev_no_sleeps=False, dev_no_build=False, dev_no_metrics=False, - dev_flow_timetravel=False, 
dev_no_optimizations=False, docker_prune=False, job_id=None): + dev_flow_timetravel=False, dev_no_optimizations=False, docker_prune=False, job_id=None, + user_id=None, measurement_flow_process_duration=None, measurement_total_duration=None): if skip_unsafe is True and allow_unsafe is True: raise RuntimeError('Cannot specify both --skip-unsafe and --allow-unsafe') @@ -88,6 +89,10 @@ def __init__(self, self._run_id = None self._commit_hash = None self._commit_timestamp = None + self._user_id = user_id + self._measurement_flow_process_duration = measurement_flow_process_duration + self._measurement_total_duration = measurement_total_duration + self._last_measurement_duration = 0 del self._arguments['self'] # self is not needed and also cannot be serialzed. We remove it @@ -125,10 +130,19 @@ def initialize_run(self): # we also update the branch here again, as this might not be main in case of local filesystem self._run_id = DB().fetch_one(""" - INSERT INTO runs (job_id, name, uri, email, branch, filename, commit_hash, commit_timestamp, runner_arguments, created_at) - VALUES (%s, %s, %s, 'manual', %s, %s, %s, %s, %s, NOW()) + INSERT INTO runs ( + job_id, name, uri, email, branch, filename, commit_hash, + commit_timestamp, runner_arguments, user_id, created_at + ) + VALUES ( + %s, %s, %s, 'manual', %s, %s, %s, + %s, %s, %s, NOW() + ) RETURNING id - """, params=(self._job_id, self._name, self._uri, self._branch, self._original_filename, self._commit_hash, self._commit_timestamp, json.dumps(self._arguments)))[0] + """, params=( + self._job_id, self._name, self._uri, self._branch, self._original_filename, self._commit_hash, + self._commit_timestamp, json.dumps(self._arguments), self._user_id + ))[0] return self._run_id def get_optimizations_ignore(self): @@ -542,7 +556,6 @@ def clean_image_name(self, name): return name def build_docker_images(self): - config = GlobalConfig().config print(TerminalColors.HEADER, '\nBuilding Docker images', TerminalColors.ENDC) # Create directory /tmp/green-metrics-tool/docker_images @@ -598,7 +611,10 @@ def build_docker_images(self): print(' '.join(docker_build_command)) - ps = subprocess.run(docker_build_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='UTF-8', timeout=config['measurement']['total-duration'], check=False) + if self._measurement_total_duration: + ps = subprocess.run(docker_build_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='UTF-8', timeout=self._measurement_total_duration, check=False) + else: + ps = subprocess.run(docker_build_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='UTF-8', check=False) if ps.returncode != 0: print(f"Error: {ps.stderr} \n {ps.stdout}") @@ -1053,9 +1069,8 @@ def start_metric_providers(self, allow_container=True, allow_other=True): raise RuntimeError(f"Stderr on {metric_provider.__class__.__name__} was NOT empty: {stderr_read}") def check_total_runtime_exceeded(self): - config = GlobalConfig().config - if (time.time() - self.__start_measurement_seconds) > config['measurement']['total-duration']: - raise TimeoutError(f"Timeout of {config['measurement']['total-duration']} s was exceeded. This can be configured in 'total-duration'.") + if self._measurement_total_duration and (time.time() - self.__start_measurement_seconds) > self._measurement_total_duration: + raise TimeoutError(f"Timeout of {self._measurement_total_duration} s was exceeded. 
This can be configured in the user authentication for 'total-duration'.") def start_phase(self, phase, transition = True): config = GlobalConfig().config @@ -1096,8 +1111,6 @@ def end_phase(self, phase): self.__notes_helper.add_note({'note': f"Ending phase {phase}", 'detail_name': '[NOTES]', 'timestamp': phase_time}) def run_flows(self): - config = GlobalConfig().config - # run the flows ps_to_kill_tmp = [] ps_to_read_tmp = [] exception_occured = False @@ -1147,7 +1160,7 @@ if cmd_obj.get('detach', False) is True: - print('Process should be detached. Running asynchronously and detaching ...') + print('Executing process asynchronously and detaching ...') #pylint: disable=consider-using-with,subprocess-popen-preexec-fn ps = subprocess.Popen( docker_exec_command, @@ -1163,15 +1176,25 @@ ps_to_kill_tmp.append({'ps': ps, 'cmd': cmd_obj['command'], 'ps_group': False}) else: - print(f"Process should be synchronous. Alloting {config['measurement']['flow-process-duration']}s runtime ...") - ps = subprocess.run( - docker_exec_command, - stderr=stderr_behaviour, - stdout=stdout_behaviour, - encoding='UTF-8', - check=False, # cause it will be checked later and also ignore-errors checked - timeout=config['measurement']['flow-process-duration'], - ) + print('Executing process synchronously.') + if self._measurement_flow_process_duration: + print(f"Allotting {self._measurement_flow_process_duration}s runtime ...") + ps = subprocess.run( + docker_exec_command, + stderr=stderr_behaviour, + stdout=stdout_behaviour, + encoding='UTF-8', + check=False, # return code is checked later, together with the ignore-errors setting + timeout=self._measurement_flow_process_duration, + ) + else: + ps = subprocess.run( + docker_exec_command, + stderr=stderr_behaviour, + stdout=stdout_behaviour, + encoding='UTF-8', + check=False, # return code is checked later, together with the ignore-errors setting + ) ps_to_read_tmp.append({ 'cmd': docker_exec_command, @@ -1349,6 +1372,8 @@ def update_start_and_end_times(self): SET start_measurement=%s, end_measurement=%s WHERE id = %s """, params=(self.__start_measurement, self.__end_measurement, self._run_id)) + self._last_measurement_duration = self.__end_measurement - self.__start_measurement + def set_run_failed(self): if not self._run_id: diff --git a/tests/api/test_api.py index d889d6341..e0dccdcaa 100644 --- a/tests/api/test_api.py +++ b/tests/api/test_api.py @@ -2,15 +2,14 @@ import os import time from uuid import UUID -import pytest import requests CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) +from lib.user import User from lib.db import DB from lib import utils from lib.global_config import GlobalConfig -from tools.machine import Machine from tests import test_functions as Tests config = GlobalConfig(config_name='test-config.yml').config @@ -21,11 +20,6 @@ import hog_data -@pytest.fixture(autouse=True, name="register_machine") -def register_machine_fixture(): - machine = Machine(machine_id=1, description='test-machine') - machine.register() - def get_job_id(run_name): query = """ SELECT @@ -48,7 +42,7 @@ def test_post_run_add(): job_id = get_job_id(run_name) assert job_id is not None -def test_ci_measurement_add(): +def test_ci_measurement_add_default_user(): measurement = CI_Measurement(energy_value=123, energy_unit='mJ', repo='testRepo', @@ -65,18 +59,61 @@ response = requests.post(f"{API_URL}/v1/ci/measurement/add", json=measurement.model_dump(), timeout=15) assert 
response.status_code == 201, Tests.assertion_info('success', response.text) query = """ - SELECT * FROM ci_measurements WHERE run_id = %s + SELECT * FROM ci_measurements WHERE run_id = %s -- we make * match to always test all columns. Even if we add some in the future """ data = DB().fetch_one(query, (measurement.run_id, ), fetch_mode='dict') + assert data is not None - for key in measurement.model_dump(): - if key == 'workflow': - assert data['workflow_id'] == measurement.model_dump()[key], Tests.assertion_info(f"workflow_id: {data['workflow_id']}", measurement.model_dump()[key]) - elif key in ['cb_company_uuid', 'cb_project_uuid', 'cb_machine_uuid']: + for key in data: + if key == 'workflow_id': + assert data[key] == measurement.model_dump()['workflow'], Tests.assertion_info(f"workflow_id: {data[key]}", measurement.model_dump()['workflow']) + elif key in ['id', 'cb_company_uuid', 'cb_project_uuid', 'cb_machine_uuid', 'created_at', 'updated_at']: pass + elif key == 'user_id': + assert data[key] == 1, Tests.assertion_info(1, f"{key}: {data[key]}") else: assert data[key] == measurement.model_dump()[key], Tests.assertion_info(f"{key}: {data[key]}", measurement.model_dump()[key]) +def test_ci_measurement_add_different_user(): + measurement = CI_Measurement(energy_value=123, + energy_unit='mJ', + repo='testRepo', + branch='testBranch', + cpu='testCPU', + cpu_util_avg=50, + commit_hash='1234asdf', + workflow='testWorkflow', + run_id='testRunID', + source='testSource', + label='testLabel', + duration=20, + workflow_name='testWorkflowName') + + DB().query(""" + INSERT INTO "public"."users"("id", "name","token","capabilities","created_at","updated_at") + VALUES + (2, E'PYTEST',E'ee8e09e43bceff39c9410f11a2392a3f6b868557240002b72dbdd22a2f792eef',E'{"api":{"quotas":{},"routes":["/v1/carbondb/add","/v1/ci/measurement/add","/v1/software/add","/v1/hog/add","/v1/authentication/data"]},"data":{"runs":{"retention":2678400},"hog_tasks":{"retention":2678400},"measurements":{"retention":2678400},"hog_coalitions":{"retention":2678400},"ci_measurements":{"retention":2678400},"hog_measurements":{"retention":2678400}},"jobs":{"schedule_modes":["one-off","daily","weekly","commit","variance"]},"machines":[1],"measurement":{"quotas":{},"settings":{"total-duration":86400,"flow-process-duration":86400}},"optimizations":["container_memory_utilization","container_cpu_utilization","message_optimization","container_build_time","container_boot_time","container_image_size"]}',E'2024-08-22 11:28:24.937262+00',NULL); + """) + + response = requests.post(f"{API_URL}/v1/ci/measurement/add", json=measurement.model_dump(), timeout=15, headers={'X-Authentication': 'PYTEST'}) + assert response.status_code == 201, Tests.assertion_info('success', response.text) + query = """ + SELECT * FROM ci_measurements WHERE run_id = %s -- we make * match to always test all columns. 
Even if we add some in the future + """ + data = DB().fetch_one(query, (measurement.run_id, ), fetch_mode='dict') + + assert data is not None + for key in data: + if key == 'workflow_id': + assert data[key] == measurement.model_dump()['workflow'], Tests.assertion_info(f"workflow_id: {data[key]}", measurement.model_dump()['workflow']) + elif key in ['id', 'cb_company_uuid', 'cb_project_uuid', 'cb_machine_uuid', 'created_at', 'updated_at']: + pass + elif key == 'user_id': + assert data[key] == 2, Tests.assertion_info(3, f"{key}: {data[key]}") + else: + assert data[key] == measurement.model_dump()[key], Tests.assertion_info(f"{key}: {data[key]}", measurement.model_dump()[key]) + + def test_ci_measurement_add_co2(): measurement = CI_Measurement(energy_value=123, @@ -212,3 +249,26 @@ def test_carbonDB_add(): data = DB().fetch_one('SELECT * FROM carbondb_energy_data', fetch_mode='dict') assert data is not None or data != [] assert exp_data == {key: data[key] for key in exp_data if key in data}, "The specified keys do not have the same values in both dictionaries." + +def test_route_forbidden(): + user = User(1) + user._capabilities['api']['routes'] = [] + user.update() + + response = requests.get(f"{API_URL}/v1/authentication/data", timeout=15) + assert response.status_code == 401 + assert response.text == '{"success":false,"err":"Route not allowed"}' + +def test_can_read_authentication_data(): + response = requests.get(f"{API_URL}/v1/authentication/data", timeout=15) + assert response.status_code == 200 + assert response.text == '{"success":true,"data":{"_id":1,"_name":"DEFAULT","_capabilities":{"api":{"quotas":{},"routes":["/v1/carbondb/add","/v1/ci/measurement/add","/v1/software/add","/v1/hog/add","/v1/authentication/data"]},"data":{"runs":{"retention":2678400},"hog_tasks":{"retention":2678400},"measurements":{"retention":2678400},"hog_coalitions":{"retention":2678400},"ci_measurements":{"retention":2678400},"hog_measurements":{"retention":2678400}},"jobs":{"schedule_modes":["one-off","daily","weekly","commit","variance"]},"machines":[1],"measurement":{"quotas":{},"settings":{"total-duration":86400,"flow-process-duration":86400}},"optimizations":["container_memory_utilization","container_cpu_utilization","message_optimization","container_build_time","container_boot_time","container_image_size"]}}}' + +def test_api_quota_exhausted(): + user = User(1) + user._capabilities['api']['quotas'] = {'/v1/authentication/data': 0} + user.update() + + response = requests.get(f"{API_URL}/v1/authentication/data", timeout=15) + assert response.status_code == 401 + assert response.text == '{"success":false,"err":"Quota exceeded"}' diff --git a/tests/conftest.py b/tests/conftest.py index 111c76f9d..667c60390 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,6 +1,6 @@ import pytest -from lib.db import DB +from tests import test_functions as Tests ## VERY IMPORTANT to override the config file here ## otherwise it will automatically connect to non-test DB and delete all your real data @@ -12,14 +12,15 @@ def pytest_collection_modifyitems(items): if item.fspath.basename == 'test_functions.py': item.add_marker(pytest.mark.skip(reason='Skipping this file')) -# should we hardcode test-db here? 
diff --git a/tests/conftest.py b/tests/conftest.py
index 111c76f9d..667c60390 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,6 +1,6 @@
 import pytest

-from lib.db import DB
+from tests import test_functions as Tests

 ## VERY IMPORTANT to override the config file here
 ## otherwise it will automatically connect to non-test DB and delete all your real data
@@ -12,14 +12,15 @@ def pytest_collection_modifyitems(items):
         if item.fspath.basename == 'test_functions.py':
             item.add_marker(pytest.mark.skip(reason='Skipping this file'))

-# should we hardcode test-db here?
+
+# Note: This fixture always runs.
+# Pytest collects all fixtures before running any tests,
+# no matter which order they are loaded in
 @pytest.fixture(autouse=True)
 def cleanup_after_test():
     yield
-    tables = DB().fetch_all("SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'")
-    for table in tables:
-        table_name = table[0]
-        DB().query(f'TRUNCATE TABLE "{table_name}" RESTART IDENTITY CASCADE')
+    Tests.reset_db()
+

 ### If you wish to turn off the above auto-cleanup per test, include the following in your
 ### test module:
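
Reviewer note: the opt-out snippet that the trailing comment refers to lies below this hunk and is not shown in the diff. Under the assumption that it follows the usual pytest pattern, a per-module override would shadow the autouse fixture with a no-op:

    import pytest

    @pytest.fixture(autouse=True)
    def cleanup_after_test():
        # same name as the conftest fixture, so it takes precedence in this module
        # and skips the DB reset after each test
        yield
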
diff --git a/tests/lib/test_client.py b/tests/lib/test_client.py
new file mode 100644
index 000000000..19cc232fe
--- /dev/null
+++ b/tests/lib/test_client.py
@@ -0,0 +1,35 @@
+import os
+import subprocess
+
+CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
+
+from lib import utils
+from lib.global_config import GlobalConfig
+from lib.job.base import Job
+from tests import test_functions as Tests
+
+GlobalConfig().override_config(config_name='test-config.yml')
+config = GlobalConfig().config
+
+def test_simple_cluster_run():
+    name = utils.randomword(12)
+    url = 'https://github.com/green-coding-berlin/pytest-dummy-repo'
+    filename = 'usage_scenario.yml'
+    branch = 'main'
+    machine_id = 1
+
+    Job.insert('run', user_id=1, name=name, url=url, email=None, branch=branch, filename=filename, machine_id=machine_id)
+
+    ps = subprocess.run(
+        ['python3', '../tools/client.py', '--testing', '--config-override', 'test-config.yml'],
+        check=True,
+        stderr=subprocess.PIPE,
+        stdout=subprocess.PIPE,
+        encoding='UTF-8'
+    )
+    assert ps.stderr == '', Tests.assertion_info('No Error', ps.stderr)
+    assert 'Successfully ended testing run of client.py' in ps.stdout,\
+        Tests.assertion_info('Successfully ended testing run of client.py', ps.stdout)
+
+    assert 'MEASUREMENT SUCCESSFULLY COMPLETED' in ps.stdout,\
+        Tests.assertion_info('MEASUREMENT SUCCESSFULLY COMPLETED', ps.stdout)
diff --git a/tests/lib/test_diff.py b/tests/lib/test_diff.py
index 86b27ce9c..6872f5b62 100644
--- a/tests/lib/test_diff.py
+++ b/tests/lib/test_diff.py
@@ -11,7 +11,7 @@
 # to the diffing. To prevent this, this Unit test checks if the table column signature is unchanged
 def test_run_signature():
-    expected_signature = 'id,job_id,name,uri,branch,commit_hash,commit_timestamp,email,categories,usage_scenario,filename,machine_specs,runner_arguments,machine_id,gmt_hash,measurement_config,start_measurement,end_measurement,phases,logs,invalid_run,failed,created_at,updated_at'
+    expected_signature = 'id,job_id,name,uri,branch,commit_hash,commit_timestamp,email,categories,usage_scenario,filename,machine_specs,runner_arguments,machine_id,gmt_hash,measurement_config,start_measurement,end_measurement,phases,logs,invalid_run,failed,user_id,created_at,updated_at'

     current_signature = DB().fetch_all("SELECT column_name FROM information_schema.columns WHERE table_name = 'runs' ORDER BY ordinal_position;")
     current_signature = ",".join([x[0] for x in current_signature])
diff --git a/tests/lib/test_jobs.py b/tests/lib/test_jobs.py
index 0b2de1e92..bdb3e4917 100644
--- a/tests/lib/test_jobs.py
+++ b/tests/lib/test_jobs.py
@@ -9,23 +9,18 @@
 from lib.db import DB
 from lib import utils
 from lib.global_config import GlobalConfig
-from tools.machine import Machine
 from lib.job.base import Job
+from lib.user import User
 from tests import test_functions as Tests

 GlobalConfig().override_config(config_name='test-config.yml')
 config = GlobalConfig().config

-@pytest.fixture(autouse=True, name="register_machine")
-def register_machine_fixture():
-    machine = Machine(machine_id=1, description='test-machine')
-    machine.register()
-
 # This should be done once per module
 @pytest.fixture(autouse=True, scope="module", name="build_image")
 def build_image_fixture():
-    subprocess.run(['docker', 'compose', '-f', f"{CURRENT_DIR}/../data/stress-application/compose.yml", 'build'], check=True)
+    Tests.build_image_fixture()
+

 def get_job(job_id):
     query = """
@@ -71,19 +66,19 @@ def test_insert_job():
     branch = 'main'
     machine_id = 1

-    job_id = Job.insert('run', name=name, url=url, email=None, branch=branch, filename=filename, machine_id=machine_id)
+    job_id = Job.insert('run', user_id=1, name=name, url=url, email=None, branch=branch, filename=filename, machine_id=machine_id)
     assert job_id is not None
     job = Job.get_job('run')
     assert job._state == 'WAITING'

-def test_simple_run_job():
+def test_simple_run_job_no_quota():
     name = utils.randomword(12)
     url = 'https://github.com/green-coding-berlin/pytest-dummy-repo'
     filename = 'usage_scenario.yml'
     branch = 'main'
     machine_id = 1

-    Job.insert('run', name=name, url=url, email=None, branch=branch, filename=filename, machine_id=machine_id)
+    Job.insert('run', user_id=1, name=name, url=url, email=None, branch=branch, filename=filename, machine_id=machine_id)

     ps = subprocess.run(
         ['python3', '../tools/jobs.py', 'run', '--config-override', 'test-config.yml'],
@@ -99,28 +94,33 @@
     assert 'MEASUREMENT SUCCESSFULLY COMPLETED' in ps.stdout,\
         Tests.assertion_info('MEASUREMENT SUCCESSFULLY COMPLETED', ps.stdout)

-def test_simple_cluster_run():
+def test_simple_run_job_quota_gets_deducted():
     name = utils.randomword(12)
     url = 'https://github.com/green-coding-berlin/pytest-dummy-repo'
     filename = 'usage_scenario.yml'
     branch = 'main'
     machine_id = 1

-    Job.insert('run', name=name, url=url, email=None, branch=branch, filename=filename, machine_id=machine_id)
+    Job.insert('run', user_id=1, name=name, url=url, email=None, branch=branch, filename=filename, machine_id=machine_id)
+
+    user = User(1)
+    user._capabilities['measurement']['quotas'] = {'1': 10_000 * 60} # typical quota is 10,000 minutes
+    user.update()

     ps = subprocess.run(
-        ['python3', '../tools/client.py', '--testing', '--config-override', 'test-config.yml'],
+        ['python3', '../tools/jobs.py', 'run', '--config-override', 'test-config.yml'],
         check=True,
         stderr=subprocess.PIPE,
         stdout=subprocess.PIPE,
         encoding='UTF-8'
     )
-    assert ps.stderr == '', Tests.assertion_info('No Error', ps.stderr)
-    assert 'Successfully ended testing run of client.py' in ps.stdout,\
-        Tests.assertion_info('Successfully ended testing run of client.py', ps.stdout)
+    assert ps.stderr == '', Tests.assertion_info('No Error', ps.stderr)
+    assert 'Successfully processed jobs queue item.' in ps.stdout,\
+        Tests.assertion_info('Successfully processed jobs queue item.', ps.stdout)

     assert 'MEASUREMENT SUCCESSFULLY COMPLETED' in ps.stdout,\
         Tests.assertion_info('MEASUREMENT SUCCESSFULLY COMPLETED', ps.stdout)
+    assert User(1)._capabilities['measurement']['quotas']['1'] < 10_000 * 60

 def test_simple_run_job_missing_filename_branch():
     name = utils.randomword(12)
@@ -128,7 +128,7 @@
     machine_id = 1

     with pytest.raises(RuntimeError):
-        Job.insert('run', name=name, url=url, email=None, machine_id=machine_id)
+        Job.insert('run', user_id=1, name=name, url=url, email=None, machine_id=machine_id)


 def test_simple_run_job_wrong_machine_id():
@@ -139,7 +139,54 @@
     machine_id = 100

     with pytest.raises(psycopg.errors.ForeignKeyViolation):
-        Job.insert('run', name=name, url=url, email=None, branch=branch, filename=filename, machine_id=machine_id)
+        Job.insert('run', user_id=1, name=name, url=url, email=None, branch=branch, filename=filename, machine_id=machine_id)
+
+def test_measurement_quota_exhausted():
+    name = utils.randomword(12)
+    url = 'https://github.com/green-coding-berlin/pytest-dummy-repo'
+    filename = 'usage_scenario.yml'
+    branch = 'main'
+    machine_id = 1
+
+    Job.insert('run', user_id=1, name=name, url=url, email=None, branch=branch, filename=filename, machine_id=machine_id)
+
+    user = User(1)
+    user._capabilities['measurement']['quotas'] = {'1': 2678400}
+    user.update()
+    user.deduct_measurement_quota(machine_id=machine_id, amount=2678400)
+
+    ps = subprocess.run(
+        ['python3', '../tools/jobs.py', 'run', '--config-override', 'test-config.yml'],
+        check=True,
+        stderr=subprocess.PIPE,
+        stdout=subprocess.PIPE,
+        encoding='UTF-8'
+    )
+
+    assert 'Your user does not have enough measurement quota to run a job on the selected machine. Machine ID: 1' in ps.stderr, Tests.assertion_info('Quota exhausted', ps.stderr)
+
+def test_machine_not_allowed():
+    name = utils.randomword(12)
+    url = 'https://github.com/green-coding-berlin/pytest-dummy-repo'
+    filename = 'usage_scenario.yml'
+    branch = 'main'
+    machine_id = 1
+    Job.insert('run', user_id=1, name=name, url=url, email=None, branch=branch, filename=filename, machine_id=machine_id)
+
+    user = User(1)
+    user._capabilities['machines'] = []
+    user.update()
+
+    ps = subprocess.run(
+        ['python3', '../tools/jobs.py', 'run', '--config-override', 'test-config.yml'],
+        check=True,
+        stderr=subprocess.PIPE,
+        stdout=subprocess.PIPE,
+        encoding='UTF-8'
+    )
+
+    assert 'Your user does not have the permissions to use the selected machine. Machine ID: 1' in ps.stderr, Tests.assertion_info('Machine forbidden', ps.stderr)
+
 #pylint: disable=unused-variable # for the time being, until I get the mocking to work
@@ -152,6 +199,7 @@ def todo_test_simple_email_job():

     Job.insert(
         'email',
+        user_id=1,
         email=email,
         name=subject,
         message=message,
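
Reviewer note: a sketch of the quota bookkeeping these tests exercise. Quotas live in the user capabilities, keyed by machine id as a string and denominated in seconds; a missing entry means unlimited (see test_simple_run_job_no_quota). The actual check is implemented in lib/user.py, which this patch does not show, so the predicate below is an assumption:

    def has_measurement_quota(user, machine_id):
        quotas = user._capabilities['measurement']['quotas']
        # no entry for this machine: unlimited; an existing entry must still be positive
        return str(machine_id) not in quotas or quotas[str(machine_id)] > 0
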
diff --git a/tests/smoke_test.py b/tests/smoke_test.py
index e52b68a72..f6d88d2e3 100644
--- a/tests/smoke_test.py
+++ b/tests/smoke_test.py
@@ -11,6 +11,7 @@
 from lib.db import DB
 from lib import utils
 from lib.global_config import GlobalConfig
+from tests import test_functions as Tests
 from runner import Runner

 run_stderr = None
@@ -28,10 +29,7 @@ def cleanup_after_test():
 @pytest.fixture(autouse=True, scope='module')
 def cleanup_after_module():
     yield
-    tables = DB().fetch_all("SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'")
-    for table in tables:
-        table_name = table[0]
-        DB().query(f'TRUNCATE TABLE "{table_name}" RESTART IDENTITY CASCADE')
+    Tests.reset_db()

 # Runs once per file before any test(
 #pylint: disable=expression-not-assigned
diff --git a/tests/test_config_opts.py b/tests/test_config_opts.py
index 1d83f0812..abc5556df 100644
--- a/tests/test_config_opts.py
+++ b/tests/test_config_opts.py
@@ -16,11 +16,9 @@

 def test_global_timeout():
-    total_duration_new = 1
-    total_duration_before = GlobalConfig().config['measurement']['total-duration']
-    GlobalConfig().config['measurement']['total-duration'] = total_duration_new
+    measurement_total_duration = 1

-    runner = Runner(uri=GMT_DIR, uri_type='folder', filename='tests/data/usage_scenarios/basic_stress.yml', skip_system_checks=True, dev_no_build=False, dev_no_sleeps=True, dev_no_metrics=True)
+    runner = Runner(uri=GMT_DIR, uri_type='folder', filename='tests/data/usage_scenarios/basic_stress.yml', skip_system_checks=True, dev_no_build=False, dev_no_sleeps=True, dev_no_metrics=True, measurement_total_duration=1)

     out = io.StringIO()
     err = io.StringIO()
@@ -28,15 +26,13 @@
         with redirect_stdout(out), redirect_stderr(err):
             runner.run()
     except subprocess.TimeoutExpired as e:
-        assert str(e).startswith("Command '['docker', 'run', '--rm', '-v',") and f"timed out after {total_duration_new} seconds" in str(e), \
-            Tests.assertion_info(f"Command '['docker', 'run', '--rm', '-v', ... timed out after {total_duration_new} seconds", str(e))
+        assert str(e).startswith("Command '['docker', 'run', '--rm', '-v',") and f"timed out after {measurement_total_duration} seconds" in str(e), \
+            Tests.assertion_info(f"Command '['docker', 'run', '--rm', '-v', ... timed out after {measurement_total_duration} seconds", str(e))
         return
     except TimeoutError as e:
-        assert str(e) == f"Timeout of {total_duration_new} s was exceeded. This can be configured in 'total-duration'.", \
-            Tests.assertion_info(f"Timeout of {total_duration_new} s was exceeded. This can be configured in 'total-duration'.", str(e))
+        assert str(e) == f"Timeout of {measurement_total_duration} s was exceeded. This can be configured in the user authentication for 'total-duration'.", \
+            Tests.assertion_info(f"Timeout of {measurement_total_duration} s was exceeded. This can be configured in the user authentication for 'total-duration'.", str(e))
         return
-    finally:
-        GlobalConfig().config['measurement']['total-duration'] = total_duration_before # reset

     assert False, \
         Tests.assertion_info('Timeout was not raised', str(out.getvalue()))
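
Reviewer note: the timeout is no longer patched into GlobalConfig but passed to the Runner, and per the new error message it originates from the user authentication. A sketch of the presumed wiring (User, Runner and GMT_DIR as imported by the test modules; the plumbing itself is not part of this hunk):

    user = User(1)
    total_duration = user._capabilities['measurement']['settings']['total-duration']  # 86400 in the fixture above
    runner = Runner(uri=GMT_DIR, uri_type='folder',
                    filename='tests/data/usage_scenarios/basic_stress.yml',
                    skip_system_checks=True, dev_no_build=False,
                    dev_no_sleeps=True, dev_no_metrics=True,
                    measurement_total_duration=total_duration)
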
diff --git a/tests/test_functions.py b/tests/test_functions.py
index ee4541ca4..aed9c33c3 100644
--- a/tests/test_functions.py
+++ b/tests/test_functions.py
@@ -1,5 +1,6 @@
 import os
 import subprocess
+from lib.db import DB

 CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
@@ -20,6 +21,20 @@ def check_if_container_running(container_name):
         return False
     return True

+def build_image_fixture():
+    subprocess.run(['docker', 'compose', '-f', f"{CURRENT_DIR}/data/stress-application/compose.yml", 'build'], check=True)
+
+# should be called after the yield statement of an autouse fixture
+def reset_db():
+    DB().query('DROP schema "public" CASCADE')
+    subprocess.run(
+        ['docker', 'exec', '--user', 'postgres', 'test-green-coding-postgres-container', 'bash', '-c', 'psql --port 9573 < ./docker-entrypoint-initdb.d/structure.sql'],
+        check=True,
+        stderr=subprocess.PIPE,
+        stdout=subprocess.PIPE,
+        encoding='UTF-8'
+    )
+
 class RunUntilManager:
     def __init__(self, runner):
         self.__runner = runner
diff --git a/tools/client.py b/tools/client.py
index 8f22f2af8..f2e321759 100644
--- a/tools/client.py
+++ b/tools/client.py
@@ -106,7 +106,7 @@ def do_cleanup(cur_temp, cooldown_time_after_job):
     while True:
         job = Job.get_job('run')
         if job and job.check_job_running():
-            error_helpers.log_error('Job is still running. This is usually an error case! Continuing for now ...')
+            error_helpers.log_error('Job is still running. This is usually an error case! Continuing for now ...', machine=config_main['machine']['description'])
             time.sleep(client_main['sleep_time_no_job'])
             continue
@@ -154,6 +154,7 @@ def do_cleanup(cur_temp, cooldown_time_after_job):
         if client_main['send_control_workload_status_mail'] and config_main['admin']['notification_email']:
             Job.insert(
                 'email',
+                user_id=None,
                 email=config_main['admin']['notification_email'],
                 name=f"{config_main['machine']['description']} is operating normally. All STDDEV below {cwl['threshold'] * 100} %",
                 message='\n'.join(message)
@@ -176,18 +177,18 @@ def do_cleanup(cur_temp, cooldown_time_after_job):
         except ConfigurationCheckError as exc: # ConfigurationChecks indicate that before the job ran, some setup with the machine was incorrect. So we soft-fail here with sleeps
             set_status('job_error', current_temperature, last_cooldown_time, data=str(exc), run_id=job._run_id)
             if exc.status == Status.WARN: # Warnings is something like CPU% too high. Here short sleep
-                error_helpers.log_error('Job processing in cluster failed (client.py)', exception=exc, status=exc.status, run_id=job._run_id, name=job._name, url=job._url, sleep_duration=600)
+                error_helpers.log_error('Job processing in cluster failed (client.py)', exception=exc, status=exc.status, run_id=job._run_id, name=job._name, url=job._url, machine=config_main['machine']['description'], sleep_duration=600)
                 time.sleep(600)
             else: # Hard fails won't resolve on it's own. We sleep until next cluster validation
-                error_helpers.log_error('Job processing in cluster failed (client.py)', exception=exc, status=exc.status, run_id=job._run_id, name=job._name, url=job._url, sleep_duration=client_main['time_between_control_workload_validations'])
+                error_helpers.log_error('Job processing in cluster failed (client.py)', exception=exc, status=exc.status, run_id=job._run_id, name=job._name, url=job._url, machine=config_main['machine']['description'], sleep_duration=client_main['time_between_control_workload_validations'])
                 time.sleep(client_main['time_between_control_workload_validations'])
         except subprocess.CalledProcessError as exc:
             set_status('job_error', current_temperature, last_cooldown_time, data=str(exc), run_id=job._run_id)
-            error_helpers.log_error('Job processing in cluster failed (client.py)', exception=exc, stdout=exc.stdout, stderr=exc.stderr, run_id=job._run_id, name=job._name, url=job._url)
+            error_helpers.log_error('Job processing in cluster failed (client.py)', exception=exc, stdout=exc.stdout, stderr=exc.stderr, run_id=job._run_id, machine=config_main['machine']['description'], name=job._name, url=job._url)
         except Exception as exc:
             set_status('job_error', current_temperature, last_cooldown_time, data=str(exc), run_id=job._run_id)
-            error_helpers.log_error('Job processing in cluster failed (client.py)', exception=exc, run_id=job._run_id, name=job._name, url=job._url)
+            error_helpers.log_error('Job processing in cluster failed (client.py)', exception=exc, run_id=job._run_id, machine=config_main['machine']['description'], name=job._name, url=job._url)
         finally:
             if not args.testing:
                 do_cleanup(current_temperature, last_cooldown_time)
diff --git a/tools/jobs.py b/tools/jobs.py
index 4a2116305..359f806d7 100644
--- a/tools/jobs.py
+++ b/tools/jobs.py
@@ -77,6 +77,7 @@
             if job_main._email and not isinstance(exception, ConfigurationCheckError):
                 Job.insert(
                     'email',
+                    user_id=job_main._user_id,
                     email=job_main._email,
                     name='Measurement Job on Green Metrics Tool Cluster failed',
                     message=f"Run-ID: {job_main._run_id}\nName: {job_main._name}\nMachine: {job_main._machine_description}\n\nDetails can also be found in the log under: {GlobalConfig().config['cluster']['metrics_url']}/stats.html?id={job_main._run_id}\n\nError message: {exception}\n"
diff --git a/tools/prune_db.py b/tools/prune_db.py
index f83d2d270..bed803fcf 100644
--- a/tools/prune_db.py
+++ b/tools/prune_db.py
@@ -12,19 +12,40 @@
     import argparse

     parser = argparse.ArgumentParser()
-    parser.add_argument('--all', action='store_true', default=False, help='Will also remove successful runs')
+    parser.add_argument('mode', choices=['all', 'failed-runs', 'retention-expired'], default=False, help='Prune mode; "all" will also remove successful runs')

     args = parser.parse_args()  # script will exit if arguments not present

-    if args.all:
-        print("This will remove ALL runs and measurement data from the DB. Continue? (y/N)")
+    if args.mode == 'all':
+        print("This will remove ALL runs, measurement, CI, carbonDB and hog data from the DB. Continue? (y/N)")
         answer = sys.stdin.readline()
         if answer.strip().lower() == 'y':
-            DB().query('DELETE FROM runs')
+            DB().query('TRUNCATE runs CASCADE')
+            DB().query('TRUNCATE ci_measurements CASCADE')
+            DB().query('TRUNCATE hog_measurements CASCADE')
+            DB().query('TRUNCATE carbondb_energy_data CASCADE')
+            DB().query('TRUNCATE carbondb_energy_data_day CASCADE')
             print("Done")
-    else:
+    elif args.mode == 'failed-runs':
         print("This will remove all runs that have not ended, which includes failed ones, but also possibly running, so be sure no measurement is currently active. Continue? (y/N)")
         answer = sys.stdin.readline()
         if answer.strip().lower() == 'y':
             DB().query('DELETE FROM runs WHERE end_measurement IS NULL')
             print("Done")
+    elif args.mode == 'retention-expired':
+        print("Getting all users on the system ...")
+        users = DB().fetch_all('SELECT * FROM users', fetch_mode='dict')
+        for user in users:
+            print('User:', user['name'])
+            print('Retention periods:')
+            for table, retention in user['capabilities']['data'].items():
+                print("\t-", table, retention['retention'])
+                join_condition = 'WHERE'
+                if table == 'measurements':
+                    join_condition = 'USING runs WHERE measurements.run_id = runs.id AND'
+                elif table == 'hog_coalitions':
+                    join_condition = 'USING hog_measurements WHERE hog_coalitions.measurement = hog_measurements.id AND'
+                elif table == 'hog_tasks':
+                    join_condition = 'USING hog_measurements, hog_tasks WHERE hog_coalitions.measurement = hog_measurements.id AND hog_tasks.coalition = hog_coalitions.id AND'
+                DB().query(f"DELETE FROM {table} {join_condition} user_id = {user['id']} AND {table}.created_at < NOW() - INTERVAL '{retention['retention']} SECONDS'")
+        print("Done")
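
Reviewer note: for illustration, substituting the measurements table of user 1 with the fixture retention of 2678400 seconds into the f-string above yields the following statement (user_id presumably resolving against the joined runs table):

    DELETE FROM measurements USING runs WHERE measurements.run_id = runs.id AND user_id = 1 AND measurements.created_at < NOW() - INTERVAL '2678400 SECONDS'
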
(y/N)") answer = sys.stdin.readline() if answer.strip().lower() == 'y': - DB().query('DELETE FROM runs') + DB().query('TRUNCATE runs CASCADE') + DB().query('TRUNCATE ci_measurements CASCADE') + DB().query('TRUNCATE hog_measurements CASCADE') + DB().query('TRUNCATE carbondb_energy_data CASCADE') + DB().query('TRUNCATE carbondb_energy_data_day CASCADE') print("Done") - else: + elif args.mode == 'failed-runs': print("This will remove all runs that have not ended, which includes failed ones, but also possibly running, so be sure no measurement is currently active. Continue? (y/N)") answer = sys.stdin.readline() if answer.strip().lower() == 'y': DB().query('DELETE FROM runs WHERE end_measurement IS NULL') print("Done") + elif args.mode == 'retention-expired': + print("Getting all users on the system ...") + users = DB().fetch_all('SELECT * FROM users', fetch_mode='dict') + for user in users: + print('User:', user['name']) + print('Retention periods:') + for table, retention in user['capabilities']['data'].items(): + print("\t-", table, retention['retention']) + join_condition = 'WHERE' + if table == 'measurements': + join_condition = 'USING runs WHERE measurements.run_id = runs.id AND' + elif table in 'hog_coalitions': + join_condition = 'USING hog_measurements WHERE hog_coalitions.measurement = hog_measurements.id AND' + elif table in 'hog_tasks': + join_condition = 'USING hog_measurements, hog_tasks WHERE hog_coalitions.measurement = hog_measurements.id AND hog_tasks.coalition = hog_coalitions.id AND' + DB().query(f"DELETE FROM {table} {join_condition} user_id = {user['id']} AND {table}.created_at < NOW() - INTERVAL '{retention['retention']} SECONDS'") + print("Done") diff --git a/tools/timeline_projects.py b/tools/timeline_projects.py index 7b8ec0daf..c453b6c04 100644 --- a/tools/timeline_projects.py +++ b/tools/timeline_projects.py @@ -21,16 +21,16 @@ class TimelineProject(): #pylint:disable=redefined-outer-name @classmethod - def insert(cls, name, url, branch, filename, machine_id, schedule_mode): + def insert(cls, *, name, url, branch, filename, machine_id, user_id, schedule_mode): # Timeline projects never insert / use emails as they are always premium and made by admin # So they need no notification on success / add insert_query = """ INSERT INTO - timeline_projects (name, url, branch, filename, machine_id, schedule_mode, created_at) + timeline_projects (name, url, branch, filename, machine_id, user_id, schedule_mode, created_at) VALUES - (%s, %s, %s, %s, %s, %s, NOW()) RETURNING id; + (%s, %s, %s, %s, %s, %s, %s, NOW()) RETURNING id; """ - params = (name, url, branch, filename, machine_id, schedule_mode,) + params = (name, url, branch, filename, machine_id, user_id, schedule_mode,) return DB().fetch_one(insert_query, params=params)[0] @@ -62,33 +62,33 @@ def insert(cls, name, url, branch, filename, machine_id, schedule_mode): else: query = """ SELECT - id, name, url, branch, filename, machine_id, schedule_mode, last_scheduled, + id, name, url, branch, filename, machine_id, user_id, schedule_mode, last_scheduled, DATE(last_scheduled) >= DATE(NOW()) as "scheduled_today", DATE(last_scheduled) >= DATE(NOW() - INTERVAL '7 DAYS') as "scheduled_last_week" FROM timeline_projects """ data = DB().fetch_all(query) - for [project_id, name, url, branch, filename, machine_id, schedule_mode, last_scheduled, scheduled_today, scheduled_last_week] in data: + for [project_id, name, url, branch, filename, machine_id, user_id, schedule_mode, last_scheduled, scheduled_today, scheduled_last_week] in data: if 
                 print('Project was not scheduled yet ', url, branch, filename, machine_id)
                 DB().query('UPDATE timeline_projects SET last_scheduled = NOW() WHERE id = %s', params=(project_id,))
-                Job.insert('run', name=name, url=url, email=None, branch=branch, filename=filename, machine_id=machine_id)
+                Job.insert('run', user_id=user_id, name=name, url=url, email=None, branch=branch, filename=filename, machine_id=machine_id)
                 print('\tInserted ')
             elif schedule_mode == 'daily':
                 print('Project is on daily schedule', url, branch, filename, machine_id)
                 if scheduled_today is False:
                     print('\tProject was not scheduled today', scheduled_today)
                     DB().query('UPDATE timeline_projects SET last_scheduled = NOW() WHERE id = %s', params=(project_id,))
-                    Job.insert('run', name=name, url=url, email=None, branch=branch, filename=filename, machine_id=machine_id)
+                    Job.insert('run', user_id=user_id, name=name, url=url, email=None, branch=branch, filename=filename, machine_id=machine_id)
                     print('\tInserted')
             elif schedule_mode == 'weekly':
                 print('Project is on daily schedule', url, branch, filename, machine_id)
                 if scheduled_last_week is False:
                     print('\tProject was not scheduled in last 7 days', scheduled_last_week)
                     DB().query('UPDATE timeline_projects SET last_scheduled = NOW() WHERE id = %s', params=(project_id,))
-                    Job.insert('run', name=name, url=url, email=None, branch=branch, filename=filename, machine_id=machine_id)
+                    Job.insert('run', user_id=user_id, name=name, url=url, email=None, branch=branch, filename=filename, machine_id=machine_id)
                     print('\tInserted')
             elif schedule_mode == 'commit':
-                print('Project is on time schedule', url, branch, filename, machine_id)
-                print('This functionality is not yet implemented ...')
+                print('Project is on commit schedule', url, branch, filename, machine_id)
+                raise NotImplementedError('This functionality is not yet implemented ...')
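
Reviewer note: the bare * makes the new insert signature keyword-only, so callers must name every argument and cannot silently misorder them. A usage sketch with hypothetical values:

    from tools.timeline_projects import TimelineProject

    project_id = TimelineProject.insert(
        name='dummy-project',
        url='https://github.com/green-coding-berlin/pytest-dummy-repo',
        branch='main',
        filename='usage_scenario.yml',
        machine_id=1,
        user_id=1,
        schedule_mode='daily',
    )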