diff --git a/dgraph/cmd/bulk/chunk.go b/chunker/chunk.go similarity index 59% rename from dgraph/cmd/bulk/chunk.go rename to chunker/chunk.go index 359a1b7b933..ae25ef6472e 100644 --- a/dgraph/cmd/bulk/chunk.go +++ b/chunker/chunk.go @@ -14,54 +14,61 @@ * limitations under the License. */ -package bulk +package chunker import ( "bufio" "bytes" + "compress/gzip" + encjson "encoding/json" "fmt" "io" + "net/http" + "os" + "path/filepath" "strings" "unicode" - "github.com/dgraph-io/dgraph/edgraph" - "github.com/dgraph-io/dgraph/gql" - "github.com/dgraph-io/dgraph/rdf" - "github.com/dgraph-io/dgraph/x" + "github.com/dgraph-io/dgo/protos/api" + "github.com/dgraph-io/dgo/x" + "github.com/dgraph-io/dgraph/chunker/json" + "github.com/dgraph-io/dgraph/chunker/rdf" + "github.com/pkg/errors" ) -type chunker interface { - begin(r *bufio.Reader) error - chunk(r *bufio.Reader) (*bytes.Buffer, error) - end(r *bufio.Reader) error - parse(chunkBuf *bytes.Buffer) ([]gql.NQuad, error) +type Chunker interface { + Begin(r *bufio.Reader) error + Chunk(r *bufio.Reader) (*bytes.Buffer, error) + End(r *bufio.Reader) error + Parse(chunkBuf *bytes.Buffer) ([]*api.NQuad, error) } type rdfChunker struct{} type jsonChunker struct{} const ( - rdfInput int = iota - jsonInput + RdfInput int = iota + JsonInput ) -func newChunker(inputFormat int) chunker { +func NewChunker(inputFormat int) Chunker { switch inputFormat { - case rdfInput: + case RdfInput: return &rdfChunker{} - case jsonInput: + case JsonInput: return &jsonChunker{} default: - panic("unknown loader type") + panic("unknown chunker type") } } -func (rdfChunker) begin(r *bufio.Reader) error { +// RDF files don't require any special processing at the beginning of the file. 
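+// (JSON files, by contrast, need their leading '[' token consumed first; see jsonChunker.Begin.)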
+func (rdfChunker) Begin(r *bufio.Reader) error { return nil } -func (rdfChunker) chunk(r *bufio.Reader) (*bytes.Buffer, error) { +func (rdfChunker) Chunk(r *bufio.Reader) (*bytes.Buffer, error) { batch := new(bytes.Buffer) batch.Grow(1 << 20) for lineCount := 0; lineCount < 1e5; lineCount++ { @@ -93,63 +100,36 @@ func (rdfChunker) chunk(r *bufio.Reader) (*bytes.Buffer, error) { return batch, nil } -func (rdfChunker) end(r *bufio.Reader) error { - return nil -} - -func (rdfChunker) parse(chunkBuf *bytes.Buffer) ([]gql.NQuad, error) { - str, readErr := chunkBuf.ReadString('\n') - if readErr != nil && readErr != io.EOF { - x.Check(readErr) - } - - nq, parseErr := rdf.Parse(strings.TrimSpace(str)) - if parseErr == rdf.ErrEmpty { - return nil, readErr - } else if parseErr != nil { - return nil, errors.Wrapf(parseErr, "while parsing line %q", str) +func (rdfChunker) Parse(chunkBuf *bytes.Buffer) ([]*api.NQuad, error) { + if chunkBuf.Len() == 0 { + return nil, io.EOF } - return []gql.NQuad{{NQuad: &nq}}, readErr -} - -func slurpSpace(r *bufio.Reader) error { - for { - ch, _, err := r.ReadRune() - if err != nil { - return err + nqs := make([]*api.NQuad, 0) + for chunkBuf.Len() > 0 { + str, err := chunkBuf.ReadString('\n') + if err != nil && err != io.EOF { + x.Check(err) } - if !unicode.IsSpace(ch) { - x.Check(r.UnreadRune()) - return nil + + nq, err := rdf.Parse(strings.TrimSpace(str)) + if err == rdf.ErrEmpty { + continue // blank line or comment + } else if err != nil { + return nil, errors.Wrapf(err, "while parsing line %q", str) } + nqs = append(nqs, &nq) } -} -func slurpQuoted(r *bufio.Reader, out *bytes.Buffer) error { - for { - ch, _, err := r.ReadRune() - if err != nil { - return err - } - x.Check2(out.WriteRune(ch)) + return nqs, nil +} - if ch == '\\' { - // Pick one more rune. - esc, _, err := r.ReadRune() - if err != nil { - return err - } - x.Check2(out.WriteRune(esc)) - continue - } - if ch == '"' { - return nil - } - } +// RDF files don't require any special processing at the end of the file. +func (rdfChunker) End(r *bufio.Reader) error { + return nil } -func (jsonChunker) begin(r *bufio.Reader) error { +func (jsonChunker) Begin(r *bufio.Reader) error { // The JSON file to load must be an array of maps (that is, '[ { ... }, { ... }, ... ]'). 
// This function must be called before calling readJSONChunk for the first time to advance
// the Reader past the array start token ('[') so that calls to readJSONChunk can read
@@ -167,7 +147,7 @@ func (jsonChunker) begin(r *bufio.Reader) error {
 	return nil
 }
 
-func (jsonChunker) chunk(r *bufio.Reader) (*bytes.Buffer, error) {
+func (jsonChunker) Chunk(r *bufio.Reader) (*bytes.Buffer, error) {
 	out := new(bytes.Buffer)
 	out.Grow(1 << 20)
 
@@ -231,27 +211,103 @@ func (jsonChunker) chunk(r *bufio.Reader) (*bytes.Buffer, error) {
 	return out, nil
 }
 
-func (jsonChunker) end(r *bufio.Reader) error {
+func (jsonChunker) Parse(chunkBuf *bytes.Buffer) ([]*api.NQuad, error) {
+	if chunkBuf.Len() == 0 {
+		return nil, io.EOF
+	}
+
+	nqs, err := json.Parse(chunkBuf.Bytes(), json.SetNquads)
+	if err != nil && err != io.EOF {
+		x.Check(err)
+	}
+	chunkBuf.Reset()
+
+	return nqs, err
+}
+
+func (jsonChunker) End(r *bufio.Reader) error {
 	if slurpSpace(r) == io.EOF {
 		return nil
 	}
-	return errors.New("Not all of json file consumed")
+	return errors.New("Not all of JSON file consumed")
 }
 
-func (jsonChunker) parse(chunkBuf *bytes.Buffer) ([]gql.NQuad, error) {
-	if chunkBuf.Len() == 0 {
-		return nil, io.EOF
+func slurpSpace(r *bufio.Reader) error {
+	for {
+		ch, _, err := r.ReadRune()
+		if err != nil {
+			return err
+		}
+		if !unicode.IsSpace(ch) {
+			x.Check(r.UnreadRune())
+			return nil
+		}
 	}
+}
 
-	nqs, err := edgraph.NquadsFromJson(chunkBuf.Bytes())
-	if err != nil && err != io.EOF {
+func slurpQuoted(r *bufio.Reader, out *bytes.Buffer) error {
+	for {
+		ch, _, err := r.ReadRune()
+		if err != nil {
+			return err
+		}
+		x.Check2(out.WriteRune(ch))
+
+		if ch == '\\' {
+			// Pick one more rune.
+			esc, _, err := r.ReadRune()
+			if err != nil {
+				return err
+			}
+			x.Check2(out.WriteRune(esc))
+			continue
+		}
+		if ch == '"' {
+			return nil
+		}
+	}
+}
+
+// FileReader returns a buffered reader for the given file, along with a cleanup function that
+// the caller must call when done with the reader. Gzip-compressed input is detected and
+// decompressed automatically even without the .gz extension.
+func FileReader(file string) (rd *bufio.Reader, cleanup func()) {
+	f, err := os.Open(file)
+	x.Check(err)
+
+	cleanup = func() { f.Close() }
+
+	if filepath.Ext(file) == ".gz" {
+		gzr, err := gzip.NewReader(f)
 		x.Check(err)
+		rd = bufio.NewReader(gzr)
+		cleanup = func() { f.Close(); gzr.Close() }
+	} else {
+		rd = bufio.NewReader(f)
+		buf, _ := rd.Peek(512)
+
+		typ := http.DetectContentType(buf)
+		if typ == "application/x-gzip" {
+			gzr, err := gzip.NewReader(rd)
+			x.Check(err)
+			rd = bufio.NewReader(gzr)
+			cleanup = func() { f.Close(); gzr.Close() }
+		}
 	}
-	chunkBuf.Reset()
 
-	gqlNq := make([]gql.NQuad, len(nqs))
-	for i, nq := range nqs {
-		gqlNq[i] = gql.NQuad{NQuad: nq}
+	return rd, cleanup
+}
+
+// IsJSONData returns true if the reader, which should be at the start of the stream, is reading
+// a JSON stream, false otherwise.
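+// It peeks at the stream rather than consuming it, so the reader is left intact for the caller.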
+func IsJSONData(r *bufio.Reader) (bool, error) { + buf, err := r.Peek(512) + if err != nil && err != io.EOF { + return false, err } - return gqlNq, err + + de := encjson.NewDecoder(bytes.NewReader(buf)) + _, err = de.Token() + + return err == nil, nil } diff --git a/dgraph/cmd/bulk/chunk_test.go b/chunker/chunk_test.go similarity index 90% rename from dgraph/cmd/bulk/chunk_test.go rename to chunker/chunk_test.go index eeee102e18f..230cb0ff9cf 100644 --- a/dgraph/cmd/bulk/chunk_test.go +++ b/chunker/chunk_test.go @@ -13,7 +13,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package bulk + +package chunker import ( "bufio" @@ -45,8 +46,8 @@ func TestJSONLoadStart(t *testing.T) { } for _, test := range tests { - chunker := newChunker(jsonInput) - require.Error(t, chunker.begin(bufioReader(test.json)), test.desc) + chunker := NewChunker(JsonInput) + require.Error(t, chunker.Begin(bufioReader(test.json)), test.desc) } } @@ -63,11 +64,11 @@ func TestJSONLoadReadNext(t *testing.T) { {"[{}", "malformed array"}, } for _, test := range tests { - chunker := newChunker(jsonInput) + chunker := NewChunker(JsonInput) reader := bufioReader(test.json) - require.NoError(t, chunker.begin(reader), test.desc) + require.NoError(t, chunker.Begin(reader), test.desc) - json, err := chunker.chunk(reader) + json, err := chunker.Chunk(reader) //fmt.Fprintf(os.Stderr, "err = %v, json = %v\n", err, json) require.Nil(t, json, test.desc) require.Error(t, err, test.desc) @@ -112,11 +113,11 @@ func TestJSONLoadSuccessFirst(t *testing.T) { }, } for _, test := range tests { - chunker := newChunker(jsonInput) + chunker := NewChunker(JsonInput) reader := bufioReader(test.json) - require.NoError(t, chunker.begin(reader), test.desc) + require.NoError(t, chunker.Begin(reader), test.desc) - json, err := chunker.chunk(reader) + json, err := chunker.Chunk(reader) if err == io.EOF { // pass } else { @@ -175,23 +176,23 @@ func TestJSONLoadSuccessAll(t *testing.T) { }`, } - chunker := newChunker(jsonInput) + chunker := NewChunker(JsonInput) reader := bufioReader(testDoc) var json *bytes.Buffer var idx int - err := chunker.begin(reader) + err := chunker.Begin(reader) require.NoError(t, err, "begin reading JSON document") for idx = 0; err == nil; idx++ { desc := fmt.Sprintf("reading chunk #%d", idx+1) - json, err = chunker.chunk(reader) + json, err = chunker.Chunk(reader) //fmt.Fprintf(os.Stderr, "err = %v, json = %v\n", err, json) if err != io.EOF { require.NoError(t, err, desc) require.Equal(t, testChunks[idx], json.String(), desc) } } - err = chunker.end(reader) + err = chunker.End(reader) require.NoError(t, err, "end reading JSON document") } diff --git a/edgraph/nquads_from_json.go b/chunker/json/parse.go similarity index 96% rename from edgraph/nquads_from_json.go rename to chunker/json/parse.go index 7648154db18..98b2815abe1 100644 --- a/edgraph/nquads_from_json.go +++ b/chunker/json/parse.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package edgraph +package json import ( "bytes" @@ -132,7 +132,7 @@ func handleBasicType(k string, v interface{}, op int, nq *api.NQuad) error { nq.Predicate, nq.Lang = x.PredicateLang(k) // Default value is considered as S P * deletion. 
- if v == "" && op == delete { + if v == "" && op == DeleteNquads { nq.ObjectValue = &api.Value{Val: &api.Value_DefaultVal{DefaultVal: x.Star}} return nil } @@ -143,14 +143,14 @@ func handleBasicType(k string, v interface{}, op int, nq *api.NQuad) error { nq.ObjectValue = &api.Value{Val: &api.Value_StrVal{StrVal: v}} case float64: - if v == 0 && op == delete { + if v == 0 && op == DeleteNquads { nq.ObjectValue = &api.Value{Val: &api.Value_DefaultVal{DefaultVal: x.Star}} return nil } nq.ObjectValue = &api.Value{Val: &api.Value_DoubleVal{DoubleVal: v}} case bool: - if v == false && op == delete { + if v == false && op == DeleteNquads { nq.ObjectValue = &api.Value{Val: &api.Value_DefaultVal{DefaultVal: x.Star}} return nil } @@ -165,7 +165,7 @@ func handleBasicType(k string, v interface{}, op int, nq *api.NQuad) error { func checkForDeletion(mr *mapResponse, m map[string]interface{}, op int) { // Since uid is the only key, this must be S * * deletion. - if op == delete && len(mr.uid) > 0 && len(m) == 1 { + if op == DeleteNquads && len(mr.uid) > 0 && len(m) == 1 { mr.nquads = append(mr.nquads, &api.NQuad{ Subject: mr.uid, Predicate: x.Star, @@ -240,7 +240,7 @@ func mapToNquads(m map[string]interface{}, idx *int, op int, parentPred string) } if len(mr.uid) == 0 { - if op == delete { + if op == DeleteNquads { // Delete operations with a non-nil value must have a uid specified. return mr, x.Errorf("UID must be present and non-zero while deleting edges.") } @@ -258,7 +258,7 @@ func mapToNquads(m map[string]interface{}, idx *int, op int, parentPred string) continue } - if op == delete { + if op == DeleteNquads { // This corresponds to edge deletion. if v == nil { mr.nquads = append(mr.nquads, &api.NQuad{ @@ -285,7 +285,7 @@ func mapToNquads(m map[string]interface{}, idx *int, op int, parentPred string) } if v == nil { - if op == delete { + if op == DeleteNquads { nq.ObjectValue = &api.Value{Val: &api.Value_DefaultVal{DefaultVal: x.Star}} mr.nquads = append(mr.nquads, &nq) } @@ -372,11 +372,11 @@ func mapToNquads(m map[string]interface{}, idx *int, op int, parentPred string) } const ( - set = iota - delete + SetNquads = iota + DeleteNquads ) -func nquadsFromJson(b []byte, op int) ([]*api.NQuad, error) { +func Parse(b []byte, op int) ([]*api.NQuad, error) { buffer := bytes.NewBuffer(b) dec := json.NewDecoder(buffer) dec.UseNumber() @@ -417,7 +417,3 @@ func nquadsFromJson(b []byte, op int) ([]*api.NQuad, error) { checkForDeletion(&mr, ms, op) return mr.nquads, err } - -func NquadsFromJson(b []byte) ([]*api.NQuad, error) { - return nquadsFromJson(b, set) -} diff --git a/chunker/json/parse_test.go b/chunker/json/parse_test.go new file mode 100644 index 00000000000..630e78cd6a0 --- /dev/null +++ b/chunker/json/parse_test.go @@ -0,0 +1,402 @@ +/* + * Copyright 2017-2018 Dgraph Labs, Inc. and Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package json + +import ( + "encoding/binary" + "encoding/json" + "fmt" + "math" + "testing" + "time" + + "github.com/dgraph-io/dgo/protos/api" + "github.com/dgraph-io/dgraph/tok" + "github.com/dgraph-io/dgraph/types" + "github.com/golang/glog" + "github.com/stretchr/testify/require" + "github.com/twpayne/go-geom" + "github.com/twpayne/go-geom/encoding/geojson" +) + +func makeNquad(sub, pred string, val *api.Value) *api.NQuad { + return &api.NQuad{ + Subject: sub, + Predicate: pred, + ObjectValue: val, + } +} + +func makeNquadEdge(sub, pred, obj string) *api.NQuad { + return &api.NQuad{ + Subject: sub, + Predicate: pred, + ObjectId: obj, + } +} + +type School struct { + Name string `json:",omitempty"` +} + +type address struct { + Type string `json:"type,omitempty"` + Coords []float64 `json:"coordinates,omitempty"` +} + +type Person struct { + Uid string `json:"uid,omitempty"` + Name string `json:"name,omitempty"` + Age int `json:"age,omitempty"` + Married *bool `json:"married,omitempty"` + Now *time.Time `json:"now,omitempty"` + Address address `json:"address,omitempty"` // geo value + Friends []Person `json:"friend,omitempty"` + School *School `json:"school,omitempty"` +} + +func TestNquadsFromJson1(t *testing.T) { + tn := time.Now().UTC() + geoVal := `{"Type":"Point", "Coordinates":[1.1,2.0]}` + m := true + p := Person{ + Name: "Alice", + Age: 26, + Married: &m, + Now: &tn, + Address: address{ + Type: "Point", + Coords: []float64{1.1, 2.0}, + }, + } + + b, err := json.Marshal(p) + require.NoError(t, err) + + nq, err := Parse(b, SetNquads) + require.NoError(t, err) + + require.Equal(t, 5, len(nq)) + + oval := &api.Value{Val: &api.Value_StrVal{StrVal: "Alice"}} + require.Contains(t, nq, makeNquad("_:blank-0", "name", oval)) + + oval = &api.Value{Val: &api.Value_IntVal{IntVal: 26}} + require.Contains(t, nq, makeNquad("_:blank-0", "age", oval)) + + oval = &api.Value{Val: &api.Value_BoolVal{BoolVal: true}} + require.Contains(t, nq, makeNquad("_:blank-0", "married", oval)) + + oval = &api.Value{Val: &api.Value_StrVal{StrVal: tn.Format(time.RFC3339Nano)}} + require.Contains(t, nq, makeNquad("_:blank-0", "now", oval)) + + var g geom.T + err = geojson.Unmarshal([]byte(geoVal), &g) + require.NoError(t, err) + geo, err := types.ObjectValue(types.GeoID, g) + require.NoError(t, err) + + require.Contains(t, nq, makeNquad("_:blank-0", "address", geo)) +} + +func TestNquadsFromJson2(t *testing.T) { + m := false + + p := Person{ + Name: "Alice", + Friends: []Person{{ + Name: "Charlie", + Married: &m, + }, { + Uid: "1000", + Name: "Bob", + }}, + } + + b, err := json.Marshal(p) + require.NoError(t, err) + + nq, err := Parse(b, SetNquads) + require.NoError(t, err) + + require.Equal(t, 6, len(nq)) + require.Contains(t, nq, makeNquadEdge("_:blank-0", "friend", "_:blank-1")) + require.Contains(t, nq, makeNquadEdge("_:blank-0", "friend", "1000")) + + oval := &api.Value{Val: &api.Value_StrVal{StrVal: "Charlie"}} + require.Contains(t, nq, makeNquad("_:blank-1", "name", oval)) + + oval = &api.Value{Val: &api.Value_BoolVal{BoolVal: false}} + require.Contains(t, nq, makeNquad("_:blank-1", "married", oval)) + + oval = &api.Value{Val: &api.Value_StrVal{StrVal: "Bob"}} + require.Contains(t, nq, makeNquad("1000", "name", oval)) +} + +func TestNquadsFromJson3(t *testing.T) { + p := Person{ + Name: "Alice", + School: &School{ + Name: "Wellington Public School", + }, + } + + b, err := json.Marshal(p) + require.NoError(t, err) + + nq, err := Parse(b, SetNquads) 
+ require.NoError(t, err) + + require.Equal(t, 3, len(nq)) + require.Contains(t, nq, makeNquadEdge("_:blank-0", "school", "_:blank-1")) + + oval := &api.Value{Val: &api.Value_StrVal{StrVal: "Wellington Public School"}} + require.Contains(t, nq, makeNquad("_:blank-1", "Name", oval)) +} + +func TestNquadsFromJson4(t *testing.T) { + json := `[{"name":"Alice","mobile":"040123456","car":"MA0123", "age": 21, "weight": 58.7}]` + + nq, err := Parse([]byte(json), SetNquads) + require.NoError(t, err) + require.Equal(t, 5, len(nq)) + oval := &api.Value{Val: &api.Value_StrVal{StrVal: "Alice"}} + require.Contains(t, nq, makeNquad("_:blank-0", "name", oval)) + require.Contains(t, nq, makeNquad("_:blank-0", "age", &api.Value{Val: &api.Value_IntVal{IntVal: 21}})) + require.Contains(t, nq, makeNquad("_:blank-0", "weight", + &api.Value{Val: &api.Value_DoubleVal{DoubleVal: 58.7}})) +} + +func TestJsonNumberParsing(t *testing.T) { + tests := []struct { + in string + out *api.Value + }{ + {`{"uid": "1", "key": 9223372036854775299}`, &api.Value{Val: &api.Value_IntVal{IntVal: 9223372036854775299}}}, + {`{"uid": "1", "key": 9223372036854775299.0}`, &api.Value{Val: &api.Value_DoubleVal{DoubleVal: 9223372036854775299.0}}}, + {`{"uid": "1", "key": 27670116110564327426}`, nil}, + {`{"uid": "1", "key": "23452786"}`, &api.Value{Val: &api.Value_StrVal{StrVal: "23452786"}}}, + {`{"uid": "1", "key": "23452786.2378"}`, &api.Value{Val: &api.Value_StrVal{StrVal: "23452786.2378"}}}, + {`{"uid": "1", "key": -1e10}`, &api.Value{Val: &api.Value_DoubleVal{DoubleVal: -1e+10}}}, + {`{"uid": "1", "key": 0E-0}`, &api.Value{Val: &api.Value_DoubleVal{DoubleVal: 0}}}, + } + + for _, test := range tests { + nqs, err := Parse([]byte(test.in), SetNquads) + if test.out != nil { + require.NoError(t, err, "%T", err) + require.Equal(t, makeNquad("1", "key", test.out), nqs[0]) + } else { + require.Error(t, err) + } + } +} + +func TestNquadsFromJson_UidOutofRangeError(t *testing.T) { + json := `{"uid":"0xa14222b693e4ba34123","name":"Name","following":[{"name":"Bob"}],"school":[{"uid":"","name@en":"Crown Public School"}]}` + + _, err := Parse([]byte(json), SetNquads) + require.Error(t, err) +} + +func TestNquadsFromJson_NegativeUidError(t *testing.T) { + json := `{"uid":"-100","name":"Name","following":[{"name":"Bob"}],"school":[{"uid":"","name@en":"Crown Public School"}]}` + + _, err := Parse([]byte(json), SetNquads) + require.Error(t, err) +} + +func TestNquadsFromJson_EmptyUid(t *testing.T) { + json := `{"uid":"","name":"Name","following":[{"name":"Bob"}],"school":[{"uid":"","name":"Crown Public School"}]}` + + nq, err := Parse([]byte(json), SetNquads) + require.NoError(t, err) + + require.Equal(t, 5, len(nq)) + oval := &api.Value{Val: &api.Value_StrVal{StrVal: "Name"}} + require.Contains(t, nq, makeNquad("_:blank-0", "name", oval)) +} + +func TestNquadsFromJson_BlankNodes(t *testing.T) { + json := `{"uid":"_:alice","name":"Alice","following":[{"name":"Bob"}],"school":[{"uid":"_:school","name":"Crown Public School"}]}` + + nq, err := Parse([]byte(json), SetNquads) + require.NoError(t, err) + + require.Equal(t, 5, len(nq)) + require.Contains(t, nq, makeNquadEdge("_:alice", "school", "_:school")) +} + +func TestNquadsDeleteEdges(t *testing.T) { + json := `[{"uid": "0x1","name":null,"mobile":null,"car":null}]` + nq, err := Parse([]byte(json), DeleteNquads) + require.NoError(t, err) + require.Equal(t, 3, len(nq)) +} + +func checkCount(t *testing.T, nq []*api.NQuad, pred string, count int) { + for _, n := range nq { + if n.Predicate == pred { + 
require.Equal(t, count, len(n.Facets)) + break + } + } +} + +func getMapOfFacets(facets []*api.Facet) map[string]*api.Facet { + res := make(map[string]*api.Facet) + for _, f := range facets { + res[f.Key] = f + } + return res +} + +func checkFacets(t *testing.T, nq []*api.NQuad, pred string, facets []*api.Facet) { + for _, n := range nq { + if n.Predicate == pred { + require.Equal(t, len(facets), len(n.Facets), + fmt.Sprintf("expected %d facets, got %d", len(facets), len(n.Facets))) + + expectedFacets := getMapOfFacets(facets) + actualFacets := getMapOfFacets(n.Facets) + for key, f := range expectedFacets { + actualF, ok := actualFacets[key] + if !ok { + t.Fatalf("facet for key %s not found", key) + } + require.Equal(t, f, actualF, fmt.Sprintf("expected:%v\ngot:%v", f, actualF)) + } + } + } +} + +func TestNquadsFromJsonFacets1(t *testing.T) { + // test the 5 data types on facets, string, bool, int, float and datetime + operation := "READ WRITE" + operationTokens, err := tok.GetTermTokens([]string{operation}) + require.NoError(t, err, "unable to get tokens from the string %s", operation) + + timeStr := "2006-01-02T15:04:05Z" + time, err := types.ParseTime(timeStr) + if err != nil { + t.Fatalf("unable to convert string %s to time", timeStr) + } + timeBinary, err := time.MarshalBinary() + if err != nil { + t.Fatalf("unable to marshal time %v to binary", time) + } + + carPrice := 30000.56 + var priceBytes [8]byte + u := math.Float64bits(float64(carPrice)) + binary.LittleEndian.PutUint64(priceBytes[:], u) + + carAge := 3 + var ageBytes [8]byte + binary.LittleEndian.PutUint64(ageBytes[:], uint64(carAge)) + + json := fmt.Sprintf(`[{"name":"Alice","mobile":"040123456","car":"MA0123",`+ + `"mobile|operation": "%s", + "car|first":true, + "car|age": %d, + "car|price": %f, + "car|since": "%s" +}]`, operation, carAge, carPrice, timeStr) + + nq, err := Parse([]byte(json), SetNquads) + require.NoError(t, err) + require.Equal(t, 3, len(nq)) + + for _, n := range nq { + glog.Infof("%v", n) + + } + + checkFacets(t, nq, "mobile", []*api.Facet{ + { + Key: "operation", + Value: []byte(operation), + ValType: api.Facet_STRING, + Tokens: operationTokens, + }, + }) + + checkFacets(t, nq, "car", []*api.Facet{ + { + Key: "first", + Value: []byte{1}, + ValType: api.Facet_BOOL, + }, + { + Key: "age", + Value: ageBytes[:], + ValType: api.Facet_INT, + }, + { + Key: "price", + Value: priceBytes[:], + ValType: api.Facet_FLOAT, + }, + { + Key: "since", + Value: timeBinary, + ValType: api.Facet_DATETIME, + }, + }) +} + +func TestNquadsFromJsonFacets2(t *testing.T) { + // Dave has uid facets which should go on the edge between Alice and Dave + json := `[{"name":"Alice","friend":[{"name":"Dave","friend|close":"true"}]}]` + + nq, err := Parse([]byte(json), SetNquads) + require.NoError(t, err) + require.Equal(t, 3, len(nq)) + checkCount(t, nq, "friend", 1) +} + +func TestNquadsFromJsonError1(t *testing.T) { + p := Person{ + Name: "Alice", + School: &School{ + Name: "Wellington Public School", + }, + } + + b, err := json.Marshal(p) + require.NoError(t, err) + + _, err = Parse(b, DeleteNquads) + require.Error(t, err) + require.Contains(t, err.Error(), "UID must be present and non-zero while deleting edges.") +} + +func TestNquadsFromJsonList(t *testing.T) { + json := `{"address":["Riley Street","Redfern"],"phone_number":[123,9876],"points":[{"type":"Point", "coordinates":[1.1,2.0]},{"type":"Point", "coordinates":[2.0,1.1]}]}` + + nq, err := Parse([]byte(json), SetNquads) + require.NoError(t, err) + require.Equal(t, 6, len(nq)) +} 
+ +func TestNquadsFromJsonDelete(t *testing.T) { + json := `{"uid":1000,"friend":[{"uid":1001}]}` + + nq, err := Parse([]byte(json), DeleteNquads) + require.NoError(t, err) + require.Equal(t, nq[0], makeNquadEdge("1000", "friend", "1001")) +} diff --git a/rdf/README.txt b/chunker/rdf/README.txt similarity index 100% rename from rdf/README.txt rename to chunker/rdf/README.txt diff --git a/rdf/parse.go b/chunker/rdf/parse.go similarity index 100% rename from rdf/parse.go rename to chunker/rdf/parse.go diff --git a/rdf/parse_test.go b/chunker/rdf/parse_test.go similarity index 100% rename from rdf/parse_test.go rename to chunker/rdf/parse_test.go diff --git a/rdf/state.go b/chunker/rdf/state.go similarity index 100% rename from rdf/state.go rename to chunker/rdf/state.go diff --git a/contrib/scripts/functions.sh b/contrib/scripts/functions.sh index 53b1b6d1b7c..c05ab6d94e2 100755 --- a/contrib/scripts/functions.sh +++ b/contrib/scripts/functions.sh @@ -16,7 +16,7 @@ function restartCluster { echo "Rebuilding dgraph ..." make install docker ps -a --filter label="cluster=test" --format "{{.Names}}" | xargs -r docker rm -f - docker-compose -p dgraph -f $compose_file up --force-recreate --remove-orphans --detach + docker-compose -p dgraph -f $compose_file up --force-recreate --remove-orphans --detach || exit 1 popd >/dev/null $basedir/contrib/wait-for-it.sh -t 60 localhost:6080 || exit 1 diff --git a/dgraph/cmd/bulk/loader.go b/dgraph/cmd/bulk/loader.go index c20c6c370d1..b52e002dac4 100644 --- a/dgraph/cmd/bulk/loader.go +++ b/dgraph/cmd/bulk/loader.go @@ -17,7 +17,6 @@ package bulk import ( - "bufio" "bytes" "compress/gzip" "context" @@ -27,16 +26,17 @@ import ( "os" "path/filepath" "runtime" - "strings" "sync" "time" "github.com/dgraph-io/badger" bo "github.com/dgraph-io/badger/options" + "github.com/dgraph-io/dgraph/chunker" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/schema" "github.com/dgraph-io/dgraph/x" "github.com/dgraph-io/dgraph/xidmap" + "google.golang.org/grpc" ) @@ -144,25 +144,6 @@ func readSchema(filename string) []*pb.SchemaUpdate { return initialSchema } -func findDataFiles(dir string, ext string) []string { - var files []string - x.Check(filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { - if err != nil { - return err - } - if strings.HasSuffix(path, ext) || strings.HasSuffix(path, ext+".gz") { - files = append(files, path) - } - return nil - })) - return files -} - -type uidRangeResponse struct { - uids *pb.AssignedIds - err error -} - func (ld *loader) mapStage() { ld.prog.setPhase(mapPhase) @@ -181,21 +162,21 @@ func (ld *loader) mapStage() { LRUSize: 1 << 19, }) - var files []string - var ext string + var dir, ext string var loaderType int if ld.opt.RDFDir != "" { - loaderType = rdfInput + loaderType = chunker.RdfInput + dir = ld.opt.RDFDir ext = ".rdf" - files = findDataFiles(ld.opt.RDFDir, ext) } else { - loaderType = jsonInput + loaderType = chunker.JsonInput + dir = ld.opt.JSONDir ext = ".json" - files = findDataFiles(ld.opt.JSONDir, ext) - } + } + files := x.FindDataFiles(dir, []string{ext, ext + ".gz"}) if len(files) == 0 { - fmt.Printf("No *%s files found.\n", ext) + fmt.Printf("No *%s files found under %s.\n", ext, dir) os.Exit(1) } @@ -213,25 +194,17 @@ func (ld *loader) mapStage() { for i, file := range files { thr.Start() fmt.Printf("Processing file (%d out of %d): %s\n", i+1, len(files), file) - chunker := newChunker(loaderType) + go func(file string) { defer 
thr.Done() - f, err := os.Open(file) - x.Check(err) - defer f.Close() + r, cleanup := chunker.FileReader(file) + defer cleanup() - var r *bufio.Reader - if !strings.HasSuffix(file, ".gz") { - r = bufio.NewReaderSize(f, 1<<20) - } else { - gzr, err := gzip.NewReader(f) - x.Checkf(err, "Could not create gzip reader for file %q.", file) - r = bufio.NewReaderSize(gzr, 1<<20) - } - x.Check(chunker.begin(r)) + chunker := chunker.NewChunker(loaderType) + x.Check(chunker.Begin(r)) for { - chunkBuf, err := chunker.chunk(r) + chunkBuf, err := chunker.Chunk(r) if chunkBuf != nil && chunkBuf.Len() > 0 { ld.readerChunkCh <- chunkBuf } @@ -241,7 +214,7 @@ func (ld *loader) mapStage() { x.Check(err) } } - x.Check(chunker.end(r)) + x.Check(chunker.End(r)) }(file) } thr.Wait() diff --git a/dgraph/cmd/bulk/mapper.go b/dgraph/cmd/bulk/mapper.go index e5ef793b82f..ba4b4e53a38 100644 --- a/dgraph/cmd/bulk/mapper.go +++ b/dgraph/cmd/bulk/mapper.go @@ -31,6 +31,7 @@ import ( "sync/atomic" "github.com/dgraph-io/dgo/protos/api" + "github.com/dgraph-io/dgraph/chunker" "github.com/dgraph-io/dgraph/gql" "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/protos/pb" @@ -117,11 +118,11 @@ func (m *mapper) writeMapEntriesToFile(entriesBuf []byte, shardIdx int) { } func (m *mapper) run(inputFormat int) { - chunker := newChunker(inputFormat) + chunker := chunker.NewChunker(inputFormat) for chunkBuf := range m.readerChunkCh { done := false for !done { - nqs, err := chunker.parse(chunkBuf) + nqs, err := chunker.Parse(chunkBuf) if err == io.EOF { done = true } else if err != nil { @@ -139,7 +140,7 @@ func (m *mapper) run(inputFormat int) { } } - m.processNQuad(nq) + m.processNQuad(gql.NQuad{NQuad: nq}) atomic.AddInt64(&m.prog.nquadCount, 1) } diff --git a/dgraph/cmd/bulk/run.go b/dgraph/cmd/bulk/run.go index 5d6c59d07e0..046ecf100b8 100644 --- a/dgraph/cmd/bulk/run.go +++ b/dgraph/cmd/bulk/run.go @@ -48,10 +48,10 @@ func init() { flag := Bulk.Cmd.Flags() flag.StringP("rdfs", "r", "", - "Directory containing *.rdf or *.rdf.gz files to load.") + "Location of RDF data to load.") // would be nice to use -j to match -r, but already used by --num_go_routines flag.String("jsons", "", - "Directory containing *.json or *.json.gz files to load.") + "Location of JSON data to load.") flag.StringP("schema_file", "s", "", "Location of schema file to load.") flag.String("out", "out", @@ -121,7 +121,7 @@ func run() { os.Exit(0) } if opt.SchemaFile == "" { - fmt.Fprint(os.Stderr, "schema file must be specified.\n") + fmt.Fprint(os.Stderr, "Schema file must be specified.\n") os.Exit(1) } if opt.RDFDir == "" && opt.JSONDir == "" { diff --git a/dgraph/cmd/live/batch.go b/dgraph/cmd/live/batch.go index 1ecb28999ae..5ef92ba9209 100644 --- a/dgraph/cmd/live/batch.go +++ b/dgraph/cmd/live/batch.go @@ -69,13 +69,13 @@ type loader struct { retryRequestsWg sync.WaitGroup // Miscellaneous information to print counters. - // Num of RDF's sent - rdfs uint64 + // Num of N-Quads sent + nquads uint64 // Num of txns sent txns uint64 // Num of aborts aborts uint64 - // To get time elapsel. + // To get time elapsed start time.Time reqs chan api.Mutation @@ -109,8 +109,8 @@ func (p *uidProvider) ReserveUidRange() (uint64, uint64, error) { // Counter keeps a track of various parameters about a batch mutation. Running totals are printed // if BatchMutationOptions PrintCounters is set to true. type Counter struct { - // Number of RDF's processed by server. 
-	Rdfs uint64
+	// Number of N-Quads processed by server.
+	Nquads uint64
 	// Number of mutations processed by the server.
 	TxnsDone uint64
 	// Number of Aborts
@@ -148,7 +148,7 @@ func (l *loader) infinitelyRetry(req api.Mutation) {
 		req.CommitNow = true
 		_, err := txn.Mutate(l.opts.Ctx, &req)
 		if err == nil {
-			atomic.AddUint64(&l.rdfs, uint64(len(req.Set)))
+			atomic.AddUint64(&l.nquads, uint64(len(req.Set)))
 			atomic.AddUint64(&l.txns, 1)
 			return
 		}
@@ -167,7 +167,7 @@ func (l *loader) request(req api.Mutation) {
 	_, err := txn.Mutate(l.opts.Ctx, &req)
 
 	if err == nil {
-		atomic.AddUint64(&l.rdfs, uint64(len(req.Set)))
+		atomic.AddUint64(&l.nquads, uint64(len(req.Set)))
 		atomic.AddUint64(&l.txns, 1)
 		return
 	}
@@ -193,17 +193,17 @@ func (l *loader) printCounters() {
 	for range l.ticker.C {
 		counter := l.Counter()
-		rate := float64(counter.Rdfs) / counter.Elapsed.Seconds()
+		rate := float64(counter.Nquads) / counter.Elapsed.Seconds()
 		elapsed := time.Since(start).Round(time.Second)
-		fmt.Printf("[%6s] Txns: %d RDFs: %d RDFs/sec: %5.0f Aborts: %d\n",
-			elapsed, counter.TxnsDone, counter.Rdfs, rate, counter.Aborts)
+		fmt.Printf("[%6s] Txns: %d N-Quads: %d N-Quads/sec: %5.0f Aborts: %d\n",
+			elapsed, counter.TxnsDone, counter.Nquads, rate, counter.Aborts)
 	}
 }
 
 // Counter returns the current state of the BatchMutation.
 func (l *loader) Counter() Counter {
 	return Counter{
-		Rdfs:     atomic.LoadUint64(&l.rdfs),
+		Nquads:   atomic.LoadUint64(&l.nquads),
 		TxnsDone: atomic.LoadUint64(&l.txns),
 		Elapsed:  time.Since(l.start),
 		Aborts:   atomic.LoadUint64(&l.aborts),
diff --git a/dgraph/cmd/live/load_json_test.go b/dgraph/cmd/live/load_json_test.go
new file mode 100644
index 00000000000..5d716dbba5a
--- /dev/null
+++ b/dgraph/cmd/live/load_json_test.go
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2017-2018 Dgraph Labs, Inc. and Contributors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package live
+
+import (
+	"context"
+	"io/ioutil"
+	"os"
+	"path"
+	"runtime"
+	"strings"
+	"testing"
+
+	"github.com/dgraph-io/dgo"
+	"github.com/stretchr/testify/require"
+
+	"github.com/dgraph-io/dgraph/x"
+	"github.com/dgraph-io/dgraph/z"
+)
+
+const alphaService = ":9180"
+
+var (
+	testDataDir string
+	dg          *dgo.Dgraph
+	tmpDir      string
+)
+
+// Just check the first and last entries and assume everything in between is okay.
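+// It queries for Homer (the first entry) and Maggie (the last) by name.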
+func checkLoadedData(t *testing.T) { + resp, err := dg.NewTxn().Query(context.Background(), ` + { + q(func: anyofterms(name, "Homer")) { + name + age + role + } + } + `) + require.NoError(t, err) + z.CompareJSON(t, ` + { + "q": [ + { + "name": "Homer", + "age": 38, + "role": "father" + } + ] + } + `, string(resp.GetJson())) + + resp, err = dg.NewTxn().Query(context.Background(), ` + { + q(func: anyofterms(name, "Maggie")) { + name + role + carries + } + } + `) + require.NoError(t, err) + z.CompareJSON(t, ` + { + "q": [ + { + "name": "Maggie", + "role": "daughter", + "carries": "pacifier" + } + ] + } + `, string(resp.GetJson())) +} + +func TestLiveLoadJSONFile(t *testing.T) { + z.DropAll(t, dg) + + pipeline := [][]string{ + {os.ExpandEnv("$GOPATH/bin/dgraph"), "live", + "--schema", testDataDir + "/family.schema", "--files", testDataDir + "/family.json", + "--dgraph", alphaService}, + } + err := z.Pipeline(pipeline) + require.NoError(t, err, "live loading JSON file ran successfully") + + checkLoadedData(t) +} + +func TestLiveLoadJSONCompressedStream(t *testing.T) { + z.DropAll(t, dg) + + pipeline := [][]string{ + {"gzip", "-c", testDataDir + "/family.json"}, + {os.ExpandEnv("$GOPATH/bin/dgraph"), "live", + "--schema", testDataDir + "/family.schema", "--files", "/dev/stdin", + "--dgraph", alphaService}, + } + err := z.Pipeline(pipeline) + require.NoError(t, err, "live loading JSON stream ran successfully") + + checkLoadedData(t) +} + +func TestLiveLoadJSONMultipleFiles(t *testing.T) { + z.DropAll(t, dg) + + files := []string{ + testDataDir + "/family1.json", + testDataDir + "/family2.json", + testDataDir + "/family3.json", + } + fileList := strings.Join(files, ",") + + pipeline := [][]string{ + {os.ExpandEnv("$GOPATH/bin/dgraph"), "live", + "--schema", testDataDir + "/family.schema", "--files", fileList, + "--dgraph", alphaService}, + } + err := z.Pipeline(pipeline) + require.NoError(t, err, "live loading multiple JSON files ran successfully") + + checkLoadedData(t) +} +func TestMain(m *testing.M) { + _, thisFile, _, _ := runtime.Caller(0) + testDataDir = path.Dir(thisFile) + "/test_data" + + dg = z.DgraphClient(alphaService) + + // Try to create any files in a dedicated temp directory that gets cleaned up + // instead of all over /tmp or the working directory. 
+	tmpDir, err := ioutil.TempDir("", "test.tmp-")
+	x.Check(err)
+	os.Chdir(tmpDir)
+
+	// Note: a deferred os.RemoveAll(tmpDir) would never run because os.Exit bypasses
+	// deferred calls, so clean up explicitly before exiting.
+	exitCode := m.Run()
+	os.RemoveAll(tmpDir)
+	os.Exit(exitCode)
+}
diff --git a/dgraph/cmd/live/run.go b/dgraph/cmd/live/run.go
index 70b9de64ad2..5882e8f298e 100644
--- a/dgraph/cmd/live/run.go
+++ b/dgraph/cmd/live/run.go
@@ -29,7 +29,6 @@ import (
 	"net/http"
 	_ "net/http/pprof" // http profiler
 	"os"
-	"path/filepath"
 	"strconv"
 	"strings"
 	"time"
@@ -40,29 +39,32 @@ import (
 	bopt "github.com/dgraph-io/badger/options"
 	"github.com/dgraph-io/dgo"
 	"github.com/dgraph-io/dgo/protos/api"
-	"github.com/dgraph-io/dgraph/rdf"
+
+	"github.com/dgraph-io/dgraph/chunker"
 	"github.com/dgraph-io/dgraph/x"
 	"github.com/dgraph-io/dgraph/xidmap"
+
 	"github.com/spf13/cobra"
 )
 
 type options struct {
-	files               string
+	dataFiles           string
 	schemaFile          string
 	dgraph              string
 	zero                string
 	concurrent          int
-	numRdf              int
+	batchSize           int
 	clientDir           string
 	ignoreIndexConflict bool
 	authToken           string
 	useCompression      bool
 }
 
-var opt options
-var tlsConf x.TLSHelperConfig
-
-var Live x.SubCommand
+var (
+	opt     options
+	tlsConf x.TLSHelperConfig
+	Live    x.SubCommand
+)
 
 func init() {
 	Live.Cmd = &cobra.Command{
@@ -78,14 +80,14 @@ func init() {
 	Live.EnvPrefix = "DGRAPH_LIVE"
 
 	flag := Live.Cmd.Flags()
-	flag.StringP("rdfs", "r", "", "Location of rdf files to load")
+	flag.StringP("files", "f", "", "Location of *.rdf(.gz) or *.json(.gz) file(s) to load")
 	flag.StringP("schema", "s", "", "Location of schema file")
 	flag.StringP("dgraph", "d", "127.0.0.1:9080", "Dgraph alpha gRPC server address")
 	flag.StringP("zero", "z", "127.0.0.1:5080", "Dgraph zero gRPC server address")
 	flag.IntP("conc", "c", 10,
 		"Number of concurrent requests to make to Dgraph")
 	flag.IntP("batch", "b", 1000,
-		"Number of RDF N-Quads to send as part of a mutation.")
+		"Number of N-Quads to send as part of a mutation.")
 	flag.StringP("xidmap", "x", "", "Directory to store xid to uid mapping")
 	flag.BoolP("ignore_index_conflict", "i", true,
 		"Ignores conflicts on index keys during transaction")
@@ -121,7 +123,7 @@ func readLine(r *bufio.Reader, buf *bytes.Buffer) error {
 // processSchemaFile process schema for a given gz file.
func processSchemaFile(ctx context.Context, file string, dgraphClient *dgo.Dgraph) error { - fmt.Printf("\nProcessing %s\n", file) + fmt.Printf("\nProcessing schema file %q\n", file) if len(opt.authToken) > 0 { md := metadata.New(nil) md.Append("auth-token", opt.authToken) @@ -166,80 +168,76 @@ func (l *loader) uid(val string) string { return fmt.Sprintf("%#x", uint64(uid)) } -func fileReader(file string) (io.Reader, *os.File) { - f, err := os.Open(file) - x.Check(err) +// processFile forwards a file to the RDF or JSON processor as appropriate +func (l *loader) processFile(ctx context.Context, file string) error { + fmt.Printf("Processing data file %q\n", file) - var r io.Reader - if filepath.Ext(file) == ".gz" { - r, err = gzip.NewReader(f) - x.Check(err) - } else { - r = bufio.NewReader(f) + rd, cleanup := chunker.FileReader(file) + defer cleanup() + + var err error + var isJson bool + if strings.HasSuffix(file, ".rdf") || strings.HasSuffix(file, ".rdf.gz") { + err = l.processLoadFile(ctx, rd, chunker.NewChunker(chunker.RdfInput)) + } else if strings.HasSuffix(file, ".json") || strings.HasSuffix(file, ".json.gz") { + err = l.processLoadFile(ctx, rd, chunker.NewChunker(chunker.JsonInput)) + } else if isJson, err = chunker.IsJSONData(rd); err == nil { + if isJson { + err = l.processLoadFile(ctx, rd, chunker.NewChunker(chunker.JsonInput)) + } else { + err = fmt.Errorf("Unable to determine file content format: %s", file) + } } - return r, f + + return err } -// processFile sends mutations for a given gz file. -func (l *loader) processFile(ctx context.Context, file string) error { - fmt.Printf("\nProcessing %s\n", file) - gr, f := fileReader(file) - var buf bytes.Buffer - bufReader := bufio.NewReader(gr) - defer f.Close() +func (l *loader) processLoadFile(ctx context.Context, rd *bufio.Reader, ck chunker.Chunker) error { + x.CheckfNoTrace(ck.Begin(rd)) - var line uint64 - mu := api.Mutation{} - var batchSize int + batch := make([]*api.NQuad, 0, 2*opt.batchSize) for { select { case <-ctx.Done(): return ctx.Err() default: } - err := readLine(bufReader, &buf) - if err != nil { - if err != io.EOF { - return err + + var nqs []*api.NQuad + chunkBuf, err := ck.Chunk(rd) + if chunkBuf != nil && chunkBuf.Len() > 0 { + nqs, err = ck.Parse(chunkBuf) + x.CheckfNoTrace(err) + + for _, nq := range nqs { + nq.Subject = l.uid(nq.Subject) + if len(nq.ObjectId) > 0 { + nq.ObjectId = l.uid(nq.ObjectId) + } } - break - } - line++ - - nq, err := rdf.Parse(buf.String()) - if err == rdf.ErrEmpty { // special case: comment/empty line - buf.Reset() - continue - } else if err != nil { - return fmt.Errorf("Error while parsing RDF: %v, on line:%v %v", err, line, buf.String()) - } - batchSize++ - buf.Reset() - nq.Subject = l.uid(nq.Subject) - if len(nq.ObjectId) > 0 { - nq.ObjectId = l.uid(nq.ObjectId) + batch = append(batch, nqs...) + for len(batch) >= opt.batchSize { + mu := api.Mutation{Set: batch[:opt.batchSize]} + l.reqs <- mu + // The following would create a new batch slice. We should not use batch = + // batch[opt.batchSize:], because it would end up modifying the batch array passed + // to l.reqs above. + batch = append([]*api.NQuad{}, batch[opt.batchSize:]...) 
+ } } - mu.Set = append(mu.Set, &nq) - - if batchSize >= opt.numRdf { - l.reqs <- mu - batchSize = 0 - mu = api.Mutation{} + if err == io.EOF { + if len(batch) > 0 { + l.reqs <- api.Mutation{Set: batch} + } + break + } else { + x.Check(err) } } - if batchSize > 0 { - l.reqs <- mu - mu = api.Mutation{} - } - return nil -} + x.CheckfNoTrace(ck.End(rd)) -func fileList(files string) []string { - if len(files) == 0 { - return []string{} - } - return strings.Split(files, ",") + return nil } func setup(opts batchMutationOptions, dc *dgo.Dgraph) *loader { @@ -288,12 +286,12 @@ func setup(opts batchMutationOptions, dc *dgo.Dgraph) *loader { func run() error { x.PrintVersion() opt = options{ - files: Live.Conf.GetString("rdfs"), + dataFiles: Live.Conf.GetString("files"), schemaFile: Live.Conf.GetString("schema"), dgraph: Live.Conf.GetString("dgraph"), zero: Live.Conf.GetString("zero"), concurrent: Live.Conf.GetInt("conc"), - numRdf: Live.Conf.GetInt("batch"), + batchSize: Live.Conf.GetInt("batch"), clientDir: Live.Conf.GetString("xidmap"), ignoreIndexConflict: Live.Conf.GetBool("ignore_index_conflict"), authToken: Live.Conf.GetString("auth_token"), @@ -305,7 +303,7 @@ func run() error { go http.ListenAndServe("localhost:6060", nil) ctx := context.Background() bmOpts := batchMutationOptions{ - Size: opt.numRdf, + Size: opt.batchSize, Pending: opt.concurrent, PrintCounters: true, Ctx: ctx, @@ -345,13 +343,16 @@ func run() error { fmt.Printf("Error while processing schema file %q: %s\n", opt.schemaFile, err) return err } - fmt.Printf("Processed schema file %q\n", opt.schemaFile) + fmt.Printf("Processed schema file %q\n\n", opt.schemaFile) } - filesList := fileList(opt.files) + filesList := x.FindDataFiles(opt.dataFiles, []string{".rdf", ".rdf.gz", ".json", ".json.gz"}) totalFiles := len(filesList) if totalFiles == 0 { + fmt.Printf("No data files to process\n") return nil + } else { + fmt.Printf("Found %d data file(s) to process\n", totalFiles) } // x.Check(dgraphClient.NewSyncMarks(filesList)) @@ -370,7 +371,7 @@ func run() error { for i := 0; i < totalFiles; i++ { if err := <-errCh; err != nil { - fmt.Printf("Error while processing file %q: %s\n", filesList[i], err) + fmt.Printf("Error while processing data file %q: %s\n", filesList[i], err) return err } } @@ -384,17 +385,17 @@ func run() error { c := l.Counter() var rate uint64 if c.Elapsed.Seconds() < 1 { - rate = c.Rdfs + rate = c.Nquads } else { - rate = c.Rdfs / uint64(c.Elapsed.Seconds()) + rate = c.Nquads / uint64(c.Elapsed.Seconds()) } // Lets print an empty line, otherwise Interrupted or Number of Mutations overwrites the // previous printed line. 
fmt.Printf("%100s\r", "") - fmt.Printf("Number of TXs run : %d\n", c.TxnsDone) - fmt.Printf("Number of RDFs processed : %d\n", c.Rdfs) - fmt.Printf("Time spent : %v\n", c.Elapsed) - fmt.Printf("RDFs processed per second : %d\n", rate) + fmt.Printf("Number of TXs run : %d\n", c.TxnsDone) + fmt.Printf("Number of N-Quads processed : %d\n", c.Nquads) + fmt.Printf("Time spent : %v\n", c.Elapsed) + fmt.Printf("N-Quads processed per second : %d\n", rate) return nil } diff --git a/dgraph/cmd/live/test_data/family.json b/dgraph/cmd/live/test_data/family.json new file mode 100644 index 00000000000..967652c3ce4 --- /dev/null +++ b/dgraph/cmd/live/test_data/family.json @@ -0,0 +1,69 @@ +[ + { + "uid":"_:h", + "name":"Homer", + "age":"38", + "role":"father", + "parent_to": [ + { "uid":"_:b" }, + { "uid":"_:l" }, + { "uid":"_:m2" } + ] + }, + { + "uid":"_:m1", + "name":"Marge", + "age":"34", + "role":"mother", + "aka":"Midge", + "parent_to": [ + { "uid":"_:b" }, + { "uid":"_:l" }, + { "uid":"_:m2" } + ] + }, + { + "uid":"_:b", + "name":"Bart", + "age":"10", + "role":"son", + "aka":"El Barto", + "carries":"slingshot", + "sibling_of": [ + { "uid":"_:l" }, + { "uid":"_:m2" } + ] + }, + { + "uid":"_:l", + "name":"Lisa", + "age":"8", + "role":"daughter", + "carries":"saxomophone", + "sibling_of": [ + { "uid":"_:b" }, + { "uid":"_:m2" } + ] + }, + { + "uid":"_:m2", + "name":"Maggie", + "age":"1", + "role":"daughter", + "carries":"pacifier", + "sibling_of": [ + { "uid":"_:b" }, + { "uid":"_:l" } + ] + }, + { + "uid":"_:a", + "name":"Abraham", + "age":"83", + "role":"father", + "aka":"Grampa", + "parent_to": [ + { "uid":"_:h" } + ] + } +] diff --git a/dgraph/cmd/live/test_data/family.schema b/dgraph/cmd/live/test_data/family.schema new file mode 100644 index 00000000000..e45ba0b313d --- /dev/null +++ b/dgraph/cmd/live/test_data/family.schema @@ -0,0 +1,7 @@ +name:string @index(term) . +age: int . +role:string @index(term) . +aka:string @index(term) . +carries:string @index(term) . +parent_to: [uid] @reverse . +sibling_of: [uid] @reverse . 
diff --git a/dgraph/cmd/live/test_data/family1.json b/dgraph/cmd/live/test_data/family1.json new file mode 100644 index 00000000000..16eb067b46f --- /dev/null +++ b/dgraph/cmd/live/test_data/family1.json @@ -0,0 +1,25 @@ +[ + { + "uid":"_:h", + "name":"Homer", + "age":"38", + "role":"father", + "parent_to": [ + { "uid":"_:b" }, + { "uid":"_:l" }, + { "uid":"_:m2" } + ] + }, + { + "uid":"_:m1", + "name":"Marge", + "age":"34", + "role":"mother", + "aka":"Midge", + "parent_to": [ + { "uid":"_:b" }, + { "uid":"_:l" }, + { "uid":"_:m2" } + ] + } +] diff --git a/dgraph/cmd/live/test_data/family2.json b/dgraph/cmd/live/test_data/family2.json new file mode 100644 index 00000000000..b7fd43beed2 --- /dev/null +++ b/dgraph/cmd/live/test_data/family2.json @@ -0,0 +1,36 @@ +[ + { + "uid":"_:b", + "name":"Bart", + "age":"10", + "role":"son", + "aka":"El Barto", + "carries":"slingshot", + "sibling_of": [ + { "uid":"_:l" }, + { "uid":"_:m2" } + ] + }, + { + "uid":"_:l", + "name":"Lisa", + "age":"8", + "role":"daughter", + "carries":"saxomophone", + "sibling_of": [ + { "uid":"_:b" }, + { "uid":"_:m2" } + ] + }, + { + "uid":"_:m2", + "name":"Maggie", + "age":"1", + "role":"daughter", + "carries":"pacifier", + "sibling_of": [ + { "uid":"_:b" }, + { "uid":"_:l" } + ] + } +] diff --git a/dgraph/cmd/live/test_data/family3.json b/dgraph/cmd/live/test_data/family3.json new file mode 100644 index 00000000000..23cb11324a5 --- /dev/null +++ b/dgraph/cmd/live/test_data/family3.json @@ -0,0 +1,12 @@ +[ + { + "uid":"_:a", + "name":"Abraham", + "age":"83", + "role":"father", + "aka":"Grampa", + "parent_to": [ + { "uid":"_:h" } + ] + } +] diff --git a/edgraph/server.go b/edgraph/server.go index 4e52e7c5dbd..2709a5bedef 100644 --- a/edgraph/server.go +++ b/edgraph/server.go @@ -32,11 +32,12 @@ import ( "github.com/dgraph-io/dgo/protos/api" "github.com/dgraph-io/dgo/y" + nqjson "github.com/dgraph-io/dgraph/chunker/json" + "github.com/dgraph-io/dgraph/chunker/rdf" "github.com/dgraph-io/dgraph/conn" "github.com/dgraph-io/dgraph/gql" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/query" - "github.com/dgraph-io/dgraph/rdf" "github.com/dgraph-io/dgraph/schema" "github.com/dgraph-io/dgraph/types/facets" "github.com/dgraph-io/dgraph/worker" @@ -645,14 +646,14 @@ func parseNQuads(b []byte) ([]*api.NQuad, error) { func parseMutationObject(mu *api.Mutation) (*gql.Mutation, error) { res := &gql.Mutation{} if len(mu.SetJson) > 0 { - nqs, err := nquadsFromJson(mu.SetJson, set) + nqs, err := nqjson.Parse(mu.SetJson, nqjson.SetNquads) if err != nil { return nil, err } res.Set = append(res.Set, nqs...) 
} if len(mu.DeleteJson) > 0 { - nqs, err := nquadsFromJson(mu.DeleteJson, delete) + nqs, err := nqjson.Parse(mu.DeleteJson, nqjson.DeleteNquads) if err != nil { return nil, err } diff --git a/edgraph/server_test.go b/edgraph/server_test.go index 038a41da937..3abd208e8b6 100644 --- a/edgraph/server_test.go +++ b/edgraph/server_test.go @@ -17,21 +17,11 @@ package edgraph import ( - "encoding/binary" - "encoding/json" - "fmt" - "math" "testing" - "time" "github.com/dgraph-io/dgo/protos/api" - "github.com/dgraph-io/dgraph/tok" - "github.com/dgraph-io/dgraph/types" "github.com/dgraph-io/dgraph/x" - "github.com/golang/glog" "github.com/stretchr/testify/require" - geom "github.com/twpayne/go-geom" - "github.com/twpayne/go-geom/encoding/geojson" ) func makeNquad(sub, pred string, val *api.Value) *api.NQuad { @@ -50,358 +40,6 @@ func makeNquadEdge(sub, pred, obj string) *api.NQuad { } } -type School struct { - Name string `json:",omitempty"` -} - -type address struct { - Type string `json:"type,omitempty"` - Coords []float64 `json:"coordinates,omitempty"` -} - -type Person struct { - Uid string `json:"uid,omitempty"` - Name string `json:"name,omitempty"` - Age int `json:"age,omitempty"` - Married *bool `json:"married,omitempty"` - Now *time.Time `json:"now,omitempty"` - Address address `json:"address,omitempty"` // geo value - Friends []Person `json:"friend,omitempty"` - School *School `json:"school,omitempty"` -} - -func TestNquadsFromJson1(t *testing.T) { - tn := time.Now().UTC() - geoVal := `{"Type":"Point", "Coordinates":[1.1,2.0]}` - m := true - p := Person{ - Name: "Alice", - Age: 26, - Married: &m, - Now: &tn, - Address: address{ - Type: "Point", - Coords: []float64{1.1, 2.0}, - }, - } - - b, err := json.Marshal(p) - require.NoError(t, err) - - nq, err := nquadsFromJson(b, set) - require.NoError(t, err) - - require.Equal(t, 5, len(nq)) - - oval := &api.Value{Val: &api.Value_StrVal{StrVal: "Alice"}} - require.Contains(t, nq, makeNquad("_:blank-0", "name", oval)) - - oval = &api.Value{Val: &api.Value_IntVal{IntVal: 26}} - require.Contains(t, nq, makeNquad("_:blank-0", "age", oval)) - - oval = &api.Value{Val: &api.Value_BoolVal{BoolVal: true}} - require.Contains(t, nq, makeNquad("_:blank-0", "married", oval)) - - oval = &api.Value{Val: &api.Value_StrVal{StrVal: tn.Format(time.RFC3339Nano)}} - require.Contains(t, nq, makeNquad("_:blank-0", "now", oval)) - - var g geom.T - err = geojson.Unmarshal([]byte(geoVal), &g) - require.NoError(t, err) - geo, err := types.ObjectValue(types.GeoID, g) - require.NoError(t, err) - - require.Contains(t, nq, makeNquad("_:blank-0", "address", geo)) -} - -func TestNquadsFromJson2(t *testing.T) { - m := false - - p := Person{ - Name: "Alice", - Friends: []Person{{ - Name: "Charlie", - Married: &m, - }, { - Uid: "1000", - Name: "Bob", - }}, - } - - b, err := json.Marshal(p) - require.NoError(t, err) - - nq, err := nquadsFromJson(b, set) - require.NoError(t, err) - - require.Equal(t, 6, len(nq)) - require.Contains(t, nq, makeNquadEdge("_:blank-0", "friend", "_:blank-1")) - require.Contains(t, nq, makeNquadEdge("_:blank-0", "friend", "1000")) - - oval := &api.Value{Val: &api.Value_StrVal{StrVal: "Charlie"}} - require.Contains(t, nq, makeNquad("_:blank-1", "name", oval)) - - oval = &api.Value{Val: &api.Value_BoolVal{BoolVal: false}} - require.Contains(t, nq, makeNquad("_:blank-1", "married", oval)) - - oval = &api.Value{Val: &api.Value_StrVal{StrVal: "Bob"}} - require.Contains(t, nq, makeNquad("1000", 
"name", oval)) -} - -func TestNquadsFromJson3(t *testing.T) { - p := Person{ - Name: "Alice", - School: &School{ - Name: "Wellington Public School", - }, - } - - b, err := json.Marshal(p) - require.NoError(t, err) - - nq, err := nquadsFromJson(b, set) - require.NoError(t, err) - - require.Equal(t, 3, len(nq)) - require.Contains(t, nq, makeNquadEdge("_:blank-0", "school", "_:blank-1")) - - oval := &api.Value{Val: &api.Value_StrVal{StrVal: "Wellington Public School"}} - require.Contains(t, nq, makeNquad("_:blank-1", "Name", oval)) -} - -func TestNquadsFromJson4(t *testing.T) { - json := `[{"name":"Alice","mobile":"040123456","car":"MA0123", "age": 21, "weight": 58.7}]` - - nq, err := nquadsFromJson([]byte(json), set) - require.NoError(t, err) - require.Equal(t, 5, len(nq)) - oval := &api.Value{Val: &api.Value_StrVal{StrVal: "Alice"}} - require.Contains(t, nq, makeNquad("_:blank-0", "name", oval)) - require.Contains(t, nq, makeNquad("_:blank-0", "age", &api.Value{Val: &api.Value_IntVal{IntVal: 21}})) - require.Contains(t, nq, makeNquad("_:blank-0", "weight", - &api.Value{Val: &api.Value_DoubleVal{DoubleVal: 58.7}})) -} - -func TestJsonNumberParsing(t *testing.T) { - tests := []struct { - in string - out *api.Value - }{ - {`{"uid": "1", "key": 9223372036854775299}`, &api.Value{Val: &api.Value_IntVal{IntVal: 9223372036854775299}}}, - {`{"uid": "1", "key": 9223372036854775299.0}`, &api.Value{Val: &api.Value_DoubleVal{DoubleVal: 9223372036854775299.0}}}, - {`{"uid": "1", "key": 27670116110564327426}`, nil}, - {`{"uid": "1", "key": "23452786"}`, &api.Value{Val: &api.Value_StrVal{StrVal: "23452786"}}}, - {`{"uid": "1", "key": "23452786.2378"}`, &api.Value{Val: &api.Value_StrVal{StrVal: "23452786.2378"}}}, - {`{"uid": "1", "key": -1e10}`, &api.Value{Val: &api.Value_DoubleVal{DoubleVal: -1e+10}}}, - {`{"uid": "1", "key": 0E-0}`, &api.Value{Val: &api.Value_DoubleVal{DoubleVal: 0}}}, - } - - for _, test := range tests { - nqs, err := nquadsFromJson([]byte(test.in), set) - if test.out != nil { - require.NoError(t, err, "%T", err) - require.Equal(t, makeNquad("1", "key", test.out), nqs[0]) - } else { - require.Error(t, err) - } - } -} - -func TestNquadsFromJson_UidOutofRangeError(t *testing.T) { - json := `{"uid":"0xa14222b693e4ba34123","name":"Name","following":[{"name":"Bob"}],"school":[{"uid":"","name@en":"Crown Public School"}]}` - - _, err := nquadsFromJson([]byte(json), set) - require.Error(t, err) -} - -func TestNquadsFromJson_NegativeUidError(t *testing.T) { - json := `{"uid":"-100","name":"Name","following":[{"name":"Bob"}],"school":[{"uid":"","name@en":"Crown Public School"}]}` - - _, err := nquadsFromJson([]byte(json), set) - require.Error(t, err) -} - -func TestNquadsFromJson_EmptyUid(t *testing.T) { - json := `{"uid":"","name":"Name","following":[{"name":"Bob"}],"school":[{"uid":"","name":"Crown Public School"}]}` - - nq, err := nquadsFromJson([]byte(json), set) - require.NoError(t, err) - - require.Equal(t, 5, len(nq)) - oval := &api.Value{Val: &api.Value_StrVal{StrVal: "Name"}} - require.Contains(t, nq, makeNquad("_:blank-0", "name", oval)) -} - -func TestNquadsFromJson_BlankNodes(t *testing.T) { - json := `{"uid":"_:alice","name":"Alice","following":[{"name":"Bob"}],"school":[{"uid":"_:school","name":"Crown Public School"}]}` - - nq, err := nquadsFromJson([]byte(json), set) - require.NoError(t, err) - - require.Equal(t, 5, len(nq)) - require.Contains(t, nq, makeNquadEdge("_:alice", "school", "_:school")) -} - -func TestNquadsDeleteEdges(t *testing.T) { - json := `[{"uid": 
"0x1","name":null,"mobile":null,"car":null}]` - nq, err := nquadsFromJson([]byte(json), delete) - require.NoError(t, err) - require.Equal(t, 3, len(nq)) -} - -func checkCount(t *testing.T, nq []*api.NQuad, pred string, count int) { - for _, n := range nq { - if n.Predicate == pred { - require.Equal(t, count, len(n.Facets)) - break - } - } -} - -func getMapOfFacets(facets []*api.Facet) map[string]*api.Facet { - res := make(map[string]*api.Facet) - for _, f := range facets { - res[f.Key] = f - } - return res -} - -func checkFacets(t *testing.T, nq []*api.NQuad, pred string, facets []*api.Facet) { - for _, n := range nq { - if n.Predicate == pred { - require.Equal(t, len(facets), len(n.Facets), - fmt.Sprintf("expected %d facets, got %d", len(facets), len(n.Facets))) - - expectedFacets := getMapOfFacets(facets) - actualFacets := getMapOfFacets(n.Facets) - for key, f := range expectedFacets { - actualF, ok := actualFacets[key] - if !ok { - t.Fatalf("facet for key %s not found", key) - } - require.Equal(t, f, actualF, fmt.Sprintf("expected:%v\ngot:%v", f, actualF)) - } - } - } -} - -func TestNquadsFromJsonFacets1(t *testing.T) { - // test the 5 data types on facets, string, bool, int, float and datetime - operation := "READ WRITE" - operationTokens, err := tok.GetTermTokens([]string{operation}) - require.NoError(t, err, "unable to get tokens from the string %s", operation) - - timeStr := "2006-01-02T15:04:05Z" - time, err := types.ParseTime(timeStr) - if err != nil { - t.Fatalf("unable to convert string %s to time", timeStr) - } - timeBinary, err := time.MarshalBinary() - if err != nil { - t.Fatalf("unable to marshal time %v to binary", time) - } - - carPrice := 30000.56 - var priceBytes [8]byte - u := math.Float64bits(float64(carPrice)) - binary.LittleEndian.PutUint64(priceBytes[:], u) - - carAge := 3 - var ageBytes [8]byte - binary.LittleEndian.PutUint64(ageBytes[:], uint64(carAge)) - - json := fmt.Sprintf(`[{"name":"Alice","mobile":"040123456","car":"MA0123",`+ - `"mobile|operation": "%s", - "car|first":true, - "car|age": %d, - "car|price": %f, - "car|since": "%s" -}]`, operation, carAge, carPrice, timeStr) - - nq, err := nquadsFromJson([]byte(json), set) - require.NoError(t, err) - require.Equal(t, 3, len(nq)) - - for _, n := range nq { - glog.Infof("%v", n) - - } - - checkFacets(t, nq, "mobile", []*api.Facet{ - { - Key: "operation", - Value: []byte(operation), - ValType: api.Facet_STRING, - Tokens: operationTokens, - }, - }) - - checkFacets(t, nq, "car", []*api.Facet{ - { - Key: "first", - Value: []byte{1}, - ValType: api.Facet_BOOL, - }, - { - Key: "age", - Value: ageBytes[:], - ValType: api.Facet_INT, - }, - { - Key: "price", - Value: priceBytes[:], - ValType: api.Facet_FLOAT, - }, - { - Key: "since", - Value: timeBinary, - ValType: api.Facet_DATETIME, - }, - }) -} - -func TestNquadsFromJsonFacets2(t *testing.T) { - // Dave has uid facets which should go on the edge between Alice and Dave - json := `[{"name":"Alice","friend":[{"name":"Dave","friend|close":"true"}]}]` - - nq, err := nquadsFromJson([]byte(json), set) - require.NoError(t, err) - require.Equal(t, 3, len(nq)) - checkCount(t, nq, "friend", 1) -} - -func TestNquadsFromJsonError1(t *testing.T) { - p := Person{ - Name: "Alice", - School: &School{ - Name: "Wellington Public School", - }, - } - - b, err := json.Marshal(p) - require.NoError(t, err) - - _, err = nquadsFromJson(b, delete) - require.Error(t, err) - require.Contains(t, err.Error(), "UID must be present and non-zero while deleting edges.") -} - -func 
TestNquadsFromJsonList(t *testing.T) { - json := `{"address":["Riley Street","Redfern"],"phone_number":[123,9876],"points":[{"type":"Point", "coordinates":[1.1,2.0]},{"type":"Point", "coordinates":[2.0,1.1]}]}` - - nq, err := nquadsFromJson([]byte(json), set) - require.NoError(t, err) - require.Equal(t, 6, len(nq)) -} - -func TestNquadsFromJsonDelete(t *testing.T) { - json := `{"uid":1000,"friend":[{"uid":1001}]}` - - nq, err := nquadsFromJson([]byte(json), delete) - require.NoError(t, err) - require.Equal(t, nq[0], makeNquadEdge("1000", "friend", "1001")) -} - func TestParseNQuads(t *testing.T) { nquads := ` _:a "A" . diff --git a/gql/parser_test.go b/gql/parser_test.go index 3c9c3daf9c9..ea221bb49c1 100644 --- a/gql/parser_test.go +++ b/gql/parser_test.go @@ -23,7 +23,7 @@ import ( "testing" "github.com/dgraph-io/dgo/protos/api" - "github.com/dgraph-io/dgraph/rdf" + "github.com/dgraph-io/dgraph/chunker/rdf" "github.com/stretchr/testify/require" ) diff --git a/systest/bulk_live_fixture_test.go b/systest/bulk_live_fixture_test.go index 058aead39a6..34275435cbb 100644 --- a/systest/bulk_live_fixture_test.go +++ b/systest/bulk_live_fixture_test.go @@ -112,7 +112,7 @@ func (s *suite) setup(schemaFile, rdfFile string) { )) liveCmd := exec.Command(os.ExpandEnv("$GOPATH/bin/dgraph"), "live", - "--rdfs", rdfFile, + "--files", rdfFile, "--schema", schemaFile, "--dgraph", ":9180", ) diff --git a/systest/cluster_test.go b/systest/cluster_test.go index 0dc0638c909..211930627fa 100644 --- a/systest/cluster_test.go +++ b/systest/cluster_test.go @@ -174,7 +174,7 @@ func DONOTRUNTestClusterSnapshot(t *testing.T) { data := os.ExpandEnv("$GOPATH/src/github.com/dgraph-io/dgraph/systest/data/goldendata_first_200k.rdf.gz") liveCmd := exec.Command(os.ExpandEnv("$GOPATH/bin/dgraph"), "live", - "--rdfs", data, + "--files", data, "--schema", schema, "--dgraph", ":"+cluster.dgraphPort, "--zero", ":"+cluster.zeroPort, diff --git a/systest/loader_test.go b/systest/loader_test.go index c0b91b8f21a..28341a69e6d 100644 --- a/systest/loader_test.go +++ b/systest/loader_test.go @@ -38,7 +38,7 @@ func TestLoaderXidmap(t *testing.T) { data := os.ExpandEnv("$GOPATH/src/github.com/dgraph-io/dgraph/systest/data/first.rdf.gz") liveCmd := exec.Command(os.ExpandEnv("$GOPATH/bin/dgraph"), "live", - "--rdfs", data, + "--files", data, "--dgraph", ":"+cluster.dgraphPort, "--zero", ":"+cluster.zeroPort, "-x", "x", @@ -52,7 +52,7 @@ func TestLoaderXidmap(t *testing.T) { // Load another file, live should reuse the xidmap. 
	data = os.ExpandEnv("$GOPATH/src/github.com/dgraph-io/dgraph/systest/data/second.rdf.gz")
 	liveCmd = exec.Command(os.ExpandEnv("$GOPATH/bin/dgraph"), "live",
-		"--rdfs", data,
+		"--files", data,
 		"--dgraph", ":"+cluster.dgraphPort,
 		"--zero", ":"+cluster.zeroPort,
 		"-x", "x",
diff --git a/test.sh b/test.sh
index 228e6043690..bfbea47d5d1 100755
--- a/test.sh
+++ b/test.sh
@@ -58,6 +58,7 @@ function FindDefaultClusterTests {
 function Run {
   set -o pipefail
+  echo -en "...\r"
   go test ${GO_TEST_OPTS[*]} $@ \
     | GREP_COLORS='mt=01;32' egrep --line-buffered --color=always '^ok\ .*|$' \
     | GREP_COLORS='mt=00;38;5;226' egrep --line-buffered --color=always '^\?\ .*|$' \
diff --git a/worker/export_test.go b/worker/export_test.go
index 7469901a7bf..17e4e102608 100644
--- a/worker/export_test.go
+++ b/worker/export_test.go
@@ -37,7 +37,7 @@ import (
 	"github.com/dgraph-io/dgraph/types"
 	"github.com/dgraph-io/dgraph/types/facets"
 
-	"github.com/dgraph-io/dgraph/rdf"
+	"github.com/dgraph-io/dgraph/chunker/rdf"
 	"github.com/dgraph-io/dgraph/schema"
 	"github.com/dgraph-io/dgraph/x"
 )
diff --git a/x/file.go b/x/file.go
index 311b89c0185..e1132606fc6 100644
--- a/x/file.go
+++ b/x/file.go
@@ -19,7 +19,9 @@ package x
 import (
 	"os"
 	"path/filepath"
+	"strings"
 
+	"github.com/dgraph-io/dgo/x"
 	"github.com/golang/glog"
 )
 
@@ -57,3 +59,36 @@ func FindFilesFunc(dir string, f func(string) bool) []string {
 	}
 	return files
 }
+
+// FindDataFiles returns a list of data files as a string array. If str is a comma-separated list
+// of paths, it returns that list. If str is a single path that is not a directory, it returns that
+// path. If str is a directory, it returns the files in it that have one of the extensions in ext.
+func FindDataFiles(str string, ext []string) []string {
+	if len(str) == 0 {
+		return []string{}
+	}
+
+	list := strings.Split(str, ",")
+	if len(list) == 1 {
+		fi, err := os.Stat(str)
+		if os.IsNotExist(err) {
+			glog.Errorf("File or directory does not exist: %s", str)
+			return []string{}
+		}
+		x.Check(err)
+
+		if fi.IsDir() {
+			matchFn := func(f string) bool {
+				for _, e := range ext {
+					if strings.HasSuffix(f, e) {
+						return true
+					}
+				}
+				return false
+			}
+			list = FindFilesFunc(str, matchFn)
+		}
+	}
+
+	return list
+}
diff --git a/z/client.go b/z/client.go
new file mode 100644
index 00000000000..316aeeeef33
--- /dev/null
+++ b/z/client.go
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2019 Dgraph Labs, Inc. and Contributors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package z
+
+import (
+	"context"
+	"encoding/json"
+	"testing"
+
+	"github.com/dgraph-io/dgo"
+	"github.com/dgraph-io/dgo/protos/api"
+	"github.com/stretchr/testify/require"
+	"google.golang.org/grpc"
+
+	"github.com/dgraph-io/dgraph/x"
+)
+
+// DgraphClient returns a client for serviceAddr, dropping all existing data first.
+func DgraphClient(serviceAddr string) *dgo.Dgraph {
+	conn, err := grpc.Dial(serviceAddr, grpc.WithInsecure())
+	x.Check(err)
+
+	dg := dgo.NewDgraphClient(api.NewDgraphClient(conn))
+	err = dg.Alter(context.Background(), &api.Operation{DropAll: true})
+	x.Check(err)
+
+	return dg
+}
+
+// DropAll drops all data in the cluster and verifies that no nodes remain.
+func DropAll(t *testing.T, dg *dgo.Dgraph) {
+	err := dg.Alter(context.Background(), &api.Operation{DropAll: true})
+	x.Check(err)
+
+	nodes := DbNodeCount(t, dg)
+	require.Equal(t, 0, nodes)
+}
+
+// DbNodeCount returns the number of nodes in the cluster that have any predicate.
+func DbNodeCount(t *testing.T, dg *dgo.Dgraph) int {
+	resp, err := dg.NewTxn().Query(context.Background(), `
+		{
+			q(func: has(_predicate_)) {
+				count(uid)
+			}
+		}
+	`)
+	x.Check(err)
+
+	type count struct {
+		Count int
+	}
+	type root struct {
+		Q []count
+	}
+	var response root
+	err = json.Unmarshal(resp.GetJson(), &response)
+	x.Check(err)
+
+	return response.Q[0].Count
+}
diff --git a/z/exec.go b/z/exec.go
new file mode 100644
index 00000000000..a8b976cd1a0
--- /dev/null
+++ b/z/exec.go
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2019 Dgraph Labs, Inc. and Contributors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package z
+
+import (
+	"fmt"
+	"io"
+	"os"
+	"os/exec"
+
+	"github.com/dgraph-io/dgraph/x"
+)
+
+// For debugging the tests.
+const showOutput = false
+const showCommands = false
+
+// Pipeline runs several commands such that the output of one command becomes the input of the next.
+// The cmds argument is a two-dimensional slice: each inner slice is one command with its arguments.
+// TODO: allow capturing output, sending to terminal, etc
+func Pipeline(cmds [][]string) error {
+	var p io.ReadCloser
+	numCmds := len(cmds)
+
+	cmd := make([]*exec.Cmd, numCmds)
+
+	// Run all commands in parallel, connecting stdin of each to the stdout of the previous.
+	for i, c := range cmds {
+		lastCmd := i == numCmds-1
+		if showCommands {
+			fmt.Fprintf(os.Stderr, "%+v", c)
+		}
+
+		cmd[i] = exec.Command(c[0], c[1:]...)
+		cmd[i].Stdin = p
+		if !lastCmd {
+			p, _ = cmd[i].StdoutPipe()
+		}
+
+		if showOutput {
+			cmd[i].Stderr = os.Stderr
+			if lastCmd {
+				cmd[i].Stdout = os.Stdout
+			}
+		}
+
+		if showCommands {
+			if lastCmd {
+				fmt.Fprintf(os.Stderr, "\n")
+			} else {
+				fmt.Fprintf(os.Stderr, "\n| ")
+			}
+		}
+
+		err := cmd[i].Start()
+		x.Check(err)
+	}
+
+	// Make sure to properly reap all spawned processes, but only save the error from the
+	// earliest stage of the pipeline.
+	var err error
+	for i := range cmds {
+		e := cmd[i].Wait()
+		if e != nil && err == nil {
+			err = e
+		}
+	}
+
+	return err
+}
diff --git a/z/json.go b/z/json.go
new file mode 100644
index 00000000000..43553ef9d4d
--- /dev/null
+++ b/z/json.go
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2019 Dgraph Labs, Inc. and Contributors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package z
+
+import (
+	"encoding/json"
+	"fmt"
+	"hash/crc64"
+	"reflect"
+	"sort"
+	"testing"
+)
+
+// CompareJSON compares two JSON documents for equality, ignoring the order of
+// array elements and map keys.
+func CompareJSON(t *testing.T, want, got string) {
+	wantMap := map[string]interface{}{}
+	err := json.Unmarshal([]byte(want), &wantMap)
+	if err != nil {
+		t.Fatalf("Could not unmarshal want JSON: %v", err)
+	}
+	gotMap := map[string]interface{}{}
+	err = json.Unmarshal([]byte(got), &gotMap)
+	if err != nil {
+		t.Fatalf("Could not unmarshal got JSON: %v", err)
+	}
+
+	sortJSON(wantMap)
+	sortJSON(gotMap)
+
+	if !reflect.DeepEqual(wantMap, gotMap) {
+		wantBuf, err := json.MarshalIndent(wantMap, "", "  ")
+		if err != nil {
+			t.Error("Could not marshal JSON:", err)
+		}
+		gotBuf, err := json.MarshalIndent(gotMap, "", "  ")
+		if err != nil {
+			t.Error("Could not marshal JSON:", err)
+		}
+		t.Errorf("Want JSON and Got JSON not equal\nWant:\n%v\nGot:\n%v",
+			string(wantBuf), string(gotBuf))
+	}
+}
+
+// sortJSON looks for any arrays in the unmarshalled JSON and sorts them in an
+// arbitrary but deterministic order based on their content.
+func sortJSON(i interface{}) uint64 {
+	if i == nil {
+		return 0
+	}
+	switch i := i.(type) {
+	case map[string]interface{}:
+		return sortJSONMap(i)
+	case []interface{}:
+		return sortJSONArray(i)
+	default:
+		h := crc64.New(crc64.MakeTable(crc64.ISO))
+		fmt.Fprint(h, i)
+		return h.Sum64()
+	}
+}
+
+func sortJSONMap(m map[string]interface{}) uint64 {
+	var h uint64
+	for _, v := range m {
+		// Because xor is commutative, it doesn't matter that map iteration
+		// is in random order.
+		h ^= sortJSON(v)
+	}
+	return h
+}
+
+type arrayElement struct {
+	elem   interface{}
+	sortBy uint64
+}
+
+func sortJSONArray(a []interface{}) uint64 {
+	var h uint64
+	elements := make([]arrayElement, len(a))
+	for i, elem := range a {
+		elements[i] = arrayElement{elem, sortJSON(elem)}
+		h ^= elements[i].sortBy
+	}
+	sort.Slice(elements, func(i, j int) bool {
+		return elements[i].sortBy < elements[j].sortBy
+	})
+	for i := range a {
+		a[i] = elements[i].elem
+	}
+	return h
+}
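To illustrate the new x.FindDataFiles helper, here is a minimal usage sketch of its two behaviors; the paths and extensions below are hypothetical and not part of the patch:

```go
package main

import (
	"fmt"

	"github.com/dgraph-io/dgraph/x"
)

func main() {
	// A comma-separated list is split and returned as-is; the individual
	// paths are not checked for existence.
	fmt.Println(x.FindDataFiles("a.rdf,b.rdf", []string{".rdf"}))

	// A single directory argument is expanded to the files inside it whose
	// names end with one of the given extensions ("./data" is hypothetical).
	fmt.Println(x.FindDataFiles("./data", []string{".rdf", ".rdf.gz", ".json", ".json.gz"}))
}
```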
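z.Pipeline wires each stage's stdout to the next stage's stdin. A rough sketch of the intended call pattern follows, with placeholder shell commands; note that with showOutput left false the final stage's stdout is discarded, so the helper is most useful for commands run for their side effects or exit status:

```go
package main

import (
	"log"

	"github.com/dgraph-io/dgraph/z"
)

func main() {
	// Roughly: gunzip -c data.rdf.gz | head -n 1000 | wc -l
	// Each inner slice is one command; stdout of each stage feeds the next.
	err := z.Pipeline([][]string{
		{"gunzip", "-c", "data.rdf.gz"}, // "data.rdf.gz" is a placeholder path
		{"head", "-n", "1000"},
		{"wc", "-l"},
	})
	if err != nil {
		log.Fatalf("pipeline failed: %v", err)
	}
}
```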
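z.CompareJSON is order-insensitive because sortJSON places every array in a deterministic order before the reflect.DeepEqual check. A small sketch of a test that passes despite differing array order:

```go
package z_test

import (
	"testing"

	"github.com/dgraph-io/dgraph/z"
)

func TestCompareJSONIgnoresArrayOrder(t *testing.T) {
	want := `{"q": [{"name": "Alice"}, {"name": "Bob"}]}`
	got := `{"q": [{"name": "Bob"}, {"name": "Alice"}]}`

	// Both arrays are sorted into the same deterministic order, so the
	// comparison succeeds even though the input order differs.
	z.CompareJSON(t, want, got)
}
```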