Skip to content

Commit

Permalink
feat(firestore): Add support for PartitionQuery (#4206)
Browse files Browse the repository at this point in the history
  • Loading branch information
crwilcox authored Jun 25, 2021
1 parent 267787e commit b34783a
Show file tree
Hide file tree
Showing 4 changed files with 314 additions and 0 deletions.
148 changes: 148 additions & 0 deletions firestore/collgroupref.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,16 @@

package firestore

import (
"context"
"errors"
"fmt"
"sort"

"google.golang.org/api/iterator"
firestorepb "google.golang.org/genproto/googleapis/firestore/v1"
)

// A CollectionGroupRef is a reference to a group of collections sharing the
// same ID.
type CollectionGroupRef struct {
Expand All @@ -36,3 +46,141 @@ func newCollectionGroupRef(c *Client, dbPath, collectionID string) *CollectionGr
},
}
}

// GetPartitionedQueries returns a slice of Query objects, each containing a
// partition of a collection group. partitionCount must be a positive value and
// the number of returned partitions may be less than the requested number if
// providing the desired number would result in partitions with very few documents.
//
// If a Collection Group Query would return a large number of documents, this
// can help to subdivide the query to smaller working units that can be distributed.
func (cgr CollectionGroupRef) GetPartitionedQueries(ctx context.Context, partitionCount int) ([]Query, error) {
qp, err := cgr.getPartitions(ctx, partitionCount)
if err != nil {
return nil, err
}
queries := make([]Query, len(qp))
for _, part := range qp {
queries = append(queries, part.toQuery())
}
return queries, nil
}

// getPartitions returns a slice of queryPartition objects, describing a start
// and end range to query a subsection of the collection group. partitionCount
// must be a positive value and the number of returned partitions may be less
// than the requested number if providing the desired number would result in
// partitions with very few documents.
func (cgr CollectionGroupRef) getPartitions(ctx context.Context, partitionCount int) ([]queryPartition, error) {
orderedQuery := cgr.query().OrderBy(DocumentID, Asc)

if partitionCount <= 0 {
return nil, errors.New("a positive partitionCount must be provided")
} else if partitionCount == 1 {
return []queryPartition{{CollectionGroupQuery: orderedQuery}}, nil
}

db := cgr.c.path()
ctx = withResourceHeader(ctx, db)

// CollectionGroup Queries need to be ordered by __name__ ASC.
query, err := orderedQuery.toProto()
if err != nil {
return nil, err
}
structuredQuery := &firestorepb.PartitionQueryRequest_StructuredQuery{
StructuredQuery: query,
}

// Uses default PageSize
pbr := &firestorepb.PartitionQueryRequest{
Parent: db + "/documents",
PartitionCount: int64(partitionCount),
QueryType: structuredQuery,
}
cursorReferences := make([]*firestorepb.Value, 0, partitionCount)
iter := cgr.c.c.PartitionQuery(ctx, pbr)
for {
cursor, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
return nil, fmt.Errorf("GetPartitions: %v", err)
}
cursorReferences = append(cursorReferences, cursor.GetValues()...)
}

// From Proto documentation:
// To obtain a complete result set ordered with respect to the results of the
// query supplied to PartitionQuery, the results sets should be merged:
// cursor A, cursor B, cursor M, cursor Q, cursor U, cursor W
// Once we have exhausted the pages, the cursor values need to be sorted in
// lexicographical order by segment (areas between '/').
sort.Sort(byFirestoreValue(cursorReferences))

queryPartitions := make([]queryPartition, 0, len(cursorReferences))
previousCursor := ""

for _, cursor := range cursorReferences {
cursorRef := cursor.GetReferenceValue()

// remove the root path from the reference, as queries take cursors
// relative to a collection
cursorRef = cursorRef[len(orderedQuery.path)+1:]

qp := queryPartition{
CollectionGroupQuery: orderedQuery,
StartAt: previousCursor,
EndBefore: cursorRef,
}
queryPartitions = append(queryPartitions, qp)
previousCursor = cursorRef
}

// In the case there were no partitions, we still add a single partition to
// the result, that covers the complete range.
lastPart := queryPartition{CollectionGroupQuery: orderedQuery}
if len(cursorReferences) > 0 {
cursorRef := cursorReferences[len(cursorReferences)-1].GetReferenceValue()
lastPart.StartAt = cursorRef[len(orderedQuery.path)+1:]
}
queryPartitions = append(queryPartitions, lastPart)

return queryPartitions, nil
}

// queryPartition provides a Collection Group Reference and start and end split
// points allowing for a section of a collection group to be queried. This is
// used by GetPartitions which, given a CollectionGroupReference returns smaller
// sub-queries or partitions
type queryPartition struct {
// CollectionGroupQuery is an ordered query on a CollectionGroupReference.
// This query must be ordered Asc on __name__.
// Example: client.CollectionGroup("collectionID").query().OrderBy(DocumentID, Asc)
CollectionGroupQuery Query

// StartAt is a document reference value, relative to the collection, not
// a complete parent path.
// Example: "documents/collectionName/documentName"
StartAt string

// EndBefore is a document reference value, relative to the collection, not
// a complete parent path.
// Example: "documents/collectionName/documentName"
EndBefore string
}

// toQuery converts a queryPartition object to a Query object
func (qp queryPartition) toQuery() Query {
q := *qp.CollectionGroupQuery.query()

// Remove the leading path before calling StartAt, EndBefore
if qp.StartAt != "" {
q = q.StartAt(qp.StartAt)
}
if qp.EndBefore != "" {
q = q.EndBefore(qp.EndBefore)
}
return q
}
73 changes: 73 additions & 0 deletions firestore/collgroupref_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package firestore

import (
"context"
"testing"
)

