Skip to content

Commit

Permalink
Implement parallel handling of batch query nautilus#218
Browse files Browse the repository at this point in the history
  • Loading branch information
obukhov committed Oct 17, 2024
1 parent 0ac544a commit 94289be
Showing 1 changed file with 50 additions and 30 deletions.
80 changes: 50 additions & 30 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http"
"strconv"
"strings"
"sync"

"github.com/nautilus/graphql"
)
Expand All @@ -27,6 +28,8 @@ type HTTPOperation struct {
} `json:"extensions"`
}

type setResultFunc func(r map[string]interface{})

func formatErrors(err error) map[string]interface{} {
return formatErrorsWithCode(nil, err, "UNKNOWN_ERROR")
}
Expand Down Expand Up @@ -70,12 +73,14 @@ func (g *Gateway) GraphQLHandler(w http.ResponseWriter, r *http.Request) {
/// Handle the operations regardless of the request method

// we have to respond to each operation in the right order
results := []map[string]interface{}{}
results := make([]map[string]interface{}, len(operations))
opWg := new(sync.WaitGroup)
opMutex := new(sync.Mutex)

// the status code to report
statusCode := http.StatusOK

for _, operation := range operations {
for opNum, operation := range operations {
// there might be a query plan cache key embedded in the operation
cacheKey := ""
if operation.Extensions.QueryPlanCache != nil {
Expand All @@ -85,10 +90,8 @@ func (g *Gateway) GraphQLHandler(w http.ResponseWriter, r *http.Request) {
// if there is no query or cache key
if operation.Query == "" && cacheKey == "" {
statusCode = http.StatusUnprocessableEntity
results = append(
results,
formatErrorsWithCode(nil, errors.New("could not find query body"), "BAD_USER_INPUT"),
)
results[opNum] = formatErrorsWithCode(nil, errors.New("could not find query body"), "BAD_USER_INPUT")

continue
}

Expand Down Expand Up @@ -116,32 +119,12 @@ func (g *Gateway) GraphQLHandler(w http.ResponseWriter, r *http.Request) {
return
}

// fire the query with the request context passed through to execution
result, err := g.Execute(requestContext, plan)
if err != nil {
results = append(results, formatErrorsWithCode(result, err, "INTERNAL_SERVER_ERROR"))

continue
}

// the result for this operation
payload := map[string]interface{}{"data": result}

// if there was a cache key associated with this query
if requestContext.CacheKey != "" {
// embed the cache key in the response
payload["extensions"] = map[string]interface{}{
"persistedQuery": map[string]interface{}{
"sha265Hash": requestContext.CacheKey,
"version": "1",
},
}
}

// add this result to the list
results = append(results, payload)
opWg.Add(1)
go g.executeRequest(requestContext, plan, opWg, g.setResultFunc(opNum, results, opMutex))
}

opWg.Wait()

// the final result depends on whether we are executing in batch mode or not
var finalResponse interface{}
if batchMode {
Expand All @@ -165,6 +148,43 @@ func (g *Gateway) GraphQLHandler(w http.ResponseWriter, r *http.Request) {
emitResponse(w, statusCode, string(response))
}

func (g *Gateway) setResultFunc(opNum int, results []map[string]interface{}, opMutex *sync.Mutex) setResultFunc {
return func(r map[string]interface{}) {
opMutex.Lock()
defer opMutex.Unlock()
results[opNum] = r
}
}

func (g *Gateway) executeRequest(requestContext *RequestContext, plan QueryPlanList, opWg *sync.WaitGroup, setResult setResultFunc) {
defer opWg.Done()

// fire the query with the request context passed through to execution
result, err := g.Execute(requestContext, plan)
if err != nil {
setResult(formatErrorsWithCode(result, err, "INTERNAL_SERVER_ERROR"))

return
}

// the result for this operation
payload := map[string]interface{}{"data": result}

// if there was a cache key associated with this query
if requestContext.CacheKey != "" {
// embed the cache key in the response
payload["extensions"] = map[string]interface{}{
"persistedQuery": map[string]interface{}{
"sha265Hash": requestContext.CacheKey,
"version": "1",
},
}
}

// add this result to the list
setResult(payload)
}

// Parses request to operations (single or batch mode).
// Returns an error and an error status code if the request is invalid.
func parseRequest(r *http.Request) (operations []*HTTPOperation, batchMode bool, errStatusCode int, payloadErr error) {
Expand Down

0 comments on commit 94289be

Please sign in to comment.