Skip to content

Commit

Permalink
Refactoring language_container_deployer
Browse files Browse the repository at this point in the history
  • Loading branch information
ahsimb committed Dec 1, 2023
1 parent ec5cabd commit 88c4647
Show file tree
Hide file tree
Showing 8 changed files with 370 additions and 211 deletions.
20 changes: 0 additions & 20 deletions exasol_transformers_extension/deployment/deployment_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

DB_PASSWORD_ENVIRONMENT_VARIABLE = f"TE_DB_PASSWORD"
BUCKETFS_PASSWORD_ENVIRONMENT_VARIABLE = f"TE_BUCKETFS_PASSWORD"
SLC_NAME = "exasol_transformers_extension_container_release.tar.gz"
GH_RELEASE_URL = "https://github.com/exasol/transformers-extension/releases/download"


def load_and_render_statement(template_name, **kwargs) -> str:
Expand All @@ -27,16 +25,6 @@ def load_and_render_statement(template_name, **kwargs) -> str:
return statement


def _download_slc(tmp_dir: Path, version: str) -> Path:
url = "/".join((GH_RELEASE_URL, version, SLC_NAME))
response = requests.get(url, stream=True)
response.raise_for_status()
slc_path = Path(tmp_dir, SLC_NAME)
with open(slc_path, 'wb') as f:
f.write(response.content)
return slc_path


