From 1005a79a6091b63a97415cbb0dd24b086dcd2663 Mon Sep 17 00:00:00 2001 From: Spencer T Brody Date: Wed, 24 Jul 2024 17:04:04 -0400 Subject: [PATCH] feat: Start up Node Metrics publishing after the Model becomes available locally --- packages/core/src/ceramic.ts | 113 ++++++++++++++++++++++++-------- packages/core/src/dispatcher.ts | 6 +- 2 files changed, 89 insertions(+), 30 deletions(-) diff --git a/packages/core/src/ceramic.ts b/packages/core/src/ceramic.ts index 57a07ad6d5..6e9a05cc89 100644 --- a/packages/core/src/ceramic.ts +++ b/packages/core/src/ceramic.ts @@ -615,35 +615,22 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader { // If authenticated into the node, we can start publishing metrics // publishing metrics is enabled by default, even if no metrics config if (this._metricsConfig?.metricsPublisherEnabled) { - // First, subscribe the node to the Model used for NodeMetrics - const metricsModel = NodeMetrics.getModel(this._networkOptions.name) - await this.repository.index.indexModels([{ streamID: metricsModel }]) - await this.recon.registerInterest(metricsModel, this.did.id) - - // Now start the NodeMetrics system. - const ipfsVersion = await this.ipfs.version() - const ipfsId = await this.ipfs.id() - - NodeMetrics.start({ - ceramic: this, - network: this._networkOptions.name, - ceramicVersion: this._versionInfo.cliPackageVersion, - ipfsVersion: ipfsVersion.version, - intervalMS: this._metricsConfig?.metricsPublishIntervalMS || DEFAULT_PUBLISH_INTERVAL_MS, - nodeId: ipfsId.publicKey, // what makes the best ID for the node? - nodeName: '', // daemon.hostname is not useful - nodeAuthDID: this.did.id, - nodeIPAddr: '', // daemon.hostname is not the external name - nodePeerId: ipfsId.publicKey, - logger: this._logger, - }) - this._logger.imp( - `Publishing Node Metrics publicly to the Ceramic Network. To learn more, including how to disable publishing, please see the NODE_METRICS.md file for your branch, e.g. https://github.com/ceramicnetwork/js-ceramic/blob/develop/docs-dev/NODE_METRICS.md` - ) + // TODO: Check for Recon mode + if (EnvironmentUtils.useRustCeramic()) { + // Start a background job that will wait for the Model to be available (synced over Recon) + // and then start publishing to it. + const metricsModel = NodeMetrics.getModel(this._networkOptions.name) + void this._waitForMetricsModel(metricsModel).then( + this._startPublishingNodeMetrics.bind(this, metricsModel) + ) + } else { + this._logger.warn( + `Disabling publishing of Node Metrics because we are not connected to a Recon-compatible p2p node` + ) + } } } else { - // warn that the node does not have an authenticated did - this._logger.imp( + this._logger.warn( `The ceramic daemon is running without an authenticated DID. This means that this node cannot itself publish streams, including node metrics, and cannot use a DID as the method to authenticate with the Ceramic Anchor Service. See https://developers.ceramic.network/docs/composedb/guides/composedb-server/access-mainnet#updating-to-did-based-authentication for instructions on how to update your node to use DID authentication.` ) } @@ -655,6 +642,78 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader { ) } + /** + * Starts up the subsystem to periodically publish Node Metrics to a Stream. + * Requires the data for the NodeMetrics Model to already be available locally + * in the ceramic-one blockstore. + * @param metricsModel - the StreamID of the Model that Node Metrics should be published to. + */ + async _startPublishingNodeMetrics(metricsModel: StreamID): Promise { + await this.repository.index.indexModels([{ streamID: metricsModel }]) + await this.recon.registerInterest(metricsModel, this.did.id) + + // Now start the NodeMetrics system. + const ipfsVersion = await this.ipfs.version() + const ipfsId = await this.ipfs.id() + + NodeMetrics.start({ + ceramic: this, + network: this._networkOptions.name, + ceramicVersion: this._versionInfo.cliPackageVersion, + ipfsVersion: ipfsVersion.version, + intervalMS: this._metricsConfig?.metricsPublishIntervalMS || DEFAULT_PUBLISH_INTERVAL_MS, + nodeId: ipfsId.publicKey, // what makes the best ID for the node? + nodeName: '', // daemon.hostname is not useful + nodeAuthDID: this.did.id, + nodeIPAddr: '', // daemon.hostname is not the external name + nodePeerId: ipfsId.publicKey, + logger: this._logger, + }) + this._logger.imp( + `Publishing Node Metrics publicly to the Ceramic Network. To learn more, including how to disable publishing, please see the NODE_METRICS.md file for your branch, e.g. https://github.com/ceramicnetwork/js-ceramic/blob/develop/docs-dev/NODE_METRICS.md` + ) + } + + /** + * Waits for Model used to publish NodeMetrics to be available locally. + * Since we subscribe to the metamodel at startup, so long as some connected node on the network + * has the model, it should eventually be available locally. + * @param model + */ + async _waitForMetricsModel(model: StreamID): Promise { + let attemptNum = 0 + let backoffMs = 100 + const maxBackoffMs = 1000 * 60 // Caps off at checking once per minute + const delay = async function (ms) { + return new Promise((resolve) => setTimeout(() => resolve(), ms)) + } + + // eslint-disable-next-line no-constant-condition + while (true) { + try { + await this.dispatcher.getFromIpfs(model.cid) + if (attemptNum > 0) { + this._logger.imp(`Model ${model} used to publish Node Metrics loaded successfully`) + } + return + } catch (err) { + if (attemptNum == 0) { + this._logger.imp( + `Waiting for Model ${model} used to publish Node Metrics to be available locally` + ) + } else if (attemptNum % 5 == 0) { + this._logger.err(`Error loading Model ${model} used to publish Node Metrics: ${err}`) + } + + await delay(backoffMs) + attemptNum++ + if (backoffMs <= maxBackoffMs) { + backoffMs *= 2 + } + } + } + } + /** * Runs some checks at node startup to ensure that the node is healthy and properly configured. * Throws an Error if any issues are detected diff --git a/packages/core/src/dispatcher.ts b/packages/core/src/dispatcher.ts index 559210105e..c2cef4518d 100644 --- a/packages/core/src/dispatcher.ts +++ b/packages/core/src/dispatcher.ts @@ -359,7 +359,7 @@ export class Dispatcher { */ async retrieveCommit(cid: CID | string, streamId?: StreamID): Promise { try { - return await this._getFromIpfs(cid) + return await this.getFromIpfs(cid) } catch (e) { if (streamId) { this._logger.err( @@ -380,7 +380,7 @@ export class Dispatcher { */ async retrieveFromIPFS(cid: CID | string, path?: string): Promise { try { - return await this._getFromIpfs(cid, path) + return await this.getFromIpfs(cid, path) } catch (e) { this._logger.err(`Error while loading CID ${cid.toString()} from IPFS: ${e}`) throw e @@ -416,7 +416,7 @@ export class Dispatcher { /** * Helper function for loading a CID from IPFS */ - private async _getFromIpfs(cid: CID | string, path?: string): Promise { + async getFromIpfs(cid: CID | string, path?: string): Promise { const asCid = typeof cid === 'string' ? CID.parse(cid) : cid // Lookup CID in cache before looking it up IPFS