Skip to content

Commit

Permalink
- [FEAT] [OS] expose timeout on put, for really large assets, it is p…
Browse files Browse the repository at this point in the history
…ossible for the publish request to timeout
  • Loading branch information
aricart committed Jan 11, 2023
1 parent f9ebbf5 commit c794a87
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 9 deletions.
1 change: 1 addition & 0 deletions nats-base-client/internal_mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ export type {
ObjectStoreMeta,
ObjectStoreMetaOptions,
ObjectStoreOptions,
ObjectStorePutOpts,
ObjectStoreStatus,
PeerInfo,
Placement,
Expand Down
1 change: 1 addition & 0 deletions nats-base-client/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ export type {
ObjectStoreMeta,
ObjectStoreMetaOptions,
ObjectStoreOptions,
ObjectStorePutOpts,
ObjectStoreStatus,
PeerInfo,
Perf,
Expand Down
19 changes: 14 additions & 5 deletions nats-base-client/objectstore.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 The NATS Authors
* Copyright 2022-2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand All @@ -26,6 +26,7 @@ import {
ObjectStoreMeta,
ObjectStoreMetaOptions,
ObjectStoreOptions,
ObjectStorePutOpts,
ObjectStoreStatus,
PubAck,
PurgeResponse,
Expand Down Expand Up @@ -306,8 +307,12 @@ export class ObjectStoreImpl implements ObjectStore {
async _put(
meta: ObjectStoreMeta,
rs: ReadableStream<Uint8Array> | null,
opts?: ObjectStorePutOpts,
): Promise<ObjectInfo> {
const jsi = this.js as JetStreamClientImpl;
opts = opts || { timeout: jsi.timeout };
opts.timeout = opts.timeout || 1000;
const { timeout } = opts;
const maxPayload = jsi.nc.info?.max_payload || 1024;
meta = meta || {} as ObjectStoreMeta;
meta.options = meta.options || {};
Expand Down Expand Up @@ -351,7 +356,7 @@ export class ObjectStoreImpl implements ObjectStore {
sha.update(payload);
info.chunks!++;
info.size! += payload.length;
proms.push(this.js.publish(chunkSubj, payload));
proms.push(this.js.publish(chunkSubj, payload, { timeout }));
}
info.mtime = new Date().toISOString();
const digest = sha.digest("base64");
Expand All @@ -364,7 +369,10 @@ export class ObjectStoreImpl implements ObjectStore {
const h = headers();
h.set(JsHeaders.RollupHdr, JsHeaders.RollupValueSubject);
proms.push(
this.js.publish(metaSubj, JSONCodec().encode(info), { headers: h }),
this.js.publish(metaSubj, JSONCodec().encode(info), {
headers: h,
timeout,
}),
);

// if we had this object trim it out
Expand All @@ -389,7 +397,7 @@ export class ObjectStoreImpl implements ObjectStore {
const payload = db.drain(meta.options.max_chunk_size);
sha.update(payload);
proms.push(
this.js.publish(chunkSubj, payload),
this.js.publish(chunkSubj, payload, { timeout }),
);
}
}
Expand All @@ -406,13 +414,14 @@ export class ObjectStoreImpl implements ObjectStore {
put(
meta: ObjectStoreMeta,
rs: ReadableStream<Uint8Array> | null,
opts?: ObjectStorePutOpts,
): Promise<ObjectInfo> {
if (meta?.options?.link) {
return Promise.reject(
new Error("link cannot be set when putting the object in bucket"),
);
}
return this._put(meta, rs);
return this._put(meta, rs, opts);
}

async get(name: string): Promise<ObjectResult | null> {
Expand Down
8 changes: 8 additions & 0 deletions nats-base-client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2919,13 +2919,21 @@ export type ObjectResult = {
error: Promise<Error | null>;
};

export type ObjectStorePutOpts = {
/**
* maximum number of millis for the put requests to succeed
*/
timeout: number;
};

export interface ObjectStore {
info(name: string): Promise<ObjectInfo | null>;
list(): Promise<ObjectInfo[]>;
get(name: string): Promise<ObjectResult | null>;
put(
meta: ObjectStoreMeta,
rs: ReadableStream<Uint8Array>,
opts?: ObjectStorePutOpts,
): Promise<ObjectInfo>;
delete(name: string): Promise<PurgeResponse>;
link(name: string, meta: ObjectInfo): Promise<ObjectInfo>;
Expand Down
7 changes: 3 additions & 4 deletions tests/service_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -754,9 +754,8 @@ Deno.test("service - cross platform service test", async () => {
];

const p = Deno.run({ cmd: args, stderr: "piped", stdout: "piped" });
const [status, stdout, stderr] = await Promise.all([
const [status, stderr] = await Promise.all([
p.status(),
p.output(),
p.stderrOutput(),
]);

Expand All @@ -776,8 +775,8 @@ Deno.test("service - stats name respects assigned name", async () => {
version: "0.0.1",
endpoint: {
subject: "q",
handler: (err, msg) => {
msg.respond();
handler: (_err, msg) => {
msg?.respond();
},
},
});
Expand Down

0 comments on commit c794a87

Please sign in to comment.