def get_websocket_ssl_options(use_ssl_cert_validation: bool, ssl_cert_path: str):
websocket_sslopt = {
"cert_reqs": ssl.CERT_REQUIRED,
Expand All @@ -47,11 +35,3 @@ def get_websocket_ssl_options(use_ssl_cert_validation: bool, ssl_cert_path: str)
if ssl_cert_path is not None:
websocket_sslopt["ca_certs"] = ssl_cert_path
return websocket_sslopt


@contextmanager
def get_container_file_from_github_release(version: str):
with tempfile.TemporaryDirectory() as tmp_dir:
container_file_path = _download_slc(tmp_dir, version)
yield container_file_path

212 changes: 158 additions & 54 deletions exasol_transformers_extension/deployment/language_container_deployer.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,56 @@
#########################################################
# To be migrated to the script-languages-container-tool #
#########################################################
from enum import Enum
import pyexasol
from textwrap import dedent
from typing import List, Optional
from pathlib import Path, PurePosixPath
from exasol_bucketfs_utils_python.bucketfs_location import BucketFSLocation
import logging
from exasol_transformers_extension.utils.bucketfs_operations import \
create_bucketfs_location
from exasol_transformers_extension.deployment.deployment_utils import get_websocket_ssl_options
import tempfile
import requests
import ssl
import pyexasol
from exasol_bucketfs_utils_python.bucketfs_location import BucketFSLocation
from exasol_transformers_extension.utils.bucketfs_operations import create_bucketfs_location

logger = logging.getLogger(__name__)


def get_websocket_sslopt(use_ssl_cert_validation: bool = True,
ssl_trusted_ca: Optional[str] = None,
ssl_client_certificate: Optional[str] = None,
ssl_private_key: Optional[str] = None) -> dict:
"""
Returns a dictionary in the winsocket-client format
(see https://websocket-client.readthedocs.io/en/latest/faq.html#what-else-can-i-do-with-sslopts)
"""

# Is server certificate validation required?
sslopt: dict[str, object] = {"cert_reqs": ssl.CERT_REQUIRED if use_ssl_cert_validation else ssl.CERT_NONE}

# Is a bundle with trusted CAs provided?
if ssl_trusted_ca:
trusted_ca_path = Path(ssl_trusted_ca)
if trusted_ca_path.is_dir():
sslopt["ca_cert_path"] = ssl_trusted_ca
elif trusted_ca_path.is_file():
sslopt["ca_certs"] = ssl_trusted_ca
else:
raise ValueError(f"Trusted CA location {ssl_trusted_ca} doesn't exist.")

# Is client's own certificate provided?
if ssl_client_certificate:
if not Path(ssl_client_certificate).is_file():
raise ValueError(f"Certificate file {ssl_client_certificate} doesn't exist.")
sslopt["certfile"] = ssl_client_certificate
if ssl_private_key:
if not Path(ssl_private_key).is_file():
raise ValueError(f"Private key file {ssl_private_key} doesn't exist.")
sslopt["keyfile"] = ssl_private_key

return sslopt


class LanguageActivationLevel(Enum):
f"""
Language activation level, i.e.
Expand All @@ -21,82 +61,149 @@ class LanguageActivationLevel(Enum):


class LanguageContainerDeployer:

def __init__(self,
pyexasol_connection: pyexasol.ExaConnection,
language_alias: str,
bucketfs_location: BucketFSLocation,
container_file: Path):
self._container_file = container_file
bucketfs_location: BucketFSLocation) -> None:

self._bucketfs_location = bucketfs_location
self._language_alias = language_alias
self._pyexasol_conn = pyexasol_connection
logger.debug(f"Init {LanguageContainerDeployer.__name__}")

def deploy_container(self, allow_override: bool = False) -> None:
def download_and_run(self, url: str,
bucket_file_path: str,
alter_system: bool = True,
allow_override: bool = False) -> None:
"""
Uploads the SLC and activates it at the SYSTEM level.
Downloads language container from the provided url to a temporary file and then deploys it.
See docstring on the `run` method for details on what is involved in the deployment.
allow_override - If True the activation of a language container with the same alias will be overriden,
otherwise a RuntimeException will be thrown.
url - Address where the container will be downloaded from.
bucket_file_path - Path within the designated bucket where the container should be uploaded.
alter_system - If True will try to activate the container at the System level.
allow_override - If True the activation of a language container with the same alias will be
overriden, otherwise a RuntimeException will be thrown.
"""
path_in_udf = self.upload_container()
self.activate_container(LanguageActivationLevel.System, allow_override, path_in_udf)

def upload_container(self) -> PurePosixPath:
with tempfile.NamedTemporaryFile() as tmp_file:
response = requests.get(url, stream=True)
response.raise_for_status()
tmp_file.write(response.content)

self.run(Path(tmp_file.name), bucket_file_path, alter_system, allow_override)

def run(self, container_file: Optional[Path] = None,
bucket_file_path: Optional[str] = None,
alter_system: bool = True,
allow_override: bool = False) -> None:
"""
Uploads the SLC.
Returns the path where the container is uploaded as it's seen by a UDF.
Deploys the language container. This includes two steps, both of which are optional:
- Uploading the container into the database. This step can be skipped if the container
has already been uploaded.
- Activating the container. This step may have to be skipped if the user does not have
System Privileges in the database. In that case two alternative activation SQL commands
will be printed on the console.
container_file - Path of the container tar.gz file in a local file system.
If not provided the container is assumed to be uploaded already.
bucket_file_path - Path within the designated bucket where the container should be uploaded.
If not specified the name of the container file will be used instead.
alter_system - If True will try to activate the container at the System level.
allow_override - If True the activation of a language container with the same alias will be
overriden, otherwise a RuntimeException will be thrown.
"""
if not self._container_file.is_file():
raise RuntimeError(f"Container file {self._container_file} "

if not bucket_file_path:
if not container_file:
raise ValueError('Either a container file or a bucket file path must be specified.')
bucket_file_path = container_file.name

if container_file:
self.upload_container(container_file, bucket_file_path)

if alter_system:
self.activate_container(bucket_file_path, LanguageActivationLevel.System, allow_override)
else:
message = dedent(f"""
In SQL, you can activate the SLC of the Transformers Extension
by using the following statements:
To activate the SLC only for the current session:
{self.generate_activation_command(bucket_file_path, LanguageActivationLevel.Session, True)}
To activate the SLC on the system:
{self.generate_activation_command(bucket_file_path, LanguageActivationLevel.System, True)}
""")
print(message)

def upload_container(self, container_file: Path,
bucket_file_path: Optional[str] = None) -> None:
"""
Uploads SLC to the BucketFS.
container_file - Path of the container tar.gz file in a local file system.
bucket_file_path - Path within the designated bucket where the container should be uploaded.
"""
if not container_file.is_file():
raise RuntimeError(f"Container file {container_file} "
f"is not a file.")
with open(self._container_file, "br") as f:
upload_uri, path_in_udf = \
self._bucketfs_location.upload_fileobj_to_bucketfs(
fileobj=f, bucket_file_path=self._container_file.name)
with open(container_file, "br") as f:
self._bucketfs_location.upload_fileobj_to_bucketfs(
fileobj=f, bucket_file_path=bucket_file_path)
logging.debug("Container is uploaded to bucketfs")
return PurePosixPath(path_in_udf)

def activate_container(self, alter_type: LanguageActivationLevel = LanguageActivationLevel.Session,
allow_override: bool = False,
path_in_udf: Optional[PurePosixPath] = None) -> None:
def activate_container(self, bucket_file_path: str,
alter_type: LanguageActivationLevel = LanguageActivationLevel.Session,
allow_override: bool = False) -> None:
"""
Activates the SLC container at the required level.
alter_type - Language activation level, defaults to the SESSION.
allow_override - If True the activation of a language container with the same alias will be overriden,
otherwise a RuntimeException will be thrown.
path_in_udf - If known, a path where the container is uploaded as it's seen by a UDF.
bucket_file_path - Path within the designated bucket where the container is uploaded.
alter_type - Language activation level, defaults to the SESSION.
allow_override - If True the activation of a language container with the same alias will be overriden,
otherwise a RuntimeException will be thrown.
"""
alter_command = self.generate_activation_command(alter_type, allow_override, path_in_udf)
alter_command = self.generate_activation_command(bucket_file_path, alter_type, allow_override)
self._pyexasol_conn.execute(alter_command)
logging.debug(alter_command)

def generate_activation_command(self, alter_type: LanguageActivationLevel,
allow_override: bool = False,
path_in_udf: Optional[PurePosixPath] = None) -> str:
def generate_activation_command(self, bucket_file_path: str,
alter_type: LanguageActivationLevel,
allow_override: bool = False) -> str:
"""
Generates an SQL command to activate the SLC container at the required level. The command will
preserve existing activations of other containers identified by different language aliases.
Activation of a container with the same alias, if exists, will be overwritten.
alter_type - Activation level - SYSTEM or SESSION.
allow_override - If True the activation of a language container with the same alias will be overriden,
otherwise a RuntimeException will be thrown.
path_in_udf - If known, a path where the container is uploaded as it's seen by a UDF.
bucket_file_path - Path within the designated bucket where the container is uploaded.
alter_type - Activation level - SYSTEM or SESSION.
allow_override - If True the activation of a language container with the same alias will be overriden,
otherwise a RuntimeException will be thrown.
"""
if path_in_udf is None:
path_in_udf = self._bucketfs_location.generate_bucket_udf_path(self._container_file.name)
path_in_udf = self._bucketfs_location.generate_bucket_udf_path(bucket_file_path)
new_settings = \
self._update_previous_language_settings(alter_type, allow_override, path_in_udf)
alter_command = \
f"ALTER {alter_type.value} SET SCRIPT_LANGUAGES='{new_settings}';"
return alter_command

def get_language_settings(self, alter_type: LanguageActivationLevel) -> str:
"""
Reads the current language settings at the specified level.
alter_type - Activation level - SYSTEM or SESSION.
"""
result = self._pyexasol_conn.execute(
f"""SELECT "{alter_type.value}_VALUE" FROM SYS.EXA_PARAMETERS WHERE
PARAMETER_NAME='SCRIPT_LANGUAGES'""").fetchall()
return result[0][0]

def _update_previous_language_settings(self, alter_type: LanguageActivationLevel,
allow_override: bool,
path_in_udf: PurePosixPath) -> str:
prev_lang_settings = self._get_previous_language_settings(alter_type)
prev_lang_settings = self.get_language_settings(alter_type)
prev_lang_aliases = prev_lang_settings.split(" ")
self._check_if_requested_language_alias_already_exists(
allow_override, prev_lang_aliases)
Expand Down Expand Up @@ -131,20 +238,17 @@ def _check_if_requested_language_alias_already_exists(
else:
raise RuntimeError(warning_message)

def _get_previous_language_settings(self, alter_type: LanguageActivationLevel) -> str:
result = self._pyexasol_conn.execute(
f"""SELECT "{alter_type.value}_VALUE" FROM SYS.EXA_PARAMETERS WHERE
PARAMETER_NAME='SCRIPT_LANGUAGES'""").fetchall()
return result[0][0]

@classmethod
def create(cls, bucketfs_name: str, bucketfs_host: str, bucketfs_port: int,
bucketfs_use_https: bool, bucketfs_user: str, container_file: Path,
bucketfs_password: str, bucket: str, path_in_bucket: str,
dsn: str, db_user: str, db_password: str, language_alias: str,
ssl_cert_path: str = None, use_ssl_cert_validation: bool = True) -> "LanguageContainerDeployer":
bucketfs_use_https: bool, bucketfs_user: str,
bucketfs_password: str, bucket: str, path_in_bucket: str,
dsn: str, db_user: str, db_password: str, language_alias: str,
use_ssl_cert_validation: bool = True, ssl_trusted_ca: Optional[str] = None,
ssl_client_certificate: Optional[str] = None,
ssl_private_key: Optional[str] = None) -> "LanguageContainerDeployer":

websocket_sslopt = get_websocket_ssl_options(use_ssl_cert_validation, ssl_cert_path)
websocket_sslopt = get_websocket_sslopt(use_ssl_cert_validation, ssl_trusted_ca,
ssl_client_certificate, ssl_private_key)

pyexasol_conn = pyexasol.connect(
dsn=dsn,
Expand All @@ -158,4 +262,4 @@ def create(cls, bucketfs_name: str, bucketfs_host: str, bucketfs_port: int,
bucketfs_name, bucketfs_host, bucketfs_port, bucketfs_use_https,
bucketfs_user, bucketfs_password, bucket, path_in_bucket)

return cls(pyexasol_conn, language_alias, bucketfs_location, container_file)
return cls(pyexasol_conn, language_alias, bucketfs_location)
Loading

0 comments on commit 88c4647

Please sign in to comment.