Skip to content

Commit

Permalink
feat: Make YjsPartition utilize YjsSharedDocument
Browse files Browse the repository at this point in the history
  • Loading branch information
KallynGowdy committed Oct 29, 2024
1 parent c6550f7 commit f927ed7
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 120 deletions.
3 changes: 2 additions & 1 deletion src/aux-common/documents/SharedDocumentConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import { RemoteCausalRepoProtocol } from '../partitions/AuxPartitionConfig';
export interface SharedDocumentConfig {
/**
* The branch of the document to load.
* If omitted, then local persistence will not be supported.
*/
branch: string;
branch?: string;

/**
* The options for local persistence of the document.
Expand Down
2 changes: 1 addition & 1 deletion src/aux-common/documents/YjsSharedDocument.ts
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ export class YjsSharedDocument implements SharedDocument {
async init(): Promise<void> {}

connect(): void {
if (this._persistence?.saveToIndexedDb) {
if (this._persistence?.saveToIndexedDb && this._branch) {
console.log('[YjsPartition] Using IndexedDB persistence');
this._indexeddb = new YjsIndexedDBPersistence(
this._branch,
Expand Down
11 changes: 6 additions & 5 deletions src/aux-common/partitions/AuxPartitionConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,12 @@ export interface PartitionRemoteEvents {
export interface YjsPartitionConfig extends PartitionConfigBase {
type: 'yjs';

/**
* The branch to load.
* If omitted, then local persistence will not be supported.
*/
branch?: string;

/**
* The options for local persistence for the partition.
*/
Expand All @@ -245,11 +251,6 @@ export interface YjsPartitionConfig extends PartitionConfigBase {
*/
saveToIndexedDb: boolean;

/**
* The database that updates should be saved under.
*/
database: string;

/**
* The encryption key that should be used.
*/
Expand Down
123 changes: 10 additions & 113 deletions src/aux-common/partitions/YjsPartition.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ import {
StatusUpdate,
VersionVector,
} from '../common';
import { YjsIndexedDBPersistence } from '../yjs/YjsIndexedDBPersistence';
import { fromByteArray, toByteArray } from 'base64-js';
import { YjsSharedDocument } from '../documents/YjsSharedDocument';

const APPLY_UPDATES_TO_INST_TRANSACTION_ORIGIN = '__apply_updates_to_inst';

Expand All @@ -95,27 +95,16 @@ export function createYjsPartition(config: PartitionConfig): YjsPartition {
type MapValue = Text | object | number | boolean;
type TagsMap = Map<MapValue>;

export class YjsPartitionImpl implements YjsPartition {
protected _onVersionUpdated: BehaviorSubject<CurrentVersion>;

protected _onError = new Subject<any>();
protected _onEvents = new Subject<Action[]>();
protected _onStatusUpdated = new Subject<StatusUpdate>();
export class YjsPartitionImpl
extends YjsSharedDocument
implements YjsPartition
{
protected _hasRegisteredSubs = false;
private _sub = new Subscription();

private _localId: number;
private _remoteId: number;
private _doc: Doc = new Doc();
private _bots: Map<TagsMap>;
private _masks: Map<MapValue>;
private _internalPartition: MemoryPartitionImpl;
private _currentVersion: CurrentVersion;
private _indexeddb: YjsIndexedDBPersistence;
private _persistence: YjsPartitionConfig['localPersistence'];
private _isRemoteUpdate: boolean = false;

private _isLocalTransaction: boolean = true;
private _remoteEvents: PartitionRemoteEvents | boolean;
private _connectionId: string;

Expand All @@ -135,36 +124,21 @@ export class YjsPartitionImpl implements YjsPartition {
return this._internalPartition.onStateUpdated;
}

get onVersionUpdated(): Observable<CurrentVersion> {
return this._onVersionUpdated;
}

get onError(): Observable<any> {
return this._onError;
}

get onEvents(): Observable<Action[]> {
return this._onEvents;
}

get onStatusUpdated(): Observable<StatusUpdate> {
return this._onStatusUpdated;
}

unsubscribe() {
return this._sub.unsubscribe();
}

get closed(): boolean {
return this._sub.closed;
}
// get doc() {
// return this._doc;
// }

get state(): BotsState {
return this._internalPartition.state;
}

type = 'yjs' as const;
private: boolean;

get space(): string {
return this._internalPartition.space;
}
Expand All @@ -177,38 +151,16 @@ export class YjsPartitionImpl implements YjsPartition {
return 'immediate';
}

get doc() {
return this._doc;
}

private get _remoteSite() {
return this._remoteId.toString();
}

private get _currentSite() {
return this._localId.toString();
}

constructor(config: YjsPartitionConfig) {
super(config);
this.private = config.private || false;
this._persistence = config.localPersistence;
this._remoteEvents = config.remoteEvents;
this._connectionId = config.connectionId;
this._localId = this._doc.clientID;
this._remoteId = new Doc().clientID;
this._bots = this._doc.getMap('bots');
this._masks = this._doc.getMap('masks');
this._doc.on('afterTransaction', (transaction: Transaction) => {
this._processTransaction(transaction);
});
this._currentVersion = {
currentSite: this._localId.toString(),
remoteSite: this._remoteId.toString(),
vector: {},
};
this._onVersionUpdated = new BehaviorSubject<CurrentVersion>(
this._currentVersion
);
this._internalPartition = new MemoryPartitionImpl({
type: 'memory',
initialState: {},
Expand Down Expand Up @@ -388,49 +340,6 @@ export class YjsPartitionImpl implements YjsPartition {
return [];
}

async init(): Promise<void> {}

connect(): void {
if (this._persistence?.saveToIndexedDb) {
console.log('[YjsPartition] Using IndexedDB persistence');
this._indexeddb = new YjsIndexedDBPersistence(
this._persistence.database,
this._doc,
{ broadcastChanges: true }
);
}

this._onStatusUpdated.next({
type: 'connection',
connected: true,
});

this._onStatusUpdated.next({
type: 'authentication',
authenticated: true,
});

this._onStatusUpdated.next({
type: 'authorization',
authorized: true,
});

if (this._indexeddb) {
// wait to send the initial sync event until the persistence is ready
this._indexeddb.waitForInit().then(() => {
this._onStatusUpdated.next({
type: 'sync',
synced: true,
});
});
} else {
this._onStatusUpdated.next({
type: 'sync',
synced: true,
});
}
}

private _applyEvents(
events: (AddBotAction | RemoveBotAction | UpdateBotAction)[]
) {
Expand Down Expand Up @@ -523,18 +432,6 @@ export class YjsPartitionImpl implements YjsPartition {
});
}

private _applyUpdates(updates: string[], transactionOrigin?: string) {
try {
this._isRemoteUpdate = true;
for (let updateBase64 of updates) {
const update = toByteArray(updateBase64);
applyUpdate(this._doc, update, transactionOrigin);
}
} finally {
this._isRemoteUpdate = false;
}
}

private async _processTransaction(transaction: Transaction) {
let memoryEvents: (AddBotAction | RemoveBotAction | UpdateBotAction)[] =
[];
Expand Down

0 comments on commit f927ed7

Please sign in to comment.