From c794a875ff9942c750f0e60bfbec723bc92289aa Mon Sep 17 00:00:00 2001 From: aricart Date: Wed, 11 Jan 2023 15:36:58 -0400 Subject: [PATCH] - [FEAT] [OS] expose timeout on put, for really large assets, it is possible for the publish request to timeout --- nats-base-client/internal_mod.ts | 1 + nats-base-client/mod.ts | 1 + nats-base-client/objectstore.ts | 19 ++++++++++++++----- nats-base-client/types.ts | 8 ++++++++ tests/service_test.ts | 7 +++---- 5 files changed, 27 insertions(+), 9 deletions(-) diff --git a/nats-base-client/internal_mod.ts b/nats-base-client/internal_mod.ts index 18822a35..7f0eff08 100644 --- a/nats-base-client/internal_mod.ts +++ b/nats-base-client/internal_mod.ts @@ -59,6 +59,7 @@ export type { ObjectStoreMeta, ObjectStoreMetaOptions, ObjectStoreOptions, + ObjectStorePutOpts, ObjectStoreStatus, PeerInfo, Placement, diff --git a/nats-base-client/mod.ts b/nats-base-client/mod.ts index e5350097..97b7eb9e 100644 --- a/nats-base-client/mod.ts +++ b/nats-base-client/mod.ts @@ -115,6 +115,7 @@ export type { ObjectStoreMeta, ObjectStoreMetaOptions, ObjectStoreOptions, + ObjectStorePutOpts, ObjectStoreStatus, PeerInfo, Perf, diff --git a/nats-base-client/objectstore.ts b/nats-base-client/objectstore.ts index a69ba459..305a7b51 100644 --- a/nats-base-client/objectstore.ts +++ b/nats-base-client/objectstore.ts @@ -1,5 +1,5 @@ /* - * Copyright 2022 The NATS Authors + * Copyright 2022-2023 The NATS Authors * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -26,6 +26,7 @@ import { ObjectStoreMeta, ObjectStoreMetaOptions, ObjectStoreOptions, + ObjectStorePutOpts, ObjectStoreStatus, PubAck, PurgeResponse, @@ -306,8 +307,12 @@ export class ObjectStoreImpl implements ObjectStore { async _put( meta: ObjectStoreMeta, rs: ReadableStream | null, + opts?: ObjectStorePutOpts, ): Promise { const jsi = this.js as JetStreamClientImpl; + opts = opts || { timeout: jsi.timeout }; + opts.timeout = opts.timeout || 1000; + const { timeout } = opts; const maxPayload = jsi.nc.info?.max_payload || 1024; meta = meta || {} as ObjectStoreMeta; meta.options = meta.options || {}; @@ -351,7 +356,7 @@ export class ObjectStoreImpl implements ObjectStore { sha.update(payload); info.chunks!++; info.size! += payload.length; - proms.push(this.js.publish(chunkSubj, payload)); + proms.push(this.js.publish(chunkSubj, payload, { timeout })); } info.mtime = new Date().toISOString(); const digest = sha.digest("base64"); @@ -364,7 +369,10 @@ export class ObjectStoreImpl implements ObjectStore { const h = headers(); h.set(JsHeaders.RollupHdr, JsHeaders.RollupValueSubject); proms.push( - this.js.publish(metaSubj, JSONCodec().encode(info), { headers: h }), + this.js.publish(metaSubj, JSONCodec().encode(info), { + headers: h, + timeout, + }), ); // if we had this object trim it out @@ -389,7 +397,7 @@ export class ObjectStoreImpl implements ObjectStore { const payload = db.drain(meta.options.max_chunk_size); sha.update(payload); proms.push( - this.js.publish(chunkSubj, payload), + this.js.publish(chunkSubj, payload, { timeout }), ); } } @@ -406,13 +414,14 @@ export class ObjectStoreImpl implements ObjectStore { put( meta: ObjectStoreMeta, rs: ReadableStream | null, + opts?: ObjectStorePutOpts, ): Promise { if (meta?.options?.link) { return Promise.reject( new Error("link cannot be set when putting the object in bucket"), ); } - return this._put(meta, rs); + return this._put(meta, rs, opts); } async get(name: string): Promise { diff --git a/nats-base-client/types.ts b/nats-base-client/types.ts index 6a49bfda..f815723a 100644 --- a/nats-base-client/types.ts +++ b/nats-base-client/types.ts @@ -2919,6 +2919,13 @@ export type ObjectResult = { error: Promise; }; +export type ObjectStorePutOpts = { + /** + * maximum number of millis for the put requests to succeed + */ + timeout: number; +}; + export interface ObjectStore { info(name: string): Promise; list(): Promise; @@ -2926,6 +2933,7 @@ export interface ObjectStore { put( meta: ObjectStoreMeta, rs: ReadableStream, + opts?: ObjectStorePutOpts, ): Promise; delete(name: string): Promise; link(name: string, meta: ObjectInfo): Promise; diff --git a/tests/service_test.ts b/tests/service_test.ts index 2701400e..a6cbcf66 100644 --- a/tests/service_test.ts +++ b/tests/service_test.ts @@ -754,9 +754,8 @@ Deno.test("service - cross platform service test", async () => { ]; const p = Deno.run({ cmd: args, stderr: "piped", stdout: "piped" }); - const [status, stdout, stderr] = await Promise.all([ + const [status, stderr] = await Promise.all([ p.status(), - p.output(), p.stderrOutput(), ]); @@ -776,8 +775,8 @@ Deno.test("service - stats name respects assigned name", async () => { version: "0.0.1", endpoint: { subject: "q", - handler: (err, msg) => { - msg.respond(); + handler: (_err, msg) => { + msg?.respond(); }, }, });