Skip to content

Commit

Permalink
Allow reading multiple data chunks from single read() chunk.
Browse files Browse the repository at this point in the history
  • Loading branch information
taeold committed Nov 6, 2024
1 parent 9a430cc commit 785e906
Showing 1 changed file with 43 additions and 17 deletions.
60 changes: 43 additions & 17 deletions packages/functions/src/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ export class FunctionsService implements _FirebaseService {
messagingProvider: Provider<MessagingInternalComponentName>,
appCheckProvider: Provider<AppCheckInternalComponentName>,
regionOrCustomDomain: string = DEFAULT_REGION,
readonly fetchImpl: typeof fetch = fetch,
readonly fetchImpl: typeof fetch = (...args) => fetch(...args),
) {
this.contextProvider = new ContextProvider(
authProvider,
Expand Down Expand Up @@ -450,6 +450,7 @@ async function streamAtURL(
const reader = response.body!.getReader();
const decoder = new TextDecoder();

let pendingLines: string[] = [];
let buffer = '';
let resultResolver: (value: unknown) => void;
let resultRejecter: (reason: any) => void;
Expand All @@ -470,6 +471,30 @@ async function streamAtURL(

const stream = {
[Symbol.asyncIterator]() {

const processLine = (line: string | undefined) => {
// ignore all other lines (newline, comments, etc.)
if (!line?.startsWith('data: ')) return null;

try {
const jsonData = JSON.parse(line.slice(6));
if ('result' in jsonData) {
resultResolver(decode(jsonData.result));
return { done: true, value: undefined };
}
if ('message' in jsonData) {
return { done: false, value: decode(jsonData.message) };
}
if ('error' in jsonData) {
const error = _errorForResponse(0, jsonData);
resultRejecter(error);
throw error;
}
return null; // Unrecognize keys. Skip this line.
} catch (error) {
// Not json. Skip this line.
}
};
return {
async next() {
if (options?.signal?.aborted) {
Expand All @@ -481,29 +506,30 @@ async function streamAtURL(
throw error;
}

while (pendingLines.length > 0) {
const result = processLine(pendingLines.shift());
if (result) return result;
}

while (true) {
const { value, done } = await reader.read();
if (done) return { done: true, value: undefined };

if (done) {
if (buffer.trim()) {
const result = processLine(buffer);
if (result) return result;
}
return { done: true, value: undefined };
}

buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() || '';
pendingLines.push(...lines.filter(line => line.trim()));

for (const line of lines) {
if (line.startsWith('data: ')) {
const jsonData = JSON.parse(line.slice(6));
if ('result' in jsonData) {
resultResolver(decode(jsonData.result));
return { done: true, value: undefined };
} else if ('message' in jsonData) {
return { done: false, value: decode(jsonData.message) };
} else if ('error' in jsonData) {
const error = _errorForResponse(0, jsonData);
resultRejecter(error)
throw error;
}
}
// ignore all other lines (newline, comments, etc.)
if (pendingLines.length > 0) {
const result = processLine(pendingLines.shift());
if (result) return result;
}
}
}
Expand Down

0 comments on commit 785e906

Please sign in to comment.