Skip to content

Commit

Permalink
feat(trigger): eventbus name change to id (#504)
Browse files Browse the repository at this point in the history
* feat(trigger): eventbus name change to id

Signed-off-by: xdlbdy <[email protected]>

* feat(trigger): eventbus name change to id

Signed-off-by: xdlbdy <[email protected]>

* feat(trigger): eventbus name change to id

Signed-off-by: xdlbdy <[email protected]>

* feat(trigger): eventbus name change to id

Signed-off-by: xdlbdy <[email protected]>

* feat(trigger): eventbus name change to id

Signed-off-by: xdlbdy <[email protected]>

---------

Signed-off-by: xdlbdy <[email protected]>
  • Loading branch information
xdlbdy authored Mar 9, 2023
1 parent c17df3d commit 304a3b9
Show file tree
Hide file tree
Showing 29 changed files with 592 additions and 389 deletions.
16 changes: 9 additions & 7 deletions internal/controller/trigger/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,8 @@ func (ctrl *controller) CreateSubscription(ctx context.Context,
return nil, err
}
// subscription name can't be repeated in an eventbus
_sub := ctrl.subscriptionManager.GetSubscriptionByName(ctx, request.Subscription.Eventbus, request.Subscription.Name)
eventbusID := vanus.NewIDFromUint64(request.Subscription.EventbusId)
_sub := ctrl.subscriptionManager.GetSubscriptionByName(ctx, eventbusID, request.Subscription.Name)
if _sub != nil {
return nil, errors.ErrInvalidRequest.WithMessage(
fmt.Sprintf("subscription name %s has exist", request.Subscription.Name))
Expand Down Expand Up @@ -226,12 +227,12 @@ func (ctrl *controller) UpdateSubscription(ctx context.Context,
if err := validation.ValidateSubscriptionRequest(ctx, request.Subscription); err != nil {
return nil, err
}
if request.Subscription.Eventbus != sub.Eventbus {
if request.Subscription.EventbusId != uint64(sub.EventbusID) {
return nil, errors.ErrInvalidRequest.WithMessage("can not change eventbus")
}
if request.Subscription.Name != sub.Name {
// subscription name can't be repeated in an eventbus
_sub := ctrl.subscriptionManager.GetSubscriptionByName(ctx, sub.Eventbus, request.Subscription.Name)
_sub := ctrl.subscriptionManager.GetSubscriptionByName(ctx, sub.EventbusID, request.Subscription.Name)
if _sub != nil {
return nil, errors.ErrInvalidRequest.WithMessage(
fmt.Sprintf("subscription name %s has exist", request.Subscription.Name))
Expand All @@ -253,7 +254,7 @@ func (ctrl *controller) UpdateSubscription(ctx context.Context,
return nil, err
}
if transChange != 0 {
metrics.SubscriptionTransformerGauge.WithLabelValues(sub.Eventbus).Add(float64(transChange))
metrics.SubscriptionTransformerGauge.WithLabelValues(sub.EventbusID.Key()).Add(float64(transChange))
}
resp := convert.ToPbSubscription(sub, nil)
return resp, nil
Expand Down Expand Up @@ -459,7 +460,7 @@ func (ctrl *controller) ListSubscription(ctx context.Context,
subscriptions := ctrl.subscriptionManager.ListSubscription(ctx)
list := make([]*metapb.Subscription, 0, len(subscriptions))
for _, sub := range subscriptions {
if request.Eventbus != "" && request.Eventbus != sub.Eventbus {
if request.EventbusId != 0 && request.EventbusId != sub.EventbusID.Uint64() {
continue
}
if request.Name != "" && !strings.Contains(sub.Name, request.Name) {
Expand Down Expand Up @@ -619,7 +620,8 @@ func (ctrl *controller) Start() error {
return err
}
ctrl.secretStorage = secretStorage
ctrl.subscriptionManager = subscription.NewSubscriptionManager(ctrl.storage, ctrl.secretStorage, ctrl.ebClient)
ctrl.subscriptionManager = subscription.NewSubscriptionManager(ctrl.storage, ctrl.secretStorage,
ctrl.ebClient, ctrl.cl)
ctrl.workerManager = worker.NewTriggerWorkerManager(worker.Config{}, ctrl.storage,
ctrl.subscriptionManager, ctrl.requeueSubscription)
ctrl.scheduler = worker.NewSubscriptionScheduler(ctrl.workerManager, ctrl.subscriptionManager)
Expand Down Expand Up @@ -650,7 +652,7 @@ func (ctrl *controller) initTriggerSystemEventbus() {
}

// TODO(JiangKai) save id
if _, err := ctrl.cl.EventbusService().CreateSystemEventbusIfNotExist(ctx, primitive.GetRetryEventbusName(""),
if _, err := ctrl.cl.EventbusService().CreateSystemEventbusIfNotExist(ctx, primitive.RetryEventbusName,
"System Eventbus For Trigger Service"); err != nil {
log.Error(ctx, "failed to create RetryEventbus, exit", map[string]interface{}{
log.KeyError: err,
Expand Down
77 changes: 39 additions & 38 deletions internal/controller/trigger/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,21 +130,21 @@ func TestController_CreateSubscription(t *testing.T) {
subManager.EXPECT().AddSubscription(gomock.Any(), gomock.Any()).AnyTimes().Return(nil)
create := &ctrlpb.CreateSubscriptionRequest{
Subscription: &ctrlpb.SubscriptionRequest{
Eventbus: "test-bus",
Name: "test-name",
Sink: "test-sink",
EventbusId: vanus.NewTestID().Uint64(),
Name: "test-name",
Sink: "test-sink",
},
}
request := create.Subscription
vanus.InitFakeSnowflake()
resp, err := ctrl.CreateSubscription(ctx, create)
So(err, ShouldBeNil)
So(resp.Sink, ShouldEqual, request.Sink)
So(resp.Eventbus, ShouldEqual, request.Eventbus)
So(resp.EventbusId, ShouldEqual, request.EventbusId)
resp2, err := ctrl.CreateSubscription(ctx, create)
So(err, ShouldBeNil)
So(resp2.Sink, ShouldEqual, request.Sink)
So(resp2.Eventbus, ShouldEqual, request.Eventbus)
So(resp2.EventbusId, ShouldEqual, request.EventbusId)
So(resp.Id, ShouldNotEqual, resp2.Id)
})
})
Expand All @@ -163,6 +163,7 @@ func TestController_UpdateSubscription(t *testing.T) {
ctrl.scheduler = worker.NewSubscriptionScheduler(ctrl.workerManager, ctrl.subscriptionManager)

subID := vanus.NewTestID()
eventbusID := vanus.NewTestID()
ctrl.state = primitive.ServerStateRunning
Convey("update subscription not exist", func() {
subManager.EXPECT().GetSubscription(gomock.Any(), gomock.Eq(subID)).Return(nil)
Expand All @@ -179,7 +180,7 @@ func TestController_UpdateSubscription(t *testing.T) {
ID: subID,
Phase: metadata.SubscriptionPhaseStopped,
TriggerWorker: "test-addr",
Eventbus: "test-eb",
EventbusID: eventbusID,
Name: "test-name",
Sink: "test-sink",
Protocol: primitive.HTTPProtocol,
Expand All @@ -193,8 +194,8 @@ func TestController_UpdateSubscription(t *testing.T) {
request := &ctrlpb.UpdateSubscriptionRequest{
Id: subID.Uint64(),
Subscription: &ctrlpb.SubscriptionRequest{
Eventbus: "test-eb",
Sink: "test-sink",
EventbusId: eventbusID.Uint64(),
Sink: "test-sink",
},
}
_, err := ctrl.UpdateSubscription(ctx, request)
Expand All @@ -204,8 +205,8 @@ func TestController_UpdateSubscription(t *testing.T) {
request := &ctrlpb.UpdateSubscriptionRequest{
Id: subID.Uint64(),
Subscription: &ctrlpb.SubscriptionRequest{
Eventbus: "test-eb-modify",
Sink: "test-sink",
EventbusId: vanus.NewTestID().Uint64(),
Sink: "test-sink",
},
}
_, err := ctrl.UpdateSubscription(ctx, request)
Expand All @@ -216,9 +217,9 @@ func TestController_UpdateSubscription(t *testing.T) {
request := &ctrlpb.UpdateSubscriptionRequest{
Id: subID.Uint64(),
Subscription: &ctrlpb.SubscriptionRequest{
Eventbus: "test-eb",
Sink: "test-sink",
Protocol: metapb.Protocol_AWS_LAMBDA,
EventbusId: eventbusID.Uint64(),
Sink: "test-sink",
Protocol: metapb.Protocol_AWS_LAMBDA,
SinkCredential: &metapb.SinkCredential{
CredentialType: metapb.SinkCredential_AWS,
},
Expand All @@ -231,9 +232,9 @@ func TestController_UpdateSubscription(t *testing.T) {
request := &ctrlpb.UpdateSubscriptionRequest{
Id: subID.Uint64(),
Subscription: &ctrlpb.SubscriptionRequest{
Eventbus: "test-eb",
Sink: "arn:aws:lambda:us-west-2:843378899134:function:xdltest",
Protocol: metapb.Protocol_AWS_LAMBDA,
EventbusId: eventbusID.Uint64(),
Sink: "arn:aws:lambda:us-west-2:843378899134:function:xdltest",
Protocol: metapb.Protocol_AWS_LAMBDA,
SinkCredential: &metapb.SinkCredential{
CredentialType: metapb.SinkCredential_PLAIN,
},
Expand All @@ -246,9 +247,9 @@ func TestController_UpdateSubscription(t *testing.T) {
request := &ctrlpb.UpdateSubscriptionRequest{
Id: subID.Uint64(),
Subscription: &ctrlpb.SubscriptionRequest{
Eventbus: "test-eb",
Sink: "arn:aws:lambda:us-west-2:843378899134:function:xdltest",
Protocol: metapb.Protocol_AWS_LAMBDA,
EventbusId: eventbusID.Uint64(),
Sink: "arn:aws:lambda:us-west-2:843378899134:function:xdltest",
Protocol: metapb.Protocol_AWS_LAMBDA,
SinkCredential: &metapb.SinkCredential{
CredentialType: metapb.SinkCredential_AWS,
},
Expand All @@ -261,10 +262,10 @@ func TestController_UpdateSubscription(t *testing.T) {
request := &ctrlpb.UpdateSubscriptionRequest{
Id: subID.Uint64(),
Subscription: &ctrlpb.SubscriptionRequest{
Eventbus: "test-eb",
Name: "test-name",
Sink: "arn:aws:lambda:us-west-2:843378899134:function:xdltest",
Protocol: metapb.Protocol_AWS_LAMBDA,
EventbusId: eventbusID.Uint64(),
Name: "test-name",
Sink: "arn:aws:lambda:us-west-2:843378899134:function:xdltest",
Protocol: metapb.Protocol_AWS_LAMBDA,
SinkCredential: &metapb.SinkCredential{
CredentialType: metapb.SinkCredential_AWS,
Credential: &metapb.SinkCredential_Aws{
Expand All @@ -291,25 +292,25 @@ func TestController_UpdateSubscription(t *testing.T) {
request := &ctrlpb.UpdateSubscriptionRequest{
Id: subID.Uint64(),
Subscription: &ctrlpb.SubscriptionRequest{
Eventbus: "test-eb",
Name: "test-name",
Sink: "modify-sink",
EventbusId: eventbusID.Uint64(),
Name: "test-name",
Sink: "modify-sink",
},
}
resp, err := ctrl.UpdateSubscription(ctx, request)
So(err, ShouldBeNil)
So(resp.Sink, ShouldEqual, request.Subscription.Sink)
So(resp.Sink, ShouldNotEqual, sub.Sink)
So(resp.Eventbus, ShouldEqual, sub.Eventbus)
So(resp.EventbusId, ShouldEqual, sub.EventbusID.Uint64())
})
Convey("update filters", func() {
subManager.EXPECT().UpdateSubscription(gomock.Any(), gomock.Any()).Return(nil)
request := &ctrlpb.UpdateSubscriptionRequest{
Id: subID.Uint64(),
Subscription: &ctrlpb.SubscriptionRequest{
Eventbus: "test-eb",
Name: "test-name",
Sink: "test-sink",
EventbusId: eventbusID.Uint64(),
Name: "test-name",
Sink: "test-sink",
Filters: []*metapb.Filter{
{
Exact: map[string]string{"type": "test"},
Expand All @@ -327,9 +328,9 @@ func TestController_UpdateSubscription(t *testing.T) {
request := &ctrlpb.UpdateSubscriptionRequest{
Id: subID.Uint64(),
Subscription: &ctrlpb.SubscriptionRequest{
Eventbus: "test-eb",
Name: "test-name",
Sink: "test-sink",
EventbusId: eventbusID.Uint64(),
Name: "test-name",
Sink: "test-sink",
Transformer: &metapb.Transformer{
Define: map[string]string{"k": "v"},
Template: "test",
Expand Down Expand Up @@ -428,14 +429,14 @@ func TestController_GetSubscription(t *testing.T) {
})
Convey("get subscription exist", func() {
sub := &metadata.Subscription{
ID: subID,
Eventbus: "test-bus",
ID: subID,
EventbusID: vanus.NewTestID(),
}
subManager.EXPECT().GetSubscription(gomock.Any(), gomock.Eq(subID)).Return(sub)
subManager.EXPECT().GetOffset(gomock.Any(), gomock.Any()).Return(info.ListOffsetInfo{}, nil)
resp, err := ctrl.GetSubscription(ctx, request)
So(err, ShouldBeNil)
So(resp.Eventbus, ShouldEqual, sub.Eventbus)
So(resp.EventbusId, ShouldEqual, sub.EventbusID.Uint64())
So(resp.Id, ShouldEqual, sub.ID)
})
})
Expand All @@ -454,8 +455,8 @@ func TestController_ListSubscription(t *testing.T) {
ctrl.scheduler = worker.NewSubscriptionScheduler(ctrl.workerManager, ctrl.subscriptionManager)
Convey("list subscription", func() {
list := []*metadata.Subscription{
{ID: vanus.NewTestID(), Eventbus: "bus1"},
{ID: vanus.NewTestID(), Eventbus: "bus2"},
{ID: vanus.NewTestID(), EventbusID: vanus.NewTestID()},
{ID: vanus.NewTestID(), EventbusID: vanus.NewTestID()},
}
subManager.EXPECT().ListSubscription(gomock.Any()).Return(list)
subManager.EXPECT().GetOffset(gomock.Any(), gomock.Any()).AnyTimes().Return(info.ListOffsetInfo{}, nil)
Expand Down
11 changes: 7 additions & 4 deletions internal/controller/trigger/metadata/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,20 @@ type Subscription struct {
SinkCredential primitive.SinkCredential `json:"-"`
Protocol primitive.Protocol `json:"protocol,omitempty"`
ProtocolSetting *primitive.ProtocolSetting `json:"protocol_settings,omitempty"`
Eventbus string `json:"eventbus"`
EventbusID vanus.ID `json:"eventbus_id"`
Transformer *primitive.Transformer `json:"transformer,omitempty"`
Name string `json:"name"`
Description string `json:"description"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`

// not from api
Phase SubscriptionPhase `json:"phase"`
TriggerWorker string `json:"trigger_worker,omitempty"`
HeartbeatTime time.Time `json:"-"`
DeadLetterEventbusID vanus.ID `json:"dead_letter_eventbus_id"`
RetryEventbusID vanus.ID `json:"retry_eventbus_id"`
TimerEventbusID vanus.ID `json:"timer_eventbus_id"`
Phase SubscriptionPhase `json:"phase"`
TriggerWorker string `json:"trigger_worker,omitempty"`
HeartbeatTime time.Time `json:"-"`
}

// Update property change from api .
Expand Down
18 changes: 9 additions & 9 deletions internal/controller/trigger/storage/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,17 @@ func TestGetSubscription(t *testing.T) {
defer ctrl.Finish()
kvClient := kv.NewMockClient(ctrl)
s := NewSubscriptionStorage(kvClient).(*subscriptionStorage)
subID := vanus.ID(1)
subID := vanus.NewTestID()
Convey("get subscription", t, func() {
expect := &metadata.Subscription{
ID: subID,
Eventbus: "bus",
ID: subID,
EventbusID: vanus.NewTestID(),
}
v, _ := json.Marshal(expect)
kvClient.EXPECT().Get(ctx, s.getKey(subID)).Return(v, nil)
data, err := s.GetSubscription(ctx, subID)
So(err, ShouldBeNil)
So(data.Eventbus, ShouldEqual, expect.Eventbus)
So(data.EventbusID, ShouldEqual, expect.EventbusID)
})
}

Expand All @@ -86,7 +86,7 @@ func TestDeleteSubscription(t *testing.T) {
defer ctrl.Finish()
kvClient := kv.NewMockClient(ctrl)
s := NewSubscriptionStorage(kvClient).(*subscriptionStorage)
subID := vanus.ID(1)
subID := vanus.NewTestID()
Convey("delete subscription", t, func() {
kvClient.EXPECT().Delete(ctx, s.getKey(subID)).Return(nil)
err := s.DeleteSubscription(ctx, subID)
Expand All @@ -100,11 +100,11 @@ func TestListSubscription(t *testing.T) {
defer ctrl.Finish()
kvClient := kv.NewMockClient(ctrl)
s := NewSubscriptionStorage(kvClient).(*subscriptionStorage)
subID := vanus.ID(1)
subID := vanus.NewTestID()
Convey("list subscription", t, func() {
expect := &metadata.Subscription{
ID: subID,
Eventbus: "bus",
ID: subID,
EventbusID: vanus.NewTestID(),
}
v, _ := json.Marshal(expect)
kvClient.EXPECT().List(ctx, KeyPrefixSubscription.String()).Return([]kv.Pair{
Expand All @@ -113,6 +113,6 @@ func TestListSubscription(t *testing.T) {
list, err := s.ListSubscription(ctx)
So(err, ShouldBeNil)
So(len(list), ShouldEqual, 1)
So(list[0].Eventbus, ShouldEqual, expect.Eventbus)
So(list[0].EventbusID, ShouldEqual, expect.EventbusID)
})
}
8 changes: 4 additions & 4 deletions internal/controller/trigger/subscription/mock_subscription.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 304a3b9

Please sign in to comment.