Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Global State Rewrite #7515

Merged
merged 3 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,24 +108,32 @@ describe("DefaultActiveUserState", () => {
// #3 switched state to initial state for user2
expect(emissions).toEqual([state1, updatedState, state2]);

// Should be called three time to get state, once for each user and once for the update
expect(diskStorageService.mock.get).toHaveBeenCalledTimes(3);
// Should be called 4 time to get state, update state for user, emitting update, and switching users
expect(diskStorageService.mock.get).toHaveBeenCalledTimes(4);
// Initial subscribe to state$
expect(diskStorageService.mock.get).toHaveBeenNthCalledWith(
1,
"user_00000000-0000-1000-a000-000000000001_fake_fake",
any(), // options
);
// The updating of state for user1
expect(diskStorageService.mock.get).toHaveBeenNthCalledWith(
2,
"user_00000000-0000-1000-a000-000000000001_fake_fake",
any(), // options
);
// The emission from being actively subscribed to user1
expect(diskStorageService.mock.get).toHaveBeenNthCalledWith(
3,
"user_00000000-0000-1000-a000-000000000001_fake_fake",
any(), // options
);
// Switch to user2
expect(diskStorageService.mock.get).toHaveBeenNthCalledWith(
4,
"user_00000000-0000-1000-a000-000000000002_fake_fake",
any(), // options
);

// Should only have saved data for the first user
expect(diskStorageService.mock.save).toHaveBeenCalledTimes(1);
expect(diskStorageService.mock.save).toHaveBeenNthCalledWith(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
// We only care about the UserId but we do want to know about no user as well.
map((a) => a?.id),
// To avoid going to storage when we don't need to, only get updates when there is a true change.
distinctUntilChanged(),
distinctUntilChanged((a, b) => (a == null || b == null ? a == b : a === b)), // Treat null and undefined as equal
);

const userChangeAndInitial$ = this.activeUserId$.pipe(
Expand Down Expand Up @@ -157,15 +157,22 @@
* The expectation is that that await is already done
*/
protected async getStateForUpdate() {
const [userId, data] = await firstValueFrom(
this.combinedState$.pipe(
const userId = await firstValueFrom(
this.activeUserId$.pipe(
timeout({
first: 1000,
with: () => throwError(() => new Error("No active user at this time.")),
with: () => throwError(() => new Error("Timeout while retrieving active user.")),

Check warning on line 164 in libs/common/src/platform/state/implementations/default-active-user-state.ts

View check run for this annotation

Codecov / codecov/patch

libs/common/src/platform/state/implementations/default-active-user-state.ts#L164

Added line #L164 was not covered by tests
}),
),
);
return [userKeyBuilder(userId, this.keyDefinition), data] as const;
if (userId == null) {
throw new Error("No active user at this time.");
}
const fullKey = userKeyBuilder(userId, this.keyDefinition);
return [
fullKey,
await getStoredValue(fullKey, this.chosenStorageLocation, this.keyDefinition.deserializer),
] as const;
}

protected saveToStorage(key: string, data: T): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
* @jest-environment ../shared/test.environment.ts
*/

import { anySymbol } from "jest-mock-extended";
import { firstValueFrom, of } from "rxjs";
import { Jsonify } from "type-fest";

Expand Down Expand Up @@ -309,41 +308,22 @@ describe("DefaultGlobalState", () => {
return newData;
});
});

test("updates with FAKE_DEFAULT initial value should resolve correctly", async () => {
expect(globalState["stateSubject"].value).toEqual(anySymbol()); // FAKE_DEFAULT
const val = await globalState.update((state) => {
return newData;
});

expect(val).toEqual(newData);
const call = diskStorageService.mock.save.mock.calls[0];
expect(call[0]).toEqual("global_fake_fake");
expect(call[1]).toEqual(newData);
});
});

describe("cleanup", () => {
async function assertClean() {
const emissions = trackEmissions(globalState["stateSubject"]);
const initial = structuredClone(emissions);

diskStorageService.save(globalKey, newData);
await awaitAsync(); // storage updates are behind a promise

expect(emissions).toEqual(initial); // no longer listening to storage updates
function assertClean() {
expect(diskStorageService["updatesSubject"]["observers"]).toHaveLength(0);
}

it("should cleanup after last subscriber", async () => {
const subscription = globalState.state$.subscribe();
await awaitAsync(); // storage updates are behind a promise

subscription.unsubscribe();
expect(globalState["subscriberCount"].getValue()).toBe(0);
// Wait for cleanup
await awaitAsync(cleanupDelayMs * 2);

await assertClean();
assertClean();
});

it("should not cleanup if there are still subscribers", async () => {
Expand All @@ -357,7 +337,7 @@ describe("DefaultGlobalState", () => {
// Wait for cleanup
await awaitAsync(cleanupDelayMs * 2);

expect(globalState["subscriberCount"].getValue()).toBe(1);
expect(diskStorageService["updatesSubject"]["observers"]).toHaveLength(1);

// Still be listening to storage updates
diskStorageService.save(globalKey, newData);
Expand All @@ -368,7 +348,7 @@ describe("DefaultGlobalState", () => {
// Wait for cleanup
await awaitAsync(cleanupDelayMs * 2);

await assertClean();
assertClean();
});

it("can re-initialize after cleanup", async () => {
Expand Down Expand Up @@ -396,12 +376,11 @@ describe("DefaultGlobalState", () => {
await awaitAsync();

subscription.unsubscribe();
expect(globalState["subscriberCount"].getValue()).toBe(0);
expect(diskStorageService["updatesSubject"]["observers"]).toHaveLength(1);
// Do not wait long enough for cleanup
await awaitAsync(cleanupDelayMs / 2);

expect(globalState["stateSubject"].value).toEqual(newData); // digging in to check that it hasn't been cleared
expect(globalState["storageUpdateSubscription"]).not.toBeNull(); // still listening to storage updates
expect(diskStorageService["updatesSubject"]["observers"]).toHaveLength(1);
});

it("state$ observables are durable to cleanup", async () => {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import {
BehaviorSubject,
Observable,
Subscription,
ReplaySubject,
defer,
filter,
firstValueFrom,
merge,
share,
switchMap,
timeout,
timer,
} from "rxjs";

import {
Expand All @@ -17,30 +20,43 @@
import { StateUpdateOptions, populateOptionsWithDefault } from "../state-update-options";

import { getStoredValue } from "./util";
const FAKE_DEFAULT = Symbol("fakeDefault");

export class DefaultGlobalState<T> implements GlobalState<T> {
private storageKey: string;
private updatePromise: Promise<T> | null = null;
private storageUpdateSubscription: Subscription;
private subscriberCount = new BehaviorSubject<number>(0);
private stateObservable: Observable<T>;
private reinitialize = false;

protected stateSubject: BehaviorSubject<T | typeof FAKE_DEFAULT> = new BehaviorSubject<
T | typeof FAKE_DEFAULT
>(FAKE_DEFAULT);

get state$() {
this.stateObservable = this.stateObservable ?? this.initializeObservable();
return this.stateObservable;
}
readonly state$: Observable<T>;

constructor(
private keyDefinition: KeyDefinition<T>,
private chosenLocation: AbstractStorageService & ObservableStorageService,
) {
this.storageKey = globalKeyBuilder(this.keyDefinition);
const initialStorageGet$ = defer(() => {
return getStoredValue(this.storageKey, this.chosenLocation, this.keyDefinition.deserializer);
});

const latestStorage$ = this.chosenLocation.updates$.pipe(
filter((s) => s.key === this.storageKey),
switchMap(async (storageUpdate) => {
if (storageUpdate.updateType === "remove") {
return null;

Check warning on line 43 in libs/common/src/platform/state/implementations/default-global-state.ts

View check run for this annotation

Codecov / codecov/patch

libs/common/src/platform/state/implementations/default-global-state.ts#L43

Added line #L43 was not covered by tests
}

return await getStoredValue(
this.storageKey,
this.chosenLocation,
this.keyDefinition.deserializer,
);
}),
);

this.state$ = merge(initialStorageGet$, latestStorage$).pipe(
share({
connector: () => new ReplaySubject<T>(1),
resetOnRefCountZero: () => timer(this.keyDefinition.cleanupDelayMs),
}),
);
}

async update<TCombine>(
Expand Down Expand Up @@ -80,63 +96,15 @@
return newState;
}

private initializeObservable() {
this.storageUpdateSubscription = this.chosenLocation.updates$
.pipe(
filter((update) => update.key === this.storageKey),
switchMap(async (update) => {
if (update.updateType === "remove") {
return null;
}
return await this.getFromState();
}),
)
.subscribe((v) => this.stateSubject.next(v));

this.subscriberCount.subscribe((count) => {
if (count === 0 && this.stateObservable != null) {
this.triggerCleanup();
}
});

// Intentionally un-awaited promise, we don't want to delay return of observable, but we do want to
// trigger populating it immediately.
this.getFromState().then((s) => {
this.stateSubject.next(s);
});

return new Observable<T>((subscriber) => {
this.incrementSubscribers();

// reinitialize listeners after cleanup
if (this.reinitialize) {
this.reinitialize = false;
this.initializeObservable();
}

const prevUnsubscribe = subscriber.unsubscribe.bind(subscriber);
subscriber.unsubscribe = () => {
this.decrementSubscribers();
prevUnsubscribe();
};

return this.stateSubject
.pipe(
// Filter out fake default, which is used to indicate that state is not ready to be emitted yet.
filter<T>((i) => i != FAKE_DEFAULT),
)
.subscribe(subscriber);
});
}

/** For use in update methods, does not wait for update to complete before yielding state.
* The expectation is that that await is already done
*/
private async getStateForUpdate() {
const currentValue = this.stateSubject.getValue();
return currentValue === FAKE_DEFAULT
? await getStoredValue(this.storageKey, this.chosenLocation, this.keyDefinition.deserializer)
: currentValue;
return await getStoredValue(
this.storageKey,
this.chosenLocation,
this.keyDefinition.deserializer,
);
}

async getFromState(): Promise<T> {
Expand All @@ -149,25 +117,4 @@
this.keyDefinition.deserializer,
);
}

private incrementSubscribers() {
this.subscriberCount.next(this.subscriberCount.value + 1);
}

private decrementSubscribers() {
this.subscriberCount.next(this.subscriberCount.value - 1);
}

private triggerCleanup() {
setTimeout(() => {
if (this.subscriberCount.value === 0) {
this.updatePromise = null;
this.storageUpdateSubscription.unsubscribe();
this.subscriberCount.complete();
this.subscriberCount = new BehaviorSubject<number>(0);
this.stateSubject.next(FAKE_DEFAULT);
this.reinitialize = true;
}
}, this.keyDefinition.cleanupDelayMs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ export class DefaultSingleUserState<T> implements SingleUserState<T> {
private storageKey: string;
private updatePromise: Promise<T> | null = null;

state$: Observable<T>;
combinedState$: Observable<CombinedState<T>>;
readonly state$: Observable<T>;
readonly combinedState$: Observable<CombinedState<T>>;

constructor(
readonly userId: UserId,
Expand Down Expand Up @@ -107,6 +107,10 @@ export class DefaultSingleUserState<T> implements SingleUserState<T> {
* The expectation is that that await is already done
*/
private async getStateForUpdate() {
return await firstValueFrom(this.state$);
return await getStoredValue(
this.storageKey,
this.chosenLocation,
this.keyDefinition.deserializer,
);
}
}
Loading