Skip to content

Commit

Permalink
Support params when streaming (#238)
Browse files Browse the repository at this point in the history
* Refactor init stream request out of constructor

* Support params when streaming

And fix headers to match original user's request
  • Loading branch information
ejizba authored Mar 19, 2024
1 parent 5561010 commit 7e61ae1
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 53 deletions.
2 changes: 1 addition & 1 deletion .eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
"@typescript-eslint/no-empty-interface": "off",
"@typescript-eslint/no-explicit-any": "off",
"@typescript-eslint/no-namespace": "off",
"@typescript-eslint/no-unused-vars": ["error", { "argsIgnorePattern": "^_" }],
"@typescript-eslint/no-unused-vars": ["error", { "argsIgnorePattern": "^_", "ignoreRestSiblings": true }],
"prefer-const": ["error", { "destructuring": "all" }],
"@typescript-eslint/explicit-member-accessibility": [
"error",
Expand Down
4 changes: 2 additions & 2 deletions src/InvocationModel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import { toRpcHttp } from './converters/toRpcHttp';
import { toRpcTypedData } from './converters/toRpcTypedData';
import { AzFuncSystemError } from './errors';
import { waitForProxyRequest } from './http/httpProxy';
import { HttpRequest } from './http/HttpRequest';
import { createStreamRequest } from './http/HttpRequest';
import { InvocationContext } from './InvocationContext';
import { isHttpStreamEnabled } from './setup';
import { isHttpTrigger, isTimerTrigger, isTrigger } from './utils/isTrigger';
Expand Down Expand Up @@ -78,7 +78,7 @@ export class InvocationModel implements coreTypes.InvocationModel {
let input: unknown;
if (isHttpTrigger(bindingType) && isHttpStreamEnabled()) {
const proxyRequest = await waitForProxyRequest(this.#coreCtx.invocationId);
input = new HttpRequest({ ...binding.data?.http, proxyRequest });
input = createStreamRequest(proxyRequest, nonNullProp(req, 'triggerMetadata'));
} else {
input = fromRpcTypedData(binding.data);
}
Expand Down
111 changes: 61 additions & 50 deletions src/http/HttpRequest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,23 @@

import * as types from '@azure/functions';
import { HttpRequestParams, HttpRequestUser } from '@azure/functions';
import { RpcHttpData } from '@azure/functions-core';
import { RpcHttpData, RpcTypedData } from '@azure/functions-core';
import { Blob } from 'buffer';
import { IncomingMessage } from 'http';
import * as stream from 'stream';
import { ReadableStream } from 'stream/web';
import { FormData, Headers, Request as uRequest } from 'undici';
import { FormData, Headers, HeadersInit, Request as uRequest } from 'undici';
import { URLSearchParams } from 'url';
import { fromNullableMapping } from '../converters/fromRpcNullable';
import { fromRpcTypedData } from '../converters/fromRpcTypedData';
import { AzFuncSystemError } from '../errors';
import { nonNullProp } from '../utils/nonNull';
import { isDefined, nonNullProp } from '../utils/nonNull';
import { extractHttpUserFromHeaders } from './extractHttpUserFromHeaders';

interface InternalHttpRequestInit extends RpcHttpData {
undiciRequest?: uRequest;
proxyRequest?: IncomingMessage;
}

type RequestInitResult = [uRequest, URLSearchParams, HttpRequestParams];

export class HttpRequest implements types.HttpRequest {
readonly query: URLSearchParams;
readonly params: HttpRequestParams;
Expand All @@ -33,14 +31,6 @@ export class HttpRequest implements types.HttpRequest {
constructor(init: InternalHttpRequestInit) {
this.#init = init;

if (init.proxyRequest) {
[this.#uReq, this.query, this.params] = this.#initStreamRequest(init);
} else {
[this.#uReq, this.query, this.params] = this.#initInMemoryRequest(init);
}
}

#initInMemoryRequest(init: InternalHttpRequestInit): RequestInitResult {
let uReq = init.undiciRequest;
if (!uReq) {
const url = nonNullProp(init, 'url');
Expand All @@ -58,45 +48,15 @@ export class HttpRequest implements types.HttpRequest {
headers: fromNullableMapping(init.nullableHeaders, init.headers),
});
}
this.#uReq = uReq;

const query = new URLSearchParams(fromNullableMapping(init.nullableQuery, init.query));
const params = fromNullableMapping(init.nullableParams, init.params);

return [uReq, query, params];
}

#initStreamRequest(init: InternalHttpRequestInit): RequestInitResult {
const proxyReq = nonNullProp(init, 'proxyRequest');

const hostHeaderName = 'x-forwarded-host';
const protoHeaderName = 'x-forwarded-proto';
const host = proxyReq.headers[hostHeaderName];
const proto = proxyReq.headers[protoHeaderName];
if (typeof host !== 'string' || typeof proto !== 'string') {
throw new AzFuncSystemError(`Expected headers "${hostHeaderName}" and "${protoHeaderName}" to be set.`);
}
const url = `${proto}://${host}${nonNullProp(proxyReq, 'url')}`;

let uReq = init.undiciRequest;
if (!uReq) {
let body: stream.Readable | undefined;
const lowerMethod = proxyReq.method?.toLowerCase();
if (lowerMethod !== 'get' && lowerMethod !== 'head') {
body = proxyReq;
}

uReq = new uRequest(url, {
body: body,
duplex: 'half',
method: nonNullProp(proxyReq, 'method'),
headers: <Record<string, string | ReadonlyArray<string>>>proxyReq.headers,
});
if (init.nullableQuery || init.query) {
this.query = new URLSearchParams(fromNullableMapping(init.nullableQuery, init.query));
} else {
this.query = new URL(this.#uReq.url).searchParams;
}

const query = new URL(url).searchParams;
const params = fromNullableMapping(init.nullableParams, init.params);

return [uReq, query, params];
this.params = fromNullableMapping(init.nullableParams, init.params);
}

get url(): string {
Expand Down Expand Up @@ -153,3 +113,54 @@ export class HttpRequest implements types.HttpRequest {
return new HttpRequest(newInit);
}
}

export function createStreamRequest(
proxyReq: IncomingMessage,
triggerMetadata: Record<string, RpcTypedData>
): HttpRequest {
const hostHeaderName = 'x-forwarded-host';
const protoHeaderName = 'x-forwarded-proto';
const host = proxyReq.headers[hostHeaderName];
const proto = proxyReq.headers[protoHeaderName];
if (typeof host !== 'string' || typeof proto !== 'string') {
throw new AzFuncSystemError(`Expected headers "${hostHeaderName}" and "${protoHeaderName}" to be set.`);
}
const url = `${proto}://${host}${nonNullProp(proxyReq, 'url')}`;

let body: stream.Readable | undefined;
const lowerMethod = proxyReq.method?.toLowerCase();
if (lowerMethod !== 'get' && lowerMethod !== 'head') {
body = proxyReq;
}

// Get headers and params from trigger metadata
// See here for more info: https://github.com/Azure/azure-functions-host/issues/9840
// NOTE: We ignore query info because it has this bug: https://github.com/Azure/azure-functions-nodejs-library/issues/168
const { Query: rpcQueryIgnored, Headers: rpcHeaders, ...rpcParams } = triggerMetadata;

let headers: HeadersInit | undefined;
const headersData = fromRpcTypedData(rpcHeaders);
if (typeof headersData === 'object' && isDefined(headersData)) {
headers = <HeadersInit>headersData;
}

const uReq = new uRequest(url, {
body,
duplex: 'half',
method: nonNullProp(proxyReq, 'method'),
headers,
});

const params: Record<string, string> = {};
for (const [key, rpcValue] of Object.entries(rpcParams)) {
const value = fromRpcTypedData(rpcValue);
if (typeof value === 'string') {
params[key] = value;
}
}

return new HttpRequest({
undiciRequest: uReq,
params,
});
}

0 comments on commit 7e61ae1

Please sign in to comment.