Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(state): untriggered rule always cancel #3436

Merged
merged 1 commit into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 240 additions & 0 deletions fvt/rulestate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package fvt

import (
"net/http"
"testing"
"time"

"github.com/stretchr/testify/suite"
)

// RuleStateTestSuite is the testify suite that holds the functional tests for
// rule state handling in this file (TestUpdate and TestCreateStoppedRule).
// It carries no state of its own; it only embeds suite.Suite to get the
// assertion helpers and sub-test runner.
type RuleStateTestSuite struct {
suite.Suite
}

func TestRuleTestSuite(t *testing.T) {
suite.Run(t, new(RuleStateTestSuite))
}

// TestUpdate checks that stopping a rule and then updating it with
// "triggered": false leaves a sibling rule on the same shared stream healthy.
//
// Flow:
//  1. Create the simulator conf key "ttt" (10ms interval), the shared stream
//     simStream, and two rules (rule1, rule2) that both select from it.
//  2. Stop rule2 and update it with "triggered": false so it is not restarted.
//  3. Read rule1's status twice, 50ms apart, and require that it is running,
//     that the source reports zero exceptions, and that the source's
//     records_in_total counter has increased between the two reads.
//  4. Delete both rules and the stream.
//
// Metric lookups use Require so a missing key fails the test immediately
// instead of panicking on a nil type assertion, and the clean-up step only
// inspects the response when the request itself succeeded.
func (s *RuleStateTestSuite) TestUpdate() {
	s.Run("init rule1", func() {
		conf := map[string]any{
			"interval": "10ms",
		}
		resp, err := client.CreateConf("sources/simulator/confKeys/ttt", conf)
		s.Require().NoError(err)
		s.Require().Equal(http.StatusOK, resp.StatusCode)

		streamSql := `{"sql": "create stream simStream() WITH (TYPE=\"simulator\", FORMAT=\"json\", CONF_KEY=\"ttt\", SHARED=\"true\")"}`
		resp, err = client.CreateStream(streamSql)
		s.Require().NoError(err)
		s.T().Log(GetResponseText(resp))
		s.Require().Equal(http.StatusCreated, resp.StatusCode)

		ruleSql := `{
"id": "rule1",
"name": "keep rule",
"sql": "SELECT * FROM simStream",
"actions": [
{
"nop":{}
}
],
"options": {
"sendError": false
}
}`
		resp, err = client.CreateRule(ruleSql)
		s.Require().NoError(err)
		s.T().Log(GetResponseText(resp))
		s.Require().Equal(http.StatusCreated, resp.StatusCode)

		ruleSql2 := `{
"id": "rule2",
"name": "to update rule",
"sql": "SELECT * FROM simStream",
"actions": [
{
"nop":{}
}
],
"options": {
"sendError": false,
"bufferLength": 2
}
}`
		resp, err = client.CreateRule(ruleSql2)
		s.Require().NoError(err)
		s.T().Log(GetResponseText(resp))
		s.Require().Equal(http.StatusCreated, resp.StatusCode)
	})
	s.Run("stop and update rule2 but not start", func() {
		resp, err := client.StopRule("rule2")
		s.Require().NoError(err)
		s.T().Log(GetResponseText(resp))
		s.Require().Equal(http.StatusOK, resp.StatusCode)

		ruleSql2 := `{
"id": "rule2",
"triggered": false,
"name": "to update rule",
"sql": "SELECT * FROM simStream",
"actions": [
{
"nop":{}
}
],
"options": {
"sendError": false,
"bufferLength": 2
}
}`
		resp, err = client.UpdateRule("rule2", ruleSql2)
		s.Require().NoError(err)
		s.T().Log(GetResponseText(resp))
		s.Require().Equal(http.StatusOK, resp.StatusCode)
	})
	s.Run("check no buffer is not full exp", func() {
		// First metrics snapshot.
		metrics, err := client.GetRuleStatus("rule1")
		s.Require().NoError(err)
		s.Equal("running", metrics["status"])
		s.T().Log(metrics)
		exp, ok := metrics["source_simStream_0_exceptions_total"]
		s.Require().True(ok)
		// Compare the interface value directly: a wrong type fails the
		// assertion instead of panicking on a type assertion.
		s.Require().Equal(float64(0), exp)
		recordsIn1, ok := metrics["source_simStream_0_records_in_total"]
		s.Require().True(ok)

		// Second metrics snapshot, taken after the source had time to emit.
		time.Sleep(50 * time.Millisecond)
		metrics, err = client.GetRuleStatus("rule1")
		s.Require().NoError(err)
		s.Equal("running", metrics["status"])
		s.T().Log(metrics)
		exp, ok = metrics["source_simStream_0_exceptions_total"]
		s.Require().True(ok)
		s.Require().Equal(float64(0), exp, "has exception")
		recordsIn2, ok := metrics["source_simStream_0_records_in_total"]
		s.Require().True(ok)
		// The source must still be receiving records between the two reads.
		s.Require().Greater(recordsIn2, recordsIn1)
	})
	s.Run("clean up", func() {
		// Non-fatal assertions on purpose: every resource gets a delete
		// attempt even if an earlier one failed.
		for _, path := range []string{"rules/rule2", "rules/rule1", "streams/simStream"} {
			res, e := client.Delete(path)
			if s.NoError(e) {
				s.Equal(http.StatusOK, res.StatusCode)
			}
		}
	})
}

