Skip to content

Commit

Permalink
feat (ai/core): reworked data stream management (#3919)
Browse files Browse the repository at this point in the history
  • Loading branch information
lgrammel authored Nov 28, 2024
1 parent c738971 commit fda9695
Show file tree
Hide file tree
Showing 49 changed files with 1,606 additions and 359 deletions.
5 changes: 5 additions & 0 deletions .changeset/big-balloons-compete.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'ai': patch
---

feat (ai/core): reworked data stream management
33 changes: 19 additions & 14 deletions content/cookbook/15-api-servers/10-node-http-server.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -42,29 +42,34 @@ createServer(async (req, res) => {
}).listen(8080);
```

### Data Stream With Stream Data
### Sending Custom Data

`pipeDataStreamToResponse` can be used with `StreamData` to send additional data to the client.
`pipeDataStreamToResponse` can be used to send custom data to the client.

```ts filename='index.ts' highlight="6-7,12-15,18"
```ts filename='index.ts' highlight="6-9,16"
import { openai } from '@ai-sdk/openai';
import { StreamData, streamText } from 'ai';
import { pipeDataStreamToResponse, streamText } from 'ai';
import { createServer } from 'http';

createServer(async (req, res) => {
const data = new StreamData();
data.append('initialized call');
// immediately start streaming the response
pipeDataStreamToResponse(res, {
execute: async dataStreamWriter => {
dataStreamWriter.writeData('initialized call');

const result = streamText({
model: openai('gpt-4o'),
prompt: 'Invent a new holiday and describe its traditions.',
onFinish() {
data.append('call completed');
data.close();
const result = streamText({
model: openai('gpt-4o'),
prompt: 'Invent a new holiday and describe its traditions.',
});

result.mergeIntoDataStream(dataStreamWriter);
},
onError: error => {
// Error messages are masked by default for security reasons.
// If you want to expose the error message to the client, you can do so here:
return error instanceof Error ? error.message : String(error);
},
});

result.pipeDataStreamToResponse(res, { data });
}).listen(8080);
```

Expand Down
36 changes: 20 additions & 16 deletions content/cookbook/15-api-servers/20-express.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -48,32 +48,36 @@ app.listen(8080, () => {
});
```

### Data Stream With Stream Data
### Sending Custom Data

`pipeDataStreamToResponse` can be used with `StreamData` to send additional data to the client.
`pipeDataStreamToResponse` can be used to send custom data to the client.

```ts filename='index.ts' highlight="8-10,15-18,21"
```ts filename='index.ts' highlight="8-11,18"
import { openai } from '@ai-sdk/openai';
import { StreamData, streamText } from 'ai';
import { pipeDataStreamToResponse, streamText } from 'ai';
import express, { Request, Response } from 'express';

const app = express();

app.post('/', async (req: Request, res: Response) => {
// use stream data (optional):
const data = new StreamData();
data.append('initialized call');
app.post('/stream-data', async (req: Request, res: Response) => {
// immediately start streaming the response
pipeDataStreamToResponse(res, {
execute: async dataStreamWriter => {
dataStreamWriter.writeData('initialized call');

const result = streamText({
model: openai('gpt-4o'),
prompt: 'Invent a new holiday and describe its traditions.',
onFinish() {
data.append('call completed');
data.close();
const result = streamText({
model: openai('gpt-4o'),
prompt: 'Invent a new holiday and describe its traditions.',
});

result.mergeIntoDataStream(dataStreamWriter);
},
onError: error => {
// Error messages are masked by default for security reasons.
// If you want to expose the error message to the client, you can do so here:
return error instanceof Error ? error.message : String(error);
},
});

result.pipeDataStreamToResponse(res, { data });
});

app.listen(8080, () => {
Expand Down
36 changes: 21 additions & 15 deletions content/cookbook/15-api-servers/30-hono.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -52,38 +52,44 @@ app.post('/', async c => {
serve({ fetch: app.fetch, port: 8080 });
```

### Data Stream With Stream Data
### Sending Custom Data

`toDataStream` can be used with `StreamData` to send additional data to the client.
`createDataStream` can be used to send custom data to the client.

```ts filename='index.ts' highlight="11-13,18-21,28"
```ts filename='index.ts' highlight="10-13,20"
import { openai } from '@ai-sdk/openai';
import { serve } from '@hono/node-server';
import { StreamData, streamText } from 'ai';
import { createDataStream, streamText } from 'ai';
import { Hono } from 'hono';
import { stream } from 'hono/streaming';

const app = new Hono();

app.post('/', async c => {
// use stream data (optional):
const data = new StreamData();
data.append('initialized call');
app.post('/stream-data', async c => {
// immediately start streaming the response
const dataStream = createDataStream({
execute: async dataStreamWriter => {
dataStreamWriter.writeData('initialized call');

const result = streamText({
model: openai('gpt-4o'),
prompt: 'Invent a new holiday and describe its traditions.',
onFinish() {
data.append('call completed');
data.close();
const result = streamText({
model: openai('gpt-4o'),
prompt: 'Invent a new holiday and describe its traditions.',
});

result.mergeIntoDataStream(dataStreamWriter);
},
onError: error => {
// Error messages are masked by default for security reasons.
// If you want to expose the error message to the client, you can do so here:
return error instanceof Error ? error.message : String(error);
},
});

// Mark the response as a v1 data stream:
c.header('X-Vercel-AI-Data-Stream', 'v1');
c.header('Content-Type', 'text/plain; charset=utf-8');

return stream(c, stream => stream.pipe(result.toDataStream({ data })));
return stream(c, stream => stream.pipe(dataStream));
});

serve({ fetch: app.fetch, port: 8080 });
Expand Down
36 changes: 21 additions & 15 deletions content/cookbook/15-api-servers/40-fastify.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -50,36 +50,42 @@ fastify.post('/', async function (request, reply) {
fastify.listen({ port: 8080 });
```

### Data Stream With Stream Data
### Sending Custom Data

`toDataStream` can be used with `StreamData` to send additional data to the client.
`createDataStream` can be used to send custom data to the client.

```ts filename='index.ts' highlight="8-10,15-18,25"
```ts filename='index.ts' highlight="8-11,18"
import { openai } from '@ai-sdk/openai';
import { StreamData, streamText } from 'ai';
import { createDataStream, streamText } from 'ai';
import Fastify from 'fastify';

const fastify = Fastify({ logger: true });

fastify.post('/', async function (request, reply) {
// use stream data (optional):
const data = new StreamData();
data.append('initialized call');
fastify.post('/stream-data', async function (request, reply) {
// immediately start streaming the response
const dataStream = createDataStream({
execute: async dataStreamWriter => {
dataStreamWriter.writeData('initialized call');

const result = streamText({
model: openai('gpt-4o'),
prompt: 'Invent a new holiday and describe its traditions.',
onFinish() {
data.append('call completed');
data.close();
const result = streamText({
model: openai('gpt-4o'),
prompt: 'Invent a new holiday and describe its traditions.',
});

result.mergeIntoDataStream(dataStreamWriter);
},
onError: error => {
// Error messages are masked by default for security reasons.
// If you want to expose the error message to the client, you can do so here:
return error instanceof Error ? error.message : String(error);
},
});

// Mark the response as a v1 data stream:
reply.header('X-Vercel-AI-Data-Stream', 'v1');
reply.header('Content-Type', 'text/plain; charset=utf-8');

return reply.send(result.toDataStream({ data }));
return reply.send(dataStream);
});

fastify.listen({ port: 8080 });
Expand Down
38 changes: 21 additions & 17 deletions content/cookbook/15-api-servers/50-nest.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -38,33 +38,37 @@ export class AppController {
}
```

### Data Stream With Stream Data
### Sending Custom Data

`pipeDataStreamToResponse` can be used with `StreamData` to send additional data to the client.
`createDataStream` can be used to send custom data to the client.

```ts filename='app.controller.ts' highlight="10-11,16-19,22"
```ts filename='app.controller.ts' highlight="10-12,19"
import { Controller, Post, Res } from '@nestjs/common';
import { openai } from '@ai-sdk/openai';
import { StreamData, streamText } from 'ai';
import { createDataStream, streamText } from 'ai';
import { Response } from 'express';

@Controller()
export class AppController {
@Post()
async example(@Res() res: Response) {
const data = new StreamData();
data.append('initialized call');

const result = streamText({
model: openai('gpt-4o'),
prompt: 'Invent a new holiday and describe its traditions.',
onFinish() {
data.append('call completed');
data.close();
@Post('/stream-data')
async streamData(@Res() res: Response) {
pipeDataStreamToResponse(res, {
execute: async dataStreamWriter => {
dataStreamWriter.writeData('initialized call');

const result = streamText({
model: openai('gpt-4o'),
prompt: 'Invent a new holiday and describe its traditions.',
});

result.mergeIntoDataStream(dataStreamWriter);
},
onError: error => {
// Error messages are masked by default for security reasons.
// If you want to expose the error message to the client, you can do so here:
return error instanceof Error ? error.message : String(error);
},
});

result.pipeDataStreamToResponse(res, { data });
}
}
```
Expand Down
82 changes: 46 additions & 36 deletions content/docs/04-ai-sdk-ui/20-streaming-data.mdx
Original file line number Diff line number Diff line change
@@ -1,63 +1,73 @@
---
title: Streaming Data
description: Welcome to the AI SDK documentation!
title: Streaming Custom Data
description: Learn how to stream custom data to the client.
---

# Streaming Data
# Streaming Custom Data

It is often useful to send additional data alongside the model's response.
For example, you may want to send status information, the message ids after storing them,
or references to content that the language model is referring to.

The AI SDK provides a `StreamData` helper that allows you to stream additional data to the client
and attach it either to the `Message` or to the `data` object of the `useChat` hook.
The AI SDK provides several helpers that allows you to stream additional data to the client
and attach it either to the `Message` or to the `data` object of the `useChat` hook:

- `createDataStream`: creates a data stream
- `createDataStreamResponse`: creates a response object that streams data
- `pipeDataStreamToResponse`: pipes a data stream to a server response object

The data is streamed as part of the response stream.

## Sending Stream Data
## Sending Custom Data from the Server

In your server-side route handler, you can use `StreamData` in combination with `streamText`.
In your server-side route handler, you can use `createDataStreamResponse` and `pipeDataStreamToResponse` in combination with `streamText`.
You need to:

1. Initialize a `StreamData` object.
2. Append data to it, which can either be message annotations (`appendMessageAnnotation`) or call annotations (`append`) .
3. Close the `StreamData` object.
4. Return the `StreamData` object in the `toDataStreamResponse` call.
1. Call `createDataStreamResponse` or `pipeDataStreamToResponse` to get a callback function with a `DataStreamWriter`.
2. Write to the `DataStreamWriter` to stream additional data.
3. Merge the `streamText` result into the `DataStreamWriter`.
4. Return the response from `createDataStreamResponse` (if that method is used)

Here is an example:

```tsx highlight="7,8,10,11, 16-20,22,23,25,26,31,32"
```tsx highlight="7-10,16,19-23,25-26,30"
import { openai } from '@ai-sdk/openai';
import { streamText, StreamData } from 'ai';
import { generateId, createDataStreamResponse, streamText } from 'ai';

export async function POST(req: Request) {
const { messages } = await req.json();

// Create a new StreamData object
const data = new StreamData();

// Append to general streamed data
data.append({ test: 'initialized calls' });

const result = streamText({
model: openai('gpt-4-turbo'),
onFinish() {
// message annotation:
data.appendMessageAnnotation({
id: generateId(), // e.g. id from saved DB record
other: 'information',
// immediately start streaming (solves RAG issues with status, etc.)
return createDataStreamResponse({
execute: dataStream => {
dataStream.writeData('initialized call');

const result = streamText({
model: openai('gpt-4o'),
messages,
onChunk() {
dataStream.writeMessageAnnotation({ chunk: '123' });
},
onFinish() {
// message annotation:
dataStream.writeMessageAnnotation({
id: generateId(), // e.g. id from saved DB record
other: 'information',
});

// call annotation:
dataStream.writeData('call completed');
},
});

// call annotation (can be any JSON serializable value)
data.append('call completed');

// close the StreamData object
data.close();
result.mergeIntoDataStream(dataStream);
},
onError: error => {
// Error messages are masked by default for security reasons.
// If you want to expose the error message to the client, you can do so here:
return error instanceof Error ? error.message : String(error);
},
messages,
});

// Respond with the stream and additional StreamData
return result.toDataStreamResponse({ data });
}
```

Expand All @@ -67,7 +77,7 @@ export async function POST(req: Request) {
Protocol](/docs/ai-sdk-ui/stream-protocol#data-stream-protocol).
</Note>

## Stream Data in useChat
## Processing Custom Data in `useChat`

The `useChat` hook automatically processes the streamed data and makes it available to you.

Expand Down
Loading

0 comments on commit fda9695

Please sign in to comment.