From 5c93bfd818f15af91a95c9bf2411658859d005da Mon Sep 17 00:00:00 2001 From: Santhosh Vasabhaktula Date: Thu, 20 Jun 2024 22:42:49 +0530 Subject: [PATCH] feat #0000 - open-sourcing transformer job and renaming merged-pipeline to unified-pipeline --- terraform/aws/main.tf | 4 +- terraform/aws/variables.tf | 8 +- terraform/gcp/main.tf | 4 +- terraform/gcp/variables.tf | 8 +- .../command-service-helm-chart/values.yaml | 6 +- .../command_service/command_service.yaml.tfpl | 6 +- .../helm/flink/flink-helm-chart/values.yaml | 11 ++- terraform/modules/helm/flink/main.tf | 2 +- terraform/modules/helm/flink/variables.tf | 8 +- .../helm/lakehouse-connector/variables.tf | 2 +- .../helm/unified_helm/obsrv/Chart.yaml | 4 +- .../obsrv/charts/command-api/values.yaml | 6 +- .../templates/deployment.yaml | 2 +- .../charts/master-data-processor/values.yaml | 1 + .../obsrv/charts/merged-pipeline/Chart.yaml | 2 +- .../merged-pipeline/templates/deployment.yaml | 95 ++++++++++--------- .../obsrv/charts/merged-pipeline/values.yaml | 9 +- .../helm/unified_helm/obsrv/values.yaml | 8 +- 18 files changed, 96 insertions(+), 90 deletions(-) diff --git a/terraform/aws/main.tf b/terraform/aws/main.tf index d43352e4..cc044e57 100644 --- a/terraform/aws/main.tf +++ b/terraform/aws/main.tf @@ -140,9 +140,9 @@ module "flink" { building_block = var.building_block flink_container_registry = var.flink_container_registry flink_image_tag = var.flink_image_tag - flink_merged_pipeline_release_names = var.flink_merged_pipeline_release_names + flink_unified_pipeline_release_names = var.flink_unified_pipeline_release_names flink_release_names = var.flink_release_names - merged_pipeline_enabled = var.merged_pipeline_enabled + unified_pipeline_enabled = var.unified_pipeline_enabled flink_checkpoint_store_type = var.flink_checkpoint_store_type flink_chart_depends_on = [module.kafka, module.postgresql_migration, module.redis_dedup, module.redis_denorm] postgresql_obsrv_username = module.postgresql.postgresql_obsrv_username diff --git a/terraform/aws/variables.tf b/terraform/aws/variables.tf index e25d84fb..c83e90a0 100644 --- a/terraform/aws/variables.tf +++ b/terraform/aws/variables.tf @@ -141,17 +141,17 @@ variable "flink_release_names" { } } -variable "flink_merged_pipeline_release_names" { +variable "flink_unified_pipeline_release_names" { description = "Create release names" type = map(string) default = { - merged-pipeline = "merged-pipeline" + unified-pipeline = "unified-pipeline" master-data-processor = "master-data-processor" } } -variable "merged_pipeline_enabled" { - description = "Toggle to deploy merged pipeline" +variable "unified_pipeline_enabled" { + description = "Toggle to deploy unified pipeline" type = bool default = true } diff --git a/terraform/gcp/main.tf b/terraform/gcp/main.tf index 410564e4..cec40ef5 100644 --- a/terraform/gcp/main.tf +++ b/terraform/gcp/main.tf @@ -390,9 +390,9 @@ module "flink" { building_block = var.building_block flink_container_registry = var.flink_container_registry flink_image_tag = var.flink_image_tag - flink_merged_pipeline_release_names = var.flink_merged_pipeline_release_names + flink_unified_pipeline_release_names = var.flink_unified_pipeline_release_names flink_release_names = var.flink_release_names - merged_pipeline_enabled = var.merged_pipeline_enabled + unified_pipeline_enabled = var.unified_pipeline_enabled flink_checkpoint_store_type = var.flink_checkpoint_store_type flink_chart_depends_on = [ module.kafka, module.postgresql_migration, module.redis_dedup, module.redis_denorm ] postgresql_obsrv_username = module.postgresql.postgresql_obsrv_username diff --git a/terraform/gcp/variables.tf b/terraform/gcp/variables.tf index 165d3771..eb60f2af 100644 --- a/terraform/gcp/variables.tf +++ b/terraform/gcp/variables.tf @@ -226,17 +226,17 @@ variable "flink_release_names" { } } -variable "flink_merged_pipeline_release_names" { +variable "flink_unified_pipeline_release_names" { description = "Create release names" type = map(string) default = { - merged-pipeline = "merged-pipeline" + unified-pipeline = "unified-pipeline" master-data-processor = "master-data-processor" } } -variable "merged_pipeline_enabled" { - description = "Toggle to deploy merged pipeline" +variable "unified_pipeline_enabled" { + description = "Toggle to deploy unified pipeline" type = bool default = true } diff --git a/terraform/modules/helm/command_service/command-service-helm-chart/values.yaml b/terraform/modules/helm/command_service/command-service-helm-chart/values.yaml index 7c180110..0aad64f5 100644 --- a/terraform/modules/helm/command_service/command-service-helm-chart/values.yaml +++ b/terraform/modules/helm/command_service/command-service-helm-chart/values.yaml @@ -35,9 +35,9 @@ service_config: | namespace: flink reinstall_sleep_time: 3 jobs: - - name: "PipelineMergedJob" - release_name: merged-pipeline - job_manager_url: "merged-pipeline-jobmanager.flink.svc.cluster.local:8081" + - name: "UnifiedPipelineJob" + release_name: unified-pipeline + job_manager_url: "unified-pipeline-jobmanager.flink.svc.cluster.local:8081" - name: "MasterDataProcessorJob" release_name: master-data-processor job_manager_url: "master-data-processor-jobmanager.flink.svc.cluster.local:8081" diff --git a/terraform/modules/helm/command_service/command_service.yaml.tfpl b/terraform/modules/helm/command_service/command_service.yaml.tfpl index c382c55d..fd248e07 100644 --- a/terraform/modules/helm/command_service/command_service.yaml.tfpl +++ b/terraform/modules/helm/command_service/command_service.yaml.tfpl @@ -22,9 +22,9 @@ service_config: | namespace: ${flink_namespace} reinstall_sleep_time: 3 jobs: - - name: "PipelineMergedJob" - release_name: merged-pipeline - job_manager_url: "merged-pipeline-jobmanager.flink.svc.cluster.local:8081" + - name: "UnifiedPipelineJob" + release_name: unified-pipeline + job_manager_url: "unified-pipeline-jobmanager.flink.svc.cluster.local:8081" - name: "MasterDataProcessor" release_name: master-data-processor job_manager_url: "master-data-processor-jobmanager.flink.svc.cluster.local:8081" diff --git a/terraform/modules/helm/flink/flink-helm-chart/values.yaml b/terraform/modules/helm/flink/flink-helm-chart/values.yaml index 131e8e96..eb066f5f 100644 --- a/terraform/modules/helm/flink/flink-helm-chart/values.yaml +++ b/terraform/modules/helm/flink/flink-helm-chart/values.yaml @@ -2,7 +2,7 @@ namespace: "flink" imagepullsecrets: "" image: registry: sanketikahub - repository: merged-pipeline + repository: unified-pipeline tag: 1.0.0-GA serviceMonitor: enabled: false @@ -161,8 +161,8 @@ base_config: | port = "9042" } -merged-pipeline: - merged-pipeline: |+ +unified-pipeline: + unified-pipeline: |+ include file("/data/flink/conf/baseconfig.conf") kafka { input.topic = ${job.env}".ingest" @@ -176,6 +176,7 @@ merged-pipeline: output.denorm.topic = ${job.env}".denorm" output.denorm.failed.topic = ${job.env}".failed" output.transform.topic = ${job.env}".transform" + output.transform.failed.topic = ${job.env}".failed" stats.topic = ${job.env}".stats" groupId = ${job.env}"-single-pipeline-group" producer { @@ -211,7 +212,7 @@ merged-pipeline: taskmanager.memory.process.size: 1700m jobmanager.memory.process.size: 1600m state.savepoints.dir: file:///tmp - job_classname: org.sunbird.obsrv.pipeline.task.MergedPipelineStreamTask + job_classname: org.sunbird.obsrv.pipeline.task.UnifiedPipelineStreamTask extractor: extractor: |+ @@ -332,6 +333,7 @@ transformer: kafka { input.topic = ${job.env}".denorm" output.transform.topic = ${job.env}".transform" + output.transform.failed.topic = ${job.env}".failed" groupId = ${job.env}"-transformer-group" producer { max-request-size = 10000024 @@ -436,6 +438,7 @@ master-data-processor: output.duplicate.topic = ${job.env}".masterdata.failed" output.denorm.topic = ${job.env}".masterdata.denorm" output.transform.topic = ${job.env}".masterdata.transform" + output.transform.failed.topic = ${job.env}".masterdata.failed" stats.topic = ${job.env}".masterdata.stats" groupId = ${job.env}"-masterdata-pipeline-group" diff --git a/terraform/modules/helm/flink/main.tf b/terraform/modules/helm/flink/main.tf index 6030928d..7ebcddd3 100644 --- a/terraform/modules/helm/flink/main.tf +++ b/terraform/modules/helm/flink/main.tf @@ -20,7 +20,7 @@ resource "helm_release" "flink_sa" { } resource "helm_release" "flink" { - for_each = contains([var.merged_pipeline_enabled], true ) ? var.flink_merged_pipeline_release_names : var.flink_release_names + for_each = contains([var.unified_pipeline_enabled], true ) ? var.flink_unified_pipeline_release_names : var.flink_release_names name = each.key chart = "${path.module}/${var.flink_chart_path}" namespace = var.flink_namespace diff --git a/terraform/modules/helm/flink/variables.tf b/terraform/modules/helm/flink/variables.tf index c0491ef6..b3e93125 100644 --- a/terraform/modules/helm/flink/variables.tf +++ b/terraform/modules/helm/flink/variables.tf @@ -35,7 +35,7 @@ variable "flink_chart_path" { # variable "flink_release_name" { # type = string # description = "Flink helm release name." -# default = "merged-pipeline" +# default = "unified-pipeline" # } # *** changed this to release map. @@ -182,12 +182,12 @@ variable "flink_release_names" { type = map(string) } -variable "flink_merged_pipeline_release_names" { +variable "flink_unified_pipeline_release_names" { description = "Create release names" type = map(string) } -variable "merged_pipeline_enabled" { - description = "Toggle to enable merged pipeline" +variable "unified_pipeline_enabled" { + description = "Toggle to enable unified pipeline" type = bool } diff --git a/terraform/modules/helm/lakehouse-connector/variables.tf b/terraform/modules/helm/lakehouse-connector/variables.tf index 616d6cb5..1372b893 100644 --- a/terraform/modules/helm/lakehouse-connector/variables.tf +++ b/terraform/modules/helm/lakehouse-connector/variables.tf @@ -24,7 +24,7 @@ variable "flink_chart_path" { # variable "flink_release_name" { # type = string # description = "Flink helm release name." -# default = "merged-pipeline" +# default = "unified-pipeline" # } # *** changed this to release map. diff --git a/terraform/modules/helm/unified_helm/obsrv/Chart.yaml b/terraform/modules/helm/unified_helm/obsrv/Chart.yaml index 8a9a9184..220aeda1 100644 --- a/terraform/modules/helm/unified_helm/obsrv/Chart.yaml +++ b/terraform/modules/helm/unified_helm/obsrv/Chart.yaml @@ -40,9 +40,9 @@ dependencies: - name: master-data-processor version: 0.1.2 condition: master-data-processor.enabled - - name: merged-pipeline + - name: unified-pipeline version: 0.1.2 - condition: merged-pipeline.enabled + condition: unified-pipeline.enabled - name: postgresql version: 12.2.7 condition: postgresql.enabled diff --git a/terraform/modules/helm/unified_helm/obsrv/charts/command-api/values.yaml b/terraform/modules/helm/unified_helm/obsrv/charts/command-api/values.yaml index ee006d0d..6d1c5b0a 100644 --- a/terraform/modules/helm/unified_helm/obsrv/charts/command-api/values.yaml +++ b/terraform/modules/helm/unified_helm/obsrv/charts/command-api/values.yaml @@ -12,9 +12,9 @@ service_config: | namespace: flink reinstall_sleep_time: 3 jobs: - - name: "PipelineMergedJob" - release_name: merged-pipeline - job_manager_url: "merged-pipeline-jobmanager.flink.svc.cluster.local:8081" + - name: "UnifiedPipelineJob" + release_name: unified-pipeline + job_manager_url: "unified-pipeline-jobmanager.flink.svc.cluster.local:8081" - name: "MasterDataProcessorJob" release_name: master-data-processor job_manager_url: "master-data-processor-jobmanager.flink.svc.cluster.local:8081" diff --git a/terraform/modules/helm/unified_helm/obsrv/charts/master-data-processor/templates/deployment.yaml b/terraform/modules/helm/unified_helm/obsrv/charts/master-data-processor/templates/deployment.yaml index fa4728e9..49d9d0ac 100644 --- a/terraform/modules/helm/unified_helm/obsrv/charts/master-data-processor/templates/deployment.yaml +++ b/terraform/modules/helm/unified_helm/obsrv/charts/master-data-processor/templates/deployment.yaml @@ -128,7 +128,7 @@ data: output.duplicate.topic = ${job.env}".masterdata.failed" output.denorm.topic = ${job.env}".masterdata.denorm" output.transform.topic = ${job.env}".masterdata.transform" - output.transform.failed.topic = ${job.env}".masterdata.transform.failed" + output.transform.failed.topic = ${job.env}".masterdata.failed" stats.topic = ${job.env}".masterdata.stats" groupId = ${job.env}"-masterdata-pipeline-group" diff --git a/terraform/modules/helm/unified_helm/obsrv/charts/master-data-processor/values.yaml b/terraform/modules/helm/unified_helm/obsrv/charts/master-data-processor/values.yaml index 49b8cde7..d3497e87 100644 --- a/terraform/modules/helm/unified_helm/obsrv/charts/master-data-processor/values.yaml +++ b/terraform/modules/helm/unified_helm/obsrv/charts/master-data-processor/values.yaml @@ -157,6 +157,7 @@ master-data-processor: output.unique.topic = ${job.env}".masterdata.unique" output.duplicate.topic = ${job.env}".masterdata.duplicate" output.transform.topic = ${job.env}".masterdata.transform" + output.transform.failed.topic = ${job.env}".masterdata.failed" stats.topic = ${job.env}".masterdata.stats" groupId = ${job.env}"-masterdata-pipeline-group" diff --git a/terraform/modules/helm/unified_helm/obsrv/charts/merged-pipeline/Chart.yaml b/terraform/modules/helm/unified_helm/obsrv/charts/merged-pipeline/Chart.yaml index eb70f976..f8f30dff 100644 --- a/terraform/modules/helm/unified_helm/obsrv/charts/merged-pipeline/Chart.yaml +++ b/terraform/modules/helm/unified_helm/obsrv/charts/merged-pipeline/Chart.yaml @@ -1,5 +1,5 @@ apiVersion: v2 -name: merged-pipeline +name: unified-pipeline description: A Helm chart for Kubernetes # A chart can be either an 'application' or a 'library' chart. diff --git a/terraform/modules/helm/unified_helm/obsrv/charts/merged-pipeline/templates/deployment.yaml b/terraform/modules/helm/unified_helm/obsrv/charts/merged-pipeline/templates/deployment.yaml index 29b9b08b..f3ff2074 100644 --- a/terraform/modules/helm/unified_helm/obsrv/charts/merged-pipeline/templates/deployment.yaml +++ b/terraform/modules/helm/unified_helm/obsrv/charts/merged-pipeline/templates/deployment.yaml @@ -1,5 +1,5 @@ --- -# Source: obsrv-chart/charts/merged-pipeline/templates/flink_job_configmap.yaml +# Source: obsrv-chart/charts/unified-pipeline/templates/flink_job_configmap.yaml apiVersion: v1 data: base-config: |+ @@ -71,7 +71,7 @@ data: taskmanager.memory.process.size: 1700m jobmanager.memory.process.size: 1600m state.savepoints.dir: file:///tmp - job_classname: org.sunbird.obsrv.pipeline.task.MergedPipelineStreamTask + job_classname: org.sunbird.obsrv.pipeline.task.UnifiedPipelineStreamTask log4j_console_properties: | # This affects logging for both user code and Flink rootLogger.level = INFO @@ -102,7 +102,7 @@ data: # Suppress the irrelevant (wrong) warnings from the Netty channel handler logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline logger.netty.level = OFF - merged-pipeline: | + unified-pipeline: | include file("/data/flink/conf/baseconfig.conf") kafka { input.topic = ${job.env}".ingest" @@ -117,6 +117,7 @@ data: output.denorm.topic = ${job.env}".denorm" output.denorm.failed.topic = ${job.env}".denorm.failed" output.transform.topic = ${job.env}".transform" + output.transform.failed.topic = ${job.env}".failed" stats.topic = ${job.env}".stats" groupId = ${job.env}"-single-pipeline-group" producer { @@ -142,18 +143,18 @@ kind: ConfigMap metadata: labels: app: flink - name: merged-pipeline-config + name: unified-pipeline-config namespace: flink --- -# Source: obsrv-chart/charts/merged-pipeline/templates/deployment.yaml +# Source: obsrv-chart/charts/unified-pipeline/templates/deployment.yaml apiVersion: v1 kind: Service metadata: labels: app: flink - component: merged-pipeline-jobmanager - name: merged-pipeline-jobmanager + component: unified-pipeline-jobmanager + name: unified-pipeline-jobmanager namespace: flink spec: ports: @@ -169,14 +170,14 @@ spec: port: 9250 selector: app: flink - component: merged-pipeline-jobmanager + component: unified-pipeline-jobmanager type: ClusterIP --- -# Source: obsrv-chart/charts/merged-pipeline/templates/deployment.yaml +# Source: obsrv-chart/charts/unified-pipeline/templates/deployment.yaml apiVersion: v1 kind: Service metadata: - name: merged-pipeline-jobmanager-webui + name: unified-pipeline-jobmanager-webui namespace: flink spec: ports: @@ -186,17 +187,17 @@ spec: targetPort: 8081 selector: app: flink - component: merged-pipeline-jobmanager + component: unified-pipeline-jobmanager type: ClusterIP --- -# Source: obsrv-chart/charts/merged-pipeline/templates/deployment.yaml +# Source: obsrv-chart/charts/unified-pipeline/templates/deployment.yaml apiVersion: v1 kind: Service metadata: labels: app: flink - component: merged-pipeline-taskmanager - name: merged-pipeline-taskmanager + component: unified-pipeline-taskmanager + name: unified-pipeline-taskmanager namespace: flink spec: ports: @@ -204,29 +205,29 @@ spec: port: 9251 selector: app: flink - component: merged-pipeline-taskmanager + component: unified-pipeline-taskmanager type: ClusterIP --- --- -# Source: obsrv-chart/charts/merged-pipeline/templates/deployment.yaml +# Source: obsrv-chart/charts/unified-pipeline/templates/deployment.yaml apiVersion: apps/v1 kind: Deployment metadata: - name: merged-pipeline-taskmanager + name: unified-pipeline-taskmanager namespace: flink spec: replicas: 1 selector: matchLabels: app: flink - component: merged-pipeline-taskmanager + component: unified-pipeline-taskmanager template: metadata: labels: azure-extensions-usage-release-identifier: obsrv-base app: flink - component: merged-pipeline-taskmanager + component: unified-pipeline-taskmanager system.processing: "true" spec: containers: @@ -235,21 +236,21 @@ spec: - -Dfs.azure.account.key.{{ .Values.global.azure_storage_account_name }}.blob.core.windows.net={{ .Values.global.azure_storage_account_key }} - -Dweb.submit.enable=false - -Dmetrics.reporter.prom.class=org.apache.flink.metrics.prometheus.PrometheusReporter - - -Dmetrics.reporter.prom.host=merged-pipeline-taskmanager + - -Dmetrics.reporter.prom.host=unified-pipeline-taskmanager - -Dmetrics.reporter.prom.port=9251-9260 - - -Djobmanager.rpc.address=merged-pipeline-jobmanager + - -Djobmanager.rpc.address=unified-pipeline-jobmanager - -Dtaskmanager.rpc.port=6122 - --config.file.path - - /data/flink/conf/merged-pipeline.conf + - /data/flink/conf/unified-pipeline.conf command: - /opt/flink/bin/taskmanager.sh - {{ if .Values.global.azure.images.merged_pipeline }} - image: "{{ .Values.global.azure.images.merged_pipeline.registry }}/{{ .Values.global.azure.images.merged_pipeline.image }}:{{ .Values.global.azure.images.merged_pipeline.digest }}" + {{ if .Values.global.azure.images.unified_pipeline }} + image: "{{ .Values.global.azure.images.unified_pipeline.registry }}/{{ .Values.global.azure.images.unified_pipeline.image }}:{{ .Values.global.azure.images.unified_pipeline.digest }}" {{ else }} - image: sanketikahub/merged-pipeline:release-0.5.0_RC26 + image: sanketikahub/unified-pipeline:release-0.5.0_RC26 {{ end }} imagePullPolicy: IfNotPresent - name: merged-pipeline-taskmanager + name: unified-pipeline-taskmanager ports: - containerPort: 6122 name: rpc @@ -268,9 +269,9 @@ spec: - mountPath: /data/flink/conf/baseconfig.conf name: flink-config-volume subPath: base-config.conf - - mountPath: /data/flink/conf/merged-pipeline.conf + - mountPath: /data/flink/conf/unified-pipeline.conf name: flink-config-volume - subPath: merged-pipeline.conf + subPath: unified-pipeline.conf workingDir: volumes: - configMap: @@ -281,22 +282,22 @@ spec: path: log4j-console.properties - key: base-config path: base-config.conf - - key: merged-pipeline - path: merged-pipeline.conf - name: merged-pipeline-config + - key: unified-pipeline + path: unified-pipeline.conf + name: unified-pipeline-config name: flink-config-volume --- -# Source: obsrv-chart/charts/merged-pipeline/templates/deployment.yaml +# Source: obsrv-chart/charts/unified-pipeline/templates/deployment.yaml apiVersion: apps/v1 kind: Deployment metadata: - name: merged-pipeline-jobmanager + name: unified-pipeline-jobmanager namespace: flink spec: selector: matchLabels: app: flink - component: merged-pipeline-jobmanager + component: unified-pipeline-jobmanager azure-extensions-usage-release-identifier: obsrv-base template: metadata: @@ -305,7 +306,7 @@ spec: prometheus.io/scrape: "true" labels: app: flink - component: merged-pipeline-jobmanager + component: unified-pipeline-jobmanager azure-extensions-usage-release-identifier: obsrv-base spec: initContainers: @@ -340,26 +341,26 @@ spec: - args: - start-foreground - -Dfs.azure.account.key.{{ .Values.global.azure_storage_account_name }}.blob.core.windows.net={{ .Values.global.azure_storage_account_key }} - - --job-classname=org.sunbird.obsrv.pipeline.task.MergedPipelineStreamTask + - --job-classname=org.sunbird.obsrv.pipeline.task.UnifiedPipelineStreamTask - -Dweb.submit.enable=false - -Dmetrics.reporter.prom.class=org.apache.flink.metrics.prometheus.PrometheusReporter - -Dmetrics.reporter.prom.port=9250 - - -Djobmanager.rpc.address=merged-pipeline-jobmanager + - -Djobmanager.rpc.address=unified-pipeline-jobmanager - -Djobmanager.rpc.port=6123 - -Dparallelism.default=1 - -Dblob.server.port=6124 - -Dqueryable-state.server.ports=6125 - --config.file.path - - /data/flink/conf/merged-pipeline.conf + - /data/flink/conf/unified-pipeline.conf command: - /opt/flink/bin/standalone-job.sh - {{ if .Values.global.azure.images.merged_pipeline }} - image: "{{ .Values.global.azure.images.merged_pipeline.registry }}/{{ .Values.global.azure.images.merged_pipeline.image }}:{{ .Values.global.azure.images.merged_pipeline.digest }}" + {{ if .Values.global.azure.images.unified_pipeline }} + image: "{{ .Values.global.azure.images.unified_pipeline.registry }}/{{ .Values.global.azure.images.unified_pipeline.image }}:{{ .Values.global.azure.images.unified_pipeline.digest }}" {{ else }} - image: sanketikahub/merged-pipeline:release-0.5.0_RC26 + image: sanketikahub/unified-pipeline:release-0.5.0_RC26 {{ end }} imagePullPolicy: IfNotPresent - name: merged-pipeline-jobmanager + name: unified-pipeline-jobmanager ports: - containerPort: 6123 name: rpc @@ -381,9 +382,9 @@ spec: - mountPath: /data/flink/conf/baseconfig.conf name: flink-config-volume subPath: base-config.conf - - mountPath: /data/flink/conf/merged-pipeline.conf + - mountPath: /data/flink/conf/unified-pipeline.conf name: flink-config-volume - subPath: merged-pipeline.conf + subPath: unified-pipeline.conf - mountPath: /opt/flink/conf/log4j-console.properties name: flink-config-volume subPath: log4j-console.properties @@ -396,10 +397,10 @@ spec: path: flink-conf.yaml - key: base-config path: base-config.conf - - key: merged-pipeline - path: merged-pipeline.conf + - key: unified-pipeline + path: unified-pipeline.conf - key: log4j_console_properties path: log4j-console.properties - name: merged-pipeline-config + name: unified-pipeline-config name: flink-config-volume --- diff --git a/terraform/modules/helm/unified_helm/obsrv/charts/merged-pipeline/values.yaml b/terraform/modules/helm/unified_helm/obsrv/charts/merged-pipeline/values.yaml index d2c0b221..118a1316 100644 --- a/terraform/modules/helm/unified_helm/obsrv/charts/merged-pipeline/values.yaml +++ b/terraform/modules/helm/unified_helm/obsrv/charts/merged-pipeline/values.yaml @@ -2,7 +2,7 @@ namespace: "flink" imagepullsecrets: "" image: registry: sanketikahub - repository: merged-pipeline + repository: unified-pipeline tag: release-0.5.0_RC23 serviceMonitor: enabled: false @@ -142,8 +142,8 @@ base_config: | port = "9042" } -merged-pipeline: - merged-pipeline: |+ +unified-pipeline: + unified-pipeline: |+ include file("/data/flink/conf/baseconfig.conf") kafka { input.topic = ${job.env}".ingest" @@ -158,6 +158,7 @@ merged-pipeline: output.denorm.topic = ${job.env}".denorm" output.denorm.failed.topic = ${job.env}".denorm.failed" output.transform.topic = ${job.env}".transform" + output.transform.failed.topic = ${job.env}".failed" stats.topic = ${job.env}".stats" groupId = ${job.env}"-single-pipeline-group" producer { @@ -193,7 +194,7 @@ merged-pipeline: taskmanager.memory.process.size: 1700m jobmanager.memory.process.size: 1600m state.savepoints.dir: file:///tmp - job_classname: org.sunbird.obsrv.pipeline.task.MergedPipelineStreamTask + job_classname: org.sunbird.obsrv.pipeline.task.UnifiedPipelineStreamTask serviceAccount: # Specifies whether a service account should be created diff --git a/terraform/modules/helm/unified_helm/obsrv/values.yaml b/terraform/modules/helm/unified_helm/obsrv/values.yaml index 9dc108d6..dc21db97 100644 --- a/terraform/modules/helm/unified_helm/obsrv/values.yaml +++ b/terraform/modules/helm/unified_helm/obsrv/values.yaml @@ -41,10 +41,10 @@ global: # digest: sha256:77bdba3135998baadc20015e00a9742eebac52167b90c3e46d0c339a2d668b12 # image: os-shell # registry: docker.io/bitnami - merged_pipeline: + unified_pipeline: # tag: 1.0.2-GA digest: 1.0.3-GA - image: merged-pipeline + image: unified-pipeline registry: *sanketika_docker_registry master_data_processor: # tag: 1.0.2-GA @@ -377,9 +377,9 @@ druid-raw-cluster: eks.amazonaws.com/role-arn: arn:aws:iam::725876873105:role/dev-obsrv-test-druid-raw-sa-iam-role name: druid-raw-sa -merged-pipeline: +unified-pipeline: enabled: true - name: merged-pipeline + name: unified-pipeline namespace: flink env: *global-env checkpoint_store_type: *global-cloud-storage-provider