Skip to content

Commit

Permalink
[Flink] Add option to use JobManager to store history of jobs (#151)
Browse files Browse the repository at this point in the history
Mount EFS to Job Managers so they can archive jobs for historical status lookups

Addresses #122

Related PR: pangeo-forge/pangeo-forge-cloud-federation#6

Co-authored-by: ranchodeluxe <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Nov 28, 2023
1 parent 95ce6f4 commit 0401cb0
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 3 deletions.
101 changes: 98 additions & 3 deletions pangeo_forge_runner/bakery/flink.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
Bakery for baking pangeo-forge recipes in GCP DataFlow
"""
import copy
import hashlib
import json
import shutil
Expand All @@ -10,7 +11,7 @@

import escapism
from apache_beam.pipeline import PipelineOptions
from traitlets import Dict, Integer, Unicode
from traitlets import Bool, Dict, Integer, Unicode

from .base import Bakery

Expand Down Expand Up @@ -150,13 +151,101 @@ class FlinkOperatorBakery(Bakery):
""",
)

enable_job_archiving = Bool(
False,
config=True,
help="""
Enable the ability for past jobs to be archived so the job
manager's REST API can still return information after completing
""",
)

job_archive_efs_mount = Unicode(
"/opt/history/jobs",
config=True,
help="""
The NFS mount path where past jobs are archived so the historyserver
REST API can return some information about job statuses after
job managers are torn down
The default path here corresponds to what the pangeo-forge-cloud-federation Terraform deploys as the mount path:
https://github.com/pangeo-forge/pangeo-forge-cloud-federation/blob/main/terraform/aws/operator.tf
If using that Terraform you can configure the path via `historyserver_mount_path`:
https://github.com/pangeo-forge/pangeo-forge-cloud-federation/blob/main/terraform/aws/variables.tf
""",
)

def add_job_manager_pod_template(self, current_flink_deploy: dict):
"""Add the job manager pod template to the existing flink deploy
Returns:
a dictionary representing the whole `kind: flinkdeployment`
"""
pod_template = {
"podTemplate": {
"spec": {
"securityContext": {
# flink uid/guid
"fsGroup": 9999
},
"initContainers": [
{
"name": "efs-mount-ownership-fix",
"image": "busybox:1.36.1",
"command": [
"sh",
"-c",
# makes sure the flink uid/gid is owner of the archival mount
f"chown 9999:9999 {self.job_archive_efs_mount} && ls -lhd {self.job_archive_efs_mount}",
],
"volumeMounts": [
{
"name": "efs-flink-history",
"mountPath": f"{self.job_archive_efs_mount}",
}
],
}
],
"containers": [
{
# NOTE: "name" and "image" are required here
# and were taken from existing deployed manifests
# all other attributes get back-filled in by the operator
"name": "flink-main-container",
"image": f"flink:{self.flink_version}",
"volumeMounts": [
{
"name": "efs-flink-history",
"mountPath": f"{self.job_archive_efs_mount}",
}
],
}
],
"volumes": [
{
"name": "efs-flink-history",
"persistentVolumeClaim": {
"claimName": "flink-historyserver-efs-pvc"
},
}
],
}
},
}

# shallow copy
new_flink_deploy = copy.deepcopy(current_flink_deploy)
new_flink_deploy["spec"]["jobManager"].update(pod_template)
return new_flink_deploy

def make_flink_deployment(self, name: str, worker_image: str):
"""
Return YAML for a FlinkDeployment
"""
image = f"flink:{self.flink_version}"
flink_version_str = f'v{self.flink_version.replace(".", "_")}'
return {
flink_deploy = {
"apiVersion": "flink.apache.org/v1beta1",
"kind": "FlinkDeployment",
"metadata": {"name": name},
Expand All @@ -165,7 +254,9 @@ def make_flink_deployment(self, name: str, worker_image: str):
"flinkVersion": flink_version_str,
"flinkConfiguration": self.flink_configuration,
"serviceAccount": "flink",
"jobManager": {"resource": self.job_manager_resources},
"jobManager": {
"resource": self.job_manager_resources,
},
"taskManager": {
"replicas": 5,
"resource": self.task_manager_resources,
Expand All @@ -192,6 +283,10 @@ def make_flink_deployment(self, name: str, worker_image: str):
},
}

if self.enable_job_archiving:
flink_deploy = self.add_job_manager_pod_template(flink_deploy)
return flink_deploy

def get_pipeline_options(
self, job_name: str, container_image: str, extra_options: dict
) -> PipelineOptions:
Expand Down
25 changes: 25 additions & 0 deletions tests/unit/test_flink.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from unittest.mock import patch

import pytest

from pangeo_forge_runner.bakery.flink import FlinkOperatorBakery


Expand All @@ -23,3 +25,26 @@ def test_pipelineoptions():

assert opts["parallelism"] == 100
assert opts["max_parallelism"] == 100


@pytest.mark.parametrize(
"archiving_enabled, deploy_name, container_image",
(
[False, "archive_disabled", "apache/beam_python3.10_sdk:2.51.0"],
[True, "archive_enabled", "apache/beam_python3.10_sdk:2.51.0"],
),
)
def test_make_flink_deployment(archiving_enabled, deploy_name, container_image):
"""test paths for enabled job archiving"""
fbake = FlinkOperatorBakery()
fbake.enable_job_archiving = archiving_enabled
print(deploy_name, container_image)
manifest = fbake.make_flink_deployment(deploy_name, container_image)
if archiving_enabled:
pod_template = manifest["spec"]["jobManager"].get("podTemplate")
assert pod_template is not None
for key in ["securityContext", "containers", "initContainers", "volumes"]:
assert key in pod_template["spec"]
else:
pod_template = manifest["spec"]["jobManager"].get("podTemplate")
assert pod_template is None

0 comments on commit 0401cb0

Please sign in to comment.