// TestCreateStoppedRule checks that creating a rule with "triggered": false
// leaves a sibling rule on the same shared stream healthy.
//
// Flow:
//  1. Create the simulator conf key "ttt" (10ms interval), the shared stream
//     simStream, a running rule1, and a rule2 created with "triggered": false.
//  2. Read rule1's status twice, 50ms apart, and require that it is running,
//     that the source reports zero exceptions, and that the source's
//     records_in_total counter has increased between the two reads.
//  3. Delete both rules and the stream.
//
// Metric lookups use Require so a missing key fails the test immediately
// instead of panicking on a nil type assertion, and the clean-up step only
// inspects the response when the request itself succeeded.
func (s *RuleStateTestSuite) TestCreateStoppedRule() {
	s.Run("init rule1", func() {
		conf := map[string]any{
			"interval": "10ms",
		}
		resp, err := client.CreateConf("sources/simulator/confKeys/ttt", conf)
		s.Require().NoError(err)
		s.Require().Equal(http.StatusOK, resp.StatusCode)

		streamSql := `{"sql": "create stream simStream() WITH (TYPE=\"simulator\", FORMAT=\"json\", CONF_KEY=\"ttt\", SHARED=\"true\")"}`
		resp, err = client.CreateStream(streamSql)
		s.Require().NoError(err)
		s.T().Log(GetResponseText(resp))
		s.Require().Equal(http.StatusCreated, resp.StatusCode)

		ruleSql := `{
"id": "rule1",
"name": "keep rule",
"sql": "SELECT * FROM simStream",
"actions": [
{
"nop":{}
}
],
"options": {
"sendError": false
}
}`
		resp, err = client.CreateRule(ruleSql)
		s.Require().NoError(err)
		s.T().Log(GetResponseText(resp))
		s.Require().Equal(http.StatusCreated, resp.StatusCode)

		ruleSql2 := `{
"triggered": false,
"id": "rule2",
"name": "to update rule",
"sql": "SELECT * FROM simStream",
"actions": [
{
"nop":{}
}
],
"options": {
"sendError": false,
"bufferLength": 2
}
}`
		resp, err = client.CreateRule(ruleSql2)
		s.Require().NoError(err)
		s.T().Log(GetResponseText(resp))
		s.Require().Equal(http.StatusCreated, resp.StatusCode)
	})
	s.Run("check no buffer is not full exp", func() {
		// First metrics snapshot.
		metrics, err := client.GetRuleStatus("rule1")
		s.Require().NoError(err)
		s.Equal("running", metrics["status"])
		s.T().Log(metrics)
		exp, ok := metrics["source_simStream_0_exceptions_total"]
		s.Require().True(ok)
		// Compare the interface value directly: a wrong type fails the
		// assertion instead of panicking on a type assertion.
		s.Require().Equal(float64(0), exp)
		recordsIn1, ok := metrics["source_simStream_0_records_in_total"]
		s.Require().True(ok)

		// Second metrics snapshot, taken after the source had time to emit.
		time.Sleep(50 * time.Millisecond)
		metrics, err = client.GetRuleStatus("rule1")
		s.Require().NoError(err)
		s.Equal("running", metrics["status"])
		s.T().Log(metrics)
		exp, ok = metrics["source_simStream_0_exceptions_total"]
		s.Require().True(ok)
		s.Require().Equal(float64(0), exp, "has exception")
		recordsIn2, ok := metrics["source_simStream_0_records_in_total"]
		s.Require().True(ok)
		// The source must still be receiving records between the two reads.
		s.Require().Greater(recordsIn2, recordsIn1)
	})
	s.Run("clean up", func() {
		// Non-fatal assertions on purpose: every resource gets a delete
		// attempt even if an earlier one failed.
		for _, path := range []string{"rules/rule2", "rules/rule1", "streams/simStream"} {
			res, e := client.Delete(path)
			if s.NoError(e) {
				s.Equal(http.StatusOK, res.StatusCode)
			}
		}
	})
}
13 changes: 13 additions & 0 deletions fvt/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,19 @@ func (sdk *SDK) RestartRule(ruleId string) (resp *http.Response, err error) {
return http.Post(sdk.baseUrl.JoinPath("rules", ruleId, "restart").String(), ContentTypeJson, nil)
}

// StopRule issues a body-less POST to the "rules/<ruleId>/stop" path under
// the SDK's base URL and returns the HTTP response and error from http.Post
// unchanged.
func (sdk *SDK) StopRule(ruleId string) (resp *http.Response, err error) {
	stopEndpoint := sdk.baseUrl.JoinPath("rules", ruleId, "stop")
	return http.Post(stopEndpoint.String(), ContentTypeJson, nil)
}

