From 53bc07685eb9310b75bc972e7e7d1d3a202f0fbf Mon Sep 17 00:00:00 2001 From: Antonio Mindov Date: Tue, 26 Sep 2023 11:08:13 +0300 Subject: [PATCH] fix: topic subscription error handling Signed-off-by: Antonio Mindov --- examples/consensus-pub-sub.js | 7 +++++-- src/channel/NodeMirrorChannel.js | 10 +++++++--- src/topic/TopicMessageQuery.js | 10 ++++++---- 3 files changed, 18 insertions(+), 9 deletions(-) diff --git a/examples/consensus-pub-sub.js b/examples/consensus-pub-sub.js index 1357ed2fc..81e3fd267 100644 --- a/examples/consensus-pub-sub.js +++ b/examples/consensus-pub-sub.js @@ -37,8 +37,11 @@ async function main() { new TopicMessageQuery() .setTopicId(topicId) .setStartTime(0) - .subscribe(client, null, (message) => - console.log(Buffer.from(message.contents).toString("utf8")) + .subscribe(client, (message, error) => { + console.log(message) + console.log(error) + }, (message) => + console.log(Buffer.from(message.contents).toString("utf8")) ); for (let i = 0; ; i += 1) { diff --git a/src/channel/NodeMirrorChannel.js b/src/channel/NodeMirrorChannel.js index 6dcb762a3..164b5f32d 100644 --- a/src/channel/NodeMirrorChannel.js +++ b/src/channel/NodeMirrorChannel.js @@ -45,7 +45,11 @@ export default class NodeMirrorChannel extends MirrorChannel { address, address.endsWith(":50212") || address.endsWith(":443") ? grpc.credentials.createSsl() - : grpc.credentials.createInsecure() + : grpc.credentials.createInsecure(), + { + "grpc.keepalive_time_ms": 90000, + "grpc.keepalive_timeout_ms": 5000, + } ); } @@ -94,8 +98,8 @@ export default class NodeMirrorChannel extends MirrorChannel { } }) // eslint-disable-next-line @typescript-eslint/no-unused-vars - .on("error", (/** @type {grpc.StatusObject} */ _) => { - // Do nothing + .on("error", (/** @type {grpc.StatusObject} */ err) => { + error(err) }); return () => { diff --git a/src/topic/TopicMessageQuery.js b/src/topic/TopicMessageQuery.js index 2b74ee764..7ffac8013 100644 --- a/src/topic/TopicMessageQuery.js +++ b/src/topic/TopicMessageQuery.js @@ -48,7 +48,7 @@ export default class TopicMessageQuery extends Query { * @param {TopicId | string} [props.topicId] * @param {Timestamp} [props.startTime] * @param {Timestamp} [props.endTime] - * @param {(message: TopicMessage, error: Error)=> void} [props.errorHandler] + * @param {(message: TopicMessage | null, error: Error)=> void} [props.errorHandler] * @param {() => void} [props.completionHandler] * @param {(error: MirrorError | Error | null) => boolean} [props.retryHandler] * @param {Long | number} [props.limit] @@ -94,7 +94,7 @@ export default class TopicMessageQuery extends Query { /** * @private - * @type {(message: TopicMessage, error: Error) => void} + * @type {(message: TopicMessage | null, error: Error) => void} */ // eslint-disable-next-line @typescript-eslint/no-unused-vars this._errorHandler = (message, error) => { @@ -278,7 +278,7 @@ export default class TopicMessageQuery extends Query { } /** - * @param {(message: TopicMessage, error: Error)=> void} errorHandler + * @param {(message: TopicMessage | null, error: Error)=> void} errorHandler * @returns {TopicMessageQuery} */ setErrorHandler(errorHandler) { @@ -321,7 +321,7 @@ export default class TopicMessageQuery extends Query { /** * @param {Client} client - * @param {((message: TopicMessage, error: Error) => void) | null} errorHandler + * @param {((message: TopicMessage | null, error: Error) => void) | null} errorHandler * @param {(message: TopicMessage) => void} listener * @returns {SubscriptionHandle} */ @@ -461,6 +461,8 @@ export default class TopicMessageQuery extends Query { setTimeout(() => { this._makeServerStreamRequest(client); }, delay); + } else { + this._errorHandler(null, new Error(message)); } }, this._completionHandler