Skip to content

Commit

Permalink
feat: relocate snapshot metadata from kvStore to snapStore
Browse files Browse the repository at this point in the history
Closes #6742
  • Loading branch information
FUDCo committed Jan 18, 2023
1 parent f06a171 commit 0da5f4b
Show file tree
Hide file tree
Showing 17 changed files with 235 additions and 331 deletions.
10 changes: 6 additions & 4 deletions packages/SwingSet/src/controller/controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,21 +115,23 @@ export function makeStartXSnap(bundles, { snapStore, env, spawn }) {
}

/**
* @param {string} vatID
* @param {string} name
* @param {(request: Uint8Array) => Promise<Uint8Array>} handleCommand
* @param {boolean} [metered]
* @param {string} [snapshotHash]
* @param {boolean} [reload]
*/
async function startXSnap(
vatID,
name,
handleCommand,
metered,
snapshotHash = undefined,
reload = false,
) {
const meterOpts = metered ? {} : { meteringLimit: 0 };
if (snapStore && snapshotHash) {
if (snapStore && reload) {
// console.log('startXSnap from', { snapshotHash });
return snapStore.load(snapshotHash, async snapshot => {
return snapStore.loadSnapshot(vatID, async snapshot => {
const xs = doXSnap({
snapshot,
name,
Expand Down
8 changes: 2 additions & 6 deletions packages/SwingSet/src/kernel/state/kernelKeeper.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ const enableKernelGC = true;
// v$NN.reapInterval = $NN or 'never'
// v$NN.reapCountdown = $NN or 'never'
// exclude from consensus
// local.v$NN.lastSnapshot = JSON({ snapshotID, startPos })
// local.snapshot.$id = [vatID, ...]
// local.*

// m$NN.remaining = $NN // remaining capacity (in computrons) or 'unlimited'
// m$NN.threshold = $NN // notify when .remaining first drops below this
Expand Down Expand Up @@ -786,10 +785,7 @@ export default function makeKernelKeeper(kernelStorage, kernelSlog) {
const promisePrefix = `${vatID}.c.p`;
const kernelPromisesToReject = [];

const old = vatKeeper.getLastSnapshot();
if (old) {
vatKeeper.removeFromSnapshot(old.snapshotID);
}
vatKeeper.deleteSnapshots();

// Note: ASCII order is "+,-./", and we rely upon this to split the
// keyspace into the various o+NN/o-NN/etc spaces. If we were using a
Expand Down
125 changes: 26 additions & 99 deletions packages/SwingSet/src/kernel/state/vatKeeper.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,8 @@ export function initializeVatState(kvStore, streamStore, vatID) {
kvStore.set(`${vatID}.d.nextID`, `${FIRST_DEVICE_ID}`);
kvStore.set(`${vatID}.nextDeliveryNum`, `0`);
kvStore.set(`${vatID}.incarnationNumber`, `1`);
kvStore.set(
`${vatID}.t.startPosition`,
`${JSON.stringify(streamStore.STREAM_START)}`,
);
kvStore.set(
`${vatID}.t.endPosition`,
`${JSON.stringify(streamStore.STREAM_START)}`,
);
kvStore.set(`${vatID}.t.startPosition`, `${streamStore.STREAM_START}`);
kvStore.set(`${vatID}.t.endPosition`, `${streamStore.STREAM_START}`);
}

/**
Expand Down Expand Up @@ -486,9 +480,9 @@ export function makeVatKeeper(
*/
function* getTranscript(startPos) {
if (startPos === undefined) {
startPos = JSON.parse(getRequired(`${vatID}.t.startPosition`));
startPos = Number(getRequired(`${vatID}.t.startPosition`));
}
const endPos = JSON.parse(getRequired(`${vatID}.t.endPosition`));
const endPos = Number(getRequired(`${vatID}.t.endPosition`));
for (const entry of streamStore.readStream(
transcriptStream,
/** @type { StreamPosition } */ (startPos),
Expand All @@ -504,88 +498,34 @@ export function makeVatKeeper(
* @param {object} entry The transcript entry to append.
*/
function addToTranscript(entry) {
const oldPos = JSON.parse(getRequired(`${vatID}.t.endPosition`));
const oldPos = Number(getRequired(`${vatID}.t.endPosition`));
const newPos = streamStore.writeStreamItem(
transcriptStream,
JSON.stringify(entry),
oldPos,
);
kvStore.set(`${vatID}.t.endPosition`, `${JSON.stringify(newPos)}`);
kvStore.set(`${vatID}.t.endPosition`, `${newPos}`);
}

/** @returns {StreamPosition} */
function getTranscriptEndPosition() {
return JSON.parse(
const endPosition =
kvStore.get(`${vatID}.t.endPosition`) ||
assert.fail('missing endPosition'),
);
assert.fail('missing endPosition');
return Number(endPosition);
}

/**
* @returns {{ snapshotID: string, startPos: StreamPosition } | undefined}
*/
function getLastSnapshot() {
const notation = kvStore.get(`local.${vatID}.lastSnapshot`);
if (!notation) {
return undefined;
}
const { snapshotID, startPos } = JSON.parse(notation);
assert.typeof(snapshotID, 'string');
assert(startPos);
return { snapshotID, startPos };
function getSnapshotInfo() {
return snapStore?.getSnapshotInfo(vatID);
}

function transcriptSnapshotStats() {
const totalEntries = getTranscriptEndPosition();
const lastSnapshot = getLastSnapshot();
const snapshottedEntries = lastSnapshot ? lastSnapshot.startPos : 0;
const snapshotInfo = getSnapshotInfo();
const snapshottedEntries = snapshotInfo ? snapshotInfo.endPos : 0;
return { totalEntries, snapshottedEntries };
}

/**
* Add vatID to consumers of a snapshot.
*
* @param {string} snapshotID
*/
function addToSnapshot(snapshotID) {
const key = `local.snapshot.${snapshotID}`;
const consumers = JSON.parse(kvStore.get(key) || '[]');
assert(Array.isArray(consumers));

// We can't completely rule out the possibility that
// a vat will use the same snapshot twice in a row.
//
// PERFORMANCE NOTE: we assume consumer lists are short;
// usually length 1. So O(n) search here is better
// than keeping the list sorted.
if (!consumers.includes(vatID)) {
consumers.push(vatID);
kvStore.set(key, JSON.stringify(consumers));
// console.log('addToSnapshot result:', { vatID, snapshotID, consumers });
}
}

/**
* Remove vatID from consumers of a snapshot.
*
* @param {string} snapshotID
*/
function removeFromSnapshot(snapshotID) {
const key = `local.snapshot.${snapshotID}`;
const consumersJSON = kvStore.get(key);
if (!consumersJSON) {
throw Fail`cannot remove ${vatID}: ${key} key not defined`;
}
const consumers = JSON.parse(consumersJSON);
assert(Array.isArray(consumers));
const ix = consumers.indexOf(vatID);
assert(ix >= 0);
consumers.splice(ix, 1);
// console.log('removeFromSnapshot done:', { vatID, snapshotID, consumers });
kvStore.set(key, JSON.stringify(consumers));
return consumers.length;
}

/**
* Store a snapshot, if given a snapStore.
*
Expand All @@ -597,30 +537,19 @@ export function makeVatKeeper(
return false;
}

const info = await manager.makeSnapshot(snapStore);
const endPosition = getTranscriptEndPosition();
const info = await manager.makeSnapshot(endPosition, snapStore);
const {
hash: snapshotID,
hash,
rawByteCount,
rawSaveSeconds,
compressedByteCount,
compressSeconds,
} = info;
const old = getLastSnapshot();
if (old && old.snapshotID !== snapshotID) {
if (removeFromSnapshot(old.snapshotID) === 0) {
snapStore.deleteSnapshot(old.snapshotID);
}
}
const endPosition = getTranscriptEndPosition();
kvStore.set(
`local.${vatID}.lastSnapshot`,
JSON.stringify({ snapshotID, startPos: endPosition }),
);
addToSnapshot(snapshotID);
kernelSlog.write({
type: 'heap-snapshot-save',
vatID,
snapshotID,
hash,
rawByteCount,
rawSaveSeconds,
compressedByteCount,
Expand All @@ -630,17 +559,15 @@ export function makeVatKeeper(
return true;
}

function deleteSnapshots() {
if (snapStore) {
snapStore.deleteVatSnapshots(vatID);
}
}

function removeSnapshotAndTranscript() {
const skey = `local.${vatID}.lastSnapshot`;
if (snapStore) {
const notation = kvStore.get(skey);
if (notation) {
const { snapshotID } = JSON.parse(notation);
if (removeFromSnapshot(snapshotID) === 0) {
snapStore.deleteSnapshot(snapshotID);
}
kvStore.delete(skey);
}
snapStore.deleteVatSnapshots(vatID);
}

const endPos = getRequired(`${vatID}.t.endPosition`);
Expand Down Expand Up @@ -719,8 +646,8 @@ export function makeVatKeeper(
vatStats,
dumpState,
saveSnapshot,
getLastSnapshot,
removeFromSnapshot,
deleteSnapshots,
getSnapshotInfo,
removeSnapshotAndTranscript,
});
}
4 changes: 2 additions & 2 deletions packages/SwingSet/src/kernel/vat-loader/manager-helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ import { makeTranscriptManager } from './transcript.js';
/**
*
* @typedef { { getManager: (shutdown: () => Promise<void>,
* makeSnapshot?: (ss: SnapStore) => Promise<SnapshotInfo>) => VatManager,
* makeSnapshot?: (endPos: number, ss: SnapStore) => Promise<SnapshotResult>) => VatManager,
* syscallFromWorker: (vso: VatSyscallObject) => VatSyscallResult,
* setDeliverToWorker: (dtw: unknown) => void,
* } } ManagerKit
Expand Down Expand Up @@ -259,7 +259,7 @@ function makeManagerKit(
/**
*
* @param { () => Promise<void>} shutdown
* @param {(ss: SnapStore) => Promise<SnapshotInfo>} makeSnapshot
* @param {(endPos: number, ss: SnapStore) => Promise<SnapshotResult>} makeSnapshot
* @returns {VatManager}
*/
function getManager(shutdown, makeSnapshot) {
Expand Down
21 changes: 11 additions & 10 deletions packages/SwingSet/src/kernel/vat-loader/manager-subprocess-xsnap.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const decoder = new TextDecoder();
* allVatPowers: VatPowers,
* kernelKeeper: KernelKeeper,
* kernelSlog: KernelSlog,
* startXSnap: (name: string, handleCommand: AsyncHandler, metered?: boolean, snapshotHash?: string) => Promise<XSnap>,
* startXSnap: (vatID: string, name: string, handleCommand: AsyncHandler, metered?: boolean, reload?: boolean) => Promise<XSnap>,
* testLog: (...args: unknown[]) => void,
* }} tools
* @returns {VatManagerFactory}
Expand Down Expand Up @@ -112,10 +112,9 @@ export function makeXsSubprocessFactory({
}

const vatKeeper = kernelKeeper.provideVatKeeper(vatID);
const lastSnapshot = vatKeeper.getLastSnapshot();
if (lastSnapshot) {
const { snapshotID } = lastSnapshot;
kernelSlog.write({ type: 'heap-snapshot-load', vatID, snapshotID });
const snapshotInfo = vatKeeper.getSnapshotInfo();
if (snapshotInfo) {
kernelSlog.write({ type: 'heap-snapshot-load', vatID, ...snapshotInfo });
}

// `startXSnap` adds `argName` as a dummy argument so that 'ps'
Expand All @@ -125,10 +124,11 @@ export function makeXsSubprocessFactory({

// start the worker and establish a connection
const worker = await startXSnap(
vatID,
argName,
handleCommand,
metered,
lastSnapshot ? lastSnapshot.snapshotID : undefined,
!!snapshotInfo,
);

/** @type { (item: Tagged) => Promise<CrankResults> } */
Expand All @@ -144,7 +144,7 @@ export function makeXsSubprocessFactory({
return { ...result, reply: [tag, ...rest] };
}

if (lastSnapshot) {
if (snapshotInfo) {
parentLog(vatID, `snapshot loaded. dispatch ready.`);
} else {
parentLog(vatID, `instructing worker to load bundle..`);
Expand Down Expand Up @@ -209,11 +209,12 @@ export function makeXsSubprocessFactory({
return worker.close().then(_ => undefined);
}
/**
* @param {number} endPos
* @param {SnapStore} snapStore
* @returns {Promise<SnapshotInfo>}
* @returns {Promise<SnapshotResult>}
*/
function makeSnapshot(snapStore) {
return snapStore.save(fn => worker.snapshot(fn));
function makeSnapshot(endPos, snapStore) {
return snapStore.saveSnapshot(vatID, endPos, fn => worker.snapshot(fn));
}

return mk.getManager(shutdown, makeSnapshot);
Expand Down
4 changes: 2 additions & 2 deletions packages/SwingSet/src/kernel/vat-warehouse.js
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,9 @@ export function makeVatWarehouse(kernelKeeper, vatLoader, policyOptions) {
// TODO(3218): persist this option; avoid spinning up a vat that isn't pipelined
const { enablePipelining = false } = options;

const lastSnapshot = vatKeeper.getLastSnapshot();
const snapshotInfo = vatKeeper.getSnapshotInfo();
await manager.replayTranscript(
lastSnapshot ? lastSnapshot.startPos : undefined,
snapshotInfo ? snapshotInfo.endPos : undefined,
);

const result = {
Expand Down
2 changes: 1 addition & 1 deletion packages/SwingSet/src/types-ambient.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@
* @typedef { import('./types-external.js').VatManagerFactory } VatManagerFactory
* @typedef { import('./types-external.js').VatManager } VatManager
* @typedef { import('./types-external.js').SnapStore } SnapStore
* @typedef { import('./types-external.js').SnapshotInfo } SnapshotInfo
* @typedef { import('./types-external.js').SnapshotResult } SnapshotResult
* @typedef { import('./types-external.js').WaitUntilQuiescent } WaitUntilQuiescent
*/

Expand Down
4 changes: 2 additions & 2 deletions packages/SwingSet/src/types-external.js
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ export {};
* } } VatManagerFactory
* @typedef { { deliver: (delivery: VatDeliveryObject) => Promise<VatDeliveryResult>,
* replayTranscript: (startPos: StreamPosition | undefined) => Promise<number?>,
* makeSnapshot?: (ss: SnapStore) => Promise<SnapshotInfo>,
* makeSnapshot?: (endPos: number, ss: SnapStore) => Promise<SnapshotResult>,
* shutdown: () => Promise<void>,
* } } VatManager
*
Expand Down Expand Up @@ -291,7 +291,7 @@ export {};
/**
* @typedef { import('@agoric/swing-store').KVStore } KVStore
* @typedef { import('@agoric/swing-store').SnapStore } SnapStore
* @typedef { import('@agoric/swing-store').SnapshotInfo } SnapshotInfo
* @typedef { import('@agoric/swing-store').SnapshotResult } SnapshotResult
* @typedef { import('@agoric/swing-store').StreamStore } StreamStore
* @typedef { import('@agoric/swing-store').StreamPosition } StreamPosition
* @typedef { import('@agoric/swing-store').SwingStore } SwingStore
Expand Down
6 changes: 1 addition & 5 deletions packages/SwingSet/test/test-state.js
Original file line number Diff line number Diff line change
Expand Up @@ -697,11 +697,7 @@ test('crankhash - skip keys', t => {
// certain local keys are excluded from consensus, and should not affect
// the hash
k.kvStore.set('one', '1');
k.kvStore.set('local.snapshot.XYZ', '["vat1234"]');
k.kvStore.set(
'local.v1234.lastSnapshot',
'{"snapshotID":"XYZ","startPos":4}',
);
k.kvStore.set('local.doNotHashMe', 'random nonsense');
t.throws(() => k.kvStore.set('host.foo', 'bar'));
t.is(k.emitCrankHashes().crankhash, expCrankhash);
});
Expand Down
3 changes: 2 additions & 1 deletion packages/SwingSet/test/test-xsnap-errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ test('child termination distinguished from meter exhaustion', async t => {
/** @type { any } */
const kernelKeeper = {
provideVatKeeper: () => ({
getLastSnapshot: () => undefined,
getSnapshotInfo: () => undefined,
addToTranscript: () => undefined,
}),
getRelaxDurabilityRules: () => false,
Expand All @@ -60,6 +60,7 @@ test('child termination distinguished from meter exhaustion', async t => {
// @ts-expect-error close enough for this test
const managerOptions = { useTranscript: true };
const schandler = _vso => ['ok', null];

const m = await xsWorkerFactory.createFromBundle(
'v1',
bundle,
Expand Down
Loading

0 comments on commit 0da5f4b

Please sign in to comment.