From 4842e8f294d86ba7983928f4248d0bbec23bf685 Mon Sep 17 00:00:00 2001 From: yisaer Date: Wed, 27 Mar 2024 11:14:51 +0800 Subject: [PATCH 1/4] fix Signed-off-by: yisaer --- internal/processor/rule.go | 6 ++++++ internal/server/rest_test.go | 23 +++++++++++++++++++++++ internal/server/rule_manager.go | 4 ++++ 3 files changed, 33 insertions(+) diff --git a/internal/processor/rule.go b/internal/processor/rule.go index 5a677585a5..4458702e78 100644 --- a/internal/processor/rule.go +++ b/internal/processor/rule.go @@ -235,6 +235,12 @@ func clone(opt api.RuleOption) *api.RuleOption { } } +func (p *RuleProcessor) ExecExists(name string) bool { + var s1 string + f, _ := p.db.Get(name, &s1) + return f +} + func (p *RuleProcessor) ExecDesc(name string) (string, error) { var s1 string f, _ := p.db.Get(name, &s1) diff --git a/internal/server/rest_test.go b/internal/server/rest_test.go index 6a5a8d7ca5..3464071247 100644 --- a/internal/server/rest_test.go +++ b/internal/server/rest_test.go @@ -793,3 +793,26 @@ func (suite *RestTestSuite) TestCreateRuleReplacePasswd() { require.True(suite.T(), ok) require.Equal(suite.T(), "4444", c["password"]) } + +func (suite *RestTestSuite) TestCreateDuplicateRule() { + buf1 := bytes.NewBuffer([]byte(`{"sql":"CREATE stream demo123() WITH (DATASOURCE=\"0\", TYPE=\"mqtt\")"}`)) + req1, _ := http.NewRequest(http.MethodPost, "http://localhost:8080/streams", buf1) + w1 := httptest.NewRecorder() + suite.r.ServeHTTP(w1, req1) + + ruleJson2 := `{"id":"test12345","triggered":false,"sql":"select * from demo123","actions":[{"log":{}}]}` + buf2 := bytes.NewBuffer([]byte(ruleJson2)) + req2, _ := http.NewRequest(http.MethodPost, "http://localhost:8080/rules", buf2) + w2 := httptest.NewRecorder() + suite.r.ServeHTTP(w2, req2) + require.Equal(suite.T(), http.StatusCreated, w2.Code) + + buf2 = bytes.NewBuffer([]byte(ruleJson2)) + req2, _ = http.NewRequest(http.MethodPost, "http://localhost:8080/rules", buf2) + w2 = httptest.NewRecorder() + suite.r.ServeHTTP(w2, req2) + require.Equal(suite.T(), http.StatusBadRequest, w2.Code) + var returnVal []byte + returnVal, _ = io.ReadAll(w2.Result().Body) + require.Equal(suite.T(), `{"error":1000,"message":"rule test12345 already exists"}`+"\ng", string(returnVal)) +} diff --git a/internal/server/rule_manager.go b/internal/server/rule_manager.go index 81e902d9bf..4dfdc7689e 100644 --- a/internal/server/rule_manager.go +++ b/internal/server/rule_manager.go @@ -85,6 +85,10 @@ func createRule(name, ruleJson string) (string, error) { return "", fmt.Errorf("invalid rule json: %v", err) } + if exists := ruleProcessor.ExecExists(r.Id); exists { + return "", fmt.Errorf("rule %v already exists", r.Id) + } + // Validate the topo err = infra.SafeRun(func() error { rs, err = createRuleState(r) From 5a690ad09747bf73524616e5c43f2e3559184797 Mon Sep 17 00:00:00 2001 From: yisaer Date: Wed, 27 Mar 2024 12:52:30 +0800 Subject: [PATCH 2/4] fix Signed-off-by: yisaer --- internal/server/rest_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/server/rest_test.go b/internal/server/rest_test.go index 3464071247..afe9710b21 100644 --- a/internal/server/rest_test.go +++ b/internal/server/rest_test.go @@ -814,5 +814,5 @@ func (suite *RestTestSuite) TestCreateDuplicateRule() { require.Equal(suite.T(), http.StatusBadRequest, w2.Code) var returnVal []byte returnVal, _ = io.ReadAll(w2.Result().Body) - require.Equal(suite.T(), `{"error":1000,"message":"rule test12345 already exists"}`+"\ng", string(returnVal)) + require.Equal(suite.T(), `{"error":1000,"message":"rule test12345 already exists"}`+"\n", string(returnVal)) } From 304f3be993f519e4d110939ab3f2aa72db366e15 Mon Sep 17 00:00:00 2001 From: yisaer Date: Wed, 27 Mar 2024 14:54:47 +0800 Subject: [PATCH 3/4] address the comment Signed-off-by: yisaer --- internal/server/rule_manager.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/internal/server/rule_manager.go b/internal/server/rule_manager.go index 4dfdc7689e..0c8411fd57 100644 --- a/internal/server/rule_manager.go +++ b/internal/server/rule_manager.go @@ -75,9 +75,14 @@ func (rr *RuleRegistry) Delete(key string) (*rule.RuleState, bool) { return result, ok } -func createRule(name, ruleJson string) (string, error) { +func createRule(name, ruleJson string) (id string, err error) { var rs *rule.RuleState = nil - var err error = nil + defer func() { + if err != nil { + // Do not store to registry so also delete the KV + deleteRule(id) + } + }() // Validate the rule json r, err := ruleProcessor.GetRuleByJson(name, ruleJson) @@ -86,7 +91,7 @@ func createRule(name, ruleJson string) (string, error) { } if exists := ruleProcessor.ExecExists(r.Id); exists { - return "", fmt.Errorf("rule %v already exists", r.Id) + return r.Id, fmt.Errorf("rule %v already exists", r.Id) } // Validate the topo @@ -101,8 +106,6 @@ func createRule(name, ruleJson string) (string, error) { // Store to KV err = ruleProcessor.ExecCreate(r.Id, ruleJson) if err != nil { - // Do not store to registry so also delete the KV - deleteRule(r.Id) return r.Id, fmt.Errorf("store the rule error: %v", err) } From dd295d2a2cad2952130416dfc84e7df6712b2f3d Mon Sep 17 00:00:00 2001 From: yisaer Date: Wed, 27 Mar 2024 16:12:47 +0800 Subject: [PATCH 4/4] fix Signed-off-by: yisaer --- internal/server/rule_manager.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/internal/server/rule_manager.go b/internal/server/rule_manager.go index 0c8411fd57..b3e16ac333 100644 --- a/internal/server/rule_manager.go +++ b/internal/server/rule_manager.go @@ -77,12 +77,6 @@ func (rr *RuleRegistry) Delete(key string) (*rule.RuleState, bool) { func createRule(name, ruleJson string) (id string, err error) { var rs *rule.RuleState = nil - defer func() { - if err != nil { - // Do not store to registry so also delete the KV - deleteRule(id) - } - }() // Validate the rule json r, err := ruleProcessor.GetRuleByJson(name, ruleJson) @@ -102,6 +96,12 @@ func createRule(name, ruleJson string) (id string, err error) { if err != nil { return r.Id, err } + defer func() { + if err != nil { + // Do not store to registry so also delete the KV + deleteRule(id) + } + }() // Store to KV err = ruleProcessor.ExecCreate(r.Id, ruleJson)