Skip to content

Commit

Permalink
Cherry-pick fix for offset not working with multiple order statements…
Browse files Browse the repository at this point in the history
… into release/v1.0 (#3455)

* Fix offset not returning correct results with multiple order statements. (#3400)

Multisort happens in two stages.

In the first stage the uids are sorted by the first predicate, and
in the second stage the remaining sorts are executed on the uids that are the output of the first stage.
While considering the uids that are input for the second stage, we must also consider uids with equal values at the boundary where offset/count is applied. This was being done for count but not for offset.

This PR adds multiSortOffsets to sortresult, which keeps track of the pending offset (which would be <= the offset in the query). The multiSortOffset must be applied to the individual uid lists after all the sorts are done.

* Use processToFastJsonNoErr in query_test.
  • Loading branch information
pawanrawal authored May 21, 2019
1 parent 6e4b47b commit f24d2da
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 38 deletions.
2 changes: 2 additions & 0 deletions contrib/scripts/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ go get github.com/dgraph-io/dgo
go get github.com/stretchr/testify/require

pushd $GOPATH/src/google.golang.org/grpc
# TODO(pawan) - This file seems to be useless. Delete it as dgraph doesn't compile with
# grpc v1.8.2
git checkout v1.8.2
popd

67 changes: 67 additions & 0 deletions query/query1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1819,6 +1819,73 @@ func TestMultiSort7Paginate(t *testing.T) {
require.JSONEq(t, `{"data": {"me":[{"name":"Alice","age":25},{"name":"Alice","age":75},{"name":"Alice","age":75},{"name":"Bob","age":25},{"name":"Bob","age":75},{"name":"Colin","age":25},{"name":"Elizabeth","age":25}]}}`, js)
}

// TestMultiSortPaginateWithOffset verifies that pagination offsets are applied
// correctly when a query sorts by more than one predicate, including offsets
// that land inside a bucket of equal values, exactly on a bucket boundary,
// equal to the number of uids, and beyond the number of records.
func TestMultiSortPaginateWithOffset(t *testing.T) {
	t.Parallel()

	type testCase struct {
		name     string
		query    string
		expected string
	}

	cases := []testCase{
		{
			name: "Offset in middle of bucket",
			query: `{
	me(func: uid(10005, 10006, 10001, 10002, 10003, 10004, 10007, 10000), orderasc: name, orderasc: age, first: 6, offset: 1) {
		name
		age
	}
}`,
			expected: `{"data": {"me":[{"name":"Alice","age":75},{"name":"Alice","age":75},{"name":"Bob","age":25},{"name":"Bob","age":75},{"name":"Colin","age":25},{"name":"Elizabeth","age":25}]}}`,
		},
		{
			name: "Offset at boundary of bucket",
			query: `{
	me(func: uid(10005, 10006, 10001, 10002, 10003, 10004, 10007, 10000), orderasc: name, orderasc: age, first: 4, offset: 3) {
		name
		age
	}
}`,
			expected: `{"data": {"me":[{"name":"Bob","age":25},{"name":"Bob","age":75},{"name":"Colin","age":25},{"name":"Elizabeth","age":25}]}}`,
		},
		{
			name: "Offset in middle of second bucket",
			query: `{
	me(func: uid(10005, 10006, 10001, 10002, 10003, 10004, 10007, 10000), orderasc: name, orderasc: age, first: 3, offset: 4) {
		name
		age
	}
}`,
			expected: `{"data": {"me":[{"name":"Bob","age":75},{"name":"Colin","age":25},{"name":"Elizabeth","age":25}]}}`,
		},
		{
			name: "Offset equal to number of uids",
			query: `{
	me(func: uid(10005, 10006, 10001, 10002, 10003, 10004, 10007, 10000), orderasc: name, orderasc: age, first: 3, offset: 8) {
		name
		age
	}
}`,
			expected: `{"data": {"me":[]}}`,
		},
		{
			name: "Offset larger than records",
			query: `{
	me(func: uid(10005, 10006, 10001, 10002, 10003, 10004, 10007, 10000), orderasc: name, orderasc: age, first: 10, offset: 10000) {
		name
		age
	}
}`,
			expected: `{"data": {"me":[]}}`,
		},
	}

	for _, tc := range cases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			got := processToFastJsonNoErr(t, tc.query)
			require.JSONEq(t, tc.expected, got)
		})
	}
}

