[Flink] Add option to use JobManager to store history of jobs #151

Merged (17 commits, Nov 28, 2023)
101 changes: 98 additions & 3 deletions pangeo_forge_runner/bakery/flink.py
@@ -1,6 +1,7 @@
"""
Bakery for baking pangeo-forge recipes in GCP DataFlow
"""
import copy
import hashlib
import json
import shutil
@@ -10,7 +11,7 @@

import escapism
from apache_beam.pipeline import PipelineOptions
from traitlets import Dict, Integer, Unicode
from traitlets import Bool, Dict, Integer, Unicode

from .base import Bakery

@@ -150,13 +151,101 @@ class FlinkOperatorBakery(Bakery):
""",
)

enable_job_archiving = Bool(
False,
config=True,
help="""
        Enable archiving of past jobs so the job manager's REST API
        can still return information about them after they complete
""",
)

job_archive_efs_mount = Unicode(
"/opt/history/jobs",
config=True,
help="""
        The NFS mount path where past jobs are archived so the historyserver
        REST API can still return job status information after
        job managers are torn down

The default path here corresponds to what the pangeo-forge-cloud-federation Terraform deploys as the mount path:
https://github.com/pangeo-forge/pangeo-forge-cloud-federation/blob/main/terraform/aws/operator.tf

        If using that Terraform, you can configure the path via `historyserver_mount_path`:
https://github.com/pangeo-forge/pangeo-forge-cloud-federation/blob/main/terraform/aws/variables.tf
""",
)
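
A minimal sketch of how these two options could be set from a traitlets config file (the `config.py` file name and the values shown are illustrative, not part of this diff):

# config.py -- loaded by pangeo-forge-runner via traitlets
c = get_config()  # noqa: F821 -- injected by traitlets when this file is loaded

# enable archiving and point it at whatever path your PVC actually mounts
c.FlinkOperatorBakery.enable_job_archiving = True
c.FlinkOperatorBakery.job_archive_efs_mount = "/opt/history/jobs"  # the default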

def add_job_manager_pod_template(self, current_flink_deploy: dict):
"""Add the job manager pod template to the existing flink deploy

Returns:
            a dictionary representing the whole `kind: FlinkDeployment` manifest
"""
pod_template = {
"podTemplate": {
"spec": {
"securityContext": {
                        # flink uid/gid
"fsGroup": 9999
},
"initContainers": [
{
"name": "efs-mount-ownership-fix",
"image": "busybox:1.36.1",
"command": [
"sh",
"-c",
# makes sure the flink uid/gid is owner of the archival mount
f"chown 9999:9999 {self.job_archive_efs_mount} && ls -lhd {self.job_archive_efs_mount}",
],
"volumeMounts": [
{
"name": "efs-flink-history",
"mountPath": f"{self.job_archive_efs_mount}",
}
],
}
],
"containers": [
{
                            # NOTE: "name" and "image" are required here and were
                            # taken from existing deployed manifests; all other
                            # attributes get back-filled in by the operator
"name": "flink-main-container",
"image": f"flink:{self.flink_version}",
"volumeMounts": [
{
"name": "efs-flink-history",
"mountPath": f"{self.job_archive_efs_mount}",
}
],
}
],
"volumes": [
{
"name": "efs-flink-history",
"persistentVolumeClaim": {
"claimName": "flink-historyserver-efs-pvc"
},
}
],
}
},
}

        # deep copy so the caller's deployment dict is not mutated
new_flink_deploy = copy.deepcopy(current_flink_deploy)
new_flink_deploy["spec"]["jobManager"].update(pod_template)
return new_flink_deploy
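
For the archive to be useful, Flink itself must also write finished-job archives into the mounted path. A hedged sketch of the matching `flink_configuration` entries (these are standard Flink options; this diff does not set them, and the paths assume the default mount):

c.FlinkOperatorBakery.flink_configuration = {
    # the job manager uploads an archive of each completed job here
    "jobmanager.archive.fs.dir": "file:///opt/history/jobs",
    # the history server polls the same shared directory for new archives
    "historyserver.archive.fs.dir": "file:///opt/history/jobs",
}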

def make_flink_deployment(self, name: str, worker_image: str):
"""
Return YAML for a FlinkDeployment
"""
image = f"flink:{self.flink_version}"
flink_version_str = f'v{self.flink_version.replace(".", "_")}'
return {
flink_deploy = {
"apiVersion": "flink.apache.org/v1beta1",
"kind": "FlinkDeployment",
"metadata": {"name": name},
@@ -165,7 +254,9 @@ def make_flink_deployment(self, name: str, worker_image: str):
"flinkVersion": flink_version_str,
"flinkConfiguration": self.flink_configuration,
"serviceAccount": "flink",
"jobManager": {"resource": self.job_manager_resources},
"jobManager": {
"resource": self.job_manager_resources,
},
"taskManager": {
"replicas": 5,
"resource": self.task_manager_resources,
@@ -192,6 +283,10 @@ def make_flink_deployment(self, name: str, worker_image: str):
},
}

if self.enable_job_archiving:
flink_deploy = self.add_job_manager_pod_template(flink_deploy)
return flink_deploy

def get_pipeline_options(
self, job_name: str, container_image: str, extra_options: dict
) -> PipelineOptions:
25 changes: 25 additions & 0 deletions tests/unit/test_flink.py
@@ -1,5 +1,7 @@
from unittest.mock import patch

import pytest

from pangeo_forge_runner.bakery.flink import FlinkOperatorBakery


@@ -23,3 +25,26 @@ def test_pipelineoptions():

assert opts["parallelism"] == 100
assert opts["max_parallelism"] == 100


@pytest.mark.parametrize(
"archiving_enabled, deploy_name, container_image",
(
[False, "archive_disabled", "apache/beam_python3.10_sdk:2.51.0"],
[True, "archive_enabled", "apache/beam_python3.10_sdk:2.51.0"],
),
)
def test_make_flink_deployment(archiving_enabled, deploy_name, container_image):
"""test paths for enabled job archiving"""
fbake = FlinkOperatorBakery()
fbake.enable_job_archiving = archiving_enabled
print(deploy_name, container_image)
manifest = fbake.make_flink_deployment(deploy_name, container_image)
if archiving_enabled:
pod_template = manifest["spec"]["jobManager"].get("podTemplate")
assert pod_template is not None
for key in ["securityContext", "containers", "initContainers", "volumes"]:
assert key in pod_template["spec"]
else:
pod_template = manifest["spec"]["jobManager"].get("podTemplate")
assert pod_template is None
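
Putting it together, a minimal usage sketch (not part of this PR) that mirrors the test above; the run name and Beam image are illustrative:

from pangeo_forge_runner.bakery.flink import FlinkOperatorBakery

fbake = FlinkOperatorBakery()
fbake.enable_job_archiving = True
manifest = fbake.make_flink_deployment(
    "my-recipe-run", "apache/beam_python3.10_sdk:2.51.0"
)
# with archiving on, the jobManager spec carries the EFS-backed podTemplate
assert "podTemplate" in manifest["spec"]["jobManager"]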