Skip to content

Commit

Permalink
fix: avoid unhandled exception when reusing a closed client
Browse files Browse the repository at this point in the history
Example:

* Execute streamingRecognize
* When done, close the client
* Execute streamingRecognize again, with data already in the stream
* Unhandled exception occurs.

TypeError [ERR_INVALID_ARG_TYPE]: The "chunk" argument must be of type string or an instance of Buffer or Uint8Array. Received an instance of Object
    at _write (node:internal/streams/writable:474:13)
    at Writable.write (node:internal/streams/writable:502:10)
    at Duplexify._write (/project/node_modules/duplexify/index.js:212:22)
    at doWrite (/project/node_modules/duplexify/node_modules/readable-stream/lib/_stream_writable.js:390:139)
    at writeOrBuffer (/project/node_modules/duplexify/node_modules/readable-stream/lib/_stream_writable.js:381:5)
    at Writable.write (/project/node_modules/duplexify/node_modules/readable-stream/lib/_stream_writable.js:302:11)
    at Pumpify.<anonymous> (/project/node_modules/@google-cloud/speech/build/src/helpers.js:79:27)
    at Object.onceWrapper (node:events:633:26)
    at Pumpify.emit (node:events:518:28)
    at obj.<computed> [as _write] (/project/node_modules/stubs/index.js:28:22)
    at doWrite (/project/node_modules/duplexify/node_modules/readable-stream/lib/_stream_writable.js:390:139)
    at writeOrBuffer (/project/node_modules/duplexify/node_modules/readable-stream/lib/_stream_writable.js:381:5)
    at Writable.write (/project/node_modules/duplexify/node_modules/readable-stream/lib/_stream_writable.js:302:11)
    at PassThrough.ondata (node:internal/streams/readable:1007:22)
    at PassThrough.emit (node:events:518:28)
    at addChunk (node:internal/streams/readable:559:12) {
  code: 'ERR_INVALID_ARG_TYPE'

Reproduction: https://gist.github.com/orgads/13cbf44c91923da27d8772b5f10489c9

This happens because streamingRecognize writes streamingConfig on first
chunk. Usually the stream is open in object mode, so it works, but when
the client is terminated, PassThrough is used without object mode.

Change PassThrough to object mode, so it terminates gracefully.

This was reported in [1] and fixed in [2], but was then reverted in [3]
which re-generated the code. Fix properly in the generator now.

[1] googleapis/google-cloud-node#5464
[2] googleapis/google-cloud-node#5465
[3] googleapis/google-cloud-node#5565
  • Loading branch information
orgads committed Aug 25, 2024
1 parent e346246 commit 8fe4485
Show file tree
Hide file tree
Showing 16 changed files with 16 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ export class BigQueryStorageClient {
stub => (...args: Array<{}>) => {
if (this._terminated) {
if (methodName in this.descriptors.stream) {
const stream = new PassThrough();
const stream = new PassThrough({objectMode: true});
setImmediate(() => {
stream.emit('error', new this._gaxModule.GoogleError('The client has already been closed.'));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ export class BigQueryStorageClient {
stub => (...args: Array<{}>) => {
if (this._terminated) {
if (methodName in this.descriptors.stream) {
const stream = new PassThrough();
const stream = new PassThrough({objectMode: true});
setImmediate(() => {
stream.emit('error', new this._gaxModule.GoogleError('The client has already been closed.'));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ export class EchoClient {
stub => (...args: Array<{}>) => {
if (this._terminated) {
if (methodName in this.descriptors.stream) {
const stream = new PassThrough();
const stream = new PassThrough({objectMode: true});
setImmediate(() => {
stream.emit('error', new this._gaxModule.GoogleError('The client has already been closed.'));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ export class MessagingClient {
stub => (...args: Array<{}>) => {
if (this._terminated) {
if (methodName in this.descriptors.stream) {
const stream = new PassThrough();
const stream = new PassThrough({objectMode: true});
setImmediate(() => {
stream.emit('error', new this._gaxModule.GoogleError('The client has already been closed.'));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ export class EchoClient {
stub => (...args: Array<{}>) => {
if (this._terminated) {
if (methodName in this.descriptors.stream) {
const stream = new PassThrough();
const stream = new PassThrough({objectMode: true});
setImmediate(() => {
stream.emit('error', new this._gaxModule.GoogleError('The client has already been closed.'));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ export class MessagingClient {
stub => (...args: Array<{}>) => {
if (this._terminated) {
if (methodName in this.descriptors.stream) {
const stream = new PassThrough();
const stream = new PassThrough({objectMode: true});
setImmediate(() => {
stream.emit('error', new this._gaxModule.GoogleError('The client has already been closed.'));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ export class LoggingServiceV2Client {
stub => (...args: Array<{}>) => {
if (this._terminated) {
if (methodName in this.descriptors.stream) {
const stream = new PassThrough();
const stream = new PassThrough({objectMode: true});
setImmediate(() => {
stream.emit('error', new this._gaxModule.GoogleError('The client has already been closed.'));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ export class LoggingServiceV2Client {
stub => (...args: Array<{}>) => {
if (this._terminated) {
if (methodName in this.descriptors.stream) {
const stream = new PassThrough();
const stream = new PassThrough({objectMode: true});
setImmediate(() => {
stream.emit('error', new this._gaxModule.GoogleError('The client has already been closed.'));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ export class EchoClient {
stub => (...args: Array<{}>) => {
if (this._terminated) {
if (methodName in this.descriptors.stream) {
const stream = new PassThrough();
const stream = new PassThrough({objectMode: true});
setImmediate(() => {
stream.emit('error', new this._gaxModule.GoogleError('The client has already been closed.'));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ export class MessagingClient {
stub => (...args: Array<{}>) => {
if (this._terminated) {
if (methodName in this.descriptors.stream) {
const stream = new PassThrough();
const stream = new PassThrough({objectMode: true});
setImmediate(() => {
stream.emit('error', new this._gaxModule.GoogleError('The client has already been closed.'));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ export class EchoClient {
stub => (...args: Array<{}>) => {
if (this._terminated) {
if (methodName in this.descriptors.stream) {
const stream = new PassThrough();
const stream = new PassThrough({objectMode: true});
setImmediate(() => {
stream.emit('error', new this._gaxModule.GoogleError('The client has already been closed.'));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ export class EchoClient {
stub => (...args: Array<{}>) => {
if (this._terminated) {
if (methodName in this.descriptors.stream) {
const stream = new PassThrough();
const stream = new PassThrough({objectMode: true});
setImmediate(() => {
stream.emit('error', new this._gaxModule.GoogleError('The client has already been closed.'));
});
Expand Down
2 changes: 1 addition & 1 deletion baselines/showcase/src/v1beta1/echo_client.ts.baseline
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ export class EchoClient {
stub => (...args: Array<{}>) => {
if (this._terminated) {
if (methodName in this.descriptors.stream) {
const stream = new PassThrough();
const stream = new PassThrough({objectMode: true});
setImmediate(() => {
stream.emit('error', new this._gaxModule.GoogleError('The client has already been closed.'));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ export class MessagingClient {
stub => (...args: Array<{}>) => {
if (this._terminated) {
if (methodName in this.descriptors.stream) {
const stream = new PassThrough();
const stream = new PassThrough({objectMode: true});
setImmediate(() => {
stream.emit('error', new this._gaxModule.GoogleError('The client has already been closed.'));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ export class {{ service.name }}Client {
if (this._terminated) {
{%- if service.streaming.length > 0 %}
if (methodName in this.descriptors.stream) {
const stream = new PassThrough();
const stream = new PassThrough({objectMode: true});
setImmediate(() => {
stream.emit('error', new this._gaxModule.GoogleError('The client has already been closed.'));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ export class {{ service.name }}Client {
if (this._terminated) {
{%- if service.streaming.length > 0 %}
if (methodName in this.descriptors.stream) {
const stream = new PassThrough();
const stream = new PassThrough({objectMode: true});
setImmediate(() => {
stream.emit('error', new this._gaxModule.GoogleError('The client has already been closed.'));
});
Expand Down

0 comments on commit 8fe4485

Please sign in to comment.