Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replacing callbacks #522

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 7 additions & 10 deletions src/implementation/Client/GRPCClient/GRPCClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { Logger } from "../../../logger/Logger";
import GRPCClientSidecar from "./sidecar";
import DaprClient from "../DaprClient";
import { SDK_VERSION } from "../../../version";
import { promisify } from "util";

export default class GRPCClient implements IClient {
readonly options: DaprClientOptions;
Expand Down Expand Up @@ -98,16 +99,12 @@ export default class GRPCClient implements IClient {
async _startWaitForClientReady(): Promise<void> {
const deadline = Date.now() + Settings.getDaprSidecarStartupTimeoutMs();

return new Promise((resolve, reject) => {
this.client.waitForReady(deadline, (err?) => {
if (err) {
this.logger.error(`Error waiting for client to be ready: ${err}`);
return reject();
}

return resolve();
});
});
try {
await promisify(this.client.waitForReady).bind(this.client)(deadline);
} catch (err) {
this.logger.error(`Error waiting for client to be ready: ${err}`);
throw undefined;
}
}

async _startAwaitSidecarStarted(): Promise<void> {
Expand Down
23 changes: 9 additions & 14 deletions src/implementation/Client/GRPCClient/binding.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import IClientBinding from "../../../interfaces/Client/IClientBinding";
import * as SerializerUtil from "../../../utils/Serializer.util";
import { addMetadataToMap } from "../../../utils/Client.util";
import { KeyValueType } from "../../../types/KeyValue.type";
import { promisify } from "util";

// https://docs.dapr.io/reference/api/bindings_api/
export default class GRPCClientBinding implements IClientBinding {
Expand All @@ -42,19 +43,13 @@ export default class GRPCClientBinding implements IClientBinding {

const client = await this.client.getClient();

return new Promise((resolve, reject) => {
client.invokeBinding(msgService, (err, res: InvokeBindingResponse) => {
if (err) {
return reject(err);
}

// https://docs.dapr.io/reference/api/bindings_api/#payload
return resolve({
data: res.getData(),
metadata: res.getMetadataMap(),
operation,
});
});
});
const invokeBinding = promisify<InvokeBindingRequest, InvokeBindingResponse>(client.invokeBinding).bind(client);
const res = await invokeBinding(msgService);

return {
data: res.getData(),
metadata: res.getMetadataMap(),
operation,
};
}
}
63 changes: 33 additions & 30 deletions src/implementation/Client/GRPCClient/configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import { SubscribeConfigurationCallback } from "../../../types/configuration/Sub
import { SubscribeConfigurationStream } from "../../../types/configuration/SubscribeConfigurationStream";
import { ConfigurationItem } from "../../../types/configuration/ConfigurationItem";
import { createConfigurationType } from "../../../utils/Client.util";
import { promisify } from "util";

export default class GRPCClientConfiguration implements IClientConfiguration {
client: GRPCClient;
Expand All @@ -55,21 +56,16 @@ export default class GRPCClientConfiguration implements IClientConfiguration {

const client = await this.client.getClient();

return new Promise((resolve, reject) => {
client.getConfiguration(msg, metadata, (err, res: GetConfigurationResponse) => {
if (err) {
return reject(err);
}

const configMap: { [k: string]: ConfigurationItem } = createConfigurationType(res.getItemsMap());
const getConfiguration = promisify<GetConfigurationRequest, grpc.Metadata, GetConfigurationResponse>(
client.getConfiguration,
).bind(client);
const res = await getConfiguration(msg, metadata);

const result: SubscribeConfigurationResponseResult = {
items: configMap,
};
const configMap: { [k: string]: ConfigurationItem } = createConfigurationType(res.getItemsMap());

return resolve(result);
});
});
return {
items: configMap,
};
}

async subscribe(storeName: string, cb: SubscribeConfigurationCallback): Promise<SubscribeConfigurationStream> {
Expand Down Expand Up @@ -144,23 +140,30 @@ export default class GRPCClientConfiguration implements IClientConfiguration {

return {
stop: async () => {
return new Promise((resolve, reject) => {
const req = new UnsubscribeConfigurationRequest();
req.setStoreName(storeName);
req.setId(streamId);

client.unsubscribeConfiguration(req, (err, res: UnsubscribeConfigurationResponse) => {
if (err || !res.getOk()) {
return reject(res.getMessage());
}

// Clean up the node.js event emitter
stream.removeAllListeners();
stream.destroy();

return resolve();
});
});
const req = new UnsubscribeConfigurationRequest();
req.setStoreName(storeName);
req.setId(streamId);

let res: UnsubscribeConfigurationResponse | undefined = undefined;
let hasError = false;
try {
const unsubscribe = promisify<UnsubscribeConfigurationRequest, UnsubscribeConfigurationResponse>(
client.unsubscribeConfiguration,
).bind(client);

res = await unsubscribe(req);
hasError = !res.getOk();
} catch (e) {
hasError = true;
}

if (res !== undefined && hasError) {
throw res.getMessage();
}

// Clean up the node.js event emitter
stream.removeAllListeners();
stream.destroy();
},
};
}
Expand Down
23 changes: 10 additions & 13 deletions src/implementation/Client/GRPCClient/health.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ limitations under the License.

import GRPCClient from "./GRPCClient";
import IClientHealth from "../../../interfaces/Client/IClientHealth";
import { GetMetadataResponse } from "../../../proto/dapr/proto/runtime/v1/dapr_pb";
//import { GetMetadataResponse } from "../../../proto/dapr/proto/runtime/v1/dapr_pb";
import { Empty } from "google-protobuf/google/protobuf/empty_pb";
import { promisify } from "util";

// https://docs.dapr.io/reference/api/health_api/
export default class GRPCClientHealth implements IClientHealth {
Expand All @@ -28,18 +29,14 @@ export default class GRPCClientHealth implements IClientHealth {
async isHealthy(): Promise<boolean> {
const client = await this.client.getClient();

return new Promise((resolve, _reject) => {
try {
client.getMetadata(new Empty(), (err, _res: GetMetadataResponse) => {
if (err) {
return resolve(false);
}
let isHealthy = true;
try {
const getMetadata = promisify(client.getMetadata).bind(client);
await getMetadata(new Empty());
} catch (e) {
isHealthy = false;
}

return resolve(true);
});
} catch (e) {
return resolve(false);
}
});
return isHealthy;
}
}
40 changes: 17 additions & 23 deletions src/implementation/Client/GRPCClient/invoker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import * as HttpVerbUtil from "../../../utils/HttpVerb.util";
import IClientInvoker from "../../../interfaces/Client/IClientInvoker";
import * as SerializerUtil from "../../../utils/Serializer.util";
import { InvokerOptions } from "../../../types/InvokerOptions.type";
import { promisify } from "util";

// https://docs.dapr.io/reference/api/service_invocation_api/
export default class GRPCClientInvoker implements IClientInvoker {
Expand Down Expand Up @@ -60,30 +61,23 @@ export default class GRPCClientInvoker implements IClientInvoker {

const client = await this.client.getClient();

return new Promise((resolve, reject) => {
client.invokeService(msgInvokeService, (err, res: InvokeResponse) => {
if (err) {
return reject(err);
}
const invokeService = promisify<InvokeServiceRequest, InvokeResponse>(client.invokeService).bind(client);
const res = await invokeService(msgInvokeService);
let resData = "";

let resData = "";
if (res.getData()) {
resData = Buffer.from((res.getData() as Any).getValue()).toString();
}

if (res.getData()) {
resData = Buffer.from((res.getData() as Any).getValue()).toString();
}

try {
const parsedResData = JSON.parse(resData);
return resolve(parsedResData);
} catch (e) {
throw new Error(
JSON.stringify({
error: "COULD_NOT_PARSE_RESULT",
error_msg: `Could not parse the returned resultset: ${resData}`,
}),
);
}
});
});
try {
return JSON.parse(resData);
} catch (e) {
throw new Error(
JSON.stringify({
error: "COULD_NOT_PARSE_RESULT",
error_msg: `Could not parse the returned resultset: ${resData}`,
}),
);
}
}
}
34 changes: 11 additions & 23 deletions src/implementation/Client/GRPCClient/lock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
UnlockResponse,
} from "../../../proto/dapr/proto/runtime/v1/dapr_pb";
import IClientLock from "../../../interfaces/Client/IClientLock";
import { promisify } from "util";

export default class GRPCClientLock implements IClientLock {
client: GRPCClient;
Expand All @@ -42,38 +43,25 @@ export default class GRPCClientLock implements IClientLock {
.setExpiryInSeconds(expiryInSeconds);

const client = await this.client.getClient();
return new Promise((resolve, reject) => {
client.tryLockAlpha1(request, (err, res: TryLockResponse) => {
if (err) {
return reject(err);
}
const tryLock = promisify<TryLockRequest, TryLockResponse>(client.tryLockAlpha1).bind(client);
const res = await tryLock(request);

const wrapped: LockResponseResult = {
success: res.getSuccess(),
};

return resolve(wrapped);
});
});
return {
success: res.getSuccess(),
};
}

async unlock(storeName: string, resourceId: string, lockOwner: string): Promise<UnLockResponseResult> {
const request = new UnlockRequest().setStoreName(storeName).setResourceId(resourceId).setLockOwner(lockOwner);

const client = await this.client.getClient();
return new Promise((resolve, reject) => {
client.unlockAlpha1(request, (err, res: UnlockResponse) => {
if (err) {
return reject(err);
}

const wrapped: UnLockResponseResult = {
status: this.getUnlockResponse(res),
};
const unlock = promisify<UnlockRequest, UnlockResponse>(client.unlockAlpha1).bind(client);
const res = await unlock(request);

return resolve(wrapped);
});
});
return {
status: this.getUnlockResponse(res),
};
}

getUnlockResponse(res: UnlockResponse) {
Expand Down
69 changes: 31 additions & 38 deletions src/implementation/Client/GRPCClient/metadata.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { GetMetadataResponse, SetMetadataRequest } from "../../../proto/dapr/pro
import { Empty } from "google-protobuf/google/protobuf/empty_pb";
import IClientMetadata from "../../../interfaces/Client/IClientMetadata";
import { GetMetadataResponse as GetMetadataResponseResult } from "../../../types/metadata/GetMetadataResponse";
import { promisify } from "util";

// https://docs.dapr.io/reference/api/metadata_api
export default class GRPCClientMetadata implements IClientMetadata {
Expand All @@ -29,37 +30,30 @@ export default class GRPCClientMetadata implements IClientMetadata {
async get(): Promise<GetMetadataResponseResult> {
const client = await this.client.getClient();

return new Promise((resolve, reject) => {
client.getMetadata(new Empty(), (err, res: GetMetadataResponse) => {
if (err) {
return reject(err);
}
const getMetadata = promisify<Empty, GetMetadataResponse>(client.getMetadata).bind(client);
const res = await getMetadata(new Empty());

const wrapped: GetMetadataResponseResult = {
id: res.getId(),
actors: res.getActiveActorsCountList().map((a) => ({
type: a.getType(),
count: a.getCount(),
})),
extended: res
.getExtendedMetadataMap()
.toObject()
.reduce((result: object, [key, value]) => {
// @ts-ignore
result[key] = value;
return result;
}, {}),
components: res.getRegisteredComponentsList().map((c) => ({
name: c.getName(),
type: c.getType(),
version: c.getVersion(),
capabilities: c.getCapabilitiesList(),
})),
};

return resolve(wrapped);
});
});
return {
id: res.getId(),
actors: res.getActiveActorsCountList().map((a) => ({
type: a.getType(),
count: a.getCount(),
})),
extended: res
.getExtendedMetadataMap()
.toObject()
.reduce((result: object, [key, value]) => {
// @ts-ignore
result[key] = value;
return result;
}, {}),
components: res.getRegisteredComponentsList().map((c) => ({
name: c.getName(),
type: c.getType(),
version: c.getVersion(),
capabilities: c.getCapabilitiesList(),
})),
};
}

async set(key: string, value: string): Promise<boolean> {
Expand All @@ -69,14 +63,13 @@ export default class GRPCClientMetadata implements IClientMetadata {

const client = await this.client.getClient();

return new Promise((resolve, reject) => {
client.setMetadata(msg, (err, _res: Empty) => {
if (err) {
return reject(false);
}
try {
const setMetadata = promisify(client.setMetadata).bind(client);
await setMetadata(msg);
} catch (e) {
throw false;
}

return resolve(true);
});
});
return true;
}
}
Loading