func TestCGR_TestQueryPartition_ToQuery(t *testing.T) {
cgr := newCollectionGroupRef(testClient, testClient.path(), "collectionID")
qp := queryPartition{
CollectionGroupQuery: cgr.Query.OrderBy(DocumentID, Asc),
StartAt: "documents/start/at",
EndBefore: "documents/end/before",
}

got := qp.toQuery()

want := Query{
c: testClient,
path: "projects/projectID/databases/(default)",
parentPath: "projects/projectID/databases/(default)/documents",
collectionID: "collectionID",
startVals: []interface{}{"documents/start/at"},
endVals: []interface{}{"documents/end/before"},
startBefore: true,
endBefore: true,
allDescendants: true,
orders: []order{{fieldPath: []string{"__name__"}, dir: 1}},
}

if !testEqual(got, want) {
t.Errorf("got %+v, want %+v", got, want)
}
}

func TestCGR_TestGetPartitions(t *testing.T) {
cgr := newCollectionGroupRef(testClient, testClient.path(), "collectionID")
_, err := cgr.getPartitions(context.Background(), 0)
if err == nil {
t.Error("Expected an error when requested partition count is < 1")
}

parts, err := cgr.getPartitions(context.Background(), 1)
if err != nil {
t.Error("Didn't expect an error when requested partition count is 1")
}
if len(parts) != 1 {
t.Fatal("Expected 1 queryPartition")
}
got := parts[0]
want := queryPartition{
CollectionGroupQuery: cgr.Query.OrderBy(DocumentID, Asc),
StartAt: "",
EndBefore: "",
}
if !testEqual(got, want) {
t.Errorf("got %+v, want %+v", got, want)
}
}
85 changes: 85 additions & 0 deletions firestore/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1601,3 +1601,88 @@ func TestDetectProjectID(t *testing.T) {
t.Errorf("expected an error while using TokenSource that does not have a project ID")
}
}

func TestIntegration_ColGroupRefPartitions(t *testing.T) {
h := testHelper{t}
coll := integrationColl(t)
ctx := context.Background()

// Create a doc in the test collection so a collectionID is live for testing
doc := coll.NewDoc()
h.mustCreate(doc, integrationTestMap)

for _, tc := range []struct {
collectionID string
expectedPartitionCount int
}{
// Verify no failures if a collection doesn't exist
{collectionID: "does-not-exist", expectedPartitionCount: 1},
// Verify a collectionID with a small number of results returns a partition
{collectionID: coll.collectionID, expectedPartitionCount: 1},
} {
colGroup := iClient.CollectionGroup(tc.collectionID)
partitions, err := colGroup.getPartitions(ctx, 10)
if err != nil {
t.Fatalf("getPartitions: received unexpected error: %v", err)
}
if got, want := len(partitions), tc.expectedPartitionCount; got != want {
t.Errorf("Unexpected Partition Count: got %d, want %d", got, want)
}
}
}

func TestIntegration_ColGroupRefPartitionsLarge(t *testing.T) {
// Create collection with enough documents to have multiple partitions.
coll := integrationColl(t)
collectionID := coll.collectionID + "largeCollection"
coll = iClient.Collection(collectionID)

ctx := context.Background()

documentCount := 2*128 + 127 // Minimum partition size is 128.

// Create documents in a collection sufficient to trigger multiple partitions.
batch := iClient.Batch()
deleteBatch := iClient.Batch()
for i := 0; i < documentCount; i++ {
doc := coll.Doc(fmt.Sprintf("doc%d", i))
batch.Create(doc, integrationTestMap)
deleteBatch.Delete(doc)
}
batch.Commit(ctx)
defer deleteBatch.Commit(ctx)

// Verify that we retrieve 383 documents for the colGroup (128*2 + 127)
colGroup := iClient.CollectionGroup(collectionID)
docs, err := colGroup.Documents(ctx).GetAll()
if err != nil {
t.Fatalf("GetAll(): received unexpected error: %v", err)
}
if got, want := len(docs), documentCount; got != want {
t.Errorf("Unexpected number of documents in collection group: got %d, want %d", got, want)
}

// Get partitions, allow up to 10 to come back, expect less will be returned.
partitions, err := colGroup.GetPartitionedQueries(ctx, 10)
if err != nil {
t.Fatalf("GetPartitionedQueries: received unexpected error: %v", err)
}
if len(partitions) < 2 {
t.Errorf("Unexpected Partition Count. Expected 2 or more: got %d, want 2+", len(partitions))
}

// Verify that we retrieve 383 documents across all partitions. (128*2 + 127)
totalCount := 0
for _, query := range partitions {

allDocs, err := query.Documents(ctx).GetAll()
if err != nil {
t.Fatalf("GetAll(): received unexpected error: %v", err)
}
totalCount += len(allDocs)
}

if got, want := totalCount, documentCount; got != want {
t.Errorf("Unexpected number of documents across partitions: got %d, want %d", got, want)
}
}
8 changes: 8 additions & 0 deletions firestore/order.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"strings"

tspb "github.com/golang/protobuf/ptypes/timestamp"
firestorepb "google.golang.org/genproto/googleapis/firestore/v1"
pb "google.golang.org/genproto/googleapis/firestore/v1"
)

Expand Down Expand Up @@ -214,3 +215,10 @@ func typeOrder(v *pb.Value) int {
panic(fmt.Sprintf("bad value type: %v", v))
}
}

// byReferenceValue implements sort.Interface for []*firestorepb.Value
type byFirestoreValue []*firestorepb.Value

func (a byFirestoreValue) Len() int { return len(a) }
func (a byFirestoreValue) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byFirestoreValue) Less(i, j int) bool { return compareValues(a[i], a[j]) < 0 }

0 comments on commit b34783a

Please sign in to comment.