Skip to content

Commit

Permalink
fix: Correctly limit Buffer requests
Browse files Browse the repository at this point in the history
  • Loading branch information
kamilogorek committed Jun 23, 2021
1 parent 688a986 commit 8d36cce
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 95 deletions.
35 changes: 18 additions & 17 deletions packages/browser/src/transports/fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,23 +133,24 @@ export class FetchTransport extends BaseTransport {
}

return this._buffer.add(
new SyncPromise<Response>((resolve, reject) => {
void this._fetch(sentryRequest.url, options)
.then(response => {
const headers = {
'x-sentry-rate-limits': response.headers.get('X-Sentry-Rate-Limits'),
'retry-after': response.headers.get('Retry-After'),
};
this._handleResponse({
requestType: sentryRequest.type,
response,
headers,
resolve,
reject,
});
})
.catch(reject);
}),
() =>
new SyncPromise<Response>((resolve, reject) => {
void this._fetch(sentryRequest.url, options)
.then(response => {
const headers = {
'x-sentry-rate-limits': response.headers.get('X-Sentry-Rate-Limits'),
'retry-after': response.headers.get('Retry-After'),
};
this._handleResponse({
requestType: sentryRequest.type,
response,
headers,
resolve,
reject,
});
})
.catch(reject);
}),
);
}
}
37 changes: 19 additions & 18 deletions packages/browser/src/transports/xhr.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,27 +37,28 @@ export class XHRTransport extends BaseTransport {
}

return this._buffer.add(
new SyncPromise<Response>((resolve, reject) => {
const request = new XMLHttpRequest();
() =>
new SyncPromise<Response>((resolve, reject) => {
const request = new XMLHttpRequest();

request.onreadystatechange = (): void => {
if (request.readyState === 4) {
const headers = {
'x-sentry-rate-limits': request.getResponseHeader('X-Sentry-Rate-Limits'),
'retry-after': request.getResponseHeader('Retry-After'),
};
this._handleResponse({ requestType: sentryRequest.type, response: request, headers, resolve, reject });
}
};
request.onreadystatechange = (): void => {
if (request.readyState === 4) {
const headers = {
'x-sentry-rate-limits': request.getResponseHeader('X-Sentry-Rate-Limits'),
'retry-after': request.getResponseHeader('Retry-After'),
};
this._handleResponse({ requestType: sentryRequest.type, response: request, headers, resolve, reject });
}
};

request.open('POST', sentryRequest.url);
for (const header in this.options.headers) {
if (this.options.headers.hasOwnProperty(header)) {
request.setRequestHeader(header, this.options.headers[header]);
request.open('POST', sentryRequest.url);
for (const header in this.options.headers) {
if (this.options.headers.hasOwnProperty(header)) {
request.setRequestHeader(header, this.options.headers[header]);
}
}
}
request.send(sentryRequest.body);
}),
request.send(sentryRequest.body);
}),
);
}
}
2 changes: 1 addition & 1 deletion packages/browser/test/unit/mocks/simpletransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { BaseTransport } from '../../../src/transports';

