Skip to content

Commit

Permalink
Merge pull request #43 from deific/feat-12
Browse files Browse the repository at this point in the history
Feat 12
  • Loading branch information
www3com authored Sep 23, 2022
2 parents d1eaf63 + eae4ce4 commit 0b7b1eb
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 144 deletions.
211 changes: 87 additions & 124 deletions uprpc-app/src/rpc/client.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
import { loadSync } from "@grpc/proto-loader";
import { credentials, GrpcObject, loadPackageDefinition, Metadata, ServiceError } from "@grpc/grpc-js";
import { credentials, GrpcObject, loadPackageDefinition, StatusObject, ServiceError } from "@grpc/grpc-js";
import { RequestData, ResponseData, Mode } from "../types";
import { parseMetadata, parseMds } from "./metadata";

let aliveClient = {};
let aliveSessions = {};

const clientCaches = {};
const callCache = {};
export declare function Callback(
response: ResponseData | null,
metadata?: any,
err?: Error,
closeStream?: boolean
): void;
interface CallOptions {

interface ClientStub {
service: any;
call: Function;
onData?: Function;
onEnd?: Function;
onError?: Function;
Expand Down Expand Up @@ -47,135 +49,45 @@ export async function send(request: RequestData, callback: typeof Callback) {
}

export async function stop(id: string, callback: (response: ResponseData | null, err?: Error) => void) {
let { call, methodMode } = aliveSessions[id];
let { call, methodMode } = callCache[id];
if (!!call) {
isWriteable(methodMode) ? call.end() : call.cancel();
delete aliveSessions[id];
Mode.isWriteStream(methodMode) ? call.end() : call.cancel();
delete callCache[id];
} else {
callback(null, new Error("This request not exist: " + id));
}
}

function invokeUnary(request: RequestData, callback: typeof Callback) {
let metadata = parseMds(request.mds || []);
let call = getCallStub(request, {
onData: (response: any) => {
console.log("客户端receive:", response);
callback(response);
},
onError: (err: any) => {
console.log("客户端receive:", err);
callback(null, parseMetadata(err.metadata), err);
},
});

let client: ClientStub = getCallStub(request);
client.call(request, callback);
return;
}

function invokeServerStream(request: RequestData, callback: typeof Callback) {
let call = getCallStub(request, {
onData: (response: any) => {
console.log("客户端receive:", response);
callback(response);
},
onEnd: (s: any) => callback(null, null, undefined, true),
onError: (err: any, response: any) => callback(response, parseMetadata(err.metadata), err, true),
onMetadata: (metadata: any) => callback(null),
// onStatus: (status: any) => callback(null),
});
let client: ClientStub = getCallStub(request);
let call = client.call(request, callback);
}

function invokeClientStream(request: RequestData, callback: typeof Callback) {
let call = getCallStub(request, {
onEnd: (response: any) => {
console.log("客户端receive:", response);
callback(response);
},
onError: (err: any, response: any) => callback(response, parseMetadata(err.metadata), err, true),
});
let client: ClientStub = getCallStub(request);
let call = client.call(request, callback);
call.write(JSON.parse(request.body));
}

function invokeBidirectionalStream(request: RequestData, callback: typeof Callback) {
let call = getCallStub(request, {
onData: (response: any) => {
console.log("客户端receive:", response);
callback(response);
},
onEnd: () => callback(null, null, undefined, true),
onError: (err: any, response: any) => callback(response, parseMetadata(err.metadata), err, true),
onMetadata: (metadata: any) => callback(null),
onStatus: (status: any) => callback(null),
});
let client: ClientStub = getCallStub(request);
let call = client.call(request, callback);
call.write(JSON.parse(request.body));
}

function getCallStub(request: RequestData, callOptions: CallOptions) {
let client = getClient(request);
return getOrCreateSession(client, request, callOptions);
}

function getOrCreateSession(client: any, request: RequestData, callOptions: CallOptions) {
if (request.methodMode === Mode.Unary) {
try {
client[request.methodName](JSON.parse(request.body), (err: ServiceError, response: any) => {
console.log("收到服务端返回数据:", err, response);
if (err != null) {
callOptions.onError ? callOptions.onError(err, response) : void 0;
} else {
callOptions.onData ? callOptions.onData(response) : void 0;
}
});
} catch (err) {
callOptions.onError ? callOptions.onError(err, null) : void 0;
}
return;
function getCallStub(request: RequestData): ClientStub {
let clientKey = request.serviceName;
if (clientCaches[clientKey] && request.host == clientCaches[clientKey].host) {
return clientCaches[clientKey];
}

let call = null;
if (!!aliveSessions[request.id]) {
call = aliveSessions[request.id].call;
}

call =
request.methodMode === Mode.BidirectionalStream
? client[request.methodName]()
: client[request.methodName](JSON.parse(request.body));

if (callOptions?.onData) {
call.on("data", callOptions.onData);
}
if (callOptions?.onEnd) {
call.on("end", (data: any) => {
console.log("服务器发送end,客户端关闭", data);
delete aliveSessions[request.id];
callOptions.onEnd ? callOptions.onEnd() : void 0;
});
}
if (callOptions?.onError) {
call.on("error", (e: Error) => {
console.log("发生异常,客户端关闭");
delete aliveSessions[request.id];
callOptions.onError ? callOptions.onError(e) : void 0;
});
}
if (callOptions?.onMetadata) {
call.on("metadata", callOptions.onMetadata);
}
if (callOptions?.onStatus) {
call.on("status", (s: any) => {
console.log("连接状态发生变化:", s);
callOptions.onStatus ? callOptions.onStatus(s) : void 0;
});
}
aliveSessions[request.id] = {
call: call,
methodMode: request.methodMode,
};
return call;
}

function getClient(request: RequestData) {
let packageDefinition = loadSync([request.protoPath], {
keepCase: true,
longs: String,
Expand All @@ -186,28 +98,79 @@ function getClient(request: RequestData) {
});

let grpcObject: GrpcObject = loadPackageDefinition(packageDefinition);

let service = null;
if (request.namespace == "") {
service = grpcObject[request.namespace];
} else {
service = grpcObject[request.namespace][request.serviceName];
}

if (request.methodMode === Mode.Unary) {
return new service(request.host, credentials.createInsecure());
}
let serviceImpl = new service(request.host, credentials.createInsecure());
let clientStub = {
host: request.host,
service: serviceImpl,
call: (request: RequestData, callback?: Function) => createCall(serviceImpl)(request, callback),
};
clientCaches[clientKey] = clientStub;
return clientStub;
}

let client = null;
if (!aliveClient[request.serviceName]) {
client = new service(request.host, credentials.createInsecure());
aliveClient[request.serviceName] = client;
} else {
client = aliveClient[request.serviceName];
}
return client;
function createCall(serviceImpl: any): Function {
return (request: RequestData, callback: CallableFunction) => {
let call: any = callCache[request.id];
if (call) {
return call.call;
}
let matadata = parseMds(request.mds || []);

if (Mode.isGrpcCallback(request.methodMode)) {
let grpcCallback = (err: ServiceError, response: any) => {
console.log("收到服务端返回数据:", err, response);
callback(response, parseMetadata(err?.metadata), err);
};
if (request.methodMode === Mode.Unary) {
call = serviceImpl[request.methodName](JSON.parse(request.body), matadata, grpcCallback);
} else {
call = serviceImpl[request.methodName](matadata, grpcCallback);
}
} else {
if (request.methodMode === Mode.BidirectionalStream) {
call = serviceImpl[request.methodName]();
} else {
call = serviceImpl[request.methodName](matadata, JSON.parse(request.body));
}
}

if (Mode.isStream(request.methodMode)) {
listenStatusAndCallback(request.id, call, callback);
if (Mode.isReadStream(request.methodMode)) {
listenDataAndCallback(request.id, call, callback);
}
callCache[request.id] = { methodMode: request.methodMode, call: call };
}
return call;
};
}

function isWriteable(mode: Mode): boolean {
return mode === Mode.BidirectionalStream || mode === Mode.ClientStream;
function listenDataAndCallback(reqId: any, call: any, callback?: CallableFunction) {
call.on("data", (data: any) => {
console.log("data收到数据:", data);
if (callback) {
callback(data);
}
});
}
function listenStatusAndCallback(reqId: any, call: any, callback?: CallableFunction) {
call.on("error", (e: Error) => {
console.log("发生异常,客户端关闭");
});
call.on("status", (status: StatusObject) => {
console.log("status收到数据:", status);
if (callback) {
status.code === 0
? callback(null, parseMetadata(status.metadata), undefined, true)
: callback(null, null, new Error(status.details), true);
}
delete callCache[reqId];
});
}
11 changes: 9 additions & 2 deletions uprpc-app/src/rpc/metadata.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,20 @@ export function parseMds(mds: MD[]): Metadata {
let metadata = new Metadata();
if (mds) {
mds.forEach((item: MD) => {
metadata.add(item.key, item.value);
if (item.key.endsWith("-bin")) {
metadata.add(item.key, Buffer.from(item.value));
} else {
metadata.add(item.key, item.value);
}
});
}
return metadata;
}

export function parseMetadata(metadata: Metadata): MD[] {
export function parseMetadata(metadata: Metadata): MD[] | null {
if (!!!metadata) {
return null;
}
let mds: MD[] = [];
Object.keys(metadata.getMap()).forEach((key: string, index: number) => {
let values = metadata.get(key);
Expand Down
10 changes: 6 additions & 4 deletions uprpc-app/src/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,7 @@ function returnResponse(
e?: Error,
closeStream?: boolean
): void {
if (closeStream) {
console.log("endStream ", req.id);
window.webContents.send("endStream", req.id);
} else {
if (res || md) {
let resData: ResponseData = {
id: req.id,
body: JSON.stringify(res ? res : e?.message, null, "\t"),
Expand All @@ -74,4 +71,9 @@ function returnResponse(
console.log("return response data: ", resData);
window.webContents.send("updateResponse", resData);
}

if (closeStream) {
console.log("endStream ", req.id);
window.webContents.send("endStream", req.id);
}
}
15 changes: 15 additions & 0 deletions uprpc-app/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,21 @@ export enum Mode {
ServerStream = 2,
BidirectionalStream = 3,
}
export namespace Mode {
export function isGrpcCallback(mode: Mode) {
return mode === Mode.Unary || mode === Mode.ClientStream;
}
export function isWriteStream(mode: Mode): boolean {
return mode === Mode.BidirectionalStream || mode === Mode.ClientStream;
}
export function isReadStream(mode: Mode): boolean {
return mode === Mode.BidirectionalStream || mode === Mode.ServerStream;
}

export function isStream(mode: Mode): boolean {
return mode === Mode.BidirectionalStream || mode === Mode.ServerStream || mode === Mode.ClientStream;
}
}

export interface Method {
id: string;
Expand Down
13 changes: 10 additions & 3 deletions uprpc-mock/proto/helloworld.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,24 @@ package helloworld;
service Greeter {
// 简单gRPC调用
rpc sayHelloSimple (HelloRequest) returns (HelloReply) {}

// 简单gRPC调用
rpc sayHelloSimpleError (HelloRequest) returns (HelloReply) {}
// 服务端流式调用
rpc sayHelloServer (HelloRequest) returns (stream HelloReply) {}

// 客户端流式调用
rpc sayHelloClient (stream HelloRequest) returns (HelloReply) {}

// 客户端服务端双向流
rpc sayHelloDouble (stream HelloRequest) returns (stream HelloReply) {}
}









message HelloRequest {
string name = 1;
}
Expand Down
Loading

0 comments on commit 0b7b1eb

Please sign in to comment.