Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: distance-based task distribution logic #119

Merged
merged 13 commits into from
Jan 26, 2023
Merged
13 changes: 4 additions & 9 deletions pro_tes/ga4gh/tes/task_runs.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
"""Class implementing TES API-server-side controller methods."""

import logging
from copy import deepcopy
from datetime import datetime
import logging
uniqueg marked this conversation as resolved.
Show resolved Hide resolved
from typing import Dict, Tuple

import requests
uniqueg marked this conversation as resolved.
Show resolved Hide resolved
import tes
from bson.objectid import ObjectId
from celery import uuid
from dateutil.parser import parse as parse_time
Expand All @@ -13,17 +15,10 @@
from foca.utils.misc import generate_id
from pymongo.collection import Collection
from pymongo.errors import DuplicateKeyError, PyMongoError
import requests
import tes
from tes.models import Task

from pro_tes.exceptions import BadRequest, TaskNotFound
from pro_tes.ga4gh.tes.models import (
DbDocument,
TesEndpoint,
TesState,
TesTask,
)
from pro_tes.ga4gh.tes.models import DbDocument, TesEndpoint, TesState, TesTask
from pro_tes.ga4gh.tes.states import States
from pro_tes.tasks.track_task_progress import task__track_task_progress
from pro_tes.utils.db import DbDocumentConnector
Expand Down
2 changes: 1 addition & 1 deletion pro_tes/middleware/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
"""proTES middlwares."""
"""proTES middlewares."""
uniqueg marked this conversation as resolved.
Show resolved Hide resolved
11 changes: 6 additions & 5 deletions pro_tes/middleware/middleware.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
"""Middleware to inject into TES requests."""

import abc
from typing import (
Dict,
)
from typing import Dict

