From b9e1c905b377465c507c34da6cc3095f0da83baf Mon Sep 17 00:00:00 2001 From: Aman Mangal Date: Thu, 22 Aug 2019 04:13:24 +0530 Subject: [PATCH] Merge Query and Mutate endpoint for grpc --- contrib/integration/bigdata/main.go | 2 +- contrib/integration/testtxn/main_test.go | 9 +- dgraph/cmd/alpha/http.go | 27 ++-- dgraph/cmd/alpha/upsert_test.go | 178 +++++++++++++++++++++++ edgraph/access.go | 2 +- edgraph/access_ee.go | 36 +++-- edgraph/server.go | 94 +++++++----- gql/parser_mutation.go | 25 ++-- gql/parser_test.go | 3 +- wiki/content/clients/index.md | 6 +- wiki/content/design-concepts/index.md | 4 +- 11 files changed, 294 insertions(+), 92 deletions(-) diff --git a/contrib/integration/bigdata/main.go b/contrib/integration/bigdata/main.go index a0f1b5c0bfb..3587dedd550 100644 --- a/contrib/integration/bigdata/main.go +++ b/contrib/integration/bigdata/main.go @@ -54,7 +54,7 @@ func main() { // schema items. resp, err := c.NewTxn().Query(ctx, "schema {}") x.Check(err) - if len(resp.Schema) < 5 { + if len(resp.Json) < 5 { // Run each schema alter separately so that there is an even // distribution among all groups. for _, s := range schema() { diff --git a/contrib/integration/testtxn/main_test.go b/contrib/integration/testtxn/main_test.go index bfb95c726a2..eb6636b8b65 100644 --- a/contrib/integration/testtxn/main_test.go +++ b/contrib/integration/testtxn/main_test.go @@ -232,12 +232,15 @@ func TestTxnRead5(t *testing.T) { mu = &api.Mutation{} mu.SetJson = []byte(fmt.Sprintf("{\"uid\": \"%s\", \"name\": \"Manish2\"}", uid)) - mu.CommitNow = true - res, err := dc.Mutate(context.Background(), mu) + muReq := api.Request{ + Mutations: []*api.Mutation{mu}, + CommitNow: true, + } + res, err := dc.Query(context.Background(), &muReq) if err != nil { log.Fatalf("Error while running mutation: %v\n", err) } - x.AssertTrue(res.Context.StartTs > 0) + x.AssertTrue(res.Txn.StartTs > 0) resp, err = dc.Query(context.Background(), &req) if err != nil { log.Fatalf("Error while running query: %v\n", err) diff --git a/dgraph/cmd/alpha/http.go b/dgraph/cmd/alpha/http.go index f006d631c47..c3d698b0820 100644 --- a/dgraph/cmd/alpha/http.go +++ b/dgraph/cmd/alpha/http.go @@ -302,7 +302,7 @@ func mutationHandler(w http.ResponseWriter, r *http.Request) { // start parsing the query parseStart := time.Now() - var mu *api.Mutation + var req *api.Request contentType := r.Header.Get("Content-Type") switch strings.ToLower(contentType) { case "application/json": @@ -312,7 +312,8 @@ func mutationHandler(w http.ResponseWriter, r *http.Request) { return } - mu = &api.Mutation{} + mu := &api.Mutation{} + req = &api.Request{Mutations: []*api.Mutation{mu}} if setJSON, ok := ms["set"]; ok && setJSON != nil { mu.SetJson = setJSON.bs } @@ -320,7 +321,7 @@ func mutationHandler(w http.ResponseWriter, r *http.Request) { mu.DeleteJson = delJSON.bs } if queryText, ok := ms["query"]; ok && queryText != nil { - mu.Query, err = strconv.Unquote(string(queryText.bs)) + req.Query, err = strconv.Unquote(string(queryText.bs)) if err != nil { x.SetStatus(w, x.ErrorInvalidRequest, err.Error()) return @@ -336,7 +337,7 @@ func mutationHandler(w http.ResponseWriter, r *http.Request) { case "application/rdf": // Parse N-Quads. - mu, err = gql.ParseMutation(string(body)) + req, err = gql.ParseMutation(string(body)) if err != nil { x.SetStatus(w, x.ErrorInvalidRequest, err.Error()) return @@ -351,11 +352,11 @@ func mutationHandler(w http.ResponseWriter, r *http.Request) { // end of query parsing parseEnd := time.Now() - mu.StartTs = startTs - mu.CommitNow = commitNow + req.StartTs = startTs + req.CommitNow = commitNow ctx := attachAccessJwt(context.Background(), r) - resp, err := (&edgraph.Server{}).Mutate(ctx, mu) + resp, err := (&edgraph.Server{}).Query(ctx, req) if err != nil { x.SetStatusWithData(w, x.ErrorInvalidRequest, err.Error()) return @@ -363,14 +364,14 @@ func mutationHandler(w http.ResponseWriter, r *http.Request) { resp.Latency.ParsingNs = uint64(parseEnd.Sub(parseStart).Nanoseconds()) e := query.Extensions{ - Txn: resp.Context, + Txn: resp.Txn, Latency: resp.Latency, } sort.Strings(e.Txn.Keys) sort.Strings(e.Txn.Preds) // Don't send keys array which is part of txn context if its commit immediately. - if mu.CommitNow { + if req.CommitNow { e.Txn.Keys = e.Txn.Keys[:0] } @@ -487,11 +488,11 @@ func handleCommit(startTs uint64, reqText []byte) (map[string]interface{}, error return nil, err } - resp := &api.Assigned{} - resp.Context = tc - resp.Context.CommitTs = cts + resp := &api.Response{} + resp.Txn = tc + resp.Txn.CommitTs = cts e := query.Extensions{ - Txn: resp.Context, + Txn: resp.Txn, } e.Txn.Keys = e.Txn.Keys[:0] response := map[string]interface{}{} diff --git a/dgraph/cmd/alpha/upsert_test.go b/dgraph/cmd/alpha/upsert_test.go index 34794b80831..7030d21d7c0 100644 --- a/dgraph/cmd/alpha/upsert_test.go +++ b/dgraph/cmd/alpha/upsert_test.go @@ -1244,3 +1244,181 @@ upsert { _, _, _, err := mutationWithTs(m1, "application/rdf", false, true, 0) require.Contains(t, err.Error(), "Matching brackets not found") } + +func TestUpsertDeleteOnlyYourPost(t *testing.T) { + require.NoError(t, dropAll()) + require.NoError(t, alterSchema(` +name: string @index(exact) . +content: string @index(exact) .`)) + + m1 := ` +{ + set { + _:user1 "user1" . + _:user2 "user2" . + _:user3 "user3" . + _:user4 "user4" . + + _:post1 "post1" . + _:post1 _:user1 . + + _:post2 "post2" . + _:post2 _:user1 . + + _:post3 "post3" . + _:post3 _:user2 . + + _:post4 "post4" . + _:post4 _:user3 . + + _:post5 "post5" . + _:post5 _:user3 . + + _:post6 "post6" . + _:post6 _:user3 . + } +}` + + _, _, _, err := mutationWithTs(m1, "application/rdf", false, true, 0) + require.NoError(t, err) + + // user2 trying to delete the post4 + m2 := ` +upsert { + query { + var(func: eq(content, "post4")) { + p4 as uid + author { + n3 as name + } + } + + u2 as var(func: eq(val(n3), "user2")) + } + + mutation @if(eq(len(u2), 1)) { + delete { + uid(p4) * . + uid(p4) * . + } + } +}` + _, _, _, err = mutationWithTs(m2, "application/rdf", false, true, 0) + require.NoError(t, err) + + // post4 must still exist + q2 := ` +{ + post(func: eq(content, "post4")) { + content + } +}` + res, _, err := queryWithTs(q2, "application/graphql+-", "", 0) + require.NoError(t, err) + require.Contains(t, res, "post4") + + // user3 deleting the post4 + m3 := ` +upsert { + query { + var(func: eq(content, "post4")) { + p4 as uid + author { + n3 as name + } + } + + u4 as var(func: eq(val(n3), "user3")) + } + + mutation @if(eq(len(u4), 1)) { + delete { + uid(p4) * . + uid(p4) * . + } + } +}` + _, _, _, err = mutationWithTs(m3, "application/rdf", false, true, 0) + require.NoError(t, err) + + // post4 shouldn't exist anymore + res, _, err = queryWithTs(q2, "application/graphql+-", "", 0) + require.NoError(t, err) + require.NotContains(t, res, "post4") +} + +func TestUpsertBulkUpdateBranch(t *testing.T) { + require.NoError(t, dropAll()) + require.NoError(t, alterSchema(` +name: string @index(exact) . +branch: string .`)) + + m1 := ` +{ + set { + _:user1 "user1" . + _:user1 "Fuller Street, San Francisco" . + + _:user2 "user2" . + _:user2 "Fuller Street, San Francisco" . + + _:user3 "user3" . + _:user3 "Fuller Street, San Francisco" . + } +}` + + _, _, _, err := mutationWithTs(m1, "application/rdf", false, true, 0) + require.NoError(t, err) + + // Bulk Update: update everyone's branch + m2 := ` +upsert { + query { + u as var(func: has(branch)) + } + + mutation { + set { + uid(u) "Fuller Street, SF" . + } + } +}` + _, _, _, err = mutationWithTs(m2, "application/rdf", false, true, 0) + require.NoError(t, err) + + q2 := ` +{ + q(func: has(branch)) { + name + branch + } +}` + res, _, err := queryWithTs(q2, "application/graphql+-", "", 0) + require.NoError(t, err) + require.NotContains(t, res, "San Francisco") + require.Contains(t, res, "user1") + require.Contains(t, res, "user2") + require.Contains(t, res, "user3") + require.Contains(t, res, "Fuller Street, SF") + + // Bulk Delete: delete everyone's branch + m3 := ` + upsert { + query { + u as var(func: has(branch)) + } + + mutation { + delete { + uid(u) * . + } + } + }` + _, _, _, err = mutationWithTs(m3, "application/rdf", false, true, 0) + require.NoError(t, err) + + res, _, err = queryWithTs(q2, "application/graphql+-", "", 0) + require.NoError(t, err) + require.NotContains(t, res, "San Francisco") + require.NotContains(t, res, "Fuller Street, SF") +} diff --git a/edgraph/access.go b/edgraph/access.go index 11b7a710530..7f01f9c2eeb 100644 --- a/edgraph/access.go +++ b/edgraph/access.go @@ -57,7 +57,7 @@ func authorizeMutation(ctx context.Context, gmu *gql.Mutation) error { return nil } -func authorizeQuery(ctx context.Context, req *api.Request) error { +func authorizeQuery(ctx context.Context, parsedReq *gql.Result) error { // always allow access return nil } diff --git a/edgraph/access_ee.go b/edgraph/access_ee.go index e1f2fdad83f..1c51a362ae3 100644 --- a/edgraph/access_ee.go +++ b/edgraph/access_ee.go @@ -265,8 +265,9 @@ const queryUser = ` // authorizeUser queries the user with the given user id, and returns the associated uid, // acl groups, and whether the password stored in DB matches the supplied password -func authorizeUser(ctx context.Context, userid string, password string) (*acl.User, - error) { +func authorizeUser(ctx context.Context, userid string, password string) ( + *acl.User, error) { + queryVars := map[string]string{ "$userid": userid, "$password": password, @@ -276,7 +277,7 @@ func authorizeUser(ctx context.Context, userid string, password string) (*acl.Us Vars: queryVars, } - queryResp, err := (&Server{}).doQuery(ctx, &queryRequest) + queryResp, err := (&Server{}).doQuery(ctx, &queryRequest, NoAuthorize) if err != nil { glog.Errorf("Error while query user with id %s: %v", userid, err) return nil, err @@ -310,7 +311,7 @@ func RefreshAcls(closer *y.Closer) { ctx := context.Background() var err error - queryResp, err := (&Server{}).doQuery(ctx, &queryRequest) + queryResp, err := (&Server{}).doQuery(ctx, &queryRequest, NoAuthorize) if err != nil { return errors.Errorf("unable to retrieve acls: %v", err) } @@ -362,7 +363,7 @@ func ResetAcl() { Vars: queryVars, } - queryResp, err := (&Server{}).doQuery(ctx, &queryRequest) + queryResp, err := (&Server{}).doQuery(ctx, &queryRequest, NoAuthorize) if err != nil { return errors.Wrapf(err, "while querying user with id %s", x.GrootId) } @@ -379,13 +380,18 @@ func ResetAcl() { // Insert Groot. createUserNQuads := acl.CreateUserNQuads(x.GrootId, "password") - mu := &api.Mutation{ + req := &api.Request{ StartTs: startTs, CommitNow: true, - Set: createUserNQuads, + Mutations: []*api.Mutation{ + { + Set: createUserNQuads, + }, + }, } - if _, err := (&Server{}).doMutate(context.Background(), mu, false); err != nil { + _, err = (&Server{}).doMutate(context.Background(), req, NoAuthorize) + if err != nil { return err } glog.Infof("Successfully upserted the groot account") @@ -658,23 +664,15 @@ func logAccess(log *accessEntry) { } //authorizeQuery authorizes the query using the aclCachePtr -func authorizeQuery(ctx context.Context, req *api.Request) error { +func authorizeQuery(ctx context.Context, parsedReq *gql.Result) error { if len(Config.HmacSecret) == 0 { // the user has not turned on the acl feature return nil } - parsedReq, err := gql.Parse(gql.Request{ - Str: req.Query, - Variables: req.Vars, - }) - if err != nil { - return err - } - preds := parsePredsFromQuery(parsedReq.Query) - var userId string var groupIds []string + preds := parsePredsFromQuery(parsedReq.Query) doAuthorizeQuery := func() error { userData, err := extractUserAndGroups(ctx) if err == nil { @@ -710,7 +708,7 @@ func authorizeQuery(ctx context.Context, req *api.Request) error { return nil } - err = doAuthorizeQuery() + err := doAuthorizeQuery() if span := otrace.FromContext(ctx); span != nil { span.Annotatef(nil, (&accessEntry{ userId: userId, diff --git a/edgraph/server.go b/edgraph/server.go index 34983098505..b49eeb11a54 100644 --- a/edgraph/server.go +++ b/edgraph/server.go @@ -75,6 +75,14 @@ type ServerState struct { needTs chan tsReq } +const ( + // NeedAuthorize is used to indicate that the request needs to be authorized. + NeedAuthorize = iota + // NoAuthorize is used to indicate that authorization needs to be skipped. + // Used when ACL needs to query information for performing the authorization check. + NoAuthorize +) + // State is the instance of ServerState used by the current server. var State ServerState @@ -419,24 +427,24 @@ func annotateStartTs(span *otrace.Span, ts uint64) { span.Annotate([]otrace.Attribute{otrace.Int64Attribute("startTs", int64(ts))}, "") } -// Mutate handles requests to perform mutations. -func (s *Server) Mutate(ctx context.Context, mu *api.Mutation) (*api.Assigned, error) { - return s.doMutate(ctx, mu, true) -} - -func (s *Server) doMutate(ctx context.Context, mu *api.Mutation, authorize bool) ( - resp *api.Assigned, rerr error) { +func (s *Server) doMutate(ctx context.Context, req *api.Request, authorize int) ( + resp *api.Response, rerr error) { if ctx.Err() != nil { return nil, ctx.Err() } + if len(req.Mutations) != 1 { + return nil, errors.Errorf("Only 1 mutation per request is supported") + } + mu := req.Mutations[0] + if !isMutationAllowed(ctx) { return resp, errors.Errorf("No mutations allowed.") } var parsingTime time.Duration - resp = &api.Assigned{} + resp = &api.Response{} start := time.Now() defer func() { @@ -466,7 +474,7 @@ func (s *Server) doMutate(ctx context.Context, mu *api.Mutation, authorize bool) } ostats.Record(ctx, x.NumMutations.M(1)) - if mu.Query != "" { + if req.Query != "" { span.Annotatef(nil, "Got Mutation with Upsert Block: %s", mu) } if len(mu.SetJson) > 0 { @@ -482,7 +490,7 @@ func (s *Server) doMutate(ctx context.Context, mu *api.Mutation, authorize bool) } parsingTime += time.Since(startParsingTime) - if authorize { + if authorize == NeedAuthorize { if err := authorizeMutation(ctx, gmu); err != nil { return resp, err } @@ -493,12 +501,12 @@ func (s *Server) doMutate(ctx context.Context, mu *api.Mutation, authorize bool) return resp, errors.Errorf("Empty mutation") } - if mu.StartTs == 0 { - mu.StartTs = State.getTimestamp(false) + if req.StartTs == 0 { + req.StartTs = State.getTimestamp(false) } - annotateStartTs(span, mu.StartTs) + annotateStartTs(span, req.StartTs) - l, err := doQueryInUpsert(ctx, mu, gmu) + l, err := doQueryInUpsert(ctx, req, gmu) if err != nil { return resp, err } @@ -514,11 +522,11 @@ func (s *Server) doMutate(ctx context.Context, mu *api.Mutation, authorize bool) return resp, err } - m := &pb.Mutations{Edges: edges, StartTs: mu.StartTs} + m := &pb.Mutations{Edges: edges, StartTs: req.StartTs} span.Annotatef(nil, "Applying mutations: %+v", m) - resp.Context, err = query.ApplyMutations(ctx, m) - span.Annotatef(nil, "Txn Context: %+v. Err=%v", resp.Context, err) - if !mu.CommitNow { + resp.Txn, err = query.ApplyMutations(ctx, m) + span.Annotatef(nil, "Txn Context: %+v. Err=%v", resp.Txn, err) + if !req.CommitNow { if err == y.ErrConflict { err = status.Error(codes.FailedPrecondition, err.Error()) } @@ -531,12 +539,12 @@ func (s *Server) doMutate(ctx context.Context, mu *api.Mutation, authorize bool) // ApplyMutations failed. We now want to abort the transaction, // ignoring any error that might occur during the abort (the user would // care more about the previous error). - if resp.Context == nil { - resp.Context = &api.TxnContext{StartTs: mu.StartTs} + if resp.Txn == nil { + resp.Txn = &api.TxnContext{StartTs: req.StartTs} } - resp.Context.Aborted = true - _, _ = worker.CommitOverNetwork(ctx, resp.Context) + resp.Txn.Aborted = true + _, _ = worker.CommitOverNetwork(ctx, resp.Txn) if err == y.ErrConflict { // We have already aborted the transaction, so the error message should reflect that. @@ -547,36 +555,37 @@ func (s *Server) doMutate(ctx context.Context, mu *api.Mutation, authorize bool) } span.Annotatef(nil, "Prewrites err: %v. Attempting to commit/abort immediately.", err) - ctxn := resp.Context + ctxn := resp.Txn // zero would assign the CommitTs cts, err := worker.CommitOverNetwork(ctx, ctxn) span.Annotatef(nil, "Status of commit at ts: %d: %v", ctxn.StartTs, err) if err != nil { if err == y.ErrAborted { err = status.Errorf(codes.Aborted, err.Error()) - resp.Context.Aborted = true + resp.Txn.Aborted = true } return resp, err } // CommitNow was true, no need to send keys. - resp.Context.Keys = resp.Context.Keys[:0] - resp.Context.CommitTs = cts + resp.Txn.Keys = resp.Txn.Keys[:0] + resp.Txn.CommitTs = cts return resp, nil } // doQueryInUpsert processes the query in upsert block. -func doQueryInUpsert(ctx context.Context, mu *api.Mutation, gmu *gql.Mutation) ( +func doQueryInUpsert(ctx context.Context, req *api.Request, gmu *gql.Mutation) ( *query.Latency, error) { l := &query.Latency{} - if mu.Query == "" { + if req.Query == "" { return l, nil } - upsertQuery := mu.Query + mu := req.Mutations[0] + upsertQuery := req.Query needVars := findVars(gmu) isCondUpsert := strings.TrimSpace(mu.Cond) != "" varName := fmt.Sprintf("__dgraph%d__", rand.Int()) @@ -600,7 +609,7 @@ func doQueryInUpsert(ctx context.Context, mu *api.Mutation, gmu *gql.Mutation) ( // The variable __dgraph0__ will - // * be empty if the condition is true // * have 1 UID (the 0 UID) if the condition is false - upsertQuery = strings.TrimSuffix(strings.TrimSpace(mu.Query), "}") + upsertQuery = strings.TrimSuffix(strings.TrimSpace(req.Query), "}") upsertQuery += varName + ` as var(func: uid(0)) ` + cond + `}` needVars = append(needVars, varName) } @@ -618,7 +627,11 @@ func doQueryInUpsert(ctx context.Context, mu *api.Mutation, gmu *gql.Mutation) ( return nil, errors.Wrapf(err, "while validating query: %q", upsertQuery) } - qr := query.Request{Latency: l, GqlQuery: &parsedReq, ReadTs: mu.StartTs} + if err := authorizeQuery(ctx, &parsedReq); err != nil { + return nil, err + } + + qr := query.Request{Latency: l, GqlQuery: &parsedReq, ReadTs: req.StartTs} if err := qr.ProcessQuery(ctx); err != nil { return nil, errors.Wrapf(err, "while processing query: %q", upsertQuery) } @@ -751,19 +764,18 @@ func updateMutations(gmu *gql.Mutation, varToUID map[string][]string) { // Query handles queries and returns the data. func (s *Server) Query(ctx context.Context, req *api.Request) (*api.Response, error) { - if err := authorizeQuery(ctx, req); err != nil { - return nil, err - } - if glog.V(3) { - glog.Infof("Got a query: %+v", req) + if len(req.Mutations) > 0 { + return s.doMutate(ctx, req, NeedAuthorize) } - return s.doQuery(ctx, req) + return s.doQuery(ctx, req, NeedAuthorize) } // This method is used to execute the query and return the response to the // client as a protocol buffer message. -func (s *Server) doQuery(ctx context.Context, req *api.Request) (resp *api.Response, rerr error) { +func (s *Server) doQuery(ctx context.Context, req *api.Request, authorize int) ( + resp *api.Response, rerr error) { + if ctx.Err() != nil { return nil, ctx.Err() } @@ -815,6 +827,12 @@ func (s *Server) doQuery(ctx context.Context, req *api.Request) (resp *api.Respo return resp, err } + if authorize == NeedAuthorize { + if err := authorizeQuery(ctx, &parsedReq); err != nil { + return nil, err + } + } + var queryRequest = query.Request{ Latency: &l, GqlQuery: &parsedReq, diff --git a/gql/parser_mutation.go b/gql/parser_mutation.go index 76b6fca6f72..1eb1e724656 100644 --- a/gql/parser_mutation.go +++ b/gql/parser_mutation.go @@ -23,7 +23,7 @@ import ( // ParseMutation parses a block into a mutation. Returns an object with a mutation or // an upsert block with mutation, otherwise returns nil with an error. -func ParseMutation(mutation string) (mu *api.Mutation, err error) { +func ParseMutation(mutation string) (req *api.Request, err error) { var lexer lex.Lexer lexer.Reset(mutation) lexer.Run(lexIdentifyBlock) @@ -39,13 +39,15 @@ func ParseMutation(mutation string) (mu *api.Mutation, err error) { item := it.Item() switch item.Typ { case itemUpsertBlock: - if mu, err = parseUpsertBlock(it); err != nil { + if req, err = parseUpsertBlock(it); err != nil { return nil, err } case itemLeftCurl: - if mu, err = parseMutationBlock(it); err != nil { + mu, err := parseMutationBlock(it) + if err != nil { return nil, err } + req = &api.Request{Mutations: []*api.Mutation{mu}} default: return nil, it.Errorf("Unexpected token: [%s]", item.Val) } @@ -55,12 +57,12 @@ func ParseMutation(mutation string) (mu *api.Mutation, err error) { return nil, it.Errorf("Unexpected %s after the end of the block", it.Item().Val) } - return mu, nil + return req, nil } // parseUpsertBlock parses the upsert block -func parseUpsertBlock(it *lex.ItemIterator) (*api.Mutation, error) { - var mu *api.Mutation +func parseUpsertBlock(it *lex.ItemIterator) (*api.Request, error) { + var req *api.Request var queryText, condText string var queryFound, condFound bool @@ -80,13 +82,13 @@ func parseUpsertBlock(it *lex.ItemIterator) (*api.Mutation, error) { switch { // upsert {... ===>}<=== case item.Typ == itemRightCurl: - if mu == nil { + if req == nil { return nil, it.Errorf("Empty mutation block") } else if !queryFound { return nil, it.Errorf("Query op not found in upsert block") } else { - mu.Query = queryText - return mu, nil + req.Query = queryText + return req, nil } // upsert { mutation{...} ===>query<==={...}} @@ -124,11 +126,12 @@ func parseUpsertBlock(it *lex.ItemIterator) (*api.Mutation, error) { } // upsert @if(...) ===>{<=== ....} - var err error - if mu, err = parseMutationBlock(it); err != nil { + mu, err := parseMutationBlock(it) + if err != nil { return nil, err } mu.Cond = condText + req = &api.Request{Mutations: []*api.Mutation{mu}} // upsert { mutation{...} ===>fragment<==={...}} case item.Typ == itemUpsertBlockOp && item.Val == "fragment": diff --git a/gql/parser_test.go b/gql/parser_test.go index c57d1668802..617a11b2b6d 100644 --- a/gql/parser_test.go +++ b/gql/parser_test.go @@ -4466,8 +4466,9 @@ func TestParseMutation(t *testing.T) { } } ` - mu, err := ParseMutation(m) + req, err := ParseMutation(m) require.NoError(t, err) + mu := req.Mutations[0] require.NotNil(t, mu) sets, err := parseNquads(mu.SetNquads) require.NoError(t, err) diff --git a/wiki/content/clients/index.md b/wiki/content/clients/index.md index 3714f05c361..2c5ce8097d0 100644 --- a/wiki/content/clients/index.md +++ b/wiki/content/clients/index.md @@ -35,14 +35,14 @@ fashion, resulting in an initially semi random predicate distribution. Dgraph clients perform mutations and queries using transactions. A transaction bounds a sequence of queries and mutations that are committed by Dgraph as a single unit: that is, on commit, either all the changes are accepted -by Dgraph or none are. +by Dgraph or none are. -A transaction always sees the database state at the moment it began, plus any +A transaction always sees the database state at the moment it began, plus any changes it makes --- changes from concurrent transactions aren't visible. On commit, Dgraph will abort a transaction, rather than committing changes, when a conflicting, concurrently running transaction has already been committed. Two -transactions conflict when both transactions: +transactions conflict when both transactions: - write values to the same scalar predicate of the same node (e.g both attempting to set a particular node's `address` predicate); or diff --git a/wiki/content/design-concepts/index.md b/wiki/content/design-concepts/index.md index 7c58816f36a..ed4713c01ae 100644 --- a/wiki/content/design-concepts/index.md +++ b/wiki/content/design-concepts/index.md @@ -56,8 +56,8 @@ might see X, then Z (and not see Y at all). Dgraph supports server-side sequencing of updates, which provides linearizability. Unlike sequential consistency which provides sequencing per client, this provide sequencing across all clients. This is necessary to make -upserts work across clients. Thus, once a transaction is committed, it would be -visible to all future readers, irrespective of client boundaries. +transactions work across clients. Thus, once a transaction is committed, +it would be visible to all future readers, irrespective of client boundaries. - **Causal consistency:** Dgraph does not have a concept of dependencies among transactions. So, does NOT order based on dependencies. - **Serializable consistency:** Dgraph does NOT allow arbitrary reordering of transactions, but does provide a linear order per key.