Skip to content

Commit

Permalink
Use QueryContext obj with context.Context embedded instead of passing…
Browse files Browse the repository at this point in the history
… two args
  • Loading branch information
andrewmains12 committed Jan 24, 2019
1 parent 888c501 commit c74939d
Show file tree
Hide file tree
Showing 11 changed files with 83 additions and 28 deletions.
2 changes: 1 addition & 1 deletion src/query/executor/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (e *Engine) ExecuteExpr(ctx context.Context, parser parser.Parser, opts *En
result := state.resultNode
results <- Query{Result: result}

if err := state.Execute(ctx, models.NewQueryContext(e.costScope, perQueryEnforcer)); err != nil {
if err := state.Execute(models.NewQueryContext(ctx, e.costScope, perQueryEnforcer)); err != nil {
result.abort(err)
} else {
result.done()
Expand Down
7 changes: 4 additions & 3 deletions src/query/executor/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (s *ExecutionState) createNode(
}

// Execute the sources in parallel and return the first error
func (s *ExecutionState) Execute(ctx context.Context, queryCtx *models.QueryContext) error {
func (s *ExecutionState) Execute(queryCtx *models.QueryContext) error {
requests := make([]execution.Request, len(s.sources))
for idx, source := range s.sources {
requests[idx] = sourceRequest{
Expand All @@ -194,7 +194,7 @@ func (s *ExecutionState) Execute(ctx context.Context, queryCtx *models.QueryCont
}
}

return execution.ExecuteParallel(ctx, requests)
return execution.ExecuteParallel(queryCtx.Ctx, requests)
}

// String representation of the state
Expand All @@ -208,5 +208,6 @@ type sourceRequest struct {
}

func (s sourceRequest) Process(ctx context.Context) error {
return s.source.Execute(ctx, s.queryCtx)
// make sure to propagate the new context.Context object down.
return s.source.Execute(s.queryCtx.WithContext(ctx))
}
5 changes: 1 addition & 4 deletions src/query/executor/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@
package executor

import (
"context"
"testing"
"time"

"github.com/m3db/m3/src/query/cost"
"github.com/m3db/m3/src/query/functions"
"github.com/m3db/m3/src/query/functions/aggregation"
"github.com/m3db/m3/src/query/models"
Expand All @@ -35,7 +33,6 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber-go/tally"
)

func TestValidState(t *testing.T) {
Expand All @@ -59,7 +56,7 @@ func TestValidState(t *testing.T) {
state, err := GenerateExecutionState(p, store)
require.NoError(t, err)
require.Len(t, state.sources, 1)
err = state.Execute(context.Background(), models.NewQueryContext(tally.NoopScope, cost.NoopChainedEnforcer()))
err = state.Execute(models.NoopQueryContext())
assert.NoError(t, err)
}

Expand Down
5 changes: 3 additions & 2 deletions src/query/functions/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
package functions

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -88,11 +87,13 @@ func (o FetchOp) Node(controller *transform.Controller, storage storage.Storage,
}

// Execute runs the fetch node operation
func (n *FetchNode) Execute(ctx context.Context, queryCtx *models.QueryContext) error {
func (n *FetchNode) Execute(queryCtx *models.QueryContext) error {
timeSpec := n.timespec
// No need to adjust start and ends since physical plan already considers the offset, range
startTime := timeSpec.Start
endTime := timeSpec.End
ctx := queryCtx.Ctx

blockResult, err := n.storage.FetchBlocks(ctx, &storage.FetchQuery{
Start: startTime,
End: endTime,
Expand Down
6 changes: 1 addition & 5 deletions src/query/functions/fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@
package functions

import (
"context"
"testing"

"github.com/m3db/m3/src/query/block"
"github.com/m3db/m3/src/query/cost"
"github.com/m3db/m3/src/query/executor/transform"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/parser"
Expand All @@ -44,9 +42,7 @@ func TestFetch(t *testing.T) {
mockStorage := mock.NewMockStorage()
mockStorage.SetFetchBlocksResult(block.Result{Blocks: []block.Block{b}}, nil)
source := (&FetchOp{}).Node(c, mockStorage, transform.Options{})
err := source.Execute(context.TODO(), &models.QueryContext{
Enforcer: cost.NoopChainedEnforcer(),
})
err := source.Execute(models.NoopQueryContext())
require.NoError(t, err)
expected := values
assert.Len(t, sink.Values, 2)
Expand Down
5 changes: 2 additions & 3 deletions src/query/functions/scalar/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
package scalar

import (
"context"
"fmt"

"github.com/m3db/m3/src/query/block"
Expand Down Expand Up @@ -87,15 +86,15 @@ type baseNode struct {
}

// Execute runs the scalar node operation
func (n *baseNode) Execute(ctx context.Context, queryCtx *models.QueryContext) error {
func (n *baseNode) Execute(queryCtx *models.QueryContext) error {
bounds := n.timespec.Bounds()

block := block.NewScalar(n.op.fn, bounds)
if n.debug {
// Ignore any errors
iter, _ := block.StepIter()
if iter != nil {
logging.WithContext(ctx).Info("scalar node", zap.Any("meta", iter.Meta()))
logging.WithContext(queryCtx.Ctx).Info("scalar node", zap.Any("meta", iter.Meta()))
}
}

Expand Down
3 changes: 1 addition & 2 deletions src/query/functions/scalar/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
package scalar

import (
"context"
"testing"
"time"

Expand Down Expand Up @@ -52,7 +51,7 @@ func TestScalarTime(t *testing.T) {
Step: step,
},
})
err := node.Execute(context.Background(), models.NoopQueryContext())
err := node.Execute(models.NoopQueryContext())
require.NoError(t, err)
assert.Len(t, sink.Values, 1)

Expand Down
22 changes: 20 additions & 2 deletions src/query/models/query_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
package models

import (
"context"

"github.com/m3db/m3/src/query/cost"

"github.com/uber-go/tally"
Expand All @@ -29,19 +31,35 @@ import (
// QueryContext provides all external state needed to execute and track a query. It acts as a hook back into the
// execution engine for things like cost accounting.
type QueryContext struct {
Ctx context.Context
Enforcer cost.ChainedEnforcer
Scope tally.Scope
}

// NewQueryContext constructs a QueryContext using the given Enforcer to enforce per query limits.
func NewQueryContext(scope tally.Scope, enforcer cost.ChainedEnforcer) *QueryContext {
func NewQueryContext(ctx context.Context, scope tally.Scope, enforcer cost.ChainedEnforcer) *QueryContext {
return &QueryContext{
Ctx: ctx,
Scope: scope,
Enforcer: enforcer,
}
}

// NoopQueryContext returns a query context with no active components.
func NoopQueryContext() *QueryContext {
return NewQueryContext(tally.NoopScope, cost.NoopChainedEnforcer())
return NewQueryContext(context.Background(), tally.NoopScope, cost.NoopChainedEnforcer())
}

// WithContext creates a shallow copy of this QueryContext using the new context. Sample usage:
// ctx, cancel := context.WithTimeout(qc.Ctx, 5*time.Second)
// defer cancel()
// qc = qc.WithContext(ctx)
func (qc *QueryContext) WithContext(ctx context.Context) *QueryContext {
if qc == nil {
return nil
}

clone := *qc
clone.Ctx = ctx
return &clone
}
48 changes: 48 additions & 0 deletions src/query/models/query_context_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package models

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
)

func TestQueryContext_WithContext(t *testing.T) {
t.Run("passes along new context without modifying old", func(t *testing.T) {
qc := NoopQueryContext()

testKey := struct{}{}

newCtx := context.WithValue(qc.Ctx, testKey, "bar")

newQc := qc.WithContext(newCtx)

assert.Equal(t, "bar", newQc.Ctx.Value(testKey), "new context should be present")
assert.Equal(t, nil, qc.Ctx.Value(testKey), "old context should be the same")
})

t.Run("returns nil on nil", func(t *testing.T) {
var qc *QueryContext
assert.Nil(t, qc.WithContext(context.TODO()))
})
}
3 changes: 1 addition & 2 deletions src/query/parser/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
package parser

import (
"context"
"fmt"

"github.com/m3db/m3/src/query/models"
Expand Down Expand Up @@ -79,5 +78,5 @@ func NewTransformFromOperation(Op Params, nextID int) Node {

// Source represents data sources which are handled differently than other transforms as they are always independent and can always be parallelized
type Source interface {
Execute(ctx context.Context, queryCtx *models.QueryContext) error
Execute(queryCtx *models.QueryContext) error
}
5 changes: 1 addition & 4 deletions src/query/test/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,9 @@ import (
"time"

"github.com/m3db/m3/src/query/block"
"github.com/m3db/m3/src/query/cost"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/ts"

"github.com/uber-go/tally"
)

// ValueMod can be used to modify provided values for testing
Expand Down Expand Up @@ -133,7 +130,7 @@ func NewBlockFromValuesWithMetaAndSeriesMeta(
seriesMeta []block.SeriesMeta,
seriesValues [][]float64,
) block.Block {
columnBuilder := block.NewColumnBlockBuilder(meta, seriesMeta, models.NewQueryContext(tally.NoopScope, cost.NoopChainedEnforcer()))
columnBuilder := block.NewColumnBlockBuilder(meta, seriesMeta, models.NoopQueryContext())
columnBuilder.AddCols(len(seriesValues[0]))
for _, seriesVal := range seriesValues {
for idx, val := range seriesVal {
Expand Down

0 comments on commit c74939d

Please sign in to comment.