From b34783a4d7a8c88204e0f44bd411795d8267d811 Mon Sep 17 00:00:00 2001
From: Christopher Wilcox
Date: Fri, 25 Jun 2021 11:19:16 -0700
Subject: [PATCH] feat(firestore): Add support for PartitionQuery (#4206)

---
 firestore/collgroupref.go      | 148 +++++++++++++++++++++++++++++++++
 firestore/collgroupref_test.go |  73 ++++++++++++++++
 firestore/integration_test.go  |  85 +++++++++++++++++++
 firestore/order.go             |   8 ++
 4 files changed, 314 insertions(+)
 create mode 100644 firestore/collgroupref_test.go

diff --git a/firestore/collgroupref.go b/firestore/collgroupref.go
index e43a7e648c54..c13ff1f160b8 100644
--- a/firestore/collgroupref.go
+++ b/firestore/collgroupref.go
@@ -14,6 +14,16 @@
 
 package firestore
 
+import (
+	"context"
+	"errors"
+	"fmt"
+	"sort"
+
+	"google.golang.org/api/iterator"
+	firestorepb "google.golang.org/genproto/googleapis/firestore/v1"
+)
+
 // A CollectionGroupRef is a reference to a group of collections sharing the
 // same ID.
 type CollectionGroupRef struct {
@@ -36,3 +46,141 @@ func newCollectionGroupRef(c *Client, dbPath, collectionID string) *CollectionGr
 		},
 	}
 }
