Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add txn_id support to sliding sync #2567

Merged
merged 4 commits into from
Aug 10, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 219 additions & 0 deletions spec/integ/sliding-sync.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,225 @@ describe("SlidingSync", () => {
});
});

describe("transaction IDs", () => {
beforeAll(setupClient);
afterAll(teardownClient);
const roomId = "!foo:bar";

let slidingSync: SlidingSync;

// really this applies to them all but it's easier to just test one
it("should resolve modifyRoomSubscriptions after SlidingSync.start() is called", async () => {
const roomSubInfo = {
timeline_limit: 1,
required_state: [
["m.room.name", ""],
],
};
// add the subscription
slidingSync = new SlidingSync(proxyBaseUrl, [], roomSubInfo, client, 1);
// modification before SlidingSync.start()
const subscribePromise = slidingSync.modifyRoomSubscriptions(new Set([roomId]));
let txnId;
httpBackend.when("POST", syncUrl).check(function(req) {
const body = req.data;
logger.log("txn got ", body);
kegsay marked this conversation as resolved.
Show resolved Hide resolved
expect(body.room_subscriptions).toBeTruthy();
expect(body.room_subscriptions[roomId]).toEqual(roomSubInfo);
expect(body.txn_id).toBeTruthy();
txnId = body.txn_id;
}).respond(200, function() {
return {
pos: "aaa",
txn_id: txnId,
lists: [],
extensions: {},
rooms: {
[roomId]: {
name: "foo bar",
required_state: [],
timeline: [],
},
},
};
});
slidingSync.start();
await httpBackend.flushAllExpected();
await subscribePromise;
});
it("should resolve setList during a connection", async () => {
const newList = {
ranges: [[0, 20]],
};
const promise = slidingSync.setList(0, newList);
let txnId;
httpBackend.when("POST", syncUrl).check(function(req) {
const body = req.data;
logger.log("txn got ", body);
expect(body.room_subscriptions).toBeFalsy();
expect(body.lists[0]).toEqual(newList);
expect(body.txn_id).toBeTruthy();
txnId = body.txn_id;
}).respond(200, function() {
return {
pos: "bbb",
txn_id: txnId,
lists: [{ count: 5 }],
extensions: {},
};
});
await httpBackend.flushAllExpected();
await promise;
expect(txnId).toBeDefined();
});
it("should resolve setListRanges during a connection", async () => {
const promise = slidingSync.setListRanges(0, [[20, 40]]);
let txnId;
httpBackend.when("POST", syncUrl).check(function(req) {
const body = req.data;
logger.log("txn got ", body);
expect(body.room_subscriptions).toBeFalsy();
expect(body.lists[0]).toEqual({
ranges: [[20, 40]],
});
expect(body.txn_id).toBeTruthy();
txnId = body.txn_id;
}).respond(200, function() {
return {
pos: "ccc",
txn_id: txnId,
lists: [{ count: 5 }],
extensions: {},
};
});
await httpBackend.flushAllExpected();
await promise;
expect(txnId).toBeDefined();
});
it("should resolve modifyRoomSubscriptionInfo during a connection", async () => {
const promise = slidingSync.modifyRoomSubscriptionInfo({
timeline_limit: 99,
});
let txnId;
httpBackend.when("POST", syncUrl).check(function(req) {
const body = req.data;
logger.log("txn got ", body);
expect(body.room_subscriptions).toBeTruthy();
expect(body.room_subscriptions[roomId]).toEqual({
timeline_limit: 99,
});
expect(body.txn_id).toBeTruthy();
txnId = body.txn_id;
}).respond(200, function() {
return {
pos: "ddd",
txn_id: txnId,
extensions: {},
};
});
await httpBackend.flushAllExpected();
await promise;
expect(txnId).toBeDefined();
});
it("should reject earlier pending promises if a later transaction is acknowledged", async () => {
// i.e if we have [A,B,C] and see txn_id=C then A,B should be rejected.
const gotTxnIds = [];
const pushTxn = function(req) {
gotTxnIds.push(req.data.txn_id);
};
const failPromise = slidingSync.setListRanges(0, [[20, 40]]);
httpBackend.when("POST", syncUrl).check(pushTxn).respond(200, { pos: "e" }); // missing txn_id
await httpBackend.flushAllExpected();
const failPromise2 = slidingSync.setListRanges(0, [[60, 70]]);
httpBackend.when("POST", syncUrl).check(pushTxn).respond(200, { pos: "f" }); // missing txn_id
await httpBackend.flushAllExpected();

// attach rejection handlers now else if we do it later Jest treats that as an unhandled rejection
// which is a fail.
expect(failPromise).rejects.toEqual(gotTxnIds[0]);
expect(failPromise2).rejects.toEqual(gotTxnIds[1]);

const okPromise = slidingSync.setListRanges(0, [[0, 20]]);
let txnId;
httpBackend.when("POST", syncUrl).check((req) => {
txnId = req.data.txn_id;
}).respond(200, () => {
// include the txn_id, earlier requests should now be reject()ed.
return {
pos: "g",
txn_id: txnId,
};
});
await httpBackend.flushAllExpected();
await okPromise;

expect(txnId).toBeDefined();
});
it("should not reject later pending promises if an earlier transaction is acknowledged", async () => {
// i.e if we have [A,B,C] and see txn_id=B then C should not be rejected but A should.
const gotTxnIds = [];
const pushTxn = function(req) {
gotTxnIds.push(req.data.txn_id);
};
const A = slidingSync.setListRanges(0, [[20, 40]]);
httpBackend.when("POST", syncUrl).check(pushTxn).respond(200, { pos: "A" });
await httpBackend.flushAllExpected();
const B = slidingSync.setListRanges(0, [[60, 70]]);
httpBackend.when("POST", syncUrl).check(pushTxn).respond(200, { pos: "B" }); // missing txn_id
await httpBackend.flushAllExpected();

// attach rejection handlers now else if we do it later Jest treats that as an unhandled rejection
// which is a fail.
expect(A).rejects.toEqual(gotTxnIds[0]);

const C = slidingSync.setListRanges(0, [[0, 20]]);
let pendingC = true;
C.finally(() => {
pendingC = false;
});
httpBackend.when("POST", syncUrl).check(pushTxn).respond(200, () => {
// include the txn_id for B, so C's promise is outstanding
return {
pos: "C",
txn_id: gotTxnIds[1],
};
});
await httpBackend.flushAllExpected();
// A is rejected, see above
expect(B).resolves.toEqual(gotTxnIds[1]); // B is resolved
expect(pendingC).toBe(true); // C is pending still
});
it("should do nothing for unknown txn_ids", async () => {
const promise = slidingSync.setListRanges(0, [[20, 40]]);
let pending = true;
promise.finally(() => {
pending = false;
});
let txnId;
httpBackend.when("POST", syncUrl).check(function(req) {
const body = req.data;
logger.log("txn got ", body);
expect(body.room_subscriptions).toBeFalsy();
expect(body.lists[0]).toEqual({
ranges: [[20, 40]],
});
expect(body.txn_id).toBeTruthy();
txnId = body.txn_id;
}).respond(200, function() {
return {
pos: "ccc",
txn_id: "bogus transaction id",
lists: [{ count: 5 }],
extensions: {},
};
});
await httpBackend.flushAllExpected();
expect(txnId).toBeDefined();
expect(pending).toBe(true);
slidingSync.stop();
});
});

