Skip to content

Commit

Permalink
Replace regex fragment matching with streaming KMP
Browse files Browse the repository at this point in the history
  • Loading branch information
nirinchev committed Sep 26, 2024
1 parent 7eef0e7 commit bfe1f72
Showing 1 changed file with 152 additions and 70 deletions.
222 changes: 152 additions & 70 deletions src/participant/streamParsing.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,148 @@
function escapeRegex(str: string): string {
return str.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
// This is a stateful streaming implementation of the Knuth-Morris-Pratt algorithm
// for substring search. It supports being invoked with multiple fragments of the
// haystack and is capable of finding matches spanning multiple fragments.
class StreamingKMP {
public needle: string;
private _lookupVector: number[];

// In cases where we are fed a string that has a suffix that matches a prefix
// of the needle, we're storing the index in the needle which we last matched.
// Then when we get a new haystack, we start matching from that needle.
private _lastMatchingIndex = 0;

constructor(needle: string) {
this.needle = needle;
this._lookupVector = this._createLookupVector();
}

private _createLookupVector(): number[] {
const vector = new Array<number>(this.needle.length);
let j = 0;
vector[0] = 0;

for (let i = 1; i < this.needle.length; i++) {
while (j > 0 && this.needle[i] !== this.needle[j]) {
j = vector[j - 1];
}

if (this.needle[i] === this.needle[j]) {
j++;
}

vector[i] = j;
}

return vector;
}

// Returns the index in the haystackFragment **after** the needle.
// This is done because the match may have occurred over multiple fragments,
// so the index of the needle start would be negative.
public match(haystackFragment: string): number {
let j = this._lastMatchingIndex; // index in needle
let i = 0; // index in haystack

while (i < haystackFragment.length) {
if (haystackFragment[i] === this.needle[j]) {
i++;
j++;
}

if (j === this.needle.length) {
this._lastMatchingIndex = 0;
return i;
}

if (
i < haystackFragment.length &&
haystackFragment[i] !== this.needle[j]
) {
if (j !== 0) {
j = this._lookupVector[j - 1];
} else {
i++;
}
}
}

this._lastMatchingIndex = j;
return -1;
}

public reset(): void {
this._lastMatchingIndex = 0;
}
}

class FragmentMatcher {
private _startMatcher: StreamingKMP;
private _endMatcher: StreamingKMP;
private _matchedContent?: string;
private _onContentMatched: (content: string) => void;

constructor({
identifier,
onContentMatched,
}: {
identifier: {
start: string;
end: string;
};
onContentMatched: (content: string) => void;
}) {
this._startMatcher = new StreamingKMP(identifier.start);
this._endMatcher = new StreamingKMP(identifier.end);
this._onContentMatched = onContentMatched;
}

private _contentMatched(): void {
const content = this._matchedContent;
if (content !== undefined) {
// Strip the trailing end identifier from the matched content
this._onContentMatched(
content.slice(0, content.length - this._endMatcher.needle.length)
);
}

this._matchedContent = undefined;
this._startMatcher.reset();
this._endMatcher.reset();
}

public process(fragment: string): void {
if (this._matchedContent === undefined) {
// We haven't matched the start identifier yet, so try and do that
const startIndex = this._startMatcher.match(fragment);
if (startIndex !== -1) {
let endIndex = this._endMatcher.match(fragment.slice(startIndex));
if (endIndex !== -1) {
// This is the case where both the start and the end identifiers are contained in the same fragment.
// In this case, we emit the content between the two identifiers and continue processing the rest of the fragment.

// endIndex is relative to the slice, so we need to add startIndex to it.
endIndex = startIndex + endIndex;

this._matchedContent = fragment.slice(startIndex, endIndex);
this._contentMatched();
this.process(fragment.slice(endIndex));
} else {
// If we only matched the start, we add the partial fragment to the matched content and
// wait for another fragment to complete the match.
this._matchedContent = fragment.slice(startIndex);
}
}
} else {
const endIndex = this._endMatcher.match(fragment);
if (endIndex !== -1) {
// We've matched the end - emit the matched content and continue processing the partial fragment
this._matchedContent += fragment.slice(0, endIndex);
this._contentMatched();
this.process(fragment.slice(endIndex));
} else {
this._matchedContent += fragment;
}
}
}
}

/**
Expand All @@ -22,74 +165,13 @@ export async function processStreamWithIdentifiers({
end: string;
};
}): Promise<void> {
const escapedIdentifierStart = escapeRegex(identifier.start);
const escapedIdentifierEnd = escapeRegex(identifier.end);
const regex = new RegExp(
`${escapedIdentifierStart}([\\s\\S]*?)${escapedIdentifierEnd}`,
'g'
);

let contentSinceLastIdentifier = '';
for await (const fragment of inputIterable) {
contentSinceLastIdentifier += fragment;

let lastIndex = 0;
let match: RegExpExecArray | null;
while ((match = regex.exec(contentSinceLastIdentifier)) !== null) {
const endIndex = regex.lastIndex;

// Stream content up to the end of the identifier.
const contentToStream = contentSinceLastIdentifier.slice(
lastIndex,
endIndex
);
processStreamFragment(contentToStream);
const fragmentMatcher = new FragmentMatcher({
identifier,
onContentMatched: onStreamIdentifier,
});

const identifierContent = match[1];
onStreamIdentifier(identifierContent);

lastIndex = endIndex;
}

if (lastIndex > 0) {
// Remove all of the processed content.
contentSinceLastIdentifier = contentSinceLastIdentifier.slice(lastIndex);
// Reset the regex.
regex.lastIndex = 0;
} else {
// Clear as much of the content as we can safely.
const maxUnprocessedLength = identifier.start.length - 1;
if (contentSinceLastIdentifier.length > maxUnprocessedLength) {
const identifierIndex = contentSinceLastIdentifier.indexOf(
identifier.start
);
if (identifierIndex > -1) {
// We have an identifier, so clear up until the identifier.
const contentToStream = contentSinceLastIdentifier.slice(
0,
identifierIndex
);
processStreamFragment(contentToStream);
contentSinceLastIdentifier =
contentSinceLastIdentifier.slice(identifierIndex);
} else {
// No identifier, so clear up until the last maxUnprocessedLength.
const processUpTo =
contentSinceLastIdentifier.length - maxUnprocessedLength;
const contentToStream = contentSinceLastIdentifier.slice(
0,
processUpTo
);
processStreamFragment(contentToStream);
contentSinceLastIdentifier =
contentSinceLastIdentifier.slice(processUpTo);
}
}
}
}

// Finish up anything not streamed yet.
if (contentSinceLastIdentifier.length > 0) {
processStreamFragment(contentSinceLastIdentifier);
for await (const fragment of inputIterable) {
processStreamFragment(fragment);
fragmentMatcher.process(fragment);
}
}

0 comments on commit bfe1f72

Please sign in to comment.