Skip to content

Commit

Permalink
Include data alongside errors for a service's partial success (#213)
Browse files Browse the repository at this point in the history
* Bump nautilus/graphql to pick up partial success changes
* Add failing tests for partial success with one service
* Fix tests
    - Remove unused field
    - Extract channel sending from execution of a single step
    - Extract wait group handling from single step execution
    - Remove impossible condition with nil result

Fixes #212
  • Loading branch information
JohnStarich authored Aug 20, 2024
1 parent fb2d006 commit 6496791
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 60 deletions.
2 changes: 1 addition & 1 deletion cmd/gateway/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ go 1.16

require (
github.com/nautilus/gateway v0.3.17
github.com/nautilus/graphql v0.0.25
github.com/nautilus/graphql v0.0.26
github.com/spf13/cobra v0.0.5
)
2 changes: 2 additions & 0 deletions cmd/gateway/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ github.com/nautilus/graphql v0.0.23 h1:uiQV2SxOfl63xNr4E5Qk9IxecD2llM/gVqKbM2lEH
github.com/nautilus/graphql v0.0.23/go.mod h1:eoqPxo/+IdRSfLQKZ2V0al2vYeoMJKO72XpTsUYOmFI=
github.com/nautilus/graphql v0.0.25 h1:N+wwpxnlob3nQK/+HTUPeGfSR0ayehhNqeJq1cdcqSU=
github.com/nautilus/graphql v0.0.25/go.mod h1:ex8FB+RiAJoWGlL9iUcCWvetFOICVpvYP2jWi0cxp9E=
github.com/nautilus/graphql v0.0.26 h1:Mfv0Eazj+xKWlSSIcXgJN1zFWhbSQkPsz4IWSk0G5EQ=
github.com/nautilus/graphql v0.0.26/go.mod h1:ex8FB+RiAJoWGlL9iUcCWvetFOICVpvYP2jWi0cxp9E=
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs=
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
Expand Down
115 changes: 57 additions & 58 deletions execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type ParallelExecutor struct{}
type queryExecutionResult struct {
InsertionPoint []string
Result map[string]interface{}
StripNode bool
Err error
}