func TestFilterRootOverride(t *testing.T) {

query := `{
Expand Down
127 changes: 89 additions & 38 deletions worker/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,15 @@ var emptySortResult pb.SortResult

// sortresult is the outcome of processing a sort query: the sorted uid matrix
// plus the state needed to finish a multi-predicate (multi) sort.
//
// NOTE: as scraped, this block contained both the pre- and post-image diff
// lines (vals/err appeared twice), which is not valid Go; this is the
// reconstructed post-image definition.
type sortresult struct {
	reply *pb.SortResult
	// For multi sort we apply the offset in two stages. In the first stage a part of the offset
	// is applied but equal values in the bucket that the offset falls into are skipped. This
	// slice stores the remaining offset for individual uid lists that must be applied after all
	// multi sort is done.
	// TODO (pawan) - Offset has type int32 whereas paginate function returns an int. We should
	// use a common type so that we can avoid casts between the two.
	multiSortOffsets []int32
	// vals holds, per uid list, the values of the first order predicate
	// (only populated when there is more than one sort predicate).
	vals [][]types.Val
	err  error
}

// SortOverNetwork sends sort query over the network.
Expand Down Expand Up @@ -99,43 +106,58 @@ var (
errDone = x.Errorf("Done processing buckets")
)

// resultWithError wraps err in a sortresult carrying an empty reply, letting
// sort processing bail out with a single return expression.
func resultWithError(err error) *sortresult {
	// Keyed initialization instead of positional: this function won't silently
	// break if fields are added to or reordered in sortresult. The remaining
	// fields (multiSortOffsets, vals) stay at their zero value, nil, exactly as
	// the positional literal set them.
	return &sortresult{reply: &emptySortResult, err: err}
}

// sortWithoutIndex sorts and paginates each uid list in ts.UidMatrix by
// fetching the values of the first order predicate directly, without walking
// an index. It returns the paginated uid matrix along with the per-list values
// and pending offsets needed to complete a multi-predicate sort.
//
// NOTE: as scraped, this block contained both the pre- and post-image diff
// lines (duplicate return statements); this is the reconstructed post-image
// version of the function.
func sortWithoutIndex(ctx context.Context, ts *pb.SortMessage) *sortresult {
	span := otrace.FromContext(ctx)
	span.Annotate(nil, "sortWithoutIndex")

	n := len(ts.UidMatrix)
	r := new(pb.SortResult)
	multiSortVals := make([][]types.Val, n)
	var multiSortOffsets []int32
	// Sort and paginate directly as it'd be expensive to iterate over the index which
	// might have millions of keys just for retrieving some values.
	sType, err := schema.State().TypeOf(ts.Order[0].Attr)
	if err != nil || !sType.IsScalar() {
		return resultWithError(x.Errorf("Cannot sort attribute %s of type object.",
			ts.Order[0].Attr))
	}

	for i := 0; i < n; i++ {
		select {
		case <-ctx.Done():
			return resultWithError(ctx.Err())
		default:
			// Copy, otherwise it'd affect the destUids and hence the srcUids of Next level.
			tempList := &pb.List{Uids: ts.UidMatrix[i].Uids}
			var vals []types.Val
			if vals, err = sortByValue(ctx, ts, tempList, sType); err != nil {
				return resultWithError(err)
			}
			start, end, err := paginate(ts, tempList, vals)
			if err != nil {
				return resultWithError(err)
			}
			if len(ts.Order) > 1 {
				var offset int32
				// Usually start would equal ts.Offset unless the values around the offset index
				// (at offset-1, offset-2 index and so on) are equal. In that case we keep those
				// values and apply the remaining offset later.
				if int32(start) < ts.Offset {
					offset = ts.Offset - int32(start)
				}
				multiSortOffsets = append(multiSortOffsets, offset)
			}
			tempList.Uids = tempList.Uids[start:end]
			vals = vals[start:end]
			r.UidMatrix = append(r.UidMatrix, tempList)
			multiSortVals[i] = vals
		}
	}
	return &sortresult{r, multiSortOffsets, multiSortVals, nil}
}

func sortWithIndex(ctx context.Context, ts *pb.SortMessage) *sortresult {
Expand All @@ -157,12 +179,12 @@ func sortWithIndex(ctx context.Context, ts *pb.SortMessage) *sortresult {
order := ts.Order[0]
typ, err := schema.State().TypeOf(order.Attr)
if err != nil {
return &sortresult{&emptySortResult, nil, fmt.Errorf("Attribute %s not defined in schema", order.Attr)}
return resultWithError(fmt.Errorf("Attribute %s not defined in schema", order.Attr))
}

// Get the tokenizers and choose the corresponding one.
if !schema.State().IsIndexed(order.Attr) {
return &sortresult{&emptySortResult, nil, x.Errorf("Attribute %s is not indexed.", order.Attr)}
return resultWithError(x.Errorf("Attribute %s is not indexed.", order.Attr))
}

tokenizers := schema.State().Tokenizer(order.Attr)
Expand All @@ -179,12 +201,12 @@ func sortWithIndex(ctx context.Context, ts *pb.SortMessage) *sortresult {
// String type can have multiple tokenizers, only one of which is
// sortable.
if typ == types.StringID {
return &sortresult{&emptySortResult, nil,
x.Errorf("Attribute:%s does not have exact index for sorting.", order.Attr)}
return resultWithError(x.Errorf("Attribute:%s does not have exact index for sorting.",
order.Attr))
}
// Other types just have one tokenizer, so if we didn't find a
// sortable tokenizer, then attribute isn't sortable.
return &sortresult{&emptySortResult, nil, x.Errorf("Attribute:%s is not sortable.", order.Attr)}
return resultWithError(x.Errorf("Attribute:%s is not sortable.", order.Attr))
}

// Iterate over every bucket / token.
Expand Down Expand Up @@ -213,7 +235,7 @@ BUCKETS:
key := item.Key() // No need to copy.
select {
case <-ctx.Done():
return &sortresult{&emptySortResult, nil, ctx.Err()}
return resultWithError(ctx.Err())
default:
k := x.Parse(key)
if k == nil {
Expand All @@ -231,24 +253,26 @@ BUCKETS:
case errContinue:
// Continue iterating over tokens / index buckets.
default:
return &sortresult{&emptySortResult, nil, err}
return resultWithError(err)
}
}
}

var multiSortOffsets []int32
for _, il := range out {
r.UidMatrix = append(r.UidMatrix, il.ulist)
if len(ts.Order) > 1 {
// TODO - For lossy tokenizer, no need to pick all values.
values = append(values, il.values)
multiSortOffsets = append(multiSortOffsets, il.multiSortOffset)
}
}

select {
case <-ctx.Done():
return &sortresult{&emptySortResult, nil, ctx.Err()}
return resultWithError(ctx.Err())
default:
return &sortresult{r, values, nil}
return &sortresult{r, multiSortOffsets, values, nil}
}
}

Expand Down Expand Up @@ -355,9 +379,8 @@ func multiSort(ctx context.Context, r *sortresult, ts *pb.SortMessage) error {
return err
}
// Paginate
if len(ul.Uids) > int(ts.Count) {
ul.Uids = ul.Uids[:ts.Count]
}
start, end := x.PageRange(int(ts.Count), int(r.multiSortOffsets[i]), len(ul.Uids))
ul.Uids = ul.Uids[start:end]
r.reply.UidMatrix[i] = ul
}

Expand Down Expand Up @@ -390,6 +413,7 @@ func processSort(ctx context.Context, ts *pb.SortMessage) (*pb.SortResult, error
return nil, x.Errorf("We do not yet support negative or infinite count with sorting: %s %d. "+
"Try flipping order and return first few elements instead.", ts.Order[0].Attr, ts.Count)
}
// TODO (pawan) - Why check only the first attribute, what if other attributes are of list type?
if schema.State().IsList(ts.Order[0].Attr) {
return nil, x.Errorf("Sorting not supported on attr: %s of type: [scalar]", ts.Order[0].Attr)
}
Expand Down Expand Up @@ -467,10 +491,11 @@ func fetchValues(ctx context.Context, in *pb.Query, idx int, or chan orderResult
}

// intersectedList accumulates, for one uid list of the sort's uid matrix, the
// uids and values gathered while iterating over index buckets.
//
// NOTE: as scraped, this block contained both the pre- and post-image field
// lists from the diff; this is the reconstructed post-image definition.
type intersectedList struct {
	// offset is the portion of the query offset still to be skipped while
	// filling this list from subsequent buckets; it is zeroed once applied.
	offset int
	ulist  *pb.List
	values []types.Val
	uset   map[uint64]struct{}
	// multiSortOffset is the remaining offset to apply only after all sort
	// predicates have been processed: multi-sort keeps equal values at the
	// offset boundary, so that part of the offset can't be applied up front.
	multiSortOffset int32
}

// intersectBucket intersects every UID list in the UID matrix with the
Expand All @@ -496,7 +521,10 @@ func intersectBucket(ctx context.Context, ts *pb.SortMessage, token string,
// For each UID list, we need to intersect with the index bucket.
for i, ul := range ts.UidMatrix {
il := &out[i]
if count > 0 && len(il.ulist.Uids) >= count {
// We need to reduce multiSortOffset while checking the count as we might have included
// some extra uids from the bucket that the offset falls into. We are going to discard
// the first multiSortOffset number of uids later after all sorts are applied.
if count > 0 && len(il.ulist.Uids)-int(il.multiSortOffset) >= count {
continue
}

Expand Down Expand Up @@ -525,6 +553,7 @@ func intersectBucket(ctx context.Context, ts *pb.SortMessage, token string,

// We are within the page. We need to apply sorting.
// Sort results by value before applying offset.
// TODO (pawan) - Why do we do this? Looks like it it is only useful for language.
if vals, err = sortByValue(ctx, ts, result, scalar); err != nil {
return err
}
Expand All @@ -535,16 +564,22 @@ func intersectBucket(ctx context.Context, ts *pb.SortMessage, token string,

if il.offset > 0 {
// Apply the offset.
result.Uids = result.Uids[il.offset:n]
if len(ts.Order) > 1 {
vals = vals[il.offset:n]
if len(ts.Order) == 1 {
result.Uids = result.Uids[il.offset:n]
} else {
// In case of multi sort we can't apply the offset yet, as the order might change
// after other sort orders are applied. So we need to pick all the uids in the
// current bucket.
// Since we are picking all values in this bucket, we have to apply this remaining
// offset later and hence are storing it here.
il.multiSortOffset = int32(il.offset)
}
il.offset = 0
n = len(result.Uids)
}

// n is number of elements to copy from result to out.
// In case of multiple sort, we dont wan't to apply the count and copy all uids for the
// In case of multiple sort, we don't want to apply the count and copy all uids for the
// current bucket.
if count > 0 && (len(ts.Order) == 1) {
slack := count - len(il.ulist.Uids)
Expand All @@ -561,7 +596,9 @@ func intersectBucket(ctx context.Context, ts *pb.SortMessage, token string,

// Check out[i] sizes for all i.
for i := 0; i < len(ts.UidMatrix); i++ { // Iterate over UID lists.
if len(out[i].ulist.Uids) < count {
// We need to reduce multiSortOffset while checking the count as we might have included
// some extra uids earlier for the multi-sort case.
if len(out[i].ulist.Uids)-int(out[i].multiSortOffset) < count {
return errContinue
}

Expand Down Expand Up @@ -595,16 +632,30 @@ func paginate(ts *pb.SortMessage, dest *pb.List, vals []types.Val) (int, int, er
offset := int(ts.Offset)
start, end := x.PageRange(count, offset, len(dest.Uids))

// For multiple sort, we need to take all equal values at the end. So we update end.
for len(ts.Order) > 1 && end < len(dest.Uids) {
eq, err := types.Equal(vals[end-1], vals[end])
if err != nil {
return 0, 0, err
// For multiple sort, we need to take all equal values at the start and end.
// This is because the final sort order depends on other sort attributes and we can't ignore
// equal values at start or the end.
if len(ts.Order) > 1 {
for start < len(vals) && start > 0 {
eq, err := types.Equal(vals[start], vals[start-1])
if err != nil {
return 0, 0, err
}
if !eq {
break
}
start--
}
if !eq {
break
for end < len(dest.Uids) {
eq, err := types.Equal(vals[end-1], vals[end])
if err != nil {
return 0, 0, err
}
if !eq {
break
}
end++
}
end++
}

return start, end, nil
Expand Down

0 comments on commit f24d2da

Please sign in to comment.