
Add BulkWriter #1055

Merged
merged 7 commits into node10 from bc/bulk-node10 on Apr 29, 2020
Conversation

thebrianchen

No description provided.

@thebrianchen thebrianchen self-assigned this Apr 29, 2020
@googlebot googlebot added the cla: yes This human has signed the Contributor License Agreement. label Apr 29, 2020
@codecov

codecov bot commented Apr 29, 2020

Codecov Report

Merging #1055 into node10 will decrease coverage by 0.02%.
The diff coverage is 99.05%.

Impacted file tree graph

@@            Coverage Diff             @@
##           node10    #1055      +/-   ##
==========================================
- Coverage   98.65%   98.63%   -0.03%     
==========================================
  Files          25       27       +2     
  Lines       16620    17460     +840     
  Branches     1331     1381      +50     
==========================================
+ Hits        16397    17222     +825     
- Misses        220      235      +15     
  Partials        3        3              
Impacted Files            Coverage Δ
dev/src/index.ts          98.33% <97.50%> (-0.03%) ⬇️
dev/src/bulk-writer.ts    98.78% <98.78%> (ø)
dev/src/backoff.ts        100.00% <100.00%> (ø)
dev/src/document.ts       98.93% <100.00%> (-0.03%) ⬇️
dev/src/rate-limiter.ts   100.00% <100.00%> (ø)
dev/src/types.ts          99.70% <100.00%> (+0.01%) ⬆️
dev/src/write-batch.ts    100.00% <100.00%> (ø)
dev/src/field-value.ts    97.00% <0.00%> (-1.13%) ⬇️

Continue to review full report at Codecov.

Legend
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 0b2ea39...1117172. Read the comment docs.

Contributor

@schmidt-sebastian schmidt-sebastian left a comment

To make sure I don't need to review all lines here again, can you:

  • Add the .vscode file to master
  • Reset this branch to node10
  • Merge master into this branch
  • Create a commit with all merge conflicts
  • Resolve the merge conflicts and lint errors in a separate commit. This will be the only commit I need to review.

@schmidt-sebastian schmidt-sebastian removed their assignment Apr 29, 2020
@thebrianchen thebrianchen force-pushed the bc/bulk-node10 branch 2 times, most recently from 118c524 to 54a2ae4 on April 29, 2020 20:18
@@ -1259,7 +1259,6 @@ describe('v1.FirestoreClient', () => {
new protos.google.firestore.v1.WriteRequest()
);
request.database = '';
const expectedHeaderRequestParams = 'database=';
Contributor

Please exclude the generated test files as the node10 branch contains the more recent versions.

Author

done.

@thebrianchen thebrianchen merged commit 955f858 into node10 Apr 29, 2020
@thebrianchen thebrianchen deleted the bc/bulk-node10 branch April 29, 2020 22:50
schmidt-sebastian added a commit that referenced this pull request Jun 24, 2020
* fix!: mark v1beta1 client as deprecated (#937)

* feat!: use QueryDocumentSnapshot in FirestoreDataConverter (#965)

* deps: update to gts 2.x (#1013)

* chore!: update settings for Node 10 (#1019)

* deps: drop through2 (#1014)

* feat: support BigInt (#1016)

* fix: make update.sh work on Linux (#1043)

* fix: only use BigInt in BigInt system test (#1044)

* fix: make pbjs compile admin proto again (#1045)

* Add BulkWriter (#1055)

* docs: Add documentation for FirestoreDataConverter (#1059)

* chore: enforce return types (#1065)

* fix: add generic to Firestore.getAll() (#1066)

* chore: remove internal WriteOp (#1067)

* chore: add linter checks for it|describe.only (#1068)

* fix: handle terminate in BulkWriter (#1070)

* chore: run template copying last in synthtool (#1071)

* feat: Firestore Bundles implementation (#1078)

* feat: add support for set() with SetOptions when using FirestoreDataConverter (#1087)

* feat: Add totalDocuments and totalBytes to bundle metadata. (#1085)

* feat: Add totalDocuments and totalBytes to bundle metadata.

* fix: Better comment

* fix: Better testing.

* fix: Improve metadata testing.

* fix: incomplete expect in rate-limiter test (#1092)

* Remove BatchWrite proto, fix conformance tests

* chore: use public API types internally (#1100)

* feat: add Partition and BatchWrite protos (#1110)

* fix: remove GCF transaction fallback (#1112)

* fix: add BulkWriter integration tests, java backport changes, delete fix (#1117)

* chore: merge master (#1218)

* chore: add eslint check for console.log statements (#1229)

* fix: another attempt at fixing the flaky BulkWriter test (#1228)

* Fix comment

* Renames

* Test fix

* Fix unit tests

Co-authored-by: Brian Chen <[email protected]>
Co-authored-by: wu-hui <[email protected]>
@jakeleventhal

is there any documentation for how to use this?
@schmidt-sebastian

@schmidt-sebastian
Contributor

@jakeleventhal Not apart from the API documentation. We will add a more comprehensive guide when this lands in more languages (Java is next).

From an API perspective, it is similar to WriteBatch. The usage is a bit different: BulkWriter will periodically send requests as they are scheduled (via set(), update(), etc.). You can use flush() to forcefully send requests that were already queued (optional), but you must call close() when you are done issuing requests.
flush() and close() return Promises indicating when all writes in their scope have finished (successfully or not). The status for each write is returned via the Promise returned by the individual operations.
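
A minimal sketch of that flow, assuming a hypothetical docs array of [documentRef, data] pairs:

const {Firestore} = require('@google-cloud/firestore');

async function writeAll(docs) {
  const firestore = new Firestore();
  const bulkWriter = firestore.bulkWriter();

  for (const [documentRef, data] of docs) {
    // Each operation returns its own Promise carrying that write's status.
    bulkWriter.set(documentRef, data).catch(err => {
      console.error('Write failed:', err);
    });
  }

  // Optional: resolves once all writes scheduled so far have been sent.
  await bulkWriter.flush();

  // Required: sends any remaining writes; no new writes may be scheduled afterwards.
  await bulkWriter.close();
}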

@jakeleventhal

jakeleventhal commented Jul 15, 2020

What is the purpose of the bulk writer then if it is not making atomic updates?
Is this not the same thing as just doing this?

const updates = [];
things.forEach(thing => {
   updates.push(firestore.collection('Things').add(thing));
});

await Promise.all(updates);

@jakeleventhal

@schmidt-sebastian

@thebrianchen
Author

@jakeleventhal BulkWriter will allow you to perform large numbers of writes in parallel, such as in the data ingestion use case. Without the atomicity bottleneck of WriteBatch or Transaction, BulkWriter should have higher throughput and fewer contention errors.

@jakeleventhal

@thebrianchen

just to clarify, bulk writer is NOT atomic.
also are you referring to the data ingestion use case that i described in my recent comment?

i'm still not seeing how the bulk writer is any different

@thebrianchen
Author

just to clarify, bulk writer is NOT atomic.

That is correct -- the BulkWriter is not atomic.

also are you referring to the data ingestion use case that i described in my recent comment?

For small numbers of writes, there is not that big of a difference between writing in a loop (like the code sample in your recent comment) and using BulkWriter. However, with large numbers of writes, the for loop will run into contention errors unless you write your own throttling and error handling/retry code. BulkWriter's main benefit is that it has built-in 500/50/5 throttling and will automatically retry contention errors for you.

@schmidt-sebastian
Contributor

On top of what @thebrianchen said, your write throughput with BulkWriter will be higher since multiple writes will be processed by the same backend, which reduces the amount of work that the backend has to perform for each request (e.g. some internal lookups are now cached). The reason we initially suggested using WriteBatch for large updates was this affinity, but in most cases the overhead due to atomicity outweighed the benefits of a warm cache.

@jakeleventhal

thanks for the explanation! @schmidt-sebastian @thebrianchen

@kachler

kachler commented Jul 28, 2020

This thread has been quite beneficial, but I'm still a little fuzzy about the proper usage. Does BulkWriter have a maximum number of writes per batch, like the 500 document limit on WriteBatch, or does it handle that logic internally?

For example, let's say we need to update a timestamp field on 20,000+ documents, would the best practice be something similar to this:

async function bulkWriter() {
    try {
        const updateFields = {
            timestamp: '2020-12-31T23:59:59.999Z',
        };

        const firestore = new Firestore();
        let bulkWriter = firestore.bulkWriter();

        docIds.forEach(id => {
            const documentRef = firestore.doc(`${collectionName}/${id}`);
            bulkWriter.update(documentRef, updateFields);
        });

        await bulkWriter.close().then(() => {
            console.log('Executed all writes');
        });
    } catch (error) {
        console.error(error);
    }
}

Or should flush() be invoked periodically? If we should be using flush(), what are the threshold recommendations?

@thebrianchen
Author

Does BulkWriter have a maximum number of writes per batch, like the 500 document limit on WriteBatch, or does it handle that logic internally?

BulkWriter automatically batches and sends writes for you, so you don't have to worry about batching writes. Your code sample looks good -- my only recommendation would be to add a .catch() to the bulkWriter.update() promise so that you can handle any failed writes.
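
For example, the update in the sample above could become (a sketch reusing the hypothetical docIds, collectionName, and updateFields from that sample):

docIds.forEach(id => {
    const documentRef = firestore.doc(`${collectionName}/${id}`);
    bulkWriter.update(documentRef, updateFields).catch(error => {
        // Failures surface here per document; the other writes still proceed.
        console.error(`Failed to update ${collectionName}/${id}:`, error);
    });
});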

Or should flush() be invoked periodically? If we should be using flush(), what are the threshold recommendations?

You do not need to periodically call flush() if you call close() at the end of your writes. flush()'s main purpose is to allow you to track the progress of BulkWriter's writes.

@kachler

kachler commented Jul 29, 2020

Thank you for your help @thebrianchen! One last question, if we call flush() periodically, do we need to call something at the end of the writes?

@schmidt-sebastian
Contributor

You need to either call flush() or close() at the end of all writes. We tried to mimic Streaming APIs (such as the one used in Java). flush() is optional, close() is not. The implementation detail here is that a final flush() can act as a close().
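
A sketch of the periodic-flush pattern, assuming a hypothetical chunks array of [documentRef, data] batches:

async function writeInChunks(firestore, chunks) {
  const bulkWriter = firestore.bulkWriter();

  for (const chunk of chunks) {
    for (const [documentRef, data] of chunk) {
      bulkWriter.set(documentRef, data).catch(err => console.error(err));
    }
    // Optional checkpoint: resolves once all writes scheduled so far have finished.
    await bulkWriter.flush();
    console.log('Chunk written');
  }

  // Required at the end: sends any remaining writes and prevents further scheduling.
  await bulkWriter.close();
}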

@timofei-iatsenko

Hi, this thread was really helpful for understanding WriteBatch vs BulkWriter. Maybe it would be better to create some documentation explaining this?

By the way, I have a question about WriteBatch vs BulkWriter. We used WriteBatch for a while; we have about 1000 rows to update every 5 minutes, and it works fine except for the atomicity. We really don't need it; in fact, it's bad for us when the whole batch is rejected because only one row is corrupted (deleted, for example).

So we decided to switch to BulkWriter. This seems to work, but the same job takes much more time and sometimes a 'Bandwidth Exhausted' error appears.

Are we doing this right? Should we use BulkWriter or WriteBatch in this case? Which one is faster and less demanding on resources?

@thebrianchen
Author

@thekip Thanks for sharing your feedback and writing in!

We're still in the midst of refining this feature, and will have better documentation out soon!

Are you using BulkWriter with the default throttling enabled? When you hit the Firestore rate limits, you'll run into 'Bandwidth Exhausted' errors. Ideally, you should use BulkWriter with throttling enabled and specify a retry function with the BulkWriter.onWriteError() callback. This way, BulkWriter will automatically retry failed writes for you. Depending on your workload, either BulkWriter or WriteBatch could be faster, but we are working on improving the performance of BulkWriter. Since BulkWriter performs your writes in parallel (rather than atomically), it should have faster performance and require fewer resources.
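
A sketch of that setup (this assumes the BulkWriterError passed to onWriteError exposes failedAttempts and documentRef, and the limit of 5 is just an illustrative number):

const bulkWriter = firestore.bulkWriter(); // throttling is enabled by default

bulkWriter.onWriteError(error => {
  // Returning true asks BulkWriter to retry this write; returning false gives up
  // and rejects the write's original Promise.
  if (error.failedAttempts < 5) {
    return true;
  }
  console.error('Giving up on document:', error.documentRef.path);
  return false;
});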

If you post a code snippet, I'd be happy to help you with your use case!

@jakeleventhal

@schmidt-sebastian when can we expect no restriction on batch size for atomic updates?

@timofei-iatsenko

@thebrianchen thanks for the answer. The code snippet is straightforward:

    const firestoreBulkWriter = this.firestore.bulkWriter();

    campaigns.forEach((campaign) => {
      const { id, ...data } = campaign;

      firestoreBulkWriter.update(
        this.firestore.collection('campaigns').doc(id),
        data,
      ).catch((e) => {
        functions.logger.error(e);

        this.errors.push({
          message: `Live: Error writing campaign: ${campaign.campaign_name} [${campaign.id}]`,
          originalError: e.message,
        });
      });
    });

    await firestoreBulkWriter.close();
    functions.logger.info('Live campaigns updated');

I'm just creating a BulkWriter with default params and updating documents.

@thebrianchen
Author

@thekip What you have looks good! Another addition you can make is to use the onWriteError handler to retry exhausted errors more than the default of 10 times. The promise of the original write will be resolved with the result of the final retry.
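
A sketch of that, registered right after creating firestoreBulkWriter in the snippet above (MAX_RETRY_ATTEMPTS is a hypothetical ceiling above the default of 10):

const MAX_RETRY_ATTEMPTS = 25; // hypothetical ceiling above the default of 10

firestoreBulkWriter.onWriteError(error => {
  // Keep retrying until the higher ceiling is reached; the original update()'s
  // Promise (and its .catch above) settles with the result of the final attempt.
  return error.failedAttempts < MAX_RETRY_ATTEMPTS;
});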