+
+// GetPartitionedQueries returns a slice of Query objects, each containing a
+// partition of a collection group. partitionCount must be a positive value and
+// the number of returned partitions may be less than the requested number if
+// providing the desired number would result in partitions with very few documents.
+//
+// If a Collection Group Query would return a large number of documents, this
+// can help to subdivide the query to smaller working units that can be distributed.
+func (cgr CollectionGroupRef) GetPartitionedQueries(ctx context.Context, partitionCount int) ([]Query, error) {
+	qp, err := cgr.getPartitions(ctx, partitionCount)
+	if err != nil {
+		return nil, err
+	}
+	queries := make([]Query, 0, len(qp))
+	for _, part := range qp {
+		queries = append(queries, part.toQuery())
+	}
+	return queries, nil
+}
+
+// getPartitions returns a slice of queryPartition objects, describing a start
+// and end range to query a subsection of the collection group. partitionCount
+// must be a positive value and the number of returned partitions may be less
+// than the requested number if providing the desired number would result in
+// partitions with very few documents.
+func (cgr CollectionGroupRef) getPartitions(ctx context.Context, partitionCount int) ([]queryPartition, error) {
+	orderedQuery := cgr.query().OrderBy(DocumentID, Asc)
+
+	if partitionCount <= 0 {
+		return nil, errors.New("a positive partitionCount must be provided")
+	} else if partitionCount == 1 {
+		return []queryPartition{{CollectionGroupQuery: orderedQuery}}, nil
+	}
+
+	db := cgr.c.path()
+	ctx = withResourceHeader(ctx, db)
+
+	// CollectionGroup Queries need to be ordered by __name__ ASC.
+	query, err := orderedQuery.toProto()
+	if err != nil {
+		return nil, err
+	}
+	structuredQuery := &firestorepb.PartitionQueryRequest_StructuredQuery{
+		StructuredQuery: query,
+	}
+
+	// Uses default PageSize
+	pbr := &firestorepb.PartitionQueryRequest{
+		Parent:         db + "/documents",
+		PartitionCount: int64(partitionCount),
+		QueryType:      structuredQuery,
+	}
+	cursorReferences := make([]*firestorepb.Value, 0, partitionCount)
+	iter := cgr.c.c.PartitionQuery(ctx, pbr)
+	for {
+		cursor, err := iter.Next()
+		if err == iterator.Done {
+			break
+		}
+		if err != nil {
+			return nil, fmt.Errorf("GetPartitions: %v", err)
+		}
+		cursorReferences = append(cursorReferences, cursor.GetValues()...)
+	}
+
+	// From Proto documentation:
+	// To obtain a complete result set ordered with respect to the results of the
+	// query supplied to PartitionQuery, the results sets should be merged:
+	// cursor A, cursor B, cursor M, cursor Q, cursor U, cursor W
+	// Once we have exhausted the pages, the cursor values need to be sorted in
+	// lexicographical order by segment (areas between '/').
+	sort.Sort(byFirestoreValue(cursorReferences))
+
+	queryPartitions := make([]queryPartition, 0, len(cursorReferences))
+	previousCursor := ""
+
+	for _, cursor := range cursorReferences {
+		cursorRef := cursor.GetReferenceValue()
+
+		// remove the root path from the reference, as queries take cursors
+		// relative to a collection
+		cursorRef = cursorRef[len(orderedQuery.path)+1:]
+
+		qp := queryPartition{
+			CollectionGroupQuery: orderedQuery,
+			StartAt:              previousCursor,
+			EndBefore:            cursorRef,
+		}
+		queryPartitions = append(queryPartitions, qp)
+		previousCursor = cursorRef
+	}
+
+	// In the case there were no partitions, we still add a single partition to
+	// the result, that covers the complete range.
+	lastPart := queryPartition{CollectionGroupQuery: orderedQuery}
+	if len(cursorReferences) > 0 {
+		cursorRef := cursorReferences[len(cursorReferences)-1].GetReferenceValue()
+		lastPart.StartAt = cursorRef[len(orderedQuery.path)+1:]
+	}
+	queryPartitions = append(queryPartitions, lastPart)
+
+	return queryPartitions, nil
+}
+
+// queryPartition provides a Collection Group Reference and start and end split
+// points allowing for a section of a collection group to be queried. This is
+// used by GetPartitions which, given a CollectionGroupReference returns smaller
+// sub-queries or partitions
+type queryPartition struct {
+	// CollectionGroupQuery is an ordered query on a CollectionGroupReference.
+	// This query must be ordered Asc on __name__.
+	// Example: client.CollectionGroup("collectionID").query().OrderBy(DocumentID, Asc)
+	CollectionGroupQuery Query
+
+	// StartAt is a document reference value, relative to the collection, not
+	// a complete parent path.
+	// Example: "documents/collectionName/documentName"
+	StartAt string
+
+	// EndBefore is a document reference value, relative to the collection, not
+	// a complete parent path.
+	// Example: "documents/collectionName/documentName"
+	EndBefore string
+}
+
+// toQuery converts a queryPartition object to a Query object
+func (qp queryPartition) toQuery() Query {
+	q := *qp.CollectionGroupQuery.query()
+
+	// Apply each cursor bound only when it is set; an empty value leaves that side unbounded.
+	if qp.StartAt != "" {
+		q = q.StartAt(qp.StartAt)
+	}
+	if qp.EndBefore != "" {
+		q = q.EndBefore(qp.EndBefore)
+	}
+	return q
+}
diff --git a/firestore/collgroupref_test.go b/firestore/collgroupref_test.go
new file mode 100644
index 000000000000..bbcbfe4ebb62
--- /dev/null
+++ b/firestore/collgroupref_test.go
@@ -0,0 +1,73 @@
+// Copyright 2021 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package firestore
+
+import (
+	"context"
+	"testing"
+)
+
+func TestCGR_TestQueryPartition_ToQuery(t *testing.T) {
+	cgr := newCollectionGroupRef(testClient, testClient.path(), "collectionID")
+	qp := queryPartition{
+		CollectionGroupQuery: cgr.Query.OrderBy(DocumentID, Asc),
+		StartAt:              "documents/start/at",
+		EndBefore:            "documents/end/before",
+	}
+
+	got := qp.toQuery()
+
+	want := Query{
+		c:              testClient,
+		path:           "projects/projectID/databases/(default)",
+		parentPath:     "projects/projectID/databases/(default)/documents",
+		collectionID:   "collectionID",
+		startVals:      []interface{}{"documents/start/at"},
+		endVals:        []interface{}{"documents/end/before"},
+		startBefore:    true,
+		endBefore:      true,
+		allDescendants: true,
+		orders:         []order{{fieldPath: []string{"__name__"}, dir: 1}},
+	}
+
+	if !testEqual(got, want) {
+		t.Errorf("got %+v, want %+v", got, want)
+	}
+}
+
+func TestCGR_TestGetPartitions(t *testing.T) {
+	cgr := newCollectionGroupRef(testClient, testClient.path(), "collectionID")
+	_, err := cgr.getPartitions(context.Background(), 0)
+	if err == nil {
+		t.Error("Expected an error when requested partition count is < 1")
+	}
+
+	parts, err := cgr.getPartitions(context.Background(), 1)
+	if err != nil {
+		t.Error("Didn't expect an error when requested partition count is 1")
+	}
+	if len(parts) != 1 {
+		t.Fatal("Expected 1 queryPartition")
+	}
+	got := parts[0]
+	want := queryPartition{
+		CollectionGroupQuery: cgr.Query.OrderBy(DocumentID, Asc),
+		StartAt:              "",
+		EndBefore:            "",
+	}
+	if !testEqual(got, want) {
+		t.Errorf("got %+v, want %+v", got, want)
+	}
+}
diff --git a/firestore/integration_test.go b/firestore/integration_test.go
index e398150f84fa..ca841858f988 100644
--- a/firestore/integration_test.go
+++ b/firestore/integration_test.go
@@ -1601,3 +1601,88 @@ func TestDetectProjectID(t *testing.T) {
 		t.Errorf("expected an error while using TokenSource that does not have a project ID")
 	}
 }
