Skip to content

Commit

Permalink
First pass at enqueueItemAndWaitForResults.
Browse files Browse the repository at this point in the history
  • Loading branch information
TimothyBJacobs committed Oct 18, 2020
1 parent 043da74 commit e36d90e
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 5 deletions.
5 changes: 3 additions & 2 deletions packages/batch-processing/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@
"!((src|build|build-module)/(components|utils)/**)"
],
"dependencies": {
"@wordpress/data": "file:../data",
"uuid": "^8.3.1"
"@wordpress/data": "file:../data",
"lodash": "^4.17.20",
"uuid": "^8.3.1"
},
"publishConfig": {
"access": "public"
Expand Down
55 changes: 53 additions & 2 deletions packages/batch-processing/src/store/actions.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,29 @@ import { v4 as uuid } from 'uuid';
*/
import {
select,
dispatch,
enqueueItemAndAutocommit as enqueueAutocommitControl,
processTransaction,
} from './controls';
import { STATE_ERROR, STATE_SUCCESS } from './constants';

export const enqueueItemAndAutocommit = function* ( queue, context, item ) {
return yield enqueueAutocommitControl( queue, context, item, true );
return yield enqueueAutocommitControl( queue, context, item );
};

export const enqueueItemAndWaitForResults = function* ( queue, context, item ) {
const { itemId } = yield dispatch( 'enqueueItem', queue, context, item );
const { promise } = yield* getOrSetupPromise( queue, context );

return {
wait: promise.then( ( batch ) => {
if ( batch.state === STATE_ERROR ) {
throw batch.errors[ itemId ];
}

return batch.results[ itemId ];
} ),
};
};

export const enqueueItem = function ( queue, context, item ) {
Expand All @@ -28,6 +44,33 @@ export const enqueueItem = function ( queue, context, item ) {
};
};

const setupPromise = function ( queue, context ) {
const action = {
type: 'SETUP_PROMISE',
queue,
context,
};

action.promise = new Promise( ( resolve, reject ) => {
action.resolve = resolve;
action.reject = reject;
} );

return action;
};

const getOrSetupPromise = function* ( queue, context ) {
const promise = yield select( 'getPromise', queue, context );

if ( promise ) {
return promise;
}

yield setupPromise( queue, context );

return yield select( 'getPromise', queue, context );
};

export const processBatch = function* ( queue, context, meta = {} ) {
const batchId = uuid();
yield prepareBatchForProcessing( queue, context, batchId, meta );
Expand All @@ -51,13 +94,21 @@ export const processBatch = function* ( queue, context, meta = {} ) {
}
}

const promise = yield select( 'getPromise', queue, context );
yield {
queue,
context,
batchId,
type: 'BATCH_FINISH',
state: failed ? STATE_ERROR : STATE_SUCCESS,
};
const batch = yield select( 'getBatch', batchId );

if ( promise ) {
promise.resolve( batch );
}

return yield select( 'getBatch', batchId );
return batch;
};

export function* commitTransaction( batchId, transactionId ) {
Expand Down
32 changes: 31 additions & 1 deletion packages/batch-processing/src/store/reducer.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/**
* External dependencies
*/
import { omit } from 'lodash';

/**
* Internal dependencies
*/
Expand All @@ -14,6 +19,7 @@ const defaultState = {
enqueuedItems: {},
batches: {},
processors: {},
promises: {},
};

export default function reducer( state = defaultState, action ) {
Expand Down Expand Up @@ -44,7 +50,7 @@ export default function reducer( state = defaultState, action ) {
}

const stateQueue = state.enqueuedItems[ queue ] || {};
const enqueuedItems = [ ...stateQueue[ context ] ];
const enqueuedItems = [ ...( stateQueue[ context ] || [] ) ];
const transactions = {};
let transactionNb = 0;
while ( enqueuedItems.length ) {
Expand Down Expand Up @@ -83,6 +89,23 @@ export default function reducer( state = defaultState, action ) {
};
}

case 'SETUP_PROMISE': {
return {
...state,
promises: {
...state.promises,
[ action.queue ]: {
...( state.promises[ action.queue ] || {} ),
[ action.context ]: {
promise: action.promise,
resolve: action.resolve,
reject: action.reject,
},
},
},
};
}

case 'BATCH_START': {
const { batchId } = action;
return {
Expand Down Expand Up @@ -111,6 +134,13 @@ export default function reducer( state = defaultState, action ) {
: STATE_ERROR,
},
},
promises: {
...state.promises,
[ action.queue ]: omit(
state.promises[ action.queue ] || {},
[ action.context ]
),
},
};
}

Expand Down
4 changes: 4 additions & 0 deletions packages/batch-processing/src/store/selectors.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,7 @@ export const getBatch = ( state, batchId ) => {
export const getProcessor = ( state, queue ) => {
return state.processors[ queue ];
};

export const getPromise = ( state, queue, context ) => {
return state.promises[ queue ]?.[ context ];
};
83 changes: 83 additions & 0 deletions packages/batch-processing/src/test/test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/**
* Internal dependencies
*/
import store from '../store';
import {
registerProcessor,
enqueueItemAndWaitForResults,
processBatch,
} from '../store/actions';
import { STATE_ERROR, STATE_SUCCESS } from '../';

const TEST_QUEUE = 'TEST_QUEUE';
const TEST_CONTEXT = 'default';

async function processor( requests, transaction ) {
if ( transaction.state === STATE_ERROR ) {
throw {
code: 'transaction_failed',
data: { status: 500 },
message: 'Transaction failed.',
};
}

return await Promise.resolve(
requests.map( ( request ) => ( {
done: true,
name: request.name,
} ) )
);
}

async function testItem( name ) {
const item = { name };
const { wait } = await store.dispatch(
enqueueItemAndWaitForResults( TEST_QUEUE, TEST_CONTEXT, item )
);

const expected = { done: true, name };

// We can't await this until the batch is processed.
// eslint-disable-next-line jest/valid-expect
const promise = expect( wait ).resolves.toEqual( expected );

return { expected, promise };
}

describe( 'waitForResults', function () {
store.dispatch( registerProcessor( TEST_QUEUE, processor ) );

it( 'works', async () => {
expect.assertions( 4 );

const { expected: i1, promise: p1 } = await testItem( 'i1' );
const { expected: i2, promise: p2 } = await testItem( 'i2' );

const resolves = [ p1, p2 ];
const batch = await store.dispatch(
processBatch( TEST_QUEUE, TEST_CONTEXT )
);

expect( batch.state ).toEqual( STATE_SUCCESS );
expect( Object.values( batch.results ) ).toEqual( [ i1, i2 ] );

await Promise.all( resolves );
} );

it( 'can use the same context more than once', async () => {
expect.assertions( 4 );

const { promise: p1 } = await testItem( 'i1' );
await store.dispatch( processBatch( TEST_QUEUE, TEST_CONTEXT ) );
await p1;

const { expected: i2, promise: p2 } = await testItem( 'i2' );
const batch = await store.dispatch(
processBatch( TEST_QUEUE, TEST_CONTEXT )
);

expect( batch.state ).toEqual( STATE_SUCCESS );
expect( Object.values( batch.results ) ).toEqual( [ i2 ] );
await p2;
} );
} );

0 comments on commit e36d90e

Please sign in to comment.