Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(core-p2p): socketcluster shutdown and logging #2560

Merged
merged 11 commits into from
May 10, 2019
4 changes: 2 additions & 2 deletions __tests__/integration/core-p2p/network-monitor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ beforeEach(async () => {
});

describe("NetworkMonitor", () => {
describe("cleanPeers", () => {
describe("cleansePeers", () => {
it("should be ok", async () => {
storage.setPeer(new Peer("0.0.0.11", 4444));

const previousLength = storage.getPeers().length;

await monitor.cleanPeers(true);
await monitor.cleansePeers(true);

expect(storage.getPeers().length).toBeLessThan(previousLength);
});
Expand Down
4 changes: 2 additions & 2 deletions __tests__/unit/core-p2p/network-monitor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ describe("NetworkMonitor", () => {
});
});

describe("cleanPeers", () => {
describe("cleansePeers", () => {
it("should remove the unresponsive peers", async () => {
storage.setPeer(stubPeer);

Expand All @@ -55,7 +55,7 @@ describe("NetworkMonitor", () => {
throw new Error("yo");
}),
} as any);
await monitor.cleanPeers();
await monitor.cleansePeers();
expect(storage.hasPeers()).toBeFalse();
mockGetPeer.mockRestore();
});
Expand Down
38 changes: 20 additions & 18 deletions packages/core-container/src/container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,31 +153,33 @@ export class Container implements container.IContainer {

private registerExitHandler(exitEvents: string[]): void {
const handleExit = async () => {
if (this.shuttingDown || !this.isReady) {
if (this.shuttingDown) {
return;
}

this.shuttingDown = true;

const logger: Logger.ILogger = this.resolvePlugin<Logger.ILogger>("logger");
if (logger) {
logger.suppressConsoleOutput(this.silentShutdown);
logger.info("Core is trying to gracefully shut down to avoid data corruption");
if (this.isReady) {
const logger: Logger.ILogger = this.resolvePlugin<Logger.ILogger>("logger");
if (logger) {
logger.suppressConsoleOutput(this.silentShutdown);
logger.info("Core is trying to gracefully shut down to avoid data corruption");
}

try {
// Notify plugins about shutdown
this.resolvePlugin<EventEmitter.EventEmitter>("event-emitter").emit("shutdown");

// Wait for event to be emitted and give time to finish
await delay(1000);
} catch (error) {
// tslint:disable-next-line:no-console
console.error(error.stack);
}

await this.plugins.tearDown();
}

try {
// Notify plugins about shutdown
this.resolvePlugin<EventEmitter.EventEmitter>("event-emitter").emit("shutdown");

// Wait for event to be emitted and give time to finish
await delay(1000);
} catch (error) {
// tslint:disable-next-line:no-console
console.error(error.stack);
}

await this.plugins.tearDown();

process.exit();
};

Expand Down
15 changes: 10 additions & 5 deletions packages/core-forger/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ export class Client {
constructor(hosts: IRelayHost[]) {
this.hosts = hosts.map(host => {
host.socket = socketCluster.create(host);
host.socket.on("error", err => this.logger.error(err.message));
host.socket.on("error", err => {
if (err.message !== "Socket hung up") {
this.logger.error(err.message);
}
});

return host;
});
Expand Down Expand Up @@ -70,8 +74,6 @@ export class Client {
try {
return NetworkState.parse(await this.emit<P2P.INetworkState>("p2p.internal.getNetworkState", {}, 4000));
} catch (err) {
this.logger.error(`Could not retrieve network state: ${this.host.hostname}: ${err.message}`);

return new NetworkState(NetworkStateStatus.Unknown);
}
}
Expand Down Expand Up @@ -107,7 +109,6 @@ export class Client {
}

public async selectHost(): Promise<void> {
// if no socket is connected, we give it 1 second
for (let i = 0; i < 10; i++) {
for (const host of this.hosts) {
if (host.socket.getState() === host.socket.OPEN) {
Expand All @@ -119,7 +120,11 @@ export class Client {
await delay(100);
}

this.logger.debug(`No open socket connection to any host : ${this.hosts.map(host => host.hostname).join()}.`);
this.logger.debug(
`No open socket connection to any host: ${JSON.stringify(
this.hosts.map(host => `${host.hostname}:${host.port}`),
)}.`,
);

throw new HostNoResponseError(this.hosts.map(host => host.hostname).join());
}
Expand Down
13 changes: 7 additions & 6 deletions packages/core-forger/src/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import uniq from "lodash.uniq";
import pluralize from "pluralize";
import { Client } from "./client";
import { Delegate } from "./delegate";
import { HostNoResponseError } from "./errors";
import { HostNoResponseError, RelayCommunicationError } from "./errors";

export class ForgerManager {
private readonly logger: Logger.ILogger = app.resolvePlugin<Logger.ILogger>("logger");
Expand Down Expand Up @@ -48,14 +48,16 @@ export class ForgerManager {
return;
}

let timeout: number;
try {
await this.loadRound();

await this.checkLater(Crypto.Slots.getTimeInMsUntilNextSlot());

timeout = Crypto.Slots.getTimeInMsUntilNextSlot();
this.logger.info(`Forger Manager started with ${pluralize("forger", this.delegates.length, true)}`);
} catch (error) {
timeout = 2000;
this.logger.warn("Waiting for a responsive host.");
} finally {
await this.checkLater(timeout);
}
}

Expand Down Expand Up @@ -111,11 +113,10 @@ export class ForgerManager {

return this.checkLater(Crypto.Slots.getTimeInMsUntilNextSlot());
} catch (error) {
if (error instanceof HostNoResponseError) {
if (error instanceof HostNoResponseError || error instanceof RelayCommunicationError) {
this.logger.warn(error.message);
} else {
this.logger.error(error.stack);
this.logger.error(`Forging failed: ${error.message}`);

if (!isEmpty(this.round)) {
this.logger.info(
Expand Down
3 changes: 2 additions & 1 deletion packages/core-interfaces/src/core-p2p/network-monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export interface INetworkStatus {
export interface INetworkMonitor {
start(options): Promise<INetworkMonitor>;
updateNetworkStatus(networkStart?: boolean): Promise<void>;
cleanPeers(fast?: boolean, forcePing?: boolean): Promise<void>;
cleansePeers(fast?: boolean, forcePing?: boolean): Promise<void>;
discoverPeers(): Promise<void>;
getNetworkHeight(): number;
getNetworkState(): Promise<INetworkState>;
Expand All @@ -22,5 +22,6 @@ export interface INetworkMonitor {
broadcastTransactions(transactions: Interfaces.ITransaction[]): Promise<void>;
getServer(): SocketCluster;
setServer(server: SocketCluster): void;
stopServer(): void;
resetSuspendedPeers(): Promise<void>;
}
7 changes: 7 additions & 0 deletions packages/core-p2p/src/event-listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,12 @@ export class EventListener {
processor.suspend(peer, punishment ? guard.punishment(punishment) : undefined);
}
});

const exitHandler = () => {
service.getMonitor().stopServer();
};

process.on("SIGINT", exitHandler);
process.on("exit", exitHandler);
}
}
18 changes: 13 additions & 5 deletions packages/core-p2p/src/network-monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ export class NetworkMonitor implements P2P.INetworkMonitor {
this.server = server;
}

public stopServer(): void {
if (this.server) {
this.server.removeAllListeners();
this.server.destroy();
this.server = undefined;
}
}

public isColdStartActive(): boolean {
if (process.env.CORE_SKIP_COLD_START) {
return false;
Expand Down Expand Up @@ -102,7 +110,7 @@ export class NetworkMonitor implements P2P.INetworkMonitor {

try {
await this.discoverPeers();
await this.cleanPeers();
await this.cleansePeers();
} catch (error) {
this.logger.error(`Network Status: ${error.message}`);
}
Expand All @@ -120,7 +128,7 @@ export class NetworkMonitor implements P2P.INetworkMonitor {
this.scheduleUpdateNetworkStatus(nextRunDelaySeconds);
}

public async cleanPeers(fast: boolean = false, forcePing: boolean = false): Promise<void> {
public async cleansePeers(fast: boolean = false, forcePing: boolean = false): Promise<void> {
const peers = this.storage.getPeers();
let unresponsivePeers = 0;
const pingDelay = fast ? 1500 : app.resolveOptions("p2p").globalTimeout;
Expand Down Expand Up @@ -194,7 +202,7 @@ export class NetworkMonitor implements P2P.INetworkMonitor {

public async getNetworkState(): Promise<P2P.INetworkState> {
if (!this.isColdStartActive()) {
await this.cleanPeers(true, true);
await this.cleansePeers(true, true);
}

return NetworkState.analyze(this, this.storage);
Expand All @@ -204,7 +212,7 @@ export class NetworkMonitor implements P2P.INetworkMonitor {
this.logger.info(`Refreshing ${this.storage.getPeers().length} peers after fork.`);

// Reset all peers, except peers banned because of causing a fork.
await this.cleanPeers(false, true);
await this.cleansePeers(false, true);
await this.resetSuspendedPeers();

// Ban peer who caused the fork
Expand All @@ -216,7 +224,7 @@ export class NetworkMonitor implements P2P.INetworkMonitor {

public async checkNetworkHealth(): Promise<P2P.INetworkStatus> {
if (!this.isColdStartActive()) {
await this.cleanPeers(false, true);
await this.cleansePeers(false, true);
await this.resetSuspendedPeers();
}

Expand Down
4 changes: 4 additions & 0 deletions packages/core-p2p/src/peer-communicator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,10 @@ export class PeerCommunicator implements P2P.IPeerCommunicator {
peer.latency = -1;
this.emitter.emit("internal.p2p.suspendPeer", { peer });
break;
case "Error":
case "CoreSocketNotOpenError":
this.emitter.emit("internal.p2p.suspendPeer", { peer });
break;
default:
this.logger.error(`Socket error (peer ${peer.ip}) : ${error.message}`);
this.emitter.emit("internal.p2p.suspendPeer", { peer });
Expand Down
7 changes: 1 addition & 6 deletions packages/core-p2p/src/peer-connector.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import { app } from "@arkecosystem/core-container";
import { EventEmitter, Logger, P2P } from "@arkecosystem/core-interfaces";
import { EventEmitter, P2P } from "@arkecosystem/core-interfaces";
import { create, SCClientSocket } from "socketcluster-client";
import { PeerRepository } from "./peer-repository";

export class PeerConnector implements P2P.IPeerConnector {
private readonly logger: Logger.ILogger = app.resolvePlugin<Logger.ILogger>("logger");
private readonly emitter: EventEmitter.EventEmitter = app.resolvePlugin<EventEmitter.EventEmitter>("event-emitter");
private readonly connections: PeerRepository<SCClientSocket> = new PeerRepository<SCClientSocket>();
private readonly errors: Map<string, string> = new Map<string, string>();
Expand Down Expand Up @@ -32,8 +31,6 @@ export class PeerConnector implements P2P.IPeerConnector {
this.connections.set(peer.ip, connection);

this.connection(peer).on("error", err => {
this.logger.debug(`Socket error for peer ${peer.ip}: "${err}"`);

this.emitter.emit("internal.p2p.suspendPeer", { peer });
});

Expand All @@ -44,8 +41,6 @@ export class PeerConnector implements P2P.IPeerConnector {
const connection = this.connection(peer);

if (connection) {
this.logger.debug(`Disconnecting from ${peer.ip}:${peer.port}`);

connection.destroy();

this.connections.forget(peer.ip);
Expand Down
6 changes: 3 additions & 3 deletions packages/core-p2p/src/peer-guard.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ export class PeerGuard implements P2P.IPeerGuard {
until: () => dato().addMinutes(15),
reason: "Fork",
},
socketNotOpen: {
socketGotClosed: {
until: () => dato().addMinutes(5),
reason: "Socket not open",
reason: "Socket got closed",
},
};

Expand All @@ -71,7 +71,7 @@ export class PeerGuard implements P2P.IPeerGuard {
const connection: SCClientSocket = this.connector.connection(peer);

if (connection && connection.getState() !== connection.OPEN) {
return this.createPunishment(this.offences.socketNotOpen);
return this.createPunishment(this.offences.socketGotClosed);
}

if (this.connector.hasError(peer, SocketErrors.AppNotReady)) {
Expand Down
7 changes: 6 additions & 1 deletion packages/core-p2p/src/peer-processors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { EventEmitter, Logger, P2P } from "@arkecosystem/core-interfaces";
import { dato } from "@faustbrian/dato";
import prettyMs from "pretty-ms";
import { SCClientSocket } from "socketcluster-client";
import { PeerPingTimeoutError } from "./errors";
import { Peer } from "./peer";
import { PeerSuspension } from "./peer-suspension";
import { isValidPeer } from "./utils";
Expand Down Expand Up @@ -99,6 +100,7 @@ export class PeerProcessor implements P2P.IPeerProcessor {
this.connector.disconnect(peer);

if (!punishment) {
this.logger.debug(`Disconnecting from ${peer.ip}:${peer.port} without punishment.`);
return;
}

Expand Down Expand Up @@ -157,7 +159,10 @@ export class PeerProcessor implements P2P.IPeerProcessor {

this.emitter.emit("peer.added", newPeer);
} catch (error) {
this.logger.debug(`Could not accept new peer ${newPeer.ip}:${newPeer.port}: ${error}`);
if (error instanceof PeerPingTimeoutError) {
newPeer.latency = -1;
}

this.suspend(newPeer);
} finally {
this.storage.forgetPendingPeer(peer);
Expand Down
7 changes: 1 addition & 6 deletions packages/core-p2p/src/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,6 @@ export const plugin: Container.IPluginDescriptor = {

const service = container.resolvePlugin<P2P.IPeerService>("p2p");
service.getStorage().savePeers();

const server = service.getMonitor().getServer();
if (server) {
server.removeAllListeners("fail");
server.destroy();
}
service.getMonitor().stopServer();
},
};