Skip to content

Commit

Permalink
Fix test
Browse files Browse the repository at this point in the history
Includes changes to:
* Remove unused field
* Extract channel sending from execution of a single step
* Extract wait group handling from single step execution
* Remove impossible condition with nil result
  • Loading branch information
JohnStarich committed Aug 20, 2024
1 parent a91c055 commit 009179a
Showing 1 changed file with 56 additions and 55 deletions.
111 changes: 56 additions & 55 deletions execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type ParallelExecutor struct{}
type queryExecutionResult struct {
InsertionPoint []string
Result map[string]interface{}
StripNode bool
Err error
}

// execution is broken up into two phases:
Expand Down Expand Up @@ -83,7 +83,7 @@ func (executor *ParallelExecutor) Execute(ctx *ExecutionContext) (map[string]int
// the root step could have multiple steps that have to happen
for _, step := range ctx.Plan.RootStep.Then {
stepWg.Add(1)
go executeStep(ctx, ctx.Plan, step, []string{}, resultLock, ctx.Variables, resultCh, errCh, stepWg)
go executeStep(ctx, ctx.Plan, step, []string{}, resultLock, ctx.Variables, resultCh, stepWg)
}

// the list of errors we have encountered while executing the plan
Expand All @@ -95,24 +95,23 @@ func (executor *ParallelExecutor) Execute(ctx *ExecutionContext) (map[string]int
select {
// we have a new result
case payload := <-resultCh:
if payload == nil {
continue
}
ctx.logger.Debug("Inserting result into ", payload.InsertionPoint)
ctx.logger.Debug("Result: ", payload.Result)

// we have to grab the value in the result and write it to the appropriate spot in the
// acumulator.
err := executorInsertObject(ctx, result, resultLock, payload.InsertionPoint, payload.Result)
if err != nil {
errCh <- err
continue
insertErr := executorInsertObject(ctx, result, resultLock, payload.InsertionPoint, payload.Result)

switch {
case payload.Err != nil: // response errors are the highest priority to return
errCh <- payload.Err
case insertErr != nil:
errCh <- insertErr
default:
ctx.logger.Debug("Done. ", result)
// one of the queries is done
stepWg.Done()
}

ctx.logger.Debug("Done. ", result)
// one of the queries is done
stepWg.Done()

case err := <-errCh:
if err != nil {
errMutex.Lock()
Expand Down Expand Up @@ -158,9 +157,40 @@ func executeStep(
resultLock *sync.Mutex,
queryVariables map[string]interface{},
resultCh chan *queryExecutionResult,
errCh chan error,
stepWg *sync.WaitGroup,
) {
queryResult, dependentSteps, queryErr := executeOneStep(ctx, plan, step, insertionPoint, resultLock, queryVariables)
// before publishing the current result, tell the wait-group about the dependent steps to wait for
stepWg.Add(len(dependentSteps))
// send the result to be stitched in with our accumulator
resultCh <- &queryExecutionResult{
InsertionPoint: insertionPoint,
Result: queryResult,
Err: queryErr,
}
// We need to collect all the dependent steps and execute them after emitting the parent result in this function.
// This avoids a race condition, where the result of a dependent request is published to the
// result channel even before the result created in this iteration.
// Execute dependent steps after the main step has been published.
for _, sr := range dependentSteps {
ctx.logger.Info("Spawn ", sr.insertionPoint)
go executeStep(ctx, plan, sr.step, sr.insertionPoint, resultLock, queryVariables, resultCh, stepWg)
}
}

type dependentStepArgs struct {
step *QueryPlanStep
insertionPoint []string
}

func executeOneStep(
ctx *ExecutionContext,
plan *QueryPlan,
step *QueryPlanStep,
insertionPoint []string,
resultLock *sync.Mutex,
queryVariables map[string]interface{},
) (map[string]interface{}, []dependentStepArgs, error) {
ctx.logger.Debug("Executing step to be inserted in ", step.ParentType, ". Insertion point: ", insertionPoint)

ctx.logger.Debug(step.SelectionSet)
Expand All @@ -186,14 +216,12 @@ func executeStep(
// get the data of the point
pointData, err := executorGetPointData(head)
if err != nil {
errCh <- err
return
return nil, nil, err
}

// if we dont have an id
if pointData.ID == "" {
errCh <- fmt.Errorf("Could not find id in path")
return
return nil, nil, fmt.Errorf("Could not find id in path")
}

// save the id as a variable to the query
Expand All @@ -202,8 +230,7 @@ func executeStep(

// if there is no queryer
if step.Queryer == nil {
errCh <- errors.New(" could not find queryer for step")
return
return nil, nil, errors.New(" could not find queryer for step")
}

// the query we will use
Expand Down Expand Up @@ -233,8 +260,7 @@ func executeStep(
}, &queryResult)
if err != nil {
ctx.logger.Warn("Network Error: ", err)
errCh <- err
return
return queryResult, nil, err
}

// NOTE: this insertion point could point to a list of values. If it did, we have to have
Expand All @@ -249,36 +275,19 @@ func executeStep(
// get the result from the response that we have to stitch there
extractedResult, err := executorExtractValue(ctx, queryResult, resultLock, []string{"node"})
if err != nil {
errCh <- err
return
return nil, nil, err
}

resultObj, ok := extractedResult.(map[string]interface{})
if !ok {
errCh <- fmt.Errorf("Query result of node query was not an object: %v", queryResult)
return
return nil, nil, fmt.Errorf("Query result of node query was not an object: %v", queryResult)
}

queryResult = resultObj
}

// we need to collect all the dependent steps and execute them at last in this function
// to avoid a race condition, where the result of a dependent request is published to the
// result channel even before the result created in this iteration
type stepArgs struct {
step *QueryPlanStep
insertionPoint []string
}
var dependentSteps []stepArgs
// defer the execution of the dependent steps after the main step has been published
defer func() {
for _, sr := range dependentSteps {
ctx.logger.Info("Spawn ", sr.insertionPoint)
go executeStep(ctx, plan, sr.step, sr.insertionPoint, resultLock, queryVariables, resultCh, errCh, stepWg)
}
}()

// if there are next steps
var dependentSteps []dependentStepArgs
if len(step.Then) > 0 {
ctx.logger.Debug("Kicking off child queries")
// we need to find the ids of the objects we are inserting into and then kick of the worker with the right
Expand All @@ -288,30 +297,22 @@ func executeStep(
copy(copiedInsertionPoint, insertionPoint)
insertPoints, err := executorFindInsertionPoints(ctx, resultLock, dependent.InsertionPoint, step.SelectionSet, queryResult, [][]string{copiedInsertionPoint}, step.FragmentDefinitions)
if err != nil {
// reset dependent steps - result would be discarded anyways
dependentSteps = nil
errCh <- err
return
return nil, nil, err
}

// this dependent needs to fire for every object that the insertion point references
for _, insertionPoint := range insertPoints {
dependentSteps = append(dependentSteps, stepArgs{
dependentSteps = append(dependentSteps, dependentStepArgs{
step: dependent,
insertionPoint: insertionPoint,
})
}
}
}

// before publishing the current result, tell the wait-group about the dependent steps to wait for
stepWg.Add(len(dependentSteps))
ctx.logger.Debug("Pushing Result. Insertion point: ", insertionPoint, ". Value: ", queryResult)
// send the result to be stitched in with our accumulator
resultCh <- &queryExecutionResult{
InsertionPoint: insertionPoint,
Result: queryResult,
}
return queryResult, dependentSteps, nil
}

func max(a, b int) int {
Expand Down Expand Up @@ -386,7 +387,7 @@ func executorFindInsertionPoints(ctx *ExecutionContext, resultLock *sync.Mutex,
// make sure we are looking at the top of the selection set next time
selectionSetRoot = foundSelection.SelectionSet

var value = resultChunk
value := resultChunk

// the bit of result chunk with the appropriate key should be a list
rootValue, ok := value[point]
Expand Down

0 comments on commit 009179a

Please sign in to comment.