describe("extensions", () => {
beforeAll(setupClient);
afterAll(teardownClient);
Expand Down
74 changes: 64 additions & 10 deletions src/sliding-sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ export interface MSC3575SlidingSyncRequest {
unsubscribe_rooms?: string[];
room_subscriptions?: Record<string, MSC3575RoomSubscription>;
extensions?: object;
txn_id?: string;

// query params
pos?: string;
Expand Down Expand Up @@ -126,6 +127,7 @@ type Operation = DeleteOperation | InsertOperation | InvalidateOperation | SyncO
*/
export interface MSC3575SlidingSyncResponse {
pos: string;
txn_id?: string;
lists: ListResponse[];
rooms: Record<string, MSC3575RoomData>;
extensions: object;
Expand Down Expand Up @@ -334,6 +336,11 @@ export class SlidingSync extends TypedEventEmitter<SlidingSyncEvent, SlidingSync
private terminated = false;
// flag set when resend() is called because we cannot rely on detecting AbortError in JS SDK :(
private needsResend = false;
// the txn_id to send with the next request.
private txnId?: string = null;
// a list (in chronological order of when they were sent) of objects containing the txn ID and
// a defer to resolve/reject depending on whether they were successfully sent or not.
private txnIdDefers: {txnId: string, resolve: Function, reject: Function}[] = [];
kegsay marked this conversation as resolved.
Show resolved Hide resolved
// map of extension name to req/resp handler
private extensions: Record<string, Extension> = {};

Expand Down Expand Up @@ -404,9 +411,9 @@ export class SlidingSync extends TypedEventEmitter<SlidingSyncEvent, SlidingSync
* @param index The list index to modify
* @param ranges The new ranges to apply.
*/
public setListRanges(index: number, ranges: number[][]): void {
public setListRanges(index: number, ranges: number[][]): Promise<string> {
this.lists[index].updateListRange(ranges);
this.resend();
return this.resend();
}

/**
Expand All @@ -415,14 +422,14 @@ export class SlidingSync extends TypedEventEmitter<SlidingSyncEvent, SlidingSync
* @param index The index to modify
* @param list The new list parameters.
*/
public setList(index: number, list: MSC3575List): void {
public setList(index: number, list: MSC3575List): Promise<string> {
if (this.lists[index]) {
this.lists[index].replaceList(list);
} else {
this.lists[index] = new SlidingList(list);
}
this.listModifiedCount += 1;
this.resend();
return this.resend();
}

/**
Expand All @@ -439,20 +446,20 @@ export class SlidingSync extends TypedEventEmitter<SlidingSyncEvent, SlidingSync
* prepare the room subscriptions for when start() is called.
* @param s The new desired room subscriptions.
*/
public modifyRoomSubscriptions(s: Set<string>) {
public modifyRoomSubscriptions(s: Set<string>): Promise<string> {
this.desiredRoomSubscriptions = s;
this.resend();
return this.resend();
}

/**
* Modify which events to retrieve for room subscriptions. Invalidates all room subscriptions
* such that they will be sent up afresh.
* @param rs The new room subscription fields to fetch.
*/
public modifyRoomSubscriptionInfo(rs: MSC3575RoomSubscription): void {
public modifyRoomSubscriptionInfo(rs: MSC3575RoomSubscription): Promise<string> {
kegsay marked this conversation as resolved.
Show resolved Hide resolved
this.roomSubscriptionInfo = rs;
this.confirmedRoomSubscriptions = new Set<string>();
this.resend();
return this.resend();
}

/**
Expand Down Expand Up @@ -615,11 +622,52 @@ export class SlidingSync extends TypedEventEmitter<SlidingSyncEvent, SlidingSync
}

/**
* Resend a Sliding Sync request. Used when something has changed in the request.
* Resend a Sliding Sync request. Used when something has changed in the request. Resolves with
* the transaction ID of this request on success. Rejects with the transaction ID of this request
* on failure.
*/
public resend(): void {
public resend(): Promise<string> {
this.needsResend = true;
kegsay marked this conversation as resolved.
Show resolved Hide resolved
this.txnId = ""+Math.random();
kegsay marked this conversation as resolved.
Show resolved Hide resolved
const p: Promise<string> = new Promise((resolve, reject) => {
kegsay marked this conversation as resolved.
Show resolved Hide resolved
this.txnIdDefers.push({
txnId: this.txnId,
resolve: resolve,
reject: reject,
});
});
this.pendingReq?.abort();
return p;
}

private resolveTransactionDefers(txnId?: string) {
if (!txnId) {
kegsay marked this conversation as resolved.
Show resolved Hide resolved
return;
}
// find the matching index
let txnIndex = -1;
for (let i = 0; i < this.txnIdDefers.length; i++) {
if (this.txnIdDefers[i].txnId === txnId) {
txnIndex = i;
break;
}
}
if (txnIndex === -1) {
// this shouldn't happen; we shouldn't be seeing txn_ids for things we don't know about,
// whine about it.
logger.warn(`resolveTransactionDefers: seen ${txnId} but it isn't a pending txn, ignoring.`);
return;
}
// This list is sorted in time, so if the input txnId ACKs in the middle of this array,
// then everything before it that hasn't been ACKed yet never will and we should reject them.
for (let i = 0; i < txnIndex; i++) {
if (i < txnIndex) {
kegsay marked this conversation as resolved.
Show resolved Hide resolved
this.txnIdDefers[i].reject(this.txnIdDefers[i].txnId);
}
}
this.txnIdDefers[txnIndex].resolve(txnId);
// clear out settled promises, incuding the one we resolved.
this.txnIdDefers = this.txnIdDefers.slice(txnIndex+1);
}

/**
Expand Down Expand Up @@ -666,6 +714,10 @@ export class SlidingSync extends TypedEventEmitter<SlidingSyncEvent, SlidingSync
reqBody.room_subscriptions[roomId] = this.roomSubscriptionInfo;
}
}
if (this.txnId) {
reqBody.txn_id = this.txnId;
this.txnId = null;
}
this.pendingReq = this.client.slidingSync(reqBody, this.proxyBaseUrl);
resp = await this.pendingReq;
logger.debug(resp);
Expand Down Expand Up @@ -747,6 +799,8 @@ export class SlidingSync extends TypedEventEmitter<SlidingSyncEvent, SlidingSync
i, this.lists[i].joinedCount, Object.assign({}, this.lists[i].roomIndexToRoomId),
);
});

this.resolveTransactionDefers(resp.txn_id);
}
}
}
Expand Down