Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support async iterator for paging method #708

Merged
merged 49 commits into from
Mar 20, 2020
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
f20a8a8
async iteratoe logic & unit test
xiaozhenliu-gg5 Jan 13, 2020
7975b44
run all unit tests
xiaozhenliu-gg5 Jan 13, 2020
8dde950
add end-to-end test for async iterator
xiaozhenliu-gg5 Jan 14, 2020
d116750
add end-to-end test for page stream
xiaozhenliu-gg5 Jan 14, 2020
9acddd5
fix lint
xiaozhenliu-gg5 Jan 14, 2020
f919b1a
system-test
xiaozhenliu-gg5 Jan 14, 2020
a61f007
clean up
xiaozhenliu-gg5 Jan 14, 2020
240d776
system-test
xiaozhenliu-gg5 Jan 14, 2020
5379386
fix
xiaozhenliu-gg5 Jan 14, 2020
a4e5dca
fix test
xiaozhenliu-gg5 Jan 14, 2020
027b997
fix unit tests
xiaozhenliu-gg5 Jan 14, 2020
4b28b21
fix end-to-end test
xiaozhenliu-gg5 Jan 14, 2020
9c555f6
Merge branch 'master' into page_async
xiaozhenliu-gg5 Jan 14, 2020
2bb7b8e
feedback
xiaozhenliu-gg5 Jan 14, 2020
b86f22f
Merge branch 'page_async' of https://github.com/googleapis/gax-nodejs…
xiaozhenliu-gg5 Jan 14, 2020
3f5ee64
clean
xiaozhenliu-gg5 Jan 14, 2020
a1161cd
change name to asyncIterate
xiaozhenliu-gg5 Jan 14, 2020
5aa1cde
lint
xiaozhenliu-gg5 Jan 14, 2020
f466e0c
system-test
xiaozhenliu-gg5 Jan 14, 2020
e226411
proper type for iterable
xiaozhenliu-gg5 Jan 15, 2020
d65efb6
clean up
xiaozhenliu-gg5 Jan 15, 2020
720e8a0
feedback
xiaozhenliu-gg5 Jan 16, 2020
888af8f
test
xiaozhenliu-gg5 Jan 17, 2020
71bd16e
put common code in a function
xiaozhenliu-gg5 Jan 17, 2020
ede5deb
lint
xiaozhenliu-gg5 Jan 17, 2020
e3e1e54
clean up
xiaozhenliu-gg5 Jan 17, 2020
ab464c4
debugging
alexander-fenster Jan 18, 2020
4b2f8d6
make it work
xiaozhenliu-gg5 Jan 20, 2020
6961b50
feedback
xiaozhenliu-gg5 Jan 21, 2020
8cd2620
remove extra params
xiaozhenliu-gg5 Jan 21, 2020
176aa27
fix
xiaozhenliu-gg5 Jan 21, 2020
d3a3230
make it work first
xiaozhenliu-gg5 Jan 21, 2020
d7272fd
resolve request & func
xiaozhenliu-gg5 Jan 21, 2020
6209263
Merge branch 'master' into page_async
xiaozhenliu-gg5 Jan 21, 2020
5d87f7a
clean up
xiaozhenliu-gg5 Jan 22, 2020
12feb43
Merge branch 'page_async' of https://github.com/googleapis/gax-nodejs…
xiaozhenliu-gg5 Jan 22, 2020
8250d54
polish test
xiaozhenliu-gg5 Jan 22, 2020
78861ba
clean
xiaozhenliu-gg5 Jan 23, 2020
ccdf9db
add timeout for end-to-end test
xiaozhenliu-gg5 Jan 23, 2020
a28af46
expand timeout for testExpand
xiaozhenliu-gg5 Jan 23, 2020
435b9e0
make test work
xiaozhenliu-gg5 Jan 23, 2020
6e32865
expand timeout
xiaozhenliu-gg5 Jan 23, 2020
1ed0f10
fix
xiaozhenliu-gg5 Jan 23, 2020
1110cad
test
xiaozhenliu-gg5 Jan 23, 2020
32675cc
test will work
xiaozhenliu-gg5 Jan 23, 2020
28ddcf3
feedback
xiaozhenliu-gg5 Jan 23, 2020
a506d74
lint
xiaozhenliu-gg5 Jan 23, 2020
cc3d286
Merge branch 'master' into page_async
alexander-fenster Mar 20, 2020
c6d423c
fix: no third party promises anymore
alexander-fenster Mar 20, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 107 additions & 1 deletion src/paginationCalls/pageDescriptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,22 @@ import * as ended from 'is-stream-ended';
import {PassThrough, Transform} from 'stream';

