Skip to content

Commit

Permalink
Merge pull request #3 from openartmarket/2-merge-on-update
Browse files Browse the repository at this point in the history
Merge streamed updates with existing record
  • Loading branch information
aslakhellesoy authored Dec 9, 2023
2 parents d031724 + 05694ef commit 0647f61
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 3 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [Unreleased]

### Fixed
- Merge streamed updates with existing record. Fixes [#2](https://github.com/openartmarket/supabase-live-table/issues/2)

## [0.0.7] - 2023-09-23
### Fixed
- Ignore updates that arrive after the snapshot, but are timestamped before the snapshot
Expand Down
24 changes: 24 additions & 0 deletions docs/sequence-diagrams/merges-updates-with-previous-value.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
### merges updates with previous value

```mermaid
sequenceDiagram
LiveTable->>+Supabase: subscribe
Supabase->>-LiveTable: subscription active
LiveTable->>+Supabase: get snapshot
Supabase-->>LiveTable: UPDATE {"id":1,"created_at":"1","updated_at":"3","type":"vehicle","color":"red"}
Supabase->>-LiveTable: snaphot: [{"id":1,"created_at":"1","updated_at":"2","type":"vehicle","name":"Bicycle","color":"black"}]
```

### replica
```json
[
{
"id": 1,
"created_at": "1",
"updated_at": "3",
"type": "vehicle",
"name": "Bicycle",
"color": "red"
}
]
```
11 changes: 8 additions & 3 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ type Insert<TableRow extends LiveRow> = {

type Update<TableRow extends LiveRow> = {
type: 'UPDATE';
record: TableRow;
record: Partial<TableRow>;
timestamp: string;
};

Expand Down Expand Up @@ -180,10 +180,15 @@ export class LiveTable<TableRow extends LiveRow> implements ILiveTable<TableRow>
break;
}
case 'UPDATE': {
if (!this.recordById.has(record.id)) {
const id = record.id;
if (!id) {
throw new Error(`Cannot update. Record has no id: ${JSON.stringify(record)}`);
}
const oldRecord = this.recordById.get(id);
if (oldRecord === undefined) {
throw new Error(`Cannot update. Record does not exist: ${JSON.stringify(record)}`);
}
this.recordById.set(record.id, record);
this.recordById.set(id, { ...oldRecord, ...record });
break;
}
case 'DELETE': {
Expand Down
31 changes: 31 additions & 0 deletions test/liveTableBuffering.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,37 @@ describe('LiveTable Buffering', () => {
await lt.close();
});

it('merges updates with previous value', async (test) => {
const lt = new MermaidLiveTable(new LiveTable<ThingRow>(parseTimestamp), test.task.name);

lt.subscribe();
lt.subscribed();
lt.requestSnapshot();

const streamRecord: Partial<ThingRow> = {
id: 1,
created_at: t1,
updated_at: t3,
type: 'vehicle',
color: 'red',
};
lt.processEvent({ timestamp: t3, type: 'UPDATE', record: streamRecord });

const snapshotRecord = {
id: 1,
created_at: t1,
updated_at: t2,
type: 'vehicle',
name: 'Bicycle',
color: 'black',
};
lt.processSnapshot([snapshotRecord]);

expect(lt.records).toEqual([{ ...snapshotRecord, color: 'red', updated_at: t3 }]);

await lt.close();
});

it('replays deletes that arrived after the snapshot', async (test) => {
const lt = new MermaidLiveTable(new LiveTable<ThingRow>(parseTimestamp), test.task.name);

Expand Down

0 comments on commit 0647f61

Please sign in to comment.