-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.js
160 lines (140 loc) · 6.65 KB
/
app.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
//This code is still being developed. Recovery after an error is working, but it is not yet set up for multiple connections.
async function app () {
// Single Supabase client shared by everything inside app().
const supaclient = supabase.createClient(SUPABASE_URL, SUPABASE_ANON_KEY)
// NEXT SECTION IS JUST FOR TESTING
// It seeds `realtest` and then keeps writing to it while the subscription further
// down is being set up, so that events arriving before the initial load get exercised.
// Expected end state: ids 1-20 carry 'after<id>' and ids 21-40 carry 'insert<id>'.
// Empty the table first so the script can be run repeatedly.
const {data:data1,error:error1} = await supaclient.from('realtest').delete().not('id','is',null)
console.log(data1,error1)
for (let id = 1; id <= 20; id++) {
  const {error: seedError} = await supaclient.from('realtest').insert({id: id, message: 'before' + id})
  // A failed seed row would make the final table misleading, so report it.
  if (seedError) console.error('seed insert failed', id, seedError)
}
console.log('table initialized')
// Builds a completion handler that reports the `error` field of a finished write.
const reportWriteError = (action, id) => (result) => {
  if (result && result.error) console.error(action + ' failed', id, result.error)
}
let i = 1
// Every 50 ms: add one new row (ids 21-40) and update one seeded row (ids 1-20).
const interval = setInterval(function() {
  if (i > 20) {
    clearInterval(interval)
    return
  }
  console.log('update',i)
  supaclient.from('realtest').insert({id:i+20,message:'insert'+(i+20)}).then(reportWriteError('insert', i+20)) // add 20 new rows
  supaclient.from('realtest').update({message: 'after' + i}).eq('id', i).then(reportWriteError('update', i)) //update first 20 rows
  i++
}, 50)
// END OF TESTING SETUP
// Handle of the currently active realtime channel; replaced by startStream().
let mySubscription = {}
/**
 * Opens the realtime channel for `tableName` and keeps `memoryTable` in sync with it.
 *
 * Change events that arrive before the initial table load has finished are queued
 * and replayed afterwards in the order they were received; later events are handed
 * to `eventHandler` directly. The channel is stored in the outer `mySubscription`.
 *
 * @param {Array<object>} memoryTable - in-memory copy of the table, passed through to both callbacks
 * @param {string} tableName - table in the `public` schema to listen on
 * @param {string} primaryCol - primary key column name, passed through to both callbacks
 * @param {function} eventHandler - called as eventHandler(payload, memoryTable, primaryCol) per change event
 * @param {function} handleTableInit - called once as handleTableInit(memoryTable, tableName, primaryCol); must return a promise
 * @returns {Promise<void>}
 */
async function startStream(memoryTable,tableName,primaryCol,eventHandler,handleTableInit) {
  console.log('startStream',tableName)
  const eventQueue = [] // realtime events received before the initial table load, oldest first
  let initMemoryTable = false // once true, events no longer need to be queued
  let connected = false // the system message can arrive repeatedly, so only react to the first one
  mySubscription = supaclient
    .channel('myChannel')
    .on('postgres_changes',
      {event: '*', schema: 'public', table: tableName}, (payload) => {
        console.log('event, ', payload.eventType, payload.new.id) // console only works with a column of id
        if (initMemoryTable) {
          eventHandler(payload,memoryTable,primaryCol) // running normally
        } else {
          // Append so the queue preserves arrival order; replaying newest-first
          // would let an older event overwrite a newer one for the same row.
          eventQueue.push(payload)
        }
      })
    .subscribe((status) => {
      console.log('subscribe_status, ' + status);
      if (status !== 'SUBSCRIBED') {
        connectionErrorHandler(status)
      }
    })
    .on('system', {}, (payload) => { // need this to know when REALLY connected
      if (payload.extension !== 'postgres_changes' || connected) {
        return
      }
      console.log('postgres_changes received, load initial data')
      connected = true
      handleTableInit(memoryTable,tableName,primaryCol)
        .then(() => {
          // merge the events that arrived during the load into the data table
          for (const queued of eventQueue) {
            eventHandler(queued, memoryTable, primaryCol)
          }
          eventQueue.length = 0
          initMemoryTable = true
        })
        .catch((err) => {
          // Without this the failure would only show up as an unhandled rejection.
          console.error('initial table load failed', tableName, err)
        })
    })
}
/**
 * Applies one realtime change event to the in-memory table.
 * Kept separate from handleTableInit because the desired insert/update/delete
 * pattern may differ per table.
 *
 * - INSERT: replaces the row with the same key if present ("upsert", needed when
 *   queued inserts are replayed over already loaded rows), otherwise puts the
 *   new row at the front of the array (no re-sorting in this test).
 * - UPDATE: replaces the row with the same key; unknown keys are ignored.
 * - DELETE: removes the row whose key matches `payload.old`; unknown keys are ignored.
 *
 * @param {object} payload - change event with `eventType`, `new` and (for deletes) `old`
 * @param {Array<object>} memoryTable - array of rows, modified in place
 * @param {string} primaryCol - name of the key column used to match rows
 */
function handleEvent(payload,memoryTable,primaryCol) {
  console.log('handleEvent', payload.eventType, payload.new.id,primaryCol) //console only works with an id col.
  const kind = payload.eventType
  // index of the stored row sharing `row`'s key, or -1
  const locate = (row) => memoryTable.findIndex((entry) => entry[primaryCol] === row[primaryCol])

  if (kind === 'INSERT' || kind === 'UPDATE') {
    const at = locate(payload.new)
    if (at !== -1) {
      memoryTable[at] = payload.new
    } else if (kind === 'INSERT') {
      memoryTable.unshift(payload.new)
    }
    return
  }

  if (kind === 'DELETE') {
    const at = locate(payload.old)
    if (at !== -1) {
      memoryTable.splice(at, 1)
    }
  }
}
/**
 * Loads every row of `tableName`, ordered by `primaryCol`, into `memoryTable`.
 *
 * The array is emptied before it is filled, so calling this again after a
 * reconnect replaces the previous snapshot instead of duplicating every row.
 *
 * @param {Array<object>} memoryTable - array to fill, modified in place
 * @param {string} tableName - table to read
 * @param {string} primaryCol - column to order the result by
 * @returns {Promise<void>}
 * @throws {Error} when the query result carries an `error`; memoryTable is left untouched
 */
async function handleTableInit(memoryTable,tableName,primaryCol) {
  const result = await supaclient.from(tableName).select('*').order(primaryCol)
  console.log('initial data loaded', result)
  if (result.error) {
    // `data` is not usable in this case; fail with a message that names the cause.
    const failure = new Error('initial load of ' + tableName + ' failed: ' + result.error.message)
    failure.details = result.error
    throw failure
  }
  const rows = Array.isArray(result.data) ? result.data : []
  memoryTable.length = 0
  for (const row of rows) {
    memoryTable.push(row)
  }
}
// State shared by the stream helpers in this function.
const table = 'realtest'
const pCol = 'id' // would need changes for composite primary
const testTable = [] // in-memory copy of `table`
let restart = false // set when the stream has to be started again
console.log('start subscription')
startStream(testTable, table, pCol, handleEvent, handleTableInit)
// test only: id for the next extra row inserted after a reconnect
let afterid = 41
/**
 * Restarts the stream when a restart is pending and the tab is visible;
 * otherwise only logs. After restarting it clears the `restart` flag and,
 * for testing, inserts one extra row using the `afterid` counter.
 */
async function start_up() {
  console.log('start_up')
  const tabVisible = document.visibilityState === 'visible'
  if (!(tabVisible && restart)) {
    return
  }
  console.log('start stream')
  startStream(testTable, table, pCol, handleEvent, handleTableInit)
  restart = false
  // test inserts: add a row after the reconnect
  console.log('test insert after')
  const extraRow = { id: afterid, message: `insert${afterid}` }
  supaclient.from('realtest').insert(extraRow).then()
  afterid += 1 // test
}
/**
 * Reacts to any subscribe status other than 'SUBSCRIBED'.
 * Marks the stream for restart, unsubscribes the current channel unless the
 * status is already 'CLOSED', and restarts right away when the tab is visible.
 *
 * @param {string} status - status value passed to the subscribe callback
 */
async function connectionErrorHandler (status) {
  restart = true
  const alreadyClosed = status === 'CLOSED'
  if (!alreadyClosed) {
    mySubscription.unsubscribe()
  }
  console.log('disconnect',status)
  const tabVisible = document.visibilityState === 'visible'
  if (tabVisible) {
    // got an error, but the tab is still running, so restart
    start_up()
  }
}
// When the tab becomes visible again, give start_up() a chance to restart the stream.
// Nothing is done when the tab is hidden; another option would be a timer that
// closes the subscription after x minutes.
document.onvisibilitychange = () => {
  const state = document.visibilityState
  console.log('visibility change', state)
  if (state !== 'visible') {
    return
  }
  start_up()
}
// just to show the final result once the test writes have had time to arrive
setTimeout(() => {
  console.log('table after ', testTable)
}, 2000)
}
// Start the app once the DOM has been parsed. app() is async, so a failure is
// reported here instead of being left as an unhandled promise rejection.
document.addEventListener("DOMContentLoaded", function() {
  app().catch(function(err) {
    console.error('app failed', err)
  })
})
// Reference point for the elapsed-time prefix written by logToHTML.
let startTime = Date.now()
/**
 * Appends "<elapsed ms>, <message>" as a new <div> to the element with id "log".
 * When the page has no such element, the line goes to the console instead of
 * throwing.
 *
 * @param {string} message - text to log
 */
function logToHTML (message) {
  const line = (Date.now()-startTime) +', '+ message
  const container = document.getElementById("log")
  if (!container) {
    console.log(line)
    return
  }
  const node = document.createElement("div")
  node.appendChild(document.createTextNode(line))
  container.appendChild(node)
}