// execution is broken up into two phases:
Expand Down Expand Up @@ -83,7 +83,7 @@ func (executor *ParallelExecutor) Execute(ctx *ExecutionContext) (map[string]int
// the root step could have multiple steps that have to happen
for _, step := range ctx.Plan.RootStep.Then {
stepWg.Add(1)
go executeStep(ctx, ctx.Plan, step, []string{}, resultLock, ctx.Variables, resultCh, errCh, stepWg)
go executeStep(ctx, ctx.Plan, step, []string{}, resultLock, ctx.Variables, resultCh, stepWg)
}

// the list of errors we have encountered while executing the plan
Expand All @@ -95,24 +95,23 @@ func (executor *ParallelExecutor) Execute(ctx *ExecutionContext) (map[string]int
select {
// we have a new result
case payload := <-resultCh:
if payload == nil {
continue
}
ctx.logger.Debug("Inserting result into ", payload.InsertionPoint)
ctx.logger.Debug("Result: ", payload.Result)

// we have to grab the value in the result and write it to the appropriate spot in the
// acumulator.
err := executorInsertObject(ctx, result, resultLock, payload.InsertionPoint, payload.Result)
if err != nil {
errCh <- err
continue
insertErr := executorInsertObject(ctx, result, resultLock, payload.InsertionPoint, payload.Result)

switch {
case payload.Err != nil: // response errors are the highest priority to return
errCh <- payload.Err
case insertErr != nil:
errCh <- insertErr
default:
ctx.logger.Debug("Done. ", result)
// one of the queries is done
stepWg.Done()
}

ctx.logger.Debug("Done. ", result)
// one of the queries is done
stepWg.Done()

case err := <-errCh:
if err != nil {
errMutex.Lock()
Expand Down Expand Up @@ -158,9 +157,41 @@ func executeStep(
resultLock *sync.Mutex,
queryVariables map[string]interface{},
resultCh chan *queryExecutionResult,
errCh chan error,
stepWg *sync.WaitGroup,
) {
queryResult, dependentSteps, queryErr := executeOneStep(ctx, plan, step, insertionPoint, resultLock, queryVariables)
// before publishing the current result, tell the wait-group about the dependent steps to wait for
stepWg.Add(len(dependentSteps))
ctx.logger.Debug("Pushing Result. Insertion point: ", insertionPoint, ". Value: ", queryResult)
// send the result to be stitched in with our accumulator
resultCh <- &queryExecutionResult{
InsertionPoint: insertionPoint,
Result: queryResult,
Err: queryErr,
}
// We need to collect all the dependent steps and execute them after emitting the parent result in this function.
// This avoids a race condition, where the result of a dependent request is published to the
// result channel even before the result created in this iteration.
// Execute dependent steps after the main step has been published.
for _, sr := range dependentSteps {
ctx.logger.Info("Spawn ", sr.insertionPoint)
go executeStep(ctx, plan, sr.step, sr.insertionPoint, resultLock, queryVariables, resultCh, stepWg)
}
}

type dependentStepArgs struct {
step *QueryPlanStep
insertionPoint []string
}

func executeOneStep(
ctx *ExecutionContext,
plan *QueryPlan,
step *QueryPlanStep,
insertionPoint []string,
resultLock *sync.Mutex,
queryVariables map[string]interface{},
) (map[string]interface{}, []dependentStepArgs, error) {
ctx.logger.Debug("Executing step to be inserted in ", step.ParentType, ". Insertion point: ", insertionPoint)

ctx.logger.Debug(step.SelectionSet)
Expand All @@ -186,14 +217,12 @@ func executeStep(
// get the data of the point
pointData, err := executorGetPointData(head)
if err != nil {
errCh <- err
return
return nil, nil, err
}

// if we dont have an id
if pointData.ID == "" {
errCh <- fmt.Errorf("Could not find id in path")
return
return nil, nil, fmt.Errorf("Could not find id in path")
}

// save the id as a variable to the query
Expand All @@ -202,8 +231,7 @@ func executeStep(

// if there is no queryer
if step.Queryer == nil {
errCh <- errors.New(" could not find queryer for step")
return
return nil, nil, errors.New(" could not find queryer for step")
}

// the query we will use
Expand Down Expand Up @@ -233,8 +261,7 @@ func executeStep(
}, &queryResult)
if err != nil {
ctx.logger.Warn("Network Error: ", err)
errCh <- err
return
return queryResult, nil, err
}

// NOTE: this insertion point could point to a list of values. If it did, we have to have
Expand All @@ -249,36 +276,19 @@ func executeStep(
// get the result from the response that we have to stitch there
extractedResult, err := executorExtractValue(ctx, queryResult, resultLock, []string{"node"})
if err != nil {
errCh <- err
return
return nil, nil, err
}

resultObj, ok := extractedResult.(map[string]interface{})
if !ok {
errCh <- fmt.Errorf("Query result of node query was not an object: %v", queryResult)
return
return nil, nil, fmt.Errorf("Query result of node query was not an object: %v", queryResult)
}

queryResult = resultObj
}

// we need to collect all the dependent steps and execute them at last in this function
// to avoid a race condition, where the result of a dependent request is published to the
// result channel even before the result created in this iteration
type stepArgs struct {
step *QueryPlanStep
insertionPoint []string
}
var dependentSteps []stepArgs
// defer the execution of the dependent steps after the main step has been published
defer func() {
for _, sr := range dependentSteps {
ctx.logger.Info("Spawn ", sr.insertionPoint)
go executeStep(ctx, plan, sr.step, sr.insertionPoint, resultLock, queryVariables, resultCh, errCh, stepWg)
}
}()

// if there are next steps
var dependentSteps []dependentStepArgs
if len(step.Then) > 0 {
ctx.logger.Debug("Kicking off child queries")
// we need to find the ids of the objects we are inserting into and then kick of the worker with the right
Expand All @@ -288,30 +298,19 @@ func executeStep(
copy(copiedInsertionPoint, insertionPoint)
insertPoints, err := executorFindInsertionPoints(ctx, resultLock, dependent.InsertionPoint, step.SelectionSet, queryResult, [][]string{copiedInsertionPoint}, step.FragmentDefinitions)
if err != nil {
// reset dependent steps - result would be discarded anyways
dependentSteps = nil
errCh <- err
return
return nil, nil, err
}

// this dependent needs to fire for every object that the insertion point references
for _, insertionPoint := range insertPoints {
dependentSteps = append(dependentSteps, stepArgs{
dependentSteps = append(dependentSteps, dependentStepArgs{
step: dependent,
insertionPoint: insertionPoint,
})
}
}
}

// before publishing the current result, tell the wait-group about the dependent steps to wait for
stepWg.Add(len(dependentSteps))
ctx.logger.Debug("Pushing Result. Insertion point: ", insertionPoint, ". Value: ", queryResult)
// send the result to be stitched in with our accumulator
resultCh <- &queryExecutionResult{
InsertionPoint: insertionPoint,
Result: queryResult,
}
return queryResult, dependentSteps, nil
}

func max(a, b int) int {
Expand Down Expand Up @@ -386,7 +385,7 @@ func executorFindInsertionPoints(ctx *ExecutionContext, resultLock *sync.Mutex,
// make sure we are looking at the top of the selection set next time
selectionSetRoot = foundSelection.SelectionSet

var value = resultChunk
value := resultChunk

// the bit of result chunk with the appropriate key should be a list
rootValue, ok := value[point]
Expand Down
48 changes: 48 additions & 0 deletions gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,3 +753,51 @@ type User {
}
`, resp.Body.String())
}

func TestDataAndErrorsBothReturnFromOneServicePartialSuccess(t *testing.T) {
t.Parallel()
schema, err := graphql.LoadSchema(`
type Query {
foo: String
bar: String
}
`)
require.NoError(t, err)
queryerFactory := QueryerFactory(func(ctx *PlanningContext, url string) graphql.Queryer {
return graphql.QueryerFunc(func(input *graphql.QueryInput) (interface{}, error) {
return map[string]interface{}{
"foo": "foo",
"bar": nil,
}, graphql.ErrorList{
&graphql.Error{
Message: "bar is broken",
Path: []interface{}{"bar"},
},
}
})
})
gateway, err := New([]*graphql.RemoteSchema{
{Schema: schema, URL: "boo"},
}, WithQueryerFactory(&queryerFactory))
require.NoError(t, err)

req := httptest.NewRequest(http.MethodPost, "/", strings.NewReader(`{"query": "query { foo bar }"}`))
resp := httptest.NewRecorder()
gateway.GraphQLHandler(resp, req)
assert.Equal(t, http.StatusOK, resp.Code)
assert.JSONEq(t, `
{
"data": {
"foo": "foo",
"bar": null
},
"errors": [
{
"message": "bar is broken",
"path": ["bar"],
"extensions": null
}
]
}
`, resp.Body.String())
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.17
require (
github.com/99designs/gqlgen v0.17.15
github.com/mitchellh/mapstructure v1.4.1
github.com/nautilus/graphql v0.0.25
github.com/nautilus/graphql v0.0.26
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.9.0
github.com/vektah/gqlparser/v2 v2.5.16
Expand Down
Loading

0 comments on commit 6496791

Please sign in to comment.