Skip to content

Commit

Permalink
feat: Start up Node Metrics publishing after the Model becomes availa…
Browse files Browse the repository at this point in the history
…ble locally
  • Loading branch information
stbrody committed Jul 24, 2024
1 parent 3a3bb99 commit 1005a79
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 30 deletions.
113 changes: 86 additions & 27 deletions packages/core/src/ceramic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -615,35 +615,22 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader {
// If authenticated into the node, we can start publishing metrics
// publishing metrics is enabled by default, even if no metrics config
if (this._metricsConfig?.metricsPublisherEnabled) {
// First, subscribe the node to the Model used for NodeMetrics
const metricsModel = NodeMetrics.getModel(this._networkOptions.name)
await this.repository.index.indexModels([{ streamID: metricsModel }])
await this.recon.registerInterest(metricsModel, this.did.id)

// Now start the NodeMetrics system.
const ipfsVersion = await this.ipfs.version()
const ipfsId = await this.ipfs.id()

NodeMetrics.start({
ceramic: this,
network: this._networkOptions.name,
ceramicVersion: this._versionInfo.cliPackageVersion,
ipfsVersion: ipfsVersion.version,
intervalMS: this._metricsConfig?.metricsPublishIntervalMS || DEFAULT_PUBLISH_INTERVAL_MS,
nodeId: ipfsId.publicKey, // what makes the best ID for the node?
nodeName: '', // daemon.hostname is not useful
nodeAuthDID: this.did.id,
nodeIPAddr: '', // daemon.hostname is not the external name
nodePeerId: ipfsId.publicKey,
logger: this._logger,
})
this._logger.imp(
`Publishing Node Metrics publicly to the Ceramic Network. To learn more, including how to disable publishing, please see the NODE_METRICS.md file for your branch, e.g. https://github.com/ceramicnetwork/js-ceramic/blob/develop/docs-dev/NODE_METRICS.md`
)
// TODO: Check for Recon mode
if (EnvironmentUtils.useRustCeramic()) {
// Start a background job that will wait for the Model to be available (synced over Recon)
// and then start publishing to it.
const metricsModel = NodeMetrics.getModel(this._networkOptions.name)
void this._waitForMetricsModel(metricsModel).then(
this._startPublishingNodeMetrics.bind(this, metricsModel)
)
} else {
this._logger.warn(
`Disabling publishing of Node Metrics because we are not connected to a Recon-compatible p2p node`
)
}
}
} else {
// warn that the node does not have an authenticated did
this._logger.imp(
this._logger.warn(
`The ceramic daemon is running without an authenticated DID. This means that this node cannot itself publish streams, including node metrics, and cannot use a DID as the method to authenticate with the Ceramic Anchor Service. See https://developers.ceramic.network/docs/composedb/guides/composedb-server/access-mainnet#updating-to-did-based-authentication for instructions on how to update your node to use DID authentication.`
)
}
Expand All @@ -655,6 +642,78 @@ export class Ceramic implements StreamReaderWriter, StreamStateLoader {
)
}

/**
* Starts up the subsystem to periodically publish Node Metrics to a Stream.
* Requires the data for the NodeMetrics Model to already be available locally
* in the ceramic-one blockstore.
* @param metricsModel - the StreamID of the Model that Node Metrics should be published to.
*/
async _startPublishingNodeMetrics(metricsModel: StreamID): Promise<void> {
await this.repository.index.indexModels([{ streamID: metricsModel }])
await this.recon.registerInterest(metricsModel, this.did.id)

// Now start the NodeMetrics system.
const ipfsVersion = await this.ipfs.version()
const ipfsId = await this.ipfs.id()

NodeMetrics.start({
ceramic: this,
network: this._networkOptions.name,
ceramicVersion: this._versionInfo.cliPackageVersion,
ipfsVersion: ipfsVersion.version,
intervalMS: this._metricsConfig?.metricsPublishIntervalMS || DEFAULT_PUBLISH_INTERVAL_MS,
nodeId: ipfsId.publicKey, // what makes the best ID for the node?
nodeName: '', // daemon.hostname is not useful
nodeAuthDID: this.did.id,
nodeIPAddr: '', // daemon.hostname is not the external name
nodePeerId: ipfsId.publicKey,
logger: this._logger,
})
this._logger.imp(
`Publishing Node Metrics publicly to the Ceramic Network. To learn more, including how to disable publishing, please see the NODE_METRICS.md file for your branch, e.g. https://github.com/ceramicnetwork/js-ceramic/blob/develop/docs-dev/NODE_METRICS.md`
)
}

/**
* Waits for Model used to publish NodeMetrics to be available locally.
* Since we subscribe to the metamodel at startup, so long as some connected node on the network
* has the model, it should eventually be available locally.
* @param model
*/
async _waitForMetricsModel(model: StreamID): Promise<void> {
let attemptNum = 0
let backoffMs = 100
const maxBackoffMs = 1000 * 60 // Caps off at checking once per minute
const delay = async function (ms) {
return new Promise<void>((resolve) => setTimeout(() => resolve(), ms))
}

// eslint-disable-next-line no-constant-condition
while (true) {
try {
await this.dispatcher.getFromIpfs(model.cid)
if (attemptNum > 0) {
this._logger.imp(`Model ${model} used to publish Node Metrics loaded successfully`)
}
return
} catch (err) {
if (attemptNum == 0) {
this._logger.imp(
`Waiting for Model ${model} used to publish Node Metrics to be available locally`
)
} else if (attemptNum % 5 == 0) {
this._logger.err(`Error loading Model ${model} used to publish Node Metrics: ${err}`)
}

await delay(backoffMs)
attemptNum++
if (backoffMs <= maxBackoffMs) {
backoffMs *= 2
}
}
}
}

/**
* Runs some checks at node startup to ensure that the node is healthy and properly configured.
* Throws an Error if any issues are detected
Expand Down
6 changes: 3 additions & 3 deletions packages/core/src/dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ export class Dispatcher {
*/
async retrieveCommit(cid: CID | string, streamId?: StreamID): Promise<any> {
try {
return await this._getFromIpfs(cid)
return await this.getFromIpfs(cid)
} catch (e) {
if (streamId) {
this._logger.err(
Expand All @@ -380,7 +380,7 @@ export class Dispatcher {
*/
async retrieveFromIPFS(cid: CID | string, path?: string): Promise<any> {
try {
return await this._getFromIpfs(cid, path)
return await this.getFromIpfs(cid, path)
} catch (e) {
this._logger.err(`Error while loading CID ${cid.toString()} from IPFS: ${e}`)
throw e
Expand Down Expand Up @@ -416,7 +416,7 @@ export class Dispatcher {
/**
* Helper function for loading a CID from IPFS
*/
private async _getFromIpfs(cid: CID | string, path?: string): Promise<any> {
async getFromIpfs(cid: CID | string, path?: string): Promise<any> {
const asCid = typeof cid === 'string' ? CID.parse(cid) : cid

// Lookup CID in cache before looking it up IPFS
Expand Down

0 comments on commit 1005a79

Please sign in to comment.