Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open sourcing transformer job #173

Merged
merged 1 commit into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions terraform/aws/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,9 @@ module "flink" {
building_block = var.building_block
flink_container_registry = var.flink_container_registry
flink_image_tag = var.flink_image_tag
flink_merged_pipeline_release_names = var.flink_merged_pipeline_release_names
flink_unified_pipeline_release_names = var.flink_unified_pipeline_release_names
flink_release_names = var.flink_release_names
merged_pipeline_enabled = var.merged_pipeline_enabled
unified_pipeline_enabled = var.unified_pipeline_enabled
flink_checkpoint_store_type = var.flink_checkpoint_store_type
flink_chart_depends_on = [module.kafka, module.postgresql_migration, module.redis_dedup, module.redis_denorm]
postgresql_obsrv_username = module.postgresql.postgresql_obsrv_username
Expand Down
8 changes: 4 additions & 4 deletions terraform/aws/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -141,17 +141,17 @@ variable "flink_release_names" {
}
}

variable "flink_merged_pipeline_release_names" {
variable "flink_unified_pipeline_release_names" {
description = "Create release names"
type = map(string)
default = {
merged-pipeline = "merged-pipeline"
unified-pipeline = "unified-pipeline"
master-data-processor = "master-data-processor"
}
}

variable "merged_pipeline_enabled" {
description = "Toggle to deploy merged pipeline"
variable "unified_pipeline_enabled" {
description = "Toggle to deploy unified pipeline"
type = bool
default = true
}
Expand Down
4 changes: 2 additions & 2 deletions terraform/gcp/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -390,9 +390,9 @@ module "flink" {
building_block = var.building_block
flink_container_registry = var.flink_container_registry
flink_image_tag = var.flink_image_tag
flink_merged_pipeline_release_names = var.flink_merged_pipeline_release_names
flink_unified_pipeline_release_names = var.flink_unified_pipeline_release_names
flink_release_names = var.flink_release_names
merged_pipeline_enabled = var.merged_pipeline_enabled
unified_pipeline_enabled = var.unified_pipeline_enabled
flink_checkpoint_store_type = var.flink_checkpoint_store_type
flink_chart_depends_on = [ module.kafka, module.postgresql_migration, module.redis_dedup, module.redis_denorm ]
postgresql_obsrv_username = module.postgresql.postgresql_obsrv_username
Expand Down
8 changes: 4 additions & 4 deletions terraform/gcp/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -226,17 +226,17 @@ variable "flink_release_names" {
}
}

variable "flink_merged_pipeline_release_names" {
variable "flink_unified_pipeline_release_names" {
description = "Create release names"
type = map(string)
default = {
merged-pipeline = "merged-pipeline"
unified-pipeline = "unified-pipeline"
master-data-processor = "master-data-processor"
}
}

variable "merged_pipeline_enabled" {
description = "Toggle to deploy merged pipeline"
variable "unified_pipeline_enabled" {
description = "Toggle to deploy unified pipeline"
type = bool
default = true
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ service_config: |
namespace: flink
reinstall_sleep_time: 3
jobs:
- name: "PipelineMergedJob"
release_name: merged-pipeline
job_manager_url: "merged-pipeline-jobmanager.flink.svc.cluster.local:8081"
- name: "UnifiedPipelineJob"
release_name: unified-pipeline
job_manager_url: "unified-pipeline-jobmanager.flink.svc.cluster.local:8081"
- name: "MasterDataProcessorJob"
release_name: master-data-processor
job_manager_url: "master-data-processor-jobmanager.flink.svc.cluster.local:8081"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ service_config: |
namespace: ${flink_namespace}
reinstall_sleep_time: 3
jobs:
- name: "PipelineMergedJob"
release_name: merged-pipeline
job_manager_url: "merged-pipeline-jobmanager.flink.svc.cluster.local:8081"
- name: "UnifiedPipelineJob"
release_name: unified-pipeline
job_manager_url: "unified-pipeline-jobmanager.flink.svc.cluster.local:8081"
- name: "MasterDataProcessor"
release_name: master-data-processor
job_manager_url: "master-data-processor-jobmanager.flink.svc.cluster.local:8081"
Expand Down
11 changes: 7 additions & 4 deletions terraform/modules/helm/flink/flink-helm-chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ namespace: "flink"
imagepullsecrets: ""
image:
registry: sanketikahub
repository: merged-pipeline
repository: unified-pipeline
tag: 1.0.0-GA
serviceMonitor:
enabled: false
Expand Down Expand Up @@ -161,8 +161,8 @@ base_config: |
port = "9042"
}

merged-pipeline:
merged-pipeline: |+
unified-pipeline:
unified-pipeline: |+
include file("/data/flink/conf/baseconfig.conf")
kafka {
input.topic = ${job.env}".ingest"
Expand All @@ -176,6 +176,7 @@ merged-pipeline:
output.denorm.topic = ${job.env}".denorm"
output.denorm.failed.topic = ${job.env}".failed"
output.transform.topic = ${job.env}".transform"
output.transform.failed.topic = ${job.env}".failed"
stats.topic = ${job.env}".stats"
groupId = ${job.env}"-single-pipeline-group"
producer {
Expand Down Expand Up @@ -211,7 +212,7 @@ merged-pipeline:
taskmanager.memory.process.size: 1700m
jobmanager.memory.process.size: 1600m
state.savepoints.dir: file:///tmp
job_classname: org.sunbird.obsrv.pipeline.task.MergedPipelineStreamTask
job_classname: org.sunbird.obsrv.pipeline.task.UnifiedPipelineStreamTask

extractor:
extractor: |+
Expand Down Expand Up @@ -332,6 +333,7 @@ transformer:
kafka {
input.topic = ${job.env}".denorm"
output.transform.topic = ${job.env}".transform"
output.transform.failed.topic = ${job.env}".failed"
groupId = ${job.env}"-transformer-group"
producer {
max-request-size = 10000024
Expand Down Expand Up @@ -436,6 +438,7 @@ master-data-processor:
output.duplicate.topic = ${job.env}".masterdata.failed"
output.denorm.topic = ${job.env}".masterdata.denorm"
output.transform.topic = ${job.env}".masterdata.transform"
output.transform.failed.topic = ${job.env}".masterdata.failed"
stats.topic = ${job.env}".masterdata.stats"
groupId = ${job.env}"-masterdata-pipeline-group"

Expand Down
2 changes: 1 addition & 1 deletion terraform/modules/helm/flink/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ resource "helm_release" "flink_sa" {
}

resource "helm_release" "flink" {
for_each = contains([var.merged_pipeline_enabled], true ) ? var.flink_merged_pipeline_release_names : var.flink_release_names
for_each = contains([var.unified_pipeline_enabled], true ) ? var.flink_unified_pipeline_release_names : var.flink_release_names
name = each.key
chart = "${path.module}/${var.flink_chart_path}"
namespace = var.flink_namespace
Expand Down
8 changes: 4 additions & 4 deletions terraform/modules/helm/flink/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ variable "flink_chart_path" {
# variable "flink_release_name" {
# type = string
# description = "Flink helm release name."
# default = "merged-pipeline"
# default = "unified-pipeline"
# }
# *** changed this to release map.

Expand Down Expand Up @@ -182,12 +182,12 @@ variable "flink_release_names" {
type = map(string)
}

variable "flink_merged_pipeline_release_names" {
variable "flink_unified_pipeline_release_names" {
description = "Create release names"
type = map(string)
}

variable "merged_pipeline_enabled" {
description = "Toggle to enable merged pipeline"
variable "unified_pipeline_enabled" {
description = "Toggle to enable unified pipeline"
type = bool
}
2 changes: 1 addition & 1 deletion terraform/modules/helm/lakehouse-connector/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ variable "flink_chart_path" {
# variable "flink_release_name" {
# type = string
# description = "Flink helm release name."
# default = "merged-pipeline"
# default = "unified-pipeline"
# }
# *** changed this to release map.

Expand Down
4 changes: 2 additions & 2 deletions terraform/modules/helm/unified_helm/obsrv/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ dependencies:
- name: master-data-processor
version: 0.1.2
condition: master-data-processor.enabled
- name: merged-pipeline
- name: unified-pipeline
version: 0.1.2
condition: merged-pipeline.enabled
condition: unified-pipeline.enabled
- name: postgresql
version: 12.2.7
condition: postgresql.enabled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ service_config: |
namespace: flink
reinstall_sleep_time: 3
jobs:
- name: "PipelineMergedJob"
release_name: merged-pipeline
job_manager_url: "merged-pipeline-jobmanager.flink.svc.cluster.local:8081"
- name: "UnifiedPipelineJob"
release_name: unified-pipeline
job_manager_url: "unified-pipeline-jobmanager.flink.svc.cluster.local:8081"
- name: "MasterDataProcessorJob"
release_name: master-data-processor
job_manager_url: "master-data-processor-jobmanager.flink.svc.cluster.local:8081"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ data:
output.duplicate.topic = ${job.env}".masterdata.failed"
output.denorm.topic = ${job.env}".masterdata.denorm"
output.transform.topic = ${job.env}".masterdata.transform"
output.transform.failed.topic = ${job.env}".masterdata.transform.failed"
output.transform.failed.topic = ${job.env}".masterdata.failed"
stats.topic = ${job.env}".masterdata.stats"
groupId = ${job.env}"-masterdata-pipeline-group"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ master-data-processor:
output.unique.topic = ${job.env}".masterdata.unique"
output.duplicate.topic = ${job.env}".masterdata.duplicate"
output.transform.topic = ${job.env}".masterdata.transform"
output.transform.failed.topic = ${job.env}".masterdata.failed"
stats.topic = ${job.env}".masterdata.stats"
groupId = ${job.env}"-masterdata-pipeline-group"

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
apiVersion: v2
name: merged-pipeline
name: unified-pipeline
description: A Helm chart for Kubernetes

# A chart can be either an 'application' or a 'library' chart.
Expand Down
Loading