Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

⚗️[RUMF-1405] add mechanism to simulate intake issue #1757

Merged
merged 4 commits into from
Oct 6, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions packages/core/src/domain/configuration/configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { catchUserErrors } from '../../tools/catchUserErrors'
import { display } from '../../tools/display'
import { assign, isPercentage, ONE_KIBI_BYTE, ONE_SECOND } from '../../tools/utils'
import { updateExperimentalFeatures } from './experimentalFeatures'
import { initSimulation } from './simulation'
import type { TransportConfiguration } from './transportConfiguration'
import { computeTransportConfiguration } from './transportConfiguration'

Expand Down Expand Up @@ -40,6 +41,11 @@ export interface InitConfiguration {
enableExperimentalFeatures?: string[] | undefined
replica?: ReplicaUserConfiguration | undefined
datacenter?: string

// simulation options
simulationStart?: string | undefined
simulationEnd?: string | undefined
simulationLabel?: string | undefined
}

// This type is only used to build the core configuration. Logs and RUM SDKs are using a proper type
Expand Down Expand Up @@ -90,6 +96,8 @@ export function validateAndBuildConfiguration(initConfiguration: InitConfigurati
// Set the experimental feature flags as early as possible, so we can use them in most places
updateExperimentalFeatures(initConfiguration.enableExperimentalFeatures)

initSimulation(initConfiguration.simulationStart, initConfiguration.simulationEnd, initConfiguration.simulationLabel)

return assign(
{
beforeSend:
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/domain/configuration/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ export {
getExperimentalFeatures,
} from './experimentalFeatures'
export * from './intakeSites'
export { isSimulationActive, getSimulationLabel } from './simulation'
29 changes: 29 additions & 0 deletions packages/core/src/domain/configuration/simulation.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { dateNow } from '../../tools/timeUtils'
import type { TimeStamp } from '../../tools/timeUtils'

interface Simulation {
start: TimeStamp
end: TimeStamp
label: string
}

let simulation: Simulation | undefined

export function initSimulation(simulationStart?: string, simulationEnd?: string, simulationLabel?: string) {
if (simulationStart && simulationEnd && simulationLabel) {
simulation = {
start: new Date(simulationStart).getTime() as TimeStamp,
end: new Date(simulationEnd).getTime() as TimeStamp,
label: simulationLabel,
}
}
}

export function isSimulationActive() {
const now = dateNow()
return simulation !== undefined && now >= simulation.start && now <= simulation.end
}

export function getSimulationLabel() {
return simulation?.label
}
11 changes: 9 additions & 2 deletions packages/core/src/domain/telemetry/telemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@ import { ConsoleApiName } from '../../tools/display'
import { toStackTraceString } from '../../tools/error'
import { assign, combine, jsonStringify, performDraw, includes, startsWith, arrayFrom } from '../../tools/utils'
import type { Configuration } from '../configuration'
import { getExperimentalFeatures, INTAKE_SITE_STAGING, INTAKE_SITE_US1_FED } from '../configuration'
import {
getExperimentalFeatures,
getSimulationLabel,
INTAKE_SITE_STAGING,
INTAKE_SITE_US1_FED,
isSimulationActive,
} from '../configuration'
import type { StackTrace } from '../tracekit'
import { computeStackTrace } from '../tracekit'
import { Observable } from '../../tools/observable'
Expand Down Expand Up @@ -82,7 +88,8 @@ export function startTelemetry(configuration: Configuration): Telemetry {
telemetry: event as any, // https://github.com/microsoft/TypeScript/issues/48457
experimental_features: arrayFrom(getExperimentalFeatures()),
},
contextProvider !== undefined ? contextProvider() : {}
contextProvider !== undefined ? contextProvider() : {},
isSimulationActive() ? { simulation_label: getSimulationLabel() } : {}
)
}

Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ export {
isExperimentalFeatureEnabled,
updateExperimentalFeatures,
resetExperimentalFeatures,
isSimulationActive,
getSimulationLabel,
} from './domain/configuration'
export { trackRuntimeError } from './domain/error/trackRuntimeError'
export { computeStackTrace, StackTrace } from './domain/tracekit'
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/transport/httpRequest.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ describe('httpRequest', () => {
interceptor = interceptRequests()
requests = interceptor.requests
endpointBuilder = stubEndpointBuilder(ENDPOINT_URL)
request = createHttpRequest(endpointBuilder, BATCH_BYTES_LIMIT, noop)
request = createHttpRequest(endpointBuilder, BATCH_BYTES_LIMIT, noop, true)
})

afterEach(() => {
Expand Down Expand Up @@ -209,7 +209,7 @@ describe('httpRequest intake parameters', () => {
interceptor = interceptRequests()
requests = interceptor.requests
endpointBuilder = createEndpointBuilder({ clientToken }, 'logs', [])
request = createHttpRequest(endpointBuilder, BATCH_BYTES_LIMIT, noop)
request = createHttpRequest(endpointBuilder, BATCH_BYTES_LIMIT, noop, true)
})

afterEach(() => {
Expand Down
23 changes: 20 additions & 3 deletions packages/core/src/transport/httpRequest.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { EndpointBuilder } from '../domain/configuration'
import { addTelemetryError } from '../domain/telemetry'
import { monitor } from '../tools/monitor'
import { isExperimentalFeatureEnabled } from '../domain/configuration'
import { isExperimentalFeatureEnabled, isSimulationActive } from '../domain/configuration'
import type { RawError } from '../tools/error'
import { newRetryState, sendWithRetryStrategy } from './sendWithRetryStrategy'

Expand All @@ -15,6 +15,7 @@ import { newRetryState, sendWithRetryStrategy } from './sendWithRetryStrategy'
*/

export type HttpRequest = ReturnType<typeof createHttpRequest>

export interface HttpResponse {
status: number
}
Expand All @@ -27,7 +28,8 @@ export interface Payload {
export function createHttpRequest(
endpointBuilder: EndpointBuilder,
bytesLimit: number,
reportError: (error: RawError) => void
reportError: (error: RawError) => void,
toPrimaryEndpoint: boolean
) {
const retryState = newRetryState()
const sendStrategyForRetry = (payload: Payload, onResponse: (r: HttpResponse) => void) =>
Expand All @@ -38,14 +40,29 @@ export function createHttpRequest(
if (!isExperimentalFeatureEnabled('retry')) {
fetchKeepAliveStrategy(endpointBuilder, bytesLimit, payload)
} else {
sendWithRetryStrategy(payload, retryState, sendStrategyForRetry, endpointBuilder.endpointType, reportError)
sendWithRetryStrategy(
payload,
retryState,
sendStrategyForRetry,
endpointBuilder.endpointType,
toPrimaryEndpoint,
reportError
)
}
},
/**
* Since fetch keepalive behaves like regular fetch on Firefox,
* keep using sendBeaconStrategy on exit
*/
sendOnExit: (payload: Payload) => {
if (
isExperimentalFeatureEnabled('retry') &&
endpointBuilder.endpointType !== 'sessionReplay' &&
toPrimaryEndpoint &&
isSimulationActive()
) {
return
}
sendBeaconStrategy(endpointBuilder, bytesLimit, payload)
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ describe('sendWithRetryStrategy', () => {
data: payload?.data ?? 'a',
bytesCount: payload?.bytesCount ?? 1,
}
sendWithRetryStrategy(effectivePayload, state, sendStub.sendStrategy, ENDPOINT_TYPE, reportErrorSpy)
sendWithRetryStrategy(effectivePayload, state, sendStub.sendStrategy, ENDPOINT_TYPE, true, reportErrorSpy)
}
})

Expand Down
41 changes: 34 additions & 7 deletions packages/core/src/transport/sendWithRetryStrategy.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { addTelemetryDebug } from '../domain/telemetry'
import type { EndpointType } from '../domain/configuration'
import { isSimulationActive } from '../domain/configuration'
import { monitor } from '../tools/monitor'
import type { RawError } from '../tools/error'
import { clocksNow } from '../tools/timeUtils'
Expand Down Expand Up @@ -40,18 +41,27 @@ export function sendWithRetryStrategy(
state: RetryState,
sendStrategy: SendStrategy,
endpointType: EndpointType,
toPrimaryEndpoint: boolean,
reportError: (error: RawError) => void
) {
if (
state.transportStatus === TransportStatus.UP &&
state.queuedPayloads.size() === 0 &&
state.bandwidthMonitor.canHandle(payload)
) {
send(payload, state, sendStrategy, {
onSuccess: () => retryQueuedPayloads(RetryReason.AFTER_SUCCESS, state, sendStrategy, endpointType, reportError),
send(payload, state, sendStrategy, toPrimaryEndpoint, {
onSuccess: () =>
retryQueuedPayloads(
RetryReason.AFTER_SUCCESS,
state,
sendStrategy,
endpointType,
toPrimaryEndpoint,
reportError
),
onFailure: () => {
state.queuedPayloads.enqueue(payload)
scheduleRetry(state, sendStrategy, endpointType, reportError)
scheduleRetry(state, sendStrategy, endpointType, toPrimaryEndpoint, reportError)
},
})
} else {
Expand All @@ -63,6 +73,7 @@ function scheduleRetry(
state: RetryState,
sendStrategy: SendStrategy,
endpointType: EndpointType,
toPrimaryEndpoint: boolean,
reportError: (error: RawError) => void
) {
if (state.transportStatus !== TransportStatus.DOWN) {
Expand All @@ -71,7 +82,7 @@ function scheduleRetry(
setTimeout(
monitor(() => {
const payload = state.queuedPayloads.first()
send(payload, state, sendStrategy, {
send(payload, state, sendStrategy, toPrimaryEndpoint, {
onSuccess: () => {
state.queuedPayloads.dequeue()
if (state.lastFailureStatus !== 0) {
Expand All @@ -80,11 +91,18 @@ function scheduleRetry(
})
}
state.currentBackoffTime = INITIAL_BACKOFF_TIME
retryQueuedPayloads(RetryReason.AFTER_RESUME, state, sendStrategy, endpointType, reportError)
retryQueuedPayloads(
RetryReason.AFTER_RESUME,
state,
sendStrategy,
endpointType,
toPrimaryEndpoint,
reportError
)
},
onFailure: () => {
state.currentBackoffTime = Math.min(MAX_BACKOFF_TIME, state.currentBackoffTime * 2)
scheduleRetry(state, sendStrategy, endpointType, reportError)
scheduleRetry(state, sendStrategy, endpointType, toPrimaryEndpoint, reportError)
},
})
}),
Expand All @@ -96,8 +114,16 @@ function send(
payload: Payload,
state: RetryState,
sendStrategy: SendStrategy,
toPrimaryEndpoint: boolean,
{ onSuccess, onFailure }: { onSuccess: () => void; onFailure: () => void }
) {
if (isSimulationActive() && toPrimaryEndpoint) {
state.transportStatus =
state.bandwidthMonitor.ongoingRequestCount > 0 ? TransportStatus.FAILURE_DETECTED : TransportStatus.DOWN
state.lastFailureStatus = 555
onFailure()
return
}
state.bandwidthMonitor.add(payload)
sendStrategy(payload, (response) => {
state.bandwidthMonitor.remove(payload)
Expand All @@ -119,6 +145,7 @@ function retryQueuedPayloads(
state: RetryState,
sendStrategy: SendStrategy,
endpointType: EndpointType,
toPrimaryEndpoint: boolean,
reportError: (error: RawError) => void
) {
if (reason === RetryReason.AFTER_SUCCESS && state.queuedPayloads.isFull() && !state.queueFullReported) {
Expand All @@ -132,7 +159,7 @@ function retryQueuedPayloads(
const previousQueue = state.queuedPayloads
state.queuedPayloads = newPayloadQueue()
while (previousQueue.size() > 0) {
sendWithRetryStrategy(previousQueue.dequeue()!, state, sendStrategy, endpointType, reportError)
sendWithRetryStrategy(previousQueue.dequeue()!, state, sendStrategy, endpointType, toPrimaryEndpoint, reportError)
}
}

Expand Down
8 changes: 4 additions & 4 deletions packages/core/src/transport/startBatchWithReplica.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ export function startBatchWithReplica<T extends Context>(
reportError: (error: RawError) => void,
replicaEndpoint?: EndpointBuilder
) {
const primaryBatch = createBatch(endpoint)
const primaryBatch = createBatch(endpoint, true)
let replicaBatch: Batch | undefined
if (replicaEndpoint) {
replicaBatch = createBatch(replicaEndpoint)
replicaBatch = createBatch(replicaEndpoint, false)
}

function createBatch(endpointBuilder: EndpointBuilder) {
function createBatch(endpointBuilder: EndpointBuilder, toPrimaryEndpoint: boolean) {
return new Batch(
createHttpRequest(endpointBuilder, configuration.batchBytesLimit, reportError),
createHttpRequest(endpointBuilder, configuration.batchBytesLimit, reportError, toPrimaryEndpoint),
configuration.batchMessagesLimit,
configuration.batchBytesLimit,
configuration.messageBytesLimit,
Expand Down
6 changes: 6 additions & 0 deletions packages/logs/src/domain/assembly.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import {
combine,
createEventRateLimiter,
getRelativeTime,
isSimulationActive,
getSimulationLabel,
} from '@datadog/browser-core'
import type { CommonContext } from '../rawLogsEvent.types'
import type { LogsConfiguration } from './configuration'
Expand Down Expand Up @@ -52,6 +54,10 @@ export function startLogsAssembly(
messageContext
)

if (isSimulationActive()) {
log.simulation_label = getSimulationLabel()
}

if (
// Todo: [RUMF-1230] Move this check to the logger collection in the next major release
!isAuthorized(rawLogsEvent.status, HandlerType.http, logger) ||
Expand Down
6 changes: 6 additions & 0 deletions packages/rum-core/src/domain/assembly.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import {
display,
createEventRateLimiter,
canUseEventBridge,
isSimulationActive,
getSimulationLabel,
} from '@datadog/browser-core'
import type { RumEventDomainContext } from '../domainContext.types'
import type {
Expand Down Expand Up @@ -133,6 +135,10 @@ export function startRumAssembly(
const serverRumEvent = combine(rumContext as RumContext & Context, rawRumEvent) as RumEvent & Context
serverRumEvent.context = combine(commonContext.context, customerContext)

if (isSimulationActive()) {
serverRumEvent.context.simulation_label = getSimulationLabel()
}

if (!('has_replay' in serverRumEvent.session)) {
;(serverRumEvent.session as Mutable<RumEvent['session']>).has_replay = commonContext.hasReplay
}
Expand Down
8 changes: 4 additions & 4 deletions packages/rum-core/src/transport/startRumBatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,19 @@ function makeRumBatch(
lifeCycle: LifeCycle,
reportError: (error: RawError) => void
): RumBatch {
const primaryBatch = createRumBatch(configuration.rumEndpointBuilder, () =>
const primaryBatch = createRumBatch(configuration.rumEndpointBuilder, true, () =>
lifeCycle.notify(LifeCycleEventType.BEFORE_UNLOAD)
)

let replicaBatch: Batch | undefined
const replica = configuration.replica
if (replica !== undefined) {
replicaBatch = createRumBatch(replica.rumEndpointBuilder)
replicaBatch = createRumBatch(replica.rumEndpointBuilder, false)
}

function createRumBatch(endpointBuilder: EndpointBuilder, unloadCallback?: () => void) {
function createRumBatch(endpointBuilder: EndpointBuilder, toPrimaryEndpoint: boolean, unloadCallback?: () => void) {
return new Batch(
createHttpRequest(endpointBuilder, configuration.batchBytesLimit, reportError),
createHttpRequest(endpointBuilder, configuration.batchBytesLimit, reportError, toPrimaryEndpoint),
configuration.batchMessagesLimit,
configuration.batchBytesLimit,
configuration.messageBytesLimit,
Expand Down
7 changes: 6 additions & 1 deletion packages/rum/src/boot/startRecording.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,12 @@ describe('startRecording', () => {
defaultPrivacyLevel: DefaultPrivacyLevel.ALLOW,
})
.beforeBuild(({ lifeCycle, configuration, viewContexts, sessionManager }) => {
const httpRequest = createHttpRequest(configuration.sessionReplayEndpointBuilder, SEGMENT_BYTES_LIMIT, noop)
const httpRequest = createHttpRequest(
configuration.sessionReplayEndpointBuilder,
SEGMENT_BYTES_LIMIT,
noop,
true
)

const requestSendSpy = spyOn(httpRequest, 'sendOnExit')
;({ waitAsyncCalls: waitRequestSendCalls, expectNoExtraAsyncCall: expectNoExtraRequestSendCalls } =
Expand Down
2 changes: 1 addition & 1 deletion packages/rum/src/boot/startRecording.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export function startRecording(
}

const replayRequest =
httpRequest || createHttpRequest(configuration.sessionReplayEndpointBuilder, SEGMENT_BYTES_LIMIT, reportError)
httpRequest || createHttpRequest(configuration.sessionReplayEndpointBuilder, SEGMENT_BYTES_LIMIT, reportError, true)

const { addRecord, stop: stopSegmentCollection } = startSegmentCollection(
lifeCycle,
Expand Down