Skip to content

Commit

Permalink
Fix file data streaming for http polling
Browse files Browse the repository at this point in the history
  • Loading branch information
msujew committed Dec 20, 2024
1 parent 8d7fa9e commit 911b81d
Showing 1 changed file with 31 additions and 0 deletions.
31 changes: 31 additions & 0 deletions packages/filesystem/src/common/remote-file-system-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ export class RemoteFileSystemProvider implements Required<FileSystemProvider>, D
private readonly onFileStreamEndEmitter = new Emitter<[number, Error | FileSystemProviderError | undefined]>();
private readonly onFileStreamEnd = this.onFileStreamEndEmitter.event;

private readonly onFileStreamStartEmitter = new Emitter<number>();
private readonly onFileStreamStart = this.onFileStreamStartEmitter.event;

protected readonly toDispose = new DisposableCollection(
this.onDidChangeFileEmitter,
this.onDidChangeCapabilitiesEmitter,
Expand Down Expand Up @@ -162,6 +165,8 @@ export class RemoteFileSystemProvider implements Required<FileSystemProvider>, D
protected readonly readyDeferred = new Deferred<void>();
readonly ready = this.readyDeferred.promise;

protected readonly streaming = new Set<number>();

/**
* Wrapped remote filesystem.
*/
Expand Down Expand Up @@ -194,6 +199,29 @@ export class RemoteFileSystemProvider implements Required<FileSystemProvider>, D
onInitialized.dispose();
this.toDispose.push(this.server.onDidOpenConnection(() => this.reconnect()));
});
// When using long polling, the handle is registered *after* the first events arrive
// Therefore, we need to buffer events until the handle is registered
// Once the handle is registered, we forward the events in the same order they arrived
this.onFileStreamData(([handle, data]) => {
if (!this.streaming.has(handle)) {
const disposable = this.onFileStreamStart(streamHandle => {
if (streamHandle === handle) {
disposable.dispose();
this.onFileStreamDataEmitter.fire([handle, data]);
}
});
}
});
this.onFileStreamEnd(([handle, error]) => {
if (!this.streaming.has(handle)) {
const disposable = this.onFileStreamStart(streamHandle => {
if (streamHandle === handle) {
disposable.dispose();
this.onFileStreamEndEmitter.fire([handle, error]);
}
});
}
});
}

dispose(): void {
Expand Down Expand Up @@ -258,6 +286,7 @@ export class RemoteFileSystemProvider implements Required<FileSystemProvider>, D
stream.end(cancelled());
return;
}
this.streaming.add(streamHandle);
const toDispose = new DisposableCollection(
token.onCancellationRequested(() => stream.end(cancelled())),
this.onFileStreamData(([handle, data]) => {
Expand All @@ -277,10 +306,12 @@ export class RemoteFileSystemProvider implements Required<FileSystemProvider>, D
} else {
stream.end();
}
this.streaming.delete(streamHandle);
}
})
);
stream.on('end', () => toDispose.dispose());
this.onFileStreamStartEmitter.fire(streamHandle);
}, error => stream.end(error));
return stream;
}
Expand Down

0 comments on commit 911b81d

Please sign in to comment.