Skip to content

Commit

Permalink
[#4139] Add Flink connector
Browse files Browse the repository at this point in the history
  • Loading branch information
ljupcovangelski committed Feb 6, 2024
1 parent a9aae70 commit 4749dfb
Show file tree
Hide file tree
Showing 18 changed files with 881 additions and 0 deletions.
32 changes: 32 additions & 0 deletions backend/components/flink/helm/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
load("@rules_pkg//:pkg.bzl", "pkg_tar")
load("@com_github_airyhq_bazel_tools//helm:helm.bzl", "helm_template_test")
load("//tools/build:helm.bzl", "helm_push_develop", "helm_push_release")

filegroup(
name = "files",
srcs = glob(
["**/*"],
exclude = ["BUILD"],
),
visibility = ["//visibility:public"],
)

pkg_tar(
name = "package",
srcs = [":files"],
extension = "tgz",
strip_prefix = "./",
)

helm_template_test(
name = "template",
chart = ":package",
)

helm_push_develop(
chart = ":package",
)

helm_push_release(
chart = ":package",
)
5 changes: 5 additions & 0 deletions backend/components/flink/helm/Chart.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
apiVersion: v2
appVersion: "1.0"
description: Flink connector
name: flink-connector
version: 1.0
10 changes: 10 additions & 0 deletions backend/components/flink/helm/templates/configmap.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: {{ .Values.component }}
labels:
core.airy.co/managed: "true"
core.airy.co/mandatory: "{{ .Values.mandatory }}"
core.airy.co/component: "{{ .Values.component }}"
annotations:
core.airy.co/enabled: "{{ .Values.enabled }}"
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .Values.component }}-{{ .Values.resultSender.name }}
labels:
app: {{ .Values.component }}
core.airy.co/managed: "true"
core.airy.co/mandatory: "{{ .Values.mandatory }}"
core.airy.co/component: {{ .Values.component }}
spec:
replicas: {{ if .Values.enabled }} 1 {{ else }} 0 {{ end }}
selector:
matchLabels:
app: {{ .Values.component }}-{{ .Values.resultSender.name }}
strategy:
rollingUpdate:
maxSurge: 1
maxUnavailable: 1
type: RollingUpdate
template:
metadata:
labels:
app: {{ .Values.component }}-{{ .Values.resultSender.name }}
spec:
containers:
- name: app
image: "ghcr.io/airyhq/{{ .Values.resultSender.image }}:release"
imagePullPolicy: Always
envFrom:
- configMapRef:
name: security
- configMapRef:
name: kafka-config
- configMapRef:
name: {{ .Values.component }}
env:
- name: KAFKA_TOPIC_NAME
value: {{ .Values.resultSender.topic }}
livenessProbe:
httpGet:
path: /actuator/health
port: {{ .Values.port }}
httpHeaders:
- name: Health-Check
value: health-check
initialDelaySeconds: 43200
periodSeconds: 10
failureThreshold: 3
15 changes: 15 additions & 0 deletions backend/components/flink/helm/templates/result-sender/service.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
apiVersion: v1
kind: Service
metadata:
name: {{ .Values.component }}-{{ .Values.resultSender.name }}
labels:
app: {{ .Values.component }}-{{ .Values.resultSender.name }}
spec:
type: ClusterIP
clusterIP: None
ports:
- name: {{ .Values.component }}-{{ .Values.resultSender.name }}
port: 80
targetPort: {{ .Values.port }}
selector:
app: {{ .Values.component }}-{{ .Values.resultSender.name }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .Values.component }}-{{ .Values.executor.name }}
labels:
app: {{ .Values.component }}
core.airy.co/managed: "true"
core.airy.co/mandatory: "{{ .Values.mandatory }}"
core.airy.co/component: {{ .Values.component }}
spec:
replicas: {{ if .Values.enabled }} 1 {{ else }} 0 {{ end }}
selector:
matchLabels:
app: {{ .Values.component }}-{{ .Values.executor.name }}
strategy:
rollingUpdate:
maxSurge: 1
maxUnavailable: 1
type: RollingUpdate
template:
metadata:
labels:
app: {{ .Values.component }}-{{ .Values.executor.name }}
spec:
containers:
- name: app
image: "ghcr.io/airyhq/{{ .Values.executor.image }}:release"
imagePullPolicy: Always
envFrom:
- configMapRef:
name: security
- configMapRef:
name: kafka-config
- configMapRef:
name: {{ .Values.component }}
env:
- name: KAFKA_TOPIC_NAME
value: {{ .Values.executor.topic }}
livenessProbe:
httpGet:
path: /actuator/health
port: {{ .Values.port }}
httpHeaders:
- name: Health-Check
value: health-check
initialDelaySeconds: 43200
periodSeconds: 10
failureThreshold: 3
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
apiVersion: v1
kind: Service
metadata:
name: {{ .Values.component }}-{{ .Values.executor.name }}
labels:
app: {{ .Values.component }}-{{ .Values.executor.name }}
spec:
type: ClusterIP
clusterIP: None
ports:
- name: {{ .Values.component }}-{{ .Values.executor.name }}
port: 80
targetPort: {{ .Values.port }}
selector:
app: {{ .Values.component }}-{{ .Values.executor.name }}
13 changes: 13 additions & 0 deletions backend/components/flink/helm/values.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
component: flink-connector
mandatory: false
enabled: false
port: 8080
resources:
executor:
name: statements-executor
image: connectors/flink/statements-executor
topic: flink.statements
resultSender:
name: result-sender
image: connectors/flink/result-sender
topic: flink.output
22 changes: 22 additions & 0 deletions backend/components/flink/result-sender/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Use the official Golang image as the base image
FROM golang:1.17

