Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(xsnap): stream snapshots over process pipe #7541

Merged
merged 9 commits into from
Apr 30, 2023

Conversation

mhofman
Copy link
Member

@mhofman mhofman commented Apr 27, 2023

closes: #6363
refs: agoric-labs/xsnap-pub#39

Description

This introduces support for streaming the xsnap snapshot over an stdio pipe with the worker process, removing the need to transition through the filesystem.

Security Considerations

This ultimately might allow use to remove fs permissions from the xsnap-worker

Scaling Considerations

This should improve the performance of snapshot load and save, and enable future optimizations like teeing a snapshot we're making into a new worker will compressing it and saving it in parallel. See #6943

Documentation Considerations

Internal implementation detail triggered by new option (on by default.

Testing Considerations

Updated the xsnap snapshot test to exercise various combinations of save / load using fs or pipe, as well as making sure that multiple snapshot in a row over the pipe behave as expected.

@mhofman mhofman requested a review from warner April 27, 2023 22:40
@mhofman mhofman added the force:integration Force integration tests to run on PR label Apr 27, 2023
Copy link
Member

@warner warner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is ok, I never understood the baton bits well enough to read this properly, but I agree with the parts that I do understand: if snapshotUseFs is false (which is the default), then we send snapshots by writing them to a specific fd, and we read snapshots on a separate (fixed) fd. The snapshot reads follow a sequence of:

  • prepare a snapshot reader to collect bytes arriving on FD N
  • write "please send a snapshot on FD N" on the command pipe
  • wait until the command-response pipe returns an ack, with a size
  • wait until the snapshot reader has seen that many bytes
  • fire the promise that was returned earlier by makeSnapshot

If it really does work that way, then this sounds good to me.

packages/xsnap/src/xsnap.js Show resolved Hide resolved
});
const postStart = await (snapshotStream &&
(snapshotUseFs
? async () => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's a pretty big trinary expression.. is there an easy way to define a pair of functions earlier and then make the ?: fit on a single line?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did that and a little more

@@ -146,7 +175,17 @@ export async function xsnap(options) {
console.log('XSNAP_DEBUG_RR', { bin, args });
}
const xsnapProcess = spawn(bin, args, {
stdio: ['ignore', stdout, stderr, 'pipe', 'pipe'],
stdio: [
'ignore',
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's add comments to label these, unless prettier objects: fd numbers and their use

output.end();
};
const checkDone = () => {
if (snapshotSize !== undefined && !(snapshotReadSize < snapshotSize)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.. && (snapshotReadSize >= snapshotSize) would seem more readable, to me

Copy link
Member Author

@mhofman mhofman Apr 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes but that is not equivalent in the case where snapshotSize is null. However that's just too complicated for it's own good, will simplify.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, let's not rely upon Number(null)===0: if it's not a number, we shouldn't reach the comparison

Copy link
Member Author

@mhofman mhofman Apr 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isNaN(Number(null)) === true, but yes agreed

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Used -1 as that will always be lower than 0 :)

@mhofman
Copy link
Member Author

mhofman commented Apr 28, 2023

  • wait until the snapshot reader has seen that many bytes

Technically it's wait until the consumer has seen that many bytes (or until it stops reading, whichever comes first), but since it's a simple pipe, that's the same thing.

  • wait until the command-response pipe returns an ack, with a size
  • wait until the snapshot reader has seen that many bytes

This being I/O on separate pipes, there is no guarantee in the order of these 2, so they're reacted to in parallel.

  • fire the promise that was returned earlier by makeSnapshot

There is no promise returned by makeSnapshot. The data yielded by the generator will start flowing right away. We however make sure that once the consumer is done with the data (either by reaching the end, or by returning early), that all got cleaned up first. The consumer knows it has reached the end because we close the stream once it has seen the number of expected bytes.

@warner
Copy link
Member

warner commented Apr 28, 2023

  • wait until the snapshot reader has seen that many bytes

Technically it's wait until the consumer has seen that many bytes (or until it stops reading, whichever comes first), but since it's a simple pipe, that's the same thing.

  • wait until the command-response pipe returns an ack, with a size
  • wait until the snapshot reader has seen that many bytes

This being I/O on separate pipes, there is no guarantee in the order of these 2, so they're reacted to in parallel.

Right, but you can't wait for N bytes until you know what N is, yeah? So as long as the reader is accepting bytes "in the background" (the IO callback is being called each time some data arrives), the code could do an await sendCommand('write snapshot') and not proceed until we know N, and then switch to a mode where we're instead waiting for N.

  • fire the promise that was returned earlier by makeSnapshot

There is no promise returned by makeSnapshot. The data yielded by the generator will start flowing right away. We however make sure that once the consumer is done with the data (either by reaching the end, or by returning early), that all got cleaned up first. The consumer knows it has reached the end because we close the stream once it has seen the number of expected bytes.

Oh, I thought makeSnapshot would return a promise too. I get that this would give us two different completion signals:

  • 1: stream closes (aka iterator ends)
  • 2: makeSnapshot return promise fires

But aren't there (failure) cases where you don't get far enough to build the stream, and makeSnapshot needs to signal failure without one?

@mhofman
Copy link
Member Author

mhofman commented Apr 28, 2023

But aren't there (failure) cases where you don't get far enough to build the stream, and makeSnapshot needs to signal failure without one?

makeSnapshot synchronously returns an async generator (instead of synchronously returning a promise before #7531). If there is an error, it will be communicated during the iteration of the stream.

@mhofman
Copy link
Member Author

mhofman commented Apr 28, 2023

So as long as the reader is accepting bytes "in the background" (the IO callback is being called each time some data arrives), the code could do an await sendCommand('write snapshot') and not proceed until we know N, and then switch to a mode where we're instead waiting for N.

I'm not sure what "waiting for N" here means. If we're accepting bytes in the background (needed to avoid blocking the sender xsnap-worker process), we have to keep track of the bytes as we're accepting them. We may accept the bytes before we learn N, or after, so we need to check if we reached N in both places (while accepting bytes, or when we learn N).

@warner
Copy link
Member

warner commented Apr 28, 2023

So as long as the reader is accepting bytes "in the background" (the IO callback is being called each time some data arrives), the code could do an await sendCommand('write snapshot') and not proceed until we know N, and then switch to a mode where we're instead waiting for N.

I'm not sure what "waiting for N" here means. If we're accepting bytes in the background (needed to avoid blocking the sender xsnap-worker process), we have to keep track of the bytes as we're accepting them. We may accept the bytes before we learn N, or after, so we need to check if we reached N in both places (while accepting bytes, or when we learn N).

Yeah, I guess I should be more precise. The callback that is invoked (which is maybe really an async generator which gets re-activated each time the FD becomes readable and select/poll/libuv/etc notices) gets a set of bytes, and either has a target count N or not. If it doesn't know N yet, it blindly delivers the bytes to the stream reader (the tee that's feeding compression and hashing, and maybe a third branch going to a replacement worker). If it does know N by that point, it feeds the bytes, but also checks the cumulative size, and if the size exceeds N, then it closes the stream. That's what I meant by "waiting for N": the stream close is "waiting" for size >= N.

(of course, we're depending upon the child process to be honest about N, because if N is too small, then we won't notice until we've already sent more than N bytes to the stream. It's important that the stream iterator is closed with an error, and that the client of that iterator pays attention to that error, and doesn't e.g. store a corrupt snapshot into the DB. Streaming APIs in cryptographic operations are troublesome this way, e.g. checking a hash on a streaming blob of data, because they frequently provide not-yet-validated data to a caller before sending a later "yup that was safe" signal, and it's really easy to forget that you must not rely on that data until getting the signal)

If makeSnapshot() also returned a Promise, that Promise would be fulfilled or rejected by the IO callback when it sees the size >= N threshold (fulfilled if size === N).

But, I trust that the sync return of an async generator means we'll always get a generator, and thus can rely upon the failure signal from the generator/stream, instead of needing a separate promise. And I'm willing to believe that one signalling mechanism is better than two.

thanks!

@mhofman
Copy link
Member Author

mhofman commented Apr 28, 2023

I never understood the baton bits well enough to read this properly

I do and I still made a mistake. Not a fatal one, but it was caught by one of the tests.
It's actually an issue that was introduced in #7531, but only revealed by the more flexible streaming in this PR.

Generator functions start suspended. That means the baton would only be taken by makeSnapshot once the generator stream starts getting consumed. An operation on the worker immediately following makeSnapshot would actually take the baton before the snapshot would take it, unless the snapshot consumption started before that next operation. In practice this never happens in SwingSet, but still enough of a footgun that I'll be fixing the root cause by synchronously taking the baton in makeSnapshot like every other operation.

Who said that single threaded programs couldn't run into data races due to faulty synchronization?

@mhofman
Copy link
Member Author

mhofman commented Apr 28, 2023

Apologies in advance, I will need to rebase this PR to amend the base before re-adding the commits. I will try to keep things consistent as much as possible and will not change the base to facilitate review.

@mhofman mhofman force-pushed the mhofman/6363-stream-snapshot branch from 34c1ac3 to 6c276c5 Compare April 28, 2023 23:11
@mhofman mhofman requested a review from kriskowal April 28, 2023 23:12
@mhofman
Copy link
Member Author

mhofman commented Apr 28, 2023

@warner PTAL
@kriskowal feel free to peak at the baton shenanigans

I've pushed some commits below the original ones, and interspersed the fixup commits addressing feedback to make sure this won't have any auto-squash rebase conflicts., but the original commits' diff should have barely changed besides simple conflict resolutions.

@warner warner self-requested a review April 29, 2023 20:47
Copy link
Member

@warner warner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, some suggested name changes and a pair of magic-number constants, but it seems pretty good to me. I'm not strong on the baton-passing stuff, so I'm still taking some of that on faith, and it'd be great if @kriskowal could take a second look.

const vat1 = await xsnap({
...options(io),
handleCommand,
snapshotStream: snapshotStream1,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: suggest snapshotStream: vat0.makeSnapshot(),

makes for a nice "clone the worker" idiom

* @param {string} [description]
* @returns {AsyncGenerator<Uint8Array, void, undefined>}
*/
function makeSnapshot(description = 'unknown') {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still thinking this should be named makeSnapshotStream, to make it clear that you aren't getting a Uint8Array as a single big blob, and to make it clear that you can't just copy the result (since it's a single-use generator).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And I think we should almost never pass plain Uint8Array around, only using streams, making the Stream suffix redundant ;)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well, yeah, we have good reasons for using streams rather than multi-megabyte blobs, but a newcomer won't know that, and when we say "snapshot" in most other contexts (snapStore tables, files on disk, ingest-snapshot.js tools) we mean a blob of bytes, so their confusion would be justified

)}-XXXXXX.xss`,
});

const cleanup = async () => fs.unlink(snapPath);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will cleanup be called temporally later than afterSpawn? I'd define them in the same order, so swap these two lines.

};

return harden({
snapPath: `@8:${safeHintFromDescription(snapshotDescription)}`,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This works (I'm guessing the related xsnap change was to limit the atoi to the separating colon), although I'm not sure we really need the snapshot ID on the command line (they're long and not very useful).

If we really need it to be available, we could stick it in an environment variable for the child process, where you could read it from /proc/PID/environ.

But, this is fine for now, no need to change it today.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so for atoi while C seem to say behavior is undefined when reaching a non numeric char, POSIX defines that the behavior is the one of strtol, and stops at non-numeric chars. In a perfect world I'd go an be explicit about this on the xsnap-worker side, but I really don't want another PR just for that.

Also the description will only include the hash when loading the vat the first time. On snapshot reload it will only contain vatID-snapPos. The reason I wanted to include is because for forced snapshot reload, I can end up with 2 process with the exact same args during a reload, not knowing which is which is which. I also have manually tested the old worker retirement, but being able to spot a problem with different cmdline is helpful if more than one process remains.

packages/xsnap/src/xsnap.js Show resolved Hide resolved
)}-XXXXXX.xss`,
});
// eslint-disable-next-line @jessie.js/no-nested-await
const handle = await fs.open(snapPath, 'w+');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why w+ ("writing and reading"). It sounds like this is the filename we send to the xsnap "please write a snapshot" command, which means xsnap will write to it, and we'll read from it. So should it use r instead of w+?

(using multiple modes on a single file is suspicious)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit of stream opening race mitigation. Before I'd create the file name, tell xsnap write here (which it does with wb), wait for the response, then create the stream, wait for the file open event, then hook the stream.

To simplify the logic, I'm abusing file modes, and now the flow is as follow: JS creates the file in w+ mode, aka create/truncate and allow reading, creates the read stream, tell xsnap to take the snapshot, wait for the response, and then connects the stream to the output.

It's a lot more similar to the wiring for the streaming snapshot, except for when the piping of source to output stream occurs. If not that, I'd have to create a thunk to call in the "maybePipe", with all the asynchronous error handling complications.

I can try to document this better.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, ok, so it's a bit like what tail -F must do (wait for the file to exist, then wait for new data to appear). But here, the kernel side creates the file in the first place (so it can start waiting for new data right away), and the worker side opens a pre-existing file for writing (i.e. truncate, but the reader doesn't have to wait for them).

And of course Node's enthusiasm for async means that it's one notch more complicated: the kernel side can't just open the file, it has to start opening a file and then wait for the open to complete.

I'd be tempted to do a sync open(mode='w') to create the empty file, then close it right away, then an async open(mode='r') to set up the reader, with a comment in between to explain that we pre-create the file to avoid needing to watch for it to appear when the child starts writing. But I can imagine how that flow might be not as good as the mode='w+' you're doing.

So yeah, please add a comment explaining what the two sides observe, and why w+ is a tolerable abuse.

finished(output).finally(() => sourceStream.destroy());
} else {
sourceStream = savedSnapshotsStream;
snapPath = '@7';
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this magic number should get a SNAPSHOT_SAVE_FD constant too

output.on('data', onData);

const result = batonKit.promise.then(async () => {
await messagesToXsnap.next(encoder.encode(`w${snapPath}`));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add comment: "tell xsnap to write a snapshot to disk or the pipe"

just to help readers orient themselves here

snapshotSize = Number(lengthStr);
maybePipe();
} else {
snapshotSize = -1;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does it mean if we get here? I think it means that the snapshot write failed, and xsnap is trying to tell us there was an error, or maybe xsnap died in the middle of the process.

I guess snapshotSize = -1 means that cleanup() will finish quickly, which is good, but where do we tell the stream (going into snapStore?) that the data is bad, and it should not be saved? And how does the caller (who just hands the stream to snapStore) learn that it failed? I guess maybe snapStore.saveSnapshot() is supposed to throw when it sees the stream finish with an error?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The finally section checks the read size against this size, and if different throws, which means the consumer will see an error before its stream iteration ends. And yes, if iteration throws, the snapStore.saveSnapshot() will not store anything and re-throw.

await fs.unlink(tmpSnapPath);
await done;
(piped && snapshotReadSize === snapshotSize) ||
Fail`Snapshot size does not match. saved=${q(snapshotSize)}, read=${q(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, ok, here's one place that would signal an error after snapshotSize = -1, and since this at the top of async function* makeSnapshotInternal, the generator given to snapStore will exit with an error. I think that works.

@mhofman mhofman force-pushed the mhofman/6363-stream-snapshot branch from a49c075 to c41472a Compare April 30, 2023 01:32
@mhofman mhofman added automerge:rebase Automatically rebase updates, then merge and removed force:integration Force integration tests to run on PR labels Apr 30, 2023
@mhofman mhofman force-pushed the mhofman/6363-stream-snapshot branch from c41472a to 3f77ff9 Compare April 30, 2023 01:53
@mergify mergify bot merged commit 91dc63f into master Apr 30, 2023
@mergify mergify bot deleted the mhofman/6363-stream-snapshot branch April 30, 2023 02:32
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
automerge:rebase Automatically rebase updates, then merge
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Stream snapshots between kernel and xsnap worker
2 participants