🐛 ♻️ Is3125/gc review #3130

Merged · 13 commits · Jun 22, 2022
19 changes: 7 additions & 12 deletions packages/service-library/src/servicelib/logging_utils.py
@@ -9,7 +9,7 @@
 from asyncio import iscoroutinefunction
 from contextlib import contextmanager
 from inspect import getframeinfo, stack
-from typing import Callable, Dict, Optional
+from typing import Callable, Optional

 log = logging.getLogger(__name__)
@@ -63,7 +63,8 @@ def format(self, record):
         return super().format(record)


-DEFAULT_FORMATTING = "%(levelname)s: [%(asctime)s/%(processName)s] [%(name)s:%(funcName)s(%(lineno)d)] %(message)s"
+# SEE https://docs.python.org/3/library/logging.html#logrecord-attributes
+DEFAULT_FORMATTING = "%(levelname)s: [%(asctime)s/%(processName)s] [%(name)s:%(funcName)s(%(lineno)d)] - %(message)s"


 def config_all_loggers():
@@ -78,25 +79,19 @@ def config_all_loggers():

 def set_logging_handler(
     logger: logging.Logger,
-    formatter_base=None,
-    formatting: Optional[str] = None,
+    formatter_base: Optional[type[logging.Formatter]] = None,
+    fmt: str = DEFAULT_FORMATTING,
 ) -> None:
-    if not formatting:
-        formatting = DEFAULT_FORMATTING
     if not formatter_base:
         formatter_base = CustomFormatter

     for handler in logger.handlers:
-        handler.setFormatter(
-            formatter_base(
-                "%(levelname)s: %(name)s:%(funcName)s(%(lineno)s) - %(message)s"
-            )
-        )
+        handler.setFormatter(formatter_base(fmt))


 def _log_arguments(
     logger_obj: logging.Logger, level: int, func: Callable, *args, **kwargs
-) -> Dict[str, str]:
+) -> dict[str, str]:
     args_passed_in_function = [repr(a) for a in args]
     kwargs_passed_in_function = [f"{k}={v!r}" for k, v in kwargs.items()]
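With this refactor the formatter is configured entirely through the function signature: both the formatter class and the format string become explicit parameters with sensible defaults. A minimal usage sketch, assuming only what this diff shows (the logger name and handler below are illustrative):

import logging

from servicelib.logging_utils import set_logging_handler

# set_logging_handler only re-formats handlers that are already attached
logger = logging.getLogger("my-service")
logger.addHandler(logging.StreamHandler())

# defaults: CustomFormatter with DEFAULT_FORMATTING
set_logging_handler(logger)

# or override the format string explicitly
set_logging_handler(logger, fmt="%(levelname)s: %(name)s - %(message)s")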

@@ -72,10 +72,8 @@ async def get_swarm_network(dynamic_sidecar_settings: DynamicSidecarSettings) ->
     ]
     if not networks or len(networks) > 1:
         raise DynamicSidecarError(
-            (
-                f"Swarm network name (searching for '*{network_name}*') is not configured."
-                f"Found following networks: {networks}"
-            )
+            f"Swarm network name (searching for '*{network_name}*') is not configured."
+            f"Found following networks: {networks}"
         )
     return networks[0]

@@ -116,7 +114,7 @@ async def create_service_and_get_id(create_service_data: AioDockerServiceSpec) -

     if "ID" not in service_start_result:
         raise DynamicSidecarError(
-            "Error while starting service: {}".format(str(service_start_result))
+            f"Error while starting service: {str(service_start_result)}"
         )
     return service_start_result["ID"]

@@ -162,10 +160,8 @@ async def _sleep_or_error(started: float, task: dict):
         > dynamic_sidecar_settings.DYNAMIC_SIDECAR_TIMEOUT_FETCH_DYNAMIC_SIDECAR_NODE_ID
     ):
         raise DynamicSidecarError(
-            msg=(
-                "Timed out while searching for an assigned NodeID for "
-                f"service_id={service_id}. Last task inspect result: {task}"
-            )
+            "Timed out while searching for an assigned NodeID for "
+            f"service_id={service_id}. Last task inspect result: {task}"
         )

     async with docker_client() as client:
@@ -219,10 +215,8 @@ async def get_service_placement(

     if "NodeID" not in task:
         raise DynamicSidecarError(
-            msg=(
-                f"Could not find an assigned NodeID for service_id={service_id}. "
-                f"Last task inspect result: {task}"
-            )
+            f"Could not find an assigned NodeID for service_id={service_id}. "
+            f"Last task inspect result: {task}"
         )

     return task["NodeID"]
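These hunks drop the redundant inner parentheses and the msg= keyword; the multi-line messages still work because Python concatenates adjacent string literals at compile time. A standalone illustration (the values are made up):

service_id = "abc123"
task = {"Status": {"State": "pending"}}

message = (
    "Timed out while searching for an assigned NodeID for "
    f"service_id={service_id}. Last task inspect result: {task}"
)
assert message.startswith("Timed out while searching for an assigned NodeID")

Note that adjacent literals join with no separator, so in the first hunk "is not configured." and "Found following networks" run together unless a trailing space is added to the first fragment.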
@@ -5,7 +5,7 @@

 import asyncio
 import logging
 from itertools import chain
-from typing import Any, Dict, List, Optional, Set, Tuple
+from typing import Any, Optional

 import asyncpg.exceptions
 from aiohttp import web
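The typing changes throughout this file all follow the same pattern: since Python 3.9 (PEP 585) the built-in containers are generic, so typing.Dict, List, Set and Tuple can be dropped, while Optional is still imported. A minimal sketch (the function names here are illustrative, not from this PR):

from typing import Any, Optional


def summarize(resources: dict[str, Any]) -> list[tuple[str, int]]:
    # built-in generics (PEP 585) replace typing.Dict/List/Tuple
    return [(name, len(str(value))) for name, value in resources.items()]


def first_key(resources: dict[str, Any]) -> Optional[str]:
    # typing.Optional (or "str | None" on 3.10+) is still spelled explicitly
    return next(iter(resources), None)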
@@ -17,7 +17,11 @@
 from . import director_v2_api, users_exceptions
 from .director.director_exceptions import DirectorException, ServiceNotFoundError
 from .garbage_collector_settings import GUEST_USER_RC_LOCK_FORMAT
-from .garbage_collector_utils import get_new_project_owner_gid, replace_current_owner
+from .garbage_collector_utils import (
+    get_new_project_owner_gid,
+    log_context,
+    replace_current_owner,
+)
 from .projects.projects_api import (
     get_project_for_user,
     get_workbench_node_ids_from_project_uuid,
@@ -61,29 +65,28 @@ async def collect_garbage(app: web.Application):
     The field `garbage_collection_interval_seconds` defines the interval at which this
     function will be called.
     """
-    logger.info("Collecting garbage...")

     registry: RedisResourceRegistry = get_registry(app)

-    # Removes disconnected user resources
-    # Triggers signal to close possible pending opened projects
-    # Removes disconnected GUEST users after they finished their sessions
-    await remove_disconnected_user_resources(registry, app)
+    with log_context(logger.info, "Step 1: Removes disconnected user resources"):
+        # Triggers signal to close possible pending opened projects
+        # Removes disconnected GUEST users after they finished their sessions
+        await remove_disconnected_user_resources(registry, app)

-    # Users manually marked for removal:
-    # if a user was manually marked as GUEST it needs to be
-    # removed together with all the associated projects
-    await remove_users_manually_marked_as_guests(registry, app)
+    with log_context(logger.info, "Step 2: Removes users manually marked for removal"):
+        # if a user was manually marked as GUEST it needs to be
+        # removed together with all the associated projects
+        await remove_users_manually_marked_as_guests(registry, app)

-    # For various reasons, some services remain pending after
-    # the projects are closed or the user was disconencted.
-    # This will close and remove all these services from
-    # the cluster, thus freeing important resources.
+    with log_context(logger.info, "Step 3: Removes orphaned services"):
+        # For various reasons, some services remain pending after
+        # the projects are closed or the user was disconencted.
+        # This will close and remove all these services from
+        # the cluster, thus freeing important resources.

-    # Temporary disabling GC to until the dynamic service
-    # safe function is invoked by the GC. This will avoid
-    # data loss for current users.
-    await remove_orphaned_services(registry, app)
+        # Temporary disabling GC to until the dynamic service
+        # safe function is invoked by the GC. This will avoid
+        # data loss for current users.
+        await remove_orphaned_services(registry, app)


 async def remove_disconnected_user_resources(
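log_context is imported from garbage_collector_utils, but its body is not part of this diff. Judging only from the call sites, it is presumably a context manager along these lines (a sketch under that assumption, not the module's actual implementation):

from contextlib import contextmanager
from typing import Callable, Iterator


@contextmanager
def log_context(log_fn: Callable[..., None], message: str) -> Iterator[None]:
    # brackets a block with start/done lines,
    # matching call sites like log_context(logger.info, "Step 1: ...")
    log_fn("Starting %s ...", message)
    try:
        yield
    finally:
        log_fn("Finished %s", message)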
@@ -125,9 +128,9 @@ async def remove_disconnected_user_resources(
             if await lock_manager.lock(
                 GUEST_USER_RC_LOCK_FORMAT.format(user_id=user_id)
             ).locked():
-                logger.debug(
-                    "Skipping garbage-collecting user '%d' since it is still locked",
-                    user_id,
+                logger.info(
+                    "Skipping garbage-collecting %s since it is still locked",
+                    f"{user_id=}",
                 )
                 continue
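The recurring f"{user_id=}" arguments use the f-string debug specifier (Python 3.8+), which renders both the expression and its value, while !r applies repr(). For instance:

user_id = 42
dead_key = {"user_id": "42", "client_session_id": "s1"}

print(f"{user_id=}")    # user_id=42
print(f"{dead_key!r}")  # {'user_id': '42', 'client_session_id': 's1'}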

@@ -138,13 +141,13 @@ async def remove_disconnected_user_resources(
                 continue

             # (1,2) CAREFULLY releasing every resource acquired by the expired key
-            logger.debug(
-                "Key '%s' expired. Cleaning the following resources: '%s'",
-                dead_key,
-                dead_key_resources,
+            logger.info(
+                "%s expired. Checking resources to cleanup",
+                f"{dead_key=}",
             )

             for resource_name, resource_value in dead_key_resources.items():
+                resource_value = f"{resource_value}"

                 # Releasing a resource consists of two steps
                 # - (1) release actual resource (e.g. stop service, close project, deallocate memory, etc)
@@ -175,10 +178,10 @@ async def remove_disconnected_user_resources(

                 # (1) releasing acquired resources
                 logger.info(
-                    "(1) Releasing resource %s:%s acquired by expired key %s",
-                    resource_name,
-                    resource_value,
-                    dead_key,
+                    "(1) Releasing resource %s:%s acquired by expired %s",
+                    f"{resource_name=}",
+                    f"{resource_value=}",
+                    f"{dead_key!r}",
                 )

                 if resource_name == "project_id":
@@ -208,15 +211,17 @@ async def remove_disconnected_user_resources(

             # ONLY GUESTS: if this user was a GUEST also remove it from the database
             # with the only associated project owned
+            # FIXME: if a guest can share, it will become permanent user!
             await remove_guest_user_with_all_its_resources(
                 app=app,
                 user_id=int(dead_key["user_id"]),
             )

             # (2) remove resource field in collected keys since (1) is completed
             logger.info(
-                "(2) Removing resource %s field entry from registry keys: %s",
-                resource_name,
+                "(2) Removing field for released resource %s:%s from registry keys: %s",
+                f"{resource_name=}",
+                f"{resource_value=}",
                 keys_to_update,
             )
             on_released_tasks = [
Expand Down Expand Up @@ -248,7 +253,7 @@ async def remove_users_manually_marked_as_guests(
skip_users.add(int(entry["user_id"]))

# Prevent creating this list if a guest user
guest_users: List[Tuple[int, str]] = await get_guest_user_ids_and_names(app)
guest_users: list[tuple[int, str]] = await get_guest_user_ids_and_names(app)

for guest_user_id, guest_user_name in guest_users:
# Prevents removing GUEST users that were automatically (NOT manually) created
@@ -291,20 +296,20 @@ async def remove_users_manually_marked_as_guests(

 async def _remove_single_orphaned_service(
     app: web.Application,
-    interactive_service: Dict[str, Any],
-    currently_opened_projects_node_ids: Set[str],
+    interactive_service: dict[str, Any],
+    currently_opened_projects_node_ids: set[str],
 ) -> None:
     service_host = interactive_service["service_host"]
     # if not present in DB or not part of currently opened projects, can be removed
     service_uuid = interactive_service["service_uuid"]
     # if the node does not exist in any project in the db
     # they can be safely remove it without saving any state
     if not await is_node_id_present_in_any_project_workbench(app, service_uuid):
-        message = (
+        logger.info(
             "Will remove orphaned service without saving state since "
-            f"this service is not part of any project {service_host}"
+            "this service is not part of any project %s",
+            f"{service_host=}",
         )
-        logger.info(message)
         try:
             await director_v2_api.stop_service(app, service_uuid, save_state=False)
         except (ServiceNotFoundError, DirectorException) as err:
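Building the text inside the logging call, rather than pre-assembling a message variable, keeps one constant format string with the variable parts passed as arguments, a shape that log tooling can group on. Roughly:

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("demo")

service_host = "sidecar-host-1"  # illustrative value

# one constant template; the variable fragment is supplied as a %s argument
logger.info(
    "Will remove orphaned service without saving state since "
    "this service is not part of any project %s",
    f"{service_host=}",
)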
@@ -331,8 +336,8 @@ async def _remove_single_orphaned_service(
         #
         # a service state might be one of [pending, pulling, starting, running, complete, failed]
         logger.warning(
-            "Skipping %s since image is in %s",
-            service_host,
+            "Skipping %s since service state is %s",
+            f"{service_host=}",
             interactive_service.get("service_state", "unknown"),
         )
         return
@@ -378,7 +383,7 @@ async def remove_orphaned_services(
     """
     logger.debug("Starting orphaned services removal...")

-    currently_opened_projects_node_ids: Set[str] = set()
+    currently_opened_projects_node_ids: set[str] = set()
     alive_keys, _ = await registry.get_all_resource_keys()
     for alive_key in alive_keys:
         resources = await registry.get_resources(alive_key)
@@ -389,11 +394,11 @@ async def remove_orphaned_services(
         node_ids = await get_workbench_node_ids_from_project_uuid(app, project_uuid)
         currently_opened_projects_node_ids.update(node_ids)

-    running_interactive_services: List[Dict[str, Any]] = []
+    running_interactive_services: list[dict[str, Any]] = []
     try:
         running_interactive_services = await director_v2_api.get_services(app)
     except director_v2_api.DirectorServiceError:
-        logger.debug(("Could not fetch running_interactive_services"))
+        logger.debug("Could not fetch running_interactive_services")

     logger.info(
         "Currently running services %s",
@@ -434,7 +439,7 @@ async def _delete_all_projects_for_user(app: web.Application, user_id: int) -> N
     """
     # recover user's primary_gid
     try:
-        project_owner: Dict = await get_user(app=app, user_id=user_id)
+        project_owner: dict = await get_user(app=app, user_id=user_id)
     except users_exceptions.UserNotFoundError:
         logger.warning(
             "Could not recover user data for user '%s', stopping removal of projects!",
@@ -456,11 +461,11 @@ async def _delete_all_projects_for_user(app: web.Application, user_id: int) -> N
         f"{user_project_uuids=}",
     )

-    delete_tasks: List[asyncio.Task] = []
+    delete_tasks: list[asyncio.Task] = []

     for project_uuid in user_project_uuids:
         try:
-            project: Dict = await get_project_for_user(
+            project: dict = await get_project_for_user(
                 app=app,
                 project_uuid=project_uuid,
                 user_id=user_id,
@@ -11,12 +11,14 @@

 from .garbage_collector_core import collect_garbage
 from .garbage_collector_settings import GarbageCollectorSettings, get_plugin_settings
+from .garbage_collector_utils import log_context

 logger = logging.getLogger(__name__)


 GC_TASK_NAME = f"background-task.{__name__}.collect_garbage_periodically"
 GC_TASK_CONFIG = f"{GC_TASK_NAME}.config"
+GC_TASK = f"{GC_TASK_NAME}.task"


 async def run_background_task(app: web.Application):
@@ -29,6 +31,8 @@ async def run_background_task(app: web.Application):
     gc_bg_task = asyncio.create_task(
         collect_garbage_periodically(app), name=GC_TASK_NAME
     )
+    # attaches variable to the app's lifetime
+    app[GC_TASK] = gc_bg_task

     # FIXME: added this config to overcome the state in which the
     # task cancelation is ignored and the exceptions enter in a loop
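Storing the task in the application dict is the usual aiohttp way to tie a background task to the app's lifetime so it stays reachable and can be cancelled at shutdown. A sketch of the overall pattern using a cleanup context (the shutdown half is illustrative, not code from this PR; collect_garbage_periodically is the loop shown below):

import asyncio
import contextlib

from aiohttp import web

GC_TASK = "background-task.gc.task"  # illustrative key, mirroring the diff


async def gc_task_ctx(app: web.Application):
    # startup: keep a reference on the app so the task is not garbage-collected
    app[GC_TASK] = asyncio.create_task(collect_garbage_periodically(app))

    yield  # the app runs

    # shutdown: cancel and await the task
    app[GC_TASK].cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await app[GC_TASK]


# registered as: app.cleanup_ctx.append(gc_task_ctx)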
@@ -59,29 +63,31 @@ async def run_background_task(app: web.Application):


 async def collect_garbage_periodically(app: web.Application):
-
     settings: GarbageCollectorSettings = get_plugin_settings(app)
+    interval = settings.GARBAGE_COLLECTOR_INTERVAL_S

     while True:
-        logger.info("Starting garbage collector...")
         try:
-            interval = settings.GARBAGE_COLLECTOR_INTERVAL_S
             while True:
-                await collect_garbage(app)
+                with log_context(logger.info, "Garbage collect cycle"):
+                    await collect_garbage(app)

-                if app[GC_TASK_CONFIG].get("force_stop", False):
-                    raise Exception("Forced to stop garbage collection")
+                    if app[GC_TASK_CONFIG].get("force_stop", False):
+                        raise RuntimeError("Forced to stop garbage collection")

                 logger.info("Garbage collect cycle pauses %ss", interval)
                 await asyncio.sleep(interval)

-        except asyncio.CancelledError:
-            logger.info("Garbage collection task was cancelled, it will not restart!")
+        except asyncio.CancelledError:  # EXIT
+            logger.info(
+                "Stopped: Garbage collection task was cancelled, it will not restart!"
+            )
             # do not catch Cancellation errors
             raise

-        except Exception:  # pylint: disable=broad-except
+        except Exception:  # RESILIENT restart # pylint: disable=broad-except
             logger.warning(
-                "There was an error during garbage collection, restarting...",
+                "Stopped: There was an error during garbage collection, restarting...",
                 exc_info=True,
             )