Skip to content

Commit

Permalink
feat: experimental sse adapter (#62)
Browse files Browse the repository at this point in the history
  • Loading branch information
pi0 authored Aug 6, 2024
1 parent 31f759f commit d662e48
Show file tree
Hide file tree
Showing 9 changed files with 451 additions and 182 deletions.
62 changes: 62 additions & 0 deletions docs/2.adapters/sse.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
---
icon: oui:token-event
---

# SSE

> Integrate CrossWS with [server-sent events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events).
If your deployment server is incapable of of handling WebSocket upgrades but support standard web API ([`Request`](https://developer.mozilla.org/en-US/docs/Web/API/Request) and [`Response`](https://developer.mozilla.org/en-US/docs/Web/API/Response)) you can integrate crossws to act as a one way (server to client) handler using [server-sent events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events).

> [!IMPORTANT]
> This is an experimental adapter and works only with a limited subset of CrossWS functionalities.
> [!IMPORTANT]
> Instead of [`WebSocket`](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket) client you need to use [`EventSource`](https://developer.mozilla.org/en-US/docs/Web/API/EventSource) as client to connect such server.
```ts
import sseAdapter from "crossws/adapters/sse";

const sse = sseAdapter({
hooks: {
upgrade(request) {
// Handle upgrade logic
// You can return a custom response to abort
// You can return { headers } to override default headers
},
open(peer) {
// Use this hook to send messages to peer
peer.send("hello!");
},
},
});
```

Inside your Web compatible server handler:

```js
async fetch(request) {
const url = new URL(request.url)

// Handle SSE
if (url.pathname === "/sse" && request.headers.get("accept") === "text/event-stream") {
return sse.fetch(request);
}

return new Response("server is up!")
}
```

In order to connect to the server, you need to use [`EventSource`](https://developer.mozilla.org/en-US/docs/Web/API/EventSource) as client:

```js
const ev = new EventSource("http://<server>/sse");

ev.addEventListener("message", (event) => {
console.log(event.data); // hello!
});
```

::read-more
See [`playground/sse.ts`](https://github.com/unjs/crossws/tree/main/playground/sse.ts) for demo and [`src/adapters/sse.ts`](https://github.com/unjs/crossws/tree/main/src/adapters/sse.ts) for implementation.
::
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
"play:cf-durable": "wrangler dev --port 3001 -c test/fixture/wrangler-durable.toml",
"play:deno": "deno run -A test/fixture/deno.ts",
"play:node": "jiti test/fixture/node.ts",
"play:sse": "bun test/fixture/sse.ts",
"play:uws": "jiti test/fixture/uws.ts",
"release": "pnpm test && pnpm build && changelogen --release && npm publish && git push --follow-tags",
"test": "pnpm lint && pnpm test:types && vitest run",
Expand All @@ -87,6 +88,7 @@
"@cloudflare/workers-types": "^4.20240729.0",
"@deno/types": "^0.0.1",
"@types/bun": "^1.1.6",
"@types/eventsource": "^1.1.15",
"@types/node": "^22.1.0",
"@types/web": "^0.0.153",
"@types/ws": "^8.5.12",
Expand All @@ -96,6 +98,7 @@
"consola": "^3.2.3",
"eslint": "^9.8.0",
"eslint-config-unjs": "^0.3.2",
"eventsource": "^2.0.2",
"execa": "^9.3.0",
"get-port-please": "^3.1.2",
"h3": "^1.12.0",
Expand Down
17 changes: 17 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

108 changes: 108 additions & 0 deletions src/adapters/sse.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events

import { WebSocketServer as _WebSocketServer } from "ws";
import { Peer } from "../peer";
import {
AdapterOptions,
AdapterInstance,
defineWebSocketAdapter,
} from "../types";
import { AdapterHookable } from "../hooks";
import { adapterUtils, toBufferLike } from "../_utils";

export interface SSEAdapter extends AdapterInstance {
fetch(req: Request): Promise<Response>;
}

export interface SSEOptions extends AdapterOptions {}

export default defineWebSocketAdapter<SSEAdapter, SSEOptions>(
(options = {}) => {
const hooks = new AdapterHookable(options);
const peers = new Set<SSEPeer>();

return {
...adapterUtils(peers),
fetch: async (request: Request) => {
const _res = await hooks.callHook("upgrade", request);
if (_res instanceof Response) {
return _res;
}

const peer = new SSEPeer({ peers, sse: { request, hooks } });

let headers: HeadersInit = {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
};
if (_res?.headers) {
headers = new Headers(headers);
for (const [key, value] of new Headers(_res.headers)) {
headers.set(key, value);
}
}

return new Response(peer._sseStream, { ..._res, headers });
},
};
},
);

class SSEPeer extends Peer<{
peers: Set<SSEPeer>;
sse: {
request: Request;
hooks: AdapterHookable;
};
}> {
_sseStream: ReadableStream;
_sseStreamController?: ReadableStreamDefaultController;
constructor(internal: SSEPeer["_internal"]) {
super(internal);
this._sseStream = new ReadableStream({
start: (controller) => {
this._sseStreamController = controller;
this._internal.sse.hooks.callHook("open", this);
},
cancel: () => {
this._internal.sse.hooks.callHook("close", this);
},
});
}

get url() {
return this._internal.sse.request.url;
}

get headers() {
return this._internal.sse.request.headers;
}

send(message: any) {
let data = toBufferLike(message);
if (typeof data !== "string") {
// eslint-disable-next-line unicorn/prefer-code-point
data = btoa(String.fromCharCode(...new Uint8Array(data)));
}
this._sseStreamController?.enqueue(`event: message\ndata: ${data}\n\n`);
return 0;
}

publish(topic: string, message: any) {
const data = toBufferLike(message);
for (const peer of this._internal.peers) {
if (peer !== this && peer._topics.has(topic)) {
peer._sseStreamController?.enqueue(data);
}
}
}

close() {
this._sseStreamController?.close();
}

terminate() {
this.close();
}
}
8 changes: 6 additions & 2 deletions test/_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,11 @@ export function wsConnect(
return Object.assign(connectPromise, res) as Promise<typeof res>;
}

export function wsTestsExec(cmd: string, opts: Parameters<typeof wsTests>[1]) {
export function wsTestsExec(
cmd: string,
opts: Parameters<typeof wsTests>[1],
tests = wsTests,
) {
let childProc: ExecaRes;
let url: string;
beforeAll(async () => {
Expand Down Expand Up @@ -132,5 +136,5 @@ export function wsTestsExec(cmd: string, opts: Parameters<typeof wsTests>[1]) {
afterAll(async () => {
await childProc.kill();
});
wsTests(() => url, opts);
tests(() => url, opts);
}
19 changes: 19 additions & 0 deletions test/adapters/sse.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { describe, test, expect } from "vitest";
import { wsTestsExec } from "../_utils";
import EventSource from "eventsource";

describe("sse", () => {
wsTestsExec("bun run ./sse.ts", { adapter: "sse" }, (getURL, opts) => {
test("connects to the server", async () => {
const url = getURL().replace("ws", "http");
const ev = new EventSource(url);
const messages: string[] = [];
ev.addEventListener("message", (event) => {
messages.push(event.data);
});
await new Promise((resolve) => ev.addEventListener("open", resolve));
ev.close();
expect(messages).toMatchObject(["Welcome to the server #1!"]);
});
});
});
Loading

0 comments on commit d662e48

Please sign in to comment.