diff --git a/pangeo_forge_runner/bakery/flink.py b/pangeo_forge_runner/bakery/flink.py
index 594a035..eeb2560 100644
--- a/pangeo_forge_runner/bakery/flink.py
+++ b/pangeo_forge_runner/bakery/flink.py
@@ -1,6 +1,7 @@
 """
 Bakery for baking pangeo-forge recipes in GCP DataFlow
 """
+import copy
 import hashlib
 import json
 import shutil
@@ -10,7 +11,7 @@
 import escapism
 from apache_beam.pipeline import PipelineOptions

-from traitlets import Dict, Integer, Unicode
+from traitlets import Bool, Dict, Integer, Unicode

 from .base import Bakery

@@ -150,13 +151,101 @@ class FlinkOperatorBakery(Bakery):
         """,
     )

+    enable_job_archiving = Bool(
+        False,
+        config=True,
+        help="""
+        Enable archiving of completed jobs so the historyserver's REST
+        API can still return information about them after teardown
+        """,
+    )
+
+    job_archive_efs_mount = Unicode(
+        "/opt/history/jobs",
+        config=True,
+        help="""
+        The NFS mount path where past jobs are archived so the historyserver
+        REST API can return some information about job statuses after
+        job managers are torn down
+
+        The default path here corresponds to what the pangeo-forge-cloud-federation Terraform deploys as the mount path:
+        https://github.com/pangeo-forge/pangeo-forge-cloud-federation/blob/main/terraform/aws/operator.tf
+
+        If using that Terraform you can configure the path via `historyserver_mount_path`:
+        https://github.com/pangeo-forge/pangeo-forge-cloud-federation/blob/main/terraform/aws/variables.tf
+        """,
+    )
+
+    def add_job_manager_pod_template(self, current_flink_deploy: dict):
+        """Add the job manager pod template to the existing flink deploy
+
+        Returns:
+            a dictionary representing the whole `kind: FlinkDeployment` manifest
+        """
+        pod_template = {
+            "podTemplate": {
+                "spec": {
+                    "securityContext": {
+                        # flink uid/gid
+                        "fsGroup": 9999
+                    },
+                    "initContainers": [
+                        {
+                            "name": "efs-mount-ownership-fix",
+                            "image": "busybox:1.36.1",
+                            "command": [
+                                "sh",
+                                "-c",
+                                # make sure the flink uid/gid owns the archival mount
+                                f"chown 9999:9999 {self.job_archive_efs_mount} && ls -lhd {self.job_archive_efs_mount}",
+                            ],
+                            "volumeMounts": [
+                                {
+                                    "name": "efs-flink-history",
+                                    "mountPath": f"{self.job_archive_efs_mount}",
+                                }
+                            ],
+                        }
+                    ],
+                    "containers": [
+                        {
+                            # NOTE: "name" and "image" are required here
+                            # and were taken from existing deployed manifests;
+                            # all other attributes get back-filled by the operator
+                            "name": "flink-main-container",
+                            "image": f"flink:{self.flink_version}",
+                            "volumeMounts": [
+                                {
+                                    "name": "efs-flink-history",
+                                    "mountPath": f"{self.job_archive_efs_mount}",
+                                }
+                            ],
+                        }
+                    ],
+                    "volumes": [
+                        {
+                            "name": "efs-flink-history",
+                            "persistentVolumeClaim": {
+                                "claimName": "flink-historyserver-efs-pvc"
+                            },
+                        }
+                    ],
+                }
+            },
+        }
+
+        # deep copy so the caller's manifest is not mutated in place
+        new_flink_deploy = copy.deepcopy(current_flink_deploy)
+        new_flink_deploy["spec"]["jobManager"].update(pod_template)
+        return new_flink_deploy
+
     def make_flink_deployment(self, name: str, worker_image: str):
         """
         Return YAML for a FlinkDeployment
         """
         image = f"flink:{self.flink_version}"
         flink_version_str = f'v{self.flink_version.replace(".", "_")}'
-        return {
+        flink_deploy = {
             "apiVersion": "flink.apache.org/v1beta1",
             "kind": "FlinkDeployment",
             "metadata": {"name": name},
@@ -165,7 +254,9 @@ def make_flink_deployment(self, name: str, worker_image: str):
             "flinkVersion": flink_version_str,
             "flinkConfiguration": self.flink_configuration,
             "serviceAccount": "flink",
-            "jobManager": {"resource": self.job_manager_resources},
+            "jobManager": {
+                "resource": self.job_manager_resources,
+            },
             "taskManager": {
                 "replicas": 5,
                 "resource": self.task_manager_resources,
@@ -192,6 +283,10 @@
             },
         }

+        if self.enable_job_archiving:
+            flink_deploy = self.add_job_manager_pod_template(flink_deploy)
+        return flink_deploy
+
     def get_pipeline_options(
         self, job_name: str, container_image: str, extra_options: dict
     ) -> PipelineOptions:
diff --git a/tests/unit/test_flink.py b/tests/unit/test_flink.py
index b9d94fd..d9f3c73 100644
--- a/tests/unit/test_flink.py
+++ b/tests/unit/test_flink.py
@@ -1,5 +1,7 @@
 from unittest.mock import patch

+import pytest
+
 from pangeo_forge_runner.bakery.flink import FlinkOperatorBakery


@@ -23,3 +25,24 @@ def test_pipelineoptions():

     assert opts["parallelism"] == 100
     assert opts["max_parallelism"] == 100
+
+
+@pytest.mark.parametrize(
+    "archiving_enabled, deploy_name, container_image",
+    (
+        [False, "archive_disabled", "apache/beam_python3.10_sdk:2.51.0"],
+        [True, "archive_enabled", "apache/beam_python3.10_sdk:2.51.0"],
+    ),
+)
+def test_make_flink_deployment(archiving_enabled, deploy_name, container_image):
+    """Test manifest generation with job archiving enabled and disabled"""
+    fbake = FlinkOperatorBakery()
+    fbake.enable_job_archiving = archiving_enabled
+    manifest = fbake.make_flink_deployment(deploy_name, container_image)
+    pod_template = manifest["spec"]["jobManager"].get("podTemplate")
+    if archiving_enabled:
+        assert pod_template is not None
+        for key in ["securityContext", "containers", "initContainers", "volumes"]:
+            assert key in pod_template["spec"]
+    else:
+        assert pod_template is None
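
Usage note: with this patch, job archiving stays off by default and is opt-in
through the two new traits. A minimal sketch of a traitlets config file that
turns it on; the file name is illustrative, and pointing Flink itself at the
same directory (e.g. via Flink's `jobmanager.archive.fs.dir` option) is
assumed to be handled elsewhere, such as by the pangeo-forge-cloud-federation
Terraform:

    # config.py, a hypothetical pangeo-forge-runner traitlets config file
    c.FlinkOperatorBakery.enable_job_archiving = True
    # should match the `historyserver_mount_path` the Terraform deploys
    c.FlinkOperatorBakery.job_archive_efs_mount = "/opt/history/jobs"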
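
And a quick check of the resulting manifest shape, mirroring what the new
unit test asserts; the deployment name and Beam image are taken from the
test parametrization:

    from pangeo_forge_runner.bakery.flink import FlinkOperatorBakery

    fbake = FlinkOperatorBakery()
    fbake.enable_job_archiving = True
    manifest = fbake.make_flink_deployment(
        "archive_enabled", "apache/beam_python3.10_sdk:2.51.0"
    )
    # the jobManager pod template is only attached when archiving is enabled
    assert "podTemplate" in manifest["spec"]["jobManager"]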