Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix ruletest when shared stream #3539

Merged
merged 2 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions internal/topo/node/source_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,11 @@ func (m *SourceNode) ingestEof(ctx api.StreamContext) {
m.Broadcast(xsql.EOFTuple(0))
}

// GetSource only used for test
func (m *SourceNode) GetSource() api.Source {
return m.s
}

const (
OffsetKey = "$$offset"
)
Expand Down
2 changes: 2 additions & 0 deletions internal/topo/planner/dataSourcePlan.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type DataSourcePlan struct {
fields map[string]*ast.JsonStreamField
metaMap map[string]string
pruneFields []string
// inRuleTest means whether in the rule test mode
inRuleTest bool
}

func (p DataSourcePlan) Init() *DataSourcePlan {
Expand Down
3 changes: 2 additions & 1 deletion internal/topo/planner/planner_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func transformSourceNode(ctx api.StreamContext, t *DataSourcePlan, mockSourcesPr
mockProps, isMock := mockSourcesProp[string(t.name)]
if isMock {
t.streamStmt.Options.TYPE = "simulator"
t.inRuleTest = true
}
strType := t.streamStmt.Options.TYPE
if strType == "" {
Expand Down Expand Up @@ -192,7 +193,7 @@ func splitSource(ctx api.StreamContext, t *DataSourcePlan, ss api.Source, option
index++
}

if t.streamStmt.Options.SHARED {
if t.streamStmt.Options.SHARED && !t.inRuleTest {
// Create subtopo in the end to avoid errors in the middle
srcSubtopo, existed := topo.GetOrCreateSubTopo(string(t.name))
if !existed {
Expand Down
5 changes: 5 additions & 0 deletions internal/topo/topo.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ func NewWithNameAndOptions(name string, options *def.RuleOption) (*Topo, error)
return tp, nil
}

// GetSourceNodes only for test
func (s *Topo) GetSourceNodes() []node.DataSourceNode {
return s.sources
}

func (s *Topo) SetStreams(streams []string) {
if s == nil {
return
Expand Down
37 changes: 36 additions & 1 deletion internal/trial/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,53 @@ import (

"github.com/lf-edge/ekuiper/v2/internal/conf"
"github.com/lf-edge/ekuiper/v2/internal/io/http/httpserver"
"github.com/lf-edge/ekuiper/v2/internal/io/simulator"
"github.com/lf-edge/ekuiper/v2/internal/pkg/store"
"github.com/lf-edge/ekuiper/v2/internal/processor"
"github.com/lf-edge/ekuiper/v2/internal/topo/node"
"github.com/lf-edge/ekuiper/v2/pkg/connection"
"github.com/lf-edge/ekuiper/v2/pkg/timex"
)

func TestTrialRuleSharedStream(t *testing.T) {
ip := "127.0.0.1"
port := 10092
httpserver.InitGlobalServerManager(ip, port, nil)
defer httpserver.ShutDown()
connection.InitConnectionManager4Test()
conf.IsTesting = true
conf.InitConf()
dataDir, err := conf.GetDataLoc()
require.NoError(t, err)
require.NoError(t, store.SetupDefault(dataDir))
p := processor.NewStreamProcessor()
p.ExecStmt("DROP STREAM sharedemo876")

_, err = p.ExecStmt("CREATE STREAM sharedemo876 () WITH (DATASOURCE=\"sharedemo876\", SHARED=\"TRUE\")")
require.NoError(t, err)
defer p.ExecStmt("DROP STREAM sharedemo876")

mockDef1 := `{"id":"sharedrule876","sql":"select * from sharedemo876","mockSource":{"sharedemo876":{"data":[{"name":"demo876","value":1}],"interval":100,"loop":false}},"sinkProps":{"sendSingle":true}}`
id, err := TrialManager.CreateRule(mockDef1)
require.NoError(t, err)
require.Equal(t, "sharedrule876", id)
tp, ok := TrialManager.runs["sharedrule876"]
require.True(t, ok)
srcNodes := tp.topo.GetSourceNodes()
require.Len(t, srcNodes, 1)
srcNode, ok := srcNodes[0].(*node.SourceNode)
require.True(t, ok)
_, ok = srcNode.GetSource().(*simulator.SimulatorSource)
require.True(t, ok)
TrialManager.StopRule("sharedrule876")
}

// Run two test rules in parallel. Rerun one of the rules
func TestTrialRule(t *testing.T) {
ip := "127.0.0.1"
port := 10091
httpserver.InitGlobalServerManager(ip, port, nil)
defer httpserver.ShutDown()
connection.InitConnectionManager4Test()
conf.IsTesting = true
conf.InitConf()
Expand Down Expand Up @@ -70,7 +106,6 @@ func TestTrialRule(t *testing.T) {

// Test 4 Rule without mock
testRealSourceTrial(t)
require.Equal(t, 0, len(TrialManager.runs))
}

func testValidTrial(t *testing.T, mockDef1 string) {
Expand Down
Loading