// UpdateRule sends ruleJson in a PUT request to the "rules/<name>" path under
// the SDK's base URL, using the SDK's own HTTP client.
//
// It returns the HTTP response from the client. If the request cannot be
// built, the response is nil and the returned error wraps the underlying
// cause (so errors.Is / errors.As still work); the error is no longer printed
// to stdout here, leaving reporting to the caller.
func (sdk *SDK) UpdateRule(name, ruleJson string) (resp *http.Response, err error) {
	target := sdk.baseUrl.JoinPath("rules", name).String()
	req, err := http.NewRequest(http.MethodPut, target, bytes.NewBufferString(ruleJson))
	if err != nil {
		return nil, fmt.Errorf("building update request for rule %q: %w", name, err)
	}
	return sdk.httpClient.Do(req)
}

func (sdk *SDK) DeleteRule(name string) (resp *http.Response, err error) {
req, err := http.NewRequest(http.MethodDelete, sdk.baseUrl.JoinPath("rules", name).String(), nil)
if err != nil {
Expand Down
9 changes: 7 additions & 2 deletions internal/server/rule_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (rr *RuleRegistry) CreateRule(name, ruleJson string) (id string, err error)
// create state and save
rs := rule.NewState(r)
// Validate the topo
err = rs.Validate()
tp, err := rs.Validate()
if err != nil {
return r.Id, err
}
Expand All @@ -124,6 +124,7 @@ func (rr *RuleRegistry) CreateRule(name, ruleJson string) (id string, err error)
}
// Start the rule asyncly
if r.Triggered {
rs.WithTopo(tp)
go func() {
panicOrError := infra.SafeRun(func() error {
// Start the rule which runs async
Expand All @@ -133,6 +134,8 @@ func (rr *RuleRegistry) CreateRule(name, ruleJson string) (id string, err error)
logger.Errorf("Rule %s start failed: %s", r.Id, panicOrError)
}
}()
} else if tp != nil {
tp.Cancel()
}
return r.Id, nil
}
Expand Down Expand Up @@ -174,7 +177,7 @@ func (rr *RuleRegistry) UpdateRule(ruleId, ruleJson string) error {
oldRule := rs.Rule
rs.Rule = r
// validateRule only check plan is valid, topology shouldn't be changed before ruleState stop
newTopo, err := rs.ValidateRule()
newTopo, err := rs.Validate()
if err != nil {
rs.Rule = oldRule
return err
Expand All @@ -189,6 +192,8 @@ func (rr *RuleRegistry) UpdateRule(ruleId, ruleJson string) error {
if err2 != nil {
return err2
}
} else if newTopo != nil {
newTopo.Cancel()
}
return err1
}
Expand Down
2 changes: 1 addition & 1 deletion internal/topo/planner/planner_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func splitSource(ctx api.StreamContext, t *DataSourcePlan, ss api.Source, option
index++
}

if t.streamStmt.Options.SHARED && len(ops) > 0 {
if t.streamStmt.Options.SHARED {
// Create subtopo in the end to avoid errors in the middle
srcSubtopo, existed := topo.GetOrCreateSubTopo(string(t.name))
if !existed {
Expand Down
37 changes: 14 additions & 23 deletions internal/topo/rule/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,33 +113,24 @@ func (s *State) WithTopo(topo *topo.Topo) *State {
return s
}

// Validate is the second level validation
// It tries to plan and return any errors
// Only run when creating the Rule
func (s *State) Validate() error {
// Validate tries to plan and return the planned topo and any errors
// Need to cancel the topo if it is of no use because the input/output channels are set
// Otherwise, the shared source may send to these channels and hang
func (s *State) Validate() (*topo.Topo, error) {
s.Lock()
defer s.Unlock()
err := infra.SafeRun(func() error {
if tp, err := planner.Plan(s.Rule); err != nil {
return err
} else {
s.topology = tp
}
return nil
})
return err
}

func (s *State) ValidateRule() (*topo.Topo, error) {
s.Lock()
defer s.Unlock()
var topo *topo.Topo
var err error
infra.SafeRun(func() error {
topo, err = planner.Plan(s.Rule)
var (
tp *topo.Topo
err error
)
err = infra.SafeRun(func() error {
tp, err = planner.Plan(s.Rule)
return err
})
return topo, err
if err != nil {
return nil, err
}
return tp, err
}

func (s *State) transit(newState RunState, err error) {
Expand Down
2 changes: 1 addition & 1 deletion internal/topo/rule/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestAPIs(t *testing.T) {
assert.Equal(t, Stopped, st.currentState)
// Update rule
st.Rule = def.GetDefaultRule("testAPI", "select abc from demo where a > 3")
e = st.Validate()
_, e = st.Validate()
assert.NoError(t, e)
e = st.Start()
assert.NoError(t, e)
Expand Down
Loading