import {APICaller} from '../apiCaller';
import {GaxCall, APICallback} from '../apitypes';
import {
GaxCall,
APICallback,
SimpleCallbackFunction,
RequestType,
GaxCallPromise,
} from '../apitypes';
import {Descriptor} from '../descriptor';
import {CallSettings} from '../gax';
import {NormalApiCaller} from '../normalCalls/normalApiCaller';

import {PagedApiCaller} from './pagedApiCaller';
import * as call_1 from '../call';
xiaozhenliu-gg5 marked this conversation as resolved.
Show resolved Hide resolved

let resolveFunction: Function;
xiaozhenliu-gg5 marked this conversation as resolved.
Show resolved Hide resolved
let resolveRequest: Function;

/**
* A descriptor for methods that support pagination.
Expand Down Expand Up @@ -103,6 +113,102 @@ export class PageDescriptor implements Descriptor {
return stream;
}

// create async iterator(settings) => iterable
async(apiCall: GaxCall, request: {}, options: CallSettings): {} {
xiaozhenliu-gg5 marked this conversation as resolved.
Show resolved Hide resolved
const iterable = this.createIterator(options);
const funcPromise =
typeof apiCall === 'function' ? Promise.resolve(apiCall) : apiCall;
funcPromise
.then((func: GaxCall) => {
this.resolveParams(request, func, options);
})
.catch(error => {
throw new Error(error);
});
return iterable;
}

createIterator(options: CallSettings): {} {
const responsePageTokenFieldName = this.responsePageTokenField;
const requestPageTokenFieldName = this.requestPageTokenField;
const asyncIterable = {
[Symbol.asyncIterator]() {
const funcPromise = new Promise((resolve, reject) => {
resolveFunction = resolve;
xiaozhenliu-gg5 marked this conversation as resolved.
Show resolved Hide resolved
});
const requestPromise = new Promise((resolve, reject) => {
resolveRequest = resolve;
});
const cache: Array<{}> = [];
let nextPageRequest: RequestType = {};
let firstCall = true;
return {
async next() {
const ongoingCall = new call_1.OngoingCallPromise(options.promise);

xiaozhenliu-gg5 marked this conversation as resolved.
Show resolved Hide resolved
const func = (await funcPromise) as SimpleCallbackFunction;
const request = (await requestPromise) as RequestType;
if (firstCall) {
xiaozhenliu-gg5 marked this conversation as resolved.
Show resolved Hide resolved
ongoingCall.call(func, request);
const [
response,
nextRequest,
rawresponse,
] = await ongoingCall.promise;
//@ts-ignore
xiaozhenliu-gg5 marked this conversation as resolved.
Show resolved Hide resolved
cache.push(...response.responses.map(r => r.content));
//@ts-ignore
const pageToken = response[responsePageTokenFieldName];
if (pageToken) {
nextPageRequest = Object.assign({}, request);
nextPageRequest[requestPageTokenFieldName] = pageToken;
}
firstCall = false;
return Promise.resolve({done: false, value: cache.shift()});
} else {
if (cache.length > 0) {
const value = cache.shift();
return Promise.resolve({done: false, value});
} else if (nextPageRequest) {
ongoingCall.call(func, nextPageRequest);
const [
response,
nextRequest,
rawResponse,
] = await ongoingCall.promise;
//@ts-ignore
const pageToken = response[responsePageTokenFieldName];
if (pageToken) {
nextPageRequest[requestPageTokenFieldName] = pageToken;
}
//@ts-ignore
else nextPageRequest = null;
//@ts-ignore
cache.push(...response.responses.map(r => r.content));
const value = cache.shift();
return Promise.resolve({done: false, value});
} else {
return Promise.resolve({done: true, value: -1});
}
}
},
};
},
};
return asyncIterable; // return iterable
xiaozhenliu-gg5 marked this conversation as resolved.
Show resolved Hide resolved
}

resolveParams(request: RequestType, func: GaxCall, settings: CallSettings) {
if (settings.pageToken) {
request[this.requestPageTokenField] = settings.pageToken;
}
if (settings.pageSize) {
request[this.requestPageSizeField!] = settings.pageSize;
}
resolveRequest(request);
resolveFunction(func);
}

getApiCaller(settings: CallSettings): APICaller {
if (!settings.autoPaginate) {
return new NormalApiCaller();
Expand Down
15 changes: 15 additions & 0 deletions test/fixtures/google-gax-packaging-test-app/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ async function testShowcase() {
await testEcho(grpcClient);
await testExpand(grpcClient);
await testPagedExpand(grpcClient);
await testPagedExpandAsync(grpcClient);
xiaozhenliu-gg5 marked this conversation as resolved.
Show resolved Hide resolved
await testCollect(grpcClient);
await testChat(grpcClient);
await testWait(grpcClient);
Expand Down Expand Up @@ -136,6 +137,20 @@ async function testPagedExpand(client) {
assert.deepStrictEqual(words, result);
}

async function testPagedExpandAsync(client) {
const words = ['nobody', 'ever', 'reads', 'test', 'input'];
const request = {
content: words.join(' '),
pageSize: 2,
};
const response = [];
const iterable = client.pagedExpandAsync(request);
for await (const resource of iterable){
xiaozhenliu-gg5 marked this conversation as resolved.
Show resolved Hide resolved
response.push(resource);
}
assert.deepStrictEqual(words, response);
}

async function testCollect(client) {
const words = ['nobody', 'ever', 'reads', 'test', 'input'];
const result = await new Promise((resolve, reject) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,9 @@ class EchoClient {
'wait',
'pagedExpand',
];
this._innerCallPromise = {};
alexander-fenster marked this conversation as resolved.
Show resolved Hide resolved
for (const methodName of echoStubMethods) {
const innerCallPromise = echoStub.then(
this._innerCallPromise[methodName] = echoStub.then(
stub => (...args) => {
return stub[methodName].apply(stub, args);
},
Expand All @@ -207,7 +208,7 @@ class EchoClient {
}
);
this._innerApiCalls[methodName] = gaxModule.createApiCall(
innerCallPromise,
this._innerCallPromise[methodName],
defaults[methodName],
this._descriptors.page[methodName] ||
this._descriptors.stream[methodName] ||
Expand Down Expand Up @@ -584,6 +585,13 @@ class EchoClient {

return this._innerApiCalls.pagedExpand(request, options, callback);
}

pagedExpandAsync(request, options) {
options = options || {};
request = request || {};
const callSettings = new gax.CallSettings(options);
return this._descriptors.page.pagedExpand.async(this._innerCallPromise['pagedExpand'], request, callSettings);
}
}

module.exports = EchoClient;
39 changes: 39 additions & 0 deletions test/fixtures/google-gax-packaging-test-app/test/gapic-v1beta1.js
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,45 @@ describe('EchoClient', () => {
});
});

it('invokes pagedExpand using async iterator', async () => {
xiaozhenliu-gg5 marked this conversation as resolved.
Show resolved Hide resolved
const client = new showcaseModule.v1beta1.EchoClient({
credentials: {client_email: 'bogus', private_key: 'bogus'},
projectId: 'bogus',
});

// Mock request
const request = {};

// Mock response
xiaozhenliu-gg5 marked this conversation as resolved.
Show resolved Hide resolved
const nextPageToken = '';
const responsesElement = {};
const expectedResponse = {
nextPageToken: nextPageToken,
responses: responsesElement,
};

client._descriptors.page.pagedExpand.async = (apiCall, request, options) => {
const asyncIterable = {
[Symbol.asyncIterator]() {
return {
async next(){
return Promise.resolve({done: true, value: -1});
xiaozhenliu-gg5 marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
}
return asyncIterable;
}

// test paging method by async iterator
const response = {};
const iterable = client.pagedExpandAsync(request);
for await (const resource of iterable){
response.push(resource);
}
assert.deepStrictEqual(response, expectedResponse.responses);
xiaozhenliu-gg5 marked this conversation as resolved.
Show resolved Hide resolved
});

it('invokes pagedExpand with error', done => {
const client = new showcaseModule.v1beta1.EchoClient({
credentials: {client_email: 'bogus', private_key: 'bogus'},
Expand Down
29 changes: 29 additions & 0 deletions test/unit/pagedIteration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {APICallback, GaxCallPromise} from '../../src/apitypes';

import * as util from './utils';
import {Stream} from 'stream';
import * as gax from '../../src/gax';

describe('paged iteration', () => {
const pageSize = 3;
Expand Down Expand Up @@ -194,6 +195,34 @@ describe('paged iteration', () => {
});
});

describe('use async iterator', () => {
// tslint:disable-next-line no-any
let spy: any;
let apiCall: GaxCallPromise;
beforeEach(() => {
spy = sinon.spy(func);
xiaozhenliu-gg5 marked this conversation as resolved.
Show resolved Hide resolved
apiCall = util.createApiCall(spy, createOptions);
});

async function iterableChecker(
// tslint:disable-next-line no-any
iterable: any
xiaozhenliu-gg5 marked this conversation as resolved.
Show resolved Hide resolved
) {
let counter = 0;
for await (const resource of iterable) {
counter++;
if (counter === 10) break;
}
expect(counter).to.equal(10);
}
it('returns an iterable, count to 10', () => {
const settings = new gax.CallSettings(
(createOptions && createOptions.settings) || {}
);
iterableChecker(descriptor.async(apiCall, {}, settings));
});
});

describe('stream conversion', () => {
// tslint:disable-next-line no-any
let spy: any;
Expand Down