Skip to content

Commit

Permalink
feat: more observability for import CAR and CAS polling
Browse files Browse the repository at this point in the history
  • Loading branch information
Samika Kashyap authored and Samika Kashyap committed Jul 19, 2024
1 parent 962546e commit 3afe18d
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 0 deletions.
9 changes: 9 additions & 0 deletions packages/core/src/anchor/ethereum/remote-cas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ const CAS_REQUEST_POLL_FAILED = 'cas_request_poll_failed'
const CAS_REQUEST_COMPLETED = 'cas_request_completed'
const CAS_INACCESSIBLE = 'cas_inaccessible'

const CAS_CREATE_REQUEST_TIME = 'cas_create_request_time'
const CAS_POLL_STATUS_TIME = 'cas_poll_status_time'

/**
* Parse JSON that CAS returns.
*/
Expand Down Expand Up @@ -161,6 +164,7 @@ export class RemoteCAS implements CASClient {

create$(streamId: StreamID, tip: CID, timestamp: Date): Observable<unknown> {
const sendRequest$ = deferAbortable(async (signal) => {
const timeStart = Date.now()
const response = await this.#sendRequest(this.#requestsApiEndpoint, {
method: 'POST',
headers: {
Expand All @@ -175,6 +179,8 @@ export class RemoteCAS implements CASClient {
},
signal: signal,
})
const timeEnd = Date.now()
Metrics.observe(CAS_CREATE_REQUEST_TIME, timeEnd - timeStart)

// We successfully contacted the CAS
this._recordCASContactSuccess('created')
Expand All @@ -201,12 +207,15 @@ export class RemoteCAS implements CASClient {

async getStatusForRequest(streamId: StreamID, tip: CID): Promise<AnchorEvent> {
const requestUrl = [this.#requestsApiEndpoint, tip.toString()].join('/')
const timeStart = Date.now()
const sendRequest$ = deferAbortable(async (signal) => {
const response = await this.#sendRequest(requestUrl, { signal: signal })

// We successfully contacted the CAS
this._recordCASContactSuccess('polled')
Metrics.count(CAS_REQUEST_POLLED, 1)
const timeEnd = Date.now()
Metrics.observe(CAS_POLL_STATUS_TIME, timeEnd - timeStart)
return response
})
const response = await firstValueFrom(
Expand Down
20 changes: 20 additions & 0 deletions packages/core/src/dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ const PUBSUB_CACHE_SIZE = 500
const ERROR_IPFS_TIMEOUT = 'ipfs_timeout'
const ERROR_STORING_COMMIT = 'error_storing_commit'
const COMMITS_STORED = 'commits_stored'
const IMPORT_CAR_INIT_EVENT_REQUESTED = 'import_car_init_event_requested'
const IMPORT_CAR_STORE_EVENT_REQUESTED = 'import_car_store_event_requested'
const IMPORT_CAR_INIT_EVENT_TIME = 'import_car_init_event_time'
const IMPORT_CAR_STORE_EVENT_TIME = 'import_car_store_event_time'
const CREATE_CAR_INIT_EVENT_TIME = 'create_car_init_event_time'
const CREATE_CAR_STORE_EVENT_TIME = 'create_car_store_event_time'

function messageTypeToString(type: MsgType): string {
switch (type) {
Expand Down Expand Up @@ -296,9 +302,16 @@ export class Dispatcher {
*/
async storeInitEvent(data: any, streamType: number): Promise<CID> {
try {
const timeStartCreate = Date.now()
const car = this._createCAR(data)
const timeEndCreate = Date.now()
Metrics.observe(CREATE_CAR_INIT_EVENT_TIME, timeEndCreate - timeStartCreate)
const streamId = new StreamID(streamType, car.roots[0])
Metrics.count(IMPORT_CAR_INIT_EVENT_REQUESTED, 1)
const timeStartImport = Date.now()
await this.importCAR(car, streamId)
const timeEndImport = Date.now()
Metrics.observe(IMPORT_CAR_INIT_EVENT_TIME, timeEndImport - timeStartImport)
Metrics.count(COMMITS_STORED, 1)
return car.roots[0]
} catch (e) {
Expand All @@ -317,8 +330,15 @@ export class Dispatcher {
*/
async storeEvent(data: any, streamId: StreamID): Promise<CID> {
try {
const timeStartCreate = Date.now()
const car = this._createCAR(data)
const timeEndCreate = Date.now()
Metrics.observe(CREATE_CAR_STORE_EVENT_TIME, timeEndCreate - timeStartCreate)
Metrics.count(IMPORT_CAR_STORE_EVENT_REQUESTED, 1)
const timeStartImport = Date.now()
await this.importCAR(car, streamId)
const timeEndImport = Date.now()
Metrics.observe(IMPORT_CAR_STORE_EVENT_TIME, timeEndImport - timeStartImport)
Metrics.count(COMMITS_STORED, 1)
return car.roots[0]
} catch (e) {
Expand Down
14 changes: 14 additions & 0 deletions packages/core/src/state-management/repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ const STREAM_SYNC = 'stream_sync'
const RECON_STORE_USECASE_NAME = 'recon'
const RECON_STORE_CURSOR_KEY = 'cursor'

// TODO : Use spans instead of observe when reporting for spans is ready
const IMPORT_CAR_ANCHOR_COMMIT_REQUESTED = 'import_car_anchor_commit'
// const IMPORT_CAR_ANCHOR_COMMIT_SPAN = 'import_car_anchor_commit_span'
const IMPORT_CAR_ANCHOR_COMMIT_COMPLETED = 'import_car_anchor_commit_completed'
const IMPORT_CAR_ANCHOR_COMMIT_TIME = 'import_car_anchor_commit_time'

export type RepositoryDependencies = {
dispatcher: Dispatcher
pinStore: PinStore
Expand Down Expand Up @@ -762,9 +768,17 @@ export class Repository {
remainingRetries--
) {
try {
// TODO : Remove observe and use Spans for richer reporting when Spans support is ready
if (witnessCAR) {
// const importCARAnchorCommitSpan = Metrics.startSpan(IMPORT_CAR_ANCHOR_COMMIT_SPAN)
const timeStart = Date.now()
Metrics.count(IMPORT_CAR_ANCHOR_COMMIT_REQUESTED, 1)
await this.dispatcher.importCAR(witnessCAR, streamId)
Metrics.count(IMPORT_CAR_ANCHOR_COMMIT_COMPLETED, 1)
const timeEnd = Date.now()
Metrics.observe(IMPORT_CAR_ANCHOR_COMMIT_TIME, timeEnd - timeStart)
this.logger.verbose(`successfully imported CAR file for ${streamId}`)
// importCARAnchorCommitSpan.end()
}

const applied = await this._handleTip(state$, anchorCommitCID)
Expand Down

0 comments on commit 3afe18d

Please sign in to comment.