Skip to content

Commit

Permalink
Global State Rewrite (#7515)
Browse files Browse the repository at this point in the history
* Global State Rewrite

* Apply suggestions from code review

Co-authored-by: Matt Gibson <[email protected]>

* Prettier

---------

Co-authored-by: Matt Gibson <[email protected]>
  • Loading branch information
justindbaur and MGibson1 authored Jan 11, 2024
1 parent 6baad76 commit 0874df8
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,24 +108,32 @@ describe("DefaultActiveUserState", () => {
// #3 switched state to initial state for user2
expect(emissions).toEqual([state1, updatedState, state2]);

// Should be called three time to get state, once for each user and once for the update
expect(diskStorageService.mock.get).toHaveBeenCalledTimes(3);
// Should be called 4 time to get state, update state for user, emitting update, and switching users
expect(diskStorageService.mock.get).toHaveBeenCalledTimes(4);
// Initial subscribe to state$
expect(diskStorageService.mock.get).toHaveBeenNthCalledWith(
1,
"user_00000000-0000-1000-a000-000000000001_fake_fake",
any(), // options
);
// The updating of state for user1
expect(diskStorageService.mock.get).toHaveBeenNthCalledWith(
2,
"user_00000000-0000-1000-a000-000000000001_fake_fake",
any(), // options
);
// The emission from being actively subscribed to user1
expect(diskStorageService.mock.get).toHaveBeenNthCalledWith(
3,
"user_00000000-0000-1000-a000-000000000001_fake_fake",
any(), // options
);
// Switch to user2
expect(diskStorageService.mock.get).toHaveBeenNthCalledWith(
4,
"user_00000000-0000-1000-a000-000000000002_fake_fake",
any(), // options
);

// Should only have saved data for the first user
expect(diskStorageService.mock.save).toHaveBeenCalledTimes(1);
expect(diskStorageService.mock.save).toHaveBeenNthCalledWith(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export class DefaultActiveUserState<T> implements ActiveUserState<T> {
// We only care about the UserId but we do want to know about no user as well.
map((a) => a?.id),
// To avoid going to storage when we don't need to, only get updates when there is a true change.
distinctUntilChanged(),
distinctUntilChanged((a, b) => (a == null || b == null ? a == b : a === b)), // Treat null and undefined as equal
);

const userChangeAndInitial$ = this.activeUserId$.pipe(
Expand Down Expand Up @@ -157,15 +157,22 @@ export class DefaultActiveUserState<T> implements ActiveUserState<T> {
* The expectation is that that await is already done
*/
protected async getStateForUpdate() {
const [userId, data] = await firstValueFrom(
this.combinedState$.pipe(
const userId = await firstValueFrom(
this.activeUserId$.pipe(
timeout({
first: 1000,
with: () => throwError(() => new Error("No active user at this time.")),
with: () => throwError(() => new Error("Timeout while retrieving active user.")),
}),
),
);
return [userKeyBuilder(userId, this.keyDefinition), data] as const;
if (userId == null) {
throw new Error("No active user at this time.");
}
const fullKey = userKeyBuilder(userId, this.keyDefinition);
return [
fullKey,
await getStoredValue(fullKey, this.chosenStorageLocation, this.keyDefinition.deserializer),
] as const;
}

protected saveToStorage(key: string, data: T): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
* @jest-environment ../shared/test.environment.ts
*/

import { anySymbol } from "jest-mock-extended";
import { firstValueFrom, of } from "rxjs";
import { Jsonify } from "type-fest";

Expand Down Expand Up @@ -309,41 +308,22 @@ describe("DefaultGlobalState", () => {
return newData;
});
});

test("updates with FAKE_DEFAULT initial value should resolve correctly", async () => {
expect(globalState["stateSubject"].value).toEqual(anySymbol()); // FAKE_DEFAULT
const val = await globalState.update((state) => {
return newData;
});

expect(val).toEqual(newData);
const call = diskStorageService.mock.save.mock.calls[0];
expect(call[0]).toEqual("global_fake_fake");
expect(call[1]).toEqual(newData);
});
});

describe("cleanup", () => {
async function assertClean() {
const emissions = trackEmissions(globalState["stateSubject"]);
const initial = structuredClone(emissions);

diskStorageService.save(globalKey, newData);
await awaitAsync(); // storage updates are behind a promise

expect(emissions).toEqual(initial); // no longer listening to storage updates
function assertClean() {
expect(diskStorageService["updatesSubject"]["observers"]).toHaveLength(0);
}

it("should cleanup after last subscriber", async () => {
const subscription = globalState.state$.subscribe();
await awaitAsync(); // storage updates are behind a promise

subscription.unsubscribe();
expect(globalState["subscriberCount"].getValue()).toBe(0);
// Wait for cleanup
await awaitAsync(cleanupDelayMs * 2);

await assertClean();
assertClean();
});

it("should not cleanup if there are still subscribers", async () => {
Expand All @@ -357,7 +337,7 @@ describe("DefaultGlobalState", () => {
// Wait for cleanup
await awaitAsync(cleanupDelayMs * 2);

expect(globalState["subscriberCount"].getValue()).toBe(1);
expect(diskStorageService["updatesSubject"]["observers"]).toHaveLength(1);

// Still be listening to storage updates
diskStorageService.save(globalKey, newData);
Expand All @@ -368,7 +348,7 @@ describe("DefaultGlobalState", () => {
// Wait for cleanup
await awaitAsync(cleanupDelayMs * 2);

await assertClean();
assertClean();
});

it("can re-initialize after cleanup", async () => {
Expand Down Expand Up @@ -396,12 +376,11 @@ describe("DefaultGlobalState", () => {
await awaitAsync();

subscription.unsubscribe();
expect(globalState["subscriberCount"].getValue()).toBe(0);
expect(diskStorageService["updatesSubject"]["observers"]).toHaveLength(1);
// Do not wait long enough for cleanup
await awaitAsync(cleanupDelayMs / 2);

expect(globalState["stateSubject"].value).toEqual(newData); // digging in to check that it hasn't been cleared
expect(globalState["storageUpdateSubscription"]).not.toBeNull(); // still listening to storage updates
expect(diskStorageService["updatesSubject"]["observers"]).toHaveLength(1);
});

it("state$ observables are durable to cleanup", async () => {
Expand Down
125 changes: 36 additions & 89 deletions libs/common/src/platform/state/implementations/default-global-state.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import {
BehaviorSubject,
Observable,
Subscription,
ReplaySubject,
defer,
filter,
firstValueFrom,
merge,
share,
switchMap,
timeout,
timer,
} from "rxjs";

import {
Expand All @@ -17,30 +20,43 @@ import { KeyDefinition, globalKeyBuilder } from "../key-definition";
import { StateUpdateOptions, populateOptionsWithDefault } from "../state-update-options";

import { getStoredValue } from "./util";
const FAKE_DEFAULT = Symbol("fakeDefault");

export class DefaultGlobalState<T> implements GlobalState<T> {
private storageKey: string;
private updatePromise: Promise<T> | null = null;
private storageUpdateSubscription: Subscription;
private subscriberCount = new BehaviorSubject<number>(0);
private stateObservable: Observable<T>;
private reinitialize = false;

protected stateSubject: BehaviorSubject<T | typeof FAKE_DEFAULT> = new BehaviorSubject<
T | typeof FAKE_DEFAULT
>(FAKE_DEFAULT);

get state$() {
this.stateObservable = this.stateObservable ?? this.initializeObservable();
return this.stateObservable;
}
readonly state$: Observable<T>;

constructor(
private keyDefinition: KeyDefinition<T>,
private chosenLocation: AbstractStorageService & ObservableStorageService,
) {
this.storageKey = globalKeyBuilder(this.keyDefinition);
const initialStorageGet$ = defer(() => {
return getStoredValue(this.storageKey, this.chosenLocation, this.keyDefinition.deserializer);
});

const latestStorage$ = this.chosenLocation.updates$.pipe(
filter((s) => s.key === this.storageKey),
switchMap(async (storageUpdate) => {
if (storageUpdate.updateType === "remove") {
return null;
}

return await getStoredValue(
this.storageKey,
this.chosenLocation,
this.keyDefinition.deserializer,
);
}),
);

this.state$ = merge(initialStorageGet$, latestStorage$).pipe(
share({
connector: () => new ReplaySubject<T>(1),
resetOnRefCountZero: () => timer(this.keyDefinition.cleanupDelayMs),
}),
);
}

async update<TCombine>(
Expand Down Expand Up @@ -80,63 +96,15 @@ export class DefaultGlobalState<T> implements GlobalState<T> {
return newState;
}

private initializeObservable() {
this.storageUpdateSubscription = this.chosenLocation.updates$
.pipe(
filter((update) => update.key === this.storageKey),
switchMap(async (update) => {
if (update.updateType === "remove") {
return null;
}
return await this.getFromState();
}),
)
.subscribe((v) => this.stateSubject.next(v));

this.subscriberCount.subscribe((count) => {
if (count === 0 && this.stateObservable != null) {
this.triggerCleanup();
}
});

// Intentionally un-awaited promise, we don't want to delay return of observable, but we do want to
// trigger populating it immediately.
this.getFromState().then((s) => {
this.stateSubject.next(s);
});

return new Observable<T>((subscriber) => {
this.incrementSubscribers();

// reinitialize listeners after cleanup
if (this.reinitialize) {
this.reinitialize = false;
this.initializeObservable();
}

const prevUnsubscribe = subscriber.unsubscribe.bind(subscriber);
subscriber.unsubscribe = () => {
this.decrementSubscribers();
prevUnsubscribe();
};

return this.stateSubject
.pipe(
// Filter out fake default, which is used to indicate that state is not ready to be emitted yet.
filter<T>((i) => i != FAKE_DEFAULT),
)
.subscribe(subscriber);
});
}

/** For use in update methods, does not wait for update to complete before yielding state.
* The expectation is that that await is already done
*/
private async getStateForUpdate() {
const currentValue = this.stateSubject.getValue();
return currentValue === FAKE_DEFAULT
? await getStoredValue(this.storageKey, this.chosenLocation, this.keyDefinition.deserializer)
: currentValue;
return await getStoredValue(
this.storageKey,
this.chosenLocation,
this.keyDefinition.deserializer,
);
}

async getFromState(): Promise<T> {
Expand All @@ -149,25 +117,4 @@ export class DefaultGlobalState<T> implements GlobalState<T> {
this.keyDefinition.deserializer,
);
}

private incrementSubscribers() {
this.subscriberCount.next(this.subscriberCount.value + 1);
}

private decrementSubscribers() {
this.subscriberCount.next(this.subscriberCount.value - 1);
}

private triggerCleanup() {
setTimeout(() => {
if (this.subscriberCount.value === 0) {
this.updatePromise = null;
this.storageUpdateSubscription.unsubscribe();
this.subscriberCount.complete();
this.subscriberCount = new BehaviorSubject<number>(0);
this.stateSubject.next(FAKE_DEFAULT);
this.reinitialize = true;
}
}, this.keyDefinition.cleanupDelayMs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ export class DefaultSingleUserState<T> implements SingleUserState<T> {
private storageKey: string;
private updatePromise: Promise<T> | null = null;

state$: Observable<T>;
combinedState$: Observable<CombinedState<T>>;
readonly state$: Observable<T>;
readonly combinedState$: Observable<CombinedState<T>>;

constructor(
readonly userId: UserId,
Expand Down Expand Up @@ -107,6 +107,10 @@ export class DefaultSingleUserState<T> implements SingleUserState<T> {
* The expectation is that that await is already done
*/
private async getStateForUpdate() {
return await firstValueFrom(this.state$);
return await getStoredValue(
this.storageKey,
this.chosenLocation,
this.keyDefinition.deserializer,
);
}
}

0 comments on commit 0874df8

Please sign in to comment.