From a9af1a3996489285f57f63413c49c8c0b86bdfda Mon Sep 17 00:00:00 2001 From: horoc Date: Thu, 10 Nov 2022 00:57:21 +0800 Subject: [PATCH] add workflow eventmest queue type --- eventmesh-workflow-go/config/config.go | 2 + .../internal/constants/constants.go | 2 + .../internal/queue/eventmesh_queue.go | 174 ++++++++++++++++++ 3 files changed, 178 insertions(+) create mode 100644 eventmesh-workflow-go/internal/queue/eventmesh_queue.go diff --git a/eventmesh-workflow-go/config/config.go b/eventmesh-workflow-go/config/config.go index 7e8bdee542..3d68aa087e 100644 --- a/eventmesh-workflow-go/config/config.go +++ b/eventmesh-workflow-go/config/config.go @@ -28,6 +28,7 @@ type Config struct { Flow struct { Queue struct { Store string `yaml:"store"` + Topic string `yaml:"topic"` } `yaml:"queue"` Scheduler struct { Type string `yaml:"type"` @@ -50,6 +51,7 @@ type Config struct { UserName string `yaml:"username"` Password string `yaml:"password"` ProducerGroup string `yaml:"producer_group"` + ConsumerGroup string `yaml:"consumer_group"` TTL int `yaml:"ttl"` } `yaml:"eventmesh"` } diff --git a/eventmesh-workflow-go/internal/constants/constants.go b/eventmesh-workflow-go/internal/constants/constants.go index 2abe5c830c..735879769a 100644 --- a/eventmesh-workflow-go/internal/constants/constants.go +++ b/eventmesh-workflow-go/internal/constants/constants.go @@ -21,6 +21,8 @@ type QueueType int const ( // QueueTypeInMemory in memory queue type QueueTypeInMemory = "in-memory" + // QueueTypeEventMesh EventMesh queue + QueueTypeEventMesh = "eventmesh" ) const ( diff --git a/eventmesh-workflow-go/internal/queue/eventmesh_queue.go b/eventmesh-workflow-go/internal/queue/eventmesh_queue.go new file mode 100644 index 0000000000..4440fdf47c --- /dev/null +++ b/eventmesh-workflow-go/internal/queue/eventmesh_queue.go @@ -0,0 +1,174 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package queue + +import ( + "context" + "encoding/json" + sdk "github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc" + sdk_conf "github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc/conf" + sdk_pb "github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc/proto" + "github.com/apache/incubator-eventmesh/eventmesh-server-go/log" + "github.com/gogf/gf/util/gconv" + "github.com/google/uuid" + + conf "github.com/apache/incubator-eventmesh/eventmesh-workflow-go/config" + + "github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/constants" + "github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal" + "github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal/model" +) + +func init() { + cfg := conf.Get() + // init and register EventMesh task queue only when config corresponding queue type + if cfg != nil && cfg.Flow.Queue.Store == constants.QueueTypeEventMesh { + RegisterQueue(newEventMeshQueue(cfg)) + } +} + +type eventMeshQueue struct { + // EventMesh go sdk grpc client + grpcClient sdk.Interface + // EventMesh go sdk grpc config + grpcConfig *sdk_conf.GRPCConfig + + workflowConfig *conf.Config + workflowDAL dal.WorkflowDAL + observeTopic string +} + +func newEventMeshQueue(workflowConfig *conf.Config) ObserveQueue { + eventMeshConfig := workflowConfig.EventMesh + grpcConfig := &sdk_conf.GRPCConfig{ + Host: eventMeshConfig.Host, + Port: eventMeshConfig.GRPC.Port, + ENV: eventMeshConfig.Env, + IDC: eventMeshConfig.IDC, + SYS: eventMeshConfig.Sys, + Username: eventMeshConfig.UserName, + Password: eventMeshConfig.Password, + ProtocolType: sdk.EventmeshMessage, + ConsumerConfig: sdk_conf.ConsumerConfig{ + Enabled: true, + ConsumerGroup: eventMeshConfig.ConsumerGroup, + }, + } + client, err := sdk.New(grpcConfig) + if err != nil { + log.Get(constants.LogQueue).Errorf("EventMesh task queue, fail to init EventMesh client , error=%v", err) + panic(err) + } + return &eventMeshQueue{ + grpcClient: client, + grpcConfig: grpcConfig, + workflowConfig: workflowConfig, + observeTopic: workflowConfig.Flow.Queue.Topic, + workflowDAL: dal.NewWorkflowDAL(), + } +} + +func (q *eventMeshQueue) Name() string { + return constants.QueueTypeEventMesh +} + +// Publish send task to EventMesh queue, store task info in message content with json structure +func (q *eventMeshQueue) Publish(tasks []*model.WorkflowTaskInstance) error { + if len(tasks) == 0 { + return nil + } + for _, task := range tasks { + if task == nil { + continue + } + message, err := q.toEventMeshMessage(task) + if err != nil { + log.Get(constants.LogQueue).Errorf("EventMesh task queue, fail to publish task, error=%v", err) + return err + } + _, err = q.grpcClient.Publish(context.Background(), message) + if err != nil { + log.Get(constants.LogQueue).Errorf("EventMesh task queue, fail to publish task, error=%v", err) + return err + } + } + return nil +} + +// Ack do nothing +func (q *eventMeshQueue) Ack(tasks *model.WorkflowTaskInstance) error { + return nil +} + +// Observe consume task by EventMesh subscription api +func (q *eventMeshQueue) Observe() { + err := q.grpcClient.SubscribeStream(sdk_conf.SubscribeItem{ + Topic: q.observeTopic, + SubscribeMode: sdk_conf.CLUSTERING, + SubscribeType: sdk_conf.SYNC, + }, q.handler) + if err != nil { + log.Get(constants.LogQueue).Errorf("EventMesh task queue observe error=%v", err) + panic(err) + } +} + +func (q *eventMeshQueue) handler(message *sdk_pb.SimpleMessage) interface{} { + workflowTask, err := q.toWorkflowTask(message) + if err != nil { + return err + } + log.Get(constants.LogQueue).Infof("receive task from EventMesh queue, task=%s", gconv.String(workflowTask)) + if workflowTask.ID != 0 { + if err := q.workflowDAL.UpdateTaskInstance(dal.GetDalClient(), workflowTask); err != nil { + log.Get(constants.LogQueue).Errorf("EventMesh task queue observe UpdateTaskInstance error=%v", err) + } + return err + } + // new task + if err := q.workflowDAL.InsertTaskInstance(context.Background(), workflowTask); err != nil { + log.Get(constants.LogQueue).Errorf("EventMesh task queue observe InsertTaskInstance error=%v", err) + } + return nil +} + +func (q *eventMeshQueue) toEventMeshMessage(task *model.WorkflowTaskInstance) (*sdk_pb.SimpleMessage, error) { + taskJsonBytes, err := json.Marshal(task) + if err != nil { + return nil, err + } + + message := &sdk_pb.SimpleMessage{ + Header: sdk.CreateHeader(q.grpcConfig), + ProducerGroup: q.workflowConfig.EventMesh.ProducerGroup, + Topic: q.observeTopic, + Content: string(taskJsonBytes), + Ttl: gconv.String(q.workflowConfig.EventMesh.TTL), + UniqueId: uuid.New().String(), + SeqNum: uuid.New().String(), + } + return message, nil +} + +func (q *eventMeshQueue) toWorkflowTask(message *sdk_pb.SimpleMessage) (*model.WorkflowTaskInstance, error) { + taskJsonBytes := []byte(message.Content) + task := &model.WorkflowTaskInstance{} + err := json.Unmarshal(taskJsonBytes, task) + if err != nil { + return nil, err + } + return task, nil +}