diff --git a/internal/gateway/proxy/proxy.go b/internal/gateway/proxy/proxy.go index 7c5ad6104..5867fa70b 100644 --- a/internal/gateway/proxy/proxy.go +++ b/internal/gateway/proxy/proxy.go @@ -18,6 +18,7 @@ import ( "context" "encoding/base64" "encoding/binary" + stderr "errors" "fmt" "net" "net/http" @@ -52,6 +53,7 @@ import ( "github.com/linkall-labs/vanus/pkg/errors" "github.com/linkall-labs/vanus/proto/pkg/cloudevents" ctrlpb "github.com/linkall-labs/vanus/proto/pkg/controller" + metapb "github.com/linkall-labs/vanus/proto/pkg/meta" proxypb "github.com/linkall-labs/vanus/proto/pkg/proxy" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.opentelemetry.io/otel/trace" @@ -241,23 +243,15 @@ func (cp *ControllerProxy) Subscribe(req *proxypb.SubscribeRequest, stream proxy newSink := fmt.Sprintf("http://%s:%d%s/%s", os.Getenv("POD_IP"), cp.cfg.SinkPort, httpRequestPrefix, req.SubscriptionId) if meta.Sink != newSink { - updateSubscriptionReq := &ctrlpb.UpdateSubscriptionRequest{ - Id: subscriptionID.Uint64(), - Subscription: &ctrlpb.SubscriptionRequest{ - Source: meta.Source, - Types: meta.Types, - Config: meta.Config, - Filters: meta.Filters, - Sink: newSink, - Protocol: meta.Protocol, - EventBus: meta.EventBus, - Transformer: meta.Transformer, - Name: meta.Name, - Description: meta.Description, - Disable: meta.Disable, - }, - } - _, err = cp.triggerCtrl.UpdateSubscription(_ctx, updateSubscriptionReq) + if err := cp.disableSubsciption(_ctx, req, subscriptionID.Uint64()); err != nil { + log.Error(_ctx, "disable subscription failed", map[string]interface{}{ + log.KeyError: err, + "id": req.SubscriptionId, + }) + return err + } + + _, err = cp.triggerCtrl.UpdateSubscription(_ctx, newSubscription(meta, subscriptionID.Uint64(), newSink)) if err != nil { log.Error(_ctx, "update subscription sink failed", map[string]interface{}{ log.KeyError: err, @@ -265,6 +259,18 @@ func (cp *ControllerProxy) Subscribe(req *proxypb.SubscribeRequest, stream proxy }) return err } + + resumeSubscriptionReq := &ctrlpb.ResumeSubscriptionRequest{ + Id: subscriptionID.Uint64(), + } + _, err = cp.triggerCtrl.ResumeSubscription(context.Background(), resumeSubscriptionReq) + if err != nil { + log.Error(_ctx, "resume subscription failed", map[string]interface{}{ + log.KeyError: err, + "id": req.SubscriptionId, + }) + return err + } } // 2. cache subscribe info @@ -287,8 +293,7 @@ func (cp *ControllerProxy) Subscribe(req *proxypb.SubscribeRequest, stream proxy break } log.Debug(_ctx, "subscribe stream send event", map[string]interface{}{ - log.KeyError: err, - "eventpb": eventpb.String(), + "eventpb": eventpb.String(), }) err = subscribe.stream().Send(&proxypb.SubscribeResponse{ SequenceId: msg.sequenceID, @@ -383,6 +388,61 @@ func ToProto(e *v2.Event) (*cloudevents.CloudEvent, error) { return container, nil } +func (cp *ControllerProxy) disableSubsciption( + ctx context.Context, req *proxypb.SubscribeRequest, subscriptionID uint64, +) error { + disableSubscriptionReq := &ctrlpb.DisableSubscriptionRequest{ + Id: subscriptionID, + } + _, err := cp.triggerCtrl.DisableSubscription(context.Background(), disableSubscriptionReq) + if err != nil { + log.Error(ctx, "disable subscription failed", map[string]interface{}{ + log.KeyError: err, + "id": req.SubscriptionId, + }) + return err + } + + // TODO(jiangkai): delete me after disable supports synchronization interface + retryTime := 3 + disable := false + for i := 0; i < retryTime; i++ { + s, _ := cp.triggerCtrl.GetSubscription(context.Background(), &ctrlpb.GetSubscriptionRequest{ + Id: subscriptionID, + }) + if s.Disable { + disable = true + break + } + stdtime.Sleep(stdtime.Second) + } + if !disable { + return stderr.New("disable is not completed, please try again") + } + return nil +} + +func newSubscription( + info *metapb.Subscription, subscriptionID uint64, newsink string, +) *ctrlpb.UpdateSubscriptionRequest { + return &ctrlpb.UpdateSubscriptionRequest{ + Id: subscriptionID, + Subscription: &ctrlpb.SubscriptionRequest{ + Source: info.Source, + Types: info.Types, + Config: info.Config, + Filters: info.Filters, + Sink: newsink, + Protocol: info.Protocol, + EventBus: info.EventBus, + Transformer: info.Transformer, + Name: info.Name, + Description: info.Description, + Disable: info.Disable, + }, + } +} + func attributeFor(v interface{}) (*cloudevents.CloudEvent_CloudEventAttributeValue, error) { vv, err := types.Validate(v) if err != nil {