Skip to content

Commit

Permalink
add an abort controller to signal request timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
matthewrobertson committed Apr 25, 2024
1 parent 1c48074 commit 0811780
Show file tree
Hide file tree
Showing 9 changed files with 175 additions and 22 deletions.
1 change: 1 addition & 0 deletions docs/generated/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ interface Request_2 extends Request_3 {
rawBody?: Buffer;
spanId?: string;
traceId?: string;
abortController?: AbortController;
}
export { Request_2 as Request }

Expand Down
4 changes: 4 additions & 0 deletions src/functions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ export interface Request extends ExpressRequest {
* Cloud Trace span ID.
*/
spanId?: string;
/**
* An AbortController used to signal cancellation of a function invocation (e.g. in case of time out).
*/
abortController?: AbortController;
}

/**
Expand Down
7 changes: 5 additions & 2 deletions src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,14 @@ export const main = async () => {
// eslint-disable-next-line no-process-exit
process.exit(1);
}

const {userFunction, signatureType} = loadedFunction;
// It is possible to overwrite the configured signature type in code so we
// reset it here based on what we loaded.
options.signatureType = signatureType;
const server = getServer(
userFunction!,
signatureType,
options.enableExecutionId
options,
);
const errorHandler = new ErrorHandler(server);
server
Expand Down
39 changes: 39 additions & 0 deletions src/middleware/timeout.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { Request, Response } from '../functions';
import { NextFunction } from 'express';

export const timeoutMiddleware = (timeoutMilliseconds: number) => {
return (req: Request, res: Response, next: NextFunction) => {
// In modern versions of Node.js that support the AbortController API we add one to
// signal function timeout.
if (timeoutMilliseconds > 0 && AbortController) {
req.abortController = new AbortController();
req.setTimeout(timeoutMilliseconds);
let executionComplete = false;
res.on('timeout', () => {
// This event is triggered when the underlying socket times out due to inactivity.
if (!executionComplete) {
executionComplete = true;
req.abortController?.abort('timeout');
}
});
req.on('close', () => {
// This event is triggered when the underlying HTTP connection is closed. This can
// happen if the data plane times out the request, the client disconnects or the
// response is complete.
if (!executionComplete) {
executionComplete = true;
req.abortController?.abort('request closed');
}
});
req.on('end', () => {
// This event is triggered when the function execution completes and we
// write an HTTP response.
executionComplete = true;
});
}
// Always call next to continue middleware processing.
next();
};
};


20 changes: 20 additions & 0 deletions src/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ export interface FrameworkOptions {
* Whether or not to enable execution id support.
*/
enableExecutionId: boolean;
/**
* The request timeout.
*/
timeoutMilliseconds: number;
}

