Store account data in the same way as room data #377

Merged (2 commits) on Feb 21, 2017
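This change teaches SyncAccumulator to track global account data alongside room data: accumulateRooms() is renamed to accumulate(), account data events are kept in a per-event-type map (later events of the same type clobber earlier ones), and getJSON() grows an accountData key which the IndexedDB store persists and replays on startup. Below is a minimal sketch of the new API; the require path and the assumption that sync-accumulator.js exports the class directly are not part of this diff.

    const SyncAccumulator = require("./src/sync-accumulator"); // assumed export/path

    const sa = new SyncAccumulator({ maxTimelineEntries: 10 });

    // Account data is now folded in by the same call that accumulates rooms.
    sa.accumulate({
        next_batch: "batch_1",
        account_data: {
            events: [{ type: "favourite.food", content: { food: "banana" } }],
        },
    });
    sa.accumulate({
        next_batch: "batch_2",
        account_data: {
            events: [{ type: "favourite.food", content: { food: "apple" } }],
        },
    });

    // Later events of the same type clobber earlier ones, so only "apple" survives.
    const json = sa.getJSON();
    console.log(json.accountData); // [{ type: "favourite.food", content: { food: "apple" } }]
    console.log(json.nextBatch);   // "batch_2"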
51 changes: 41 additions & 10 deletions spec/unit/sync-accumulator.spec.js
@@ -60,7 +60,7 @@ describe("SyncAccumulator", function() {
},
},
};
sa.accumulateRooms(res);
sa.accumulate(res);
const output = sa.getJSON();
expect(output.nextBatch).toEqual(res.next_batch);
expect(output.roomsData).toEqual(res.rooms);
@@ -69,7 +69,7 @@ describe("SyncAccumulator", function() {
it("should prune the timeline to the oldest prev_batch within the limit", () => {
// maxTimelineEntries is 10 so we should get back all
// 10 timeline messages with a prev_batch of "pinned_to_1"
sa.accumulateRooms(syncSkeleton({
sa.accumulate(syncSkeleton({
state: { events: [member("alice", "join")] },
timeline: {
events: [
@@ -84,7 +84,7 @@ describe("SyncAccumulator", function() {
prev_batch: "pinned_to_1",
},
}));
sa.accumulateRooms(syncSkeleton({
sa.accumulate(syncSkeleton({
state: { events: [] },
timeline: {
events: [
@@ -93,7 +93,7 @@ describe("SyncAccumulator", function() {
prev_batch: "pinned_to_8",
},
}));
sa.accumulateRooms(syncSkeleton({
sa.accumulate(syncSkeleton({
state: { events: [] },
timeline: {
events: [
@@ -116,7 +116,7 @@ describe("SyncAccumulator", function() {
// AND give us <= 10 messages without losing messages in-between.
// It should try to find the oldest prev_batch which still fits into 10
// messages, which is "pinned to 8".
sa.accumulateRooms(syncSkeleton({
sa.accumulate(syncSkeleton({
state: { events: [] },
timeline: {
events: [
@@ -153,7 +153,7 @@ describe("SyncAccumulator", function() {
}],
},
});
sa.accumulateRooms(res);
sa.accumulate(res);
expect(
sa.getJSON().roomsData.join["!foo:bar"].ephemeral.events.length,
).toEqual(0);
@@ -172,12 +172,12 @@ describe("SyncAccumulator", function() {
food: "apple",
},
};
sa.accumulateRooms(syncSkeleton({
sa.accumulate(syncSkeleton({
account_data: {
events: [acc1],
},
}));
sa.accumulateRooms(syncSkeleton({
sa.accumulate(syncSkeleton({
account_data: {
events: [acc2],
},
@@ -190,6 +190,37 @@ describe("SyncAccumulator", function() {
).toEqual(acc2);
});

it("should clobber global account data based on event type", () => {
const acc1 = {
type: "favourite.food",
content: {
food: "banana",
},
};
const acc2 = {
type: "favourite.food",
content: {
food: "apple",
},
};
sa.accumulate({
account_data: {
events: [acc1],
},
});
sa.accumulate({
account_data: {
events: [acc2],
},
});
expect(
sa.getJSON().accountData.length,
).toEqual(1);
expect(
sa.getJSON().accountData[0],
).toEqual(acc2);
});

it("should accumulate read receipts", () => {
const receipt1 = {
type: "m.receipt",
@@ -218,12 +249,12 @@ describe("SyncAccumulator", function() {
},
},
};
sa.accumulateRooms(syncSkeleton({
sa.accumulate(syncSkeleton({
ephemeral: {
events: [receipt1],
},
}));
sa.accumulateRooms(syncSkeleton({
sa.accumulate(syncSkeleton({
ephemeral: {
events: [receipt2],
},
27 changes: 11 additions & 16 deletions src/store/indexeddb.js
@@ -116,15 +116,15 @@ IndexedDBStoreBackend.prototype = {
/**
* Persist a list of account data events. Events with the same 'type' will
* be replaced.
* @param {MatrixEvent[]} accountData An array of user-scoped account data events
* @param {Object[]} accountData An array of raw user-scoped account data events
* @return {Promise} Resolves if the events were persisted.
*/
persistAccountData: function(accountData) {
return q.try(() => {
const txn = this.db.transaction(["accountData"], "readwrite");
const store = txn.objectStore("accountData");
for (let i = 0; i < accountData.length; i++) {
store.put(accountData[i].event); // put == UPSERT
store.put(accountData[i]); // put == UPSERT
}
return promiseifyTxn(txn);
});
@@ -172,14 +172,14 @@ IndexedDBStoreBackend.prototype = {

/**
* Load all the account data events from the database. This is not cached.
* @return {Promise<MatrixEvent[]>} A list of events.
* @return {Promise<Object[]>} A list of raw global account events.
*/
loadAccountData: function() {
return q.try(() => {
const txn = this.db.transaction(["accountData"], "readonly");
const store = txn.objectStore("accountData");
return selectQuery(store, undefined, (cursor) => {
return new MatrixEvent(cursor.value);
return cursor.value;
});
});
},
@@ -283,10 +283,9 @@ IndexedDBStore.prototype.startup = function() {
this._userModifiedMap[u.userId] = u.getLastModifiedTime();
this.storeUser(u);
});
this.storeAccountDataEvents(accountData);
this._syncTs = Date.now(); // pretend we've written so we don't rewrite
this.setSyncToken(syncData.nextBatch);
this._setSyncData(syncData.nextBatch, syncData.roomsData);
this._setSyncData(syncData.nextBatch, syncData.roomsData, accountData);
});
};

@@ -325,10 +324,13 @@ IndexedDBStore.prototype.save = function() {
return q();
};

IndexedDBStore.prototype._setSyncData = function(nextBatch, roomsData) {
this._syncAccumulator.accumulateRooms({
IndexedDBStore.prototype._setSyncData = function(nextBatch, roomsData, accountData) {
this._syncAccumulator.accumulate({
next_batch: nextBatch,
rooms: roomsData,
account_data: {
events: accountData,
},
});
};

@@ -344,18 +346,11 @@ IndexedDBStore.prototype._syncToDatabase = function() {
this._userModifiedMap[u.userId] = u.getLastModifiedTime();
});

// TODO: work out changed account data events. They don't have timestamps or IDs.
// so we'll need to hook into storeAccountDataEvents instead to catch them when
// they update from /sync
const changedAccountData = Object.keys(this.accountData).map((etype) => {
return this.accountData[etype];
});

const syncData = this._syncAccumulator.getJSON();

return q.all([
this.backend.persistUsers(changedUsers),
this.backend.persistAccountData(changedAccountData),
this.backend.persistAccountData(syncData.accountData),
this.backend.persistSyncData(syncData.nextBatch, syncData.roomsData),
]);
};
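With the accumulator as the single source of truth, the store no longer diffs this.accountData itself, and persistAccountData() receives raw event objects rather than MatrixEvent wrappers. A rough sketch of the resulting write path follows; persistAccumulatedState is a hypothetical wrapper used only for illustration, while q, the backend methods, and the "accountData" object store come from the surrounding file.

    // Simplified extract of the new _syncToDatabase() flow; `store` stands for an
    // IndexedDBStore instance and `q` is the promise library already used here.
    function persistAccumulatedState(store) {
        const syncData = store._syncAccumulator.getJSON();
        return q.all([
            // Raw events go straight into the "accountData" object store; put() is
            // an upsert, so an event with the same 'type' replaces the stored copy.
            store.backend.persistAccountData(syncData.accountData),
            store.backend.persistSyncData(syncData.nextBatch, syncData.roomsData),
        ]);
    }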
38 changes: 33 additions & 5 deletions src/sync-accumulator.js
@@ -47,6 +47,9 @@ class SyncAccumulator {
opts = opts || {};
opts.maxTimelineEntries = opts.maxTimelineEntries || 50;
this.opts = opts;
this.accountData = {
//$event_type: Object
};
this.inviteRooms = {
//$roomId: { ... sync 'invite' json data ... }
};
@@ -71,15 +74,30 @@
this.nextBatch = null;
}

accumulate(syncResponse) {
this._accumulateRooms(syncResponse);
this._accumulateAccountData(syncResponse);
this.nextBatch = syncResponse.next_batch;
}

_accumulateAccountData(syncResponse) {
if (!syncResponse.account_data || !syncResponse.account_data.events) {
return;
}
// Clobbers based on event type.
syncResponse.account_data.events.forEach((e) => {
this.accountData[e.type] = e;
});
}

/**
* Accumulate incremental /sync room data.
* @param {Object} syncResponse the complete /sync JSON
*/
accumulateRooms(syncResponse) {
_accumulateRooms(syncResponse) {
if (!syncResponse.rooms) {
return;
}
this.nextBatch = syncResponse.next_batch;
if (syncResponse.rooms.invite) {
Object.keys(syncResponse.rooms.invite).forEach((roomId) => {
this._accumulateRoom(
@@ -316,13 +334,15 @@
* Return everything under the 'rooms' key from a /sync response which
* represents all room data that should be stored. This should be paired
* with the sync token which represents the most recent /sync response
* provided to accumulateRooms().
* @return {Object} An object with a "nextBatch" key and a "roomsData" key.
* provided to accumulate().
* @return {Object} An object with a "nextBatch", "roomsData" and "accountData"
* keys.
* The "nextBatch" key is a string which represents at what point in the
* /sync stream the accumulator reached. This token should be used when
* restarting a /sync stream at startup. Failure to do so can lead to missing
* events. The "roomsData" key is an Object which represents the entire
* /sync response from the 'rooms' key onwards.
* /sync response from the 'rooms' key onwards. The "accountData" key is
* a list of raw events which represent global account data.
*/
getJSON() {
const data = {
@@ -434,9 +454,17 @@
});
data.join[roomId] = roomJson;
});

// Add account data
const accData = [];
Object.keys(this.accountData).forEach((evType) => {
accData.push(this.accountData[evType]);
});

return {
nextBatch: this.nextBatch,
roomsData: data,
accountData: accData,
};
}
}
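For reference, the shape getJSON() now returns looks roughly like this. The field names come from the code above; the values and the commented room layout are illustrative only, and the roomsData structure itself is unchanged by this PR.

    // Illustrative snapshot of SyncAccumulator.getJSON() output (values made up).
    const snapshot = {
        nextBatch: "s12345_678",             // token of the last accumulate()d /sync
        roomsData: {                          // everything under the /sync 'rooms' key
            join: { /* $roomId: accumulated join data */ },
            invite: { /* $roomId: raw invite data */ },
        },
        accountData: [                        // at most one raw event per event type
            { type: "favourite.food", content: { food: "apple" } },
        ],
    };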
5 changes: 4 additions & 1 deletion src/sync.js
@@ -544,6 +544,9 @@ SyncApi.prototype._sync = function(syncOptions) {
this._currentSyncRequest = q.resolve({
next_batch: data.nextBatch,
rooms: data.roomsData,
account_data: {
events: data.accountData,
},
});
isCachedResponse = true;
}
@@ -576,7 +579,7 @@
// accumulated data. We don't want to accumulate the same thing twice, so
// only accumulate if this isn't a cached response.
if (self.opts.syncAccumulator && !isCachedResponse) {
self.opts.syncAccumulator.accumulateRooms(data);
self.opts.syncAccumulator.accumulate(data);
}

// emit synced events
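On startup, sync.js can now rebuild a complete /sync-shaped response from the persisted accumulator output, so cached and live responses flow through the same processing path; the isCachedResponse flag only prevents the cached copy from being fed back into the accumulator. A sketch of that reconstruction is below; toSyncResponse is a hypothetical helper, and how the saved data is loaded from the store is outside this diff.

    // `savedSync` stands for the persisted SyncAccumulator.getJSON() output; the
    // accessor that loads it from IndexedDB is not shown in this diff.
    function toSyncResponse(savedSync) {
        return {
            next_batch: savedSync.nextBatch,
            rooms: savedSync.roomsData,
            account_data: {
                events: savedSync.accountData,
            },
        };
    }
    // Downstream handling treats the result exactly like a live /sync response,
    // but it is never passed back into syncAccumulator.accumulate().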