From cd9a9ce577b40c7b4c65dd94088d3e1e0863b213 Mon Sep 17 00:00:00 2001 From: Erno Date: Sun, 18 Sep 2022 22:46:37 +0200 Subject: [PATCH] feat: add retry and updateStatus methods --- .changeset/sixty-tips-switch.md | 5 + packages/streams/src/MoralisStreams.ts | 4 + packages/streams/src/generated/types.ts | 248 ++++++++++++++---- packages/streams/src/methods/updateStatus.ts | 33 +++ .../streams/src/resolvers/replayHistory.ts | 20 ++ .../src/resolvers/updateStreamEvmStatus.ts | 26 ++ 6 files changed, 280 insertions(+), 56 deletions(-) create mode 100644 .changeset/sixty-tips-switch.md create mode 100644 packages/streams/src/methods/updateStatus.ts create mode 100644 packages/streams/src/resolvers/replayHistory.ts create mode 100644 packages/streams/src/resolvers/updateStreamEvmStatus.ts diff --git a/.changeset/sixty-tips-switch.md b/.changeset/sixty-tips-switch.md new file mode 100644 index 0000000000..4cb568417f --- /dev/null +++ b/.changeset/sixty-tips-switch.md @@ -0,0 +1,5 @@ +--- +'@moralisweb3/streams': minor +--- + +Add `Moralis.Streams.retry()` to retry failed webhooks, and add `Moralis.Streams.updateStatus()` to update the status of a webhook. diff --git a/packages/streams/src/MoralisStreams.ts b/packages/streams/src/MoralisStreams.ts index 0ff6dd440f..0346235d5f 100644 --- a/packages/streams/src/MoralisStreams.ts +++ b/packages/streams/src/MoralisStreams.ts @@ -8,6 +8,8 @@ import { GetStreamsOptions, getStreams } from './methods/getAll'; import { makeVerifySignature, VerifySignatureOptions } from './methods/verifySignature'; import { parseLog, ParseLogOptions } from './methods/logParser'; import { getHistory } from './resolvers/getHistory'; +import { replayHistory } from './resolvers/replayHistory'; +import { updateStreamStatus, UpdateStreamStatusOptions } from './methods/updateStatus'; export const BASE_URL = 'https://streams-api.aws-prod-streams-master-1.moralis.io'; @@ -36,8 +38,10 @@ export class MoralisStreams extends ApiModule { public readonly update = (options: UpdateStreamOptions) => updateStream(this.core)(options); public readonly delete = (options: DeleteStreamOptions) => deleteStream(this.core)(options); public readonly getAll = (options: GetStreamsOptions) => getStreams(this.core)(options); + public readonly updateStatus = (options: UpdateStreamStatusOptions) => updateStreamStatus(this.core)(options); public readonly getHistory = this.endpoints.createFetcher(getHistory); + public readonly retry = this.endpoints.createFetcher(replayHistory); public readonly setSettings = this.endpoints.createFetcher(setSettings); private readonly _readSettings = this.endpoints.createFetcher(getSettings); diff --git a/packages/streams/src/generated/types.ts b/packages/streams/src/generated/types.ts index e10282f27c..91322f3ffd 100644 --- a/packages/streams/src/generated/types.ts +++ b/packages/streams/src/generated/types.ts @@ -7,12 +7,20 @@ export interface paths { "/history": { get: operations["GetHistory"]; }; + "/history/replay/{id}": { + /** Replay a specific history. */ + post: operations["ReplayHistory"]; + }; "/settings": { /** Get the settings for the current project based on the project api-key. */ get: operations["GetSettings"]; /** Set the settings for the current project based on the project api-key. */ post: operations["SetSettings"]; }; + "/beta/stats": { + /** Get the stats for the current project based on the project api-key (Beta - This endpoint could be replaced or removed). */ + get: operations["GetStats"]; + }; "/streams/evm": { /** Get all the evm streams for the current project based on the project api-key. */ get: operations["GetStreams"]; @@ -27,97 +35,142 @@ export interface paths { /** Delete a specific evm stream. */ delete: operations["DeleteStream"]; }; + "/streams/evm/{id}/status": { + /** Updates the status of specific evm stream. */ + post: operations["UpdateStreamStatus"]; + }; } export interface components { schemas: { - ILog: { - transaction_hash: string; - /** Format: double */ - transaction_index: number; - /** Format: double */ - log_index: number; - tag: string; + WebhookBlock: { + number: string; + hash: string; + timestamp: string; + }; + Log: { + logIndex: string; + transactionHash: string; address: string; - topic0: string; - topic1: string; - topic2: string; - topic3: string; - data: string; - streamId: string; - }; - ITransaction: { [key: string]: string }; - ITransactionInternal: { - from: string; - to: string; - value: string; - gas: string; - transaction_hash: string; + data: string | null; + topic0: string | null; + topic1: string | null; + topic2: string | null; + topic3: string | null; }; - IERC20Transfer: { - transaction_hash: string; + Transaction: { + hash: string; + gas: string | null; + gasPrice: string | null; + nonce: string | null; + input: string | null; + transactionIndex: string; + fromAddress: string; + toAddress: string | null; + value: string | null; + type: string | null; + v: string | null; + r: string | null; + s: string | null; + receiptCumulativeGasUsed: string | null; + receiptGasUsed: string | null; + receiptContractAddress: string | null; + receiptRoot: string | null; + receiptStatus: string | null; + }; + InternalTransaction: { + from: string | null; + to: string | null; + value: string | null; + transactionHash: string; + gas: string | null; + }; + AbiInput: { + name: string; + type: string; + indexed?: boolean; + components?: components["schemas"]["AbiInput"][]; + internalType?: string; + }; + AbiOutput: { + name: string; + type: string; + components?: components["schemas"]["AbiOutput"][]; + internalType?: string; + }; + /** @enum {string} */ + StateMutabilityType: "pure" | "view" | "nonpayable" | "payable"; + /** @enum {string} */ + AbiType: "function" | "constructor" | "event" | "fallback"; + AbiItem: { + anonymous?: boolean; + constant?: boolean; + inputs?: components["schemas"]["AbiInput"][]; + name?: string; + outputs?: components["schemas"]["AbiOutput"][]; + payable?: boolean; + stateMutability?: components["schemas"]["StateMutabilityType"]; + type: components["schemas"]["AbiType"]; /** Format: double */ - transaction_index: number; + gas?: number; + }; + IAbi: { [key: string]: components["schemas"]["AbiItem"] }; + IERC20Transfer: { + transactionHash: string; + tokenAddress: string; /** Format: double */ - log_index: number; + logIndex: number; tag: string; - contractAddress: string; from: string; to: string; amount: string; valueWithDecimals: string; }; IERC20Approval: { - transaction_hash: string; - /** Format: double */ - transaction_index: number; + transactionHash: string; + tokenAddress: string; /** Format: double */ - log_index: number; + logIndex: number; tag: string; - contractAddress: string; owner: string; spender: string; value: string; valueWithDecimals: string; }; INFTTransfer: { - transaction_hash: string; - /** Format: double */ - transaction_index: number; + transactionHash: string; + tokenAddress: string; /** Format: double */ - log_index: number; + logIndex: number; tag: string; - contractAddress: string; from: string; to: string; tokenId: string; }; INFTApproval: { - transaction_hash: string; + transactionHash: string; + tokenAddress: string; /** Format: double */ - transaction_index: number; - /** Format: double */ - log_index: number; + logIndex: number; tag: string; account: string; operator: string; approved: boolean; }; "WebhookTypes.IWebhook": { - /** Format: double */ - block: number; + erc20Transfers: components["schemas"]["IERC20Transfer"][]; + erc20Approvals: components["schemas"]["IERC20Approval"][]; + nftTransfers: components["schemas"]["INFTTransfer"][]; + nftApprovals: components["schemas"]["INFTApproval"][]; + block: components["schemas"]["WebhookBlock"]; chainId: string; - logs: components["schemas"]["ILog"][]; - txs: components["schemas"]["ITransaction"][]; - txsInternal: components["schemas"]["ITransactionInternal"][]; - abis: unknown; + logs: components["schemas"]["Log"][]; + txs: components["schemas"]["Transaction"][]; + txsInternal: components["schemas"]["InternalTransaction"][]; + abis: components["schemas"]["IAbi"]; /** Format: double */ retries: number; confirmed: boolean; - erc20Transfers?: components["schemas"]["IERC20Transfer"][]; - erc20Approvals?: components["schemas"]["IERC20Approval"][]; - nftTransfers?: components["schemas"]["INFTTransfer"][]; - nftApprovals?: components["schemas"]["INFTApproval"][]; }; HistoryModel: { id: string; @@ -130,6 +183,12 @@ export interface components { result: components["schemas"]["HistoryModel"][]; cursor?: string; }; + /** + * Format: uuid + * @description Stringified UUIDv4. + * See [RFC 4112](https://tools.ietf.org/html/rfc4122) + */ + UUID: string; /** @enum {string} */ SettingsRegion: | "us-east-1" @@ -140,12 +199,33 @@ export interface components { /** @description The region from where all the webhooks will be posted for this project */ region?: components["schemas"]["SettingsRegion"]; }; - /** - * Format: uuid - * @description Stringified UUIDv4. - * See [RFC 4112](https://tools.ietf.org/html/rfc4122) - */ - UUID: string; + StatsModel: { + /** + * Format: double + * @description The total amount of webhooks delivered across all streams + */ + totalWebhooksDelivered: number; + /** + * Format: double + * @description The total amount of failed webhooks across all streams + */ + totalWebhooksFailed: number; + /** + * Format: double + * @description The total amount of logs processed across all streams, this includes failed webhooks + */ + totalLogsProcessed: number; + /** + * Format: double + * @description The total amount of txs processed across all streams, this includes failed webhooks + */ + totalTxsProcessed: number; + /** + * Format: double + * @description The total amount of internal txs processed across all streams, this includes failed webhooks + */ + totalTxsInternalProcessed: number; + }; /** * @description The stream status: * [active] The Stream is healthy and processing blocks @@ -231,6 +311,10 @@ export interface components { /** @description The type of stream to create log or tx */ type: components["schemas"]["StreamsType"]; }; + StreamsStatusUpdate: { + /** @description The status of the stream. */ + status: components["schemas"]["StreamsStatus"]; + }; }; responses: {}; parameters: {}; @@ -255,6 +339,23 @@ export interface operations { }; }; }; + /** Replay a specific history. */ + ReplayHistory: { + parameters: { + path: { + /** The id of the history to replay */ + id: components["schemas"]["UUID"]; + }; + }; + responses: { + /** Ok */ + 200: { + content: { + "application/json": components["schemas"]["HistoryModel"]; + }; + }; + }; + }; /** Get the settings for the current project based on the project api-key. */ GetSettings: { parameters: {}; @@ -280,6 +381,18 @@ export interface operations { }; }; }; + /** Get the stats for the current project based on the project api-key (Beta - This endpoint could be replaced or removed). */ + GetStats: { + parameters: {}; + responses: { + /** Ok */ + 200: { + content: { + "application/json": components["schemas"]["StatsModel"]; + }; + }; + }; + }; /** Get all the evm streams for the current project based on the project api-key. */ GetStreams: { parameters: { @@ -374,6 +487,29 @@ export interface operations { }; }; }; + /** Updates the status of specific evm stream. */ + UpdateStreamStatus: { + parameters: { + path: { + /** The id of the stream to update */ + id: components["schemas"]["UUID"]; + }; + }; + responses: { + /** Ok */ + 200: { + content: { + "application/json": components["schemas"]["StreamsModel"]; + }; + }; + }; + /** Provide a Stream Model */ + requestBody: { + content: { + "application/json": components["schemas"]["StreamsStatusUpdate"]; + }; + }; + }; } export interface external {} diff --git a/packages/streams/src/methods/updateStatus.ts b/packages/streams/src/methods/updateStatus.ts new file mode 100644 index 0000000000..a7437cdfb5 --- /dev/null +++ b/packages/streams/src/methods/updateStatus.ts @@ -0,0 +1,33 @@ +import { BASE_URL } from '../MoralisStreams'; +import { EndpointResolver } from '@moralisweb3/api-utils'; +import MoralisCore, { MoralisStreamError, StreamErrorCode } from '@moralisweb3/core'; +import { updateStreamEvmStatus } from '../resolvers/updateStreamEvmStatus'; +export enum StreamNetwork { + EVM = 'evm', +} + +export interface UpdateStreamEvmStatusOptions { + network: 'evm'; + id: string; + status: 'active' | 'paused' | 'error'; +} + +export type UpdateStreamStatusOptions = UpdateStreamEvmStatusOptions; + +const makeUpdateStreamEvmStatus = (core: MoralisCore, { network, ...options }: UpdateStreamStatusOptions) => { + return EndpointResolver.create(core, BASE_URL, updateStreamEvmStatus).fetch(options); +}; + +export const updateStreamStatus = (core: MoralisCore) => (options: UpdateStreamStatusOptions) => { + switch (options.network) { + case StreamNetwork.EVM: + return makeUpdateStreamEvmStatus(core, options); + default: + throw new MoralisStreamError({ + code: StreamErrorCode.INCORRECT_NETWORK, + message: `Incorrect network provided. Got "${options.network}", Valid values are: ${Object.values(StreamNetwork) + .map((value) => `"${value}"`) + .join(', ')}`, + }); + } +}; diff --git a/packages/streams/src/resolvers/replayHistory.ts b/packages/streams/src/resolvers/replayHistory.ts new file mode 100644 index 0000000000..2173b694be --- /dev/null +++ b/packages/streams/src/resolvers/replayHistory.ts @@ -0,0 +1,20 @@ +import { createEndpoint, createEndpointFactory } from '@moralisweb3/api-utils'; +import { operations } from '../generated/types'; + +const name = 'ReplayHistory'; + +type Name = typeof name; +type ApiResult = operations[Name]['responses']['200']['content']['application/json']; +type PathParams = operations[Name]['parameters']['path']; +type ApiParams = PathParams; +type Params = ApiParams; + +export const replayHistory = createEndpointFactory(() => + createEndpoint({ + name, + getUrl: (id: Params) => `/history/replay/${id}`, + apiToResult: (data: ApiResult) => data, + resultToJson: (data) => data, + parseParams: (params: Params): ApiParams => params, + }), +); diff --git a/packages/streams/src/resolvers/updateStreamEvmStatus.ts b/packages/streams/src/resolvers/updateStreamEvmStatus.ts new file mode 100644 index 0000000000..c23b8d2240 --- /dev/null +++ b/packages/streams/src/resolvers/updateStreamEvmStatus.ts @@ -0,0 +1,26 @@ +import { createEndpoint, createEndpointFactory } from '@moralisweb3/api-utils'; +import { operations } from '../generated/types'; + +const name = 'UpdateStreamStatus'; + +type Name = typeof name; +const method = 'post'; + +type PathParams = operations[Name]['parameters']['path']; +type BodyParams = operations[Name]['requestBody']['content']['application/json']; +type ApiParams = PathParams & BodyParams; +type Params = ApiParams; +const bodyParams = ['status'] as const; +type ApiResult = operations[Name]['responses']['200']['content']['application/json']; + +export const updateStreamEvmStatus = createEndpointFactory(() => + createEndpoint({ + name, + getUrl: ({ id }: Params) => `/streams/evm/${id}/status`, + apiToResult: (data: ApiResult) => data, + resultToJson: (data) => data, + parseParams: (params: ApiParams) => params, + bodyParams, + method, + }), +);