From eb969642199e2cfe3cc3a77247cbb750c8ef8ed7 Mon Sep 17 00:00:00 2001 From: Jiyong Huang Date: Tue, 10 Dec 2024 14:10:24 +0800 Subject: [PATCH] fix(state): untriggerd rule always cancel validate may plan topo but not run if rule is not triggered. This will lead to buffer full if the source is a shared source. Signed-off-by: Jiyong Huang --- fvt/rulestate_test.go | 240 ++++++++++++++++++++++++ fvt/sdk.go | 13 ++ internal/server/rule_manager.go | 9 +- internal/topo/planner/planner_source.go | 2 +- internal/topo/rule/state.go | 37 ++-- internal/topo/rule/state_test.go | 2 +- 6 files changed, 276 insertions(+), 27 deletions(-) create mode 100644 fvt/rulestate_test.go diff --git a/fvt/rulestate_test.go b/fvt/rulestate_test.go new file mode 100644 index 0000000000..ce7fa6048d --- /dev/null +++ b/fvt/rulestate_test.go @@ -0,0 +1,240 @@ +// Copyright 2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fvt + +import ( + "net/http" + "testing" + "time" + + "github.com/stretchr/testify/suite" +) + +type RuleStateTestSuite struct { + suite.Suite +} + +func TestRuleTestSuite(t *testing.T) { + suite.Run(t, new(RuleStateTestSuite)) +} + +func (s *RuleStateTestSuite) TestUpdate() { + s.Run("init rule1", func() { + conf := map[string]any{ + "interval": "10ms", + } + resp, err := client.CreateConf("sources/simulator/confKeys/ttt", conf) + s.Require().NoError(err) + s.Require().Equal(http.StatusOK, resp.StatusCode) + + streamSql := `{"sql": "create stream simStream() WITH (TYPE=\"simulator\", FORMAT=\"json\", CONF_KEY=\"ttt\", SHARED=\"true\")"}` + resp, err = client.CreateStream(streamSql) + s.Require().NoError(err) + s.T().Log(GetResponseText(resp)) + s.Require().Equal(http.StatusCreated, resp.StatusCode) + + ruleSql := `{ + "id": "rule1", + "name": "keep rule", + "sql": "SELECT * FROM simStream", + "actions": [ + { + "nop":{} + } + ], + "options": { + "sendError": false + } +}` + resp, err = client.CreateRule(ruleSql) + s.Require().NoError(err) + s.T().Log(GetResponseText(resp)) + s.Require().Equal(http.StatusCreated, resp.StatusCode) + + ruleSql2 := `{ + "id": "rule2", + "name": "to update rule", + "sql": "SELECT * FROM simStream", + "actions": [ + { + "nop":{} + } + ], + "options": { + "sendError": false, + "bufferLength": 2 + } +}` + resp, err = client.CreateRule(ruleSql2) + s.Require().NoError(err) + s.T().Log(GetResponseText(resp)) + s.Require().Equal(http.StatusCreated, resp.StatusCode) + }) + s.Run("stop and update rule2 but not start", func() { + resp, err := client.StopRule("rule2") + s.Require().NoError(err) + s.T().Log(GetResponseText(resp)) + s.Require().Equal(http.StatusOK, resp.StatusCode) + + ruleSql2 := `{ + "id": "rule2", + "triggered": false, + "name": "to update rule", + "sql": "SELECT * FROM simStream", + "actions": [ + { + "nop":{} + } + ], + "options": { + "sendError": false, + "bufferLength": 2 + } +}` + resp, err = client.UpdateRule("rule2", ruleSql2) + s.Require().NoError(err) + s.T().Log(GetResponseText(resp)) + s.Require().Equal(http.StatusOK, resp.StatusCode) + }) + s.Run("check no buffer is not full exp", func() { + // Get metrics + metrics, err := client.GetRuleStatus("rule1") + s.Require().NoError(err) + s.Equal("running", metrics["status"]) + s.T().Log(metrics) + exp, ok := metrics["source_simStream_0_exceptions_total"] + s.True(ok) + s.Require().True(exp.(float64) == 0) + sinkOut1, ok := metrics["source_simStream_0_records_in_total"] + s.True(ok) + // Get 2nd metrics + time.Sleep(50 * time.Millisecond) + metrics, err = client.GetRuleStatus("rule1") + s.Require().NoError(err) + s.Equal("running", metrics["status"]) + s.T().Log(metrics) + exp, ok = metrics["source_simStream_0_exceptions_total"] + s.True(ok) + s.Require().True(exp.(float64) == 0, "has exception") + sinkOut2, ok := metrics["source_simStream_0_records_in_total"] + s.True(ok) + s.Require().True(sinkOut2.(float64)-sinkOut1.(float64) > 0) + }) + s.Run("clean up", func() { + res, e := client.Delete("rules/rule2") + s.NoError(e) + s.Equal(http.StatusOK, res.StatusCode) + + res, e = client.Delete("rules/rule1") + s.NoError(e) + s.Equal(http.StatusOK, res.StatusCode) + + res, e = client.Delete("streams/simStream") + s.NoError(e) + s.Equal(http.StatusOK, res.StatusCode) + }) +} + +func (s *RuleStateTestSuite) TestCreateStoppedRule() { + s.Run("init rule1", func() { + conf := map[string]any{ + "interval": "10ms", + } + resp, err := client.CreateConf("sources/simulator/confKeys/ttt", conf) + s.Require().NoError(err) + s.Require().Equal(http.StatusOK, resp.StatusCode) + + streamSql := `{"sql": "create stream simStream() WITH (TYPE=\"simulator\", FORMAT=\"json\", CONF_KEY=\"ttt\", SHARED=\"true\")"}` + resp, err = client.CreateStream(streamSql) + s.Require().NoError(err) + s.T().Log(GetResponseText(resp)) + s.Require().Equal(http.StatusCreated, resp.StatusCode) + + ruleSql := `{ + "id": "rule1", + "name": "keep rule", + "sql": "SELECT * FROM simStream", + "actions": [ + { + "nop":{} + } + ], + "options": { + "sendError": false + } +}` + resp, err = client.CreateRule(ruleSql) + s.Require().NoError(err) + s.T().Log(GetResponseText(resp)) + s.Require().Equal(http.StatusCreated, resp.StatusCode) + + ruleSql2 := `{ + "triggered": false, + "id": "rule2", + "name": "to update rule", + "sql": "SELECT * FROM simStream", + "actions": [ + { + "nop":{} + } + ], + "options": { + "sendError": false, + "bufferLength": 2 + } +}` + resp, err = client.CreateRule(ruleSql2) + s.Require().NoError(err) + s.T().Log(GetResponseText(resp)) + s.Require().Equal(http.StatusCreated, resp.StatusCode) + }) + s.Run("check no buffer is not full exp", func() { + // Get metrics + metrics, err := client.GetRuleStatus("rule1") + s.Require().NoError(err) + s.Equal("running", metrics["status"]) + s.T().Log(metrics) + exp, ok := metrics["source_simStream_0_exceptions_total"] + s.True(ok) + s.Require().True(exp.(float64) == 0) + sinkOut1, ok := metrics["source_simStream_0_records_in_total"] + s.True(ok) + // Get 2nd metrics + time.Sleep(50 * time.Millisecond) + metrics, err = client.GetRuleStatus("rule1") + s.Require().NoError(err) + s.Equal("running", metrics["status"]) + s.T().Log(metrics) + exp, ok = metrics["source_simStream_0_exceptions_total"] + s.True(ok) + s.Require().True(exp.(float64) == 0, "has exception") + sinkOut2, ok := metrics["source_simStream_0_records_in_total"] + s.True(ok) + s.Require().True(sinkOut2.(float64)-sinkOut1.(float64) > 0) + }) + s.Run("clean up", func() { + res, e := client.Delete("rules/rule2") + s.NoError(e) + s.Equal(http.StatusOK, res.StatusCode) + + res, e = client.Delete("rules/rule1") + s.NoError(e) + s.Equal(http.StatusOK, res.StatusCode) + + res, e = client.Delete("streams/simStream") + s.NoError(e) + s.Equal(http.StatusOK, res.StatusCode) + }) +} diff --git a/fvt/sdk.go b/fvt/sdk.go index 6f83d42654..d62ddd8ccb 100644 --- a/fvt/sdk.go +++ b/fvt/sdk.go @@ -89,6 +89,19 @@ func (sdk *SDK) RestartRule(ruleId string) (resp *http.Response, err error) { return http.Post(sdk.baseUrl.JoinPath("rules", ruleId, "restart").String(), ContentTypeJson, nil) } +func (sdk *SDK) StopRule(ruleId string) (resp *http.Response, err error) { + return http.Post(sdk.baseUrl.JoinPath("rules", ruleId, "stop").String(), ContentTypeJson, nil) +} + +func (sdk *SDK) UpdateRule(name, ruleJson string) (resp *http.Response, err error) { + req, err := http.NewRequest(http.MethodPut, sdk.baseUrl.JoinPath("rules", name).String(), bytes.NewBufferString(ruleJson)) + if err != nil { + fmt.Println(err) + return + } + return sdk.httpClient.Do(req) +} + func (sdk *SDK) DeleteRule(name string) (resp *http.Response, err error) { req, err := http.NewRequest(http.MethodDelete, sdk.baseUrl.JoinPath("rules", name).String(), nil) if err != nil { diff --git a/internal/server/rule_manager.go b/internal/server/rule_manager.go index aeeec7b44d..1dc3bd99d7 100644 --- a/internal/server/rule_manager.go +++ b/internal/server/rule_manager.go @@ -113,7 +113,7 @@ func (rr *RuleRegistry) CreateRule(name, ruleJson string) (id string, err error) // create state and save rs := rule.NewState(r) // Validate the topo - err = rs.Validate() + tp, err := rs.Validate() if err != nil { return r.Id, err } @@ -124,6 +124,7 @@ func (rr *RuleRegistry) CreateRule(name, ruleJson string) (id string, err error) } // Start the rule asyncly if r.Triggered { + rs.WithTopo(tp) go func() { panicOrError := infra.SafeRun(func() error { // Start the rule which runs async @@ -133,6 +134,8 @@ func (rr *RuleRegistry) CreateRule(name, ruleJson string) (id string, err error) logger.Errorf("Rule %s start failed: %s", r.Id, panicOrError) } }() + } else if tp != nil { + tp.Cancel() } return r.Id, nil } @@ -174,7 +177,7 @@ func (rr *RuleRegistry) UpdateRule(ruleId, ruleJson string) error { oldRule := rs.Rule rs.Rule = r // validateRule only check plan is valid, topology shouldn't be changed before ruleState stop - newTopo, err := rs.ValidateRule() + newTopo, err := rs.Validate() if err != nil { rs.Rule = oldRule return err @@ -189,6 +192,8 @@ func (rr *RuleRegistry) UpdateRule(ruleId, ruleJson string) error { if err2 != nil { return err2 } + } else if newTopo != nil { + newTopo.Cancel() } return err1 } diff --git a/internal/topo/planner/planner_source.go b/internal/topo/planner/planner_source.go index 69ad3ef564..ac7a94c292 100644 --- a/internal/topo/planner/planner_source.go +++ b/internal/topo/planner/planner_source.go @@ -188,7 +188,7 @@ func splitSource(ctx api.StreamContext, t *DataSourcePlan, ss api.Source, option index++ } - if t.streamStmt.Options.SHARED && len(ops) > 0 { + if t.streamStmt.Options.SHARED { // Create subtopo in the end to avoid errors in the middle srcSubtopo, existed := topo.GetOrCreateSubTopo(string(t.name)) if !existed { diff --git a/internal/topo/rule/state.go b/internal/topo/rule/state.go index e652205b88..77369a0720 100644 --- a/internal/topo/rule/state.go +++ b/internal/topo/rule/state.go @@ -113,33 +113,24 @@ func (s *State) WithTopo(topo *topo.Topo) *State { return s } -// Validate is the second level validation -// It tries to plan and return any errors -// Only run when creating the Rule -func (s *State) Validate() error { +// Validate tries to plan and return the planned topo and any errors +// Need to cancel the topo if it is of no use because the input/output channels are set +// Otherwise, the shared source may send to these channels and hang +func (s *State) Validate() (*topo.Topo, error) { s.Lock() defer s.Unlock() - err := infra.SafeRun(func() error { - if tp, err := planner.Plan(s.Rule); err != nil { - return err - } else { - s.topology = tp - } - return nil - }) - return err -} - -func (s *State) ValidateRule() (*topo.Topo, error) { - s.Lock() - defer s.Unlock() - var topo *topo.Topo - var err error - infra.SafeRun(func() error { - topo, err = planner.Plan(s.Rule) + var ( + tp *topo.Topo + err error + ) + err = infra.SafeRun(func() error { + tp, err = planner.Plan(s.Rule) return err }) - return topo, err + if err != nil { + return nil, err + } + return tp, err } func (s *State) transit(newState RunState, err error) { diff --git a/internal/topo/rule/state_test.go b/internal/topo/rule/state_test.go index d8deeeba41..2f6b93fc32 100644 --- a/internal/topo/rule/state_test.go +++ b/internal/topo/rule/state_test.go @@ -103,7 +103,7 @@ func TestAPIs(t *testing.T) { assert.Equal(t, Stopped, st.currentState) // Update rule st.Rule = def.GetDefaultRule("testAPI", "select abc from demo where a > 3") - e = st.Validate() + _, e = st.Validate() assert.NoError(t, e) e = st.Start() assert.NoError(t, e)