diff --git a/src/participant/streamParsing.ts b/src/participant/streamParsing.ts index 93bb5dad9..d8131a759 100644 --- a/src/participant/streamParsing.ts +++ b/src/participant/streamParsing.ts @@ -1,5 +1,166 @@ -function escapeRegex(str: string): string { - return str.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'); +// This is a stateful streaming implementation of the Knuth-Morris-Pratt algorithm +// for substring search. It supports being invoked with multiple fragments of the +// haystack and is capable of finding matches spanning multiple fragments. +class StreamingKMP { + public needle: string; + private _lookupVector: number[]; + + // In cases where we are fed a string that has a suffix that matches a prefix + // of the needle, we're storing the index in the needle which we last matched. + // Then when we get a new haystack, we start matching from that needle. + private _lastMatchingIndex = 0; + + constructor(needle: string) { + this.needle = needle; + this._lookupVector = this._createLookupVector(); + } + + private _createLookupVector(): number[] { + const vector = new Array(this.needle.length); + let j = 0; + vector[0] = 0; + + for (let i = 1; i < this.needle.length; i++) { + while (j > 0 && this.needle[i] !== this.needle[j]) { + j = vector[j - 1]; + } + + if (this.needle[i] === this.needle[j]) { + j++; + } + + vector[i] = j; + } + + return vector; + } + + // Returns the index in the haystackFragment **after** the needle. + // This is done because the match may have occurred over multiple fragments, + // so the index of the needle start would be negative. + public match(haystackFragment: string): number { + let j = this._lastMatchingIndex; // index in needle + let i = 0; // index in haystack + + while (i < haystackFragment.length) { + if (haystackFragment[i] === this.needle[j]) { + i++; + j++; + } + + if (j === this.needle.length) { + this._lastMatchingIndex = 0; + return i; + } + + if ( + i < haystackFragment.length && + haystackFragment[i] !== this.needle[j] + ) { + if (j !== 0) { + j = this._lookupVector[j - 1]; + } else { + i++; + } + } + } + + this._lastMatchingIndex = j; + return -1; + } + + public reset(): void { + this._lastMatchingIndex = 0; + } +} + +// This class is essentially a state machine that processes a stream of text fragments +// and emitting a callback with the content between each start and end identifier. The +// two states we have are: +// 1. "waiting for start identifier" - `_matchedContent === undefined` +// 2. "waiting for end identifier" - `_matchedContent !== undefined` +// with the state transitioning from one to the other when the corresponding identifier +// is matched in the fragment stream. +class FragmentMatcher { + private _startMatcher: StreamingKMP; + private _endMatcher: StreamingKMP; + private _matchedContent?: string; + private _onContentMatched: (content: string) => void; + private _onFragmentProcessed: (content: string) => void; + + constructor({ + identifier, + onContentMatched, + onFragmentProcessed, + }: { + identifier: { + start: string; + end: string; + }; + onContentMatched: (content: string) => void; + onFragmentProcessed: (content: string) => void; + }) { + this._startMatcher = new StreamingKMP(identifier.start); + this._endMatcher = new StreamingKMP(identifier.end); + this._onContentMatched = onContentMatched; + this._onFragmentProcessed = onFragmentProcessed; + } + + private _contentMatched(): void { + const content = this._matchedContent; + if (content !== undefined) { + // Strip the trailing end identifier from the matched content + this._onContentMatched( + content.slice(0, content.length - this._endMatcher.needle.length) + ); + } + + this._matchedContent = undefined; + this._startMatcher.reset(); + this._endMatcher.reset(); + } + + // This needs to be invoked every time before we call `process` recursively or when `process` + // completes processing the fragment. It will emit a notification to subscribers with the partial + // fragment we've processed, regardless of whether there's a match or not. + private _partialFragmentProcessed( + fragment: string, + index: number | undefined = undefined + ): void { + this._onFragmentProcessed( + index === undefined ? fragment : fragment.slice(0, index) + ); + } + + public process(fragment: string): void { + if (this._matchedContent === undefined) { + // We haven't matched the start identifier yet, so try and do that + const startIndex = this._startMatcher.match(fragment); + if (startIndex !== -1) { + // We found a match for the start identifier - update `_matchedContent` to an empty string + // and recursively call `process` with the remainder of the fragment. + this._matchedContent = ''; + this._partialFragmentProcessed(fragment, startIndex); + this.process(fragment.slice(startIndex)); + } else { + this._partialFragmentProcessed(fragment); + } + } else { + const endIndex = this._endMatcher.match(fragment); + if (endIndex !== -1) { + // We've matched the end - emit the matched content and continue processing the partial fragment + this._matchedContent += fragment.slice(0, endIndex); + this._partialFragmentProcessed(fragment, endIndex); + this._contentMatched(); + this.process(fragment.slice(endIndex)); + } else { + // We haven't matched the end yet - append the fragment to the matched content and wait + // for a future fragment to contain the end identifier. + this._matchedContent += fragment; + this._partialFragmentProcessed(fragment); + } + } + } } /** @@ -22,74 +183,13 @@ export async function processStreamWithIdentifiers({ end: string; }; }): Promise { - const escapedIdentifierStart = escapeRegex(identifier.start); - const escapedIdentifierEnd = escapeRegex(identifier.end); - const regex = new RegExp( - `${escapedIdentifierStart}([\\s\\S]*?)${escapedIdentifierEnd}`, - 'g' - ); - - let contentSinceLastIdentifier = ''; - for await (const fragment of inputIterable) { - contentSinceLastIdentifier += fragment; + const fragmentMatcher = new FragmentMatcher({ + identifier, + onContentMatched: onStreamIdentifier, + onFragmentProcessed: processStreamFragment, + }); - let lastIndex = 0; - let match: RegExpExecArray | null; - while ((match = regex.exec(contentSinceLastIdentifier)) !== null) { - const endIndex = regex.lastIndex; - - // Stream content up to the end of the identifier. - const contentToStream = contentSinceLastIdentifier.slice( - lastIndex, - endIndex - ); - processStreamFragment(contentToStream); - - const identifierContent = match[1]; - onStreamIdentifier(identifierContent); - - lastIndex = endIndex; - } - - if (lastIndex > 0) { - // Remove all of the processed content. - contentSinceLastIdentifier = contentSinceLastIdentifier.slice(lastIndex); - // Reset the regex. - regex.lastIndex = 0; - } else { - // Clear as much of the content as we can safely. - const maxUnprocessedLength = identifier.start.length - 1; - if (contentSinceLastIdentifier.length > maxUnprocessedLength) { - const identifierIndex = contentSinceLastIdentifier.indexOf( - identifier.start - ); - if (identifierIndex > -1) { - // We have an identifier, so clear up until the identifier. - const contentToStream = contentSinceLastIdentifier.slice( - 0, - identifierIndex - ); - processStreamFragment(contentToStream); - contentSinceLastIdentifier = - contentSinceLastIdentifier.slice(identifierIndex); - } else { - // No identifier, so clear up until the last maxUnprocessedLength. - const processUpTo = - contentSinceLastIdentifier.length - maxUnprocessedLength; - const contentToStream = contentSinceLastIdentifier.slice( - 0, - processUpTo - ); - processStreamFragment(contentToStream); - contentSinceLastIdentifier = - contentSinceLastIdentifier.slice(processUpTo); - } - } - } - } - - // Finish up anything not streamed yet. - if (contentSinceLastIdentifier.length > 0) { - processStreamFragment(contentSinceLastIdentifier); + for await (const fragment of inputIterable) { + fragmentMatcher.process(fragment); } } diff --git a/src/test/suite/participant/streamParsing.test.ts b/src/test/suite/participant/streamParsing.test.ts index 66208ecdd..96d274d00 100644 --- a/src/test/suite/participant/streamParsing.test.ts +++ b/src/test/suite/participant/streamParsing.test.ts @@ -216,4 +216,80 @@ suite('processStreamWithIdentifiers', () => { expect(fragmentsProcessed.join('')).to.deep.equal(inputFragments.join('')); expect(identifiersStreamed).to.deep.equal(['\ncode1\n', '\ncode2\n']); }); + + test('one fragment containing multiple code blocks emits event in correct order', async () => { + // In case we have one fragment containing multiple code blocks, we want to make sure that + // fragment notifications and identifier notifications arrive in the right order so that we're + // adding code actions after the correct subfragment. + // For example: + // 'Text before code.\n```js\ncode1\n```\nText between code.\n```js\ncode2\n```\nText after code.' + // + // should emit: + // + // processStreamFragment: 'Text before code.\n```js\ncode1\n```' + // onStreamIdentifier: '\ncode1\n' + // processStreamFragment: '\nText between code.\n```js\ncode2\n```' + // onStreamIdentifier: '\ncode2\n' + // processStreamFragment: '\nText after code.' + // + // in that order to ensure we add each code action immediately after the code block + // rather than add both at the end. + + const inputFragments = [ + 'Text before code.\n```js\ncode1\n```\nText between code.\n```js\ncode2\n```\nText after code.', + ]; + + const inputIterable = asyncIterableFromArray(inputFragments); + const identifier = { start: '```js', end: '```' }; + + const fragmentsEmitted: { + source: 'processStreamFragment' | 'onStreamIdentifier'; + content: string; + }[] = []; + + const getFragmentHandler = ( + source: 'processStreamFragment' | 'onStreamIdentifier' + ): ((fragment: string) => void) => { + return (fragment: string): void => { + // It's an implementation detail, but the way the code is structured today, we're splitting the emitted fragments + // whenever we find either a start or end identifier. This is irrelevant as long as we're emitting the entirety of + // the text until the end of the code block in `processStreamFragment` and then the code block itself in `onStreamIdentifier`. + // With the code below, we're combining all subfragments with the same source to make the test verify the desired + // behavior rather than the actual implementation. + const lastFragment = fragmentsEmitted[fragmentsEmitted.length - 1]; + if (lastFragment?.source === source) { + lastFragment.content += fragment; + } else { + fragmentsEmitted.push({ source, content: fragment }); + } + }; + }; + + await processStreamWithIdentifiers({ + processStreamFragment: getFragmentHandler('processStreamFragment'), + onStreamIdentifier: getFragmentHandler('onStreamIdentifier'), + inputIterable, + identifier, + }); + + expect(fragmentsEmitted).to.have.length(5); + expect(fragmentsEmitted[0].source).to.equal('processStreamFragment'); + expect(fragmentsEmitted[0].content).to.equal( + 'Text before code.\n```js\ncode1\n```' + ); + + expect(fragmentsEmitted[1].source).to.equal('onStreamIdentifier'); + expect(fragmentsEmitted[1].content).to.equal('\ncode1\n'); + + expect(fragmentsEmitted[2].source).to.equal('processStreamFragment'); + expect(fragmentsEmitted[2].content).to.equal( + '\nText between code.\n```js\ncode2\n```' + ); + + expect(fragmentsEmitted[3].source).to.equal('onStreamIdentifier'); + expect(fragmentsEmitted[3].content).to.equal('\ncode2\n'); + + expect(fragmentsEmitted[4].source).to.equal('processStreamFragment'); + expect(fragmentsEmitted[4].content).to.equal('\nText after code.'); + }); });