Skip to content

Commit

Permalink
feat(planner): split scan table
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <[email protected]>
  • Loading branch information
ngjaying committed Jan 28, 2024
1 parent 30ed67c commit 763f447
Showing 1 changed file with 36 additions and 23 deletions.
59 changes: 36 additions & 23 deletions internal/topo/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,25 +324,7 @@ func transformSourceNode(t *DataSourcePlan, mockSourcesProp map[string]map[strin
}
switch ss := si.(type) {
case api.SourceConnector:
// Create the connector node as source node
srcConnNode, err := node.NewSourceConnectorNode(string(t.name), ss, t.streamStmt.Options, options)
if err != nil {
return nil, nil, err
}
index++
// Create the decode node
decodeNode, err := node.NewDecodeOp(fmt.Sprintf("%d_decoder", index), ruleId, options, t.streamStmt.Options, t.isWildCard, t.isSchemaless, t.streamFields)
if err != nil {
return nil, nil, err
}
index++
ops := []node.OperatorNode{decodeNode}
// Create the preprocessor node if needed
if pp != nil {
ops = append(ops, Transform(pp, fmt.Sprintf("%d_preprocessor", index), options))
index++
}
return srcConnNode, ops, nil
return splitSource(t, ss, options, index, ruleId, pp)
default:
srcNode := node.NewSourceNode(string(t.name), t.streamStmt.StreamType, pp, t.streamStmt.Options, options, t.isWildCard, t.isSchemaless, t.streamFields)
if isMock {
Expand All @@ -351,6 +333,10 @@ func transformSourceNode(t *DataSourcePlan, mockSourcesProp map[string]map[strin
return srcNode, nil, nil
}
case ast.TypeTable:
si, err := io.Source(t.streamStmt.Options.TYPE)
if err != nil {
return nil, nil, err
}

Check warning on line 339 in internal/topo/planner/planner.go

View check run for this annotation

Codecov / codecov/patch

internal/topo/planner/planner.go#L338-L339

Added lines #L338 - L339 were not covered by tests
pp, err := operator.NewTableProcessor(isSchemaless, string(t.name), t.streamFields, t.streamStmt.Options)
if err != nil {
return nil, nil, err

Check warning on line 342 in internal/topo/planner/planner.go

View check run for this annotation

Codecov / codecov/patch

internal/topo/planner/planner.go#L342

Added line #L342 was not covered by tests
Expand All @@ -360,15 +346,42 @@ func transformSourceNode(t *DataSourcePlan, mockSourcesProp map[string]map[strin
if t.isSchemaless {
schema = nil
}
srcNode := node.NewSourceNode(string(t.name), t.streamStmt.StreamType, pp, t.streamStmt.Options, options, t.isWildCard, t.isSchemaless, schema)
if isMock {
srcNode.SetProps(mockSourceConf)
switch ss := si.(type) {
case api.SourceConnector:
return splitSource(t, ss, options, index, ruleId, pp)

Check warning on line 351 in internal/topo/planner/planner.go

View check run for this annotation

Codecov / codecov/patch

internal/topo/planner/planner.go#L350-L351

Added lines #L350 - L351 were not covered by tests
default:
srcNode := node.NewSourceNode(string(t.name), t.streamStmt.StreamType, pp, t.streamStmt.Options, options, t.isWildCard, t.isSchemaless, schema)
if isMock {
srcNode.SetProps(mockSourceConf)
}

Check warning on line 356 in internal/topo/planner/planner.go

View check run for this annotation

Codecov / codecov/patch

internal/topo/planner/planner.go#L355-L356

Added lines #L355 - L356 were not covered by tests
return srcNode, nil, nil
}
return srcNode, nil, nil
}
return nil, nil, fmt.Errorf("unknown stream type %d", t.streamStmt.StreamType)

Check warning on line 360 in internal/topo/planner/planner.go

View check run for this annotation

Codecov / codecov/patch

internal/topo/planner/planner.go#L360

Added line #L360 was not covered by tests
}

func splitSource(t *DataSourcePlan, ss api.SourceConnector, options *api.RuleOption, index int, ruleId string, pp node.UnOperation) (*node.SourceConnectorNode, []node.OperatorNode, error) {
// Create the connector node as source node
srcConnNode, err := node.NewSourceConnectorNode(string(t.name), ss, t.streamStmt.Options, options)
if err != nil {
return nil, nil, err
}

Check warning on line 368 in internal/topo/planner/planner.go

View check run for this annotation

Codecov / codecov/patch

internal/topo/planner/planner.go#L367-L368

Added lines #L367 - L368 were not covered by tests
index++
// Create the decode node
decodeNode, err := node.NewDecodeOp(fmt.Sprintf("%d_decoder", index), ruleId, options, t.streamStmt.Options, t.isWildCard, t.isSchemaless, t.streamFields)
if err != nil {
return nil, nil, err
}

Check warning on line 374 in internal/topo/planner/planner.go

View check run for this annotation

Codecov / codecov/patch

internal/topo/planner/planner.go#L373-L374

Added lines #L373 - L374 were not covered by tests
index++
ops := []node.OperatorNode{decodeNode}
// Create the preprocessor node if needed
if pp != nil {
ops = append(ops, Transform(pp, fmt.Sprintf("%d_preprocessor", index), options))
index++
}

Check warning on line 381 in internal/topo/planner/planner.go

View check run for this annotation

Codecov / codecov/patch

internal/topo/planner/planner.go#L379-L381

Added lines #L379 - L381 were not covered by tests
return srcConnNode, ops, nil
}

func createLogicalPlan(stmt *ast.SelectStatement, opt *api.RuleOption, store kv.KeyValue) (LogicalPlan, error) {
dimensions := stmt.Dimensions
var (
Expand Down

0 comments on commit 763f447

Please sign in to comment.