Skip to content

Commit

Permalink
Support uploading batches of ndjson vectors from a source file
Browse files Browse the repository at this point in the history
  • Loading branch information
ndisidore committed Sep 26, 2023
1 parent 3cd7286 commit eade8a5
Show file tree
Hide file tree
Showing 7 changed files with 249 additions and 21 deletions.
7 changes: 7 additions & 0 deletions .changeset/eleven-monkeys-tan.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"wrangler": patch
---

Adds Vectorize support for uploading batches of newline-delimited JSON (ndjson)
vectors from a source file.
Load a dataset with `vectorize insert my-index --file vectors.ndjson`
2 changes: 2 additions & 0 deletions packages/wrangler/src/__tests__/vectorize/vectorize.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ describe("vectorize help", () => {
wrangler vectorize delete <name> Delete a Vectorize index
wrangler vectorize get <name> Get a Vectorize index by name
wrangler vectorize list List your Vectorize indexes
wrangler vectorize insert <name> Insert vectors into a Vectorize index
Flags:
-j, --experimental-json-config Experimental: Support wrangler.json [boolean]
Expand Down Expand Up @@ -64,6 +65,7 @@ describe("vectorize help", () => {
wrangler vectorize delete <name> Delete a Vectorize index
wrangler vectorize get <name> Get a Vectorize index by name
wrangler vectorize list List your Vectorize indexes
wrangler vectorize insert <name> Insert vectors into a Vectorize index
Flags:
-j, --experimental-json-config Experimental: Support wrangler.json [boolean]
Expand Down
155 changes: 155 additions & 0 deletions packages/wrangler/src/__tests__/vectorize/vectorize.upsert.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
import { writeFileSync } from "node:fs";
import { MockedRequest, rest, type RestRequest } from "msw";
import { FormData } from "undici";
import { mockAccountId, mockApiToken } from "../helpers/mock-account-id";
import { mockConsoleMethods } from "../helpers/mock-console";
import { msw } from "../helpers/msw";
import { FileReaderSync } from "../helpers/msw/read-file-sync";
import { runInTempDir } from "../helpers/run-in-tmp";
import { runWrangler } from "../helpers/run-wrangler";
import type { VectorizeVector } from "@cloudflare/workers-types";

// Exercises `wrangler vectorize insert <name> --file <path> --batch-size <n>`:
// an ndjson file is written to the temp dir, the insert endpoint is mocked, and
// the test checks both the per-request payloads and the console output.
// NOTE(review): the suite title says "upsert" but the command run below is
// `vectorize insert` — confirm whether the title should be renamed.
describe("dataset upsert", () => {
const std = mockConsoleMethods();

runInTempDir();
mockAccountId();
mockApiToken();

// Five sample vectors; with the batch size of 3 used below they are expected
// to be sent as one request of 3 lines followed by one request of 2 lines.
// NOTE(review): the last two entries share the same id — confirm this is intended.
const testVectors: VectorizeVector[] = [
{
id: "b0daca4a-ffd8-4865-926b-e24800af2a2d",
values: [0.2331, 1.0125, 0.6131, 0.9421, 0.9661, 0.8121],
metadata: { text: "She sells seashells by the seashore" },
},
{
id: "a44706aa-a366-48bc-8cc1-3feffd87d548",
values: [0.2321, 0.8121, 0.6315, 0.6151, 0.4121, 0.1512],
metadata: { text: "Peter Piper picked a peck of pickled peppers" },
},
{
id: "43cfcb31-07e2-411f-8bf9-f82a95ba8b96",
values: [0.0515, 0.7512, 0.8612, 0.2153, 0.1521, 0.6812],
metadata: {
text: "You know New York, you need New York, you know you need unique New York",
},
},
{
id: "15cc795d-93d3-416d-9a2a-36fa6fac73da",
values: [0.8525, 0.7751, 0.6326, 0.1512, 0.9655, 0.6626],
metadata: { text: "He threw three free throws" },
},
{
id: "15cc795d-93d3-416d-9a2a-36fa6fac73da",
values: [0.6323, 0.1111, 0.5136, 0.7512, 0.6632, 0.5254],
metadata: { text: "Which witch is which?" },
},
];

it("should batch uploads in ndjson format", async () => {
// One JSON document per line, no trailing newline.
writeFileSync(
"vectors.ndjson",
testVectors.map((v) => JSON.stringify(v)).join(`\n`)
);

const batchSize = 3;
// Counts how many times the mocked insert endpoint has been hit.
let insertRequestCount = 0;
msw.use(
rest.post(
"*/vectorize/indexes/:indexName/insert",
async (req, res, ctx) => {
expect(req.params.indexName).toEqual("my-index");
expect(req.headers.get("Content-Type")).toBe(
"text/plain;charset=UTF-8"
);
// formData() is the test-only method patched onto MockedRequest.prototype
// at the bottom of this file.
const formData = await (req as RestRequestWithFormData).formData();
const vectors = formData.get("vectors") as string;

// The first request must carry the first three vectors; any later request
// is compared against the remaining two.
if (insertRequestCount === 0) {
expect(formData).toMatchInlineSnapshot(`
FormData {
Symbol(state): Array [
Object {
"name": "vectors",
"value": "{\\"id\\":\\"b0daca4a-ffd8-4865-926b-e24800af2a2d\\",\\"values\\":[0.2331,1.0125,0.6131,0.9421,0.9661,0.8121],\\"metadata\\":{\\"text\\":\\"She sells seashells by the seashore\\"}}
{\\"id\\":\\"a44706aa-a366-48bc-8cc1-3feffd87d548\\",\\"values\\":[0.2321,0.8121,0.6315,0.6151,0.4121,0.1512],\\"metadata\\":{\\"text\\":\\"Peter Piper picked a peck of pickled peppers\\"}}
{\\"id\\":\\"43cfcb31-07e2-411f-8bf9-f82a95ba8b96\\",\\"values\\":[0.0515,0.7512,0.8612,0.2153,0.1521,0.6812],\\"metadata\\":{\\"text\\":\\"You know New York, you need New York, you know you need unique New York\\"}}",
},
],
}
`);
} else {
expect(formData).toMatchInlineSnapshot(`
FormData {
Symbol(state): Array [
Object {
"name": "vectors",
"value": "{\\"id\\":\\"15cc795d-93d3-416d-9a2a-36fa6fac73da\\",\\"values\\":[0.8525,0.7751,0.6326,0.1512,0.9655,0.6626],\\"metadata\\":{\\"text\\":\\"He threw three free throws\\"}}
{\\"id\\":\\"15cc795d-93d3-416d-9a2a-36fa6fac73da\\",\\"values\\":[0.6323,0.1111,0.5136,0.7512,0.6632,0.5254],\\"metadata\\":{\\"text\\":\\"Which witch is which?\\"}}",
},
],
}
`);
}
insertRequestCount++;

// Reply with a success envelope whose count is the number of lines received.
return res(
ctx.status(200),
ctx.json({
success: true,
errors: [],
messages: [],
result: { count: vectors.split(`\n`).length },
})
);
}
)
);

await runWrangler(
`vectorize insert my-index --file vectors.ndjson --batch-size ${batchSize}`
);

// 5 vectors at batch size 3 => exactly two requests (3 + 2).
expect(insertRequestCount).toBe(2);
expect(std.out).toMatchInlineSnapshot(`
"✨ Uploading vector batch (3 vectors)
✨ Uploading vector batch (2 vectors)
✅ Successfully inserted 5 vectors into index 'my-index'"
`);
});
});

// Test-only monkey patches.
// Replace FormData#toString with a serialiser that emits the entries as JSON.
// NOTE(review): presumably this is what the mocked transport uses to turn the
// form body into text (cf. the "text/plain" Content-Type asserted in the test
// in this file) — confirm.
FormData.prototype.toString = mockFormDataToString;
// A mocked request that additionally exposes a `formData()` accessor.
export interface RestRequestWithFormData extends MockedRequest, RestRequest {
formData(): Promise<FormData>;
}
// Install the matching deserialiser on every MockedRequest so handlers can call
// `req.formData()`.
(MockedRequest.prototype as RestRequestWithFormData).formData =
mockFormDataFromString;

/**
 * Serialises the receiver's entries to a JSON string of the form
 * `{"__formdata": [[key, value], ...]}`.
 *
 * Blob-like values are read as text through `FileReaderSync` and the text is
 * stored in their place; every other value is stored unchanged.
 *
 * @returns the JSON text described above
 */
function mockFormDataToString(this: FormData) {
  const pairs = Array.from(this.entries(), ([fieldName, fieldValue]) => {
    if (!(fieldValue instanceof Blob)) {
      return [fieldName, fieldValue];
    }
    // Blob contents are not JSON-serialisable directly, so read them as text.
    const blobReader = new FileReaderSync();
    blobReader.readAsText(fieldValue);
    return [fieldName, blobReader.result];
  });
  return JSON.stringify({ __formdata: pairs });
}

/**
 * Rebuilds a `FormData` from a request whose JSON body has the shape
 * `{"__formdata": [[key, value], ...]}`.
 *
 * Asserts (via `expect`) that `__formdata` is an array before copying each
 * pair into a fresh form with `set`.
 *
 * @returns the reconstructed form
 */
async function mockFormDataFromString(this: MockedRequest): Promise<FormData> {
  const { __formdata: serialisedEntries } = await this.json();
  expect(serialisedEntries).toBeInstanceOf(Array);

  const rebuilt = new FormData();
  for (const [fieldName, fieldValue] of serialisedEntries) {
    rebuilt.set(fieldName, fieldValue);
  }
  return rebuilt;
}
11 changes: 7 additions & 4 deletions packages/wrangler/src/vectorize/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import type {
VectorizeVector,
VectorizeVectorMutation,
} from "@cloudflare/workers-types";
import type { FormData } from "undici";

const jsonContentType = "application/json; charset=utf-8;";

Expand Down Expand Up @@ -97,29 +98,31 @@ export async function updateIndex(
/**
 * Resolves the account id via `requireAuth(config)` and issues a POST through
 * `fetchResult` to `/accounts/{accountId}/vectorize/indexes/{indexName}/insert`,
 * returning the resulting `VectorizeVectorMutation`.
 *
 * NOTE(review): this listing shows two variants of the third parameter and of
 * the request `body:` line (a `VectorizeVector` array that is JSON-stringified,
 * and a `FormData` passed through as-is) — confirm which one is current.
 */
export async function insertIntoIndex(
config: Config,
indexName: string,
vectors: Array<VectorizeVector>
body: FormData
): Promise<VectorizeVectorMutation> {
const accountId = await requireAuth(config);

return await fetchResult(
`/accounts/${accountId}/vectorize/indexes/${indexName}/insert`,
{
method: "POST",
body: JSON.stringify(vectors),
body: body,
}
);
}

/**
 * Resolves the account id via `requireAuth(config)` and issues a POST through
 * `fetchResult` to `/accounts/{accountId}/vectorize/indexes/{indexName}/upsert`,
 * returning the resulting `VectorizeVectorMutation`.
 *
 * NOTE(review): this listing shows two variants of the third parameter and of
 * the request `body:` line (a `VectorizeVector` array that is JSON-stringified,
 * and a `FormData` passed through as-is) — confirm which one is current.
 */
export async function upsertIntoIndex(
config: Config,
indexName: string,
vectors: VectorizeVector[]
body: FormData
): Promise<VectorizeVectorMutation> {
const accountId = await requireAuth(config);

return await fetchResult(
`/accounts/${accountId}/vectorize/indexes/${indexName}/upsert`,
{
method: "POST",
body: JSON.stringify(vectors),
body: body,
}
);
}
Expand Down
4 changes: 3 additions & 1 deletion packages/wrangler/src/vectorize/create.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ export function options(yargs: CommonYargsArgv) {
preset: {
type: "string",
choices: [
"workers-ai/bge-small-en",
"@cf/baai/bge-small-en-v1.5",
"@cf/baai/bge-base-en-v1.5",
"@cf/baai/bge-large-en-v1.5",
"openai/text-embedding-ada-002",
"cohere/embed-multilingual-v2.0",
],
Expand Down
13 changes: 7 additions & 6 deletions packages/wrangler/src/vectorize/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { vectorizeBetaWarning } from "./common";
import { options as createOptions, handler as createHandler } from "./create";
import { options as deleteOptions, handler as deleteHandler } from "./delete";
import { options as getOptions, handler as getHandler } from "./get";
import { options as insertOptions, handler as insertHandler } from "./insert";
import { options as listOptions, handler as listHandler } from "./list";
import type { CommonYargsArgv } from "../yargs-types";

Expand Down Expand Up @@ -34,12 +35,12 @@ export function vectorize(yargs: CommonYargsArgv) {
// queryOptions,
// queryHandler
// )
// .command(
// "insert <name>",
// "Insert vectors into a Vectorize index",
// insertOptions,
// insertHandler
// )
.command(
"insert <name>",
"Insert vectors into a Vectorize index",
insertOptions,
insertHandler
)
.epilogue(vectorizeBetaWarning)
);
}
78 changes: 68 additions & 10 deletions packages/wrangler/src/vectorize/insert.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import { createReadStream } from "node:fs";
import { type Interface as RLInterface, createInterface } from "node:readline";
import { File, FormData } from "undici";
import { readConfig } from "../config";
import { logger } from "../logger";
import { insertIntoIndex } from "./client";
Expand All @@ -6,7 +9,9 @@ import type {
CommonYargsArgv,
StrictYargsOptionsToInterface,
} from "../yargs-types";
import type { VectorizeVector } from "@cloudflare/workers-types";

const VECTORIZE_UPSERT_BATCH_SIZE = 5_000;
const VECTORIZE_MAX_UPSERT_VECTOR_RECORDS = 100_000;

export function options(yargs: CommonYargsArgv) {
return yargs
Expand All @@ -16,11 +21,22 @@ export function options(yargs: CommonYargsArgv) {
description: "The name of the Vectorize index.",
})
.options({
vectors: {
type: "array",
file: {
describe:
"A file containing line separated json (ndjson) vector objects.",
demandOption: true,
type: "string",
},
"batch-size": {
describe:
"An array of one or more vectors in JSON format to insert into the index",
"Number of vector records to include when sending to the Cloudflare API.",
type: "number",
default: VECTORIZE_UPSERT_BATCH_SIZE,
},
json: {
describe: "return output as clean JSON",
type: "boolean",
default: false,
},
})
.epilogue(vectorizeBetaWarning);
Expand All @@ -30,13 +46,55 @@ export async function handler(
args: StrictYargsOptionsToInterface<typeof options>
) {
const config = readConfig(args.config, args);
const rl = createInterface({ input: createReadStream(args.file) });

let vectorInsertCount = 0;
for await (const batch of getBatchFromFile(rl, args.batchSize)) {
const formData = new FormData();
formData.append(
"vectors",
new File([batch.join(`\n`)], "vectors.ndjson", {
type: "application/x-ndjson",
})
);
logger.log(`✨ Uploading vector batch (${batch.length} vectors)`);
const idxPart = await insertIntoIndex(config, args.name, formData);
vectorInsertCount += idxPart.count;

if (vectorInsertCount > VECTORIZE_MAX_UPSERT_VECTOR_RECORDS) {
logger.warn(
`🚧 While Vectorize is in beta, we've limited uploads to 100k vectors per run. You may run this again with another batch to upload further`
);
break;
}
}

if (args.json) {
logger.log(
JSON.stringify({ index: args.name, count: vectorInsertCount }, null, 2)
);
return;
}

const vectors: VectorizeVector[] = [];
if (args.vectors) {
// Parse each vector into a Vector type
// Think about potential limits on args.vectors.length?
logger.log(
`✅ Successfully inserted ${vectorInsertCount} vectors into index '${args.name}'`
);
}

/**
 * Reads an ndjson source line by line and yields the raw lines in batches.
 *
 * Note: this doesn't actually do any parsing - that will be handled on the
 * backend. Blank (empty or whitespace-only) lines are skipped, and the final
 * partial batch is only yielded when it contains at least one line, so callers
 * never receive an empty batch (e.g. for an empty file, or when the number of
 * lines is an exact multiple of `batchSize`).
 *
 * https://nodejs.org/docs/latest-v16.x/api/readline.html#rlsymbolasynciterator
 *
 * @param rl - readline interface wrapping the source stream
 * @param batchSize - number of lines after which a batch is yielded
 * @returns an async generator producing arrays of raw ndjson lines
 */
async function* getBatchFromFile(
  rl: RLInterface,
  batchSize = VECTORIZE_UPSERT_BATCH_SIZE
): AsyncGenerator<string[], void, unknown> {
  let batch: string[] = [];
  for await (const line of rl) {
    // Blank lines carry no record; forwarding them would inflate the batch.
    if (line.trim().length === 0) {
      continue;
    }
    if (batch.push(line) >= batchSize) {
      yield batch;
      batch = [];
    }
  }

  // Flush the remainder, but never emit an empty batch.
  if (batch.length > 0) {
    yield batch;
  }
}

0 comments on commit eade8a5

Please sign in to comment.