diff --git a/backend/components/flink/helm/BUILD b/backend/components/flink/helm/BUILD new file mode 100644 index 000000000..5bb7f8f59 --- /dev/null +++ b/backend/components/flink/helm/BUILD @@ -0,0 +1,32 @@ +load("@rules_pkg//:pkg.bzl", "pkg_tar") +load("@com_github_airyhq_bazel_tools//helm:helm.bzl", "helm_template_test") +load("//tools/build:helm.bzl", "helm_push_develop", "helm_push_release") + +filegroup( + name = "files", + srcs = glob( + ["**/*"], + exclude = ["BUILD"], + ), + visibility = ["//visibility:public"], +) + +pkg_tar( + name = "package", + srcs = [":files"], + extension = "tgz", + strip_prefix = "./", +) + +helm_template_test( + name = "template", + chart = ":package", +) + +helm_push_develop( + chart = ":package", +) + +helm_push_release( + chart = ":package", +) diff --git a/backend/components/flink/helm/Chart.yaml b/backend/components/flink/helm/Chart.yaml new file mode 100644 index 000000000..e260114c6 --- /dev/null +++ b/backend/components/flink/helm/Chart.yaml @@ -0,0 +1,5 @@ +apiVersion: v2 +appVersion: "1.0" +description: Flink connector +name: flink-connector +version: 1.0 diff --git a/backend/components/flink/helm/templates/configmap.yaml b/backend/components/flink/helm/templates/configmap.yaml new file mode 100644 index 000000000..05de4d589 --- /dev/null +++ b/backend/components/flink/helm/templates/configmap.yaml @@ -0,0 +1,10 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ .Values.component }} + labels: + core.airy.co/managed: "true" + core.airy.co/mandatory: "{{ .Values.mandatory }}" + core.airy.co/component: "{{ .Values.component }}" + annotations: + core.airy.co/enabled: "{{ .Values.enabled }}" diff --git a/backend/components/flink/helm/templates/result-sender/deployment.yaml b/backend/components/flink/helm/templates/result-sender/deployment.yaml new file mode 100644 index 000000000..193cd11a5 --- /dev/null +++ b/backend/components/flink/helm/templates/result-sender/deployment.yaml @@ -0,0 +1,48 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ .Values.component }}-{{ .Values.resultSender.name }} + labels: + app: {{ .Values.component }} + core.airy.co/managed: "true" + core.airy.co/mandatory: "{{ .Values.mandatory }}" + core.airy.co/component: {{ .Values.component }} +spec: + replicas: {{ if .Values.enabled }} 1 {{ else }} 0 {{ end }} + selector: + matchLabels: + app: {{ .Values.component }}-{{ .Values.resultSender.name }} + strategy: + rollingUpdate: + maxSurge: 1 + maxUnavailable: 1 + type: RollingUpdate + template: + metadata: + labels: + app: {{ .Values.component }}-{{ .Values.resultSender.name }} + spec: + containers: + - name: app + image: "ghcr.io/airyhq/{{ .Values.resultSender.image }}:release" + imagePullPolicy: Always + envFrom: + - configMapRef: + name: security + - configMapRef: + name: kafka-config + - configMapRef: + name: {{ .Values.component }} + env: + - name: KAFKA_TOPIC_NAME + value: {{ .Values.resultSender.topic }} + livenessProbe: + httpGet: + path: /actuator/health + port: {{ .Values.port }} + httpHeaders: + - name: Health-Check + value: health-check + initialDelaySeconds: 43200 + periodSeconds: 10 + failureThreshold: 3 diff --git a/backend/components/flink/helm/templates/result-sender/service.yaml b/backend/components/flink/helm/templates/result-sender/service.yaml new file mode 100644 index 000000000..b5ef5f7f7 --- /dev/null +++ b/backend/components/flink/helm/templates/result-sender/service.yaml @@ -0,0 +1,15 @@ +apiVersion: v1 +kind: Service +metadata: + name: {{ .Values.component }}-{{ .Values.resultSender.name }} + labels: + app: {{ .Values.component }}-{{ .Values.resultSender.name }} +spec: + type: ClusterIP + clusterIP: None + ports: + - name: {{ .Values.component }}-{{ .Values.resultSender.name }} + port: 80 + targetPort: {{ .Values.port }} + selector: + app: {{ .Values.component }}-{{ .Values.resultSender.name }} diff --git a/backend/components/flink/helm/templates/statements-executor/deployment.yaml b/backend/components/flink/helm/templates/statements-executor/deployment.yaml new file mode 100644 index 000000000..fdf52ddb4 --- /dev/null +++ b/backend/components/flink/helm/templates/statements-executor/deployment.yaml @@ -0,0 +1,48 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ .Values.component }}-{{ .Values.executor.name }} + labels: + app: {{ .Values.component }} + core.airy.co/managed: "true" + core.airy.co/mandatory: "{{ .Values.mandatory }}" + core.airy.co/component: {{ .Values.component }} +spec: + replicas: {{ if .Values.enabled }} 1 {{ else }} 0 {{ end }} + selector: + matchLabels: + app: {{ .Values.component }}-{{ .Values.executor.name }} + strategy: + rollingUpdate: + maxSurge: 1 + maxUnavailable: 1 + type: RollingUpdate + template: + metadata: + labels: + app: {{ .Values.component }}-{{ .Values.executor.name }} + spec: + containers: + - name: app + image: "ghcr.io/airyhq/{{ .Values.executor.image }}:release" + imagePullPolicy: Always + envFrom: + - configMapRef: + name: security + - configMapRef: + name: kafka-config + - configMapRef: + name: {{ .Values.component }} + env: + - name: KAFKA_TOPIC_NAME + value: {{ .Values.executor.topic }} + livenessProbe: + httpGet: + path: /actuator/health + port: {{ .Values.port }} + httpHeaders: + - name: Health-Check + value: health-check + initialDelaySeconds: 43200 + periodSeconds: 10 + failureThreshold: 3 diff --git a/backend/components/flink/helm/templates/statements-executor/service.yaml b/backend/components/flink/helm/templates/statements-executor/service.yaml new file mode 100644 index 000000000..e0cbbd5e8 --- /dev/null +++ b/backend/components/flink/helm/templates/statements-executor/service.yaml @@ -0,0 +1,15 @@ +apiVersion: v1 +kind: Service +metadata: + name: {{ .Values.component }}-{{ .Values.executor.name }} + labels: + app: {{ .Values.component }}-{{ .Values.executor.name }} +spec: + type: ClusterIP + clusterIP: None + ports: + - name: {{ .Values.component }}-{{ .Values.executor.name }} + port: 80 + targetPort: {{ .Values.port }} + selector: + app: {{ .Values.component }}-{{ .Values.executor.name }} diff --git a/backend/components/flink/helm/values.yaml b/backend/components/flink/helm/values.yaml new file mode 100644 index 000000000..db62a9851 --- /dev/null +++ b/backend/components/flink/helm/values.yaml @@ -0,0 +1,13 @@ +component: flink-connector +mandatory: false +enabled: false +port: 8080 +resources: +executor: + name: statements-executor + image: connectors/flink/statements-executor + topic: flink.statements +resultSender: + name: result-sender + image: connectors/flink/result-sender + topic: flink.output diff --git a/backend/components/flink/result-sender/Dockerfile b/backend/components/flink/result-sender/Dockerfile new file mode 100644 index 000000000..13e3499ac --- /dev/null +++ b/backend/components/flink/result-sender/Dockerfile @@ -0,0 +1,22 @@ +# Use the official Golang image as the base image +FROM golang:1.17 + +# Set the working directory inside the container +WORKDIR /app + +# Copy the Go source code into the container +COPY ./src/*.go ./ + +# Install the required libraries +RUN go mod init main && \ + go get github.com/confluentinc/confluent-kafka-go/v2/kafka && \ + go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry && \ + go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde && \ + go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde/avro && \ + go get golang.org/x/net + +# Build the Go program +RUN go build -o app + +# Command to run the Go program +CMD ["./app"] diff --git a/backend/components/flink/result-sender/Makefile b/backend/components/flink/result-sender/Makefile new file mode 100644 index 000000000..3b5059783 --- /dev/null +++ b/backend/components/flink/result-sender/Makefile @@ -0,0 +1,6 @@ +build: + docker build -t flink-connector/result-sender . + +release: build + docker tag flink-connector/result-sender ghcr.io/airyhq/connectors/flink/result-sender:release + docker push ghcr.io/airyhq/connectors/flink/result-sender:release diff --git a/backend/components/flink/result-sender/src/main.go b/backend/components/flink/result-sender/src/main.go new file mode 100644 index 000000000..9c358bf15 --- /dev/null +++ b/backend/components/flink/result-sender/src/main.go @@ -0,0 +1,119 @@ +package main + +import ( + "encoding/json" + "fmt" + "net/http" + "os" + "os/signal" + "syscall" + + "github.com/confluentinc/confluent-kafka-go/v2/kafka" +) + +func main() { + + // Create Kafka consumer to read the statements + kafkaURL := os.Getenv("KAFKA_BROKERS") + schemaRegistryURL := os.Getenv("KAFKA_SCHEMA_REGISTRY_URL") + topicName := os.Getenv("KAFKA_TOPIC_NAME") + systemToken := os.Getenv("systemToken") + authUsername := os.Getenv("AUTH_JAAS_USERNAME") + authPassword := os.Getenv("AUTH_JAAS_PASSWORD") + //timestamp := time.Now().Unix() + //strTimestamp := fmt.Sprintf("%d", timestamp) + groupID := "result-sender" //+ "-" + strTimestamp + + if kafkaURL == "" || schemaRegistryURL == "" || topicName == "" { + fmt.Println("KAFKA_BROKERS, KAFKA_SCHEMA_REGISTRY_URL, and KAFKA_TOPIC_NAME environment variables must be set") + return + } + + // Healthcheck + http.HandleFunc("/actuator/health", func(w http.ResponseWriter, r *http.Request) { + response := map[string]string{"status": "UP"} + jsonResponse, err := json.Marshal(response) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write(jsonResponse) + }) + + go func() { + if err := http.ListenAndServe(":80", nil); err != nil { + panic(err) + } + }() + + fmt.Println("Health-check started") + + // Create Kafka consumer configuration + fmt.Println("Creating Kafka consumer for topic: ", topicName) + + c, err := kafka.NewConsumer(&kafka.ConfigMap{ + "bootstrap.servers": kafkaURL, + "group.id": groupID, + "auto.offset.reset": "earliest", + "security.protocol": "SASL_SSL", + "sasl.mechanisms": "PLAIN", + "sasl.username": authUsername, + "sasl.password": authPassword, + }) + if err != nil { + fmt.Printf("Error creating consumer: %v\n", err) + return + } + c.SubscribeTopics([]string{topicName}, nil) + // Channel for signals + signals := make(chan os.Signal, 1) + done := make(chan bool, 1) + + signal.Notify(signals, os.Interrupt, syscall.SIGTERM) + + go func() { + for { + select { + case sig := <-signals: + // If an interrupt signal is received, break the loop + fmt.Printf("Caught signal %v: terminating\n", sig) + done <- true + return + default: + msg, err := c.ReadMessage(-1) + if err == nil { + var flinkOutput FlinkOutput + if err := json.Unmarshal(msg.Value, &flinkOutput); err != nil { + fmt.Printf("Error unmarshalling message: %v\n", err) + continue + } else { + fmt.Printf("Received message: %+v\n", flinkOutput) + + flinkGatewayURL := "http://flink-jobmanager:8083" //"http://flink.us-east-2.aws.confluent.cloud/v1beta1/sql", Replace with your Flink Gateway URL https://flink.region.provider.confluent.cloud + fmt.Println("Flink gateway: ", flinkGatewayURL) + result, err := getFlinkResult(flinkGatewayURL, flinkOutput.SessionID) + if err != nil { + fmt.Println("Unable to get Flink result:", err) + return + } + response, err := convertResultToMarkdown(result) + if err != nil { + fmt.Println("Unable to generate Markdown from result:", err) + sendMessage("I'm sorry, I am unable to fetch the results from the Flink table.", flinkOutput.ConversationID, systemToken) + return + } + sendMessage(response, flinkOutput.ConversationID, systemToken) + } + } else { + // Log the error and continue + fmt.Printf("Consumer error: %v\n", err) + } + } + } + }() + <-done + c.Close() + fmt.Println("Consumer closed") +} diff --git a/backend/components/flink/result-sender/src/tools.go b/backend/components/flink/result-sender/src/tools.go new file mode 100644 index 000000000..0ba7f1cb6 --- /dev/null +++ b/backend/components/flink/result-sender/src/tools.go @@ -0,0 +1,158 @@ +package main + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io" + "io/ioutil" + "net/http" + "strings" + "time" +) + +// getFlinkResult sends an SQL statement to the Flink Gateway +func getFlinkResult(url, sessionID string) (FlinkResult, error) { + // Create the SQL Statement + fmt.Println("The Flink session is: ", sessionID) + payload := FlinkSQLRequest{ + Statement: "select * from output;", + } + payloadBytes, err := json.Marshal(payload) + if err != nil { + return FlinkResult{}, err + } + + req, err := http.NewRequest("POST", url+"/v1/sessions/"+sessionID+"/statements/", bytes.NewReader(payloadBytes)) + if err != nil { + return FlinkResult{}, err + } + req.Header.Set("Content-Type", "application/json") + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return FlinkResult{}, err + } + body, err := io.ReadAll(resp.Body) + if err != nil { + fmt.Println("Error reading response body from the API: %v", err) + } + fmt.Println("Statement submitted. Response: ", string(body)) + var statementResponse FlinkStatementResponse + if err := json.Unmarshal(body, &statementResponse); err != nil { + fmt.Printf("Error unmarshaling message: %v\n", err) + return FlinkResult{}, err + } + + // Fetch the results from the operationHandle + fmt.Printf("Fetching result from: %s/v1/sessions/%s/operations/%s/result/0\n", url, sessionID, statementResponse.OperationHandle) + time.Sleep(10 * time.Second) + req, err = http.NewRequest("GET", url+"/v1/sessions/"+sessionID+"/operations/"+statementResponse.OperationHandle+"/result/0", nil) + if err != nil { + return FlinkResult{}, err + } + req.Header.Set("Content-Type", "application/json") + + client = &http.Client{} + resp, err = client.Do(req) + if err != nil { + return FlinkResult{}, err + } + body, err = io.ReadAll(resp.Body) + if err != nil { + fmt.Println("Error reading response body from the API: %v", err) + } + fmt.Println("Statement submitted. Response: ", string(body)) + var flinkResultResponse FlinkResultResponse + if err := json.Unmarshal(body, &flinkResultResponse); err != nil { + fmt.Printf("Error unmarshaling message: %v\n", err) + return FlinkResult{}, err + } + defer resp.Body.Close() + + // Handle the response (check if the request was successful) + return flinkResultResponse.Results, nil +} + +func sendMessage(message string, conversationId string, systemToken string) (int, string, error) { + messageContent := messageContent{ + Text: message, + } + messageToSend := ApplicationCommunicationSendMessage{ + ConversationID: conversationId, + Message: messageContent, + } + messageJSON, err := json.Marshal(messageToSend) + if err != nil { + fmt.Printf("Error encoding response to JSON: %v\n", err) + return 0, "", errors.New("The message could not be encoded to JSON for sending.") + } + + req, err := http.NewRequest("POST", "http://api-communication/messages.send", bytes.NewReader(messageJSON)) + if err != nil { + fmt.Printf("Error creating request: %v\n", err) + return 0, "", errors.New("The message could not be sent.") + } + req.Header.Add("Authorization", "Bearer "+systemToken) + req.Header.Add("Content-Type", "application/json") + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + fmt.Printf("Error sending POST request: %v\n", err) + return 0, "", errors.New("Error sending POST request.") + } + defer resp.Body.Close() + + // Read the response body + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + fmt.Println("Error reading response body:", err) + return 0, "", errors.New("Error reading response body.") + } + + var response SendMessageResponse + err = json.Unmarshal(body, &response) + if err != nil { + fmt.Println("Error unmarshaling response:", err) + return 0, "", errors.New("Response couldn't be unmarshaled.") + } + + fmt.Printf("Message sent with status code: %d\n", resp.StatusCode) + return resp.StatusCode, response.ID, nil +} + +func markdown(message string) (string, error) { + return message, nil +} + +func convertResultToMarkdown(result FlinkResult) (string, error) { + var builder strings.Builder + + // Add the header row + if len(result.Columns) == 0 { + return "", errors.New("No columns found for generating the Markdown table.") + } + for _, col := range result.Columns { + builder.WriteString("| " + col.Name + " ") + } + builder.WriteString("|\n") + + // Add the separator row + for range result.Columns { + builder.WriteString("|---") + } + builder.WriteString("|\n") + + // Add the data rows + for _, d := range result.Data { + for _, field := range d.Fields { + builder.WriteString(fmt.Sprintf("| %v ", field)) + } + builder.WriteString("|\n") + } + + return builder.String(), nil +} diff --git a/backend/components/flink/result-sender/src/types.go b/backend/components/flink/result-sender/src/types.go new file mode 100644 index 000000000..7c751feb0 --- /dev/null +++ b/backend/components/flink/result-sender/src/types.go @@ -0,0 +1,66 @@ +package main + +type ApplicationCommunicationSendMessage struct { + ConversationID string `json:"conversation_id"` + Message messageContent `json:"message"` + Metadata map[string]string `json:"metadata"` +} + +type messageContent struct { + Text string `json:"text"` +} + +type SendMessageResponse struct { + ID string `json:"id"` + State string `json:"state"` +} + +type FlinkOutput struct { + SessionID string `json:"session_id"` + Question string `json:"question"` + MessageID string `json:"message_id"` + ConversationID string `json:"conversation_id"` +} + +// FlinkSQLRequest represents the payload for a Flink SQL request +type FlinkSQLRequest struct { + Statement string `json:"statement"` +} + +type FlinkSessionResponse struct { + SessionHandle string `json:"sessionHandle"` +} + +type FlinkStatementResponse struct { + OperationHandle string `json:"operationHandle"` +} + +type Column struct { + Name string `json:"name"` + LogicalType struct { + Type string `json:"type"` + Nullable bool `json:"nullable"` + Length int `json:"length,omitempty"` + } `json:"logicalType"` + Comment interface{} `json:"comment"` +} + +type Data struct { + Kind string `json:"kind"` + Fields []interface{} `json:"fields"` +} + +type FlinkResult struct { + Columns []Column `json:"columns"` + RowFormat string `json:"rowFormat"` + Data []Data `json:"data"` +} + +type FlinkResultResponse struct { + ResultType string `json:"resultType"` + IsQueryResult bool `json:"isQueryResult"` + JobID string `json:"jobID"` + ResultKind string `json:"resultKind"` + Results FlinkResult `json:"results"` + NextResultUri string `json:"nextResultUri"` +} diff --git a/backend/components/flink/statements-executor/Dockerfile b/backend/components/flink/statements-executor/Dockerfile new file mode 100644 index 000000000..13e3499ac --- /dev/null +++ b/backend/components/flink/statements-executor/Dockerfile @@ -0,0 +1,22 @@ +# Use the official Golang image as the base image +FROM golang:1.17 + +# Set the working directory inside the container +WORKDIR /app + +# Copy the Go source code into the container +COPY ./src/*.go ./ + +# Install the required libraries +RUN go mod init main && \ + go get github.com/confluentinc/confluent-kafka-go/v2/kafka && \ + go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry && \ + go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde && \ + go get github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde/avro && \ + go get golang.org/x/net + +# Build the Go program +RUN go build -o app + +# Command to run the Go program +CMD ["./app"] diff --git a/backend/components/flink/statements-executor/Makefile b/backend/components/flink/statements-executor/Makefile new file mode 100644 index 000000000..0a969da1d --- /dev/null +++ b/backend/components/flink/statements-executor/Makefile @@ -0,0 +1,6 @@ +build: + docker build -t flink-connector/statements-executor . + +release: build + docker tag flink-connector/statements-executor ghcr.io/airyhq/connectors/flink/statements-executor:release + docker push ghcr.io/airyhq/connectors/flink/statements-executor:release diff --git a/backend/components/flink/statements-executor/src/main.go b/backend/components/flink/statements-executor/src/main.go new file mode 100644 index 000000000..a1b60e883 --- /dev/null +++ b/backend/components/flink/statements-executor/src/main.go @@ -0,0 +1,126 @@ +package main + +import ( + "encoding/json" + "fmt" + "net/http" + "os" + "os/signal" + "syscall" + "time" + + "github.com/confluentinc/confluent-kafka-go/v2/kafka" +) + +func main() { + + // Create Kafka consumer to read the statements + kafkaURL := os.Getenv("KAFKA_BROKERS") + schemaRegistryURL := os.Getenv("KAFKA_SCHEMA_REGISTRY_URL") + topicName := os.Getenv("KAFKA_TOPIC_NAME") + //systemToken := os.Getenv("systemToken") + authUsername := os.Getenv("AUTH_JAAS_USERNAME") + authPassword := os.Getenv("AUTH_JAAS_PASSWORD") + timestamp := time.Now().Unix() + strTimestamp := fmt.Sprintf("%d", timestamp) + groupID := "statement-executor-" + strTimestamp + //llmEndpoint := "http://llm-controller:5000/llm.proxy" + + if kafkaURL == "" || schemaRegistryURL == "" || topicName == "" { + fmt.Println("KAFKA_BROKERS, KAFKA_SCHEMA_REGISTRY_URL, and KAFKA_TOPIC_NAME environment variables must be set") + return + } + + // Healthcheck + http.HandleFunc("/actuator/health", func(w http.ResponseWriter, r *http.Request) { + response := map[string]string{"status": "UP"} + jsonResponse, err := json.Marshal(response) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write(jsonResponse) + }) + + go func() { + if err := http.ListenAndServe(":80", nil); err != nil { + panic(err) + } + }() + + fmt.Println("Health-check started") + + // Create Kafka consumer configuration + fmt.Println("Creating Kafka consumer for topic: ", topicName) + + c, err := kafka.NewConsumer(&kafka.ConfigMap{ + "bootstrap.servers": kafkaURL, + "group.id": groupID, + "auto.offset.reset": "earliest", + "security.protocol": "SASL_SSL", + "sasl.mechanisms": "PLAIN", + "sasl.username": authUsername, + "sasl.password": authPassword, + }) + if err != nil { + fmt.Printf("Error creating consumer: %v\n", err) + return + } + c.SubscribeTopics([]string{topicName}, nil) + // Channel for signals + signals := make(chan os.Signal, 1) + done := make(chan bool, 1) + + signal.Notify(signals, os.Interrupt, syscall.SIGTERM) + + go func() { + for { + select { + case sig := <-signals: + // If an interrupt signal is received, break the loop + fmt.Printf("Caught signal %v: terminating\n", sig) + done <- true + return + default: + msg, err := c.ReadMessage(-1) + if err == nil { + var statementSet FlinkStatementSet + if err := json.Unmarshal(msg.Value, &statementSet); err != nil { + fmt.Printf("Error unmarshalling message: %v\n", err) + continue + } else { + fmt.Printf("Received message: %+v\n", statementSet) + + flinkGatewayURL := "http://flink-jobmanager:8083" //"http://flink.us-east-2.aws.confluent.cloud/v1beta1/sql", Replace with your Flink Gateway URL https://flink.region.provider.confluent.cloud + fmt.Println("Flink gateway: ", flinkGatewayURL) + + sessionID, err := sendFlinkSQL(flinkGatewayURL, statementSet) + if err != nil { + fmt.Println("Error running Flink statement:", err) + return + } + fmt.Println("Successfully executing the Flink statement.") + var flinkOutput FlinkOutput + flinkOutput.SessionID = sessionID + flinkOutput.Question = statementSet.Question + flinkOutput.MessageID = statementSet.MessageID + flinkOutput.ConversationID = statementSet.ConversationID + err = produceFlinkOutput(flinkOutput, kafkaURL, "flink-producer-"+groupID, authUsername, authPassword) + if err != nil { + fmt.Printf("error producing message to Kafka: %v\n", err) + //sendMessage("I am sorry, I am unable to answer that question.", message.ConversationID, systemToken) + } + } + } else { + // Log the error and continue + fmt.Printf("Consumer error: %v\n", err) + } + } + } + }() + <-done + c.Close() + fmt.Println("Consumer closed") +} diff --git a/backend/components/flink/statements-executor/src/tools.go b/backend/components/flink/statements-executor/src/tools.go new file mode 100644 index 000000000..5021370e7 --- /dev/null +++ b/backend/components/flink/statements-executor/src/tools.go @@ -0,0 +1,142 @@ +package main + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "strings" + "time" + + "github.com/confluentinc/confluent-kafka-go/v2/kafka" +) + +// sendFlinkSQL sends an SQL statement to the Flink Gateway +func sendFlinkSQL(url string, statementSet FlinkStatementSet) (string, error) { + // Substitute placeholers with variables + timestamp := time.Now().Unix() + strTimestamp := fmt.Sprintf("%d", timestamp) + replacements := map[string]string{ + "{PROPERTIES_GROUP_ID}": "flink-" + strTimestamp, + "{PROPERTIES_BOOTSTRAP_SERVERS}": os.Getenv("KAFKA_BROKERS"), + "{PROPERTIES_SASL_JAAS_CONFIG}": fmt.Sprintf("org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";", os.Getenv("AUTH_JAAS_USERNAME"), os.Getenv("AUTH_JAAS_PASSWORD")), + } + for i, stmt := range statementSet.Statements { + for placeholder, value := range replacements { + stmt = strings.Replace(stmt, placeholder, value, -1) + } + statementSet.Statements[i] = stmt + } + fmt.Println("Updated StatementSet: %+v\n", statementSet.Statements) + + // Create a sessionHandle + req, err := http.NewRequest("POST", url+"/v1/sessions/", bytes.NewReader([]byte(""))) + if err != nil { + return "", err + } + req.Header.Set("Content-Type", "application/json") + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return "", err + } + body, err := io.ReadAll(resp.Body) + if err != nil { + fmt.Println("Error reading response body from the API: %v", err) + } + fmt.Println("Response: ", string(body)) + var sessionResponse FlinkSessionResponse + if err := json.Unmarshal(body, &sessionResponse); err != nil { + fmt.Printf("Error unmarshaling message: %v\n", err) + return "", err + } + defer resp.Body.Close() + + // Create the SQL Statement + fmt.Println("The Flink session is: ", sessionResponse.SessionHandle) + for _, statement := range statementSet.Statements { + payload := FlinkSQLRequest{ + Statement: statement, + } + payloadBytes, err := json.Marshal(payload) + if err != nil { + return "", err + } + + req, err = http.NewRequest("POST", url+"/v1/sessions/"+sessionResponse.SessionHandle+"/statements/", bytes.NewReader(payloadBytes)) + if err != nil { + return "", err + } + req.Header.Set("Content-Type", "application/json") + + client = &http.Client{} + resp, err = client.Do(req) + if err != nil { + return "", err + } + body, err = io.ReadAll(resp.Body) + if err != nil { + fmt.Println("Error reading response body from the API: %v", err) + } + fmt.Println("Statement submitted. Response: ", string(body)) + var statementResponse FlinkStatementResponse + if err := json.Unmarshal(body, &statementResponse); err != nil { + fmt.Printf("Error unmarshaling message: %v\n", err) + return "", err + } + fmt.Printf("Check status on: %s/v1/sessions/%s/operations/%s/result/0\n", url, sessionResponse.SessionHandle, statementResponse.OperationHandle) + defer resp.Body.Close() + } + + // Handle the response (check if the request was successful) + return sessionResponse.SessionHandle, nil +} + +func produceFlinkOutput(flinkOutput FlinkOutput, kafkaURL, groupID, authUsername, authPassword string) error { + + kafkaTopic := "flink.outputs" + + // Marshal the query to JSON + flinkOutputJSON, err := json.Marshal(flinkOutput) + if err != nil { + return fmt.Errorf("error marshaling query to JSON: %w", err) + } + + // Basic Kafka producer configuration + configMap := kafka.ConfigMap{ + "bootstrap.servers": kafkaURL, + } + // Conditionally add SASL/SSL configurations if username and password are provided + if authUsername != "" && authPassword != "" { + configMap.SetKey("security.protocol", "SASL_SSL") + configMap.SetKey("sasl.mechanisms", "PLAIN") + configMap.SetKey("sasl.username", authUsername) + configMap.SetKey("sasl.password", authPassword) + } + + // Create a new Kafka producer + producer, err := kafka.NewProducer(&configMap) + if err != nil { + return fmt.Errorf("failed to create producer: %w", err) + } + defer producer.Close() + + // Produce the message + message := kafka.Message{ + TopicPartition: kafka.TopicPartition{Topic: &kafkaTopic, Partition: kafka.PartitionAny}, + Key: []byte(flinkOutput.SessionID), + Value: flinkOutputJSON, + } + + err = producer.Produce(&message, nil) + if err != nil { + return fmt.Errorf("failed to produce message: %w", err) + } + fmt.Println("message scheduled for production") + producer.Flush(15 * 1000) + fmt.Println("message flushed") + return nil +} diff --git a/backend/components/flink/statements-executor/src/types.go b/backend/components/flink/statements-executor/src/types.go new file mode 100644 index 000000000..6c9f04287 --- /dev/null +++ b/backend/components/flink/statements-executor/src/types.go @@ -0,0 +1,28 @@ +package main + +type FlinkStatementSet struct { + Statements []string `json:"statements"` + Question string `json:"question"` + MessageID string `json:"message_id"` + ConversationID string `json:"conversation_id"` +} + +type FlinkOutput struct { + SessionID string `json:"session_id"` + Question string `json:"question"` + MessageID string `json:"message_id"` + ConversationID string `json:"conversation_id"` +} + +// FlinkSQLRequest represents the payload for a Flink SQL request +type FlinkSQLRequest struct { + Statement string `json:"statement"` +} + +type FlinkSessionResponse struct { + SessionHandle string `json:"sessionHandle"` +} + +type FlinkStatementResponse struct { + OperationHandle string `json:"operationHandle"` +}