perf: improve replay upload resiliency (#29174)
* tbd

* workable templated filestream uploader

* clean up async assertions

* upload stream uses activity monitor to detect start timeouts and stalls

* makes uploadStream retryable

* filesize detection, enoent errors, and testable retry delays

* extract fs logic to putProtocolArtifact, impl putProtocolArtifact

* aggregate errors for retryable upload streams

* fixes imports from moving api.ts, uses new upload mechanism in protocol.ts

* use spec_helper in StreamActivityMonitor_spec due to global sinon clock changes

* fix putProtocolArtifact specs when run as a part of the unit test suite

* fix return type of ProtocolManager.uploadCaptureArtifact

* convert from whatwg streams back to node streams

* extract HttpError

* ensure system test snapshots

* changelog

* more changelog

* fix unit tests

* fix api ref in integration test

* fix refs to api in snapshotting and after-pack

* small edits

* Update packages/server/lib/cloud/api/HttpError.ts

Co-authored-by: Bill Glesias <[email protected]>

* Update packages/server/lib/cloud/upload/uploadStream.ts

Co-authored-by: Bill Glesias <[email protected]>

* camelcase -> snakeCase filenames

* improve docs for StreamActivityMonitor

* added documentation to: upload_stream, put_protocol_artifact_spec

* move stream activity monitor params to consts - no magic numbers. docs.

* Update packages/server/lib/cloud/api/http_error.ts

Co-authored-by: Bill Glesias <[email protected]>

* Update packages/server/test/unit/cloud/api/put_protocol_artifact_spec.ts

Co-authored-by: Bill Glesias <[email protected]>

* fix check-ts

* fix imports in put_protocol_artifact_spec

* Update packages/server/test/unit/cloud/upload/stream_activity_monitor_spec.ts

Co-authored-by: Ryan Manuel <[email protected]>

* api.ts -> index.ts

* fix comment style, remove confusingly inapplicable comment about whatwg streams

---------

Co-authored-by: Bill Glesias <[email protected]>
Co-authored-by: Ryan Manuel <[email protected]>
3 people authored Mar 29, 2024
1 parent fb87950 commit 3a739f3
Showing 24 changed files with 1,103 additions and 169 deletions.
4 changes: 4 additions & 0 deletions cli/CHANGELOG.md
@@ -3,6 +3,10 @@

_Released 4/2/2024 (PENDING)_

**Performance:**

- Improvements to Test Replay upload resiliency. Fixes [#28890](https://github.com/cypress-io/cypress/issues/28890). Addressed in [#29174](https://github.com/cypress-io/cypress/pull/29174)

**Bugfixes:**

- Fixed an issue where Cypress was not executing beyond the first spec in `cypress run` for versions of Firefox 124 and up when a custom user agent was provided. Fixes [#29190](https://github.com/cypress-io/cypress/issues/29190).
33 changes: 33 additions & 0 deletions packages/server/lib/cloud/api/http_error.ts
@@ -0,0 +1,33 @@
const SENSITIVE_KEYS = Object.freeze(['x-amz-credential', 'x-amz-signature', 'Signature', 'AWSAccessKeyId'])
const scrubUrl = (url: string, sensitiveKeys: readonly string[]): string => {
const parsedUrl = new URL(url)

for (const [key, value] of parsedUrl.searchParams) {
if (sensitiveKeys.includes(key)) {
parsedUrl.searchParams.set(key, 'X'.repeat(value.length))
}
}

return parsedUrl.href
}

export class HttpError extends Error {
constructor (
message: string,
public readonly originalResponse: Response,
) {
super(message)
}

public static async fromResponse (response: Response): Promise<HttpError> {
const status = response.status
const statusText = await (response.json().catch(() => {
return response.statusText
}))

return new HttpError(
`${status} ${statusText} (${scrubUrl(response.url, SENSITIVE_KEYS)})`,
response,
)
}
}
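
The scrubbing behavior above can be exercised in isolation. The snippet below reproduces the `scrubUrl` helper with a made-up URL and signature value (both hypothetical, for illustration only):

```typescript
// Reproduction of the scrubUrl helper above, exercised with fake values.
const SENSITIVE_KEYS = Object.freeze(['x-amz-credential', 'x-amz-signature', 'Signature', 'AWSAccessKeyId'])

const scrubUrl = (url: string, sensitiveKeys: readonly string[]): string => {
  const parsedUrl = new URL(url)

  for (const [key, value] of parsedUrl.searchParams) {
    if (sensitiveKeys.includes(key)) {
      // mask the value but preserve its length, so the URL shape is kept
      parsedUrl.searchParams.set(key, 'X'.repeat(value.length))
    }
  }

  return parsedUrl.href
}

const scrubbed = scrubUrl('https://example.com/upload?x-amz-signature=abc123&name=spec', SENSITIVE_KEYS)

console.log(scrubbed) // https://example.com/upload?x-amz-signature=XXXXXX&name=spec
```

Masking preserves the value's length, which keeps the scrubbed URL recognizable in error messages without leaking the credential itself.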
@@ -9,19 +9,19 @@ const RequestErrors = require('@cypress/request-promise/errors')
const { agent } = require('@packages/network')
const pkg = require('@packages/root')

const machineId = require('./machine_id')
const errors = require('../errors')
const { apiUrl, apiRoutes, makeRoutes } = require('./routes')
const machineId = require('../machine_id')
const errors = require('../../errors')
const { apiUrl, apiRoutes, makeRoutes } = require('../routes')

import Bluebird from 'bluebird'
import { getText } from '../util/status_code'
import * as enc from './encryption'
import getEnvInformationForProjectRoot from './environment'
import { getText } from '../../util/status_code'
import * as enc from '../encryption'
import getEnvInformationForProjectRoot from '../environment'

import type { OptionsWithUrl } from 'request-promise'
import { fs } from '../util/fs'
import ProtocolManager from './protocol'
import type { ProjectBase } from '../project-base'
import { fs } from '../../util/fs'
import ProtocolManager from '../protocol'
import type { ProjectBase } from '../../project-base'

const THIRTY_SECONDS = humanInterval('30 seconds')
const SIXTY_SECONDS = humanInterval('60 seconds')
35 changes: 35 additions & 0 deletions packages/server/lib/cloud/api/put_protocol_artifact.ts
@@ -0,0 +1,35 @@
import fsAsync from 'fs/promises'
import fs from 'fs'
import Debug from 'debug'
import { uploadStream, geometricRetry } from '../upload/upload_stream'
import { StreamActivityMonitor } from '../upload/stream_activity_monitor'

const debug = Debug('cypress:server:cloud:api:protocol-artifact')

// the upload will get canceled if the source stream does not
// begin flowing within 5 seconds, or if the stream pipeline
// stalls (does not push data to the `fetch` sink) for more
// than 5 seconds
const MAX_START_DWELL_TIME = 5000
const MAX_ACTIVITY_DWELL_TIME = 5000

export const putProtocolArtifact = async (artifactPath: string, maxFileSize: number, destinationUrl: string) => {
debug(`Attempting to upload Test Replay archive from ${artifactPath} to ${destinationUrl}`)
const { size } = await fsAsync.stat(artifactPath)

if (size > maxFileSize) {
throw new Error(`Spec recording too large: artifact is ${size} bytes, limit is ${maxFileSize} bytes`)
}

const activityMonitor = new StreamActivityMonitor(MAX_START_DWELL_TIME, MAX_ACTIVITY_DWELL_TIME)
const fileStream = fs.createReadStream(artifactPath)

await uploadStream(
fileStream,
destinationUrl,
size, {
retryDelay: geometricRetry,
activityMonitor,
},
)
}
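
`geometricRetry` is imported above but its implementation is not part of this diff. As a rough sketch of the kind of delay function `uploadStream` appears to expect (the actual constants and growth curve in `upload_stream.ts` may differ), a retry-delay function simply maps an attempt number to a wait time in milliseconds:

```typescript
// Hypothetical sketch of a retry-delay function: attempt number in, delay in
// milliseconds out. The real geometricRetry may use different values entirely.
const geometricRetry = (attemptNumber: number): number => {
  return (attemptNumber + 1) * 500
}

console.log([0, 1, 2].map(geometricRetry)) // [ 500, 1000, 1500 ]
```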
121 changes: 29 additions & 92 deletions packages/server/lib/cloud/protocol.ts
@@ -12,6 +12,8 @@ import { agent } from '@packages/network'
import pkg from '@packages/root'

import env from '../util/env'
import { putProtocolArtifact } from './api/put_protocol_artifact'

import type { Readable } from 'stream'
import type { ProtocolManagerShape, AppCaptureProtocolInterface, CDPClient, ProtocolError, CaptureArtifact, ProtocolErrorReport, ProtocolCaptureMethod, ProtocolManagerOptions, ResponseStreamOptions, ResponseEndedWithEmptyBodyOptions, ResponseStreamTimedOutOptions } from '@packages/types'

@@ -23,9 +25,6 @@ const debugVerbose = Debug('cypress-verbose:server:protocol')
const CAPTURE_ERRORS = !process.env.CYPRESS_LOCAL_PROTOCOL_PATH
const DELETE_DB = !process.env.CYPRESS_LOCAL_PROTOCOL_PATH

// Timeout for upload
const TWO_MINUTES = 120000
const RETRY_DELAYS = [500, 1000]
const DB_SIZE_LIMIT = 5000000000

const dbSizeLimit = () => {
@@ -51,13 +50,6 @@ const requireScript = (script: string) => {
return mod.exports
}

class CypressRetryableError extends Error {
constructor (message: string) {
super(message)
this.name = 'CypressRetryableError'
}
}

export class ProtocolManager implements ProtocolManagerShape {
private _runId?: string
private _instanceId?: string
@@ -267,7 +259,7 @@ export class ProtocolManager implements ProtocolManagerShape {
return this._errors.filter((e) => !e.fatal)
}

async getArchiveInfo (): Promise<{ stream: Readable, fileSize: number } | void> {
async getArchiveInfo (): Promise<{ filePath: string, fileSize: number } | void> {
const archivePath = this._archivePath

debug('reading archive from', archivePath)
@@ -276,107 +268,52 @@ }
}

return {
stream: fs.createReadStream(archivePath),
filePath: archivePath,
fileSize: (await fs.stat(archivePath)).size,
}
}

async uploadCaptureArtifact ({ uploadUrl, payload, fileSize }: CaptureArtifact, timeout) {
const archivePath = this._archivePath
async uploadCaptureArtifact ({ uploadUrl, fileSize, filePath }: CaptureArtifact): Promise<{
success: boolean
fileSize: number
specAccess?: ReturnType<AppCaptureProtocolInterface['getDbMetadata']>
} | void> {
if (!this._protocol || !filePath || !this._db) {
debug('not uploading due to one of the following being falsy: %O', {
_protocol: !!this._protocol,
archivePath: !!filePath,
_db: !!this._db,
})

if (!this._protocol || !archivePath || !this._db) {
return
}

debug(`uploading %s to %s with a file size of %s`, archivePath, uploadUrl, fileSize)

const retryRequest = async (retryCount: number, errors: Error[]) => {
try {
if (fileSize > dbSizeLimit()) {
throw new Error(`Spec recording too large: db is ${fileSize} bytes, limit is ${dbSizeLimit()} bytes`)
}

const controller = new AbortController()

setTimeout(() => {
controller.abort()
}, timeout ?? TWO_MINUTES)

const res = await fetch(uploadUrl, {
agent,
method: 'PUT',
// @ts-expect-error - this is supported
body: payload,
headers: {
'Accept': 'application/json',
'Content-Type': 'application/x-tar',
'Content-Length': `${fileSize}`,
},
signal: controller.signal,
})
debug(`uploading %s to %s with a file size of %s`, filePath, uploadUrl, fileSize)

if (res.ok) {
return {
fileSize,
success: true,
specAccess: this._protocol?.getDbMetadata(),
}
}

const errorMessage = await res.json().catch(() => {
const url = new URL(uploadUrl)

for (const [key, value] of url.searchParams) {
if (['x-amz-credential', 'x-amz-signature'].includes(key.toLowerCase())) {
url.searchParams.set(key, 'X'.repeat(value.length))
}
}

return `${res.status} ${res.statusText} (${url.href})`
})

debug(`error response: %O`, errorMessage)

if (res.status >= 500 && res.status < 600) {
throw new CypressRetryableError(errorMessage)
}

throw new Error(errorMessage)
} catch (e) {
// Only retry errors that are network related (e.g. connection reset or timeouts)
if (['FetchError', 'AbortError', 'CypressRetryableError'].includes(e.name)) {
if (retryCount < RETRY_DELAYS.length) {
debug(`retrying upload %o`, { retryCount })
await new Promise((resolve) => setTimeout(resolve, RETRY_DELAYS[retryCount]))

return await retryRequest(retryCount + 1, [...errors, e])
}
}

const totalErrors = [...errors, e]
try {
await putProtocolArtifact(filePath, dbSizeLimit(), uploadUrl)

throw new AggregateError(totalErrors, e.message)
return {
fileSize,
success: true,
specAccess: this._protocol?.getDbMetadata(),
}
}

try {
return await retryRequest(0, [])
} catch (e) {
if (CAPTURE_ERRORS) {
this._errors.push({
error: e,
captureMethod: 'uploadCaptureArtifact',
fatal: true,
})
}

throw e
throw e
}
} finally {
await (
DELETE_DB ? fs.unlink(archivePath).catch((e) => {
debug(`Error unlinking db %o`, e)
}) : Promise.resolve()
)
if (DELETE_DB) {
await fs.unlink(filePath).catch((e) => {
debug('Error unlinking db %o', e)
})
}
}
}

103 changes: 103 additions & 0 deletions packages/server/lib/cloud/upload/stream_activity_monitor.ts
@@ -0,0 +1,103 @@
import Debug from 'debug'
import { Transform, Readable } from 'stream'

const debug = Debug('cypress:server:cloud:stream-activity-monitor')
const debugVerbose = Debug('cypress-verbose:server:cloud:stream-activity-monitor')

export class StreamStartTimedOutError extends Error {
constructor (maxStartDwellTime: number) {
super(`Source stream failed to begin sending data after ${maxStartDwellTime}ms`)
}
}

export class StreamStalledError extends Error {
constructor (maxActivityDwellTime: number) {
super(`Stream stalled: no activity detected in the previous ${maxActivityDwellTime}ms`)
}
}

/**
* `StreamActivityMonitor` encapsulates state with regard to monitoring a stream
* for flow failure states. Given a maxStartDwellTime and a maxActivityDwellTime, this class
* can `monitor` a Node Readable stream and signal if the sink (e.g., a `fetch`) should be
 * aborted via an AbortController that can be retrieved via `getController`. It does this
* by creating an identity Transform stream and piping the source stream through it. The
* transform stream receives each chunk that the source emits, and orchestrates some timeouts
* to determine if the stream has failed to start, or if the data flow has stalled.
*
* Example usage:
*
* const MAX_START_DWELL_TIME = 5000
* const MAX_ACTIVITY_DWELL_TIME = 5000
* const stallDetection = new StreamActivityMonitor(MAX_START_DWELL_TIME, MAX_ACTIVITY_DWELL_TIME)
* try {
* const source = fs.createReadStream('/some/source/file')
* await fetch('/destination/url', {
* method: 'PUT',
* body: stallDetection.monitor(source)
* signal: stallDetection.getController().signal
* })
* } catch (e) {
* if (stallDetection.getController().signal.reason) {
* // the `fetch` was aborted by the signal that `stallDetection` controlled
* }
* }
*
*/
export class StreamActivityMonitor {
private streamMonitor: Transform | undefined
private startTimeout: NodeJS.Timeout | undefined
private activityTimeout: NodeJS.Timeout | undefined
private controller: AbortController

constructor (private maxStartDwellTime: number, private maxActivityDwellTime: number) {
this.controller = new AbortController()
}

public getController () {
return this.controller
}

public monitor (stream: Readable): Readable {
debug('monitoring stream')
if (this.streamMonitor || this.startTimeout || this.activityTimeout) {
this.reset()
}

this.streamMonitor = new Transform({
transform: (chunk, _, callback) => {
debugVerbose('Received chunk from File ReadableStream; Enqueuing to network: ', chunk.length)

clearTimeout(this.startTimeout)
this.markActivityInterval()
callback(null, chunk)
},
})

this.startTimeout = setTimeout(() => {
this.controller?.abort(new StreamStartTimedOutError(this.maxStartDwellTime))
}, this.maxStartDwellTime)

return stream.pipe(this.streamMonitor)
}

private reset () {
debug('Resetting Stream Activity Monitor')
clearTimeout(this.startTimeout)
clearTimeout(this.activityTimeout)

this.streamMonitor = undefined
this.startTimeout = undefined
this.activityTimeout = undefined

this.controller = new AbortController()
}

private markActivityInterval () {
debug('marking activity interval')
clearTimeout(this.activityTimeout)
this.activityTimeout = setTimeout(() => {
this.controller?.abort(new StreamStalledError(this.maxActivityDwellTime))
}, this.maxActivityDwellTime)
}
}
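
The timeout-reset pattern at the core of the monitor can be demonstrated in isolation. This is a simplified sketch, not the class above: an identity Transform that re-arms a stall timer on every chunk that passes through.

```typescript
import { Transform, Readable } from 'stream'

// Simplified sketch of the stall-detection idea: each chunk that flows through
// the identity transform pushes the stall deadline forward.
const makeStallMonitor = (onStall: () => void, dwellMs: number): Transform => {
  let timer: NodeJS.Timeout | undefined
  const arm = () => {
    if (timer) clearTimeout(timer)
    timer = setTimeout(onStall, dwellMs)
  }

  arm() // covers the "stream never starts" case

  return new Transform({
    transform (chunk, _encoding, callback) {
      arm() // data arrived: reset the stall timer
      callback(null, chunk)
    },
    flush (callback) {
      if (timer) clearTimeout(timer) // clean end: cancel the pending timer
      callback()
    },
  })
}

const demo = async (): Promise<string> => {
  let stalled = false
  const monitor = makeStallMonitor(() => { stalled = true }, 1000)
  const source = Readable.from(['a', 'b', 'c'])
  const received: string[] = []

  for await (const chunk of source.pipe(monitor)) {
    received.push(chunk.toString())
  }

  return `${received.join('')} stalled=${stalled}`
}

demo().then((result) => console.log(result)) // abc stalled=false
```

The real class additionally distinguishes a start timeout from an activity timeout and aborts an `AbortController` instead of invoking a callback, but the clear-and-rearm mechanism is the same.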

5 comments on commit 3a739f3


@cypress-bot cypress-bot bot commented on 3a739f3 Mar 29, 2024


Circle has built the linux x64 version of the Test Runner.

Learn more about this pre-release build at https://on.cypress.io/advanced-installation#Install-pre-release-version

Run this command to install the pre-release locally:

npm install https://cdn.cypress.io/beta/npm/13.7.2/linux-x64/develop-3a739f3f4f616b5b56b187ff84b53bc84cc21d4e/cypress.tgz


@cypress-bot cypress-bot bot commented on 3a739f3 Mar 29, 2024


Circle has built the linux arm64 version of the Test Runner.


npm install https://cdn.cypress.io/beta/npm/13.7.2/linux-arm64/develop-3a739f3f4f616b5b56b187ff84b53bc84cc21d4e/cypress.tgz


@cypress-bot cypress-bot bot commented on 3a739f3 Mar 29, 2024


Circle has built the darwin arm64 version of the Test Runner.


npm install https://cdn.cypress.io/beta/npm/13.7.2/darwin-arm64/develop-3a739f3f4f616b5b56b187ff84b53bc84cc21d4e/cypress.tgz


@cypress-bot cypress-bot bot commented on 3a739f3 Mar 29, 2024


Circle has built the darwin x64 version of the Test Runner.


npm install https://cdn.cypress.io/beta/npm/13.7.2/darwin-x64/develop-3a739f3f4f616b5b56b187ff84b53bc84cc21d4e/cypress.tgz


@cypress-bot cypress-bot bot commented on 3a739f3 Mar 29, 2024


Circle has built the win32 x64 version of the Test Runner.


npm install https://cdn.cypress.io/beta/npm/13.7.2/win32-x64/develop-3a739f3f4f616b5b56b187ff84b53bc84cc21d4e/cypress.tgz
