diff --git a/README.md b/README.md index 45cb94ca..8cb4c1f2 100644 --- a/README.md +++ b/README.md @@ -153,6 +153,7 @@ let fdkClient = setupFdk({ notification_email: "test@abc.com", // required subscribe_on_install: false, //optional. Default true subscribed_saleschannel: 'specific', //optional. Default all + marketplace: true, // to receive marketplace saleschannel events. Only allowed when subscribed_saleschannel is set to specific event_map: { // required 'company/brand/create': { version: '1', @@ -167,6 +168,27 @@ let fdkClient = setupFdk({ version: '1', topic: 'coupon_create_kafka_topic', provider: 'kafka' + }, + 'company/brand/update': { + version: '1', + topic: "company-brand-create", + provider: 'pub_sub' + }, + 'extension/extension/install': { + version: '1', + queue: "extension-install", + workflow_name: "extension", + provider: 'temporal' + }, + 'company/location/create': { + version: '1', + queue: "company-location-create", + provider: 'sqs' + }, + 'company/product-size/create': { + version: '1', + event_bridge_name: "company-product-size-create", + provider: 'event_bridge' } } }, @@ -197,7 +219,32 @@ app.post('/api/v1/webhooks', async (req, res, next) => { > Setting `subscribed_saleschannel` as "specific" means, you will have to manually subscribe saleschannel level event for individual saleschannel. Default value here is "all" and event will be subscribed for all sales channels. For enabling events manually use function `enableSalesChannelWebhook`. To disable receiving events for a saleschannel use function `disableSalesChannelWebhook`. +#### Filters and reducers in webhook events + +A filter and reducer can be provided to refine the data delivered for each subscribed event. The Filter functionality allows selective delivery of data by specifying conditions based on JSONPath queries and logical operators. Reducer allow customization of the payload structure by specifying only the fields needed by the subscriber. The reducer extracts fields from the event’s data and restructures them as needed. +```javascript +webhook_config: { + api_path: "/v1.0/webhooks", + notification_email: "rahultambe@gofynd.com", + marketplace: true, + subscribed_saleschannel: 'specific', + event_map: { + 'company/brand/update': { + version: '1', + handler: handleExtensionUninstall, + filters: { + query: "$.brand.uid", + condition: "(uid) => uid === 238" + }, + reducer: { + brand_name: "$.brand.name", + logo_link: "$.brand.logo" + } + }] + } +} +``` ##### How webhook registery subscribes to webhooks on Fynd Platform? After webhook config is passed to setupFdk whenever extension is launched to any of companies where extension is installed or to be installed, webhook config data is used to create webhook subscriber on Fynd Platform for that company. diff --git a/express/webhook.js b/express/webhook.js index 17fd5443..52bfe212 100644 --- a/express/webhook.js +++ b/express/webhook.js @@ -7,18 +7,19 @@ const { TEST_WEBHOOK_EVENT_NAME, ASSOCIATION_CRITERIA } = require("./constants") const { FdkWebhookProcessError, FdkWebhookHandlerNotFound, FdkWebhookRegistrationError, FdkInvalidHMacError, FdkInvalidWebhookConfig } = require("./error_code"); const logger = require("./logger"); const { RetryManger } = require("./retry_manager"); +const _ = require('lodash'); let eventConfig = {} class WebhookRegistry { constructor(retryManager) { - this._handlerMap = null; - this._topicMap = null; + this._eventMap = null; this._config = null; this._fdkConfig = null; this._retryManager = retryManager; } async initialize(config, fdkConfig) { + const emailRegex = new RegExp(/^\S+@\S+\.\S+$/, 'gi'); if (!config.notification_email || !emailRegex.test(config.notification_email)) { throw new FdkInvalidWebhookConfig(`Invalid or missing "notification_email"`); @@ -26,18 +27,24 @@ class WebhookRegistry { if (!config.api_path || config.api_path[0] !== '/') { throw new FdkInvalidWebhookConfig(`Invalid or missing "api_path"`); } + if(config.marketplace == true && config.subscribed_saleschannel != "specific"){ + throw new FdkInvalidWebhookConfig(`marketplace is only allowed when subscribed_saleschannel is specific"`); + } if (!config.event_map || !Object.keys(config.event_map).length) { throw new FdkInvalidWebhookConfig(`Invalid or missing "event_map"`); } config.subscribe_on_install = config.subscribe_on_install === undefined ? true : config.subscribe_on_install; - this._handlerMap = {}; - this._topicMap = {}; + this._eventMap = { + rest: {}, + kafka: {}, + pub_sub: {}, + sqs: {}, + event_bridge: {}, + temporal: {} + }; this._config = config; this._fdkConfig = fdkConfig; - - const handlerConfig = {}; - const topicConfig = {}; - + for (let [eventName, eventData] of Object.entries(this._config.event_map)) { // Validate Webhook Event Map if (eventName.split('/').length !== 3) { @@ -50,28 +57,45 @@ class WebhookRegistry { if(!eventData.hasOwnProperty("provider")){ eventData.provider = 'rest'; } - const allowedProviders = ['kafka', 'rest']; + const allowedProviders = ['kafka', 'rest', 'pub_sub', 'temporal', 'sqs', 'event_bridge']; if(!allowedProviders.includes(eventData.provider)){ throw new FdkInvalidWebhookConfig(`Invalid provider value in webhook event ${eventName}, allowed values are ${allowedProviders.toString()}`); } if(eventData.provider === 'rest' && !eventData.hasOwnProperty("handler")){ throw new FdkInvalidWebhookConfig(`Missing handler in webhook event ${eventName}`); } - else if(eventData.provider === 'kafka' && !eventData.hasOwnProperty("topic")){ + else if((eventData.provider === 'kafka' || eventData.provider === 'pub_sub' ) && !eventData.hasOwnProperty("topic")){ throw new FdkInvalidWebhookConfig(`Missing topic in webhook event ${eventName}`); + }else if((eventData.provider === 'temporal' || eventData.provider === 'sqs' )&& !eventData.hasOwnProperty("queue")){ + throw new FdkInvalidWebhookConfig(`Missing queue in webhook event ${eventName}`); + }else if(eventData.provider === 'temporal' && !eventData.hasOwnProperty("workflow_name")){ + throw new FdkInvalidWebhookConfig(`Missing workflow_name in webhook event ${eventName}`); + }else if(eventData.provider === 'event_bridge' && !eventData.hasOwnProperty("event_bridge_name")){ + throw new FdkInvalidWebhookConfig(`Missing event_bridge_name in webhook event ${eventName}`); } - if(eventData.provider === 'rest'){ - handlerConfig[eventName + '/v' + eventData.version] = eventData; + + if("filters" in eventData && typeof eventData.filters != 'object'){ + throw new FdkInvalidWebhookConfig(`filters should be an object in webhook event ${eventName}`); } - if(eventData.provider === 'kafka'){ - topicConfig[eventName + '/v' + eventData.version] = eventData; + + if("reducer" in eventData && typeof eventData.reducer != 'object'){ + throw new FdkInvalidWebhookConfig(`reducer should be an object in webhook event ${eventName}`); } - + + this._eventMap[eventData.provider][eventName + '/v' + eventData.version] = eventData; } - await this.getEventConfig({...handlerConfig, ...topicConfig}); // get event config for required event_map in eventConfig.event_configs - eventConfig.eventsMap = this._getEventIdMap(eventConfig.event_configs); // generate eventIdMap from eventConfig.event_configs - this._validateEventsMap({...handlerConfig, ...topicConfig}); + let allEventMap = {...this._eventMap.rest, + ...this._eventMap.kafka, + ...this._eventMap.sqs, + ...this._eventMap.pub_sub, + ...this._eventMap.temporal, + ...this._eventMap.event_bridge}; + // get event config for required event_map in eventConfig.event_configs + await this.getEventConfig(allEventMap); + // generate eventIdMap from eventConfig.event_configs + eventConfig.eventsMap = this._getEventIdMap(eventConfig.event_configs); + this._validateEventsMap(allEventMap); if(Object.keys(eventConfig.eventsNotFound).length){ let errors = [] @@ -80,13 +104,11 @@ class WebhookRegistry { }) throw new FdkInvalidWebhookConfig(`Webhooks events ${errors.join(' and ')} not found`); } - this._handlerMap = handlerConfig; - this._topicMap = topicConfig; logger.debug('Webhook registry initialized'); } get isInitialized() { - return !!this._handlerMap || !!this._topicMap; + return !!this._eventMap; } get isSubscribeOnInstall(){ @@ -144,6 +166,27 @@ class WebhookRegistry { subscriberConfig.webhook_url = this._webhookUrl; updated = true; } + + // type marketplace is allowed only when association criteria is specific + if(configCriteria == ASSOCIATION_CRITERIA.SPECIFIC){ + if((subscriberConfig.type == 'marketplace' && !this._config.marketplace)){ + logger.debug(`Type updated from ${subscriberConfig.type} to null`); + subscriberConfig.type = null; + updated = true; + }else if (((!subscriberConfig.type || subscriberConfig.type != "marketplace") && this._config.marketplace) ){ + logger.debug(`Type updated from ${subscriberConfig.type} to marketplace`); + subscriberConfig.type = "marketplace"; + updated = true + } + }else { + if(subscriberConfig.type == 'marketplace'){ + logger.debug(`Type updated from ${subscriberConfig.type} to null`); + subscriberConfig.type = null; + updated = true; + } + } + + return updated; } @@ -155,11 +198,140 @@ class WebhookRegistry { throw new FdkInvalidWebhookConfig('Webhook registry not initialized'); } logger.debug('Webhook sync events started'); - + let subscriberConfigList = await this.getSubscriberConfig(platformClient); - await this.syncSubscriberConfig(subscriberConfigList.rest, 'rest', this._handlerMap, platformClient, enableWebhooks); - await this.syncSubscriberConfig(subscriberConfigList.kafka, 'kafka', this._topicMap, platformClient, enableWebhooks); + let subscriberSyncedForAllProvider = await this.syncSubscriberConfigForAllProviders(platformClient, subscriberConfigList) + + // v3.0 upsert put api does not exist + if(!subscriberSyncedForAllProvider){ + let subscriberConfigList = await this.getSubscriberConfig(platformClient); + await this.syncSubscriberConfig(subscriberConfigList.rest, 'rest', this._eventMap.rest, platformClient, enableWebhooks); + + await this.syncSubscriberConfig(subscriberConfigList.kafka, 'kafka', this._eventMap.kafka , platformClient, enableWebhooks); + + await this.syncSubscriberConfig(subscriberConfigList.pub_sub, 'pub_sub', this._eventMap.pub_sub , platformClient, enableWebhooks); + + await this.syncSubscriberConfig(subscriberConfigList.sqs, 'sqs', this._eventMap.sqs , platformClient, enableWebhooks); + + await this.syncSubscriberConfig(subscriberConfigList.event_bridge, 'event_bridge', this._eventMap.event_bridge , platformClient, enableWebhooks); + + await this.syncSubscriberConfig(subscriberConfigList.temporal, 'temporal', this._eventMap.temporal , platformClient, enableWebhooks); + } + + } + + /* this will call the v3.0 upsert put api which will handle syncing in a single api call. + In case the api does not exist we need to fallback to v2.0 api */ + async syncSubscriberConfigForAllProviders(platformClient, subscriberConfigList){ + let payload = this.createRegisterPayloadData(subscriberConfigList); + const uniqueKey = `registerSubscriberToEventForAllProvider_${platformClient.config.companyId}_${this._fdkConfig.api_key}`; + const token = await platformClient.config.oauthClient.getAccessToken(); + const retryInfo = this._retryManager.retryInfoMap.get(uniqueKey); + if (retryInfo && !retryInfo.isRetry) { + this._retryManager.resetRetryState(uniqueKey); + } + try { + try{ + const rawRequest = { + method: "put", + url: `${this._fdkConfig.cluster}/service/platform/webhook/v3.0/company/${platformClient.config.companyId}/subscriber`, + data: payload, + headers: { + Authorization: `Bearer ${token}`, + "Content-Type": "application/json", + "x-ext-lib-version": `js/${version}` + } + } + let response = await fdkAxios.request(rawRequest); + return true; + } + catch(err){ + if(err.code != '404'){ + throw err; + } + return false; + } + } catch(err) { + if ( + RetryManger.shouldRetryOnError(err) + && !this._retryManager.isRetryInProgress(uniqueKey) + ) { + return await this._retryManager.retry( + uniqueKey, + this.syncSubscriberConfigForAllProviders.bind(this), + platformClient + ); + } + throw new FdkWebhookRegistrationError(`Error while updating webhook subscriber configuration for all providers. Reason: ${err.message}. Errors: ${JSON.stringify(err?.details)}`); + } + + } + + createRegisterPayloadData(subscriberConfigList){ + let payload = { + "webhook_config": { + notification_email: this._config.notification_email, + name: this._fdkConfig.api_key, + association: { + "extension_id": this._fdkConfig.api_key, + "application_id": [], + "criteria": this._associationCriteria([]) + }, + status: "active", + event_map: { + + } + } + }; + + const configKeys = Object.keys(subscriberConfigList); + //Every provider has same association. Get the first one. + if (this._config.subscribed_saleschannel === 'specific' && configKeys.length > 0) { + const firstConfig = subscriberConfigList[configKeys[0]]; + if ( firstConfig?.association?.criteria == ASSOCIATION_CRITERIA.SPECIFIC) { + payload.webhook_config.association = firstConfig.association; + } + } + + let payloadEventMap = payload.webhook_config.event_map; + for(let [key, event] of Object.entries(this._config.event_map)) { + if(!payloadEventMap[event.provider]){ + payloadEventMap[event.provider] = {} + if(payload.webhook_config?.association?.criteria == ASSOCIATION_CRITERIA.SPECIFIC){ + payloadEventMap[event.provider] = { + type: this._config.marketplace ? 'marketplace' : null + } + } + payloadEventMap[event.provider].events = [] + if(event.provider == 'rest'){ + payloadEventMap[event.provider] = { + ...payloadEventMap[event.provider], + webhook_url: this._webhookUrl, + auth_meta: { + type: 'hmac', + secret: this._fdkConfig.api_secret + } + } + } + + } + let [event_category, event_name, event_type] = key.split('/'); + let eventData = { + event_category, + event_name, + event_type, + version: event.version, + topic: event.topic, + queue: event.queue, + workflow_name: event.workflow_name, + event_bridge_name: event.event_bridge_name, + filters: event.filters, + reducer: event.reducer + }; + payloadEventMap[event.provider].events.push(eventData); + }; + return payload; } async syncSubscriberConfig(subscriberConfig, configType, currentEventMapConfig, platformClient, enableWebhooks){ @@ -183,7 +355,7 @@ class WebhookRegistry { }, "events": [], "provider": configType, - "email_id": this._config.notification_email + "email_id": this._config.notification_email, } if(configType === 'rest'){ subscriberConfig['webhook_url'] = this._webhookUrl; @@ -195,13 +367,19 @@ class WebhookRegistry { } else { logger.debug(`Webhook ${configType} config on platform side for company id ${platformClient.config.companyId}: ${JSON.stringify(subscriberConfig)}`) - const { id, name, webhook_url, provider="rest", association, status, auth_meta, event_configs, email_id } = subscriberConfig - subscriberConfig = { id, name, webhook_url, provider, association, status, auth_meta, email_id }; - subscriberConfig.events = []; + + const { id, name, webhook_url, provider="rest", association, status, auth_meta, event_configs, email_id, type } = subscriberConfig + subscriberConfig = { id, name, webhook_url, provider, association, status, auth_meta, email_id, type }; + subscriberConfig.events = []; existingEvents = event_configs.map(event => { return { 'slug': `${event.event_category}/${event.event_name}/${event.event_type}/v${event.version}`, - 'topic': event?.subscriber_event_mapping?.broadcaster_config?.topic + 'topic': event?.subscriber_event_mapping?.broadcaster_config?.topic, + 'queue': event?.subscriber_event_mapping?.broadcaster_config?.queue, + 'event_bridge_name': event?.subscriber_event_mapping?.broadcaster_config?.event_bridge_name, + 'workflow_name': event?.subscriber_event_mapping?.broadcaster_config?.workflow_name, + 'filters': event?.subscriber_event_mapping?.filters, + 'reducer': event?.subscriber_event_mapping?.reducer } }); // Checking Configuration Updates @@ -226,7 +404,13 @@ class WebhookRegistry { let event_id = eventConfig.eventsMap[eventName] if (event_id) { const event = { - slug: eventName + slug: eventName, + topic: currentEventMapConfig[eventName]?.topic, + queue: currentEventMapConfig[eventName]?.queue, + event_bridge_name: currentEventMapConfig[eventName]?.event_bridge_name, + workflow_name: currentEventMapConfig[eventName]?.workflow_name, + filters: currentEventMapConfig[eventName]?.filters, + reducer: currentEventMapConfig[eventName]?.reducer } if(currentEventMapConfig[eventName].hasOwnProperty('topic')){ event['topic'] = currentEventMapConfig[eventName].topic; @@ -253,13 +437,41 @@ class WebhookRegistry { ...existingEvents.filter(event => !subscriberConfig.events.find(item => item.slug === event.slug)) ] - // Check if only topic is changed for same kafka events - if(configType === 'kafka' && !configUpdated){ + //keys to check for updates in subscriberConfig for different config type + let configTypeKeysToCheck = { + 'kafka': ['topic'], + 'pub_sub': ['topic'], + 'temporal': ['queue', 'workflow_name'], + 'sqs': ['queue'], + 'event_bridge': ['event_bridge_name'], + 'rest': [] + } + + //key to check which are common across all config type + let commonKeys = ['filters','reducer'] + + // check if these keys have changed + if(!configUpdated){ for(const event of subscriberConfig.events){ const existingEvent = existingEvents.find(e => e.slug === event.slug); - if(existingEvent && !(event.topic === existingEvent.topic)){ - configUpdated = true - break; + + if(existingEvent){ + + //compare config related keys + for(let key of configTypeKeysToCheck[configType]){ + if(!(event[key] === existingEvent[key])){ + configUpdated = true; + break + } + } + + //compare common keys + for(let key of commonKeys){ + if(!_.isEqual(event[key], existingEvent[key])){ + configUpdated = true; + break + } + } } } } @@ -296,10 +508,11 @@ class WebhookRegistry { subscriberConfig = { id, name, webhook_url, provider, association, status, auth_meta, email_id }; subscriberConfig.events = event_configs.map(event => { const eventObj = { - slug: `${event.event_category}/${event.event_name}/${event.event_type}/v${event.version}` - } - if(subscriberConfig.provider === 'kafka'){ - eventObj['topic'] = event?.subscriber_event_mapping?.broadcaster_config?.topic + 'slug': `${event.event_category}/${event.event_name}/${event.event_type}/v${event.version}`, + 'topic': event?.subscriber_event_mapping?.broadcaster_config?.topic, + 'queue': event?.subscriber_event_mapping?.broadcaster_config?.queue, + 'event_bridge_name': event?.subscriber_event_mapping?.broadcaster_config?.event_bridge_name, + 'workflow_name': event?.subscriber_event_mapping?.broadcaster_config?.workflow_name } return eventObj; }); @@ -309,6 +522,10 @@ class WebhookRegistry { arrApplicationId.push(applicationId); subscriberConfig.association.application_id = arrApplicationId; subscriberConfig.association.criteria = this._associationCriteria(subscriberConfig.association.application_id); + + if(subscriberConfig?.association?.criteria == ASSOCIATION_CRITERIA.SPECIFIC){ + subscriberConfig.type = this._config.marketplace ? 'marketplace' : null; + } await this.updateSubscriberConfig(platformClient, subscriberConfig); logger.debug(`Webhook enabled for saleschannel: ${applicationId}`); } @@ -337,11 +554,13 @@ class WebhookRegistry { subscriberConfig = { id, name, webhook_url, provider, association, status, auth_meta, email_id }; subscriberConfig.events = event_configs.map(event => { const eventObj = { - slug: `${event.event_category}/${event.event_name}/${event.event_type}/v${event.version}` - } - if(subscriberConfig.provider === 'kafka'){ - eventObj['topic'] = event?.subscriber_event_mapping?.broadcaster_config?.topic + 'slug': `${event.event_category}/${event.event_name}/${event.event_type}/v${event.version}`, + 'topic': event?.subscriber_event_mapping?.broadcaster_config?.topic, + 'queue': event?.subscriber_event_mapping?.broadcaster_config?.queue, + 'event_bridge_name': event?.subscriber_event_mapping?.broadcaster_config?.event_bridge_name, + 'workflow_name': event?.subscriber_event_mapping?.broadcaster_config?.workflow_name } + return eventObj; }); const arrApplicationId = subscriberConfig.association.application_id; @@ -350,6 +569,11 @@ class WebhookRegistry { if (rmIndex > -1) { arrApplicationId.splice(rmIndex, 1); subscriberConfig.association.criteria = this._associationCriteria(subscriberConfig.association.application_id); + if(subscriberConfig?.association?.criteria == ASSOCIATION_CRITERIA.SPECIFIC){ + subscriberConfig.type = this._config.marketplace ? 'marketplace' : null; + }else{ + subscriberConfig.type = null; + } await this.updateSubscriberConfig(platformClient, subscriberConfig); logger.debug(`Webhook disabled for saleschannel: ${applicationId}`); } @@ -383,7 +607,7 @@ class WebhookRegistry { const eventName = `${body.event.category}/${body.event.name}/${body.event.type}/v${body.event.version}`; - const eventHandlerMap = (this._handlerMap[eventName] || {}); + const eventHandlerMap = (this._eventMap.rest[eventName] || {}); const extHandler = eventHandlerMap.handler; if (typeof extHandler === 'function') { @@ -568,11 +792,8 @@ class WebhookRegistry { const subscriberConfig = {}; subscriberConfigResponse.items.forEach((config) => { - if(config.provider === 'kafka'){ - subscriberConfig['kafka'] = config; - } - else{ - subscriberConfig['rest'] = config; + if(config.provider){ + subscriberConfig[config.provider] = config } }) diff --git a/package-lock.json b/package-lock.json index a9eaec8c..d013be6a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -11,6 +11,7 @@ "dependencies": { "axios": "^1.6.4", "crypto-js": "^4.2.0", + "lodash": "^4.17.21", "querystring": "^0.2.1", "url-join": "^4.0.1", "uuid": "^8.3.2", @@ -2747,8 +2748,7 @@ "node_modules/lodash": { "version": "4.17.21", "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz", - "integrity": "sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg==", - "dev": true + "integrity": "sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg==" }, "node_modules/lodash.defaults": { "version": "4.2.0", @@ -6774,8 +6774,7 @@ "lodash": { "version": "4.17.21", "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz", - "integrity": "sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg==", - "dev": true + "integrity": "sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg==" }, "lodash.defaults": { "version": "4.2.0", diff --git a/package.json b/package.json index 0c7dea6a..68a44f10 100644 --- a/package.json +++ b/package.json @@ -2,6 +2,7 @@ "dependencies": { "axios": "^1.6.4", "crypto-js": "^4.2.0", + "lodash": "^4.17.21", "querystring": "^0.2.1", "url-join": "^4.0.1", "uuid": "^8.3.2",