diff --git a/packages/batch-processing/package.json b/packages/batch-processing/package.json index 5657cf3a724279..ae74d01774727f 100644 --- a/packages/batch-processing/package.json +++ b/packages/batch-processing/package.json @@ -27,8 +27,9 @@ "!((src|build|build-module)/(components|utils)/**)" ], "dependencies": { - "@wordpress/data": "file:../data", - "uuid": "^8.3.1" + "@wordpress/data": "file:../data", + "lodash": "^4.17.20", + "uuid": "^8.3.1" }, "publishConfig": { "access": "public" diff --git a/packages/batch-processing/src/store/actions.js b/packages/batch-processing/src/store/actions.js index 7b2ea24e3b5583..a21a7433e44f00 100644 --- a/packages/batch-processing/src/store/actions.js +++ b/packages/batch-processing/src/store/actions.js @@ -8,13 +8,29 @@ import { v4 as uuid } from 'uuid'; */ import { select, + dispatch, enqueueItemAndAutocommit as enqueueAutocommitControl, processTransaction, } from './controls'; import { STATE_ERROR, STATE_SUCCESS } from './constants'; export const enqueueItemAndAutocommit = function* ( queue, context, item ) { - return yield enqueueAutocommitControl( queue, context, item, true ); + return yield enqueueAutocommitControl( queue, context, item ); +}; + +export const enqueueItemAndWaitForResults = function* ( queue, context, item ) { + const { itemId } = yield dispatch( 'enqueueItem', queue, context, item ); + const { promise } = yield* getOrSetupPromise( queue, context ); + + return { + wait: promise.then( ( batch ) => { + if ( batch.state === STATE_ERROR ) { + throw batch.errors[ itemId ]; + } + + return batch.results[ itemId ]; + } ), + }; }; export const enqueueItem = function ( queue, context, item ) { @@ -28,6 +44,33 @@ export const enqueueItem = function ( queue, context, item ) { }; }; +const setupPromise = function ( queue, context ) { + const action = { + type: 'SETUP_PROMISE', + queue, + context, + }; + + action.promise = new Promise( ( resolve, reject ) => { + action.resolve = resolve; + action.reject = reject; + } ); + + return action; +}; + +const getOrSetupPromise = function* ( queue, context ) { + const promise = yield select( 'getPromise', queue, context ); + + if ( promise ) { + return promise; + } + + yield setupPromise( queue, context ); + + return yield select( 'getPromise', queue, context ); +}; + export const processBatch = function* ( queue, context, meta = {} ) { const batchId = uuid(); yield prepareBatchForProcessing( queue, context, batchId, meta ); @@ -51,13 +94,21 @@ export const processBatch = function* ( queue, context, meta = {} ) { } } + const promise = yield select( 'getPromise', queue, context ); yield { + queue, + context, batchId, type: 'BATCH_FINISH', state: failed ? STATE_ERROR : STATE_SUCCESS, }; + const batch = yield select( 'getBatch', batchId ); + + if ( promise ) { + promise.resolve( batch ); + } - return yield select( 'getBatch', batchId ); + return batch; }; export function* commitTransaction( batchId, transactionId ) { diff --git a/packages/batch-processing/src/store/reducer.js b/packages/batch-processing/src/store/reducer.js index 517c114a939e23..1426605d58832a 100644 --- a/packages/batch-processing/src/store/reducer.js +++ b/packages/batch-processing/src/store/reducer.js @@ -1,3 +1,8 @@ +/** + * External dependencies + */ +import { omit } from 'lodash'; + /** * Internal dependencies */ @@ -14,6 +19,7 @@ const defaultState = { enqueuedItems: {}, batches: {}, processors: {}, + promises: {}, }; export default function reducer( state = defaultState, action ) { @@ -44,7 +50,7 @@ export default function reducer( state = defaultState, action ) { } const stateQueue = state.enqueuedItems[ queue ] || {}; - const enqueuedItems = [ ...stateQueue[ context ] ]; + const enqueuedItems = [ ...( stateQueue[ context ] || [] ) ]; const transactions = {}; let transactionNb = 0; while ( enqueuedItems.length ) { @@ -83,6 +89,23 @@ export default function reducer( state = defaultState, action ) { }; } + case 'SETUP_PROMISE': { + return { + ...state, + promises: { + ...state.promises, + [ action.queue ]: { + ...( state.promises[ action.queue ] || {} ), + [ action.context ]: { + promise: action.promise, + resolve: action.resolve, + reject: action.reject, + }, + }, + }, + }; + } + case 'BATCH_START': { const { batchId } = action; return { @@ -111,6 +134,13 @@ export default function reducer( state = defaultState, action ) { : STATE_ERROR, }, }, + promises: { + ...state.promises, + [ action.queue ]: omit( + state.promises[ action.queue ] || {}, + [ action.context ] + ), + }, }; } diff --git a/packages/batch-processing/src/store/selectors.js b/packages/batch-processing/src/store/selectors.js index daac186060174b..2ed903e95bd79f 100644 --- a/packages/batch-processing/src/store/selectors.js +++ b/packages/batch-processing/src/store/selectors.js @@ -5,3 +5,7 @@ export const getBatch = ( state, batchId ) => { export const getProcessor = ( state, queue ) => { return state.processors[ queue ]; }; + +export const getPromise = ( state, queue, context ) => { + return state.promises[ queue ]?.[ context ]; +}; diff --git a/packages/batch-processing/src/test/test.js b/packages/batch-processing/src/test/test.js new file mode 100644 index 00000000000000..4d2e1dce23af5d --- /dev/null +++ b/packages/batch-processing/src/test/test.js @@ -0,0 +1,83 @@ +/** + * Internal dependencies + */ +import store from '../store'; +import { + registerProcessor, + enqueueItemAndWaitForResults, + processBatch, +} from '../store/actions'; +import { STATE_ERROR, STATE_SUCCESS } from '../'; + +const TEST_QUEUE = 'TEST_QUEUE'; +const TEST_CONTEXT = 'default'; + +async function processor( requests, transaction ) { + if ( transaction.state === STATE_ERROR ) { + throw { + code: 'transaction_failed', + data: { status: 500 }, + message: 'Transaction failed.', + }; + } + + return await Promise.resolve( + requests.map( ( request ) => ( { + done: true, + name: request.name, + } ) ) + ); +} + +async function testItem( name ) { + const item = { name }; + const { wait } = await store.dispatch( + enqueueItemAndWaitForResults( TEST_QUEUE, TEST_CONTEXT, item ) + ); + + const expected = { done: true, name }; + + // We can't await this until the batch is processed. + // eslint-disable-next-line jest/valid-expect + const promise = expect( wait ).resolves.toEqual( expected ); + + return { expected, promise }; +} + +describe( 'waitForResults', function () { + store.dispatch( registerProcessor( TEST_QUEUE, processor ) ); + + it( 'works', async () => { + expect.assertions( 4 ); + + const { expected: i1, promise: p1 } = await testItem( 'i1' ); + const { expected: i2, promise: p2 } = await testItem( 'i2' ); + + const resolves = [ p1, p2 ]; + const batch = await store.dispatch( + processBatch( TEST_QUEUE, TEST_CONTEXT ) + ); + + expect( batch.state ).toEqual( STATE_SUCCESS ); + expect( Object.values( batch.results ) ).toEqual( [ i1, i2 ] ); + + await Promise.all( resolves ); + } ); + + it( 'can use the same context more than once', async () => { + expect.assertions( 4 ); + + const { promise: p1 } = await testItem( 'i1' ); + await store.dispatch( processBatch( TEST_QUEUE, TEST_CONTEXT ) ); + await p1; + + const { expected: i2, promise: p2 } = await testItem( 'i2' ); + const batch = await store.dispatch( + processBatch( TEST_QUEUE, TEST_CONTEXT ) + ); + + expect( batch.state ).toEqual( STATE_SUCCESS ); + expect( Object.values( batch.results ) ).toEqual( [ i2 ] ); + await p2; + } ); +} );