diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2247c86..534e81a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+## [v4.1.0] - 2024-09-10
+- [pull/131](https://github.com/MAAP-Project/maap-api-nasa/pull/131) - Added query params to job list endpoint
+- [pull/135](https://github.com/MAAP-Project/maap-api-nasa/pull/135) - User secret management
+- [pull/137](https://github.com/MAAP-Project/maap-api-nasa/pull/137) - Organizations & job queues management
+- [pull/136](https://github.com/MAAP-Project/maap-api-nasa/pull/136) - Add support for DPS sandbox queue
+- [pull/132](https://github.com/MAAP-Project/maap-api-nasa/pull/132) - Remove {username} param from DPS job list endpoint
+
 ## [v4.0.0] - 2024-06-26
 - [issues/111](https://github.com/MAAP-Project/maap-api-nasa/issues/111) - Implement github actions CICD and convert to poetry based build
 - [pull/110](https://github.com/MAAP-Project/maap-api-nasa/pull/110) - Remove postgres from docker-compose
@@ -16,4 +23,5 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 [unreleased]: https://github.com/MAAP-Project/maap-api-nasa/v4.0.0...HEAD
-[v4.0.0]: https://github.com/MAAP-Project/maap-api-nasa/compare/v3.1.5...v4.0.0
\ No newline at end of file
+[v4.1.0]: https://github.com/MAAP-Project/maap-api-nasa/compare/v4.0.0...v4.1.0
+[v4.0.0]: https://github.com/MAAP-Project/maap-api-nasa/compare/v3.1.5...v4.0.0
diff --git a/api/endpoints/admin.py b/api/endpoints/admin.py
index 77afed6..2ed587d 100755
--- a/api/endpoints/admin.py
+++ b/api/endpoints/admin.py
@@ -2,6 +2,7 @@
 from flask_restx import Resource
 from flask import request
 from flask_api import status
+from api.models.job_queue import JobQueue
 from api.models.role import Role
 from api.restplus import api
 from api.auth.security import login_required
@@ -10,12 +11,107 @@ from api.schemas.pre_approved_schema import PreApprovedSchema
 from datetime import datetime
 import json
-
+from api.utils import job_queue
 from api.utils.http_util import err_response
 
 log = logging.getLogger(__name__)
 ns = api.namespace('admin', description='Operations related to the MAAP admin')
 
+@ns.route('/job-queues')
+class JobQueuesCls(Resource):
+
+    @api.doc(security='ApiKeyAuth')
+    @login_required(role=Role.ROLE_ADMIN)
+    def get(self):
+        """
+        Lists the job queues and associated organizations
+        :return:
+        """
+        all_queues = job_queue.get_all_queues()
+        return all_queues
+
+
+    @api.doc(security='ApiKeyAuth')
+    @login_required(role=Role.ROLE_ADMIN)
+    def post(self):
+
+        """
+        Create new job queue.
+ """ + + req_data = request.get_json() + if not isinstance(req_data, dict): + return err_response("Valid JSON body object required.") + + queue_name = req_data.get("queue_name", "") + if not isinstance(queue_name, str) or not queue_name: + return err_response("Valid queue name is required.") + + queue_description = req_data.get("queue_description", "") + if not isinstance(queue_description, str) or not queue_description: + return err_response("Valid queue description is required.") + + guest_tier = req_data.get("guest_tier", False) + is_default = req_data.get("is_default", False) + time_limit_minutes = req_data.get("time_limit_minutes", 0) + orgs = req_data.get("orgs", []) + + new_queue = job_queue.create_queue(queue_name, queue_description, guest_tier, is_default, time_limit_minutes, orgs) + return new_queue + + +@ns.route('/job-queues/') +class JobQueueCls(Resource): + + @api.doc(security='ApiKeyAuth') + @login_required() + def put(self, queue_id): + + """ + Update job queue. Only supplied fields are updated. + """ + + if not queue_id: + return err_response("Job queue id is required.") + + req_data = request.get_json() + if not isinstance(req_data, dict): + return err_response("Valid JSON body object required.") + + queue = db.session.query(JobQueue).filter_by(id=queue_id).first() + + if queue is None: + return err_response(msg="No job queue found with id " + queue_id) + + queue.queue_name = req_data.get("queue_name", queue.queue_name) + queue.queue_description = req_data.get("queue_description", queue.queue_description) + queue.guest_tier = req_data.get("guest_tier", queue.guest_tier) + queue.is_default = req_data.get("is_default", queue.is_default) + queue.time_limit_minutes = req_data.get("time_limit_minutes", queue.time_limit_minutes) + orgs = req_data.get("orgs", []) + + updated_queue = job_queue.update_queue(queue, orgs) + return updated_queue + + + @api.doc(security='ApiKeyAuth') + @login_required(role=Role.ROLE_ADMIN) + def delete(self, queue_id): + """ + Delete job queue + """ + + queue = db.session.query(JobQueue).filter_by(id=queue_id).first() + queue_name = queue.queue_name + + if queue is None: + return err_response(msg="Job queue does not exist") + + job_queue.delete_queue(queue_id) + + return {"code": status.HTTP_200_OK, "message": "Successfully deleted {}.".format(queue_name)} + + @ns.route('/pre-approved') class PreApprovedEmails(Resource): diff --git a/api/endpoints/algorithm.py b/api/endpoints/algorithm.py index 9ec7110..bd573c9 100644 --- a/api/endpoints/algorithm.py +++ b/api/endpoints/algorithm.py @@ -1,5 +1,8 @@ import logging import os +from collections import namedtuple + +import sqlalchemy from flask import request, Response from flask_restx import Resource, reqparse from flask_api import status @@ -10,6 +13,7 @@ import traceback import api.utils.github_util as git import api.utils.hysds_util as hysds +import api.utils.http_util as http_util import api.settings as settings import api.utils.ogc_translate as ogc from api.auth.security import get_authorized_user, login_required @@ -19,6 +23,8 @@ from datetime import datetime import json +from api.utils import job_queue + log = logging.getLogger(__name__) ns = api.namespace('mas', description='Operations to register an algorithm') @@ -255,12 +261,16 @@ def post(self): try: # validate if input queue is valid - if resource is None: resource = settings.DEFAULT_QUEUE - if resource not in hysds.get_mozart_queues(): - response_body["code"] = status.HTTP_500_INTERNAL_SERVER_ERROR - response_body["message"] = "The resource 
{} is invalid. Please select from one of {}".format(resource, hysds.get_mozart_queues()) - response_body["error"] = "Invalid queue in request: {}".format(req_data) - return response_body, 500 + user = get_authorized_user() + if resource is None: + resource = job_queue.get_default_queue().queue_name + else: + valid_queues = job_queue.get_user_queues(user.id) + valid_queue_names = list(map(lambda q: q.queue_name, valid_queues)) + if resource not in valid_queue_names: + return http_util.err_response(msg=f"User does not have permissions for resource {resource}." + f"Please select from one of {valid_queue_names}", + code=status.HTTP_400_BAD_REQUEST) # clean up any old specs from the repo repo = git.clean_up_git_repo(repo, repo_name=settings.REPO_NAME) # creating hysds-io file @@ -311,6 +321,7 @@ def post(self): hysds.write_file("{}/{}".format(settings.REPO_PATH, settings.REPO_NAME), "job-submission.json", job_submission_json) logging.debug("Created spec files") + except Exception as ex: tb = traceback.format_exc() response_body["code"] = status.HTTP_500_INTERNAL_SERVER_ERROR @@ -411,6 +422,10 @@ def get(self, algo_id): try: job_type = "job-{}".format(algo_id) response = hysds.get_job_spec(job_type) + if response is None: + return Response(ogc.get_exception(type="FailedSearch", origin_process="DescribeProcess", + ex_message="Algorithm not found. {}".format(job_type)), + status=status.HTTP_404_NOT_FOUND,mimetype='text/xml') params = response.get("result").get("params") queue = response.get("result").get("recommended-queues")[0] response_body = ogc.describe_process_response(algo_id, params, queue) @@ -450,6 +465,9 @@ def delete(self, algo_id): @ns.route('/algorithm/resource') class ResourceList(Resource): + + @api.doc(security='ApiKeyAuth') + @login_required() def get(self): """ This function would query DPS to see what resources (named based on memory space) are available for @@ -458,9 +476,12 @@ def get(self): """ try: response_body = {"code": None, "message": None} - queues = hysds.get_mozart_queues() + user = get_authorized_user() + queues = job_queue.get_user_queues(user.id) + queue_names = list(map(lambda q: q.queue_name, queues)) + response_body["code"] = status.HTTP_200_OK - response_body["queues"] = queues + response_body["queues"] = queue_names response_body["message"] = "success" return response_body except Exception as ex: diff --git a/api/endpoints/job.py b/api/endpoints/job.py index 0e7474b..b170a63 100644 --- a/api/endpoints/job.py +++ b/api/endpoints/job.py @@ -5,6 +5,7 @@ from api.restplus import api import api.utils.hysds_util as hysds import api.utils.ogc_translate as ogc +import api.utils.job_queue as job_queue import api.settings as settings try: import urllib.parse as urlparse @@ -60,9 +61,13 @@ def post(self): try: dedup = "false" if dedup is None else dedup - queue = hysds.get_recommended_queue(job_type=job_type) if queue is None or queue is "" else queue - response = hysds.mozart_submit_job(job_type=job_type, params=params, dedup=dedup, queue=queue, - identifier=identifier) + user = get_authorized_user() + queue = job_queue.validate_or_get_queue(queue, job_type, user.id) + job_time_limit = hysds_io.get("result").get("soft_time_limit", 86400) + if job_queue.contains_time_limit(queue): + job_time_limit = int(queue.time_limit_minutes) * 60 + response = hysds.mozart_submit_job(job_type=job_type, params=params, dedup=dedup, queue=queue.queue_name, + identifier=identifier, job_time_limit=int(job_time_limit)) logging.info("Mozart Response: {}".format(json.dumps(response))) job_id 
@@ -78,6 +83,10 @@ def post(self):
                 return Response(ogc.status_response(job_id=job_id, job_status=job_status), mimetype='text/xml')
             else:
                 raise Exception(response.get("message"))
+        except ValueError as ex:
+            logging.error(traceback.format_exc())
+            return Response(ogc.get_exception(type="FailedJobSubmit", origin_process="Execute",
+                                              ex_message=str(ex)), status.HTTP_400_BAD_REQUEST)
         except Exception as ex:
             logging.info("Error submitting job: {}".format(ex))
             return Response(ogc.get_exception(type="FailedJobSubmit", origin_process="Execute",
diff --git a/api/endpoints/members.py b/api/endpoints/members.py
index b1c576c..98231aa 100755
--- a/api/endpoints/members.py
+++ b/api/endpoints/members.py
@@ -4,6 +4,8 @@
 from flask import request, jsonify, Response
 from flask_api import status
 from sqlalchemy.exc import SQLAlchemyError
+from api.utils.organization import get_member_organizations
+from api.models.role import Role
 from api.restplus import api
 import api.settings as settings
 from api.auth.security import get_authorized_user, login_required, valid_dps_request, edl_federated_request, \
@@ -42,18 +44,25 @@ class Member(Resource):
 
     @api.doc(security='ApiKeyAuth')
     @login_required()
     def get(self):
-        members = db.session.query(
-            Member_db.id,
-            Member_db.username,
-            Member_db.first_name,
-            Member_db.last_name,
-            Member_db.email,
-            Member_db.status,
-            Member_db.creation_date
+
+        member_query = db.session.query(
+            Member_db, Role,
+        ).filter(
+            Member_db.role_id == Role.id
         ).order_by(Member_db.username).all()
 
-        member_schema = MemberSchema()
-        result = [json.loads(member_schema.dumps(m)) for m in members]
+        result = [{
+            'id': m.Member.id,
+            'username': m.Member.username,
+            'first_name': m.Member.first_name,
+            'last_name': m.Member.last_name,
+            'email': m.Member.email,
+            'role_id': m.Member.role_id,
+            'role_name': m.Role.role_name,
+            'status': m.Member.status,
+            'creation_date': m.Member.creation_date.strftime('%m/%d/%Y'),
+        } for m in member_query]
+
         return result
 
@@ -84,6 +93,7 @@ def get(self, key):
         if member is None:
             return err_response(msg="No member found with key " + key, code=status.HTTP_404_NOT_FOUND)
 
+        member_id = member.id
         member_schema = MemberSchema()
         result = json.loads(member_schema.dumps(member))
 
@@ -91,7 +101,7 @@ def get(self, key):
         pgt_ticket = db.session \
             .query(MemberSession_db) \
             .with_entities(MemberSession_db.session_key) \
-            .filter_by(member_id=member.id) \
+            .filter_by(member_id=member_id) \
             .order_by(MemberSession_db.id.desc()) \
             .first()
 
@@ -117,6 +127,8 @@ def get(self, key):
             member_ssh_info_result = json.loads(member_schema.dumps(member))
             result = json.loads(json.dumps(dict(result.items() | member_ssh_info_result.items())))
 
+        result['organizations'] = get_member_organizations(member_id)
+
         return result
 
     @api.doc(security='ApiKeyAuth')
@@ -263,6 +275,7 @@ def put(self, key):
             member.public_ssh_key_modified_date = datetime.utcnow()
         member.public_ssh_key = req_data.get("public_ssh_key", member.public_ssh_key)
         member.public_ssh_key_name = req_data.get("public_ssh_key_name", member.public_ssh_key_name)
+        member.role_id = req_data.get("role_id", member.role_id)
 
         db.session.commit()
         member_schema = MemberSchema()
@@ -367,9 +380,14 @@ def get(self):
             .filter_by(username=authorized_user.username) \
             .first()
 
+        member_id = member.id
+
         if 'proxy-ticket' in request.headers:
             member_schema = MemberSchema()
-            return json.loads(member_schema.dumps(member))
+            result = json.loads(member_schema.dumps(member))
+            result['organizations'] = get_member_organizations(member_id)
+            return result
+
         if 'Authorization' in request.headers:
             return member
 
@@ -625,26 +643,18 @@ def get(self, endpoint_uri):
         if maap_user is None:
             return Response('Unauthorized', status=status.HTTP_401_UNAUTHORIZED)
         else:
-            edc_response = get_edc_credentials(endpoint_uri=endpoint_uri, user_id=maap_user.id)
-
-            try:
-                edc_response_json = json.loads(edc_response)
-                response = jsonify(
-                    accessKeyId=edc_response_json['accessKeyId'],
-                    secretAccessKey=edc_response_json['secretAccessKey'],
-                    sessionToken=edc_response_json['sessionToken'],
-                    expiration=edc_response_json['expiration']
-                )
+            json_response = get_edc_credentials(endpoint_uri=endpoint_uri, user_id=maap_user.id)
 
-                response.headers.add('Access-Control-Allow-Origin', '*')
+            response = jsonify(
+                accessKeyId=json_response['accessKeyId'],
+                secretAccessKey=json_response['secretAccessKey'],
+                sessionToken=json_response['sessionToken'],
+                expiration=json_response['expiration']
+            )
 
-                return response
+            response.headers.add('Access-Control-Allow-Origin', '*')
 
-            except ValueError as ex:
-                response_body = dict()
-                response_body["code"] = status.HTTP_500_INTERNAL_SERVER_ERROR
-                response_body["message"] = edc_response.decode("utf-8")
-                return response_body, status.HTTP_500_INTERNAL_SERVER_ERROR
+            return response
 
 
 @ns.route('/self/awsAccess/workspaceBucket')
@@ -681,6 +691,16 @@ def get(self):
                         "s3:ListMultipartUploadParts",
                         "s3:AbortMultipartUpload"
                     ],
+                    "Resource": [
+                        "arn:aws:s3:::{settings.WORKSPACE_BUCKET}/{maap_user.username}/*"
+                    ]
+                }},
+                {{
+                    "Sid": "GrantListAccess",
+                    "Effect": "Allow",
+                    "Action": [
+                        "s3:ListBucket"
+                    ],
                     "Resource": "arn:aws:s3:::{settings.WORKSPACE_BUCKET}",
                     "Condition": {{
                         "StringLike": {{
@@ -775,4 +795,4 @@ def get_edc_credentials(endpoint_uri, user_id):
     else:
         edl_response = edl_federated_request(url=endpoint)
 
-    return edl_response.content
+    return edl_response.json()
diff --git a/api/endpoints/organizations.py b/api/endpoints/organizations.py
new file mode 100644
index 0000000..9bf6e97
--- /dev/null
+++ b/api/endpoints/organizations.py
@@ -0,0 +1,372 @@
+import logging
+import sqlalchemy
+from flask_restx import Resource
+from flask import request
+from flask_api import status
+from collections import namedtuple
+from sqlalchemy.exc import SQLAlchemyError
+from api.models.job_queue import JobQueue
+from api.models.organization import Organization as Organization_db
+from api.models.organization_job_queue import OrganizationJobQueue
+from api.models.organization_membership import OrganizationMembership as OrganizationMembership_db
+from api.models.member import Member
+from api.models.role import Role
+from api.restplus import api
+from api.auth.security import login_required, get_authorized_user
+from api.maap_database import db
+from api.schemas.organization_job_queue_schema import OrganizationJobQueueSchema
+from api.schemas.organization_membership_schema import OrganizationMembershipSchema
+from api.schemas.organization_schema import OrganizationSchema
+from datetime import datetime
+import json
+
+from api.utils import organization
+from api.utils.http_util import err_response
+
+log = logging.getLogger(__name__)
+ns = api.namespace('organizations', description='Operations related to the MAAP organizations')
+
+@ns.route('')
+class Organizations(Resource):
+
+    @api.doc(security='ApiKeyAuth')
+    @login_required()
+    def get(self):
+        """
+        Lists the hierarchy of organizations using MAAP
+        :return:
+        """
+        orgs = organization.get_organizations()
+        return orgs
+
+    @api.doc(security='ApiKeyAuth')
+    @login_required()
+    def post(self):
+        """
+        Create new organization
+        :return:
+        """
+
+        req_data = request.get_json()
+        if not isinstance(req_data, dict):
+            return err_response("Valid JSON body object required.")
+
+        name = req_data.get("name", "")
+        if not isinstance(name, str) or not name:
+            return err_response("Valid org name is required.")
+
+        root_org = db.session \
+            .query(Organization_db) \
+            .filter_by(parent_org_id=None) \
+            .first()
+
+        parent_org_id = req_data.get("parent_org_id", root_org.id)
+        if parent_org_id is None:
+            parent_org_id = root_org.id
+
+        default_job_limit_count = req_data.get("default_job_limit_count", None)
+        default_job_limit_hours = req_data.get("default_job_limit_hours", None)
+        members = req_data.get("members", [])
+
+        new_org = organization.create_organization(name, parent_org_id, default_job_limit_count, default_job_limit_hours, members)
+
+        return new_org
+
+
+@ns.route('/<int:org_id>')
+class Organization(Resource):
+
+    @api.doc(security='ApiKeyAuth')
+    @login_required()
+    def get(self, org_id):
+        """
+        Retrieve organization
+        """
+        org = organization.get_organization(org_id)
+
+        if org is None:
+            return err_response(msg="No organization found with id {}".format(org_id), code=status.HTTP_404_NOT_FOUND)
+
+        org_schema = OrganizationSchema()
+        result = json.loads(org_schema.dumps(org))
+
+        return result
+
+    @api.doc(security='ApiKeyAuth')
+    @login_required()
+    def put(self, org_id):
+
+        """
+        Update organization. Only supplied fields are updated.
+        """
+
+        if not org_id:
+            return err_response("Org id is required.")
+
+        req_data = request.get_json()
+        if not isinstance(req_data, dict):
+            return err_response("Valid JSON body object required.")
+
+        org = db.session.query(Organization_db).filter_by(id=org_id).first()
+
+        if org is None:
+            return err_response(msg="No org found with id {}".format(org_id))
+
+        org.name = req_data.get("name", org.name)
+        org.parent_org_id = req_data.get("parent_org_id", org.parent_org_id)
+        org.default_job_limit_count = req_data.get("default_job_limit_count", org.default_job_limit_count)
+        org.default_job_limit_hours = req_data.get("default_job_limit_hours", org.default_job_limit_hours)
+        members = req_data.get("members", [])
+
+        updated_org = organization.update_organization(org, members)
+        return updated_org
+
+
+    @api.doc(security='ApiKeyAuth')
+    @login_required()
+    def delete(self, org_id):
+        """
+        Delete organization
+        """
+
+        org = organization.get_organization(org_id)
+
+        if org is None:
+            return err_response(msg="Organization does not exist")
+
+        org_name = org.name
+        organization.delete_organization(org.id)
+
+        return {"code": status.HTTP_200_OK, "message": "Successfully deleted {}.".format(org_name)}
+
+
+@ns.route('/<int:org_id>/membership')
+class OrganizationMemberships(Resource):
+
+    @api.doc(security='ApiKeyAuth')
+    @login_required()
+    def get(self, org_id):
+        """
+        Retrieve organization members
+        """
+        try:
+            org_members = db.session.query(
+                OrganizationMembership_db, Member, Organization_db,
+            ).filter(
+                OrganizationMembership_db.member_id == Member.id,
+            ).filter(
+                OrganizationMembership_db.org_id == Organization_db.id,
+            ).filter(
+                OrganizationMembership_db.org_id == org_id,
+            ).order_by(Member.username).all()
+
+            result = [{
+                'org_id': om.Organization.id
+            } for om in org_members]
+
+            return result
+        except SQLAlchemyError as ex:
+            raise ex
+
+
+@ns.route('/<int:org_id>/membership/<string:username>')
+class OrganizationMembership(Resource):
+
+    @api.doc(security='ApiKeyAuth')
+    @login_required()
+    def post(self, org_id, username):
+        """
+        Add organization member
+        :return:
+        """
+        try:
+            req_data = request.get_json()
+            if not isinstance(req_data, dict):
+                return err_response("Valid JSON body object required.")
+
+            member = get_authorized_user()
+            membership = db.session.query(OrganizationMembership_db).filter_by(member_id=member.id,
+                                                                               org_id=org_id).first()
+
+            if member.role_id != Role.ROLE_ADMIN and (membership is None or not membership.org_maintainer):
+                return err_response("Must be an org maintainer to add members.", status.HTTP_403_FORBIDDEN)
+
+            org_member = db.session.query(Member).filter_by(username=username).first()
+
+            if org_member is None:
+                return err_response("Valid username is required.")
+
+            membership_dup = db.session.query(OrganizationMembership_db).filter_by(member_id=org_member.id,
+                                                                                   org_id=org_id).first()
+
+            if membership_dup is not None:
+                return err_response("Member {} already exists in org {}".format(username, org_id))
+
+            job_limit_count = req_data.get("job_limit_count", None)
+            job_limit_hours = req_data.get("job_limit_hours", None)
+            org_maintainer = req_data.get("org_maintainer", False)
+
+            new_org_membership = OrganizationMembership_db(org_id=org_id, member_id=org_member.id,
+                                                           job_limit_count=job_limit_count,
+                                                           job_limit_hours=job_limit_hours,
+                                                           org_maintainer=org_maintainer,
+                                                           creation_date=datetime.utcnow())
+
+            db.session.add(new_org_membership)
+            db.session.commit()
+
+            org_schema = OrganizationMembershipSchema()
+            return json.loads(org_schema.dumps(new_org_membership))
+
+        except SQLAlchemyError as ex:
+            raise ex
+
+    @api.doc(security='ApiKeyAuth')
+    @login_required()
+    def delete(self, org_id, username):
+        """
+        Delete organization member
+        """
+        try:
+            member = get_authorized_user()
+            membership = db.session.query(OrganizationMembership_db).filter_by(member_id=member.id,
+                                                                               org_id=org_id).first()
+
+            if membership is None:
+                return err_response("Org id {} for user {} was not found.".format(org_id, member.username))
+
+            if not membership.org_maintainer and member.role_id != Role.ROLE_ADMIN:
+                return err_response("Must be an org maintainer to remove members.", status.HTTP_403_FORBIDDEN)
+
+            member_to_delete = db.session.query(Member).filter_by(username=username).first()
+
+            if member_to_delete is None:
+                return err_response("Member {} was not found.".format(username))
+
+            membership_to_delete = db.session.query(OrganizationMembership_db).filter_by(member_id=member_to_delete.id,
+                                                                                         org_id=org_id).first()
+
+            if membership_to_delete is None:
+                return err_response("Org id {} for user {} was not found.".format(org_id, member_to_delete.username))
+
+            db.session.query(OrganizationMembership_db).filter_by(member_id=member_to_delete.id, org_id=org_id).delete()
+            db.session.commit()
+
+            return {"code": status.HTTP_200_OK,
+                    "message": "Successfully removed {} from org {}.".format(member_to_delete.username, org_id)}
+
+        except SQLAlchemyError as ex:
+            raise ex
+
+
+@ns.route('/<int:org_id>/job_queues')
+class OrganizationJobQueues(Resource):
+
+    @api.doc(security='ApiKeyAuth')
+    @login_required()
+    def get(self, org_id):
+        """
+        Retrieve organization job queues
+        """
+        try:
+            org_queues = db.session.query(
+                OrganizationJobQueue, JobQueue, Organization_db,
+            ).filter(
+                OrganizationJobQueue.job_queue_id == JobQueue.id,
+            ).filter(
+                OrganizationJobQueue.org_id == Organization_db.id,
+            ).filter(
+                OrganizationJobQueue.org_id == org_id,
+            ).order_by(JobQueue.queue_name).all()
+
+            result = [{
+                'org_id': om.Organization.id
+            } for om in org_queues]
+
+            return result
+        except SQLAlchemyError as ex:
+            raise ex
+
+
+@ns.route('/<int:org_id>/job_queues/<string:queue_name>')
+class OrganizationJobQueueCls(Resource):
+
+    @api.doc(security='ApiKeyAuth')
+    @login_required()
+    def post(self, org_id, queue_name):
+        """
+        Add organization job queue
+        :return:
+        """
+        try:
+            req_data = request.get_json()
+            if not isinstance(req_data, dict):
+                return err_response("Valid JSON body object required.")
+
+            member = get_authorized_user()
+            membership = db.session.query(OrganizationMembership_db).filter_by(member_id=member.id,
+                                                                               org_id=org_id).first()
+
+            if member.role_id != Role.ROLE_ADMIN and (membership is None or not membership.org_maintainer):
+                return err_response("Must be an org maintainer to add queues.", status.HTTP_403_FORBIDDEN)
+
+            org_queue = db.session.query(JobQueue).filter_by(queue_name=queue_name).first()
+
+            if org_queue is None:
+                return err_response("Valid job queue is required.")
+
+            org_queue_dup = db.session.query(OrganizationJobQueue).filter_by(job_queue_id=org_queue.id,
+                                                                             org_id=org_id).first()
+
+            if org_queue_dup is not None:
+                return err_response("Job queue {} already exists in org {}".format(queue_name, org_id))
+
+            new_org_queue = OrganizationJobQueue(org_id=org_id, job_queue_id=org_queue.id,
+                                                 creation_date=datetime.utcnow())
+
+            db.session.add(new_org_queue)
+            db.session.commit()
+
+            org_schema = OrganizationJobQueueSchema()
+            return json.loads(org_schema.dumps(new_org_queue))
+
+        except SQLAlchemyError as ex:
+            raise ex
+
+    @api.doc(security='ApiKeyAuth')
+    @login_required()
+    def delete(self, org_id, queue_name):
+        """
+        Remove organization job queue
+        """
+        try:
+            member = get_authorized_user()
+            membership = db.session.query(OrganizationMembership_db).filter_by(member_id=member.id,
+                                                                               org_id=org_id).first()
+
+            if membership is None:
+                return err_response("Org id {} for user {} was not found.".format(org_id, member.username))
+
+            if not membership.org_maintainer and member.role_id != Role.ROLE_ADMIN:
+                return err_response("Must be an org maintainer to remove queues.", status.HTTP_403_FORBIDDEN)
+
+            queue_to_delete = db.session.query(JobQueue).filter_by(queue_name=queue_name).first()
+
+            if queue_to_delete is None:
+                return err_response("Job queue {} was not found.".format(queue_name))
+
+            org_queue_to_delete = db.session.query(OrganizationJobQueue).filter_by(job_queue_id=queue_to_delete.id,
+                                                                                   org_id=org_id).first()
+
+            if org_queue_to_delete is None:
+                return err_response("Org id {} for job queue {} was not found.".format(org_id, queue_name))
+
+            db.session.query(OrganizationJobQueue).filter_by(job_queue_id=queue_to_delete.id, org_id=org_id).delete()
+            db.session.commit()
+
+            return {"code": status.HTTP_200_OK,
+                    "message": "Successfully removed {} from org {}.".format(queue_name, org_id)}
+
+        except SQLAlchemyError as ex:
+            raise ex
diff --git a/api/maapapp.py b/api/maapapp.py
index fd4fcba..090de1a 100755
--- a/api/maapapp.py
+++ b/api/maapapp.py
@@ -15,6 +15,7 @@
 from api.endpoints.wms import ns as wms_namespace
 from api.endpoints.members import ns as members_namespace
 from api.endpoints.environment import ns as environment_namespace
+from api.endpoints.organizations import ns as organizations_namespace
 from api.endpoints.admin import ns as admin_namespace
 from api.restplus import api
 from api.maap_database import db
@@ -38,8 +39,8 @@
 app.config['CAS_USERNAME_SESSION_KEY'] = 'cas_token_session_key'
 app.config['SQLALCHEMY_DATABASE_URI'] = settings.DATABASE_URL
 app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
-app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {'isolation_level': 'AUTOCOMMIT', 'pool_size': 300}
-app.config['SQLALCHEMY_POOL_TIMEOUT'] = 300
+app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {'isolation_level': 'AUTOCOMMIT', 'pool_size': 10, 'pool_pre_ping': True}
+app.config['SQLALCHEMY_POOL_TIMEOUT'] = 30
 
 app.app_context().push()
 db.init_app(app)
@@ -103,6 +104,7 @@ def initialize_app(flask_app):
     api.add_namespace(wms_namespace)
     api.add_namespace(members_namespace)
     api.add_namespace(environment_namespace)
+    api.add_namespace(organizations_namespace)
     api.add_namespace(admin_namespace)
 
     flask_app.register_blueprint(blueprint)
diff --git a/api/models/job_queue.py b/api/models/job_queue.py
new file mode 100644
index 0000000..e2bb4a2
--- /dev/null
+++ b/api/models/job_queue.py
@@ -0,0 +1,21 @@
+from api.models import Base
+from api.maap_database import db
+
+class JobQueue(Base):
+    __tablename__ = 'job_queue'
+
+    id = db.Column(db.Integer, primary_key=True)
+    queue_name = db.Column(db.String())
+    queue_description = db.Column(db.String())
+    # Whether the queue is available to public 'Guest' users
+    guest_tier = db.Column(db.Boolean())
+    # Whether the queue is used as a default when no queues are specified
+    is_default = db.Column(db.Boolean())
+    # The maximum time, in minutes, that jobs are allowed to run using this queue
+    time_limit_minutes = db.Column(db.Integer)
+    creation_date = db.Column(db.DateTime())
+
+    def __repr__(self):
+        return "<JobQueue {self.id}: {self.queue_name}>".format(self=self)
+
+
diff --git a/api/models/organization.py b/api/models/organization.py
new file mode 100644
index 0000000..7aed319
--- /dev/null
+++ b/api/models/organization.py
@@ -0,0 +1,25 @@
+from api.models import Base
+from api.maap_database import db
+
+class Organization(Base):
+    __tablename__ = 'organization'
+
+    id = db.Column(db.Integer, primary_key=True)
+    name = db.Column(db.String())
+    parent_org_id = db.Column(db.Integer, db.ForeignKey('organization.id'))
+
+    # The maximum number of jobs that org members can run per the defined hour(s).
+    # Used in conjunction with default_job_limit_hours.
+    # A value of null or zero equates to unlimited jobs.
+    default_job_limit_count = db.Column(db.Integer)
+
+    # The number of hours during which an org member can run their allotment of jobs.
+    # Used in conjunction with default_job_limit_count.
+    # A value of null or zero equates to unlimited hours.
+    default_job_limit_hours = db.Column(db.Integer)
+    creation_date = db.Column(db.DateTime())
+
+    def __repr__(self):
+        return "<Organization {self.id}: {self.name}>".format(self=self)
+
+
diff --git a/api/models/organization_job_queue.py b/api/models/organization_job_queue.py
new file mode 100644
index 0000000..bb9ca39
--- /dev/null
+++ b/api/models/organization_job_queue.py
@@ -0,0 +1,14 @@
+from api.models import Base
+from api.maap_database import db
+
+
+class OrganizationJobQueue(Base):
+    __tablename__ = 'organization_job_queue'
+
+    id = db.Column(db.Integer, primary_key=True)
+    job_queue_id = db.Column(db.Integer, db.ForeignKey('job_queue.id'), nullable=False)
+    org_id = db.Column(db.Integer, db.ForeignKey('organization.id'), nullable=False)
+    creation_date = db.Column(db.DateTime())
+
+    def __repr__(self):
+        return "<OrganizationJobQueue {self.id}: org {self.org_id}, queue {self.job_queue_id}>".format(self=self)
\ No newline at end of file
diff --git a/api/models/organization_membership.py b/api/models/organization_membership.py
new file mode 100644
index 0000000..64f77e4
--- /dev/null
+++ b/api/models/organization_membership.py
@@ -0,0 +1,24 @@
+from api.models import Base
+from api.maap_database import db
+
+
+class OrganizationMembership(Base):
+    __tablename__ = 'organization_membership'
+
+    id = db.Column(db.Integer, primary_key=True)
+    member_id = db.Column(db.Integer, db.ForeignKey('member.id'), nullable=False)
+    org_id = db.Column(db.Integer, db.ForeignKey('organization.id'), nullable=False)
+    org_maintainer = db.Column(db.Boolean())
+    # The maximum number of jobs that this org member can run per the defined hour(s).
+    # Used in conjunction with job_limit_hours.
+    # A value of null or zero equates to unlimited jobs.
+    job_limit_count = db.Column(db.Integer)
+
+    # The number of hours during which this org member can run their allotment of jobs.
+    # Used in conjunction with job_limit_count.
+    # A value of null or zero equates to unlimited hours.
+    job_limit_hours = db.Column(db.Integer)
+    creation_date = db.Column(db.DateTime())
+
+    def __repr__(self):
+        return "<OrganizationMembership {self.id}: member {self.member_id}, org {self.org_id}>".format(self=self)
\ No newline at end of file
diff --git a/api/schemas/job_queue_schema.py b/api/schemas/job_queue_schema.py
new file mode 100644
index 0000000..d451d92
--- /dev/null
+++ b/api/schemas/job_queue_schema.py
@@ -0,0 +1,11 @@
+from api.models.job_queue import JobQueue
+from marshmallow_sqlalchemy import SQLAlchemyAutoSchema
+
+
+class JobQueueSchema(SQLAlchemyAutoSchema):
+    class Meta:
+        model = JobQueue
+        include_relationships = True
+        load_instance = True
+
+
diff --git a/api/schemas/organization_job_queue_schema.py b/api/schemas/organization_job_queue_schema.py
new file mode 100644
index 0000000..5464465
--- /dev/null
+++ b/api/schemas/organization_job_queue_schema.py
@@ -0,0 +1,9 @@
+from api.models.organization_job_queue import OrganizationJobQueue
+from marshmallow_sqlalchemy import SQLAlchemyAutoSchema
+
+
+class OrganizationJobQueueSchema(SQLAlchemyAutoSchema):
+    class Meta:
+        model = OrganizationJobQueue
+        include_fk = True
+        load_instance = True
diff --git a/api/schemas/organization_membership_schema.py b/api/schemas/organization_membership_schema.py
new file mode 100644
index 0000000..628847a
--- /dev/null
+++ b/api/schemas/organization_membership_schema.py
@@ -0,0 +1,9 @@
+from api.models.organization_membership import OrganizationMembership
+from marshmallow_sqlalchemy import SQLAlchemyAutoSchema
+
+
+class OrganizationMembershipSchema(SQLAlchemyAutoSchema):
+    class Meta:
+        model = OrganizationMembership
+        include_fk = True
+        load_instance = True
diff --git a/api/schemas/organization_schema.py b/api/schemas/organization_schema.py
new file mode 100644
index 0000000..ba71b75
--- /dev/null
+++ b/api/schemas/organization_schema.py
@@ -0,0 +1,9 @@
+from api.models.organization import Organization
+from marshmallow_sqlalchemy import SQLAlchemyAutoSchema
+
+
+class OrganizationSchema(SQLAlchemyAutoSchema):
+    class Meta:
+        model = Organization
+        include_fk = True
+        load_instance = True
diff --git a/api/settings.py b/api/settings.py
index c36e872..94dae86 100755
--- a/api/settings.py
+++ b/api/settings.py
@@ -5,7 +5,6 @@ def str2bool(v):
 
 MAAP_API_URL = os.getenv('MAAP_API_URL', "http://localhost:5000/api")
-PROJECT_QUEUE_PREFIX = os.getenv('PROJECT_QUEUE_PREFIX', "maap")
 API_HOST_URL = os.getenv('API_HOST_URL', 'http://0.0.0.0:5000/')
 
 # Flask settings
@@ -59,12 +58,12 @@ def str2bool(v):
 MOZART_URL = os.getenv('MOZART_URL', 'https://[MOZART_IP]/mozart/api/v0.2')
 MOZART_V1_URL = os.getenv('MOZART_V1_URL', 'https://[MOZART_IP]/mozart/api/v0.1')  # new from sister
 GRQ_URL = os.getenv('GRQ_URL', 'http://[GRQ_IP]:8878/api/v0.1')  # new from sister
-DEFAULT_QUEUE = os.getenv('DEFAULT_QUEUE', 'test-job_worker-large')
 LW_QUEUE = os.getenv('LW_QUEUE', 'system-jobs-queue')
 HYSDS_LW_VERSION = os.getenv('HYSDS_LW_VERSION', 'v1.2.2')
 GRQ_REST_URL = os.getenv('GRQ_REST_URL', 'http://[GRQ_IP]/api/v0.1')
 S3_CODE_BUCKET = os.getenv('S3_CODE_BUCKET', 's3://[S3_BUCKET_NAME]')
 DPS_MACHINE_TOKEN = os.getenv('DPS_MACHINE_TOKEN', '')
+PROJECT_QUEUE_PREFIX = os.getenv('PROJECT_QUEUE_PREFIX', "maap")
 
 # FASTBROWSE API
 TILER_ENDPOINT = os.getenv('TILER_ENDPOINT', 'https://d852m4cmf5.execute-api.us-east-1.amazonaws.com')
diff --git a/api/utils/hysds_util.py b/api/utils/hysds_util.py
index 97a8a71..fa48a73 100644
--- a/api/utils/hysds_util.py
+++ b/api/utils/hysds_util.py
@@ -7,6 +7,9 @@
 import time
 import copy
 
+import api.utils.job_queue
+from api.models import job_queue
+
 log = logging.getLogger(__name__)
 
 STATUS_JOB_STARTED = "job-started"
@@ -202,7 +205,7 @@ def create_hysds_io(algorithm_description, inputs, verified=False, submission_ty
     hysds_io["params"] = params
     return hysds_io
 
-def create_job_spec(run_command, inputs, disk_usage, queue_name=settings.DEFAULT_QUEUE, verified=False):
+def create_job_spec(run_command, inputs, disk_usage, queue_name, verified=False):
     """
     Creates the contents of the job spec file
     :param run_command:
@@ -219,6 +222,7 @@ def create_job_spec(run_command, inputs, disk_usage, queue_name=settings.DEFAULT
     job_spec["imported_worker_files"] = {
         "$HOME/.netrc": "/home/ops/.netrc",
         "$HOME/.aws": "/home/ops/.aws",
+        "$HOME/verdi/etc/maap-dps.env": "/home/ops/.maap-dps.env",
         "/tmp": ["/tmp", "rw"]
     }
     job_spec["post"] = ["hysds.triage.triage"]
@@ -410,7 +414,8 @@ def get_algorithms():
     return maap_algo_list
 
-def mozart_submit_job(job_type, params={}, queue=settings.DEFAULT_QUEUE, dedup="false", identifier="maap-api_submit"):
+def mozart_submit_job(job_type, params={}, queue="", dedup="false", identifier="maap-api_submit",
+                      job_time_limit=86400):
     """
     Submit a job to Mozart
     :param job_type:
@@ -418,6 +423,7 @@ def mozart_submit_job(job_type, params={}, queue=settings.DEFAULT_QUEUE, dedup="
     :param queue:
     :param dedup:
     :param identifier:
+    :param job_time_limit:
     :return:
     """
@@ -443,6 +449,8 @@ def mozart_submit_job(job_type, params={}, queue=settings.DEFAULT_QUEUE, dedup="
     params.pop('username', None)
     job_payload["params"] = json.dumps(params)
     job_payload["enable_dedup"] = dedup
+    job_payload["soft_time_limit"] = job_time_limit
+    job_payload["time_limit"] = job_time_limit
 
     logging.info("job payload: {}".format(json.dumps(job_payload)))
@@ -549,7 +557,7 @@ def get_recommended_queue(job_type):
     response = get_job_spec(job_type)
     recommended_queues = response.get("result", None).get("recommended-queues", None)
     recommended_queue = recommended_queues[0] if type(recommended_queues) is list else None
-    return recommended_queue if recommended_queue != "" else settings.DEFAULT_QUEUE
+    return recommended_queue if recommended_queue != "" else api.utils.job_queue.get_default_queue().queue_name
 
 def validate_job_submit(hysds_io, user_params):
@@ -588,8 +596,8 @@ def validate_job_submit(hysds_io, user_params):
             if known_params.get(p).get("default") is not None:
                 validated_params[p] = known_params.get(p).get("default")
             else:
-                raise Exception("Parameter {} missing from inputs. Didn't find any default set for it in "
-                                "algorithm specification. Please specify it and attempt to submit.".format(p))
+                raise ValueError("Parameter {} missing from inputs. Didn't find any default set for it in "
+                                 "algorithm specification. Please specify it and attempt to submit.".format(p))
     return validated_params
@@ -776,8 +784,13 @@ def revoke_mozart_job(job_id, wait_for_completion=False):
     return poll_for_completion(lw_job_id)
 
-def pele_get_product_by_id(id):
-    return
-
-
-
+def set_timelimit_for_dps_sandbox(params: dict, queue: job_queue.JobQueue):
+    """
+    Sets the soft_time_limit and time_limit parameters for DPS sandbox queue
+    at job submission
+    :param params:
+    :param queue: Job queue
+    :return: None; params is updated in place
+    """
+    params.update({"soft_time_limit": queue.time_limit_minutes * 60,
+                   "time_limit": queue.time_limit_minutes * 60})
diff --git a/api/utils/job_queue.py b/api/utils/job_queue.py
new file mode 100644
index 0000000..aa50e08
--- /dev/null
+++ b/api/utils/job_queue.py
@@ -0,0 +1,230 @@
+import json
+import logging
+from collections import namedtuple
+from datetime import datetime
+
+import sqlalchemy
+from sqlalchemy.exc import SQLAlchemyError
+from api.maap_database import db
+from api.models import job_queue
+from api.models.job_queue import JobQueue
+from api.models.organization import Organization
+from api.models.organization_job_queue import OrganizationJobQueue
+import api.utils.hysds_util as hysds
+from api.schemas.job_queue_schema import JobQueueSchema
+from api import settings
+
+log = logging.getLogger(__name__)
+
+
+def get_user_queues(user_id):
+    try:
+        user_queues = []
+        query = """select jq.id, jq.queue_name, jq.is_default, jq.time_limit_minutes from organization_membership m
+            inner join public.organization_job_queue ojq on m.org_id = ojq.org_id
+            inner join public.job_queue jq on jq.id = ojq.job_queue_id
+            where m.member_id = {}
+            union
+            select id, queue_name, is_default, time_limit_minutes
+            from job_queue
+            where guest_tier = true""".format(user_id)
+        queue_list = db.session.execute(sqlalchemy.text(query))
+
+        Record = namedtuple('Record', queue_list.keys())
+        queue_records = [Record(*r) for r in queue_list.fetchall()]
+
+        for r in queue_records:
+            user_queues.append(JobQueue(id=r.id, queue_name=r.queue_name, is_default=r.is_default, time_limit_minutes=r.time_limit_minutes))
+
+        return user_queues
+
+    except SQLAlchemyError as ex:
+        raise ex
+
+
+def get_all_queues():
+    try:
+        result = []
+
+        queues = db.session.query(
+            JobQueue.id,
+            JobQueue.queue_name,
+            JobQueue.queue_description,
+            JobQueue.guest_tier,
+            JobQueue.is_default,
+            JobQueue.time_limit_minutes,
+            JobQueue.creation_date
+        ).order_by(JobQueue.queue_name).all()
+
+        orgs_query = db.session.query(
+            Organization, OrganizationJobQueue,
+        ).filter(
+            Organization.id == OrganizationJobQueue.org_id
+        ).order_by(Organization.name).all()
+
+        hysds_queues = hysds.get_mozart_queues()
+
+        for q in queues:
+            queue = {
+                'id': q.id,
+                'queue_name': q.queue_name,
+                'queue_description': q.queue_description,
+                'guest_tier': q.guest_tier,
+                'is_default': q.is_default,
+                'time_limit_minutes': q.time_limit_minutes,
+                'status': 'Online' if q.queue_name in hysds_queues else 'Offline',
+                'orgs': [],
+                'creation_date': q.creation_date.strftime('%m/%d/%Y'),
+            }
+
+            for o in orgs_query:
+                if o.OrganizationJobQueue.job_queue_id == q.id:
+                    queue['orgs'].append({
+                        'id': o.Organization.id,
+                        'org_name': o.Organization.name,
+                        'default_job_limit_count': o.Organization.default_job_limit_count,
+                        'default_job_limit_hours': o.Organization.default_job_limit_hours
+                    })
+
+            result.append(queue)
+
+        unassigned_queues = (hq for hq in hysds_queues if hq not in map(_queue_name, queues))
+        for uq in unassigned_queues:
+            result.append({
+                'id': 0,
+                'queue_name': uq,
+                'queue_description': '',
+                'guest_tier': False,
+                'is_default': False,
+                'time_limit_minutes': 0,
+                'status': 'Unassigned',
+                'orgs': [],
+                'creation_date': None,
+            })
+
+        return result
+    except SQLAlchemyError as ex:
+        raise ex
+
+
+def _queue_name(q):
+    return q.queue_name
+
+
+def create_queue(queue_name, queue_description, guest_tier, is_default, time_limit_minutes, orgs):
+    try:
+        new_queue = JobQueue(queue_name=queue_name, queue_description=queue_description, guest_tier=guest_tier,
+                             is_default=is_default, time_limit_minutes=time_limit_minutes, creation_date=datetime.utcnow())
+
+        db.session.add(new_queue)
+        db.session.commit()
+
+        if is_default:
+            _reset_queue_default(new_queue.id)
+
+        queue_orgs = []
+        for queue_org in orgs:
+            queue_orgs.append(OrganizationJobQueue(org_id=queue_org['org_id'], job_queue_id=new_queue.id,
+                                                   creation_date=datetime.utcnow()))
+
+        if len(queue_orgs) > 0:
+            db.session.add_all(queue_orgs)
+            db.session.commit()
+
+        org_schema = JobQueueSchema()
+        return json.loads(org_schema.dumps(new_queue))
+
+    except SQLAlchemyError as ex:
+        raise ex
+
+
+def update_queue(queue, orgs):
+    try:
+        # Update queue
+        db.session.commit()
+
+        if queue.is_default:
+            _reset_queue_default(queue.id)
+
+        # Update org assignments
+        db.session.execute(
+            db.delete(OrganizationJobQueue).filter_by(job_queue_id=queue.id)
+        )
+        db.session.commit()
+
+        queue_orgs = []
+        for queue_org in orgs:
+            queue_orgs.append(
+                OrganizationJobQueue(org_id=queue_org['org_id'], job_queue_id=queue.id,
+                                     creation_date=datetime.utcnow()))
+
+        if len(queue_orgs) > 0:
+            db.session.add_all(queue_orgs)
+            db.session.commit()
+
+        queue_schema = JobQueueSchema()
+        return json.loads(queue_schema.dumps(queue))
+
+    except SQLAlchemyError as ex:
+        raise ex
+
+
+def delete_queue(queue_id):
+    try:
+        # Clear orgs
+        db.session.execute(
+            db.delete(OrganizationJobQueue).filter_by(job_queue_id=queue_id)
+        )
+        db.session.commit()
+
+        db.session.query(JobQueue).filter_by(id=queue_id).delete()
+        db.session.commit()
+    except SQLAlchemyError as ex:
+        raise ex
+
+
+def get_default_queue():
+    try:
+        default_queue = db.session \
+            .query(JobQueue) \
+            .filter_by(is_default=True) \
+            .first()
+        return default_queue
+
+    except SQLAlchemyError as ex:
+        raise ex
+
+
+def _reset_queue_default(default_id):
+    query = "update job_queue set is_default = False where id != {}".format(default_id)
+    db.session.execute(sqlalchemy.text(query))
+    db.session.commit()
+
+
+def validate_or_get_queue(queue: str, job_type: str, user_id: int):
+    """
+    Validates that the provided queue name exists and is available to the user.
+    If no queue name is provided, falls back to the recommended queue for the
+    job type, or to the default job queue.
+    :param queue: Queue name
+    :param job_type: Job type
+    :param user_id: User id to look up available queues
+    :return: queue
+    :raises ValueError: If the queue name provided is not valid
+    """
+    valid_queues = get_user_queues(user_id)
+
+    if queue is None or queue == "":
+        if job_type is None:
+            default_queue = next(q for q in valid_queues if q.is_default)
+            return default_queue
+        queue = hysds.get_recommended_queue(job_type)
+
+    valid_queue_names = list(map(lambda q: q.queue_name, valid_queues))
+    if queue not in valid_queue_names:
+        raise ValueError(f"User does not have access to {queue}. Valid queues: {valid_queue_names}")
+
+    return next(q for q in valid_queues if q.queue_name == queue)
+
+def contains_time_limit(queue: JobQueue):
+    return queue is not None and queue.time_limit_minutes is not None and queue.time_limit_minutes > 0
diff --git a/api/utils/ogc_translate.py b/api/utils/ogc_translate.py
index 6be1c83..f8d9763 100644
--- a/api/utils/ogc_translate.py
+++ b/api/utils/ogc_translate.py
@@ -449,7 +449,7 @@ def get_exception(type, origin_process, ex_message):
     exception = ET.SubElement(response, "ows:Exception")
     exception.set("exceptionCode", type)
     exception.set("locator", origin_process)
-    ET.SubElement(exception, "ows:ExceptionText").text = ex_message
+    ET.SubElement(exception, "ows:ExceptionText").text = str(ex_message)
     return tostring(response)
diff --git a/api/utils/organization.py b/api/utils/organization.py
new file mode 100644
index 0000000..8ca91b2
--- /dev/null
+++ b/api/utils/organization.py
@@ -0,0 +1,178 @@
+import logging
+from collections import namedtuple
+from datetime import datetime
+import json
+import sqlalchemy
+from sqlalchemy.exc import SQLAlchemyError
+from api.maap_database import db
+from api.models.job_queue import JobQueue
+from api.models.member import Member
+from api.models.organization import Organization
+from api.models.organization_job_queue import OrganizationJobQueue
+from api.models.organization_membership import OrganizationMembership
+from api.schemas.organization_schema import OrganizationSchema
+
+log = logging.getLogger(__name__)
+
+
+def get_organizations():
+    try:
+        result = []
+        otree = db.session.execute(sqlalchemy.text('select * from org_tree order by row_number'))
+
+        queues_query = db.session.query(
+            JobQueue, OrganizationJobQueue,
+        ).filter(
+            JobQueue.id == OrganizationJobQueue.job_queue_id
+        ).order_by(JobQueue.queue_name).all()
+
+        membership_query = db.session.query(
+            Member, OrganizationMembership,
+        ).filter(
+            Member.id == OrganizationMembership.member_id
+        ).order_by(Member.first_name).all()
+
+        Record = namedtuple('Record', otree.keys())
+        org_tree_records = [Record(*r) for r in otree.fetchall()]
+        for r in org_tree_records:
+            org = {
+                'id': r.id,
+                'parent_org_id': r.parent_org_id,
+                'name': r.name,
+                'depth': r.depth,
+                'member_count': r.member_count,
+                'default_job_limit_count': r.default_job_limit_count,
+                'default_job_limit_hours': r.default_job_limit_hours,
+                'job_queues': [],
+                'members': [],
+                'creation_date': r.creation_date.strftime('%m/%d/%Y'),
+            }
+
+            for q in queues_query:
+                if q.OrganizationJobQueue.org_id == r.id:
+                    org['job_queues'].append({
+                        'id': q.JobQueue.id,
+                        'queue_name': q.JobQueue.queue_name,
+                        'queue_description': q.JobQueue.queue_description
+                    })
+
+            for m in membership_query:
+                if m.OrganizationMembership.org_id == r.id:
+                    org['members'].append({
+                        'id': m.Member.id,
+                        'first_name': m.Member.first_name,
+                        'last_name': m.Member.last_name,
+                        'username': m.Member.username,
+                        'email': m.Member.email,
+                        'maintainer': m.OrganizationMembership.org_maintainer
+                    })
+
+            result.append(org)
+
+        return result
+    except SQLAlchemyError as ex:
+        raise ex
+
+def get_member_organizations(member_id):
+    result = []
+
+    user_orgs = db.session \
+        .query(Organization, OrganizationMembership) \
+        .filter(Organization.id == OrganizationMembership.org_id) \
+        .order_by(Organization.name).all()
+
+    for user_org in user_orgs:
+        if user_org.OrganizationMembership.member_id == member_id:
+            result.append({
+                'id': user_org.Organization.id,
+                'name': user_org.Organization.name
+            })
+
+    return result
+
+def get_organization(org_id):
+    try:
+        org = db.session \
+            .query(Organization) \
+            .filter_by(id=org_id) \
+            .first()
+        return org
+
+    except SQLAlchemyError as ex:
+        raise ex
+
+def create_organization(name, parent_org_id, default_job_limit_count, default_job_limit_hours, members):
+
+    try:
+        new_org = Organization(name=name, parent_org_id=parent_org_id, default_job_limit_count=default_job_limit_count,
+                               default_job_limit_hours=default_job_limit_hours, creation_date=datetime.utcnow())
+
+        db.session.add(new_org)
+        db.session.commit()
+
+        org_members = []
+        for org_member in members:
+            org_members.append(OrganizationMembership(member_id=org_member['member_id'], org_id=new_org.id,
+                                                      org_maintainer=org_member['maintainer'],
+                                                      creation_date=datetime.utcnow()))
+
+        if len(org_members) > 0:
+            db.session.add_all(org_members)
+            db.session.commit()
+
+        org_schema = OrganizationSchema()
+        return json.loads(org_schema.dumps(new_org))
+
+    except SQLAlchemyError as ex:
+        raise ex
+
+def update_organization(org, members):
+
+    try:
+        # Update org
+        db.session.commit()
+
+        # Update membership
+        db.session.execute(
+            db.delete(OrganizationMembership).filter_by(org_id=org.id)
+        )
+        db.session.commit()
+
+        org_members = []
+        for org_member in members:
+            org_members.append(OrganizationMembership(
+                member_id=org_member['member_id'],
+                org_id=org.id,
+                org_maintainer=org_member['maintainer'],
+                creation_date=datetime.utcnow()))
+
+        if len(org_members) > 0:
+            db.session.add_all(org_members)
+            db.session.commit()
+
+        org_schema = OrganizationSchema()
+        return json.loads(org_schema.dumps(org))
+
+    except SQLAlchemyError as ex:
+        raise ex
+
+def delete_organization(org_id):
+    try:
+
+        # Clear membership
+        db.session.execute(
+            db.delete(OrganizationMembership).filter_by(org_id=org_id)
+        )
+        db.session.commit()
+
+        # Clear job queues
+        db.session.execute(
+            db.delete(OrganizationJobQueue).filter_by(org_id=org_id)
+        )
+        db.session.commit()
+
+        db.session.query(Organization).filter_by(id=org_id).delete()
+        db.session.commit()
+
+    except SQLAlchemyError as ex:
+        raise ex
diff --git a/pyproject.toml b/pyproject.toml
index ba37abf..26fbe60 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "maap-api-nasa"
-version = "4.1.0a5"
+version = "4.1.1a2"
 description = "NASA Python implementation of the MAAP API specification"
 authors = ["MAAP-Project Platform "]
 license = "Apache 2.0"
diff --git a/sql/materialized_views/org_tree.sql b/sql/materialized_views/org_tree.sql
new file mode 100644
index 0000000..a90019a
--- /dev/null
+++ b/sql/materialized_views/org_tree.sql
@@ -0,0 +1,46 @@
+create view org_tree
+    (row_number, member_count, depth, id, name, parent_org_id, default_job_limit_count, default_job_limit_hours,
+     creation_date)
+as
+WITH RECURSIVE
+    node_rec AS (SELECT 1 AS row_count,
+                        0 AS member_count,
+                        1 AS depth,
+                        organization.id,
+                        organization.name,
+                        organization.parent_org_id,
+                        organization.default_job_limit_count,
+                        organization.default_job_limit_hours,
+                        organization.creation_date
+                 FROM organization
+                 WHERE organization.parent_org_id IS NULL
+                 UNION ALL
+                 SELECT 1 AS row_count,
+                        0 AS member_count,
+                        r.depth + 1,
+                        n.id,
+                        n.name,
+                        n.parent_org_id,
+                        n.default_job_limit_count,
+                        n.default_job_limit_hours,
+                        n.creation_date
+                 FROM node_rec r
+                          JOIN organization n ON n.parent_org_id = r.id) SEARCH DEPTH FIRST BY name,
+        id SET path
+SELECT row_number() OVER (ORDER BY node_rec.path, node_rec.name) AS row_number,
+       (SELECT count(*) AS count
+        FROM organization_membership
+        WHERE organization_membership.org_id = node_rec.id) AS member_count,
+       node_rec.depth,
+       node_rec.id,
+       node_rec.name,
+       node_rec.parent_org_id,
+       node_rec.default_job_limit_count,
+       node_rec.default_job_limit_hours,
+       node_rec.creation_date
+FROM node_rec
+WHERE node_rec.parent_org_id IS NOT NULL
+ORDER BY node_rec.path, node_rec.name;
+
+alter table org_tree
+    owner to postgres;
\ No newline at end of file
diff --git a/test/api/utils/test_hysds_util.py b/test/api/utils/test_hysds_util.py
new file mode 100644
index 0000000..3384b5f
--- /dev/null
+++ b/test/api/utils/test_hysds_util.py
@@ -0,0 +1,66 @@
+import unittest
+from unittest.mock import Mock, patch
+from api.utils import hysds_util, job_queue
+from api import settings
+import copy
+from requests import Session
+import json
+
+
+def mock_session_get(*args, **kwargs):
+    class MockResponse:
+        def __init__(self, json_data, status_code):
+            self.json_data = json_data
+            self.status_code = status_code
+
+        def json(self):
+            return self.json_data
+
+    return MockResponse({}, 200)
+
+
+class TestHySDSUtils(unittest.TestCase):
+
+    def setUp(self):
+        pass
+
+    @patch('requests.Session.get', side_effect=mock_session_get)
+    def test_get_mozart_job_info(self, mock_session_get):
+        hysds_util.get_mozart_job_info("someid")
+        mock_session_get.assert_called_with("{}/job/info".format(settings.MOZART_URL), params={"id": "someid"})
+
+    def test_remove_double_tag(self):
+        mozart_response = {"result": {"tags": ["duplicate", "duplicate"]}}
+        resp = hysds_util.remove_double_tag(mozart_response)
+        self.assertEqual({"result": {"tags": ["duplicate"]}}, resp)
+        self.assertNotEqual({"result": {"tags": ["duplicate", "duplicate"]}}, resp)
+        mozart_response = {"result": {}}
+        resp = hysds_util.remove_double_tag(mozart_response)
+        self.assertEqual({"result": {}}, resp)
+
+    def test_add_product_path(self):
+        self.fail()
+
+    @patch('api.utils.hysds_util.get_recommended_queue')
+    @patch('api.utils.job_queue.get_user_queues')
+    def test_validate_queue(self, mock_get_user_queues, mock_get_recommended_queue):
+        mock_get_recommended_queue.return_value = "maap-dps-worker-8gb"
+        mock_get_user_queues.return_value = [
+            job_queue.JobQueue(queue_name="maap-dps-worker-8gb", is_default=True),
+            job_queue.JobQueue(queue_name="maap-dps-worker-16gb", is_default=False)]
+        queue = "maap-dps-worker-16gb"
+        job_type = "dummy-job"
+        user_id = 1
+        new_queue = job_queue.validate_or_get_queue(queue, job_type, user_id)
+        self.assertEqual(queue, new_queue.queue_name)
+        mock_get_user_queues.assert_called_with(user_id)
+        new_queue = job_queue.validate_or_get_queue("", job_type, user_id)
+        self.assertEqual("maap-dps-worker-8gb", new_queue.queue_name)
+        mock_get_recommended_queue.assert_called_with("dummy-job")
+        with self.assertRaises(ValueError):
+            job_queue.validate_or_get_queue("invalid_queue", job_type, user_id)
+
+    def test_set_time_limits(self):
+        params = {"input": "in1", "username": "user"}
+        expected_params = copy.deepcopy(params)
+        expected_params.update({"soft_time_limit": 6000, "time_limit": 6000})
+        queue = Mock(time_limit_minutes=100)
+        hysds_util.set_timelimit_for_dps_sandbox(params, queue)
+        self.assertEqual(expected_params, params)
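
# ---------------------------------------------------------------------------
# Reviewer sketch (not part of the patch): a minimal, self-contained
# illustration of the submission-time behavior introduced above -- queue
# resolution via job_queue.validate_or_get_queue() plus the per-queue time
# limit override applied in api/endpoints/job.py. The Queue dataclass and the
# queue names below are illustrative stand-ins, not the real model or data.
from dataclasses import dataclass
from typing import Optional


@dataclass
class Queue:  # stand-in for api.models.job_queue.JobQueue
    queue_name: str
    is_default: bool = False
    time_limit_minutes: Optional[int] = None


def resolve_time_limit(queue: Queue, default_seconds: int = 86400) -> int:
    # Mirrors contains_time_limit(): a positive per-queue limit in minutes
    # overrides the default soft/hard time limit in seconds.
    if queue.time_limit_minutes is not None and queue.time_limit_minutes > 0:
        return queue.time_limit_minutes * 60
    return default_seconds


queues = [Queue("maap-dps-sandbox", is_default=True, time_limit_minutes=10),
          Queue("maap-dps-worker-8gb")]
requested = next(q for q in queues if q.is_default)  # no queue name supplied
assert resolve_time_limit(requested) == 600          # sandbox capped at 10 minutes
assert resolve_time_limit(queues[1]) == 86400        # worker keeps the default
# For reference, POST /admin/job-queues (api/endpoints/admin.py) accepts
# queue_name, queue_description, guest_tier, is_default, time_limit_minutes,
# and an orgs list when creating queues that feed this resolution logic.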