Fix bug introduced by e1393a2 commit
xDaile committed Jun 17, 2024
1 parent 62f7d14 commit b6491cd
Showing 2 changed files with 203 additions and 90 deletions.
174 changes: 112 additions & 62 deletions iib/workers/tasks/opm_operations.py
@@ -2,6 +2,7 @@
import logging
import os
import random
import re
import shutil
import socket
import subprocess
@@ -78,7 +79,7 @@ def lock_acquire(self):
# test if port is free
s.bind(("localhost", self.port))
# create file-lock
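# Note: O_CREAT | O_EXCL makes os.open() raise FileExistsError if the lock file
# already exists, i.e. when another process already holds this port.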
f = os.open(self.filename, os.O_CREAT | os.O_EXCL | os.O_RDWR)
f = os.open(self.filename, os.O_CREAT | os.O_EXCL)
self.locked = True
log.debug("Port %s used as %s was locked.", self.port, self.purpose)
except FileExistsError:
@@ -92,7 +93,7 @@ def lock_acquire(self):
finally:
s.close()
if f:
f.close()
os.close(f)
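# os.open() returns an integer file descriptor, so it is released with
# os.close() rather than a file object's .close() method.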

def unlock(self):
"""Delete file representing port lock."""
@@ -143,7 +144,7 @@ def port_file_locks_generator(
raise IIBError(err_msg)


def get_opm_port_stacks() -> Tuple[List[List[int]], List[str]]:
def get_opm_port_stacks(port_purposes: List[str]) -> Tuple[List[List[int]], List[str]]:
"""
Get stack with port numbers and list of their intended purposes.
@@ -160,18 +161,20 @@ def get_opm_port_stacks() -> Tuple[List[List[int]], List[str]]:
Example:
ports, purposes = get_opm_port_stacks()
ports contains: [[50051, 50151], [50052, 50152]]
purposes content: ['opm_port', 'pprof_port']
purposes content: ['opm_port', 'opm_pprof_port']
In this example, ports 50051 and 50052 are intended to be used as 'opm_port', and ports 50151
and 50152 as 'pprof_port'.
and 50152 as 'opm_pprof_port'.
:param list(str) port_purposes: list of intended port purposes
:return: tuple with port stacks and their purposes
:rtype: tuple(list(list(int)), list(str))
"""
conf = get_worker_config()
port_purposes = list(conf.iib_opm_port_ranges)

if Version(Opm.opm_version) < Version(conf.iib_opm_pprof_lock_required_min_version):
port_purposes.remove('pprof_port')
opm_version = Opm.get_opm_version_number()
if Version(opm_version) < Version(conf.iib_opm_pprof_lock_required_min_version):
port_purposes.remove('opm_pprof_port')
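# Older opm versions do not require a lock for the pprof port, so that purpose
# is dropped before building the port ranges.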

# get port_ranges we need for the given opm_version
port_ranges = [range(*conf.iib_opm_port_ranges[port_purpose]) for port_purpose in port_purposes]
@@ -184,60 +187,78 @@ def get_opm_port_stacks() -> Tuple[List[List[int]], List[str]]:
return ports_list, port_purposes


def create_port_filelocks(func: Callable) -> Callable:
def create_port_filelocks(port_purposes: List[str]) -> Callable:
"""
Create a file-lock on a random port from the configured range.
:param function func: the function to be decorated
:rtype: function
:return: the decorated function
:param List[str] port_purposes: the list of port purposes to be locked
:rtype: Callable
:return: the decorator function
"""

@wraps(func)
def inner(*args, **kwargs):
def decorator(func: Callable) -> Callable:
"""
Create a file-lock on a random port from the configured range.
port_stacks, port_purposes = get_opm_port_stacks()
# Attempt to acquire the lock for each port in the range (shuffled order)
lock_success = False
:param function func: the function to be decorated
:rtype: function
:return: the decorated function
"""

# Initialize the generator
gen = port_file_locks_generator(port_stacks=port_stacks, port_purposes=port_purposes)
@wraps(func)
def inner(*args, **kwargs):

# Use the function to retrieve values from the generator
while not lock_success:
new_locks = next(gen)
currently_active_locks = []
# If we do not have any ports to lock
if len(port_purposes) == 0:
return func(*args, **kwargs)

try:
# Atomically acquire the locks for the given ports
for new_lock in new_locks:
new_lock.lock_acquire()
currently_active_locks.append(new_lock)

port_args = {
port_purpose: currently_active_locks[port_position].port
for port_position, port_purpose in enumerate(port_purposes)
}

result = func(*args, **port_args, **kwargs)
lock_success = True

# Exception raised during execution of func()
except AddressAlreadyInUse:
lock_success = False
for active_lock in currently_active_locks:
active_lock.unlock()

finally:
# Exit loop after successful lock acquisition
if lock_success:
port_stacks, port_purposes_updated = get_opm_port_stacks(port_purposes)
# Attempt to acquire the lock for each port in the range (shuffled order)
lock_success = False

# Initialize the generator
gen = port_file_locks_generator(
port_stacks=port_stacks,
port_purposes=port_purposes_updated,
)

# Use the function to retrieve values from the generator
while not lock_success:
new_locks = next(gen)
currently_active_locks = []

try:
# Atomically acquire the locks for the given ports
for new_lock in new_locks:
new_lock.lock_acquire()
currently_active_locks.append(new_lock)

port_args = {
port_purpose: currently_active_locks[port_position].port
for port_position, port_purpose in enumerate(port_purposes_updated)
}
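# port_args maps each purpose name to its locked port number and is passed to
# func() as keyword arguments (e.g. opm_port=50051).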

result = func(*args, **port_args, **kwargs)
lock_success = True

# Exception raised during execution of func()
except AddressAlreadyInUse:
lock_success = False
for active_lock in currently_active_locks:
active_lock.unlock()
break

return result
finally:
# Exit loop after successful lock acquisition
if lock_success:
for active_lock in currently_active_locks:
active_lock.unlock()
break

return result

return inner
return inner

return decorator
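# Illustrative usage sketch (the decorated function below is hypothetical): the
# decorator factory locks one free port per requested purpose and injects the
# acquired port numbers as keyword arguments named after those purposes:
#
#     @create_port_filelocks(port_purposes=["opm_port", "opm_pprof_port"])
#     def serve_catalog(catalog_dir: str, opm_port: int, opm_pprof_port: Optional[int] = None):
#         ...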


def opm_serve_from_index(base_dir: str, from_index: str) -> Tuple[int, subprocess.Popen]:
@@ -263,11 +284,11 @@ def opm_serve_from_index(base_dir: str, from_index: str) -> Tuple[int, subproces
return opm_serve(catalog_dir=catalog_dir)


@create_port_filelocks
@create_port_filelocks(port_purposes=["opm_port", "opm_pprof_port"])
def opm_serve(
opm_port: int,
catalog_dir: str,
pprof_port: Optional[int] = None,
opm_pprof_port: Optional[int] = None,
) -> Tuple[int, subprocess.Popen]:
"""
Locally start OPM service, which can be communicated with using gRPC queries.
@@ -276,7 +297,7 @@ def opm_serve(
binding conflicts. Resolution of port conflicts is handled in this function as well.
:param int opm_port: OPM port number obtained from create_port_filelock decorator
:param int pprof_port: Pprof opm port number obtained from create_port_filelock decorator
:param int opm_pprof_port: Pprof opm port number obtained from create_port_filelock decorator
:param str catalog_dir: path to file-based catalog directory that should be served.
:return: tuple containing port number of the running service and the running Popen object.
:rtype: (int, Popen)
@@ -288,14 +309,14 @@ def opm_serve(
'serve',
catalog_dir,
'-p',
opm_port,
str(opm_port),
'-t',
'/dev/null',
]

if pprof_port:
if opm_pprof_port:
# by default opm uses the 127.0.0.1:6060
cmd.extend(["--pprof-addr", f"127.0.0.1:{pprof_port}"])
cmd.extend(["--pprof-addr", f"127.0.0.1:{str(opm_pprof_port)}"])

cwd = os.path.abspath(os.path.join(catalog_dir, os.path.pardir))
result = (
@@ -305,11 +326,11 @@ def opm_registry_serve(
return result


@create_port_filelocks
@create_port_filelocks(port_purposes=["opm_port", "opm_pprof_port"])
def opm_registry_serve(
opm_port: int,
db_path: str,
pprof_port: Optional[int] = None,
opm_pprof_port: Optional[int] = None,
) -> Tuple[int, subprocess.Popen]:
"""
Locally start OPM registry service, which can be communicated with using gRPC queries.
@@ -318,7 +339,7 @@ def opm_registry_serve(
binding conflicts. Resolution of port conflicts is handled in this function as well.
:param int opm_port: OPM port number obtained from create_port_filelock decorator
:param int pprof_port: Pprof opm port number obtained from create_port_filelock decorator
:param int opm_pprof_port: Pprof opm port number obtained from create_port_filelock decorator
:param str db_path: path to index database containing the registry data.
:return: tuple containing port number of the running service and the running Popen object.
:rtype: (int, Popen)
@@ -330,15 +351,15 @@ def opm_registry_serve(
'registry',
'serve',
'-p',
opm_port,
str(opm_port),
'-d',
db_path,
'-t',
'/dev/null',
]
if pprof_port:
if opm_pprof_port:
# by default opm uses the 127.0.0.1:6060
cmd.extend(["--pprof-addr", f"127.0.0.1:{pprof_port}"])
cmd.extend(["--pprof-addr", f"127.0.0.1:{str(opm_pprof_port)}"])

cwd = os.path.dirname(db_path)
result = (
@@ -695,7 +716,13 @@ def insert_cache_into_dockerfile(dockerfile_path: str) -> None:
verify_cache_insertion_edit_dockerfile(f.readlines())


def generate_cache_locally(base_dir: str, fbc_dir: str, local_cache_path: str) -> None:
@create_port_filelocks(port_purposes=["opm_pprof_port"])
def generate_cache_locally(
base_dir: str,
fbc_dir: str,
local_cache_path: str,
opm_pprof_port: Optional[int] = None,
) -> None:
"""
Generate the cache for the index image locally before building it.
@@ -719,6 +746,10 @@ def generate_cache_locally(base_dir: str, fbc_dir: str, local_cache_path: str) -
'/dev/null',
]

if opm_pprof_port:
# by default opm uses the 127.0.0.1:6060
cmd.extend(["--pprof-addr", f"127.0.0.1:{str(opm_pprof_port)}"])

log.info('Generating cache for the file-based catalog')
if os.path.exists(local_cache_path):
shutil.rmtree(local_cache_path)
@@ -1369,3 +1400,22 @@ def set_opm_version(cls, from_index: Optional[str] = None):
if index_version in opm_versions_config:
Opm.opm_version = opm_versions_config.get(index_version)
log.info("OPM version set to %s", Opm.opm_version)

@classmethod
def get_opm_version_number(cls):
"""
Get the opm version number to be used for the entire IIB operation.
:return: currently set-up Opm version number
:rtype: str
"""
log.info("Determining the OPM version number")

from iib.workers.tasks.utils import run_cmd

opm_version_output = run_cmd([Opm.opm_version, 'version'])
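# Example shape of the "opm version" output parsed below; only the OpmVersion
# field is relied upon, the other fields are placeholders:
#   Version: version.Version{OpmVersion:"v1.26.4", GitCommit:"...", BuildDate:"...", ...}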
match = re.search(r'OpmVersion:"v([\d.]+)"', opm_version_output)
if match:
return match.group(1)
else:
raise IIBError("Opm version not found in the output of \"OPM version\" command")