from pro_tes.middleware.task_distribution import (
from pro_tes.middleware.task_distribution.distance import (
task_distribution_by_distance,
)
from pro_tes.middleware.task_distribution.random import (
random_task_distribution,
uniqueg marked this conversation as resolved.
Show resolved Hide resolved
)


# pragma pylint: disable=too-few-public-methods


class AbstractMiddleware(metaclass=abc.ABCMeta):
"""Abstract class to implement different middleware."""

Expand Down
44 changes: 24 additions & 20 deletions pro_tes/middleware/models.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,35 @@
"""Model for the Access Uri Combination."""
# pylint: disable=no-name-in-module

# pragma pylint: disable=no-name-in-module

from typing import List
from pydantic import BaseModel

from pydantic import AnyUrl, BaseModel, HttpUrl

# pragma pylint: disable=too-few-public-methods
uniqueg marked this conversation as resolved.
Show resolved Hide resolved
uniqueg marked this conversation as resolved.
Show resolved Hide resolved
class TesUriList(BaseModel):
"""Combination of the tes_uri and total distance."""

tes_uri: str
total_distance: int = None

class TesStats(BaseModel):
"""Combination of Tes stats, currently total distance."""

class AccessUriCombination(BaseModel):
"""Combination of input_uri of the TES task and the TES instances."""
total_distance: float = None


class TaskParams(BaseModel):
"""Combination of task parameters, currently input uris."""

input_uri: List[str]
tes_uri_list: List[TesUriList]
input_uris: List[AnyUrl]

def convert_tes_uri_list_to_dict(self):
"""Convert the list of type TESUriList to list of dictionary."""
converted_list = []
for combo in self.tes_uri_list:
converted_list.append(combo.__dict__)
return converted_list

def convert_combination_to_dict(self):
"""Convert the AccessUriCombination object to dictionary."""
tes_uri_list = self.convert_tes_uri_list_to_dict()
return {"input_uri": self.input_uri, "tes_uri_list": tes_uri_list}
class TesDeployment(BaseModel):
"""Combination of the tes_uri and its stats."""

tes_uri: HttpUrl
stats: TesStats


class AccessUriCombination(BaseModel):
"""Combination of input_uri of the TES task and the TES instances."""

task_params: TaskParams
tes_deployments: List[TesDeployment]
uniqueg marked this conversation as resolved.
Show resolved Hide resolved
1 change: 1 addition & 0 deletions pro_tes/middleware/task_distribution/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""proTES task distribution middlewares."""
uniqueg marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -1,42 +1,25 @@
"""Module for task distribution logic."""
"""Module for distance based task distribution logic ."""
uniqueg marked this conversation as resolved.
Show resolved Hide resolved

from copy import deepcopy
import random
from urllib.parse import urlparse
from socket import gaierror, gethostbyname
from typing import Dict, List, Set, Tuple, Optional
from itertools import combinations
from ip2geotools.errors import InvalidRequestError
from ip2geotools.databases.noncommercial import DbIpCity
from geopy.distance import geodesic
from socket import gaierror, gethostbyname
from typing import Dict, List, Optional, Set, Tuple
from urllib.parse import urlparse

from flask import current_app
import requests

from pro_tes.middleware.models import AccessUriCombination, TesUriList
from geopy.distance import geodesic
from ip2geotools.databases.noncommercial import DbIpCity
from ip2geotools.errors import InvalidRequestError
uniqueg marked this conversation as resolved.
Show resolved Hide resolved

from pro_tes.middleware.models import (
AccessUriCombination,
TaskParams,
TesDeployment,
TesStats
)

# pylint: disable-msg=R0912
# pylint: disable-msg=too-many-locals
def random_task_distribution() -> Optional[str]:
uniqueg marked this conversation as resolved.
Show resolved Hide resolved
"""Random task distributor.

Randomly distribute tasks across available TES instances.

Returns:
A randomly selected, available TES instance.
"""
foca_conf = current_app.config.foca
tes_uri: List[str] = deepcopy(foca_conf.tes["service_list"])
timeout: int = foca_conf.controllers["post_task"]["timeout"]["poll"]
while len(tes_uri) != 0:
random_tes_uri: str = random.choice(tes_uri)
response = requests.get(url=random_tes_uri, timeout=timeout)
if response.status_code == 200:
tes_uri.clear()
tes_uri.insert(0, random_tes_uri)
return tes_uri
tes_uri.remove(random_tes_uri)
return None


def task_distribution_by_distance(input_uri: List) -> Optional[List]:
Expand All @@ -57,7 +40,6 @@ def task_distribution_by_distance(input_uri: List) -> Optional[List]:

# get the combination of the tes ip and input ip
ips = ip_combination(input_uri=input_uri, tes_uri=tes_uri)

ips_unique: Dict[Set[str], List[Tuple[int, str]]] = {
v: [] for v in ips.values() # type: ignore
}
Expand All @@ -67,7 +49,6 @@ def task_distribution_by_distance(input_uri: List) -> Optional[List]:
# Calculate distances between all IPs
distances_unique: Dict[Set[str], float] = {}
ips_all = frozenset().union(*list(ips_unique.keys())) # type: ignore

try:
distances_full = ip_distance(*ips_all)
except ValueError:
Expand All @@ -92,7 +73,6 @@ def task_distribution_by_distance(input_uri: List) -> Optional[List]:

# Map distances back to each access URI combination
distances = [deepcopy({}) for i in range(len(tes_uri))]

for ip_set, combination in ips_unique.items(): # type: ignore
for combo in combination:
try:
Expand All @@ -106,18 +86,21 @@ def task_distribution_by_distance(input_uri: List) -> Optional[List]:

# Add total distance corresponding to TES uri's in
# access URI combination
for index, value in enumerate(access_uri_combination.tes_uri_list):
value.total_distance = distances[index]["total"]
for index, value in enumerate(access_uri_combination.tes_deployments):
value.stats.total_distance = distances[index]["total"]

combination = []
for index, value in enumerate(access_uri_combination.tes_deployments):
combination.append(value.dict())

combination = access_uri_combination.convert_combination_to_dict()
# sorting the TES uri in decreasing order of total distance
ranked_combination = sorted(
combination["tes_uri_list"], key=lambda x: x["total_distance"]
combination, key=lambda x: x["stats"]["total_distance"]
)

ranked_tes_uri = []
for index, value in enumerate(ranked_combination):
ranked_tes_uri.append(value["tes_uri"])
ranked_tes_uri.append(str(value["tes_uri"]))

return ranked_tes_uri

Expand All @@ -134,35 +117,44 @@ def get_uri_combination(
Returns:
A AccessUriCombination object of the form like:
{
"input_uri": [
"task_params": {
"input_uri": [
"ftp://vm4466.kaj.pouta.csc.fi/upload/foivos/test1.txt",
"ftp://vm4466.kaj.pouta.csc.fi/upload/foivos/test2.txt",
"ftp://vm4466.kaj.pouta.csc.fi/upload/foivos/test3.txt",
],

"tes_uri_list": [
{
"tes_uri": "https://tesk-eu.hypatia-comp.athenarc.gr",
"total_distance": None
]
},

"tes_deployments": [
{ "tes_uri": "https://tesk-eu.hypatia-comp.athenarc.gr",
"stats": {
"total_distance": None
}
},
{
"tes_uri": "https://csc-tesk-noauth.rahtiapp.fi",
"total_distance": None
{ "tes_uri": "https://csc-tesk-noauth.rahtiapp.fi",
"stats": {
"total_distance": None
}
},
{
"tes_uri": "https://tesk-na.cloud.e-infra.cz",
"total_distance" : None
{ "tes_uri": "https://tesk-na.cloud.e-infra.cz",
"stats": {
"total_distance": None
}
},
]
}
"""
tes_uri_list = []
tes_deployment_list = []
for uri in tes_uri:
obj = TesUriList(tes_uri=uri, total_distance=None)
tes_uri_list.append(obj)
temp_obj = TesDeployment(
tes_uri=uri,
stats=TesStats(total_distance=None)
)
tes_deployment_list.append(temp_obj)

task_param = TaskParams(input_uris=input_uri)
access_uri_combination = AccessUriCombination(
input_uri=input_uri, tes_uri_list=tes_uri_list
task_params=task_param,
tes_deployments=tes_deployment_list
)
return access_uri_combination

Expand Down
30 changes: 30 additions & 0 deletions pro_tes/middleware/task_distribution/random.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""Module for random task distribution."""

import random
from copy import deepcopy
from typing import List, Optional

import requests
from flask import current_app


def random_task_distribution() -> Optional[List]:
"""Random task distributor.

Randomly distribute tasks across available TES instances.

Returns:
A randomly selected, available TES instance.
"""
foca_conf = current_app.config.foca
tes_uri: List[str] = deepcopy(foca_conf.tes["service_list"])
timeout: int = foca_conf.controllers["post_task"]["timeout"]["poll"]
while len(tes_uri) != 0:
random_tes_uri: str = random.choice(tes_uri)
response = requests.get(url=random_tes_uri, timeout=timeout)
if response.status_code == 200:
tes_uri.clear()
tes_uri.insert(0, random_tes_uri)
return tes_uri
tes_uri.remove(random_tes_uri)
return None