diff --git a/dgraph/cmd/bulk/loader.go b/dgraph/cmd/bulk/loader.go
index 5dc3b50cb5a..3e0a8cc6485 100644
--- a/dgraph/cmd/bulk/loader.go
+++ b/dgraph/cmd/bulk/loader.go
@@ -27,6 +27,7 @@ import (
 	"log"
 	"os"
 	"path/filepath"
+	"strconv"
 	"sync"
 	"time"
 
@@ -47,6 +48,7 @@ type options struct {
 	DataFiles     string
 	DataFormat    string
 	SchemaFile    string
+	GqlSchemaFile string
 	OutDir        string
 	ReplaceOutDir bool
 	TmpDir        string
@@ -237,6 +239,9 @@ func (ld *loader) mapStage() {
 	}
 	x.Check(thr.Finish())
 
+	// Send the graphql triples
+	ld.processGqlSchema(loadType)
+
 	close(ld.readerChunkCh)
 	mapperWg.Wait()
 
@@ -251,6 +256,51 @@ func (ld *loader) mapStage() {
 	ld.xids = nil
 }
 
+func (ld *loader) processGqlSchema(loadType chunker.InputFormat) {
+	if ld.opt.GqlSchemaFile == "" {
+		return
+	}
+
+	f, err := os.Open(ld.opt.GqlSchemaFile)
+	x.Check(err)
+	defer f.Close()
+
+	key := ld.opt.EncryptionKey
+	if !ld.opt.Encrypted {
+		key = nil
+	}
+	r, err := enc.GetReader(key, f)
+	x.Check(err)
+	if filepath.Ext(ld.opt.GqlSchemaFile) == ".gz" {
+		r, err = gzip.NewReader(r)
+		x.Check(err)
+	}
+
+	buf, err := ioutil.ReadAll(r)
+	x.Check(err)
+
+	rdfSchema := `_:gqlschema <dgraph.type> "dgraph.graphql" .
+	_:gqlschema <dgraph.graphql.xid> "dgraph.graphql.schema" .
+	_:gqlschema <dgraph.graphql.schema> %s .
+	`
+
+	jsonSchema := `{
+		"dgraph.type": "dgraph.graphql",
+		"dgraph.graphql.xid": "dgraph.graphql.schema",
+		"dgraph.graphql.schema": %s
+	}`
+
+	gqlBuf := &bytes.Buffer{}
+	schema := strconv.Quote(string(buf))
+	switch loadType {
+	case chunker.RdfFormat:
+		x.Check2(gqlBuf.Write([]byte(fmt.Sprintf(rdfSchema, schema))))
+	case chunker.JsonFormat:
+		x.Check2(gqlBuf.Write([]byte(fmt.Sprintf(jsonSchema, schema))))
+	}
+	ld.readerChunkCh <- gqlBuf
+}
+
 func (ld *loader) reduceStage() {
 	ld.prog.setPhase(reducePhase)
 
diff --git a/dgraph/cmd/bulk/run.go b/dgraph/cmd/bulk/run.go
index 75e9d613264..96fecceb388 100644
--- a/dgraph/cmd/bulk/run.go
+++ b/dgraph/cmd/bulk/run.go
@@ -56,6 +56,7 @@ func init() {
 		"Location of *.rdf(.gz) or *.json(.gz) file(s) to load.")
 	flag.StringP("schema", "s", "",
 		"Location of schema file.")
+	flag.StringP("graphql_schema", "g", "", "Location of the GraphQL schema file.")
 	flag.String("format", "",
 		"Specify file format (rdf or json) instead of getting it from filename.")
 	flag.Bool("encrypted", false,
@@ -116,6 +117,7 @@ func run() {
 		DataFiles:     Bulk.Conf.GetString("files"),
 		DataFormat:    Bulk.Conf.GetString("format"),
 		SchemaFile:    Bulk.Conf.GetString("schema"),
+		GqlSchemaFile: Bulk.Conf.GetString("graphql_schema"),
 		Encrypted:     Bulk.Conf.GetBool("encrypted"),
 		OutDir:        Bulk.Conf.GetString("out"),
 		ReplaceOutDir: Bulk.Conf.GetBool("replace_out"),
diff --git a/systest/bulk_live_cases_test.go b/systest/bulk_live_cases_test.go
index 5eca77398f7..e9dcdcc0274 100644
--- a/systest/bulk_live_cases_test.go
+++ b/systest/bulk_live_cases_test.go
@@ -343,7 +343,7 @@ func TestBulkSingleUid(t *testing.T) {
 		_:erin <name> "Erin" .
 		_:frank <name> "Frank" .
 		_:grace <name> "Grace" .
-	`)
+	`, "")
 	defer s.cleanup()
 
 	// Ensures that the index keys are written to disk after commit.
@@ -474,7 +474,7 @@ func TestDeleteEdgeWithStar(t *testing.T) {
 		<0x2> <name> "Alice" .
 		<0x3> <name> "Bob" .
 
-	`)
+	`, "")
 	defer s.cleanup()
 
 	_, err := s.bulkCluster.client.NewTxn().Mutate(context.Background(), &api.Mutation{
@@ -497,6 +497,28 @@ func TestDeleteEdgeWithStar(t *testing.T) {
 
 }
 
+func TestGqlSchema(t *testing.T) {
+	s := newBulkOnlySuite(t, "", "", "abc")
+	defer s.cleanup()
+
+	t.Run("Get GraphQL schema", s.testCase(`
+	{
+		schema(func: has(dgraph.graphql.schema)) {
+			dgraph.graphql.schema
+			dgraph.graphql.xid
+			dgraph.type
+		}
+	}`, `
+	{
+		"schema": [{
+			"dgraph.graphql.schema": "abc",
+			"dgraph.graphql.xid": "dgraph.graphql.schema",
+			"dgraph.type": ["dgraph.graphql"]
+		}]
+	}`))
+
+}
+
 // TODO: Fix this later.
 func DONOTRUNTestGoldenData(t *testing.T) {
 	if testing.Short() {
@@ -506,6 +528,7 @@ func DONOTRUNTestGoldenData(t *testing.T) {
 	s := newSuiteFromFile(t,
 		os.ExpandEnv("$GOPATH/src/github.com/dgraph-io/dgraph/systest/data/goldendata.schema"),
 		os.ExpandEnv("$GOPATH/src/github.com/dgraph-io/dgraph/systest/data/goldendata.rdf.gz"),
+		"",
 	)
 	defer s.cleanup()
 
diff --git a/systest/bulk_live_fixture_test.go b/systest/bulk_live_fixture_test.go
index 89eb1c2538f..ae5681dffbc 100644
--- a/systest/bulk_live_fixture_test.go
+++ b/systest/bulk_live_fixture_test.go
@@ -53,6 +53,7 @@ type suite struct {
 
 type suiteOpts struct {
 	schema         string
+	gqlSchema      string
 	rdfs           string
 	skipBulkLoader bool
 	skipLiveLoader bool
@@ -85,7 +86,9 @@ func newSuiteInternal(t *testing.T, opts suiteOpts) *suite {
 	s.checkFatal(ioutil.WriteFile(rdfFile, []byte(opts.rdfs), 0644))
 	schemaFile := filepath.Join(rootDir, "schema.txt")
 	s.checkFatal(ioutil.WriteFile(schemaFile, []byte(opts.schema), 0644))
-	s.setup(schemaFile, rdfFile)
+	gqlSchemaFile := filepath.Join(rootDir, "gql_schema.txt")
+	s.checkFatal(ioutil.WriteFile(gqlSchemaFile, []byte(opts.gqlSchema), 0644))
+	s.setup(schemaFile, rdfFile, gqlSchemaFile)
 	return s
 }
 
@@ -97,26 +100,27 @@ func newSuite(t *testing.T, schema, rdfs string) *suite {
 	return newSuiteInternal(t, opts)
 }
 
-func newBulkOnlySuite(t *testing.T, schema, rdfs string) *suite {
+func newBulkOnlySuite(t *testing.T, schema, rdfs, gqlSchema string) *suite {
 	opts := suiteOpts{
 		schema:         schema,
+		gqlSchema:      gqlSchema,
 		rdfs:           rdfs,
 		skipLiveLoader: true,
 	}
 	return newSuiteInternal(t, opts)
 }
 
-func newSuiteFromFile(t *testing.T, schemaFile, rdfFile string) *suite {
+func newSuiteFromFile(t *testing.T, schemaFile, rdfFile, gqlSchemaFile string) *suite {
 	if testing.Short() {
 		t.Skip("Skipping system test with long runtime.")
 	}
 
 	s := &suite{t: t}
-	s.setup(schemaFile, rdfFile)
+	s.setup(schemaFile, rdfFile, gqlSchemaFile)
 	return s
 }
 
-func (s *suite) setup(schemaFile, rdfFile string) {
+func (s *suite) setup(schemaFile, rdfFile, gqlSchemaFile string) {
 	var (
 		bulkDir = filepath.Join(rootDir, "bulk")
 		liveDir = filepath.Join(rootDir, "live")
@@ -137,6 +141,7 @@ func (s *suite) setup(schemaFile, rdfFile string) {
 	bulkCmd := exec.Command(testutil.DgraphBinaryPath(), "bulk",
 		"-f", rdfFile,
 		"-s", schemaFile,
+		"-g", gqlSchemaFile,
 		"--http", "localhost:"+strconv.Itoa(freePort(0)),
 		"-j=1",
 		"--store_xids=true",
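Note on the quoting step in processGqlSchema above: the GraphQL schema file is read whole and passed through strconv.Quote before being substituted into the RDF or JSON template, so newlines and double quotes in the schema end up escaped and the generated chunk stays parseable for either input format. A minimal standalone sketch of that behaviour follows; the package main wrapper and the example schema text are illustrative only and not part of the patch.

package main

import (
	"fmt"
	"strconv"
)

func main() {
	// Illustrative GraphQL schema text, standing in for the contents of the
	// file passed via --graphql_schema / -g.
	gqlSchema := "type Person {\n\tname: String\n}"

	// strconv.Quote escapes newlines, tabs and quotes, yielding a
	// double-quoted literal that is also a valid JSON string value.
	quoted := strconv.Quote(gqlSchema)

	// Same JSON template as the patch uses for JSON-format loads.
	jsonSchema := `{
		"dgraph.type": "dgraph.graphql",
		"dgraph.graphql.xid": "dgraph.graphql.schema",
		"dgraph.graphql.schema": %s
	}`
	fmt.Printf(jsonSchema+"\n", quoted)

	// Same RDF template as the patch uses for RDF-format loads; the quoted
	// string doubles as the N-Quad object literal.
	rdfSchema := `_:gqlschema <dgraph.type> "dgraph.graphql" .
	_:gqlschema <dgraph.graphql.xid> "dgraph.graphql.schema" .
	_:gqlschema <dgraph.graphql.schema> %s .
	`
	fmt.Printf(rdfSchema, quoted)
}

Reusing one quoted string for both templates keeps the RDF and JSON paths symmetric, which is presumably why the patch quotes once and only switches on loadType when writing to readerChunkCh.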