diff --git a/src/Transport.ts b/src/Transport.ts index 3867c2c..38931fc 100644 --- a/src/Transport.ts +++ b/src/Transport.ts @@ -36,6 +36,7 @@ import { ErrorOptions } from './errors' import { Connection, ConnectionRequestParams } from './connection' +import { isBinary } from './connection/BaseConnection' import Diagnostic from './Diagnostic' import Serializer from './Serializer' import { Readable as ReadableStream } from 'node:stream' @@ -556,18 +557,7 @@ export default class Transport { body = await unzip(body) } - const binaryTypes = [ - 'application/vnd.mapbox-vector-tile', - 'application/vnd.apache.arrow.stream', - 'application/vnd.elasticsearch+arrow+stream', - 'application/smile', - 'application/vnd.elasticsearch+smile', - 'application/cbor', - 'application/vnd.elasticsearch+cbor' - ] - const contentType = headers['content-type'] ?? '' - const isBinary = binaryTypes.map(type => contentType.includes(type)).includes(true) - if (Buffer.isBuffer(body) && !isBinary) { + if (Buffer.isBuffer(body) && !isBinary(headers['content-type'] ?? '')) { body = body.toString() } diff --git a/src/connection/BaseConnection.ts b/src/connection/BaseConnection.ts index 4ba90ee..2cd86df 100644 --- a/src/connection/BaseConnection.ts +++ b/src/connection/BaseConnection.ts @@ -248,3 +248,19 @@ export function isCaFingerprintMatch (cert1: string | null, cert2: string | null } return cert1 === cert2 } + +export function isBinary (contentType: string | string[]): boolean { + const binaryTypes = [ + 'application/vnd.mapbox-vector-tile', + 'application/vnd.apache.arrow.stream', + 'application/vnd.elasticsearch+arrow+stream', + 'application/smile', + 'application/vnd.elasticsearch+smile', + 'application/cbor', + 'application/vnd.elasticsearch+cbor' + ] + + return binaryTypes + .map(type => contentType.includes(type)) + .includes(true) +} diff --git a/src/connection/HttpConnection.ts b/src/connection/HttpConnection.ts index e0ca3c7..219d730 100644 --- a/src/connection/HttpConnection.ts +++ b/src/connection/HttpConnection.ts @@ -33,7 +33,8 @@ import BaseConnection, { ConnectionRequestResponse, ConnectionRequestResponseAsStream, getIssuerCertificate, - isCaFingerprintMatch + isCaFingerprintMatch, + isBinary } from './BaseConnection' import { kCaFingerprint } from '../symbols' import { Readable as ReadableStream, pipeline } from 'node:stream' @@ -147,7 +148,7 @@ export default class HttpConnection extends BaseConnection { const contentEncoding = (response.headers['content-encoding'] ?? '').toLowerCase() const isCompressed = contentEncoding.includes('gzip') || contentEncoding.includes('deflate') - const isVectorTile = (response.headers['content-type'] ?? '').includes('application/vnd.mapbox-vector-tile') + const bodyIsBinary = isBinary(response.headers['content-type'] ?? '') /* istanbul ignore else */ if (response.headers['content-length'] !== undefined) { @@ -167,8 +168,8 @@ export default class HttpConnection extends BaseConnection { // if the response is compressed, we must handle it // as buffer for allowing decompression later - let payload = isCompressed || isVectorTile ? new Array() : '' - const onData = isCompressed || isVectorTile ? onDataAsBuffer : onDataAsString + let payload = isCompressed || bodyIsBinary ? new Array() : '' + const onData = isCompressed || bodyIsBinary ? onDataAsBuffer : onDataAsString let currentLength = 0 function onDataAsBuffer (chunk: Buffer): void { @@ -208,13 +209,13 @@ export default class HttpConnection extends BaseConnection { } resolve({ - body: isCompressed || isVectorTile ? Buffer.concat(payload as Buffer[]) : payload as string, + body: isCompressed || bodyIsBinary ? Buffer.concat(payload as Buffer[]) : payload as string, statusCode: response.statusCode as number, headers: response.headers }) } - if (!isCompressed && !isVectorTile) { + if (!isCompressed && !bodyIsBinary) { response.setEncoding('utf8') } diff --git a/src/connection/UndiciConnection.ts b/src/connection/UndiciConnection.ts index 7f62c87..b450fe6 100644 --- a/src/connection/UndiciConnection.ts +++ b/src/connection/UndiciConnection.ts @@ -31,7 +31,8 @@ import BaseConnection, { ConnectionRequestResponse, ConnectionRequestResponseAsStream, getIssuerCertificate, - isCaFingerprintMatch + isCaFingerprintMatch, + isBinary } from './BaseConnection' import { Pool, buildConnector, Dispatcher } from 'undici' import { @@ -182,7 +183,7 @@ export default class Connection extends BaseConnection { // @ts-expect-error Assume header is not string[] for now. const contentEncoding = (response.headers['content-encoding'] ?? '').toLowerCase() const isCompressed = contentEncoding.includes('gzip') || contentEncoding.includes('deflate') // eslint-disable-line - const isVectorTile = (response.headers['content-type'] ?? '').includes('application/vnd.mapbox-vector-tile') + const bodyIsBinary = isBinary(response.headers['content-type'] ?? '') /* istanbul ignore else */ if (response.headers['content-length'] !== undefined) { @@ -198,7 +199,7 @@ export default class Connection extends BaseConnection { this.diagnostic.emit('deserialization', null, options) try { - if (isCompressed || isVectorTile) { // eslint-disable-line + if (isCompressed || bodyIsBinary) { // eslint-disable-line let currentLength = 0 const payload: Buffer[] = [] for await (const chunk of response.body) { diff --git a/test/unit/http-connection.test.ts b/test/unit/http-connection.test.ts index fb9d1b7..449a35e 100644 --- a/test/unit/http-connection.test.ts +++ b/test/unit/http-connection.test.ts @@ -1113,6 +1113,28 @@ test('Support mapbox vector tile', async t => { server.stop() }) +test('Support Apache Arrow', async t => { + t.plan(1) + + const binaryContent = '/////zABAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAIAAAB8AAAABAAAAJ7///8UAAAARAAAAEQAAAAAAAoBRAAAAAEAAAAEAAAAjP///wgAAAAQAAAABAAAAGRhdGUAAAAADAAAAGVsYXN0aWM6dHlwZQAAAAAAAAAAgv///wAAAQAEAAAAZGF0ZQAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABMAAAAVAAAAAAAAwFUAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAYAAABkb3VibGUAAAwAAABlbGFzdGljOnR5cGUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAGAAAAYW1vdW50AAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAOABUAEAAEAAwAAABgAAAAAAAAAAAABAAQAAAAAAMKABgADAAIAAQACgAAABQAAABYAAAABQAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAQAAAAAAAAAIAAAAAAAAACgAAAAAAAAAMAAAAAAAAAABAAAAAAAAADgAAAAAAAAAKAAAAAAAAAAAAAAAAgAAAAUAAAAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAHwAAAAAAAAAAAACgmZkTQAAAAGBmZiBAAAAAAAAAL0AAAADAzMwjQAAAAMDMzCtAHwAAAAAAAADV6yywkgEAANWPBquSAQAA1TPgpZIBAADV17mgkgEAANV7k5uSAQAA/////wAAAAA=' + + function handler (req: http.IncomingMessage, res: http.ServerResponse) { + res.setHeader('Content-Type', 'application/vnd.apache.arrow.stream') + res.end(Buffer.from(binaryContent, 'base64')) + } + + const [{ port }, server] = await buildServer(handler) + const connection = new HttpConnection({ + url: new URL(`http://localhost:${port}`) + }) + const res = await connection.request({ + path: '/_query', + method: 'POST', + }, options) + t.equal(res.body.toString('base64'), Buffer.from(binaryContent, 'base64').toString('base64')) + server.stop() +}) + test('Check server fingerprint (success)', async t => { t.plan(1) diff --git a/test/unit/undici-connection.test.ts b/test/unit/undici-connection.test.ts index 78693ef..b03ea24 100644 --- a/test/unit/undici-connection.test.ts +++ b/test/unit/undici-connection.test.ts @@ -983,6 +983,28 @@ test('Support mapbox vector tile', async t => { server.stop() }) +test('Support Apache Arrow', async t => { + t.plan(1) + + const binaryContent = '/////zABAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAIAAAB8AAAABAAAAJ7///8UAAAARAAAAEQAAAAAAAoBRAAAAAEAAAAEAAAAjP///wgAAAAQAAAABAAAAGRhdGUAAAAADAAAAGVsYXN0aWM6dHlwZQAAAAAAAAAAgv///wAAAQAEAAAAZGF0ZQAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABMAAAAVAAAAAAAAwFUAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAYAAABkb3VibGUAAAwAAABlbGFzdGljOnR5cGUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAGAAAAYW1vdW50AAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAOABUAEAAEAAwAAABgAAAAAAAAAAAABAAQAAAAAAMKABgADAAIAAQACgAAABQAAABYAAAABQAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAQAAAAAAAAAIAAAAAAAAACgAAAAAAAAAMAAAAAAAAAABAAAAAAAAADgAAAAAAAAAKAAAAAAAAAAAAAAAAgAAAAUAAAAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAHwAAAAAAAAAAAACgmZkTQAAAAGBmZiBAAAAAAAAAL0AAAADAzMwjQAAAAMDMzCtAHwAAAAAAAADV6yywkgEAANWPBquSAQAA1TPgpZIBAADV17mgkgEAANV7k5uSAQAA/////wAAAAA=' + + function handler (_req: http.IncomingMessage, res: http.ServerResponse) { + res.setHeader('Content-Type', 'application/vnd.apache.arrow.stream') + res.end(Buffer.from(binaryContent, 'base64')) + } + + const [{ port }, server] = await buildServer(handler) + const connection = new UndiciConnection({ + url: new URL(`http://localhost:${port}`) + }) + const res = await connection.request({ + path: '/_query', + method: 'POST', + }, options) + t.equal(res.body.toString('base64'), Buffer.from(binaryContent, 'base64').toString('base64')) + server.stop() +}) + test('Check server fingerprint (success)', async t => { t.plan(1)