Feature request: Add NodeJS server example that uses Websockets/SSE for SUBSCRIBE #10

Open
chuck-alt-delete opened this issue Feb 15, 2023 · 3 comments
Labels
enhancement New feature or request

Comments

@chuck-alt-delete
Contributor

We have a nice internal example here:

It would be nice if we could simplify this down to a minimal but useful websocket app.

We may want to wait until the websocket API endpoint is considered stable.

@chuck-alt-delete chuck-alt-delete added the enhancement New feature or request label Feb 15, 2023
@canadaduane

I'd love to know if there is more info available on this. However, just thinking about it... if the websocket API endpoint is used as an example, I don't see how that would be a practical starting point for a React app. The websocket API seems to require "full access" authorization, i.e. access to the database that a web app should never have. I'd like to see a more practical usage, perhaps with an intermediary auth layer?

@chuck-alt-delete
Contributor Author

chuck-alt-delete commented Jun 27, 2023

Updating this issue since another customer came across this and wants to read updates into a frontend over websockets. A good place for a websockets API example would probably be the NodeJS backend example.

This would allow the backend to handle auth, which is a good point by @canadaduane.
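As a rough illustration of what "the backend handles auth" could look like, here is a minimal sketch of a bearer-token check placed in front of the updates endpoint. The helper names `isAuthorized` and `requireToken`, the `APP_TOKEN` environment variable, and the bearer-token scheme itself are all assumptions made for illustration; a real app would more likely reuse whatever session or token mechanism it already has.

```javascript
const crypto = require('node:crypto');

// Hypothetical helper: true when the Authorization header carries the expected bearer token.
function isAuthorized(authorizationHeader, expectedToken) {
  if (typeof authorizationHeader !== 'string' || !expectedToken) return false;
  const match = /^Bearer (.+)$/.exec(authorizationHeader);
  if (!match) return false;
  // Hash both values so the buffers have equal length, which timingSafeEqual requires.
  const given = crypto.createHash('sha256').update(match[1]).digest();
  const expected = crypto.createHash('sha256').update(expectedToken).digest();
  return crypto.timingSafeEqual(given, expected);
}

// Hypothetical Express-style middleware built on the helper above.
// APP_TOKEN is an assumed environment variable, not part of the original example.
function requireToken(req, res, next) {
  if (isAuthorized(req.headers.authorization, process.env.APP_TOKEN)) return next();
  res.statusCode = 401;
  res.end('Unauthorized');
}
```

With something like this in place, the Materialize credentials stay on the server and the browser only ever presents its own app-level token, e.g. `app.get('/data', requireToken, handler)`.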

To disambiguate, there are two ways to receive updates from Materialize:

  1. The websocket API (wss://<host>/api/experimental/sql)
  2. SUBSCRIBE over the pgwire protocol (e.g. with any Postgres client library), read as an infinite cursor

Whichever way the NodeJS web server gets the data, it can then expose these updates to clients over websockets or SSE.
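One way to picture the "server gets the data, then exposes it" split is a small fan-out object sitting between a single upstream subscription and any number of downstream connections. The sketch below is only an illustration under that assumption; `UpdateBroadcaster` is a hypothetical name, and nothing here is specific to Materialize, websockets, or SSE.

```javascript
// Hypothetical fan-out helper: one upstream SUBSCRIBE loop, many downstream listeners.
class UpdateBroadcaster {
  constructor() {
    this.listeners = new Set();
  }

  // Register a listener; returns a function that unregisters it.
  add(listener) {
    this.listeners.add(listener);
    return () => this.listeners.delete(listener);
  }

  // Deliver one update to every listener; a listener that throws is dropped.
  publish(update) {
    for (const listener of this.listeners) {
      try {
        listener(update);
      } catch (err) {
        this.listeners.delete(listener);
      }
    }
  }
}
```

An SSE route could register `(u) => res.write(...)` and call the returned function when the request closes, while a websocket handler could register a listener that sends the same update as JSON. The upstream loop only ever calls `publish`.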

@chuck-alt-delete chuck-alt-delete changed the title Feature request: Add a React + websockets example Feature request: Add a NodeJS + websockets example Jun 27, 2023
@chuck-alt-delete chuck-alt-delete changed the title Feature request: Add a NodeJS + websockets example Feature request: Add SUBSCRIBE to NodeJS example, with updates exposed over REST API endpoint Jun 27, 2023
@chuck-alt-delete chuck-alt-delete changed the title Feature request: Add SUBSCRIBE to NodeJS example, with updates exposed over REST API endpoint Feature request: Add REST API endpoint for SUBSCRIBE to NodeJS example, with updates exposed over websockets Jun 27, 2023
@chuck-alt-delete
Contributor Author

chuck-alt-delete commented Jun 29, 2023

An alternative to websockets would be Server-Sent Events (SSE). SSE is a one-way push protocol where the server pushes events to the client over a long-lived HTTP connection. It is a little more straightforward and lightweight than websockets. Since SUBSCRIBE doesn't require 2-way communication, SSE is a perfectly good way to make updates available over HTTP.
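For reference, the SSE wire format is plain text: optional `id:` and `event:` fields, one or more `data:` lines, and a blank line to end the event. A minimal sketch of a formatter follows; `formatSseEvent` is a hypothetical helper name, not something from a library.

```javascript
// Hypothetical helper: serialize one event in the text/event-stream format.
// `data` is expected to be a string, e.g. the result of JSON.stringify(row).
function formatSseEvent(data, { event, id } = {}) {
  let frame = '';
  if (id !== undefined) frame += `id: ${id}\n`;
  if (event !== undefined) frame += `event: ${event}\n`;
  // A payload containing newlines has to be split across several data: lines.
  for (const line of String(data).split(/\r?\n/)) {
    frame += `data: ${line}\n`;
  }
  // The blank line terminates the event.
  return frame + '\n';
}

console.log(JSON.stringify(formatSseEvent('{"id":2}')));
// prints "data: {\"id\":2}\n\n"
```

Because the format is this small, writing `data: ...` followed by two newlines by hand is enough when only the default, unnamed event type is needed.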

Here is a minimal express.js app that exposes SUBSCRIBE updates over SSE:

require('dotenv').config();
const express = require('express');
const app = express();
const { Pool } = require('pg');

// Configure the Postgres client
const pool = new Pool({
  user: process.env.MZ_USER,
  host: process.env.MZ_HOST,
  database: process.env.MZ_DB,
  password: process.env.MZ_PASSWORD,
  port: process.env.MZ_PORT,
  ssl: true
});

app.get('/data', (req, res) => {
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');
    res.flushHeaders();

    let client;
    let loopActive = true;

    const handleError = (err) => {
        console.error(err);
        res.end();
        loopActive = false;
    };

    req.on('close', () => {
        res.end();
        loopActive = false;
        console.log('client closed');
    });

    (async () => {
        try {
            client = await pool.connect();
            await client.query('BEGIN');
            await client.query('DECLARE c CURSOR FOR SUBSCRIBE t ENVELOPE UPSERT (KEY(id))');

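            while (loopActive) {
                // FETCH waits for at least one row by default; the timeout lets
                // the loop re-check loopActive even when no updates arrive
                const data = await client.query("FETCH ALL c WITH (timeout = '1s')");
                // the client may have disconnected while the FETCH was pending
                if (!loopActive) break;
                for (const row of data.rows) {
                    // map row fields
                    const event = {
                        mz_timestamp: Number(row.mz_timestamp),
                        mz_state: row.mz_state,
                        id: row.id,
                        content: row.content
                    };
                    // publish server-sent events
                    res.write(`data: ${JSON.stringify(event)}\n\n`);
                }
            }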
        } catch (err) {
            handleError(err);
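        } finally {
            if (client) {
                console.log('closing pg client');
                // The connection is still inside the SUBSCRIBE transaction, so
                // destroy it rather than returning it to the pool for reuse
                client.release(true);
            }
        }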
    })();
});


app.listen(3000, function () {
  console.log('Example app listening on port 3000!');
});

After pointing a browser at http://localhost:3000/data, updates from the view t arrive immediately:

data: {"mz_timestamp":1687974778434,"mz_state":"upsert","id":2,"content":"hi"}

data: {"mz_timestamp":1687974778434,"mz_state":"upsert","id":3,"content":"ho"}

data: {"mz_timestamp":1687974780411,"mz_state":"upsert","id":1,"content":"bloop"}

data: {"mz_timestamp":1687974805446,"mz_state":"delete","id":1,"content":null}
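On the receiving side, these events can be folded into the current state of `t`. The sketch below assumes the event shape shown above and keeps a `Map` keyed by `id`; `applyUpdate` is a hypothetical name, and any `mz_state` other than `upsert` or `delete` is simply ignored here.

```javascript
// Hypothetical client-side reducer: folds ENVELOPE UPSERT events into a Map keyed by id.
function applyUpdate(state, update) {
  if (update.mz_state === 'upsert') {
    state.set(update.id, { content: update.content, mz_timestamp: update.mz_timestamp });
  } else if (update.mz_state === 'delete') {
    state.delete(update.id);
  }
  // Any other mz_state is ignored in this sketch.
  return state;
}

// The four events from the output above, applied in order.
const events = [
  { mz_timestamp: 1687974778434, mz_state: 'upsert', id: 2, content: 'hi' },
  { mz_timestamp: 1687974778434, mz_state: 'upsert', id: 3, content: 'ho' },
  { mz_timestamp: 1687974780411, mz_state: 'upsert', id: 1, content: 'bloop' },
  { mz_timestamp: 1687974805446, mz_state: 'delete', id: 1, content: null }
];

const state = events.reduce(applyUpdate, new Map());
console.log([...state.keys()]); // prints [ 2, 3 ]
```

In a browser, the same reducer could be driven from an `EventSource` pointed at `/data`, calling `applyUpdate(state, JSON.parse(e.data))` in its `onmessage` handler.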

@chuck-alt-delete chuck-alt-delete changed the title Feature request: Add REST API endpoint for SUBSCRIBE to NodeJS example, with updates exposed over websockets Feature request: Add NodeJS server example that uses Websockets/SSE for SUBSCRIBE Jun 29, 2023
2 participants