From bfe1f72d0d79ac804e1ba90299742c940dfde97e Mon Sep 17 00:00:00 2001 From: Nikola Irinchev Date: Fri, 27 Sep 2024 00:11:11 +0200 Subject: [PATCH] Replace regex fragment matching with streaming KMP --- src/participant/streamParsing.ts | 222 +++++++++++++++++++++---------- 1 file changed, 152 insertions(+), 70 deletions(-) diff --git a/src/participant/streamParsing.ts b/src/participant/streamParsing.ts index 93bb5dad9..c90eb97ed 100644 --- a/src/participant/streamParsing.ts +++ b/src/participant/streamParsing.ts @@ -1,5 +1,148 @@ -function escapeRegex(str: string): string { - return str.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'); +// This is a stateful streaming implementation of the Knuth-Morris-Pratt algorithm +// for substring search. It supports being invoked with multiple fragments of the +// haystack and is capable of finding matches spanning multiple fragments. +class StreamingKMP { + public needle: string; + private _lookupVector: number[]; + + // In cases where we are fed a string that has a suffix that matches a prefix + // of the needle, we're storing the index in the needle which we last matched. + // Then when we get a new haystack, we start matching from that needle. + private _lastMatchingIndex = 0; + + constructor(needle: string) { + this.needle = needle; + this._lookupVector = this._createLookupVector(); + } + + private _createLookupVector(): number[] { + const vector = new Array(this.needle.length); + let j = 0; + vector[0] = 0; + + for (let i = 1; i < this.needle.length; i++) { + while (j > 0 && this.needle[i] !== this.needle[j]) { + j = vector[j - 1]; + } + + if (this.needle[i] === this.needle[j]) { + j++; + } + + vector[i] = j; + } + + return vector; + } + + // Returns the index in the haystackFragment **after** the needle. + // This is done because the match may have occurred over multiple fragments, + // so the index of the needle start would be negative. + public match(haystackFragment: string): number { + let j = this._lastMatchingIndex; // index in needle + let i = 0; // index in haystack + + while (i < haystackFragment.length) { + if (haystackFragment[i] === this.needle[j]) { + i++; + j++; + } + + if (j === this.needle.length) { + this._lastMatchingIndex = 0; + return i; + } + + if ( + i < haystackFragment.length && + haystackFragment[i] !== this.needle[j] + ) { + if (j !== 0) { + j = this._lookupVector[j - 1]; + } else { + i++; + } + } + } + + this._lastMatchingIndex = j; + return -1; + } + + public reset(): void { + this._lastMatchingIndex = 0; + } +} + +class FragmentMatcher { + private _startMatcher: StreamingKMP; + private _endMatcher: StreamingKMP; + private _matchedContent?: string; + private _onContentMatched: (content: string) => void; + + constructor({ + identifier, + onContentMatched, + }: { + identifier: { + start: string; + end: string; + }; + onContentMatched: (content: string) => void; + }) { + this._startMatcher = new StreamingKMP(identifier.start); + this._endMatcher = new StreamingKMP(identifier.end); + this._onContentMatched = onContentMatched; + } + + private _contentMatched(): void { + const content = this._matchedContent; + if (content !== undefined) { + // Strip the trailing end identifier from the matched content + this._onContentMatched( + content.slice(0, content.length - this._endMatcher.needle.length) + ); + } + + this._matchedContent = undefined; + this._startMatcher.reset(); + this._endMatcher.reset(); + } + + public process(fragment: string): void { + if (this._matchedContent === undefined) { + // We haven't matched the start identifier yet, so try and do that + const startIndex = this._startMatcher.match(fragment); + if (startIndex !== -1) { + let endIndex = this._endMatcher.match(fragment.slice(startIndex)); + if (endIndex !== -1) { + // This is the case where both the start and the end identifiers are contained in the same fragment. + // In this case, we emit the content between the two identifiers and continue processing the rest of the fragment. + + // endIndex is relative to the slice, so we need to add startIndex to it. + endIndex = startIndex + endIndex; + + this._matchedContent = fragment.slice(startIndex, endIndex); + this._contentMatched(); + this.process(fragment.slice(endIndex)); + } else { + // If we only matched the start, we add the partial fragment to the matched content and + // wait for another fragment to complete the match. + this._matchedContent = fragment.slice(startIndex); + } + } + } else { + const endIndex = this._endMatcher.match(fragment); + if (endIndex !== -1) { + // We've matched the end - emit the matched content and continue processing the partial fragment + this._matchedContent += fragment.slice(0, endIndex); + this._contentMatched(); + this.process(fragment.slice(endIndex)); + } else { + this._matchedContent += fragment; + } + } + } } /** @@ -22,74 +165,13 @@ export async function processStreamWithIdentifiers({ end: string; }; }): Promise { - const escapedIdentifierStart = escapeRegex(identifier.start); - const escapedIdentifierEnd = escapeRegex(identifier.end); - const regex = new RegExp( - `${escapedIdentifierStart}([\\s\\S]*?)${escapedIdentifierEnd}`, - 'g' - ); - - let contentSinceLastIdentifier = ''; - for await (const fragment of inputIterable) { - contentSinceLastIdentifier += fragment; - - let lastIndex = 0; - let match: RegExpExecArray | null; - while ((match = regex.exec(contentSinceLastIdentifier)) !== null) { - const endIndex = regex.lastIndex; - - // Stream content up to the end of the identifier. - const contentToStream = contentSinceLastIdentifier.slice( - lastIndex, - endIndex - ); - processStreamFragment(contentToStream); + const fragmentMatcher = new FragmentMatcher({ + identifier, + onContentMatched: onStreamIdentifier, + }); - const identifierContent = match[1]; - onStreamIdentifier(identifierContent); - - lastIndex = endIndex; - } - - if (lastIndex > 0) { - // Remove all of the processed content. - contentSinceLastIdentifier = contentSinceLastIdentifier.slice(lastIndex); - // Reset the regex. - regex.lastIndex = 0; - } else { - // Clear as much of the content as we can safely. - const maxUnprocessedLength = identifier.start.length - 1; - if (contentSinceLastIdentifier.length > maxUnprocessedLength) { - const identifierIndex = contentSinceLastIdentifier.indexOf( - identifier.start - ); - if (identifierIndex > -1) { - // We have an identifier, so clear up until the identifier. - const contentToStream = contentSinceLastIdentifier.slice( - 0, - identifierIndex - ); - processStreamFragment(contentToStream); - contentSinceLastIdentifier = - contentSinceLastIdentifier.slice(identifierIndex); - } else { - // No identifier, so clear up until the last maxUnprocessedLength. - const processUpTo = - contentSinceLastIdentifier.length - maxUnprocessedLength; - const contentToStream = contentSinceLastIdentifier.slice( - 0, - processUpTo - ); - processStreamFragment(contentToStream); - contentSinceLastIdentifier = - contentSinceLastIdentifier.slice(processUpTo); - } - } - } - } - - // Finish up anything not streamed yet. - if (contentSinceLastIdentifier.length > 0) { - processStreamFragment(contentSinceLastIdentifier); + for await (const fragment of inputIterable) { + processStreamFragment(fragment); + fragmentMatcher.process(fragment); } }