Skip to content

Commit

Permalink
Codexnull/bulk schema (hypermodeinc#2762)
Browse files Browse the repository at this point in the history
* Don't skip predicates with value type of default when loading the schema. (hypermodeinc#2616)

* Allow running test.sh from another directory.

* Keep all predicates from bulk import schema, not just the ones used.

* Make set of predicates the union of predicates in the schema and rdf.

* Add test for schema after export/bulk load.

* Add more schema test cases.
  • Loading branch information
codexnull authored and dna2github committed Jul 19, 2019
1 parent d326e58 commit 32443a6
Show file tree
Hide file tree
Showing 5 changed files with 250 additions and 5 deletions.
227 changes: 227 additions & 0 deletions contrib/scripts/test-bulk-schema.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
#!/bin/bash
# verify fix of https://github.com/dgraph-io/dgraph/issues/2616

readonly ME=${0##*/}
readonly SRCDIR=$(readlink -f "${BASH_SOURCE[0]%/*}")

# Randomize the port offset so concurrent runs of this test don't collide.
declare -ri PORT_OFFSET=$((RANDOM % 1000))
declare -ri ZERO_PORT=$((5080+PORT_OFFSET))
declare -ri ALPHA_PORT=$((7080+PORT_OFFSET)) HTTP_PORT=$((8080+PORT_OFFSET))

# Logging helpers. "$*" joins all arguments into a single word so the
# message reaches echo as one argument (avoids word-splitting surprises
# that "$@" would allow).
INFO() { echo "$ME: $*"; }
ERROR() { echo >&2 "$ME: $*"; }
FATAL() { ERROR "$*"; exit 1; }

# Abort on the first failing command.
set -e

INFO "running bulk load schema test"

# All intermediate files live in a throwaway temp dir; quoted in case the
# temp path contains spaces.
WORKDIR=$(mktemp --tmpdir -d "$ME.tmp-XXXXXX")
INFO "using workdir $WORKDIR"
cd "$WORKDIR"

function StartZero
{
  # Launch a dgraph zero in the background and wait (up to 30s) until its
  # port accepts connections. $SRCDIR is quoted: the script may be invoked
  # from a path containing spaces.
  INFO "starting zero server on port $ZERO_PORT"
  dgraph zero -o $PORT_OFFSET --my=localhost:$ZERO_PORT \
              >zero.log 2>&1 </dev/null &
  ZERO_PID=$!
  sleep 1
  "$SRCDIR"/../wait-for-it.sh -q -t 30 localhost:$ZERO_PORT \
      || FATAL "failed to start zero"
}

function BulkLoadSampleData
{
  INFO "bulk loading sample data"
  # Schema matching the 1million sample data set.
  cat >1million.schema <<EOF
director.film: uid @reverse .
genre: uid @reverse .
initial_release_date: dateTime @index(year) .
name: string @index(term) @lang .
EOF
  # Stream the compressed rdf through a fifo so the bulk loader can start
  # reading while the download is still in progress. --fail makes curl
  # return a non-zero exit status on an HTTP error instead of silently
  # feeding an HTML error page to the bulk loader as if it were data.
  mkfifo 1million.rdf.gz
  curl -LsS --fail 'https://github.com/dgraph-io/tutorial/blob/master/resources/1million.rdf.gz?raw=true' >> 1million.rdf.gz &
  dgraph bulk -z localhost:$ZERO_PORT -s 1million.schema -r 1million.rdf.gz \
      >bulk.log 2>&1 </dev/null
}

function StartAlpha
{
  # Launch a dgraph alpha in the background, pointed at the zero started by
  # StartZero, and wait (up to 30s) until its port accepts connections.
  # $SRCDIR is quoted against paths containing spaces.
  INFO "starting alpha server on port $ALPHA_PORT"
  dgraph alpha -o $PORT_OFFSET --my=localhost:$ALPHA_PORT --zero=localhost:$ZERO_PORT --lru_mb=2048 \
               >alpha.log 2>&1 </dev/null &
  ALPHA_PID=$!
  sleep 1
  "$SRCDIR"/../wait-for-it.sh -q -t 30 localhost:$ALPHA_PORT \
      || FATAL "failed to start alpha"
}

function UpdateDatabase
{
# Alter the schema over HTTP with three shapes of predicate: plain string,
# explicit default type, and indexed string. Output is discarded; with
# `set -e` a failing curl still aborts the script via its exit status.
INFO "adding predicate with default type to schema"
curl localhost:$HTTP_PORT/alter -X POST -d$'
predicate_with_no_uid_count:string .
predicate_with_default_type:default .
predicate_with_index_no_uid_count:string @index(exact) .
' &>/dev/null

# Commit one triple using the default-typed predicate so it has data
# behind it before the export/bulk-load round trip.
curl localhost:$HTTP_PORT/mutate -X POST -H 'X-Dgraph-CommitNow: true' -d $'
{
set {
_:company1 <predicate_with_default_type> "CompanyABC" .
}
}
' &>/dev/null
}

function QuerySchema
{
  # Fetch the full schema and write only the "data" member, pretty-printed
  # with sorted keys, to the given file so two runs can be compared with
  # a plain diff. $1 is the output file name (required).
  INFO "running schema query"
  local out_file=${1:?no out file}
  # $out_file is quoted in the redirections: an unquoted expansion would
  # word-split or glob a file name containing spaces or metacharacters.
  curl -sS localhost:$HTTP_PORT/query -XPOST -d'schema {}' | python -c "import json,sys; d=json.load(sys.stdin); json.dump(d['data'],sys.stdout,sort_keys=True,indent=2,separators=(',',': '))" > "$out_file"
  # json.dump emits no trailing newline; add one so diff output stays clean.
  echo >> "$out_file"
  #INFO "schema is: " && cat $out_file
}

function DoExport
{
  # Trigger an export on the running alpha, then pause briefly to let the
  # export files land on disk before the caller reads them.
  INFO "running export"
  curl localhost:$HTTP_PORT/admin/export >/dev/null 2>&1
  sleep 1
}

function BulkLoadExportedData
{
# Bulk load the schema and rdf that DoExport wrote in the first cluster's
# workdir. The globs match the single timestamped export directory; they
# must stay unquoted so the shell expands them.
INFO "bulk loading exported data"
dgraph bulk -z localhost:$ZERO_PORT \
-s ../dir1/export/*/g01.schema.gz \
-r ../dir1/export/*/g01.rdf.gz \
>bulk.log 2>&1 </dev/null
# Move the generated posting directory up to where alpha will look for it.
mv out/0/p .
}

function BulkLoadFixtureData
{
# Bulk load a small hand-written schema + rdf covering the four predicate
# cases the final schema check asserts on (see the numbered cases below).
INFO "bulk loading fixture data"

# schema test cases:
#
# 1. predicate with non-default type (name)
# 2. predicate with default type (genre)
# 3. predicate not used in rdf (language)
cat >fixture.schema <<EOF
name:string @index(term) .
genre:default .
language:string .
EOF

# rdf test cases:
#
# 4. predicate not in schema (revenue)
cat >fixture.rdf <<EOF
_:et <name> "E.T. the Extra-Terrestrial" .
_:et <genre> "Science Fiction" .
_:et <revenue> "792.9" .
EOF

dgraph bulk -z localhost:$ZERO_PORT -s fixture.schema -r fixture.rdf \
>bulk.log 2>&1 </dev/null
# Move the generated posting directory up to where alpha will look for it.
mv out/0/p .
}

function StopServers
{
  # Terminate the zero and alpha started earlier. `|| true`: the script
  # runs under `set -e`, so if either server already exited (crash, port
  # clash) a failing kill would otherwise abort the script before the
  # remaining checks and Cleanup run.
  INFO "killing zero server at pid $ZERO_PID"
  INFO "killing alpha server at pid $ALPHA_PID"
  kill $ZERO_PID $ALPHA_PID || true
  sleep 1
}

function Cleanup
{
  # Remove the whole temp workdir. $WORKDIR is quoted: an unquoted
  # expansion of a path with spaces would make rm -rf delete the wrong
  # things.
  INFO "removing $WORKDIR"
  rm -rf "$WORKDIR"
}

# --- Run 1: bulk load sample data, alter the schema, export it. ---
mkdir dir1
pushd dir1 >/dev/null

StartZero
BulkLoadSampleData
StartAlpha
UpdateDatabase
QuerySchema "schema.out"
DoExport
StopServers

# --- Run 2: bulk load the export from run 1 into a fresh cluster. ---
popd >/dev/null
mkdir dir2
pushd dir2 >/dev/null

StartZero
BulkLoadExportedData
StartAlpha
QuerySchema "schema.out"
StopServers

popd >/dev/null

# The schema must survive an export/bulk-load round trip unchanged.
INFO "verifying schema is same before export and after bulk import"
diff dir1/schema.out dir2/schema.out || FATAL "schema incorrect"
INFO "schema is correct"

# --- Run 3: bulk load the hand-written fixture data. ---
mkdir dir3
pushd dir3 >/dev/null

StartZero
BulkLoadFixtureData
StartAlpha
QuerySchema "schema.out"
StopServers

popd >/dev/null

# final schema should include *all* predicates regardless of whether they were
# introduced by the schema or rdf file, used or not used, or of default type
# or non-default type
INFO "verifying schema contains all predicates"
diff - dir3/schema.out <<EOF || FATAL "schema incorrect"
{
  "schema": [
    {
      "list": true,
      "predicate": "_predicate_",
      "type": "string"
    },
    {
      "predicate": "genre",
      "type": "default"
    },
    {
      "predicate": "language",
      "type": "string"
    },
    {
      "index": true,
      "predicate": "name",
      "tokenizer": [
        "term"
      ],
      "type": "string"
    },
    {
      "predicate": "revenue",
      "type": "default"
    }
  ]
}
EOF

INFO "schema is correct"

Cleanup

exit 0

# eof
2 changes: 0 additions & 2 deletions dgraph/cmd/bulk/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package bulk

import (
"encoding/json"
"flag"
"fmt"
"log"
"net/http"
Expand Down Expand Up @@ -113,7 +112,6 @@ func run() {
os.Exit(0)
}
if opt.RDFDir == "" || opt.SchemaFile == "" {
flag.Usage()
fmt.Fprint(os.Stderr, "RDF and schema file(s) must be specified.\n")
os.Exit(1)
}
Expand Down
16 changes: 15 additions & 1 deletion dgraph/cmd/bulk/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,21 @@ func (s *schemaStore) getPredicates(db *badger.DB) []string {
func (s *schemaStore) write(db *badger.DB) {
// Write schema always at timestamp 1, s.state.writeTs may not be equal to 1
// if bulk loader was restarted or other similar scenarios.
preds := s.getPredicates(db)

// Get predicates from the schema store so that the db includes all
// predicates from the schema file.
preds := make([]string, 0, len(s.m))
for pred := range s.m {
preds = append(preds, pred)
}

// Add predicates from the db so that final schema includes predicates
// used in the rdf file but not included in the schema file.
for _, pred := range s.getPredicates(db) {
if _, ok := s.m[pred]; ! ok {
preds = append(preds, pred)
}
}

txn := db.NewTransactionAt(math.MaxUint64, true)
defer txn.Discard()
Expand Down
2 changes: 1 addition & 1 deletion schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func LoadFromDb() error {
var s pb.SchemaUpdate
err := item.Value(func(val []byte) error {
if len(val) == 0 {
return nil
s = pb.SchemaUpdate{Predicate: attr, ValueType: pb.Posting_DEFAULT}
}
x.Checkf(s.Unmarshal(val), "Error while loading schema from db")
State().Set(attr, s)
Expand Down
8 changes: 7 additions & 1 deletion test.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#!/bin/bash

source contrib/scripts/functions.sh
# run from directory containing this script
cd ${BASH_SOURCE[0]%/*}

source ./contrib/scripts/functions.sh
function run {
go test -short=true $@ |\
GREP_COLORS='mt=01;32' egrep --line-buffered --color=always '^ok\ .*|$' |\
Expand Down Expand Up @@ -31,6 +34,9 @@ echo
echo "Running tests. Ignoring vendor folder."
runAll || exit $?

# Run non-go tests.
./contrib/scripts/test-bulk-schema.sh

echo
echo "Running load-test.sh"
./contrib/scripts/load-test.sh
Expand Down

0 comments on commit 32443a6

Please sign in to comment.