Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add asStream support #34

Merged
merged 3 commits into from
Dec 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ export type {
ConnectionOptions,
ConnectionRequestParams,
ConnectionRequestOptions,
ConnectionRequestResponse
ConnectionRequestOptionsAsStream,
ConnectionRequestResponse,
ConnectionRequestResponseAsStream
} from './lib/connection'

export type {
Expand All @@ -47,6 +49,7 @@ export type {
} from './lib/pool'

export type {
TransportOptions,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @mshustov @pgayvallet now we are exporting the TransportOptions as well! 🎉

TransportRequestParams,
TransportRequestOptions,
TransportRequestOptionsWithMeta,
Expand Down
19 changes: 8 additions & 11 deletions src/Transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -402,9 +402,6 @@ export default class Transport {
)
}

// TODO: fixme
// if (options.asStream === true) params.asStream = true

// handle compression
if (connectionParams.body !== '' && connectionParams.body != null) {
if (isStream(connectionParams.body)) {
Expand Down Expand Up @@ -454,7 +451,8 @@ export default class Transport {
maxResponseSize,
maxCompressedResponseSize,
signal,
timeout: toMs(options.requestTimeout != null ? options.requestTimeout : this[kRequestTimeout])
timeout: toMs(options.requestTimeout != null ? options.requestTimeout : this[kRequestTimeout]),
...(options.asStream === true ? { asStream: true } : null)
})
result.statusCode = statusCode
result.headers = headers
Expand All @@ -463,6 +461,12 @@ export default class Transport {
throw new ProductNotSupportedError(this[kProductCheck] as string, result)
}

if (options.asStream === true) {
result.body = body
this[kDiagnostic].emit('response', null, result)
return returnMeta ? result : body
}

const contentEncoding = (headers['content-encoding'] ?? '').toLowerCase()
if (contentEncoding.includes('gzip') || contentEncoding.includes('deflate')) {
body = await unzip(body)
Expand All @@ -473,13 +477,6 @@ export default class Transport {
body = body.toString()
}

// TODO: fixme
// if (options.asStream === true) {
// result.body = response
// this[kDiagnostic].emit('response', null, result)
// return result
// }

const isHead = params.method === 'HEAD'
// we should attempt the payload deserialization only if:
// - a `content-type` is defined and is equal to `application/json`
Expand Down
15 changes: 13 additions & 2 deletions src/connection/BaseConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,26 @@ export interface ConnectionRequestOptions {
context: any
maxResponseSize?: number
maxCompressedResponseSize?: number
asStream?: boolean
signal?: AbortSignal
timeout?: number
}

export interface ConnectionRequestOptionsAsStream extends ConnectionRequestOptions {
asStream: true
}

export interface ConnectionRequestResponse {
body: string | Buffer
headers: http.IncomingHttpHeaders
statusCode: number
}

export interface ConnectionRequestResponseAsStream {
body: ReadableStream
headers: http.IncomingHttpHeaders
statusCode: number
}

export default class BaseConnection {
url: URL
tls: TlsConnectionOptions | null
Expand Down Expand Up @@ -127,7 +136,9 @@ export default class BaseConnection {
}

/* istanbul ignore next */
async request (params: ConnectionRequestParams, options: ConnectionRequestOptions): Promise<ConnectionRequestResponse> {
async request (params: ConnectionRequestParams, options: ConnectionRequestOptions): Promise<ConnectionRequestResponse>
async request (params: ConnectionRequestParams, options: ConnectionRequestOptionsAsStream): Promise<ConnectionRequestResponseAsStream>
async request (params: ConnectionRequestParams, options: any): Promise<any> {
throw new ConfigurationError('The request method should be implemented by extended classes')
}

Expand Down
17 changes: 15 additions & 2 deletions src/connection/HttpConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
* under the License.
*/

/* eslint-disable @typescript-eslint/restrict-template-expressions */

import hpagent from 'hpagent'
import http from 'http'
import https from 'https'
Expand All @@ -27,7 +29,9 @@ import BaseConnection, {
ConnectionOptions,
ConnectionRequestParams,
ConnectionRequestOptions,
ConnectionRequestOptionsAsStream,
ConnectionRequestResponse,
ConnectionRequestResponseAsStream,
getIssuerCertificate
} from './BaseConnection'
import { kCaFingerprint } from '../symbols'
Expand Down Expand Up @@ -87,7 +91,9 @@ export default class HttpConnection extends BaseConnection {
: https.request
}

async request (params: ConnectionRequestParams, options: ConnectionRequestOptions): Promise<ConnectionRequestResponse> {
async request (params: ConnectionRequestParams, options: ConnectionRequestOptions): Promise<ConnectionRequestResponse>
async request (params: ConnectionRequestParams, options: ConnectionRequestOptionsAsStream): Promise<ConnectionRequestResponseAsStream>
async request (params: ConnectionRequestParams, options: any): Promise<any> {
return await new Promise((resolve, reject) => {
let cleanedListeners = false

Expand All @@ -109,7 +115,6 @@ export default class HttpConnection extends BaseConnection {

this._openRequests++
if (options.signal != null) {
// @ts-expect-error
options.signal.addEventListener(
'abort',
() => request.abort(),
Expand All @@ -121,6 +126,14 @@ export default class HttpConnection extends BaseConnection {
cleanListeners()
this._openRequests--

if (options.asStream === true) {
return resolve({
body: response,
statusCode: response.statusCode as number,
headers: response.headers
})
}

const contentEncoding = (response.headers['content-encoding'] ?? '').toLowerCase()
const isCompressed = contentEncoding.includes('gzip') || contentEncoding.includes('deflate')
const isVectorTile = (response.headers['content-type'] ?? '').includes('application/vnd.mapbox-vector-tile')
Expand Down
17 changes: 15 additions & 2 deletions src/connection/UndiciConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
* under the License.
*/

/* eslint-disable @typescript-eslint/restrict-template-expressions */

import { EventEmitter } from 'events'
import Debug from 'debug'
import buffer from 'buffer'
Expand All @@ -26,7 +28,9 @@ import BaseConnection, {
ConnectionOptions,
ConnectionRequestParams,
ConnectionRequestOptions,
ConnectionRequestOptionsAsStream,
ConnectionRequestResponse,
ConnectionRequestResponseAsStream,
getIssuerCertificate
} from './BaseConnection'
import { Pool, buildConnector, Dispatcher } from 'undici'
Expand Down Expand Up @@ -109,7 +113,9 @@ export default class Connection extends BaseConnection {
this.pool = new Pool(this.url.toString(), undiciOptions)
}

async request (params: ConnectionRequestParams, options: ConnectionRequestOptions): Promise<ConnectionRequestResponse> {
async request (params: ConnectionRequestParams, options: ConnectionRequestOptions): Promise<ConnectionRequestResponse>
async request (params: ConnectionRequestParams, options: ConnectionRequestOptionsAsStream): Promise<ConnectionRequestResponseAsStream>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't it mean that we need to re-generate all the API type definitions?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, the connection.request is only used by the transport, and its signature hasn't changed.
In the client you might need to use directly client.transport.request<Readable>, as the surface API assumes you are using JSON.

async request (params: ConnectionRequestParams, options: any): Promise<any> {
const maxResponseSize = options.maxResponseSize ?? MAX_STRING_LENGTH
const maxCompressedResponseSize = options.maxCompressedResponseSize ?? MAX_BUFFER_LENGTH
const requestParams = {
Expand All @@ -135,7 +141,6 @@ export default class Connection extends BaseConnection {
timeoutId = setTimeout(() => {
timedout = true
if (options.signal != null) {
// @ts-expect-error
options.signal.dispatchEvent('abort')
} else {
this[kEmitter].emit('abort')
Expand Down Expand Up @@ -168,6 +173,14 @@ export default class Connection extends BaseConnection {
}
}

if (options.asStream === true) {
return {
statusCode: response.statusCode,
headers: response.headers,
body: response.body
}
}

const contentEncoding = (response.headers['content-encoding'] ?? '').toLowerCase()
const isCompressed = contentEncoding.includes('gzip') || contentEncoding.includes('deflate') // eslint-disable-line
const isVectorTile = (response.headers['content-type'] ?? '').includes('application/vnd.mapbox-vector-tile')
Expand Down
4 changes: 3 additions & 1 deletion src/connection/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ export type {
ConnectionOptions,
ConnectionRequestParams,
ConnectionRequestOptions,
ConnectionRequestResponse
ConnectionRequestOptionsAsStream,
ConnectionRequestResponse,
ConnectionRequestResponseAsStream
} from './BaseConnection'

export {
Expand Down
62 changes: 31 additions & 31 deletions test/unit/http-connection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -467,37 +467,6 @@ test('Custom headers for connection', async t => {
server.stop()
})

// // TODO: add a check that the response is not decompressed
// test('asStream set to true', t => {
// t.plan(2)

// function handler (req, res) {
// res.end('ok')
// }

// buildServer(handler, ({ port }, server) => {
// const connection = new Connection({
// url: new URL(`http://localhost:${port}`)
// })
// connection.request({
// path: '/hello',
// method: 'GET',
// asStream: true
// }, (err, res) => {
// t.error(err)

// let payload = ''
// res.setEncoding('utf8')
// res.on('data', chunk => { payload += chunk })
// res.on('error', err => t.fail(err))
// res.on('end', () => {
// t.equal(payload, 'ok')
// server.stop()
// })
// })
// })
// })

test('Ipv6 support', t => {
const connection = new HttpConnection({
url: new URL('http://[::1]:9200')
Expand Down Expand Up @@ -1172,3 +1141,34 @@ test('Should decrease the request count if a request never sent', async t => {
}
t.equal(connection._openRequests, 0)
})

test('as stream', async t => {
t.plan(2)

function handler (req: http.IncomingMessage, res: http.ServerResponse) {
res.end('ok')
}

const [{ port }, server] = await buildServer(handler)
const connection = new HttpConnection({
url: new URL(`http://localhost:${port}`)
})
const res = await connection.request({
path: '/',
method: 'GET'
}, {
asStream: true,
requestId: 42,
name: 'test',
context: null
})
t.ok(res.body instanceof Readable)
res.body.setEncoding('utf8')
let payload = ''
for await (const chunk of res.body) {
payload += chunk
}
t.equal(payload, 'ok')
server.stop()
})

Loading