From 08b69fffe1a53f98bd0281f62931561922ccf96c Mon Sep 17 00:00:00 2001 From: Matt Gibson Date: Tue, 12 Dec 2023 08:07:42 -0500 Subject: [PATCH] Revert "Ps/avoid state emit until updated (#7124)" (#7187) This reverts commit 38c335d8fbca60003d189ce8d14e542a9d48e093. --- libs/common/spec/fake-storage.service.ts | 2 +- libs/common/spec/utils.ts | 6 +- .../default-active-user-state.spec.ts | 351 ++---------------- .../default-active-user-state.ts | 199 ++++------ .../default-global-state.spec.ts | 208 +---------- .../implementations/default-global-state.ts | 140 +++---- .../default-single-user-state.spec.ts | 229 +----------- .../default-single-user-state.ts | 145 +++----- .../src/platform/state/key-definition.spec.ts | 31 -- .../src/platform/state/key-definition.ts | 22 +- 10 files changed, 206 insertions(+), 1127 deletions(-) diff --git a/libs/common/spec/fake-storage.service.ts b/libs/common/spec/fake-storage.service.ts index 7c9e5b32315..ba3e4613466 100644 --- a/libs/common/spec/fake-storage.service.ts +++ b/libs/common/spec/fake-storage.service.ts @@ -59,7 +59,7 @@ export class FakeStorageService implements AbstractStorageService { return Promise.resolve(this.store[key] != null); } save(key: string, obj: T, options?: StorageOptions): Promise { - this.mock.save(key, obj, options); + this.mock.save(key, options); this.store[key] = obj; this.updatesSubject.next({ key: key, updateType: "save" }); return Promise.resolve(); diff --git a/libs/common/spec/utils.ts b/libs/common/spec/utils.ts index ad5907f61d3..5053a71c874 100644 --- a/libs/common/spec/utils.ts +++ b/libs/common/spec/utils.ts @@ -69,10 +69,6 @@ export function trackEmissions(observable: Observable): T[] { case "boolean": emissions.push(value); break; - case "symbol": - // Cheating types to make symbols work at all - emissions.push(value.toString() as T); - break; default: { emissions.push(clone(value)); } @@ -89,7 +85,7 @@ function clone(value: any): any { } } -export async function awaitAsync(ms = 1) { +export async function awaitAsync(ms = 0) { if (ms < 1) { await Promise.resolve(); } else { diff --git a/libs/common/src/platform/state/implementations/default-active-user-state.spec.ts b/libs/common/src/platform/state/implementations/default-active-user-state.spec.ts index e4a3f80eecb..065f7a8e959 100644 --- a/libs/common/src/platform/state/implementations/default-active-user-state.spec.ts +++ b/libs/common/src/platform/state/implementations/default-active-user-state.spec.ts @@ -2,7 +2,7 @@ * need to update test environment so trackEmissions works appropriately * @jest-environment ../shared/test.environment.ts */ -import { any, anySymbol, mock } from "jest-mock-extended"; +import { any, mock } from "jest-mock-extended"; import { BehaviorSubject, firstValueFrom, of, timeout } from "rxjs"; import { Jsonify } from "type-fest"; @@ -11,7 +11,7 @@ import { FakeStorageService } from "../../../../spec/fake-storage.service"; import { AccountInfo, AccountService } from "../../../auth/abstractions/account.service"; import { AuthenticationStatus } from "../../../auth/enums/authentication-status"; import { UserId } from "../../../types/guid"; -import { KeyDefinition, userKeyBuilder } from "../key-definition"; +import { KeyDefinition } from "../key-definition"; import { StateDefinition } from "../state-definition"; import { DefaultActiveUserState } from "./default-active-user-state"; @@ -32,10 +32,9 @@ class TestState { } const testStateDefinition = new StateDefinition("fake", "disk"); -const cleanupDelayMs = 10; + const testKeyDefinition = new KeyDefinition(testStateDefinition, "fake", { deserializer: TestState.fromJSON, - cleanupDelayMs, }); describe("DefaultActiveUserState", () => { @@ -57,14 +56,10 @@ describe("DefaultActiveUserState", () => { ); }); - const makeUserId = (id: string) => { - return id != null ? (`00000000-0000-1000-a000-00000000000${id}` as UserId) : undefined; - }; - const changeActiveUser = async (id: string) => { - const userId = makeUserId(id); + const userId = id != null ? `00000000-0000-1000-a000-00000000000${id}` : undefined; activeAccountSubject.next({ - id: userId, + id: userId as UserId, email: `test${id}@example.com`, name: `Test User ${id}`, status: AuthenticationStatus.Unlocked, @@ -95,7 +90,7 @@ describe("DefaultActiveUserState", () => { const emissions = trackEmissions(userState.state$); // User signs in - await changeActiveUser("1"); + changeActiveUser("1"); await awaitAsync(); // Service does an update @@ -116,17 +111,17 @@ describe("DefaultActiveUserState", () => { expect(diskStorageService.mock.get).toHaveBeenNthCalledWith( 1, "user_00000000-0000-1000-a000-000000000001_fake_fake", - any(), // options + any(), ); expect(diskStorageService.mock.get).toHaveBeenNthCalledWith( 2, "user_00000000-0000-1000-a000-000000000001_fake_fake", - any(), // options + any(), ); expect(diskStorageService.mock.get).toHaveBeenNthCalledWith( 3, "user_00000000-0000-1000-a000-000000000002_fake_fake", - any(), // options + any(), ); // Should only have saved data for the first user @@ -134,8 +129,7 @@ describe("DefaultActiveUserState", () => { expect(diskStorageService.mock.save).toHaveBeenNthCalledWith( 1, "user_00000000-0000-1000-a000-000000000001_fake_fake", - updatedState, - any(), // options + any(), ); }); @@ -189,17 +183,15 @@ describe("DefaultActiveUserState", () => { }); it("should not emit a previous users value if that user is no longer active", async () => { - const user1Data: Jsonify = { - date: "2020-09-21T13:14:17.648Z", - array: ["value"], - }; - const user2Data: Jsonify = { - date: "2020-09-21T13:14:17.648Z", - array: [], - }; diskStorageService.internalUpdateStore({ - "user_00000000-0000-1000-a000-000000000001_fake_fake": user1Data, - "user_00000000-0000-1000-a000-000000000002_fake_fake": user2Data, + "user_00000000-0000-1000-a000-000000000001_fake_fake": { + date: "2020-09-21T13:14:17.648Z", + array: ["value"], + } as Jsonify, + "user_00000000-0000-1000-a000-000000000002_fake_fake": { + date: "2020-09-21T13:14:17.648Z", + array: [], + } as Jsonify, }); // This starts one subscription on the observable for tracking emissions throughout @@ -211,7 +203,7 @@ describe("DefaultActiveUserState", () => { // This should always return a value right await const value = await firstValueFrom(userState.state$); - expect(value).toEqual(user1Data); + expect(value).toBeTruthy(); // Make it such that there is no active user await changeActiveUser(undefined); @@ -230,34 +222,20 @@ describe("DefaultActiveUserState", () => { rejectedError = err; }); - expect(resolvedValue).toBeUndefined(); - expect(rejectedError).not.toBeUndefined(); + expect(resolvedValue).toBeFalsy(); + expect(rejectedError).toBeTruthy(); expect(rejectedError.message).toBe("Timeout has occurred"); // We need to figure out if something should be emitted // when there becomes no active user, if we don't want that to emit // this value is correct. - expect(emissions).toEqual([user1Data]); - }); - - it("should not emit twice if there are two listeners", async () => { - await changeActiveUser("1"); - const emissions = trackEmissions(userState.state$); - const emissions2 = trackEmissions(userState.state$); - await awaitAsync(); - - expect(emissions).toEqual([ - null, // Initial value - ]); - expect(emissions2).toEqual([ - null, // Initial value - ]); + expect(emissions).toHaveLength(2); }); describe("update", () => { const newData = { date: new Date(), array: ["test"] }; beforeEach(async () => { - await changeActiveUser("1"); + changeActiveUser("1"); }); it("should save on update", async () => { @@ -337,8 +315,6 @@ describe("DefaultActiveUserState", () => { return initialData; }); - await awaitAsync(); - await userState.update((state, dependencies) => { expect(state).toEqual(initialData); return newData; @@ -353,285 +329,4 @@ describe("DefaultActiveUserState", () => { ]); }); }); - - describe("update races", () => { - const newData = { date: new Date(), array: ["test"] }; - const userId = makeUserId("1"); - - beforeEach(async () => { - await changeActiveUser("1"); - await awaitAsync(); - }); - - test("subscriptions during an update should receive the current and latest", async () => { - const oldData = { date: new Date(2019, 1, 1), array: ["oldValue1"] }; - await userState.update(() => { - return oldData; - }); - const initialData = { date: new Date(2020, 1, 1), array: ["value1", "value2"] }; - await userState.update(() => { - return initialData; - }); - - await awaitAsync(); - - const emissions = trackEmissions(userState.state$); - await awaitAsync(); - expect(emissions).toEqual([initialData]); - - let emissions2: TestState[]; - const originalSave = diskStorageService.save.bind(diskStorageService); - diskStorageService.save = jest.fn().mockImplementation(async (key: string, obj: any) => { - emissions2 = trackEmissions(userState.state$); - await originalSave(key, obj); - }); - - const val = await userState.update(() => { - return newData; - }); - - await awaitAsync(10); - - expect(val).toEqual(newData); - expect(emissions).toEqual([initialData, newData]); - expect(emissions2).toEqual([initialData, newData]); - }); - - test("subscription during an aborted update should receive the last value", async () => { - // Seed with interesting data - const initialData = { date: new Date(2020, 1, 1), array: ["value1", "value2"] }; - await userState.update(() => { - return initialData; - }); - - await awaitAsync(); - - const emissions = trackEmissions(userState.state$); - await awaitAsync(); - expect(emissions).toEqual([initialData]); - - let emissions2: TestState[]; - const val = await userState.update( - (state) => { - return newData; - }, - { - shouldUpdate: () => { - emissions2 = trackEmissions(userState.state$); - return false; - }, - }, - ); - - await awaitAsync(); - - expect(val).toEqual(initialData); - expect(emissions).toEqual([initialData]); - - expect(emissions2).toEqual([initialData]); - }); - - test("updates should wait until previous update is complete", async () => { - trackEmissions(userState.state$); - await awaitAsync(); // storage updates are behind a promise - - const originalSave = diskStorageService.save.bind(diskStorageService); - diskStorageService.save = jest - .fn() - .mockImplementationOnce(async (key: string, obj: any) => { - let resolved = false; - await Promise.race([ - userState.update(() => { - // deadlocks - resolved = true; - return newData; - }), - awaitAsync(100), // limit test to 100ms - ]); - expect(resolved).toBe(false); - }) - .mockImplementation((...args) => { - return originalSave(...args); - }); - - await userState.update(() => { - return newData; - }); - }); - - test("updates with FAKE_DEFAULT initial value should resolve correctly", async () => { - expect(userState["stateSubject"].value).toEqual(anySymbol()); // FAKE_DEFAULT - const val = await userState.update((state) => { - return newData; - }); - - expect(val).toEqual(newData); - const call = diskStorageService.mock.save.mock.calls[0]; - expect(call[0]).toEqual(`user_${userId}_fake_fake`); - expect(call[1]).toEqual(newData); - }); - - it("does not await updates if the active user changes", async () => { - const initialUserId = (await firstValueFrom(accountService.activeAccount$)).id; - expect(initialUserId).toBe(userId); - trackEmissions(userState.state$); - await awaitAsync(); // storage updates are behind a promise - - const originalSave = diskStorageService.save.bind(diskStorageService); - diskStorageService.save = jest - .fn() - .mockImplementationOnce(async (key: string, obj: any) => { - let resolved = false; - await changeActiveUser("2"); - await Promise.race([ - userState.update(() => { - // should not deadlock because we updated the user - resolved = true; - return newData; - }), - awaitAsync(100), // limit test to 100ms - ]); - expect(resolved).toBe(true); - }) - .mockImplementation((...args) => { - return originalSave(...args); - }); - - await userState.update(() => { - return newData; - }); - }); - - it("stores updates for users in the correct place when active user changes mid-update", async () => { - trackEmissions(userState.state$); - await awaitAsync(); // storage updates are behind a promise - - const user2Data = { date: new Date(), array: ["user 2 data"] }; - - const originalSave = diskStorageService.save.bind(diskStorageService); - diskStorageService.save = jest - .fn() - .mockImplementationOnce(async (key: string, obj: any) => { - let resolved = false; - await changeActiveUser("2"); - await Promise.race([ - userState.update(() => { - // should not deadlock because we updated the user - resolved = true; - return user2Data; - }), - awaitAsync(100), // limit test to 100ms - ]); - expect(resolved).toBe(true); - await originalSave(key, obj); - }) - .mockImplementation((...args) => { - return originalSave(...args); - }); - - await userState.update(() => { - return newData; - }); - await awaitAsync(); - - expect(diskStorageService.mock.save).toHaveBeenCalledTimes(2); - const innerCall = diskStorageService.mock.save.mock.calls[0]; - expect(innerCall[0]).toEqual(`user_${makeUserId("2")}_fake_fake`); - expect(innerCall[1]).toEqual(user2Data); - const outerCall = diskStorageService.mock.save.mock.calls[1]; - expect(outerCall[0]).toEqual(`user_${makeUserId("1")}_fake_fake`); - expect(outerCall[1]).toEqual(newData); - }); - }); - - describe("cleanup", () => { - const newData = { date: new Date(), array: ["test"] }; - const userId = makeUserId("1"); - let userKey: string; - - beforeEach(async () => { - await changeActiveUser("1"); - userKey = userKeyBuilder(userId, testKeyDefinition); - }); - - async function assertClean() { - const emissions = trackEmissions(userState["stateSubject"]); - const initial = structuredClone(emissions); - - diskStorageService.save(userKey, newData); - await awaitAsync(); // storage updates are behind a promise - - expect(emissions).toEqual(initial); // no longer listening to storage updates - } - - it("should cleanup after last subscriber", async () => { - const subscription = userState.state$.subscribe(); - await awaitAsync(); // storage updates are behind a promise - - subscription.unsubscribe(); - expect(userState["subscriberCount"].getValue()).toBe(0); - // Wait for cleanup - await awaitAsync(cleanupDelayMs * 2); - - await assertClean(); - }); - - it("should not cleanup if there are still subscribers", async () => { - const subscription1 = userState.state$.subscribe(); - const sub2Emissions: TestState[] = []; - const subscription2 = userState.state$.subscribe((v) => sub2Emissions.push(v)); - await awaitAsync(); // storage updates are behind a promise - - subscription1.unsubscribe(); - - // Wait for cleanup - await awaitAsync(cleanupDelayMs * 2); - - expect(userState["subscriberCount"].getValue()).toBe(1); - - // Still be listening to storage updates - diskStorageService.save(userKey, newData); - await awaitAsync(); // storage updates are behind a promise - expect(sub2Emissions).toEqual([null, newData]); - - subscription2.unsubscribe(); - // Wait for cleanup - await awaitAsync(cleanupDelayMs * 2); - - await assertClean(); - }); - - it("can re-initialize after cleanup", async () => { - const subscription = userState.state$.subscribe(); - await awaitAsync(); - - subscription.unsubscribe(); - // Wait for cleanup - await awaitAsync(cleanupDelayMs * 2); - - const emissions = trackEmissions(userState.state$); - await awaitAsync(); - - diskStorageService.save(userKey, newData); - await awaitAsync(); - - expect(emissions).toEqual([null, newData]); - }); - - it("should not cleanup if a subscriber joins during the cleanup delay", async () => { - const subscription = userState.state$.subscribe(); - await awaitAsync(); - - await diskStorageService.save(userKey, newData); - await awaitAsync(); - - subscription.unsubscribe(); - expect(userState["subscriberCount"].getValue()).toBe(0); - // Do not wait long enough for cleanup - await awaitAsync(cleanupDelayMs / 2); - - expect(userState["stateSubject"].value).toEqual(newData); // digging in to check that it hasn't been cleared - expect(userState["storageUpdateSubscription"]).not.toBeNull(); // still listening to storage updates - }); - }); }); diff --git a/libs/common/src/platform/state/implementations/default-active-user-state.ts b/libs/common/src/platform/state/implementations/default-active-user-state.ts index ae5c25dcb40..3d36af1d61c 100644 --- a/libs/common/src/platform/state/implementations/default-active-user-state.ts +++ b/libs/common/src/platform/state/implementations/default-active-user-state.ts @@ -4,12 +4,12 @@ import { map, shareReplay, switchMap, + tap, + defer, firstValueFrom, combineLatestWith, filter, timeout, - Subscription, - tap, } from "rxjs"; import { AccountService } from "../../../auth/abstractions/account.service"; @@ -31,21 +31,13 @@ const FAKE_DEFAULT = Symbol("fakeDefault"); export class DefaultActiveUserState implements ActiveUserState { [activeMarker]: true; private formattedKey$: Observable; - private updatePromise: Promise | null = null; - private storageUpdateSubscription: Subscription; - private activeAccountUpdateSubscription: Subscription; - private subscriberCount = new BehaviorSubject(0); - private stateObservable: Observable; protected stateSubject: BehaviorSubject = new BehaviorSubject< T | typeof FAKE_DEFAULT >(FAKE_DEFAULT); private stateSubject$ = this.stateSubject.asObservable(); - get state$() { - this.stateObservable = this.stateObservable ?? this.initializeObservable(); - return this.stateObservable; - } + state$: Observable; constructor( protected keyDefinition: KeyDefinition, @@ -59,12 +51,62 @@ export class DefaultActiveUserState implements ActiveUserState { ? userKeyBuilder(account.id, this.keyDefinition) : null, ), - tap(() => { - // We have a new key, so we should forget about previous update promises - this.updatePromise = null; - }), shareReplay({ bufferSize: 1, refCount: false }), ); + + const activeAccountData$ = this.formattedKey$.pipe( + switchMap(async (key) => { + if (key == null) { + return FAKE_DEFAULT; + } + return await getStoredValue( + key, + this.chosenStorageLocation, + this.keyDefinition.deserializer, + ); + }), + // Share the execution + shareReplay({ refCount: false, bufferSize: 1 }), + ); + + const storageUpdates$ = this.chosenStorageLocation.updates$.pipe( + combineLatestWith(this.formattedKey$), + filter(([update, key]) => key !== null && update.key === key), + switchMap(async ([update, key]) => { + if (update.updateType === "remove") { + return null; + } + const data = await getStoredValue( + key, + this.chosenStorageLocation, + this.keyDefinition.deserializer, + ); + return data; + }), + ); + + // Whomever subscribes to this data, should be notified of updated data + // if someone calls my update() method, or the active user changes. + this.state$ = defer(() => { + const accountChangeSubscription = activeAccountData$.subscribe((data) => { + this.stateSubject.next(data); + }); + const storageUpdateSubscription = storageUpdates$.subscribe((data) => { + this.stateSubject.next(data); + }); + + return this.stateSubject$.pipe( + tap({ + complete: () => { + accountChangeSubscription.unsubscribe(); + storageUpdateSubscription.unsubscribe(); + }, + }), + ); + }) + // I fake the generic here because I am filtering out the other union type + // and this makes it so that typescript understands the true type + .pipe(filter((value) => value != FAKE_DEFAULT)); } async update( @@ -72,34 +114,8 @@ export class DefaultActiveUserState implements ActiveUserState { options: StateUpdateOptions = {}, ): Promise { options = populateOptionsWithDefault(options); - try { - if (this.updatePromise != null) { - await this.updatePromise; - } - this.updatePromise = this.internalUpdate(configureState, options); - const newState = await this.updatePromise; - return newState; - } finally { - this.updatePromise = null; - } - } - - // TODO: this should be removed - async getFromState(): Promise { - const key = await this.createKey(); - return await getStoredValue(key, this.chosenStorageLocation, this.keyDefinition.deserializer); - } - - createDerived(converter: Converter): DerivedUserState { - return new DefaultDerivedUserState(converter, this.encryptService, this); - } - - private async internalUpdate( - configureState: (state: T, dependency: TCombine) => T, - options: StateUpdateOptions, - ) { const key = await this.createKey(); - const currentState = await this.getStateForUpdate(key); + const currentState = await this.getGuaranteedState(key); const combinedDependencies = options.combineLatestWith != null ? await firstValueFrom(options.combineLatestWith.pipe(timeout(options.msTimeout))) @@ -114,53 +130,13 @@ export class DefaultActiveUserState implements ActiveUserState { return newState; } - private initializeObservable() { - this.storageUpdateSubscription = this.chosenStorageLocation.updates$ - .pipe( - combineLatestWith(this.formattedKey$), - filter(([update, key]) => key !== null && update.key === key), - switchMap(async ([update, key]) => { - if (update.updateType === "remove") { - return null; - } - return await this.getState(key); - }), - ) - .subscribe((v) => this.stateSubject.next(v)); - - this.activeAccountUpdateSubscription = this.formattedKey$ - .pipe( - switchMap(async (key) => { - if (key == null) { - return FAKE_DEFAULT; - } - return await this.getState(key); - }), - ) - .subscribe((v) => this.stateSubject.next(v)); - - this.subscriberCount.subscribe((count) => { - if (count === 0 && this.stateObservable != null) { - this.triggerCleanup(); - } - }); - - return new Observable((subscriber) => { - this.incrementSubscribers(); - - const prevUnsubscribe = subscriber.unsubscribe.bind(subscriber); - subscriber.unsubscribe = () => { - this.decrementSubscribers(); - prevUnsubscribe(); - }; - - return this.stateSubject - .pipe( - // Filter out fake default, which is used to indicate that state is not ready to be emitted yet. - filter((i) => i !== FAKE_DEFAULT), - ) - .subscribe(subscriber); - }); + async getFromState(): Promise { + const key = await this.createKey(); + return await getStoredValue(key, this.chosenStorageLocation, this.keyDefinition.deserializer); + } + + createDerived(converter: Converter): DerivedUserState { + return new DefaultDerivedUserState(converter, this.encryptService, this); } protected async createKey(): Promise { @@ -171,47 +147,22 @@ export class DefaultActiveUserState implements ActiveUserState { return formattedKey; } - /** For use in update methods, does not wait for update to complete before yielding state. - * The expectation is that that await is already done - */ - protected async getStateForUpdate(key: string) { + protected async getGuaranteedState(key: string) { const currentValue = this.stateSubject.getValue(); - return currentValue === FAKE_DEFAULT - ? await getStoredValue(key, this.chosenStorageLocation, this.keyDefinition.deserializer) - : currentValue; + return currentValue === FAKE_DEFAULT ? await this.seedInitial(key) : currentValue; } - /** To be used in observables. Awaits updates to ensure they are complete */ - private async getState(key: string): Promise { - if (this.updatePromise != null) { - await this.updatePromise; - } - return await getStoredValue(key, this.chosenStorageLocation, this.keyDefinition.deserializer); + private async seedInitial(key: string): Promise { + const value = await getStoredValue( + key, + this.chosenStorageLocation, + this.keyDefinition.deserializer, + ); + this.stateSubject.next(value); + return value; } protected saveToStorage(key: string, data: T): Promise { return this.chosenStorageLocation.save(key, data); } - - private incrementSubscribers() { - this.subscriberCount.next(this.subscriberCount.value + 1); - } - - private decrementSubscribers() { - this.subscriberCount.next(this.subscriberCount.value - 1); - } - - private triggerCleanup() { - setTimeout(() => { - if (this.subscriberCount.value === 0) { - this.updatePromise = null; - this.storageUpdateSubscription?.unsubscribe(); - this.activeAccountUpdateSubscription?.unsubscribe(); - this.stateObservable = null; - this.subscriberCount.complete(); - this.subscriberCount = new BehaviorSubject(0); - this.stateSubject.next(FAKE_DEFAULT); - } - }, this.keyDefinition.cleanupDelayMs); - } } diff --git a/libs/common/src/platform/state/implementations/default-global-state.spec.ts b/libs/common/src/platform/state/implementations/default-global-state.spec.ts index f9f95c652d8..ae6cd1adbfd 100644 --- a/libs/common/src/platform/state/implementations/default-global-state.spec.ts +++ b/libs/common/src/platform/state/implementations/default-global-state.spec.ts @@ -3,7 +3,6 @@ * @jest-environment ../shared/test.environment.ts */ -import { anySymbol } from "jest-mock-extended"; import { firstValueFrom, of } from "rxjs"; import { Jsonify } from "type-fest"; @@ -29,10 +28,9 @@ class TestState { } const testStateDefinition = new StateDefinition("fake", "disk"); -const cleanupDelayMs = 10; + const testKeyDefinition = new KeyDefinition(testStateDefinition, "fake", { deserializer: TestState.fromJSON, - cleanupDelayMs, }); const globalKey = globalKeyBuilder(testKeyDefinition); @@ -81,19 +79,6 @@ describe("DefaultGlobalState", () => { expect(diskStorageService.mock.get).toHaveBeenCalledWith("global_fake_fake", undefined); expect(state).toBeTruthy(); }); - - it("should not emit twice if there are two listeners", async () => { - const emissions = trackEmissions(globalState.state$); - const emissions2 = trackEmissions(globalState.state$); - await awaitAsync(); - - expect(emissions).toEqual([ - null, // Initial value - ]); - expect(emissions2).toEqual([ - null, // Initial value - ]); - }); }); describe("update", () => { @@ -148,7 +133,6 @@ describe("DefaultGlobalState", () => { it("should not update if shouldUpdate returns false", async () => { const emissions = trackEmissions(globalState.state$); - await awaitAsync(); // storage updates are behind a promise const result = await globalState.update( (state) => { @@ -214,194 +198,4 @@ describe("DefaultGlobalState", () => { expect(emissions).toEqual(expect.arrayContaining([initialState, newState])); }); }); - - describe("update races", () => { - test("subscriptions during an update should receive the current and latest data", async () => { - const oldData = { date: new Date(2019, 1, 1) }; - await globalState.update(() => { - return oldData; - }); - const initialData = { date: new Date(2020, 1, 1) }; - await globalState.update(() => { - return initialData; - }); - - await awaitAsync(); - - const emissions = trackEmissions(globalState.state$); - await awaitAsync(); - expect(emissions).toEqual([initialData]); - - let emissions2: TestState[]; - const originalSave = diskStorageService.save.bind(diskStorageService); - diskStorageService.save = jest.fn().mockImplementation(async (key: string, obj: any) => { - emissions2 = trackEmissions(globalState.state$); - await originalSave(key, obj); - }); - - const val = await globalState.update(() => { - return newData; - }); - - await awaitAsync(10); - - expect(val).toEqual(newData); - expect(emissions).toEqual([initialData, newData]); - expect(emissions2).toEqual([initialData, newData]); - }); - - test("subscription during an aborted update should receive the last value", async () => { - // Seed with interesting data - const initialData = { date: new Date(2020, 1, 1) }; - await globalState.update(() => { - return initialData; - }); - - await awaitAsync(); - - const emissions = trackEmissions(globalState.state$); - await awaitAsync(); - expect(emissions).toEqual([initialData]); - - let emissions2: TestState[]; - const val = await globalState.update( - () => { - return newData; - }, - { - shouldUpdate: () => { - emissions2 = trackEmissions(globalState.state$); - return false; - }, - }, - ); - - await awaitAsync(); - - expect(val).toEqual(initialData); - expect(emissions).toEqual([initialData]); - - expect(emissions2).toEqual([initialData]); - }); - - test("updates should wait until previous update is complete", async () => { - trackEmissions(globalState.state$); - await awaitAsync(); // storage updates are behind a promise - - const originalSave = diskStorageService.save.bind(diskStorageService); - diskStorageService.save = jest - .fn() - .mockImplementationOnce(async () => { - let resolved = false; - await Promise.race([ - globalState.update(() => { - // deadlocks - resolved = true; - return newData; - }), - awaitAsync(100), // limit test to 100ms - ]); - expect(resolved).toBe(false); - }) - .mockImplementation(originalSave); - - await globalState.update((state) => { - return newData; - }); - }); - - test("updates with FAKE_DEFAULT initial value should resolve correctly", async () => { - expect(globalState["stateSubject"].value).toEqual(anySymbol()); // FAKE_DEFAULT - const val = await globalState.update((state) => { - return newData; - }); - - expect(val).toEqual(newData); - const call = diskStorageService.mock.save.mock.calls[0]; - expect(call[0]).toEqual("global_fake_fake"); - expect(call[1]).toEqual(newData); - }); - }); - - describe("cleanup", () => { - async function assertClean() { - const emissions = trackEmissions(globalState["stateSubject"]); - const initial = structuredClone(emissions); - - diskStorageService.save(globalKey, newData); - await awaitAsync(); // storage updates are behind a promise - - expect(emissions).toEqual(initial); // no longer listening to storage updates - } - - it("should cleanup after last subscriber", async () => { - const subscription = globalState.state$.subscribe(); - await awaitAsync(); // storage updates are behind a promise - - subscription.unsubscribe(); - expect(globalState["subscriberCount"].getValue()).toBe(0); - // Wait for cleanup - await awaitAsync(cleanupDelayMs * 2); - - await assertClean(); - }); - - it("should not cleanup if there are still subscribers", async () => { - const subscription1 = globalState.state$.subscribe(); - const sub2Emissions: TestState[] = []; - const subscription2 = globalState.state$.subscribe((v) => sub2Emissions.push(v)); - await awaitAsync(); // storage updates are behind a promise - - subscription1.unsubscribe(); - - // Wait for cleanup - await awaitAsync(cleanupDelayMs * 2); - - expect(globalState["subscriberCount"].getValue()).toBe(1); - - // Still be listening to storage updates - diskStorageService.save(globalKey, newData); - await awaitAsync(); // storage updates are behind a promise - expect(sub2Emissions).toEqual([null, newData]); - - subscription2.unsubscribe(); - // Wait for cleanup - await awaitAsync(cleanupDelayMs * 2); - - await assertClean(); - }); - - it("can re-initialize after cleanup", async () => { - const subscription = globalState.state$.subscribe(); - await awaitAsync(); - - subscription.unsubscribe(); - // Wait for cleanup - await awaitAsync(cleanupDelayMs * 2); - - const emissions = trackEmissions(globalState.state$); - await awaitAsync(); - - diskStorageService.save(globalKey, newData); - await awaitAsync(); - - expect(emissions).toEqual([null, newData]); - }); - - it("should not cleanup if a subscriber joins during the cleanup delay", async () => { - const subscription = globalState.state$.subscribe(); - await awaitAsync(); - - await diskStorageService.save(globalKey, newData); - await awaitAsync(); - - subscription.unsubscribe(); - expect(globalState["subscriberCount"].getValue()).toBe(0); - // Do not wait long enough for cleanup - await awaitAsync(cleanupDelayMs / 2); - - expect(globalState["stateSubject"].value).toEqual(newData); // digging in to check that it hasn't been cleared - expect(globalState["storageUpdateSubscription"]).not.toBeNull(); // still listening to storage updates - }); - }); }); diff --git a/libs/common/src/platform/state/implementations/default-global-state.ts b/libs/common/src/platform/state/implementations/default-global-state.ts index 39430799a8f..8e08717f721 100644 --- a/libs/common/src/platform/state/implementations/default-global-state.ts +++ b/libs/common/src/platform/state/implementations/default-global-state.ts @@ -1,10 +1,12 @@ import { BehaviorSubject, Observable, - Subscription, + defer, filter, firstValueFrom, + shareReplay, switchMap, + tap, timeout, } from "rxjs"; @@ -21,25 +23,54 @@ const FAKE_DEFAULT = Symbol("fakeDefault"); export class DefaultGlobalState implements GlobalState { private storageKey: string; - private updatePromise: Promise | null = null; - private storageUpdateSubscription: Subscription; - private subscriberCount = new BehaviorSubject(0); - private stateObservable: Observable; protected stateSubject: BehaviorSubject = new BehaviorSubject< T | typeof FAKE_DEFAULT >(FAKE_DEFAULT); - get state$() { - this.stateObservable = this.stateObservable ?? this.initializeObservable(); - return this.stateObservable; - } + state$: Observable; constructor( private keyDefinition: KeyDefinition, private chosenLocation: AbstractStorageService & ObservableStorageService, ) { this.storageKey = globalKeyBuilder(this.keyDefinition); + + const storageUpdates$ = this.chosenLocation.updates$.pipe( + filter((update) => update.key === this.storageKey), + switchMap(async (update) => { + if (update.updateType === "remove") { + return null; + } + return await getStoredValue( + this.storageKey, + this.chosenLocation, + this.keyDefinition.deserializer, + ); + }), + shareReplay({ bufferSize: 1, refCount: false }), + ); + + this.state$ = defer(() => { + const storageUpdateSubscription = storageUpdates$.subscribe((value) => { + this.stateSubject.next(value); + }); + + this.getFromState().then((s) => { + this.stateSubject.next(s); + }); + + return this.stateSubject.pipe( + tap({ + complete: () => { + storageUpdateSubscription.unsubscribe(); + }, + }), + ); + }).pipe( + shareReplay({ refCount: false, bufferSize: 1 }), + filter((i) => i != FAKE_DEFAULT), + ); } async update( @@ -47,24 +78,7 @@ export class DefaultGlobalState implements GlobalState { options: StateUpdateOptions = {}, ): Promise { options = populateOptionsWithDefault(options); - if (this.updatePromise != null) { - await this.updatePromise; - } - - try { - this.updatePromise = this.internalUpdate(configureState, options); - const newState = await this.updatePromise; - return newState; - } finally { - this.updatePromise = null; - } - } - - private async internalUpdate( - configureState: (state: T, dependency: TCombine) => T, - options: StateUpdateOptions, - ): Promise { - const currentState = await this.getStateForUpdate(); + const currentState = await this.getGuaranteedState(); const combinedDependencies = options.combineLatestWith != null ? await firstValueFrom(options.combineLatestWith.pipe(timeout(options.msTimeout))) @@ -79,86 +93,16 @@ export class DefaultGlobalState implements GlobalState { return newState; } - private initializeObservable() { - this.storageUpdateSubscription = this.chosenLocation.updates$ - .pipe( - filter((update) => update.key === this.storageKey), - switchMap(async (update) => { - if (update.updateType === "remove") { - return null; - } - return await this.getFromState(); - }), - ) - .subscribe((v) => this.stateSubject.next(v)); - - this.subscriberCount.subscribe((count) => { - if (count === 0 && this.stateObservable != null) { - this.triggerCleanup(); - } - }); - - // Intentionally un-awaited promise, we don't want to delay return of observable, but we do want to - // trigger populating it immediately. - this.getFromState().then((s) => { - this.stateSubject.next(s); - }); - - return new Observable((subscriber) => { - this.incrementSubscribers(); - - const prevUnsubscribe = subscriber.unsubscribe.bind(subscriber); - subscriber.unsubscribe = () => { - this.decrementSubscribers(); - prevUnsubscribe(); - }; - - return this.stateSubject - .pipe( - // Filter out fake default, which is used to indicate that state is not ready to be emitted yet. - filter((i) => i != FAKE_DEFAULT), - ) - .subscribe(subscriber); - }); - } - - /** For use in update methods, does not wait for update to complete before yielding state. - * The expectation is that that await is already done - */ - private async getStateForUpdate() { + private async getGuaranteedState() { const currentValue = this.stateSubject.getValue(); return currentValue === FAKE_DEFAULT ? await this.getFromState() : currentValue; } async getFromState(): Promise { - if (this.updatePromise != null) { - return await this.updatePromise; - } return await getStoredValue( this.storageKey, this.chosenLocation, this.keyDefinition.deserializer, ); } - - private incrementSubscribers() { - this.subscriberCount.next(this.subscriberCount.value + 1); - } - - private decrementSubscribers() { - this.subscriberCount.next(this.subscriberCount.value - 1); - } - - private triggerCleanup() { - setTimeout(() => { - if (this.subscriberCount.value === 0) { - this.updatePromise = null; - this.storageUpdateSubscription.unsubscribe(); - this.stateObservable = null; - this.subscriberCount.complete(); - this.subscriberCount = new BehaviorSubject(0); - this.stateSubject.next(FAKE_DEFAULT); - } - }, this.keyDefinition.cleanupDelayMs); - } } diff --git a/libs/common/src/platform/state/implementations/default-single-user-state.spec.ts b/libs/common/src/platform/state/implementations/default-single-user-state.spec.ts index 1c24c5f48c7..a25ee863e6b 100644 --- a/libs/common/src/platform/state/implementations/default-single-user-state.spec.ts +++ b/libs/common/src/platform/state/implementations/default-single-user-state.spec.ts @@ -3,7 +3,6 @@ * @jest-environment ../shared/test.environment.ts */ -import { anySymbol } from "jest-mock-extended"; import { firstValueFrom, of } from "rxjs"; import { Jsonify } from "type-fest"; @@ -31,22 +30,21 @@ class TestState { } const testStateDefinition = new StateDefinition("fake", "disk"); -const cleanupDelayMs = 10; + const testKeyDefinition = new KeyDefinition(testStateDefinition, "fake", { deserializer: TestState.fromJSON, - cleanupDelayMs, }); const userId = Utils.newGuid() as UserId; const userKey = userKeyBuilder(userId, testKeyDefinition); describe("DefaultSingleUserState", () => { let diskStorageService: FakeStorageService; - let userState: DefaultSingleUserState; + let globalState: DefaultSingleUserState; const newData = { date: new Date() }; beforeEach(() => { diskStorageService = new FakeStorageService(); - userState = new DefaultSingleUserState( + globalState = new DefaultSingleUserState( userId, testKeyDefinition, null, // Not testing anything with encrypt service @@ -60,7 +58,7 @@ describe("DefaultSingleUserState", () => { describe("state$", () => { it("should emit when storage updates", async () => { - const emissions = trackEmissions(userState.state$); + const emissions = trackEmissions(globalState.state$); await diskStorageService.save(userKey, newData); await awaitAsync(); @@ -71,7 +69,7 @@ describe("DefaultSingleUserState", () => { }); it("should not emit when update key does not match", async () => { - const emissions = trackEmissions(userState.state$); + const emissions = trackEmissions(globalState.state$); await diskStorageService.save("wrong_key", newData); expect(emissions).toHaveLength(0); @@ -84,7 +82,7 @@ describe("DefaultSingleUserState", () => { }); diskStorageService.internalUpdateStore(initialStorage); - const state = await firstValueFrom(userState.state$); + const state = await firstValueFrom(globalState.state$); expect(diskStorageService.mock.get).toHaveBeenCalledTimes(1); expect(diskStorageService.mock.get).toHaveBeenCalledWith( `user_${userId}_fake_fake`, @@ -96,7 +94,7 @@ describe("DefaultSingleUserState", () => { describe("update", () => { it("should save on update", async () => { - const result = await userState.update((state) => { + const result = await globalState.update((state) => { return newData; }); @@ -105,10 +103,10 @@ describe("DefaultSingleUserState", () => { }); it("should emit once per update", async () => { - const emissions = trackEmissions(userState.state$); + const emissions = trackEmissions(globalState.state$); await awaitAsync(); // storage updates are behind a promise - await userState.update((state) => { + await globalState.update((state) => { return newData; }); @@ -121,12 +119,12 @@ describe("DefaultSingleUserState", () => { }); it("should provided combined dependencies", async () => { - const emissions = trackEmissions(userState.state$); + const emissions = trackEmissions(globalState.state$); await awaitAsync(); // storage updates are behind a promise const combinedDependencies = { date: new Date() }; - await userState.update( + await globalState.update( (state, dependencies) => { expect(dependencies).toEqual(combinedDependencies); return newData; @@ -145,10 +143,9 @@ describe("DefaultSingleUserState", () => { }); it("should not update if shouldUpdate returns false", async () => { - const emissions = trackEmissions(userState.state$); - await awaitAsync(); // storage updates are behind a promise + const emissions = trackEmissions(globalState.state$); - const result = await userState.update( + const result = await globalState.update( (state) => { return newData; }, @@ -163,18 +160,18 @@ describe("DefaultSingleUserState", () => { }); it("should provide the update callback with the current State", async () => { - const emissions = trackEmissions(userState.state$); + const emissions = trackEmissions(globalState.state$); await awaitAsync(); // storage updates are behind a promise // Seed with interesting data const initialData = { date: new Date(2020, 1, 1) }; - await userState.update((state, dependencies) => { + await globalState.update((state, dependencies) => { return initialData; }); await awaitAsync(); - await userState.update((state) => { + await globalState.update((state) => { expect(state).toEqual(initialData); return newData; }); @@ -196,14 +193,14 @@ describe("DefaultSingleUserState", () => { initialStorage[userKey] = initialState; diskStorageService.internalUpdateStore(initialStorage); - const emissions = trackEmissions(userState.state$); + const emissions = trackEmissions(globalState.state$); await awaitAsync(); // storage updates are behind a promise const newState = { ...initialState, date: new Date(initialState.date.getFullYear(), initialState.date.getMonth() + 1), }; - const actual = await userState.update((existingState) => newState); + const actual = await globalState.update((existingState) => newState); await awaitAsync(); @@ -212,194 +209,4 @@ describe("DefaultSingleUserState", () => { expect(emissions).toEqual(expect.arrayContaining([initialState, newState])); }); }); - - describe("update races", () => { - test("subscriptions during an update should receive the current and latest data", async () => { - const oldData = { date: new Date(2019, 1, 1) }; - await userState.update(() => { - return oldData; - }); - const initialData = { date: new Date(2020, 1, 1) }; - await userState.update(() => { - return initialData; - }); - - await awaitAsync(); - - const emissions = trackEmissions(userState.state$); - await awaitAsync(); - expect(emissions).toEqual([initialData]); - - let emissions2: TestState[]; - const originalSave = diskStorageService.save.bind(diskStorageService); - diskStorageService.save = jest.fn().mockImplementation(async (key: string, obj: any) => { - emissions2 = trackEmissions(userState.state$); - await originalSave(key, obj); - }); - - const val = await userState.update(() => { - return newData; - }); - - await awaitAsync(10); - - expect(val).toEqual(newData); - expect(emissions).toEqual([initialData, newData]); - expect(emissions2).toEqual([initialData, newData]); - }); - - test("subscription during an aborted update should receive the last value", async () => { - // Seed with interesting data - const initialData = { date: new Date(2020, 1, 1) }; - await userState.update(() => { - return initialData; - }); - - await awaitAsync(); - - const emissions = trackEmissions(userState.state$); - await awaitAsync(); - expect(emissions).toEqual([initialData]); - - let emissions2: TestState[]; - const val = await userState.update( - (state) => { - return newData; - }, - { - shouldUpdate: () => { - emissions2 = trackEmissions(userState.state$); - return false; - }, - }, - ); - - await awaitAsync(); - - expect(val).toEqual(initialData); - expect(emissions).toEqual([initialData]); - - expect(emissions2).toEqual([initialData]); - }); - - test("updates should wait until previous update is complete", async () => { - trackEmissions(userState.state$); - await awaitAsync(); // storage updates are behind a promise - - const originalSave = diskStorageService.save.bind(diskStorageService); - diskStorageService.save = jest - .fn() - .mockImplementationOnce(async () => { - let resolved = false; - await Promise.race([ - userState.update(() => { - // deadlocks - resolved = true; - return newData; - }), - awaitAsync(100), // limit test to 100ms - ]); - expect(resolved).toBe(false); - }) - .mockImplementation(originalSave); - - await userState.update((state) => { - return newData; - }); - }); - - test("updates with FAKE_DEFAULT initial value should resolve correctly", async () => { - expect(userState["stateSubject"].value).toEqual(anySymbol()); // FAKE_DEFAULT - const val = await userState.update((state) => { - return newData; - }); - - expect(val).toEqual(newData); - const call = diskStorageService.mock.save.mock.calls[0]; - expect(call[0]).toEqual(`user_${userId}_fake_fake`); - expect(call[1]).toEqual(newData); - }); - }); - - describe("cleanup", () => { - async function assertClean() { - const emissions = trackEmissions(userState["stateSubject"]); - const initial = structuredClone(emissions); - - diskStorageService.save(userKey, newData); - await awaitAsync(); // storage updates are behind a promise - - expect(emissions).toEqual(initial); // no longer listening to storage updates - } - - it("should cleanup after last subscriber", async () => { - const subscription = userState.state$.subscribe(); - await awaitAsync(); // storage updates are behind a promise - - subscription.unsubscribe(); - expect(userState["subscriberCount"].getValue()).toBe(0); - // Wait for cleanup - await awaitAsync(cleanupDelayMs * 2); - - await assertClean(); - }); - - it("should not cleanup if there are still subscribers", async () => { - const subscription1 = userState.state$.subscribe(); - const sub2Emissions: TestState[] = []; - const subscription2 = userState.state$.subscribe((v) => sub2Emissions.push(v)); - await awaitAsync(); // storage updates are behind a promise - - subscription1.unsubscribe(); - - // Wait for cleanup - await awaitAsync(cleanupDelayMs * 2); - - expect(userState["subscriberCount"].getValue()).toBe(1); - - // Still be listening to storage updates - diskStorageService.save(userKey, newData); - await awaitAsync(); // storage updates are behind a promise - expect(sub2Emissions).toEqual([null, newData]); - - subscription2.unsubscribe(); - // Wait for cleanup - await awaitAsync(cleanupDelayMs * 2); - - await assertClean(); - }); - - it("can re-initialize after cleanup", async () => { - const subscription = userState.state$.subscribe(); - await awaitAsync(); - - subscription.unsubscribe(); - // Wait for cleanup - await awaitAsync(cleanupDelayMs * 2); - - const emissions = trackEmissions(userState.state$); - await awaitAsync(); - - diskStorageService.save(userKey, newData); - await awaitAsync(); - - expect(emissions).toEqual([null, newData]); - }); - - it("should not cleanup if a subscriber joins during the cleanup delay", async () => { - const subscription = userState.state$.subscribe(); - await awaitAsync(); - - await diskStorageService.save(userKey, newData); - await awaitAsync(); - - subscription.unsubscribe(); - expect(userState["subscriberCount"].getValue()).toBe(0); - // Do not wait long enough for cleanup - await awaitAsync(cleanupDelayMs / 2); - - expect(userState["stateSubject"].value).toEqual(newData); // digging in to check that it hasn't been cleared - expect(userState["storageUpdateSubscription"]).not.toBeNull(); // still listening to storage updates - }); - }); }); diff --git a/libs/common/src/platform/state/implementations/default-single-user-state.ts b/libs/common/src/platform/state/implementations/default-single-user-state.ts index 0e9cacae511..46fa00ffb35 100644 --- a/libs/common/src/platform/state/implementations/default-single-user-state.ts +++ b/libs/common/src/platform/state/implementations/default-single-user-state.ts @@ -1,10 +1,12 @@ import { BehaviorSubject, Observable, - Subscription, + defer, filter, firstValueFrom, + shareReplay, switchMap, + tap, timeout, } from "rxjs"; @@ -21,24 +23,16 @@ import { Converter, SingleUserState } from "../user-state"; import { DefaultDerivedUserState } from "./default-derived-state"; import { getStoredValue } from "./util"; - const FAKE_DEFAULT = Symbol("fakeDefault"); export class DefaultSingleUserState implements SingleUserState { private storageKey: string; - private updatePromise: Promise | null = null; - private storageUpdateSubscription: Subscription; - private subscriberCount = new BehaviorSubject(0); - private stateObservable: Observable; protected stateSubject: BehaviorSubject = new BehaviorSubject< T | typeof FAKE_DEFAULT >(FAKE_DEFAULT); - get state$() { - this.stateObservable = this.stateObservable ?? this.initializeObservable(); - return this.stateObservable; - } + state$: Observable; constructor( readonly userId: UserId, @@ -47,35 +41,50 @@ export class DefaultSingleUserState implements SingleUserState { private chosenLocation: AbstractStorageService & ObservableStorageService, ) { this.storageKey = userKeyBuilder(this.userId, this.keyDefinition); - } - async update( - configureState: (state: T, dependency: TCombine) => T, - options: StateUpdateOptions = {}, - ): Promise { - options = populateOptionsWithDefault(options); - if (this.updatePromise != null) { - await this.updatePromise; - } + const storageUpdates$ = this.chosenLocation.updates$.pipe( + filter((update) => update.key === this.storageKey), + switchMap(async (update) => { + if (update.updateType === "remove") { + return null; + } + return await getStoredValue( + this.storageKey, + this.chosenLocation, + this.keyDefinition.deserializer, + ); + }), + shareReplay({ bufferSize: 1, refCount: false }), + ); - try { - this.updatePromise = this.internalUpdate(configureState, options); - const newState = await this.updatePromise; - return newState; - } finally { - this.updatePromise = null; - } - } + this.state$ = defer(() => { + const storageUpdateSubscription = storageUpdates$.subscribe((value) => { + this.stateSubject.next(value); + }); - createDerived(converter: Converter): DerivedUserState { - return new DefaultDerivedUserState(converter, this.encryptService, this); + this.getFromState().then((s) => { + this.stateSubject.next(s); + }); + + return this.stateSubject.pipe( + tap({ + complete: () => { + storageUpdateSubscription.unsubscribe(); + }, + }), + ); + }).pipe( + shareReplay({ refCount: false, bufferSize: 1 }), + filter((i) => i != FAKE_DEFAULT), + ); } - private async internalUpdate( + async update( configureState: (state: T, dependency: TCombine) => T, - options: StateUpdateOptions, + options: StateUpdateOptions = {}, ): Promise { - const currentState = await this.getStateForUpdate(); + options = populateOptionsWithDefault(options); + const currentState = await this.getGuaranteedState(); const combinedDependencies = options.combineLatestWith != null ? await firstValueFrom(options.combineLatestWith.pipe(timeout(options.msTimeout))) @@ -90,86 +99,20 @@ export class DefaultSingleUserState implements SingleUserState { return newState; } - private initializeObservable() { - this.storageUpdateSubscription = this.chosenLocation.updates$ - .pipe( - filter((update) => update.key === this.storageKey), - switchMap(async (update) => { - if (update.updateType === "remove") { - return null; - } - return await this.getFromState(); - }), - ) - .subscribe((v) => this.stateSubject.next(v)); - - this.subscriberCount.subscribe((count) => { - if (count === 0 && this.stateObservable != null) { - this.triggerCleanup(); - } - }); - - // Intentionally un-awaited promise, we don't want to delay return of observable, but we do want to - // trigger populating it immediately. - this.getFromState().then((s) => { - this.stateSubject.next(s); - }); - - return new Observable((subscriber) => { - this.incrementSubscribers(); - - const prevUnsubscribe = subscriber.unsubscribe.bind(subscriber); - subscriber.unsubscribe = () => { - this.decrementSubscribers(); - prevUnsubscribe(); - }; - - return this.stateSubject - .pipe( - // Filter out fake default, which is used to indicate that state is not ready to be emitted yet. - filter((i) => i != FAKE_DEFAULT), - ) - .subscribe(subscriber); - }); + createDerived(converter: Converter): DerivedUserState { + return new DefaultDerivedUserState(converter, this.encryptService, this); } - /** For use in update methods, does not wait for update to complete before yielding state. - * The expectation is that that await is already done - */ - private async getStateForUpdate() { + private async getGuaranteedState() { const currentValue = this.stateSubject.getValue(); return currentValue === FAKE_DEFAULT ? await this.getFromState() : currentValue; } async getFromState(): Promise { - if (this.updatePromise != null) { - return await this.updatePromise; - } return await getStoredValue( this.storageKey, this.chosenLocation, this.keyDefinition.deserializer, ); } - - private incrementSubscribers() { - this.subscriberCount.next(this.subscriberCount.value + 1); - } - - private decrementSubscribers() { - this.subscriberCount.next(this.subscriberCount.value - 1); - } - - private triggerCleanup() { - setTimeout(() => { - if (this.subscriberCount.value === 0) { - this.updatePromise = null; - this.storageUpdateSubscription.unsubscribe(); - this.stateObservable = null; - this.subscriberCount.complete(); - this.subscriberCount = new BehaviorSubject(0); - this.stateSubject.next(FAKE_DEFAULT); - } - }, this.keyDefinition.cleanupDelayMs); - } } diff --git a/libs/common/src/platform/state/key-definition.spec.ts b/libs/common/src/platform/state/key-definition.spec.ts index ee926bccd8e..cbb1e49a9a1 100644 --- a/libs/common/src/platform/state/key-definition.spec.ts +++ b/libs/common/src/platform/state/key-definition.spec.ts @@ -18,37 +18,6 @@ describe("KeyDefinition", () => { }); }); - describe("cleanupDelayMs", () => { - it("defaults to 1000ms", () => { - const keyDefinition = new KeyDefinition(fakeStateDefinition, "fake", { - deserializer: (value) => value, - }); - - expect(keyDefinition).toBeTruthy(); - expect(keyDefinition.cleanupDelayMs).toBe(1000); - }); - - it("can be overridden", () => { - const keyDefinition = new KeyDefinition(fakeStateDefinition, "fake", { - deserializer: (value) => value, - cleanupDelayMs: 500, - }); - - expect(keyDefinition).toBeTruthy(); - expect(keyDefinition.cleanupDelayMs).toBe(500); - }); - - it.each([0, -1])("throws on 0 or negative (%s)", (testValue: number) => { - expect( - () => - new KeyDefinition(fakeStateDefinition, "fake", { - deserializer: (value) => value, - cleanupDelayMs: testValue, - }), - ).toThrow(); - }); - }); - describe("record", () => { it("runs custom deserializer for each record value", () => { const recordDefinition = KeyDefinition.record(fakeStateDefinition, "fake", { diff --git a/libs/common/src/platform/state/key-definition.ts b/libs/common/src/platform/state/key-definition.ts index 9989bf37a24..db65740388e 100644 --- a/libs/common/src/platform/state/key-definition.ts +++ b/libs/common/src/platform/state/key-definition.ts @@ -19,11 +19,6 @@ type KeyDefinitionOptions = { * @returns The fully typed version of your state. */ readonly deserializer: (jsonValue: Jsonify) => T; - /** - * The number of milliseconds to wait before cleaning up the state after the last subscriber has unsubscribed. - * Defaults to 1000ms. - */ - readonly cleanupDelayMs?: number; }; /** @@ -47,12 +42,8 @@ export class KeyDefinition { private readonly options: KeyDefinitionOptions, ) { if (options.deserializer == null) { - throw new Error(`'deserializer' is a required property on key ${this.errorKeyName}`); - } - - if (options.cleanupDelayMs <= 0) { throw new Error( - `'cleanupDelayMs' must be greater than 0. Value of ${options.cleanupDelayMs} passed to key ${this.errorKeyName} `, + `'deserializer' is a required property on key ${stateDefinition.name} > ${key}`, ); } } @@ -64,13 +55,6 @@ export class KeyDefinition { return this.options.deserializer; } - /** - * Gets the number of milliseconds to wait before cleaning up the state after the last subscriber has unsubscribed. - */ - get cleanupDelayMs() { - return this.options.cleanupDelayMs < 0 ? 0 : this.options.cleanupDelayMs ?? 1000; - } - /** * Creates a {@link KeyDefinition} for state that is an array. * @param stateDefinition The state definition to be added to the KeyDefinition @@ -153,10 +137,6 @@ export class KeyDefinition { ? `${scope}_${userId}_${this.stateDefinition.name}_${this.key}` : `${scope}_${this.stateDefinition.name}_${this.key}`; } - - private get errorKeyName() { - return `${this.stateDefinition.name} > ${this.key}`; - } } export type StorageKey = Opaque;