Skip to content

Commit

Permalink
Merge pull request #4213 from nestjs/fix/grpc-streaming-metadata
Browse files Browse the repository at this point in the history
fix(microservices): fix sending metadata with grpc streaming
  • Loading branch information
kamilmysliwiec authored Mar 3, 2020
2 parents 1a805ad + ee55ceb commit 5f95dbd
Showing 1 changed file with 17 additions and 8 deletions.
25 changes: 17 additions & 8 deletions packages/microservices/client/client-grpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,14 @@ export class ClientGrpcProxy extends ClientProxy implements ClientGrpc {
let upstreamSubscription: Subscription;

const upstreamSubjectOrData = args[0];
const maybeMetadata = args[1];

const isUpstreamSubject =
upstreamSubjectOrData && isFunction(upstreamSubjectOrData.subscribe);

const call =
isRequestStream && isUpstreamSubject
? client[methodName]()
? client[methodName](maybeMetadata)
: client[methodName](...args);

if (isRequestStream && isUpstreamSubject) {
Expand Down Expand Up @@ -215,13 +217,20 @@ export class ClientGrpcProxy extends ClientProxy implements ClientGrpc {

if (isRequestStream && isUpstreamSubject) {
return new Observable(observer => {
const call = client[methodName]((error, data) => {
if (error) {
return observer.error(error);
}
observer.next(data);
observer.complete();
});
const callArgs = [
(error: unknown, data: unknown) => {
if (error) {
return observer.error(error);
}
observer.next(data);
observer.complete();
},
];
const maybeMetadata = args[1];
if (maybeMetadata) {
callArgs.unshift(maybeMetadata);
}
const call = client[methodName](...callArgs);
upstreamSubjectOrData.subscribe(
(val: unknown) => call.write(val),
(err: unknown) => call.emit('error', err),
Expand Down

0 comments on commit 5f95dbd

Please sign in to comment.