diff --git a/helm-charts/flink-historyserver/.helmignore b/helm-charts/flink-historyserver/.helmignore
new file mode 100644
index 0000000..0e8a0eb
--- /dev/null
+++ b/helm-charts/flink-historyserver/.helmignore
@@ -0,0 +1,23 @@
+# Patterns to ignore when building packages.
+# This supports shell glob matching, relative path matching, and
+# negation (prefixed with !). Only one pattern per line.
+.DS_Store
+# Common VCS dirs
+.git/
+.gitignore
+.bzr/
+.bzrignore
+.hg/
+.hgignore
+.svn/
+# Common backup files
+*.swp
+*.bak
+*.tmp
+*.orig
+*~
+# Various IDEs
+.project
+.idea/
+*.tmproj
+.vscode/
diff --git a/helm-charts/flink-historyserver/Chart.yaml b/helm-charts/flink-historyserver/Chart.yaml
new file mode 100644
index 0000000..aad6ff0
--- /dev/null
+++ b/helm-charts/flink-historyserver/Chart.yaml
@@ -0,0 +1,24 @@
+apiVersion: v2
+name: flink-historyserver
+description: A Helm chart for Kubernetes
+
+# A chart can be either an 'application' or a 'library' chart.
+#
+# Application charts are a collection of templates that can be packaged into versioned archives
+# to be deployed.
+#
+# Library charts provide useful utilities or functions for the chart developer. They're included as
+# a dependency of application charts to inject those utilities and functions into the rendering
+# pipeline. Library charts do not define any templates and therefore cannot be deployed.
+type: application
+
+# This is the chart version. This version number should be incremented each time you make changes
+# to the chart and its templates, including the app version.
+# Versions are expected to follow Semantic Versioning (https://semver.org/)
+version: 0.1.2
+
+# This is the version number of the application being deployed. This version number should be
+# incremented each time you make changes to the application. Versions are not expected to
+# follow Semantic Versioning. They should reflect the version the application is using.
+# It is recommended to use it with quotes.
+appVersion: "0.1.2"
diff --git a/helm-charts/flink-historyserver/templates/_helpers.tpl b/helm-charts/flink-historyserver/templates/_helpers.tpl
new file mode 100644
index 0000000..91cae44
--- /dev/null
+++ b/helm-charts/flink-historyserver/templates/_helpers.tpl
@@ -0,0 +1,62 @@
+{{/*
+Expand the name of the chart.
+*/}}
+{{- define "flink-historyserver.name" -}}
+{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" }}
+{{- end }}
+
+{{/*
+Create a default fully qualified app name.
+We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec).
+If release name contains chart name it will be used as a full name.
+*/}}
+{{- define "flink-historyserver.fullname" -}}
+{{- if .Values.fullnameOverride }}
+{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" }}
+{{- else }}
+{{- $name := default .Chart.Name .Values.nameOverride }}
+{{- if contains $name .Release.Name }}
+{{- .Release.Name | trunc 63 | trimSuffix "-" }}
+{{- else }}
+{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }}
+{{- end }}
+{{- end }}
+{{- end }}
+
+{{/*
+Create chart name and version as used by the chart label.
+*/}}
+{{- define "flink-historyserver.chart" -}}
+{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }}
+{{- end }}
+
+{{/*
+Common labels
+*/}}
+{{- define "flink-historyserver.labels" -}}
+helm.sh/chart: {{ include "flink-historyserver.chart" . }}
+{{ include "flink-historyserver.selectorLabels" . }}
+{{- if .Chart.AppVersion }}
+app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
+{{- end }}
+app.kubernetes.io/managed-by: {{ .Release.Service }}
+{{- end }}
+
+{{/*
+Selector labels
+*/}}
+{{- define "flink-historyserver.selectorLabels" -}}
+app.kubernetes.io/name: {{ include "flink-historyserver.name" . }}
+app.kubernetes.io/instance: {{ .Release.Name }}
+{{- end }}
+
+{{/*
+Create the name of the service account to use
+*/}}
+{{- define "flink-historyserver.serviceAccountName" -}}
+{{- if .Values.serviceAccount.create }}
+{{- default (include "flink-historyserver.fullname" .) .Values.serviceAccount.name }}
+{{- else }}
+{{- default "default" .Values.serviceAccount.name }}
+{{- end }}
+{{- end }}
diff --git a/helm-charts/flink-historyserver/templates/efs.yaml b/helm-charts/flink-historyserver/templates/efs.yaml
new file mode 100644
index 0000000..b276f9f
--- /dev/null
+++ b/helm-charts/flink-historyserver/templates/efs.yaml
@@ -0,0 +1,37 @@
+apiVersion: storage.k8s.io/v1
+kind: StorageClass
+metadata:
+  name: {{ .Release.Name }}-efs-flink-history-sc
+parameters:
+  provisioningMode: efs-ap
+  fileSystemId: "{{- .Values.efsFileSystemId }}"
+provisioner: "efs.csi.aws.com"
+---
+apiVersion: v1
+kind: PersistentVolume
+metadata:
+  name: {{ .Release.Name }}-flink-historyserver-efs-pv
+spec:
+  capacity:
+    storage: "1Mi"
+  volumeMode: "Filesystem"
+  accessModes:
+    - "ReadWriteMany"
+  # 'Retain' means the EFS volume must be cleaned up manually when testing is done
+  persistentVolumeReclaimPolicy: Retain
+  # must match the PVC's storageClassName so the static PV binds to the claim
+  storageClassName: {{ .Release.Name }}-efs-flink-history-sc
+  csi:
+    driver: "efs.csi.aws.com"
+    volumeHandle: "{{- .Values.efsFileSystemId }}"
+---
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+  name: flink-historyserver-efs-pvc
+spec:
+  accessModes:
+    - ReadWriteMany
+  storageClassName: {{ .Release.Name }}-efs-flink-history-sc
+  resources:
+    requests:
+      storage: 1Mi
diff --git a/helm-charts/flink-historyserver/templates/historyserver.yaml b/helm-charts/flink-historyserver/templates/historyserver.yaml
new file mode 100644
index 0000000..86e4267
--- /dev/null
+++ b/helm-charts/flink-historyserver/templates/historyserver.yaml
@@ -0,0 +1,87 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  labels:
+    app: historyserver
+  name: historyserver
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: historyserver
+  template:
+    metadata:
+      labels:
+        app: historyserver
+    spec:
+      containers:
+      - args:
+        - history-server
+        command:
+        - /docker-entrypoint.sh
+        env:
+        - name: _POD_IP_ADDRESS
+          valueFrom:
+            fieldRef:
+              apiVersion: v1
+              fieldPath: status.podIP
+        image: flink:{{- .Values.flinkVersion }}
+        name: flink-main-container
+        ports:
+        - containerPort: 8082
+          name: history
+          protocol: TCP
+        resources:
+{{ toYaml .Values.resources | indent 10 }}
+        startupProbe:
+          httpGet:
+            path: /config
+            port: history
+        volumeMounts:
+        - mountPath: /opt/history/jobs
+          name: efs-flink-history
+        - mountPath: /opt/flink/conf
+          name: flink-config-volume
+      initContainers:
+      - command:
+        - sh
+        - -c
+        - chown 9999:9999 /opt/history/jobs && ls -lhd /opt/history/jobs
+        image: busybox:1.36.1
+        name: efs-mount-ownership-fix
+        resources: {}
+        volumeMounts:
+        - mountPath: /opt/history/jobs
+          name: efs-flink-history
+      securityContext:
+        fsGroup: 9999
+      # NOTE: this SA is set up by the flink-operator helm chart and reused here
+      serviceAccountName: flink
+      volumes:
+      - name: efs-flink-history
+        persistentVolumeClaim:
+          claimName: flink-historyserver-efs-pvc
+      - configMap:
+          items:
+          - key: log4j-console.properties
+            path: log4j-console.properties
+          - key: flink-conf.yaml
+            path: flink-conf.yaml
+          # NOTE: this configmap is set up by the flink-operator helm chart and reused here
+          name: flink-operator-config
+        name: flink-config-volume
+---
+apiVersion: v1
+kind: Service
+metadata:
+  labels:
+    app: historyserver
+  name: historyserver-rest
+spec:
+  ports:
+  - name: history
+    port: 8082
+    targetPort: 8082
+  selector:
+    app: historyserver
+  type: ClusterIP
diff --git a/helm-charts/flink-historyserver/templates/tests/README.md b/helm-charts/flink-historyserver/templates/tests/README.md
new file mode 100644
index 0000000..e69de29
diff --git a/helm-charts/flink-historyserver/values.schema.json b/helm-charts/flink-historyserver/values.schema.json
new file mode 100644
index 0000000..16cf6a6
--- /dev/null
+++ b/helm-charts/flink-historyserver/values.schema.json
@@ -0,0 +1,10 @@
+{
+  "$schema": "http://json-schema.org/draft-07/schema#",
+  "type": "object",
+  "required": ["efsFileSystemId"],
+  "properties": {
+    "efsFileSystemId": {
+      "type": "string"
+    }
+  }
+}
diff --git a/helm-charts/flink-historyserver/values.yaml b/helm-charts/flink-historyserver/values.yaml
new file mode 100644
index 0000000..8dc13a8
--- /dev/null
+++ b/helm-charts/flink-historyserver/values.yaml
@@ -0,0 +1,8 @@
+flinkVersion: "1.16"
+resources:
+  limits:
+    cpu: 300m
+    memory: 1536Mi
+  requests:
+    cpu: 300m
+    memory: 1536Mi
diff --git a/terraform/aws/addons.tf b/terraform/aws/addons.tf
index 70448f4..c8f48d9 100644
--- a/terraform/aws/addons.tf
+++ b/terraform/aws/addons.tf
@@ -38,3 +38,28 @@ resource "aws_eks_addon" "ebs_provisioner" {
     aws_iam_role_policy_attachment.ebs_provisioner
   ]
 }
+
+# EFS CSI Driver for HistoryServer
+resource "aws_iam_role" "efs_provisioner" {
+  name               = "${var.cluster_name}-eks-efs-provisioner"
+  assume_role_policy = data.aws_iam_policy_document.assume_role_with_oidc.json
+}
+
+resource "aws_iam_role_policy_attachment" "efs_provisioner" {
+  policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonEFSCSIDriverPolicy"
+  role       = aws_iam_role.efs_provisioner.name
+}
+
+resource "aws_eks_addon" "efs_provisioner" {
+  cluster_name = aws_eks_cluster.cluster.name
+  addon_name   = "aws-efs-csi-driver"
+  # Fetch the most recent version for your current version of k8s
+  # AWS_PROFILE= eksctl utils describe-addon-versions --kubernetes-version 1.27 -v0 | jq '.Addons[] | select(.AddonName == "aws-efs-csi-driver") | .AddonVersions[0]'
+  addon_version               = "v1.7.0-eksbuild.1"
+  resolve_conflicts_on_create = "OVERWRITE"
+  service_account_role_arn    = aws_iam_role.efs_provisioner.arn
+  depends_on = [
+    aws_iam_role_policy_attachment.efs_provisioner
+  ]
+}
+
diff --git a/terraform/aws/cluster.tf b/terraform/aws/cluster.tf
index 80afeb2..3b9dd2f 100644
--- a/terraform/aws/cluster.tf
+++ b/terraform/aws/cluster.tf
@@ -54,7 +54,7 @@ resource "aws_iam_openid_connect_provider" "cluster_oidc" {
 module "cluster_autoscaler_irsa" {
   source = "terraform-aws-modules/iam/aws//modules/iam-role-for-service-accounts-eks"
 
-  role_name                        = "cluster_autoscaler"
+  role_name                        = "${var.cluster_name}_cluster_autoscaler"
   role_permissions_boundary_arn    = var.permissions_boundary
 
   attach_cluster_autoscaler_policy = true
diff --git a/terraform/aws/efs.tf b/terraform/aws/efs.tf
new file mode 100644
index 0000000..d1bec41
--- /dev/null
+++ b/terraform/aws/efs.tf
@@ -0,0 +1,10 @@
+resource "aws_efs_file_system" "job_history" {
+  creation_token = "${var.cluster_name}-flink-job-history"
+}
+
+resource "aws_efs_mount_target" "job_history" {
+  for_each        = toset(data.aws_subnets.default.ids)
+  file_system_id  = aws_efs_file_system.job_history.id
+  subnet_id       = each.value
+  security_groups = [aws_eks_cluster.cluster.vpc_config[0].cluster_security_group_id]
+}
diff --git a/terraform/aws/helm_historyserver.tf b/terraform/aws/helm_historyserver.tf
new file mode 100644
index 0000000..9f4195f
--- /dev/null
+++ b/terraform/aws/helm_historyserver.tf
@@ -0,0 +1,35 @@
+provider "kubernetes" {
+  host                   = aws_eks_cluster.cluster.endpoint
+  cluster_ca_certificate = base64decode(aws_eks_cluster.cluster.certificate_authority[0].data)
+  token                  = data.aws_eks_cluster_auth.cluster.token
+}
+
+data "aws_eks_cluster_auth" "cluster" {
+  name = aws_eks_cluster.cluster.name
+  depends_on = [
+    aws_eks_cluster.cluster,
+    helm_release.flink_operator
+  ]
+}
+
+resource "helm_release" "flink_historyserver" {
+  name             = "flink-historyserver"
+  chart            = "../../helm-charts/flink-historyserver"
+  create_namespace = false
+
+  set {
+    name  = "efsFileSystemId"
+    value = aws_efs_file_system.job_history.id
+  }
+
+  set {
+    name  = "flinkVersion"
+    value = var.flink_version
+  }
+
+  wait = true
+  depends_on = [
+    aws_eks_cluster.cluster,
+    helm_release.flink_operator
+  ]
+}
diff --git a/terraform/aws/main.tf b/terraform/aws/main.tf
index a565911..2bd39f2 100644
--- a/terraform/aws/main.tf
+++ b/terraform/aws/main.tf
@@ -30,4 +30,8 @@ data "aws_subnets" "default" {
     values = [data.aws_vpc.default.id]
   }
 }
 
+data "aws_security_group" "default" {
+  vpc_id = data.aws_vpc.default.id
+  name   = "default"
+}
diff --git a/terraform/aws/operator.tf b/terraform/aws/operator.tf
index 90e3a33..10ce1e5 100644
--- a/terraform/aws/operator.tf
+++ b/terraform/aws/operator.tf
@@ -17,7 +17,6 @@ resource "helm_release" "cert_manager" {
   ]
 }
 
-
 resource "helm_release" "flink_operator" {
   name       = "flink-operator"
   repository = "https://downloads.apache.org/flink/flink-kubernetes-operator-${var.flink_operator_version}"
@@ -43,12 +42,14 @@ resource "helm_release" "flink_operator" {
   set {
     name  = "defaultConfiguration.flink-conf\\.yaml"
     value = yamlencode({
-      "kubernetes.operator.metrics.reporter.prom.class" : "org.apache.flink.metrics.prometheus.PrometheusReporter",
-      "kubernetes.operator.metrics.reporter.prom.port" : "9999",
+      "kubernetes.operator.metrics.reporter.prom.factory.class" : "org.apache.flink.metrics.prometheus.PrometheusReporter",
+      "kubernetes.operator.metrics.reporter.prom.factory.port" : "9999",
       "kubernetes.jobmanager.annotations" : {
-        "prometheus.io/scrape" : "true"
+        "prometheus.io/scrape" : "true",
         "prometheus.io/port" : "9999"
-      }
+      },
+      "jobmanager.archive.fs.dir": var.historyserver_mount_path,
+      "historyserver.archive.fs.dir": var.historyserver_mount_path,
     })
   }
 
diff --git a/terraform/aws/variables.tf b/terraform/aws/variables.tf
index e6f7828..dde9b9f 100644
--- a/terraform/aws/variables.tf
+++ b/terraform/aws/variables.tf
@@ -68,12 +68,19 @@ variable "max_instances" {
 }
 
 variable "flink_operator_version" {
-  default     = "1.5.0"
+  default     = "1.6.1"
   description = <<-EOT
   Version of Flink Operator to install.
   EOT
 }
 
+variable "flink_version" {
+  default     = "1.16"
+  description = <<-EOT
+  Version of Flink to use for the HistoryServer image.
+  EOT
+}
+
 variable "cluster_autoscaler_version" {
   default     = "9.21.0"
   description = <<-EOT
@@ -131,3 +138,11 @@ variable "buckets" {
   List of S3 Buckets to create.
   EOT
 }
+
+variable "historyserver_mount_path" {
+  default     = "/opt/history/jobs"
+  description = <<-EOT
+  Mount path where Flink JobManagers archive completed jobs and where the HistoryServer
+  reads them from, so job statuses can still be served after the JobManagers are gone.
+  EOT
+}
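
For context: the operator's defaultConfiguration above points jobmanager.archive.fs.dir at historyserver_mount_path, so JobManager pods also need the EFS volume mounted at that path or nothing ever gets archived for the HistoryServer to read. Below is a minimal, untested sketch of how a FlinkDeployment could mount the chart's PVC through the operator's podTemplate, assuming the job runs in the same namespace as flink-historyserver-efs-pvc; the deployment name, jar, parallelism, and resource figures are illustrative placeholders, not part of this change.

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: example-job                                 # placeholder name
spec:
  image: flink:1.16                                 # keep in line with the chart's flinkVersion
  flinkVersion: v1_16
  serviceAccount: flink                             # SA created by the flink-operator chart
  jobManager:
    resource:
      cpu: 1
      memory: "2048m"
  taskManager:
    resource:
      cpu: 1
      memory: "2048m"
  podTemplate:
    apiVersion: v1
    kind: Pod
    spec:
      containers:
        - name: flink-main-container
          volumeMounts:
            - mountPath: /opt/history/jobs          # same path as historyserver_mount_path
              name: efs-flink-history
      volumes:
        - name: efs-flink-history
          persistentVolumeClaim:
            claimName: flink-historyserver-efs-pvc  # PVC created by this chart
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar   # placeholder job
    parallelism: 2
    upgradeMode: stateless

With something like this in place, finished jobs land under /opt/history/jobs on EFS and the historyserver Deployment serves them on port 8082 through the historyserver-rest service.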