Skip to content

Commit

Permalink
Grpc endpoint parsing (#543)
Browse files Browse the repository at this point in the history
* Adds new parsing logic

Signed-off-by: Elena Kolevska <[email protected]>

* wip

Signed-off-by: Elena Kolevska <[email protected]>

* Cleanup

Signed-off-by: Elena Kolevska <[email protected]>

* http endpoint fix

Signed-off-by: Elena Kolevska <[email protected]>

* Fixes linter errors

Signed-off-by: Elena Kolevska <[email protected]>

* Fixes

Signed-off-by: Elena Kolevska <[email protected]>

* Reorganises code

Signed-off-by: Elena Kolevska <[email protected]>

* Fixes tests

Signed-off-by: Elena Kolevska <[email protected]>

* Removes unneeded test (after rebase)

Signed-off-by: Elena Kolevska <[email protected]>

* Fixes rebase manual merge

Signed-off-by: Elena Kolevska <[email protected]>

* Make dapr DaprClient.options not partial again

Signed-off-by: Elena Kolevska <[email protected]>

* Reorganises the network classes

Signed-off-by: Elena Kolevska <[email protected]>

* Fixes linter errors

Signed-off-by: Elena Kolevska <[email protected]>

* Apply suggestions from code review

Co-authored-by: Shubham Sharma <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>

* Adds changes from review

Signed-off-by: Elena Kolevska <[email protected]>

* Apply suggestions from code review

Co-authored-by: Shubham Sharma <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>

* WIP adding comments from review

Signed-off-by: Elena Kolevska <[email protected]>

* Fixes linter

Signed-off-by: Elena Kolevska <[email protected]>

* Fixes import

Signed-off-by: Elena Kolevska <[email protected]>

* Accounts for ipv6 addresses for http endpoints

Signed-off-by: Elena Kolevska <[email protected]>

* Fix

Signed-off-by: Elena Kolevska <[email protected]>

* Fixes linter

Signed-off-by: Elena Kolevska <[email protected]>

---------

Signed-off-by: Elena Kolevska <[email protected]>
Signed-off-by: Elena Kolevska <[email protected]>
Co-authored-by: Shubham Sharma <[email protected]>
  • Loading branch information
elena-kolevska and shubham1172 authored Dec 5, 2023
1 parent d2cc38e commit 998f8ee
Show file tree
Hide file tree
Showing 12 changed files with 972 additions and 629 deletions.
29 changes: 22 additions & 7 deletions src/implementation/Client/DaprClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,18 +87,22 @@ export default class DaprClient {
private readonly logger: Logger;

constructor(options: Partial<DaprClientOptions> = {}) {
this.options = getClientOptions(options, Settings.getDefaultCommunicationProtocol(), undefined);
this.logger = new Logger("DaprClient", "DaprClient", this.options.logger);

// Validation on port
if (this.options.daprPort && !/^[0-9]+$/.test(this.options.daprPort)) {
options = getClientOptions(options, Settings.getDefaultCommunicationProtocol(), undefined);
this.logger = new Logger("DaprClient", "DaprClient", options.logger);

// Legacy validation on port
// URI validation is done later, when we instantiate the HttpEndpoint or GrpcEndpoint
// object in the HttpClient or GrpcClient constructor, but we need to
// keep this additional check for backward compatibility
// TODO: Remove this validation in the next major version
if (options?.daprPort && !/^[0-9]+$/.test(options?.daprPort)) {
throw new Error("DAPR_INCORRECT_SIDECAR_PORT");
}

// Builder
switch (options.communicationProtocol) {
case CommunicationProtocolEnum.GRPC: {
const client = new GRPCClient(this.options);
const client = new GRPCClient(options);
this.daprClient = client;

this.state = new GRPCClientState(client);
Expand All @@ -119,7 +123,7 @@ export default class DaprClient {
}
case CommunicationProtocolEnum.HTTP:
default: {
const client = new HTTPClient(this.options);
const client = new HTTPClient(options);
this.daprClient = client;

this.actor = new HTTPClientActor(client); // we use an abstractor here since we interface through a builder with the Actor Runtime
Expand All @@ -139,6 +143,17 @@ export default class DaprClient {
break;
}
}

this.options = {
daprHost: this.daprClient.options.daprHost,
daprPort: this.daprClient.options.daprPort,
communicationProtocol: this.daprClient.options.communicationProtocol,
isKeepAlive: options.isKeepAlive,
logger: options.logger,
actor: options.actor,
daprApiToken: options.daprApiToken,
maxBodySizeMb: options.maxBodySizeMb,
};
}

static create(client: IClient): DaprClient {
Expand Down
63 changes: 39 additions & 24 deletions src/implementation/Client/GRPCClient/GRPCClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import { Logger } from "../../../logger/Logger";
import GRPCClientSidecar from "./sidecar";
import DaprClient from "../DaprClient";
import { SDK_VERSION } from "../../../version";
import communicationProtocolEnum from "../../../enum/CommunicationProtocol.enum";
import { GrpcEndpoint } from "../../../network/GrpcEndpoint";

export default class GRPCClient implements IClient {
readonly options: DaprClientOptions;
Expand All @@ -29,17 +31,34 @@ export default class GRPCClient implements IClient {
private readonly clientCredentials: grpc.ChannelCredentials;
private readonly logger: Logger;
private readonly grpcClientOptions: Partial<grpc.ClientOptions>;
private daprEndpoint: GrpcEndpoint;

constructor(options: Partial<DaprClientOptions>) {
this.daprEndpoint = this.generateEndpoint(options);

this.options = {
daprHost: this.daprEndpoint.hostname,
daprPort: this.daprEndpoint.port,
communicationProtocol: communicationProtocolEnum.GRPC,
isKeepAlive: options?.isKeepAlive,
logger: options?.logger,
actor: options?.actor,
daprApiToken: options?.daprApiToken,
maxBodySizeMb: options?.maxBodySizeMb,
};

constructor(options: DaprClientOptions) {
this.options = options;
this.clientCredentials = this.generateCredentials();
this.grpcClientOptions = this.generateChannelOptions();

this.logger = new Logger("GRPCClient", "GRPCClient", options.logger);
this.isInitialized = false;

this.logger.info(`Opening connection to ${this.options.daprHost}:${this.options.daprPort}`);
this.client = this.generateClient(this.options.daprHost, this.options.daprPort);
this.client = new GrpcDaprClient(
this.daprEndpoint.endpoint,
this.getClientCredentials(),
this.getGrpcClientOptions(),
);
}

async getClient(requiresInitialization = true): Promise<GrpcDaprClient> {
Expand All @@ -59,8 +78,24 @@ export default class GRPCClient implements IClient {
return this.grpcClientOptions;
}

private generateEndpoint(options: Partial<DaprClientOptions>): GrpcEndpoint {
const host = options?.daprHost ?? Settings.getDefaultHost();
const port = options?.daprPort ?? Settings.getDefaultGrpcPort();
let uri = `${host}:${port}`;

if (!(options?.daprHost || options?.daprPort)) {
// If neither host nor port are specified, check the endpoint environment variable.
const endpoint = Settings.getDefaultGrpcEndpoint();
if (endpoint != "") {
uri = endpoint;
}
}

return new GrpcEndpoint(uri);
}

private generateCredentials(): grpc.ChannelCredentials {
if (this.options.daprHost.startsWith("https")) {
if (this.daprEndpoint?.tls) {
return grpc.ChannelCredentials.createSsl();
}
return grpc.ChannelCredentials.createInsecure();
Expand Down Expand Up @@ -93,26 +128,6 @@ export default class GRPCClient implements IClient {
return options;
}

private generateClient(host: string, port: string): GrpcDaprClient {
return new GrpcDaprClient(
GRPCClient.getEndpoint(host, port),
this.getClientCredentials(),
this.getGrpcClientOptions(),
);
}

// The grpc client doesn't allow http:// or https:// for grpc connections,
// so we need to remove it, if it exists
static getEndpoint(host: string, port: string): string {
let endpoint = `${host}:${port}`;
const parts = endpoint.split("://");
if (parts.length > 1 && parts[0].startsWith("http")) {
endpoint = parts[1];
}

return endpoint;
}

private generateInterceptors(): (options: any, nextCall: any) => grpc.InterceptingCall {
return (options: any, nextCall: any) => {
return new grpc.InterceptingCall(nextCall(options), {
Expand Down
41 changes: 34 additions & 7 deletions src/implementation/Client/HTTPClient/HTTPClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import { Logger } from "../../../logger/Logger";
import HTTPClientSidecar from "./sidecar";
import { SDK_VERSION } from "../../../version";
import * as SerializerUtil from "../../../utils/Serializer.util";
import communicationProtocolEnum from "../../../enum/CommunicationProtocol.enum";
import { HttpEndpoint } from "../../../network/HttpEndpoint";

export default class HTTPClient implements IClient {
readonly options: DaprClientOptions;
Expand All @@ -34,16 +36,25 @@ export default class HTTPClient implements IClient {

private static httpAgent: http.Agent;
private static httpsAgent: https.Agent;
private daprEndpoint: HttpEndpoint;

constructor(options: Partial<DaprClientOptions>) {
this.daprEndpoint = this.generateEndpoint(options);

this.options = {
daprHost: this.daprEndpoint.hostname,
daprPort: this.daprEndpoint.port,
communicationProtocol: communicationProtocolEnum.HTTP,
isKeepAlive: options?.isKeepAlive,
logger: options?.logger,
actor: options?.actor,
daprApiToken: options?.daprApiToken,
maxBodySizeMb: options?.maxBodySizeMb,
};

constructor(options: DaprClientOptions) {
this.options = options;
this.logger = new Logger("HTTPClient", "HTTPClient", this.options.logger);
this.isInitialized = false;

this.clientUrl = `${this.options.daprHost}:${this.options.daprPort}/v1.0`;
if (!this.clientUrl.startsWith("http://") && !this.clientUrl.startsWith("https://")) {
this.clientUrl = `http://${this.clientUrl}`;
}
this.clientUrl = `${this.daprEndpoint.endpoint}/v1.0`;

if (!HTTPClient.client) {
HTTPClient.client = fetch;
Expand All @@ -63,6 +74,22 @@ export default class HTTPClient implements IClient {
}
}

private generateEndpoint(options: Partial<DaprClientOptions>): HttpEndpoint {
const host = options?.daprHost ?? Settings.getDefaultHost();
const port = options?.daprPort ?? Settings.getDefaultHttpPort();
let uri = `${host}:${port}`;

if (!(options?.daprHost || options?.daprPort)) {
// If neither host nor port are specified, check the endpoint environment variable.
const endpoint = Settings.getDefaultHttpEndpoint();
if (endpoint != "") {
uri = endpoint;
}
}

return new HttpEndpoint(uri);
}

async getClient(requiresInitialization = true): Promise<typeof fetch> {
// Ensure the sidecar has been started
if (requiresInitialization && !this.isInitialized) {
Expand Down
21 changes: 14 additions & 7 deletions src/implementation/Server/DaprServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,31 +48,38 @@ export default class DaprServer {
constructor(serverOptions: Partial<DaprServerOptions> = {}) {
const communicationProtocol = serverOptions.communicationProtocol ?? Settings.getDefaultCommunicationProtocol();
const clientOptions = getClientOptions(serverOptions.clientOptions, communicationProtocol, serverOptions?.logger);

// Legacy validation on port
// URI validation is done later, when we instantiate the HttpEndpoint or GrpcEndpoint
// object in the HttpClient or GrpcClient constructor, but we need to
// keep this additional check for backward compatibility
// TODO: Remove this validation in the next major version
if (clientOptions?.daprPort && !/^[0-9]+$/.test(clientOptions?.daprPort)) {
throw new Error("DAPR_INCORRECT_SIDECAR_PORT");
}

this.client = new DaprClient(clientOptions);

this.serverOptions = {
serverHost: serverOptions.serverHost ?? Settings.getDefaultHost(),
serverPort: serverOptions.serverPort ?? Settings.getDefaultAppPort(communicationProtocol),
communicationProtocol: communicationProtocol,
maxBodySizeMb: serverOptions.maxBodySizeMb,
serverHttp: serverOptions.serverHttp,
clientOptions: clientOptions,
clientOptions: this.client.options,
logger: serverOptions.logger,
};
// Create a client to interface with the sidecar from the server side
this.client = new DaprClient(clientOptions);

// If DAPR_SERVER_PORT was not set, we set it
process.env.DAPR_SERVER_PORT = this.serverOptions.serverPort;
process.env.DAPR_CLIENT_PORT = clientOptions.daprPort;
process.env.DAPR_CLIENT_PORT = this.client.options.daprPort;

// Validation on port
if (!/^[0-9]+$/.test(this.serverOptions.serverPort)) {
throw new Error("DAPR_INCORRECT_SERVER_PORT");
}

if (!/^[0-9]+$/.test(clientOptions.daprPort)) {
throw new Error("DAPR_INCORRECT_SIDECAR_PORT");
}

// Builder
switch (serverOptions.communicationProtocol) {
case CommunicationProtocolEnum.GRPC: {
Expand Down
45 changes: 45 additions & 0 deletions src/network/AbstractEndpoint.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

export abstract class Endpoint {
protected _scheme = "";
protected _hostname = "";
protected _port = 0;
protected _tls = false;
protected _url: string;
protected _endpoint = "";

protected constructor(url: string) {
this._url = url;
}

get tls(): boolean {
return this._tls;
}

get hostname(): string {
return this._hostname;
}

get scheme(): string {
return this._scheme;
}

get port(): string {
return this._port === 0 ? "" : this._port.toString();
}

get endpoint(): string {
return this._endpoint;
}
}
Loading

0 comments on commit 998f8ee

Please sign in to comment.