Skip to content

Commit

Permalink
Support passing GraphQL schema to bulk loader. (#5509)
Browse files Browse the repository at this point in the history
Support passing a GraphQL schema file to the bulk loader. The bulk loader will generate
the relevant triples expected by Dgraph.

Related to DGRAPH-1283
  • Loading branch information
martinmr authored May 26, 2020
1 parent 36be40c commit cc97957
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 7 deletions.
50 changes: 50 additions & 0 deletions dgraph/cmd/bulk/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"log"
"os"
"path/filepath"
"strconv"
"sync"
"time"

Expand All @@ -47,6 +48,7 @@ type options struct {
DataFiles string
DataFormat string
SchemaFile string
GqlSchemaFile string
OutDir string
ReplaceOutDir bool
TmpDir string
Expand Down Expand Up @@ -237,6 +239,9 @@ func (ld *loader) mapStage() {
}
x.Check(thr.Finish())

// Send the graphql triples
ld.processGqlSchema(loadType)

close(ld.readerChunkCh)
mapperWg.Wait()

Expand All @@ -251,6 +256,51 @@ func (ld *loader) mapStage() {
ld.xids = nil
}

// processGqlSchema reads the GraphQL schema file given via the
// --graphql_schema flag (if any) and pushes one synthetic chunk onto
// ld.readerChunkCh containing the internal triples Dgraph uses to store
// that schema: a node of type "dgraph.graphql" whose dgraph.graphql.schema
// predicate holds the schema text. The chunk is emitted in the same format
// (RDF or JSON) as the rest of the load so the mapper can parse it.
func (ld *loader) processGqlSchema(loadType chunker.InputFormat) {
	if ld.opt.GqlSchemaFile == "" {
		return
	}

	f, err := os.Open(ld.opt.GqlSchemaFile)
	x.Check(err)
	defer f.Close()

	key := ld.opt.EncryptionKey
	if !ld.opt.Encrypted {
		// Input is plain text; pass a nil key so enc.GetReader does not decrypt.
		key = nil
	}
	r, err := enc.GetReader(key, f)
	x.Check(err)
	if filepath.Ext(ld.opt.GqlSchemaFile) == ".gz" {
		zr, err := gzip.NewReader(r)
		x.Check(err)
		// Close the gzip reader once done so its resources are released and
		// the trailing CRC/size check runs.
		defer zr.Close()
		r = zr
	}

	buf, err := ioutil.ReadAll(r)
	x.Check(err)

	rdfSchema := `_:gqlschema <dgraph.type> "dgraph.graphql" .
_:gqlschema <dgraph.graphql.xid> "dgraph.graphql.schema" .
_:gqlschema <dgraph.graphql.schema> %s .
`

	jsonSchema := `{
		"dgraph.type": "dgraph.graphql",
		"dgraph.graphql.xid": "dgraph.graphql.schema",
		"dgraph.graphql.schema": %s
	}`

	gqlBuf := &bytes.Buffer{}
	// Quote the schema text so embedded newlines and quotes survive both the
	// RDF and the JSON parsers.
	schema := strconv.Quote(string(buf))
	switch loadType {
	case chunker.RdfFormat:
		x.Check2(fmt.Fprintf(gqlBuf, rdfSchema, schema))
	case chunker.JsonFormat:
		x.Check2(fmt.Fprintf(gqlBuf, jsonSchema, schema))
	}
	ld.readerChunkCh <- gqlBuf
}

func (ld *loader) reduceStage() {
ld.prog.setPhase(reducePhase)

Expand Down
2 changes: 2 additions & 0 deletions dgraph/cmd/bulk/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func init() {
"Location of *.rdf(.gz) or *.json(.gz) file(s) to load.")
flag.StringP("schema", "s", "",
"Location of schema file.")
flag.StringP("graphql_schema", "g", "", "Location of the GraphQL schema file.")
flag.String("format", "",
"Specify file format (rdf or json) instead of getting it from filename.")
flag.Bool("encrypted", false,
Expand Down Expand Up @@ -116,6 +117,7 @@ func run() {
DataFiles: Bulk.Conf.GetString("files"),
DataFormat: Bulk.Conf.GetString("format"),
SchemaFile: Bulk.Conf.GetString("schema"),
GqlSchemaFile: Bulk.Conf.GetString("graphql_schema"),
Encrypted: Bulk.Conf.GetBool("encrypted"),
OutDir: Bulk.Conf.GetString("out"),
ReplaceOutDir: Bulk.Conf.GetBool("replace_out"),
Expand Down
27 changes: 25 additions & 2 deletions systest/bulk_live_cases_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func TestBulkSingleUid(t *testing.T) {
_:erin <name> "Erin" .
_:frank <name> "Frank" .
_:grace <name> "Grace" .
`)
`, "")
defer s.cleanup()

// Ensures that the index keys are written to disk after commit.
Expand Down Expand Up @@ -474,7 +474,7 @@ func TestDeleteEdgeWithStar(t *testing.T) {
<0x2> <name> "Alice" .
<0x3> <name> "Bob" .
`)
`, "")
defer s.cleanup()

_, err := s.bulkCluster.client.NewTxn().Mutate(context.Background(), &api.Mutation{
Expand All @@ -497,6 +497,28 @@ func TestDeleteEdgeWithStar(t *testing.T) {

}

// TestGqlSchema bulk-loads an empty Dgraph schema plus the GraphQL schema
// "abc", then queries for nodes carrying dgraph.graphql.schema to verify the
// bulk loader stored the schema under a node of type "dgraph.graphql" with
// the expected xid.
func TestGqlSchema(t *testing.T) {
	s := newBulkOnlySuite(t, "", "", "abc")
	defer s.cleanup()

	query := `
	{
	schema(func: has(dgraph.graphql.schema)) {
	dgraph.graphql.schema
	dgraph.graphql.xid
	dgraph.type
	}
	}`
	expected := `
	{
	"schema": [{
	"dgraph.graphql.schema": "abc",
	"dgraph.graphql.xid": "dgraph.graphql.schema",
	"dgraph.type": ["dgraph.graphql"]
	}]
	}`

	t.Run("Get GraphQL schema", s.testCase(query, expected))
}

// TODO: Fix this later.
func DONOTRUNTestGoldenData(t *testing.T) {
if testing.Short() {
Expand All @@ -506,6 +528,7 @@ func DONOTRUNTestGoldenData(t *testing.T) {
s := newSuiteFromFile(t,
os.ExpandEnv("$GOPATH/src/github.com/dgraph-io/dgraph/systest/data/goldendata.schema"),
os.ExpandEnv("$GOPATH/src/github.com/dgraph-io/dgraph/systest/data/goldendata.rdf.gz"),
"",
)
defer s.cleanup()

Expand Down
15 changes: 10 additions & 5 deletions systest/bulk_live_fixture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type suite struct {

type suiteOpts struct {
schema string
gqlSchema string
rdfs string
skipBulkLoader bool
skipLiveLoader bool
Expand Down Expand Up @@ -85,7 +86,9 @@ func newSuiteInternal(t *testing.T, opts suiteOpts) *suite {
s.checkFatal(ioutil.WriteFile(rdfFile, []byte(opts.rdfs), 0644))
schemaFile := filepath.Join(rootDir, "schema.txt")
s.checkFatal(ioutil.WriteFile(schemaFile, []byte(opts.schema), 0644))
s.setup(schemaFile, rdfFile)
gqlSchemaFile := filepath.Join(rootDir, "gql_schema.txt")
s.checkFatal(ioutil.WriteFile(gqlSchemaFile, []byte(opts.gqlSchema), 0644))
s.setup(schemaFile, rdfFile, gqlSchemaFile)
return s
}

Expand All @@ -97,26 +100,27 @@ func newSuite(t *testing.T, schema, rdfs string) *suite {
return newSuiteInternal(t, opts)
}

func newBulkOnlySuite(t *testing.T, schema, rdfs string) *suite {
func newBulkOnlySuite(t *testing.T, schema, rdfs, gqlSchema string) *suite {
opts := suiteOpts{
schema: schema,
gqlSchema: gqlSchema,
rdfs: rdfs,
skipLiveLoader: true,
}
return newSuiteInternal(t, opts)
}

func newSuiteFromFile(t *testing.T, schemaFile, rdfFile string) *suite {
func newSuiteFromFile(t *testing.T, schemaFile, rdfFile, gqlSchemaFile string) *suite {
if testing.Short() {
t.Skip("Skipping system test with long runtime.")
}
s := &suite{t: t}

s.setup(schemaFile, rdfFile)
s.setup(schemaFile, rdfFile, gqlSchemaFile)
return s
}

func (s *suite) setup(schemaFile, rdfFile string) {
func (s *suite) setup(schemaFile, rdfFile, gqlSchemaFile string) {
var (
bulkDir = filepath.Join(rootDir, "bulk")
liveDir = filepath.Join(rootDir, "live")
Expand All @@ -137,6 +141,7 @@ func (s *suite) setup(schemaFile, rdfFile string) {
bulkCmd := exec.Command(testutil.DgraphBinaryPath(), "bulk",
"-f", rdfFile,
"-s", schemaFile,
"-g", gqlSchemaFile,
"--http", "localhost:"+strconv.Itoa(freePort(0)),
"-j=1",
"--store_xids=true",
Expand Down

0 comments on commit cc97957

Please sign in to comment.