Skip to content

Commit

Permalink
feat: Implement token callback; fix CI testing
Browse files Browse the repository at this point in the history
  • Loading branch information
filipecabaco committed Nov 29, 2024
1 parent 9c8a5d3 commit 9e5fab4
Show file tree
Hide file tree
Showing 7 changed files with 313 additions and 615 deletions.
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
"jsdom": "^16.7.0",
"jsdom-global": "3.0.0",
"jsonwebtoken": "^9.0.2",
"mock-socket": "^9.0.3",
"mock-socket": "^9.3.1",
"npm-run-all": "^4.1.5",
"nyc": "^15.1.0",
"prettier": "^2.1.2",
Expand Down
135 changes: 67 additions & 68 deletions src/RealtimeChannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,15 @@ export enum REALTIME_SUBSCRIBE_STATES {

export const REALTIME_CHANNEL_STATES = CHANNEL_STATES

interface PostgresChangesFilters {
postgres_changes: {
id: string
event: string
schema?: string
table?: string
filter?: string
}[]
}
/** A channel is the basic building block of Realtime
* and narrows the scope of data flow to subscribed clients.
* You can think of a channel as a chatroom where participants are able to see who's online
Expand Down Expand Up @@ -202,7 +211,7 @@ export default class RealtimeChannel {

/** Subscribe registers your client with the server */
subscribe(
callback?: (status: `${REALTIME_SUBSCRIBE_STATES}`, err?: Error) => void,
callback?: (status: REALTIME_SUBSCRIBE_STATES, err?: Error) => void,
timeout = this.timeout
): RealtimeChannel {
if (!this.socket.isConnected()) {
Expand All @@ -215,8 +224,10 @@ export default class RealtimeChannel {
const {
config: { broadcast, presence, private: isPrivate },
} = this.params
this._onError((e: Error) => callback && callback('CHANNEL_ERROR', e))
this._onClose(() => callback && callback('CLOSED'))
this._onError((e: Error) =>
callback?.(REALTIME_SUBSCRIBE_STATES.CHANNEL_ERROR, e)
)
this._onClose(() => callback?.(REALTIME_SUBSCRIBE_STATES.CLOSED))

const accessTokenPayload: { access_token?: string } = {}
const config = {
Expand All @@ -237,81 +248,71 @@ export default class RealtimeChannel {
this._rejoin(timeout)

this.joinPush
.receive(
'ok',
({
postgres_changes: serverPostgresFilters,
}: {
postgres_changes: {
id: string
event: string
schema?: string
table?: string
filter?: string
}[]
}) => {
.receive('ok', async ({ postgres_changes }: PostgresChangesFilters) => {
if (this.socket.accessTokenCallback) {
let token = await this.socket.accessTokenCallback()
this.socket.setAuth(token)
} else {
this.socket.accessToken &&
this.socket.setAuth(this.socket.accessToken)
}

if (serverPostgresFilters === undefined) {
callback && callback('SUBSCRIBED')
return
} else {
const clientPostgresBindings = this.bindings.postgres_changes
const bindingsLen = clientPostgresBindings?.length ?? 0
const newPostgresBindings = []

for (let i = 0; i < bindingsLen; i++) {
const clientPostgresBinding = clientPostgresBindings[i]
const {
filter: { event, schema, table, filter },
} = clientPostgresBinding
const serverPostgresFilter =
serverPostgresFilters && serverPostgresFilters[i]

if (
serverPostgresFilter &&
serverPostgresFilter.event === event &&
serverPostgresFilter.schema === schema &&
serverPostgresFilter.table === table &&
serverPostgresFilter.filter === filter
) {
newPostgresBindings.push({
...clientPostgresBinding,
id: serverPostgresFilter.id,
})
} else {
this.unsubscribe()
callback &&
callback(
'CHANNEL_ERROR',
new Error(
'mismatch between server and client bindings for postgres changes'
)
)
return
}
if (postgres_changes === undefined) {
callback?.(REALTIME_SUBSCRIBE_STATES.SUBSCRIBED)
return
} else {
const clientPostgresBindings = this.bindings.postgres_changes
const bindingsLen = clientPostgresBindings?.length ?? 0
const newPostgresBindings = []

for (let i = 0; i < bindingsLen; i++) {
const clientPostgresBinding = clientPostgresBindings[i]
const {
filter: { event, schema, table, filter },
} = clientPostgresBinding
const serverPostgresFilter =
postgres_changes && postgres_changes[i]

if (
serverPostgresFilter &&
serverPostgresFilter.event === event &&
serverPostgresFilter.schema === schema &&
serverPostgresFilter.table === table &&
serverPostgresFilter.filter === filter
) {
newPostgresBindings.push({
...clientPostgresBinding,
id: serverPostgresFilter.id,
})
} else {
this.unsubscribe()
callback?.(
REALTIME_SUBSCRIBE_STATES.CHANNEL_ERROR,
new Error(
'mismatch between server and client bindings for postgres changes'
)
)
return
}
}

this.bindings.postgres_changes = newPostgresBindings
this.bindings.postgres_changes = newPostgresBindings

callback && callback('SUBSCRIBED')
return
}
callback && callback(REALTIME_SUBSCRIBE_STATES.SUBSCRIBED)
return
}
)
})
.receive('error', (error: { [key: string]: any }) => {
callback &&
callback(
'CHANNEL_ERROR',
new Error(
JSON.stringify(Object.values(error).join(', ') || 'error')
)
callback?.(
REALTIME_SUBSCRIBE_STATES.CHANNEL_ERROR,
new Error(
JSON.stringify(Object.values(error).join(', ') || 'error')
)
)
return
})
.receive('timeout', () => {
callback && callback('TIMED_OUT')
callback?.(REALTIME_SUBSCRIBE_STATES.TIMED_OUT)
return
})
}
Expand Down Expand Up @@ -523,7 +524,6 @@ export default class RealtimeChannel {

return new Promise((resolve) => {
const leavePush = new Push(this, CHANNEL_EVENTS.leave, {}, timeout)

leavePush
.receive('ok', () => {
onClose()
Expand All @@ -538,7 +538,6 @@ export default class RealtimeChannel {
})

leavePush.send()

if (!this._canPush()) {
leavePush.trigger('ok', {})
}
Expand Down
Loading

0 comments on commit 9e5fab4

Please sign in to comment.