From 7e61ae1b5fb000e70d691cacb76a33a5e40958c2 Mon Sep 17 00:00:00 2001 From: Eric Jizba Date: Tue, 19 Mar 2024 11:02:47 -0700 Subject: [PATCH] Support params when streaming (#238) * Refactor init stream request out of constructor * Support params when streaming And fix headers to match original user's request --- .eslintrc.json | 2 +- src/InvocationModel.ts | 4 +- src/http/HttpRequest.ts | 111 ++++++++++++++++++++++------------------ 3 files changed, 64 insertions(+), 53 deletions(-) diff --git a/.eslintrc.json b/.eslintrc.json index 7ae6a5c..d40f38b 100644 --- a/.eslintrc.json +++ b/.eslintrc.json @@ -22,7 +22,7 @@ "@typescript-eslint/no-empty-interface": "off", "@typescript-eslint/no-explicit-any": "off", "@typescript-eslint/no-namespace": "off", - "@typescript-eslint/no-unused-vars": ["error", { "argsIgnorePattern": "^_" }], + "@typescript-eslint/no-unused-vars": ["error", { "argsIgnorePattern": "^_", "ignoreRestSiblings": true }], "prefer-const": ["error", { "destructuring": "all" }], "@typescript-eslint/explicit-member-accessibility": [ "error", diff --git a/src/InvocationModel.ts b/src/InvocationModel.ts index 6072359..bfb9227 100644 --- a/src/InvocationModel.ts +++ b/src/InvocationModel.ts @@ -22,7 +22,7 @@ import { toRpcHttp } from './converters/toRpcHttp'; import { toRpcTypedData } from './converters/toRpcTypedData'; import { AzFuncSystemError } from './errors'; import { waitForProxyRequest } from './http/httpProxy'; -import { HttpRequest } from './http/HttpRequest'; +import { createStreamRequest } from './http/HttpRequest'; import { InvocationContext } from './InvocationContext'; import { isHttpStreamEnabled } from './setup'; import { isHttpTrigger, isTimerTrigger, isTrigger } from './utils/isTrigger'; @@ -78,7 +78,7 @@ export class InvocationModel implements coreTypes.InvocationModel { let input: unknown; if (isHttpTrigger(bindingType) && isHttpStreamEnabled()) { const proxyRequest = await waitForProxyRequest(this.#coreCtx.invocationId); - input = new HttpRequest({ ...binding.data?.http, proxyRequest }); + input = createStreamRequest(proxyRequest, nonNullProp(req, 'triggerMetadata')); } else { input = fromRpcTypedData(binding.data); } diff --git a/src/http/HttpRequest.ts b/src/http/HttpRequest.ts index 522e34c..bd7865d 100644 --- a/src/http/HttpRequest.ts +++ b/src/http/HttpRequest.ts @@ -3,25 +3,23 @@ import * as types from '@azure/functions'; import { HttpRequestParams, HttpRequestUser } from '@azure/functions'; -import { RpcHttpData } from '@azure/functions-core'; +import { RpcHttpData, RpcTypedData } from '@azure/functions-core'; import { Blob } from 'buffer'; import { IncomingMessage } from 'http'; import * as stream from 'stream'; import { ReadableStream } from 'stream/web'; -import { FormData, Headers, Request as uRequest } from 'undici'; +import { FormData, Headers, HeadersInit, Request as uRequest } from 'undici'; import { URLSearchParams } from 'url'; import { fromNullableMapping } from '../converters/fromRpcNullable'; +import { fromRpcTypedData } from '../converters/fromRpcTypedData'; import { AzFuncSystemError } from '../errors'; -import { nonNullProp } from '../utils/nonNull'; +import { isDefined, nonNullProp } from '../utils/nonNull'; import { extractHttpUserFromHeaders } from './extractHttpUserFromHeaders'; interface InternalHttpRequestInit extends RpcHttpData { undiciRequest?: uRequest; - proxyRequest?: IncomingMessage; } -type RequestInitResult = [uRequest, URLSearchParams, HttpRequestParams]; - export class HttpRequest implements types.HttpRequest { readonly query: URLSearchParams; readonly params: HttpRequestParams; @@ -33,14 +31,6 @@ export class HttpRequest implements types.HttpRequest { constructor(init: InternalHttpRequestInit) { this.#init = init; - if (init.proxyRequest) { - [this.#uReq, this.query, this.params] = this.#initStreamRequest(init); - } else { - [this.#uReq, this.query, this.params] = this.#initInMemoryRequest(init); - } - } - - #initInMemoryRequest(init: InternalHttpRequestInit): RequestInitResult { let uReq = init.undiciRequest; if (!uReq) { const url = nonNullProp(init, 'url'); @@ -58,45 +48,15 @@ export class HttpRequest implements types.HttpRequest { headers: fromNullableMapping(init.nullableHeaders, init.headers), }); } + this.#uReq = uReq; - const query = new URLSearchParams(fromNullableMapping(init.nullableQuery, init.query)); - const params = fromNullableMapping(init.nullableParams, init.params); - - return [uReq, query, params]; - } - - #initStreamRequest(init: InternalHttpRequestInit): RequestInitResult { - const proxyReq = nonNullProp(init, 'proxyRequest'); - - const hostHeaderName = 'x-forwarded-host'; - const protoHeaderName = 'x-forwarded-proto'; - const host = proxyReq.headers[hostHeaderName]; - const proto = proxyReq.headers[protoHeaderName]; - if (typeof host !== 'string' || typeof proto !== 'string') { - throw new AzFuncSystemError(`Expected headers "${hostHeaderName}" and "${protoHeaderName}" to be set.`); - } - const url = `${proto}://${host}${nonNullProp(proxyReq, 'url')}`; - - let uReq = init.undiciRequest; - if (!uReq) { - let body: stream.Readable | undefined; - const lowerMethod = proxyReq.method?.toLowerCase(); - if (lowerMethod !== 'get' && lowerMethod !== 'head') { - body = proxyReq; - } - - uReq = new uRequest(url, { - body: body, - duplex: 'half', - method: nonNullProp(proxyReq, 'method'), - headers: >>proxyReq.headers, - }); + if (init.nullableQuery || init.query) { + this.query = new URLSearchParams(fromNullableMapping(init.nullableQuery, init.query)); + } else { + this.query = new URL(this.#uReq.url).searchParams; } - const query = new URL(url).searchParams; - const params = fromNullableMapping(init.nullableParams, init.params); - - return [uReq, query, params]; + this.params = fromNullableMapping(init.nullableParams, init.params); } get url(): string { @@ -153,3 +113,54 @@ export class HttpRequest implements types.HttpRequest { return new HttpRequest(newInit); } } + +export function createStreamRequest( + proxyReq: IncomingMessage, + triggerMetadata: Record +): HttpRequest { + const hostHeaderName = 'x-forwarded-host'; + const protoHeaderName = 'x-forwarded-proto'; + const host = proxyReq.headers[hostHeaderName]; + const proto = proxyReq.headers[protoHeaderName]; + if (typeof host !== 'string' || typeof proto !== 'string') { + throw new AzFuncSystemError(`Expected headers "${hostHeaderName}" and "${protoHeaderName}" to be set.`); + } + const url = `${proto}://${host}${nonNullProp(proxyReq, 'url')}`; + + let body: stream.Readable | undefined; + const lowerMethod = proxyReq.method?.toLowerCase(); + if (lowerMethod !== 'get' && lowerMethod !== 'head') { + body = proxyReq; + } + + // Get headers and params from trigger metadata + // See here for more info: https://github.com/Azure/azure-functions-host/issues/9840 + // NOTE: We ignore query info because it has this bug: https://github.com/Azure/azure-functions-nodejs-library/issues/168 + const { Query: rpcQueryIgnored, Headers: rpcHeaders, ...rpcParams } = triggerMetadata; + + let headers: HeadersInit | undefined; + const headersData = fromRpcTypedData(rpcHeaders); + if (typeof headersData === 'object' && isDefined(headersData)) { + headers = headersData; + } + + const uReq = new uRequest(url, { + body, + duplex: 'half', + method: nonNullProp(proxyReq, 'method'), + headers, + }); + + const params: Record = {}; + for (const [key, rpcValue] of Object.entries(rpcParams)) { + const value = fromRpcTypedData(rpcValue); + if (typeof value === 'string') { + params[key] = value; + } + } + + return new HttpRequest({ + undiciRequest: uReq, + params, + }); +}