Save Jobs History on Flink #6

Merged: 14 commits, Feb 27, 2024
23 changes: 23 additions & 0 deletions helm-charts/flink-historyserver/.helmignore
@@ -0,0 +1,23 @@
# Patterns to ignore when building packages.
# This supports shell glob matching, relative path matching, and
# negation (prefixed with !). Only one pattern per line.
.DS_Store
# Common VCS dirs
.git/
.gitignore
.bzr/
.bzrignore
.hg/
.hgignore
.svn/
# Common backup files
*.swp
*.bak
*.tmp
*.orig
*~
# Various IDEs
.project
.idea/
*.tmproj
.vscode/
24 changes: 24 additions & 0 deletions helm-charts/flink-historyserver/Chart.yaml
@@ -0,0 +1,24 @@
apiVersion: v2
name: flink-historyserver
description: A Helm chart for Kubernetes

# A chart can be either an 'application' or a 'library' chart.
#
# Application charts are a collection of templates that can be packaged into versioned archives
# to be deployed.
#
# Library charts provide useful utilities or functions for the chart developer. They're included as
# a dependency of application charts to inject those utilities and functions into the rendering
# pipeline. Library charts do not define any templates and therefore cannot be deployed.
type: application

# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.1.2

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "0.1.2"
62 changes: 62 additions & 0 deletions helm-charts/flink-historyserver/templates/_helpers.tpl
@@ -0,0 +1,62 @@
{{/*
Expand the name of the chart.
*/}}
{{- define "flink-historyserver.name" -}}
{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" }}
{{- end }}

{{/*
Create a default fully qualified app name.
We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec).
If release name contains chart name it will be used as a full name.
*/}}
{{- define "flink-historyserver.fullname" -}}
{{- if .Values.fullnameOverride }}
{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" }}
{{- else }}
{{- $name := default .Chart.Name .Values.nameOverride }}
{{- if contains $name .Release.Name }}
{{- .Release.Name | trunc 63 | trimSuffix "-" }}
{{- else }}
{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }}
{{- end }}
{{- end }}
{{- end }}

{{/*
Create chart name and version as used by the chart label.
*/}}
{{- define "flink-historyserver.chart" -}}
{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }}
{{- end }}

{{/*
Common labels
*/}}
{{- define "flink-historyserver.labels" -}}
helm.sh/chart: {{ include "flink-historyserver.chart" . }}
{{ include "flink-historyserver.selectorLabels" . }}
{{- if .Chart.AppVersion }}
app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
{{- end }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
{{- end }}

{{/*
Selector labels
*/}}
{{- define "flink-historyserver.selectorLabels" -}}
app.kubernetes.io/name: {{ include "flink-historyserver.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
{{- end }}

{{/*
Create the name of the service account to use
*/}}
{{- define "flink-historyserver.serviceAccountName" -}}
{{- if .Values.serviceAccount.create }}
{{- default (include "flink-historyserver.fullname" .) .Values.serviceAccount.name }}
{{- else }}
{{- default "default" .Values.serviceAccount.name }}
{{- end }}
{{- end }}
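The truncation rules in the `fullname` helper above can be mirrored outside Helm. A rough Python sketch (a hypothetical helper, approximating Sprig's `trunc 63` and `trimSuffix "-"`, not part of the chart):

```python
def fullname(release_name: str, chart_name: str,
             name_override: str = "", fullname_override: str = "") -> str:
    """Approximate the flink-historyserver.fullname template logic:
    prefer an explicit override, reuse the release name if it already
    contains the chart name, else join release and chart names;
    always truncate to 63 chars and strip a trailing '-'."""
    if fullname_override:
        return fullname_override[:63].removesuffix("-")
    name = name_override or chart_name
    if name in release_name:
        return release_name[:63].removesuffix("-")
    return f"{release_name}-{name}"[:63].removesuffix("-")

print(fullname("flink-historyserver", "flink-historyserver"))
# → flink-historyserver  (release name already contains the chart name)
print(fullname("demo", "flink-historyserver"))
# → demo-flink-historyserver
```

The 63-character cap matters because several Kubernetes name fields are DNS labels, which are limited to 63 characters.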
37 changes: 37 additions & 0 deletions helm-charts/flink-historyserver/templates/efs.yaml
@@ -0,0 +1,37 @@
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: {{ .Release.Name }}-efs-flink-history
parameters:
  provisioningMode: efs-ap
  fileSystemId: "{{- .Values.efsFileSystemId }}"
provisioner: "efs.csi.aws.com"
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: flink-historyserver-efs-pv

[Review comment — Collaborator] I like that this has 'historyserver' in the name, so it gets used specifically just for this and not much more :)

[Review comment — Collaborator] PersistentVolume is also not namespaced, and should get the same treatment as StorageClass with .Release.Name. Sorry for not catching that earlier.

[Review comment — Collaborator] @ranchodeluxe I think this still needs to be fixed?

spec:
  capacity:
    storage: "1Mi"
  volumeMode: "Filesystem"
  accessModes:
    - "ReadWriteMany"
  # 'persistentVolumeReclaimPolicy: Retain' means EFS volumes must be manually cleaned up when testing is done
  persistentVolumeReclaimPolicy: Retain
  storageClassName: {{ .Release.Name }}-efs-flink-history
  csi:
    driver: "efs.csi.aws.com"
    volumeHandle: "{{- .Values.efsFileSystemId }}"
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flink-historyserver-efs-pvc
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: {{ .Release.Name }}-efs-flink-history
  resources:
    requests:
      storage: 1Mi
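The review thread above points out that the PersistentVolume name is cluster-global. A minimal sketch of the requested fix (not part of this commit) would scope the PV name with the release name, the same way the StorageClass is scoped:

```yaml
# Sketch only: release-scoped PV name, as suggested in review (not in this commit)
metadata:
  name: {{ .Release.Name }}-flink-historyserver-efs-pv
```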
98 changes: 98 additions & 0 deletions helm-charts/flink-historyserver/templates/historyserver.yaml
@@ -0,0 +1,98 @@
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: historyserver
  name: historyserver
spec:
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app: historyserver
  template:
    metadata:
      labels:
        app: historyserver
    spec:
      containers:
      - args:
        - history-server
        command:
        - /docker-entrypoint.sh
        env:
        - name: _POD_IP_ADDRESS
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
        image: flink:{{- .Values.flinkVersion }}
        name: flink-main-container
        ports:
        - containerPort: 8082
          name: history
          protocol: TCP
        resources:
          limits:
            cpu: 300m
            memory: 1536Mi
          requests:
            cpu: 300m
            memory: 1536Mi
        startupProbe:
          httpGet:
            path: /config
            port: history
        volumeMounts:
        - mountPath: /opt/history/jobs
          name: efs-flink-history
        - mountPath: /opt/flink/conf
          name: flink-config-volume
      initContainers:
      - command:
        - sh
        - -c
        - chown 9999:9999 /opt/history/jobs && ls -lhd /opt/history/jobs
        image: busybox:1.36.1
        imagePullPolicy: IfNotPresent
        name: efs-mount-ownership-fix
        resources: {}
        volumeMounts:
        - mountPath: /opt/history/jobs
          name: efs-flink-history
      schedulerName: default-scheduler
      securityContext:
        fsGroup: 9999
      # NOTE: this SA is set up by the flink-operator helm chart and reused here
      serviceAccountName: flink
      terminationGracePeriodSeconds: 30
      volumes:
      - name: efs-flink-history
        persistentVolumeClaim:
          claimName: flink-historyserver-efs-pvc
      - configMap:
          items:
          - key: log4j-console.properties
            path: log4j-console.properties
          - key: flink-conf.yaml
            path: flink-conf.yaml
          # NOTE: this configmap is set up by the flink-operator helm chart and reused here
          name: flink-operator-config
        name: flink-config-volume
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: historyserver
  name: historyserver-rest
spec:
  ports:
  - name: history
    port: 8082
    targetPort: 8082
  selector:
    app: historyserver
  type: ClusterIP
status:
  loadBalancer: {}
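The history server in the Deployment above only reads archives from the shared mount; for a job's history to show up, the job itself must archive to the same directory. With the flink-kubernetes-operator, that is typically a `flinkConfiguration` entry on the `FlinkDeployment` resource (a sketch; the `file:///opt/history/jobs` path is assumed from the mount above, not stated in this PR):

```yaml
# Sketch: point a job's archive at the shared EFS mount (path assumed)
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
spec:
  flinkConfiguration:
    jobmanager.archive.fs.dir: file:///opt/history/jobs
```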
11 changes: 11 additions & 0 deletions helm-charts/flink-historyserver/values.schema.json
@@ -0,0 +1,11 @@
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "type": "object",
  "required": ["efsFileSystemId"],
  "properties": {
    "efsFileSystemId": {
      "type": "string",
      "minLength": 1
    }
  }
}
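The schema's effect can be sketched without Helm: `efsFileSystemId` must be present and a non-empty string. A hand-rolled Python stand-in (illustrative only, not how Helm itself validates):

```python
SCHEMA_REQUIRED = ["efsFileSystemId"]

def validate_values(values: dict) -> list[str]:
    """Minimal stand-in for the values.schema.json check:
    required keys must exist and be non-empty strings (minLength: 1)."""
    errors = []
    for key in SCHEMA_REQUIRED:
        if key not in values:
            errors.append(f"{key}: required")
        elif not isinstance(values[key], str) or len(values[key]) < 1:
            errors.append(f"{key}: must be a non-empty string")
    return errors

print(validate_values({"efsFileSystemId": ""}))
# → ['efsFileSystemId: must be a non-empty string']
print(validate_values({"efsFileSystemId": "fs-0123"}))
# → []
```

Note the subtlety debated in the review below the values file: a default of `""` in values.yaml satisfies `required` but is caught only because of `minLength: 1`.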
2 changes: 2 additions & 0 deletions helm-charts/flink-historyserver/values.yaml
@@ -0,0 +1,2 @@
efsFileSystemId: ""
[Review comment — Collaborator] I think for schema items that we mark as required, we should not specify them here. That way, when the user tries to deploy this without those set, the schema setup will kick in and complain - I think specifying empty values here negates that.

[Review comment — Collaborator (author), @ranchodeluxe, Dec 6, 2023] minLength in values.schema.json handles validation for this and two of them were unneeded given changes that weren't in this branch

[Review comment — Collaborator] I think undoing the 'required' is much cleaner than enforcing them via minLength: 1. If there is a future change coming in that makes these required, we can modify the schema at that point, no?

flinkVersion: "1.16"
1 change: 1 addition & 0 deletions terraform/aws/.gitignore
@@ -0,0 +1 @@
*.yaml
24 changes: 24 additions & 0 deletions terraform/aws/addons.tf
@@ -36,4 +36,28 @@ resource "aws_eks_addon" "ebs_provisioner" {
  depends_on = [
    aws_iam_role_policy_attachment.ebs_provisioner
  ]
}

# EFS CSI Driver for HistoryServer
resource "aws_iam_role" "efs_provisioner" {
  name               = "${var.cluster_name}-eks-efs-provisioner"
  assume_role_policy = data.aws_iam_policy_document.assume_role_with_oidc.json
}

resource "aws_iam_role_policy_attachment" "efs_provisioner" {
  policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonEFSCSIDriverPolicy"
  role       = aws_iam_role.efs_provisioner.name
}

resource "aws_eks_addon" "efs_provisioner" {
  cluster_name = aws_eks_cluster.cluster.name
  addon_name   = "aws-efs-csi-driver"
  # Fetch the most recent version for your current version of k8s:
  # AWS_PROFILE=<your-profile> eksctl utils describe-addon-versions --kubernetes-version 1.27 -v0 | jq '.Addons[] | select(.AddonName == "aws-efs-csi-driver") | .AddonVersions[0]'
  addon_version               = "v1.7.0-eksbuild.1"
  resolve_conflicts_on_create = "OVERWRITE"
  service_account_role_arn    = aws_iam_role.efs_provisioner.arn
  depends_on = [
    aws_iam_role_policy_attachment.efs_provisioner
  ]
}
2 changes: 1 addition & 1 deletion terraform/aws/cluster.tf
@@ -53,7 +53,7 @@ resource "aws_iam_openid_connect_provider" "cluster_oidc" {
 module "cluster_autoscaler_irsa" {
   source = "terraform-aws-modules/iam/aws//modules/iam-role-for-service-accounts-eks"

-  role_name = "cluster_autoscaler"
+  role_name = "${var.cluster_name}_cluster_autoscaler"

   attach_cluster_autoscaler_policy = true
   cluster_autoscaler_cluster_ids = [
10 changes: 10 additions & 0 deletions terraform/aws/efs.tf
resource "aws_efs_file_system" "job_history" {
  creation_token = "${var.cluster_name}-flink-job-history"
}

resource "aws_efs_mount_target" "job_history" {
  for_each        = toset(data.aws_subnets.default.ids)
  file_system_id  = aws_efs_file_system.job_history.id
  subnet_id       = each.value
  security_groups = [aws_eks_cluster.cluster.vpc_config[0].cluster_security_group_id]
}
35 changes: 35 additions & 0 deletions terraform/aws/helm_historyserver.tf
provider "kubernetes" {
  host                   = aws_eks_cluster.cluster.endpoint
  cluster_ca_certificate = base64decode(aws_eks_cluster.cluster.certificate_authority[0].data)
  token                  = data.aws_eks_cluster_auth.cluster.token
}

data "aws_eks_cluster_auth" "cluster" {
  name = aws_eks_cluster.cluster.name
  depends_on = [
    aws_eks_cluster.cluster,
    helm_release.flink_operator
  ]
}

resource "helm_release" "flink_historyserver" {
  name             = "flink-historyserver"
  chart            = "../../helm-charts/flink-historyserver"
  create_namespace = false

  set {
    name  = "efsFileSystemId"
    value = aws_efs_file_system.job_history.id
  }

  set {
    name  = "flinkVersion"
    value = var.flink_version
  }

  wait = true
  depends_on = [
    aws_eks_cluster.cluster,
    helm_release.flink_operator
  ]
}
4 changes: 4 additions & 0 deletions terraform/aws/main.tf
@@ -28,4 +28,8 @@ data "aws_subnets" "default" {
    values = [data.aws_vpc.default.id]
  }
}

data "aws_security_group" "default" {
  vpc_id = data.aws_vpc.default.id
  name   = "default"
}

11 changes: 6 additions & 5 deletions terraform/aws/operator.tf
@@ -17,7 +17,6 @@ resource "helm_release" "cert_manager" {
   ]
 }

-
 resource "helm_release" "flink_operator" {
   name       = "flink-operator"
   repository = "https://downloads.apache.org/flink/flink-kubernetes-operator-${var.flink_operator_version}"
@@ -43,12 +42,14 @@ resource "helm_release" "flink_operator" {
   set {
     name = "defaultConfiguration.flink-conf\\.yaml"
     value = yamlencode({
-      "kubernetes.operator.metrics.reporter.prom.class" : "org.apache.flink.metrics.prometheus.PrometheusReporter",
-      "kubernetes.operator.metrics.reporter.prom.port" : "9999",
+      "kubernetes.operator.metrics.reporter.prom.factory.class" : "org.apache.flink.metrics.prometheus.PrometheusReporter",
+      "kubernetes.operator.metrics.reporter.prom.factory.port" : "9999",
       "kubernetes.jobmanager.annotations" : {
-        "prometheus.io/scrape" : "true"
+        "prometheus.io/scrape" : "true",
+        "prometheus.io/port" : "9999"
       },
+      "jobmanager.archive.fs.dir": var.historyserver_mount_path,
+      "historyserver.archive.fs.dir": var.historyserver_mount_path,
     })
   }
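The `yamlencode` call in the hunk above renders the operator's default flink-conf fragment. A rough Python mirror of the resulting mapping (illustrative only; the value of `var.historyserver_mount_path` is assumed, it is not shown in this diff):

```python
# Assumed value of var.historyserver_mount_path (not shown in this PR)
historyserver_mount_path = "file:///opt/history/jobs"

flink_conf = {
    "kubernetes.operator.metrics.reporter.prom.factory.class":
        "org.apache.flink.metrics.prometheus.PrometheusReporter",
    "kubernetes.operator.metrics.reporter.prom.factory.port": "9999",
    "kubernetes.jobmanager.annotations": {
        "prometheus.io/scrape": "true",
        "prometheus.io/port": "9999",
    },
    # Both the job manager (which writes archives) and the history server
    # (which reads them) must point at the same directory, or finished jobs
    # never appear in the history UI.
    "jobmanager.archive.fs.dir": historyserver_mount_path,
    "historyserver.archive.fs.dir": historyserver_mount_path,
}

assert flink_conf["jobmanager.archive.fs.dir"] == flink_conf["historyserver.archive.fs.dir"]
```

Setting this at the operator's `defaultConfiguration` level means every Flink job it manages archives to the shared EFS mount without per-job configuration.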