Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

examples: update gatekeeper auth client to use onError. #2028

Merged
merged 2 commits into from
Nov 22, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 30 additions & 42 deletions examples/gatekeeper-auth/client/index.ts
Original file line number Diff line number Diff line change
@@ -1,54 +1,42 @@
import { Shape, ShapeStream } from '@electric-sql/client'
import { FetchError, Shape, ShapeStream } from '@electric-sql/client'

const API_URL = process.env.API_URL || "http://localhost:4000"

interface Definition {
table: string,
where?: string,
columns?: string
}

/*
* Fetch the shape options and start syncing. When new data is recieved,
* log the number of rows. When an auth token expires, reconnect.
* Makes a request to the gatekeeper endpoint to fetch a config object
* in the format expected by the ShapeStreamOptions including the
* proxy `url` to connect to and auth `headers`.
*/
async function sync(definition: Definition, handle?: string, offset: string = '-1') {
console.log('sync: ', offset)
async function fetchConfig() {
const url = `${API_URL}/gatekeeper/items`

const options = await fetchShapeOptions(definition)
const stream = new ShapeStream({...options, handle: handle, offset: offset})
const shape = new Shape(stream)

shape.subscribe(async ({ rows }) => {
if (shape.error && 'status' in shape.error) {
const status = shape.error.status
console.warn('fetch error: ', status)
const resp = await fetch(url, {method: "POST"})
return await resp.json()
}

// Stream the shape through the proxy, using the url and auth headers
// provided by the gatekeeper.
const config = await fetchConfig()
const stream = new ShapeStream({...config, onError: async (error) => {
if (error instanceof FetchError) {
thruflo marked this conversation as resolved.
Show resolved Hide resolved
const status = error.status
console.log('handling fetch error: ', status)

// If the auth token is invalid or expires, hit the gatekeeper
// again to update the auth headers and thus keep streaming
// without interruption.
if (status === 401 || status === 403) {
shape.unsubscribeAll()

return await sync(definition, shape.handle, shape.lastOffset)
return await fetchConfig()
}
}
else {
console.log('num rows: ', rows ? rows.length : 0)
}
})
}

/*
* Make a request to the gatekeeper endpoint to get the proxy url and
* auth headers to connect to/with.
*/
async function fetchShapeOptions(definition: Definition) {
const { table, ...params} = definition

const qs = new URLSearchParams(params).toString()
const url = `${API_URL}/gatekeeper/${table}${qs ? '?' : ''}${qs}`

const resp = await fetch(url, {method: "POST"})
return await resp.json()
}
throw error
}
})

// Start syncing.
await sync({table: 'items'})
// Materialize the stream into a `Shape` and subscibe to data changes
// so we can see the client working.
const shape = new Shape(stream)
shape.subscribe(({ rows }) => {
console.log('num rows: ', rows ? rows.length : 0)
})
Loading