export class SimpleTransport extends BaseTransport {
public sendEvent(_: Event): PromiseLike<Response> {
return this._buffer.add(
return this._buffer.add(() =>
SyncPromise.resolve({
status: Status.fromHttpCode(200),
}),
Expand Down
11 changes: 6 additions & 5 deletions packages/core/test/mocks/transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ export class FakeTransport implements Transport {
public sendEvent(_event: Event): PromiseLike<Response> {
this.sendCalled += 1;
return this._buffer.add(
new SyncPromise(async res => {
await sleep(this.delay);
this.sentCount += 1;
res({ status: Status.Success });
}),
() =>
new SyncPromise(async res => {
await sleep(this.delay);
this.sentCount += 1;
res({ status: Status.Success });
}),
);
}

Expand Down
91 changes: 47 additions & 44 deletions packages/node/src/transports/base/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,59 +203,62 @@ export abstract class BaseTransport implements Transport {
return Promise.reject(new SentryError('Not adding Promise due to buffer limit reached.'));
}
return this._buffer.add(
new Promise<Response>((resolve, reject) => {
if (!this.module) {
throw new SentryError('No module available');
}
const options = this._getRequestOptions(this.urlParser(sentryRequest.url));
const req = this.module.request(options, res => {
const statusCode = res.statusCode || 500;
const status = Status.fromHttpCode(statusCode);
() =>
new Promise<Response>((resolve, reject) => {
if (!this.module) {
throw new SentryError('No module available');
}
const options = this._getRequestOptions(this.urlParser(sentryRequest.url));
const req = this.module.request(options, res => {
const statusCode = res.statusCode || 500;
const status = Status.fromHttpCode(statusCode);

res.setEncoding('utf8');
res.setEncoding('utf8');

/**
* "Key-value pairs of header names and values. Header names are lower-cased."
* https://nodejs.org/api/http.html#http_message_headers
*/
let retryAfterHeader = res.headers ? res.headers['retry-after'] : '';
retryAfterHeader = (Array.isArray(retryAfterHeader) ? retryAfterHeader[0] : retryAfterHeader) as string;
/**
* "Key-value pairs of header names and values. Header names are lower-cased."
* https://nodejs.org/api/http.html#http_message_headers
*/
let retryAfterHeader = res.headers ? res.headers['retry-after'] : '';
retryAfterHeader = (Array.isArray(retryAfterHeader) ? retryAfterHeader[0] : retryAfterHeader) as string;

let rlHeader = res.headers ? res.headers['x-sentry-rate-limits'] : '';
rlHeader = (Array.isArray(rlHeader) ? rlHeader[0] : rlHeader) as string;
let rlHeader = res.headers ? res.headers['x-sentry-rate-limits'] : '';
rlHeader = (Array.isArray(rlHeader) ? rlHeader[0] : rlHeader) as string;

const headers = {
'x-sentry-rate-limits': rlHeader,
'retry-after': retryAfterHeader,
};
const headers = {
'x-sentry-rate-limits': rlHeader,
'retry-after': retryAfterHeader,
};

const limited = this._handleRateLimit(headers);
if (limited)
logger.warn(
`Too many ${sentryRequest.type} requests, backing off until: ${this._disabledUntil(sentryRequest.type)}`,
);
const limited = this._handleRateLimit(headers);
if (limited)
logger.warn(
`Too many ${sentryRequest.type} requests, backing off until: ${this._disabledUntil(
sentryRequest.type,
)}`,
);

if (status === Status.Success) {
resolve({ status });
} else {
let rejectionMessage = `HTTP Error (${statusCode})`;
if (res.headers && res.headers['x-sentry-error']) {
rejectionMessage += `: ${res.headers['x-sentry-error']}`;
if (status === Status.Success) {
resolve({ status });
} else {
let rejectionMessage = `HTTP Error (${statusCode})`;
if (res.headers && res.headers['x-sentry-error']) {
rejectionMessage += `: ${res.headers['x-sentry-error']}`;
}
reject(new SentryError(rejectionMessage));
}
reject(new SentryError(rejectionMessage));
}

// Force the socket to drain
res.on('data', () => {
// Drain
// Force the socket to drain
res.on('data', () => {
// Drain
});
res.on('end', () => {
// Drain
});
});
res.on('end', () => {
// Drain
});
});
req.on('error', reject);
req.end(sentryRequest.body);
}),
req.on('error', reject);
req.end(sentryRequest.body);
}),
);
}
}
16 changes: 14 additions & 2 deletions packages/utils/src/promisebuffer.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { SentryError } from './error';
import { isThenable } from './is';
import { SyncPromise } from './syncpromise';

type TaskProducer<T> = () => PromiseLike<T>;

/** A simple queue that holds promises. */
export class PromiseBuffer<T> {
/** Internal set of queued Promises */
Expand All @@ -18,13 +21,22 @@ export class PromiseBuffer<T> {
/**
* Add a promise to the queue.
*
* @param task Can be any PromiseLike<T>
* @param taskProducer A function producing any PromiseLike<T>
* @returns The original promise.
*/
public add(task: PromiseLike<T>): PromiseLike<T> {
public add(taskProducer: PromiseLike<T> | TaskProducer<T>): PromiseLike<T> {
// NOTE: This is necessary to preserve backwards compatibility
// It should accept _only_ `TaskProducer<T>` but we dont want to break other custom transports
// that are utilizing our `Buffer` implementation.
// see: https://github.com/getsentry/sentry-javascript/issues/3725
const normalizedTaskProducer: TaskProducer<T> = isThenable(taskProducer)
? () => taskProducer as PromiseLike<T>
: (taskProducer as TaskProducer<T>);

if (!this.isReady()) {
return SyncPromise.reject(new SentryError('Not adding Promise due to buffer limit reached.'));
}
const task = normalizedTaskProducer();
if (this._buffer.indexOf(task) === -1) {
this._buffer.push(task);
}
Expand Down
24 changes: 16 additions & 8 deletions packages/utils/test/promisebuffer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,28 @@ describe('PromiseBuffer', () => {
describe('add()', () => {
test('no limit', () => {
const q = new PromiseBuffer<void>();
const p = new SyncPromise<void>(resolve => setTimeout(resolve, 1));
const p = jest.fn(
() => new SyncPromise<void>(resolve => setTimeout(resolve, 1)),
);
q.add(p);
expect(q.length()).toBe(1);
});

test('with limit', () => {
const q = new PromiseBuffer<void>(1);
const p = new SyncPromise<void>(resolve => setTimeout(resolve, 1));
expect(q.add(p)).toEqual(p);
expect(
q.add(
new SyncPromise<void>(resolve => setTimeout(resolve, 1)),
),
).rejects.toThrowError();
let t1;
const p1 = jest.fn(() => {
t1 = new SyncPromise<void>(resolve => setTimeout(resolve, 1));
return t1;
});
const p2 = jest.fn(
() => new SyncPromise<void>(resolve => setTimeout(resolve, 1)),
);
expect(q.add(p1)).toEqual(t1);
expect(q.add(p2)).rejects.toThrowError();
expect(q.length()).toBe(1);
expect(p1).toHaveBeenCalled();
expect(p2).not.toHaveBeenCalled();
});
});

Expand Down

0 comments on commit 8d36cce

Please sign in to comment.