+
+func TestIntegration_ColGroupRefPartitions(t *testing.T) {
+	h := testHelper{t}
+	coll := integrationColl(t)
+	ctx := context.Background()
+
+	// Create a doc in the test collection so a collectionID is live for testing
+	doc := coll.NewDoc()
+	h.mustCreate(doc, integrationTestMap)
+
+	for _, tc := range []struct {
+		collectionID           string
+		expectedPartitionCount int
+	}{
+		// Verify no failures if a collection doesn't exist
+		{collectionID: "does-not-exist", expectedPartitionCount: 1},
+		// Verify a collectionID with a small number of results returns a partition
+		{collectionID: coll.collectionID, expectedPartitionCount: 1},
+	} {
+		colGroup := iClient.CollectionGroup(tc.collectionID)
+		partitions, err := colGroup.getPartitions(ctx, 10)
+		if err != nil {
+			t.Fatalf("getPartitions: received unexpected error: %v", err)
+		}
+		if got, want := len(partitions), tc.expectedPartitionCount; got != want {
+			t.Errorf("Unexpected Partition Count: got %d, want %d", got, want)
+		}
+	}
+}
+
+func TestIntegration_ColGroupRefPartitionsLarge(t *testing.T) {
+	// Create collection with enough documents to have multiple partitions.
+	coll := integrationColl(t)
+	collectionID := coll.collectionID + "largeCollection"
+	coll = iClient.Collection(collectionID)
+
+	ctx := context.Background()
+
+	documentCount := 2*128 + 127 // Minimum partition size is 128.
+
+	// Create documents in a collection sufficient to trigger multiple partitions.
+	batch := iClient.Batch()
+	deleteBatch := iClient.Batch()
+	for i := 0; i < documentCount; i++ {
+		doc := coll.Doc(fmt.Sprintf("doc%d", i))
+		batch.Create(doc, integrationTestMap)
+		deleteBatch.Delete(doc)
+	}
+	batch.Commit(ctx)
+	defer deleteBatch.Commit(ctx)
+
+	// Verify that we retrieve 383 documents for the colGroup (128*2 + 127)
+	colGroup := iClient.CollectionGroup(collectionID)
+	docs, err := colGroup.Documents(ctx).GetAll()
+	if err != nil {
+		t.Fatalf("GetAll(): received unexpected error: %v", err)
+	}
+	if got, want := len(docs), documentCount; got != want {
+		t.Errorf("Unexpected number of documents in collection group: got %d, want %d", got, want)
+	}
+
+	// Get partitions, allow up to 10 to come back, expect less will be returned.
+	partitions, err := colGroup.GetPartitionedQueries(ctx, 10)
+	if err != nil {
+		t.Fatalf("GetPartitionedQueries: received unexpected error: %v", err)
+	}
+	if len(partitions) < 2 {
+		t.Errorf("Unexpected Partition Count. Expected 2 or more: got %d, want 2+", len(partitions))
+	}
+
+	// Verify that we retrieve 383 documents across all partitions. (128*2 + 127)
+	totalCount := 0
+	for _, query := range partitions {
+
+		allDocs, err := query.Documents(ctx).GetAll()
+		if err != nil {
+			t.Fatalf("GetAll(): received unexpected error: %v", err)
+		}
+		totalCount += len(allDocs)
+	}
+
+	if got, want := totalCount, documentCount; got != want {
+		t.Errorf("Unexpected number of documents across partitions: got %d, want %d", got, want)
+	}
+}
diff --git a/firestore/order.go b/firestore/order.go
index e5ee1e09fb53..c495a141fd36 100644
--- a/firestore/order.go
+++ b/firestore/order.go
@@ -22,6 +22,7 @@ import (
 	"strings"
 
 	tspb "github.com/golang/protobuf/ptypes/timestamp"
+	firestorepb "google.golang.org/genproto/googleapis/firestore/v1"
 	pb "google.golang.org/genproto/googleapis/firestore/v1"
 )
 
@@ -214,3 +215,10 @@ func typeOrder(v *pb.Value) int {
 		panic(fmt.Sprintf("bad value type: %v", v))
 	}
 }
+
+// byFirestoreValue implements sort.Interface for []*firestorepb.Value
+type byFirestoreValue []*firestorepb.Value
+
+func (a byFirestoreValue) Len() int           { return len(a) }
+func (a byFirestoreValue) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
+func (a byFirestoreValue) Less(i, j int) bool { return compareValues(a[i], a[j]) < 0 }