# Set the working directory inside the container
WORKDIR /app

# Copy the Go source code into the container
COPY ./src/*.go ./

# Install the required libraries
RUN go mod init main && \
go get github.com/confluentinc/confluent-kafka-go/v2/kafka && \
go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry && \
go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde && \
go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde/avro && \
go get golang.org/x/net

# Build the Go program
RUN go build -o app

# Command to run the Go program
CMD ["./app"]
6 changes: 6 additions & 0 deletions backend/components/flink/result-sender/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
build:
docker build -t flink-connector/result-sender .

release: build
docker tag flink-connector/result-sender ghcr.io/airyhq/connectors/flink/result-sender:release
docker push ghcr.io/airyhq/connectors/flink/result-sender:release
119 changes: 119 additions & 0 deletions backend/components/flink/result-sender/src/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package main

import (
"encoding/json"
"fmt"
"net/http"
"os"
"os/signal"
"syscall"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {

// Create Kafka consumer to read the statements
kafkaURL := os.Getenv("KAFKA_BROKERS")
schemaRegistryURL := os.Getenv("KAFKA_SCHEMA_REGISTRY_URL")
topicName := os.Getenv("KAFKA_TOPIC_NAME")
systemToken := os.Getenv("systemToken")
authUsername := os.Getenv("AUTH_JAAS_USERNAME")
authPassword := os.Getenv("AUTH_JAAS_PASSWORD")
//timestamp := time.Now().Unix()
//strTimestamp := fmt.Sprintf("%d", timestamp)
groupID := "result-sender" //+ "-" + strTimestamp

if kafkaURL == "" || schemaRegistryURL == "" || topicName == "" {
fmt.Println("KAFKA_BROKERS, KAFKA_SCHEMA_REGISTRY_URL, and KAFKA_TOPIC_NAME environment variables must be set")
return
}

// Healthcheck
http.HandleFunc("/actuator/health", func(w http.ResponseWriter, r *http.Request) {
response := map[string]string{"status": "UP"}
jsonResponse, err := json.Marshal(response)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(jsonResponse)
})

go func() {
if err := http.ListenAndServe(":80", nil); err != nil {
panic(err)
}
}()

fmt.Println("Health-check started")

// Create Kafka consumer configuration
fmt.Println("Creating Kafka consumer for topic: ", topicName)

c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": kafkaURL,
"group.id": groupID,
"auto.offset.reset": "earliest",
"security.protocol": "SASL_SSL",
"sasl.mechanisms": "PLAIN",
"sasl.username": authUsername,
"sasl.password": authPassword,
})
if err != nil {
fmt.Printf("Error creating consumer: %v\n", err)
return
}
c.SubscribeTopics([]string{topicName}, nil)
// Channel for signals
signals := make(chan os.Signal, 1)
done := make(chan bool, 1)

signal.Notify(signals, os.Interrupt, syscall.SIGTERM)

go func() {
for {
select {
case sig := <-signals:
// If an interrupt signal is received, break the loop
fmt.Printf("Caught signal %v: terminating\n", sig)
done <- true
return
default:
msg, err := c.ReadMessage(-1)
if err == nil {
var flinkOutput FlinkOutput
if err := json.Unmarshal(msg.Value, &flinkOutput); err != nil {
fmt.Printf("Error unmarshalling message: %v\n", err)
continue
} else {
fmt.Printf("Received message: %+v\n", flinkOutput)

flinkGatewayURL := "http://flink-jobmanager:8083" //"http://flink.us-east-2.aws.confluent.cloud/v1beta1/sql", Replace with your Flink Gateway URL https://flink.region.provider.confluent.cloud
fmt.Println("Flink gateway: ", flinkGatewayURL)
result, err := getFlinkResult(flinkGatewayURL, flinkOutput.SessionID)
if err != nil {
fmt.Println("Unable to get Flink result:", err)
return
}
response, err := convertResultToMarkdown(result)
if err != nil {
fmt.Println("Unable to generate Markdown from result:", err)
sendMessage("I'm sorry, I am unable to fetch the results from the Flink table.", flinkOutput.ConversationID, systemToken)
return
}
sendMessage(response, flinkOutput.ConversationID, systemToken)
}
} else {
// Log the error and continue
fmt.Printf("Consumer error: %v\n", err)
}
}
}
}()
<-done
c.Close()
fmt.Println("Consumer closed")
}
Loading

0 comments on commit 4749dfb

Please sign in to comment.