/**
Expand Down Expand Up @@ -112,6 +116,20 @@ const SignatureOption = new ConfigurableOption(
);
}
);
const TimeoutOption = new ConfigurableOption(
'timeout',
'CLOUD_RUN_TIMEOUT_SECONDS',
0,
(x: string | number) => {
if (typeof x === 'string') {
x = parseInt(x, 10)
}
if (isNaN(x) || x < 0) {
throw new OptionsError("Timeout must be a positive integer");
}
return Math.floor(x * 1000);
}
)

export const requiredNodeJsVersionForLogExecutionID = '13.0.0';
const ExecutionIdOption = new ConfigurableOption(
Expand Down Expand Up @@ -158,13 +176,15 @@ export const parseOptions = (
FunctionTargetOption.cliOption,
SignatureOption.cliOption,
SourceLocationOption.cliOption,
TimeoutOption.cliOption,
],
});
return {
port: PortOption.parse(argv, envVars),
target: FunctionTargetOption.parse(argv, envVars),
sourceLocation: SourceLocationOption.parse(argv, envVars),
signatureType: SignatureOption.parse(argv, envVars),
timeoutMilliseconds: TimeoutOption.parse(argv, envVars),
printHelp: cliArgs[2] === '-h' || cliArgs[2] === '--help',
enableExecutionId: ExecutionIdOption.parse(argv, envVars),
};
Expand Down
29 changes: 16 additions & 13 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,29 @@ import * as express from 'express';
import * as http from 'http';
import * as onFinished from 'on-finished';
import {HandlerFunction, Request, Response} from './functions';
import {SignatureType} from './types';
import {setLatestRes} from './invoker';
import {legacyPubSubEventMiddleware} from './pubsub_middleware';
import {cloudEventToBackgroundEventMiddleware} from './middleware/cloud_event_to_background_event';
import {backgroundEventToCloudEventMiddleware} from './middleware/background_event_to_cloud_event';
import {timeoutMiddleware} from './middleware/timeout';
import {wrapUserFunction} from './function_wrappers';
import {asyncLocalStorageMiddleware} from './async_local_storage';
import {executionContextMiddleware} from './execution_context';
import {errorHandler} from './logger';
import {FrameworkOptions} from './options';

/**
* Creates and configures an Express application and returns an HTTP server
* which will run it.
* @param userFunction User's function.
* @param functionSignatureType Type of user's function signature.
* @param options the configured Function Framework options.
* @return HTTP server.
*/
export function getServer(
userFunction: HandlerFunction,
functionSignatureType: SignatureType,
enableExecutionId: boolean
options: FrameworkOptions
): http.Server {

// App to use for function executions.
const app = express();

Expand Down Expand Up @@ -89,7 +90,7 @@ export function getServer(
};

// Apply middleware
if (functionSignatureType !== 'typed') {
if (options.signatureType !== 'typed') {
// If the function is not typed then JSON parsing can be done automatically, otherwise the
// functions format must determine deserialization.
app.use(bodyParser.json(cloudEventsBodySavingOptions));
Expand Down Expand Up @@ -120,23 +121,23 @@ export function getServer(
app.use(asyncLocalStorageMiddleware);

if (
functionSignatureType === 'event' ||
functionSignatureType === 'cloudevent'
options.signatureType === 'event' ||
options.signatureType === 'cloudevent'
) {
// If a Pub/Sub subscription is configured to invoke a user's function directly, the request body
// needs to be marshalled into the structure that wrapEventFunction expects. This unblocks local
// development with the Pub/Sub emulator
app.use(legacyPubSubEventMiddleware);
}

if (functionSignatureType === 'event') {
if (options.signatureType === 'event') {
app.use(cloudEventToBackgroundEventMiddleware);
}
if (functionSignatureType === 'cloudevent') {
if (options.signatureType === 'cloudevent') {
app.use(backgroundEventToCloudEventMiddleware);
}

if (functionSignatureType === 'http') {
if (options.signatureType === 'http') {
app.use('/favicon.ico|/robots.txt', (req, res) => {
// Neither crawlers nor browsers attempting to pull the icon find the body
// contents particularly useful, so we send nothing in the response body.
Expand All @@ -151,16 +152,18 @@ export function getServer(
});
}

app.use(timeoutMiddleware(options.timeoutMilliseconds));

// Set up the routes for the user's function
const requestHandler = wrapUserFunction(userFunction, functionSignatureType);
if (functionSignatureType === 'http') {
const requestHandler = wrapUserFunction(userFunction, options.signatureType);
if (options.signatureType === 'http') {
app.all('/*', requestHandler);
} else {
app.post('/*', requestHandler);
}

// Error Handler
if (enableExecutionId) {
if (options.enableExecutionId) {
app.use(errorHandler);
}

Expand Down
17 changes: 13 additions & 4 deletions test/integration/legacy_event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import * as functions from '../../src/functions';
import * as sinon from 'sinon';
import {getServer} from '../../src/server';
import * as supertest from 'supertest';
import { SignatureType } from '../../src/types';

const TEST_CLOUD_EVENT = {
specversion: '1.0',
Expand All @@ -31,6 +32,16 @@ const TEST_CLOUD_EVENT = {
},
};

const testOptions = {
signatureType: "event" as SignatureType,
enableExecutionId: false,
timeoutMilliseconds: 0,
port: "0",
target: "",
sourceLocation: "",
printHelp: false,
};

describe('Event Function', () => {
beforeEach(() => {
// Prevent log spew from the PubSub emulator request.
Expand Down Expand Up @@ -186,8 +197,7 @@ describe('Event Function', () => {
receivedData = data;
receivedContext = context as functions.CloudFunctionsContext;
},
'event',
/*enableExecutionId=*/ false
testOptions
);
const requestHeaders = {
'Content-Type': 'application/json',
Expand All @@ -208,8 +218,7 @@ describe('Event Function', () => {
() => {
throw 'I crashed';
},
'event',
/*enableExecutionId=*/ false
testOptions,
);
await supertest(server)
.post('/')
Expand Down
46 changes: 46 additions & 0 deletions test/middleware/timeout.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import * as assert from 'assert';
import * as sinon from 'sinon';
import {NextFunction} from 'express';
import {Request, Response} from '../../src/functions';

import {timeoutMiddleware} from '../../src/middleware/timeout';

describe('timeoutMiddleware', () => {
let request: Request;
let response: Response;
let next: NextFunction;
beforeEach(() => {
request = {
setTimeout: sinon.spy(),
on: sinon.spy(),
} as unknown as Request;
response = {
on: sinon.spy(),
} as unknown as Response;
next = sinon.spy();
});

it('calls the next function', () => {
const middleware = timeoutMiddleware(1000);
middleware(request, response, next);
assert.strictEqual((next as sinon.SinonSpy).called, true);
});

it('adds an abort controller to the request', () => {
const middleware = timeoutMiddleware(1000);
middleware(request, response, next);
assert.strictEqual(!!request.abortController, true);
});

it('adds an abort controller to the request', () => {
const middleware = timeoutMiddleware(1000);
middleware(request, response, next);
assert.strictEqual(!!request.abortController, true);
});

it('sets the request timeout', () => {
const middleware = timeoutMiddleware(1000);
middleware(request, response, next);
assert.strictEqual((request.setTimeout as sinon.SinonSpy).calledWith(1000), true);
});
});
Loading

0 comments on commit 0811780

Please sign in to comment.