-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Issue 2241: support compression #2843
Merged
Merged
Changes from all commits
Commits
Show all changes
20 commits
Select commit
Hold shift + click to select a range
f943830
Allow enabling compression with env var when live loading.
d752441
Merge changes from master.
8fd6fff
Enable gzipped response over HTTP.
1394b9a
Add test for query response compression.
1b8433f
Support gzipped request over HTTP.
fafa5dc
Add test for compressed HTTP request.
72ab9a1
Merge in master.
5795cf6
Add --use_compression option with live loader.
03ca90c
Always use no compression with zero server.
dfbe54c
Merge branch 'master' into javier/issue2241_support_compression
6e28a65
Minor wording change.
1831c2d
Minor wording change.
316c1cc
Add simple test for gRPC compression support.
6ce90e0
Fix calls to connection setup.
7800f43
Better test function name.
bcffff2
Refactor HTTP handlers to share common functionality instead of dupli…
7d5c3a3
Merge branch 'master' into javier/issue2241_support_compression
b18b689
Remove debug code & minor cleanup.
0b96034
Fixes from PR review.
5c5b906
Merge branch 'master' into javier/issue2241_support_compression
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,9 +18,11 @@ package alpha | |
|
||
import ( | ||
"bytes" | ||
"compress/gzip" | ||
"context" | ||
"encoding/json" | ||
"fmt" | ||
"io" | ||
"io/ioutil" | ||
"net/http" | ||
"sort" | ||
|
@@ -34,7 +36,9 @@ import ( | |
"github.com/dgraph-io/dgraph/query" | ||
"github.com/dgraph-io/dgraph/worker" | ||
"github.com/dgraph-io/dgraph/x" | ||
|
||
"github.com/golang/glog" | ||
|
||
"google.golang.org/grpc/metadata" | ||
) | ||
|
||
|
@@ -60,19 +64,71 @@ func extractStartTs(urlPath string) (uint64, error) { | |
} | ||
} | ||
|
||
// This method should just build the request and proxy it to the Query method of dgraph.Server. | ||
// It can then encode the response as appropriate before sending it back to the user. | ||
func queryHandler(w http.ResponseWriter, r *http.Request) { | ||
// Common functionality for these request handlers. Returns true if the request is completely | ||
// handled here and nothing further needs to be done. | ||
func commonHandler(w http.ResponseWriter, r *http.Request) bool { | ||
// Do these requests really need CORS headers? Doesn't seem like it, but they are probably | ||
// harmless aside from the extra size they add to each response. | ||
x.AddCorsHeaders(w) | ||
w.Header().Set("Content-Type", "application/json") | ||
|
||
if r.Method == "OPTIONS" { | ||
return | ||
} | ||
|
||
if !allowed(r.Method) { | ||
return true | ||
} else if !allowed(r.Method) { | ||
w.WriteHeader(http.StatusBadRequest) | ||
x.SetStatus(w, x.ErrorInvalidMethod, "Invalid method") | ||
return true | ||
} | ||
|
||
return false | ||
} | ||
|
||
// Read request body, transparently decompressing if necessary. Return nil on error. | ||
func readRequest(w http.ResponseWriter, r *http.Request) []byte { | ||
var in io.Reader = r.Body | ||
|
||
if enc := r.Header.Get("Content-Encoding"); enc != "" && enc != "identity" { | ||
if enc == "gzip" { | ||
gz, err := gzip.NewReader(r.Body) | ||
if err != nil { | ||
x.SetStatus(w, x.Error, "Unable to create decompressor") | ||
return nil | ||
} | ||
defer gz.Close() | ||
in = gz | ||
} else { | ||
x.SetStatus(w, x.ErrorInvalidRequest, "Unsupported content encoding") | ||
return nil | ||
} | ||
} | ||
|
||
body, err := ioutil.ReadAll(in) | ||
if err != nil { | ||
x.SetStatus(w, x.ErrorInvalidRequest, err.Error()) | ||
return nil | ||
} | ||
|
||
return body | ||
} | ||
|
||
// Write response body, transparently compressing if necessary. | ||
func writeResponse(w http.ResponseWriter, r *http.Request, b []byte) (int, error) { | ||
var out io.Writer = w | ||
|
||
if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") { | ||
w.Header().Set("Content-Encoding", "gzip") | ||
gzw := gzip.NewWriter(w) | ||
defer gzw.Close() | ||
out = gzw | ||
} | ||
|
||
return out.Write(b) | ||
} | ||
|
||
// This method should just build the request and proxy it to the Query method of dgraph.Server. | ||
// It can then encode the response as appropriate before sending it back to the user. | ||
func queryHandler(w http.ResponseWriter, r *http.Request) { | ||
if commonHandler(w, r) { | ||
return | ||
} | ||
|
||
|
@@ -93,13 +149,11 @@ func queryHandler(w http.ResponseWriter, r *http.Request) { | |
} | ||
} | ||
|
||
defer r.Body.Close() | ||
q, err := ioutil.ReadAll(r.Body) | ||
if err != nil { | ||
x.SetStatus(w, x.ErrorInvalidRequest, err.Error()) | ||
body := readRequest(w, r) | ||
if body == nil { | ||
return | ||
} | ||
req.Query = string(q) | ||
req.Query = string(body) | ||
|
||
d := r.URL.Query().Get("debug") | ||
ctx := context.WithValue(context.Background(), "debug", d) | ||
|
@@ -159,32 +213,24 @@ func queryHandler(w http.ResponseWriter, r *http.Request) { | |
writeEntry("data", resp.Json) | ||
} | ||
out.WriteRune('}') | ||
w.Write(out.Bytes()) | ||
|
||
writeResponse(w, r, out.Bytes()) | ||
} | ||
|
||
func mutationHandler(w http.ResponseWriter, r *http.Request) { | ||
x.AddCorsHeaders(w) | ||
w.Header().Set("Content-Type", "application/json") | ||
|
||
if r.Method == "OPTIONS" { | ||
if commonHandler(w, r) { | ||
return | ||
} | ||
|
||
if !allowed(r.Method) { | ||
w.WriteHeader(http.StatusBadRequest) | ||
x.SetStatus(w, x.ErrorInvalidMethod, "Invalid method") | ||
return | ||
} | ||
defer r.Body.Close() | ||
m, err := ioutil.ReadAll(r.Body) | ||
if err != nil { | ||
x.SetStatus(w, x.ErrorInvalidRequest, err.Error()) | ||
m := readRequest(w, r) | ||
if m == nil { | ||
return | ||
} | ||
|
||
parseStart := time.Now() | ||
|
||
var mu *api.Mutation | ||
var err error | ||
if mType := r.Header.Get("X-Dgraph-MutationType"); mType == "json" { | ||
// Parse JSON. | ||
ms := make(map[string]*skipJSONUnmarshal) | ||
|
@@ -261,20 +307,12 @@ func mutationHandler(w http.ResponseWriter, r *http.Request) { | |
x.SetStatusWithData(w, x.Error, err.Error()) | ||
return | ||
} | ||
w.Write(js) | ||
|
||
writeResponse(w, r, js) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Error return value of |
||
} | ||
|
||
func commitHandler(w http.ResponseWriter, r *http.Request) { | ||
x.AddCorsHeaders(w) | ||
w.Header().Set("Content-Type", "application/json") | ||
|
||
if r.Method == "OPTIONS" { | ||
return | ||
} | ||
|
||
if !allowed(r.Method) { | ||
w.WriteHeader(http.StatusBadRequest) | ||
x.SetStatus(w, x.ErrorInvalidMethod, "Invalid method") | ||
if commonHandler(w, r) { | ||
return | ||
} | ||
|
||
|
@@ -296,10 +334,8 @@ func commitHandler(w http.ResponseWriter, r *http.Request) { | |
tc.StartTs = ts | ||
|
||
// Keys are sent as an array in the body. | ||
defer r.Body.Close() | ||
keys, err := ioutil.ReadAll(r.Body) | ||
if err != nil { | ||
x.SetStatus(w, x.ErrorInvalidRequest, err.Error()) | ||
keys := readRequest(w, r) | ||
if keys == nil { | ||
return | ||
} | ||
|
||
|
@@ -335,20 +371,12 @@ func commitHandler(w http.ResponseWriter, r *http.Request) { | |
x.SetStatusWithData(w, x.Error, err.Error()) | ||
return | ||
} | ||
w.Write(js) | ||
|
||
writeResponse(w, r, js) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Error return value of |
||
} | ||
|
||
func abortHandler(w http.ResponseWriter, r *http.Request) { | ||
x.AddCorsHeaders(w) | ||
w.Header().Set("Content-Type", "application/json") | ||
|
||
if r.Method == "OPTIONS" { | ||
return | ||
} | ||
|
||
if !allowed(r.Method) { | ||
w.WriteHeader(http.StatusBadRequest) | ||
x.SetStatus(w, x.ErrorInvalidMethod, "Invalid method") | ||
if commonHandler(w, r) { | ||
return | ||
} | ||
|
||
|
@@ -385,33 +413,23 @@ func abortHandler(w http.ResponseWriter, r *http.Request) { | |
x.SetStatusWithData(w, x.Error, err.Error()) | ||
return | ||
} | ||
w.Write(js) | ||
|
||
writeResponse(w, r, js) | ||
} | ||
|
||
func alterHandler(w http.ResponseWriter, r *http.Request) { | ||
x.AddCorsHeaders(w) | ||
w.Header().Set("Content-Type", "application/json") | ||
|
||
if r.Method == "OPTIONS" { | ||
return | ||
} | ||
|
||
if !allowed(r.Method) { | ||
w.WriteHeader(http.StatusBadRequest) | ||
x.SetStatus(w, x.ErrorInvalidMethod, "Invalid method") | ||
if commonHandler(w, r) { | ||
return | ||
} | ||
|
||
op := &api.Operation{} | ||
|
||
defer r.Body.Close() | ||
b, err := ioutil.ReadAll(r.Body) | ||
if err != nil { | ||
x.SetStatus(w, x.ErrorInvalidRequest, err.Error()) | ||
b := readRequest(w, r) | ||
if b == nil { | ||
return | ||
} | ||
|
||
err = json.Unmarshal(b, &op) | ||
err := json.Unmarshal(b, &op) | ||
if err != nil { | ||
op.Schema = string(b) | ||
} | ||
|
@@ -442,7 +460,8 @@ func alterHandler(w http.ResponseWriter, r *http.Request) { | |
x.SetStatus(w, x.Error, err.Error()) | ||
return | ||
} | ||
w.Write(js) | ||
|
||
writeResponse(w, r, js) | ||
} | ||
|
||
// skipJSONUnmarshal stores the raw bytes as is while JSON unmarshaling. | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Error return value of
writeResponse
is not checked (fromerrcheck
)