Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Fix variadic functions #1846

Merged
merged 5 commits into from
Aug 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .ci
65 changes: 27 additions & 38 deletions src/query/executor/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ import (
"github.com/pkg/errors"
)

// ExecutionState represents the execution hierarchy
// ExecutionState represents the execution hierarchy.
type ExecutionState struct {
plan plan.PhysicalPlan
sources []parser.Source
resultNode Result
storage storage.Storage
}

// CreateSource creates a source node
// CreateSource creates a source node.
func CreateSource(
ID parser.NodeID,
params SourceParams, storage storage.Storage,
Expand All @@ -53,7 +53,7 @@ func CreateSource(
return params.Node(controller, storage, options), controller
}

// CreateScalarSource creates a scalar source node
// CreateScalarSource creates a scalar source node.
func CreateScalarSource(
ID parser.NodeID,
params ScalarParams,
Expand All @@ -63,47 +63,31 @@ func CreateScalarSource(
return params.Node(controller, options), controller
}

// CreateTransform creates a transform node which works on functions and contains state
// CreateTransform creates a transform node which works on functions and
// contains state.
func CreateTransform(
ID parser.NodeID,
params transform.Params,
options transform.Options,
) (transform.OpNode, *transform.Controller) {
controller := &transform.Controller{ID: ID}
node := params.Node(controller, options)

switch node.(type) {
case transform.SeriesNode:
return transform.NewLazyNode(node, controller)

case transform.StepNode:
return transform.NewLazyNode(node, controller)

default:
return node, controller
}
return params.Node(controller, options), controller
}

// SourceParams are defined by sources
// SourceParams are defined by sources.
type SourceParams interface {
parser.Params
Node(
controller *transform.Controller,
storage storage.Storage,
options transform.Options,
) parser.Source
Node(ctrl *transform.Controller, storage storage.Storage,
opts transform.Options) parser.Source
}

// ScalarParams are defined by sources
// ScalarParams are defined by sources.
type ScalarParams interface {
parser.Params
Node(
controller *transform.Controller,
options transform.Options,
) parser.Source
Node(ctrl *transform.Controller, opts transform.Options) parser.Source
}

// GenerateExecutionState creates an execution state from the physical plan
// GenerateExecutionState creates an execution state from the physical plan.
func GenerateExecutionState(
pplan plan.PhysicalPlan,
storage storage.Storage,
Expand All @@ -118,7 +102,8 @@ func GenerateExecutionState(

step, ok := pplan.Step(result.Parent)
if !ok {
return nil, fmt.Errorf("incorrect parent reference in result node, parentId: %s", result.Parent)
return nil, fmt.Errorf("incorrect parent reference in result node, "+
"parentId: %s", result.Parent)
}

options, err := transform.NewOptions(transform.OptionsParams{
Expand Down Expand Up @@ -148,16 +133,16 @@ func GenerateExecutionState(
return state, nil
}

// createNode helps to create an execution node recursively
// TODO: consider modifying this function so that ExecutionState can have a non pointer receiver
// createNode helps to create an execution node recursively.
func (s *ExecutionState) createNode(
step plan.LogicalStep,
options transform.Options,
) (*transform.Controller, error) {
// TODO: consider using a registry instead of casting to an interface
// TODO: consider using a registry instead of casting to an interface.
sourceParams, ok := step.Transform.Op.(SourceParams)
if ok {
source, controller := CreateSource(step.ID(), sourceParams, s.storage, options)
source, controller := CreateSource(step.ID(), sourceParams,
s.storage, options)
s.sources = append(s.sources, source)
return controller, nil
}
Expand All @@ -174,11 +159,13 @@ func (s *ExecutionState) createNode(
return nil, fmt.Errorf("invalid transform step: %s", step)
}

transformNode, controller := CreateTransform(step.ID(), transformParams, options)
transformNode, controller := CreateTransform(step.ID(),
transformParams, options)
for _, parentID := range step.Parents {
parentStep, ok := s.plan.Step(parentID)
if !ok {
return nil, fmt.Errorf("incorrect parent reference, parentId: %s, node: %s", parentID, step.ID())
return nil, fmt.Errorf("incorrect parent reference, parentId: "+
"%s, node: %s", parentID, step.ID())
}

parentController, err := s.createNode(parentStep, options)
Expand All @@ -192,7 +179,7 @@ func (s *ExecutionState) createNode(
return controller, nil
}

// Execute the sources in parallel and return the first error
// Execute the sources in parallel and return the first error.
func (s *ExecutionState) Execute(queryCtx *models.QueryContext) error {
requests := make([]execution.Request, len(s.sources))
for idx, source := range s.sources {
Expand All @@ -205,16 +192,18 @@ func (s *ExecutionState) Execute(queryCtx *models.QueryContext) error {
return execution.ExecuteParallel(queryCtx.Ctx, requests)
}

// String representation of the state
// String representation of the state.
func (s *ExecutionState) String() string {
return fmt.Sprintf("plan: %s\nsources: %s\nresult: %s", s.plan, s.sources, s.resultNode)
return fmt.Sprintf("plan: %s\nsources: %s\nresult: %s",
s.plan, s.sources, s.resultNode)
}

type sourceRequest struct {
source parser.Source
queryCtx *models.QueryContext
}

// Process processes the new request.
func (s sourceRequest) Process(ctx context.Context) error {
// make sure to propagate the new context.Context object down.
return s.source.Execute(s.queryCtx.WithContext(ctx))
Expand Down
Loading