diff --git a/marketing-analytics/activation/common-libs/nodejs-common/bin/install_functions.sh b/marketing-analytics/activation/common-libs/nodejs-common/bin/install_functions.sh index d047105..b44a961 100755 --- a/marketing-analytics/activation/common-libs/nodejs-common/bin/install_functions.sh +++ b/marketing-analytics/activation/common-libs/nodejs-common/bin/install_functions.sh @@ -1560,6 +1560,7 @@ set_cloud_functions_default_settings() { default_cf_flag+=(--set-env-vars=DEBUG="${DEBUG}") default_cf_flag+=(--set-env-vars=IN_GCP="${IN_GCP}") default_cf_flag+=(--set-env-vars=DATABASE_ID="${DATABASE_ID}") + default_cf_flag+=(--set-env-vars=DATABASE_MODE="${DATABASE_MODE}") } ####################################### diff --git a/marketing-analytics/activation/common-libs/nodejs-common/package.json b/marketing-analytics/activation/common-libs/nodejs-common/package.json index fe28dbb..8037bd9 100644 --- a/marketing-analytics/activation/common-libs/nodejs-common/package.json +++ b/marketing-analytics/activation/common-libs/nodejs-common/package.json @@ -1,6 +1,6 @@ { "name": "@google-cloud/nodejs-common", - "version": "2.1.0", + "version": "2.2.0", "description": "A NodeJs common library for solutions based on Cloud Functions", "author": "Google Inc.", "license": "Apache-2.0", @@ -16,24 +16,24 @@ }, "homepage": "https://github.com/GoogleCloudPlatform/cloud-for-marketing/blob/master/marketing-analytics/activation/common-libs/nodejs-common/README.md", "dependencies": { - "@google-cloud/aiplatform": "^3.10.0", + "@google-cloud/aiplatform": "^3.17.0", "@google-cloud/automl": "^4.0.1", - "@google-cloud/bigquery": "^7.3.0", - "@google-cloud/datastore": "^8.4.0", - "@google-cloud/firestore": "^7.2.0", + "@google-cloud/bigquery": "^7.5.2", + "@google-cloud/datastore": "^8.6.0", + "@google-cloud/firestore": "^7.5.0", "@google-cloud/logging-winston": "^6.0.0", - "@google-cloud/pubsub": "^4.1.1", - "@google-cloud/storage": "^7.7.0", - "@google-cloud/scheduler": "^4.0.1", - "@google-cloud/secret-manager": "^5.0.1", - "gaxios": "^6.1.1", + "@google-cloud/pubsub": "^4.3.3", + "@google-cloud/storage": "^7.9.0", + "@google-cloud/scheduler": "^4.1.0", + "@google-cloud/secret-manager": "^5.2.0", + "gaxios": "^6.3.0", "google-ads-nodejs-client": "16.0.0", "google-ads-api": "^14.1.0", "google-ads-node": "^12.0.2", - "google-auth-library": "^9.4.2", - "googleapis": "^131.0.0", - "winston": "^3.10.0", - "@grpc/grpc-js": "1.9.14", + "google-auth-library": "^9.7.0", + "googleapis": "^134.0.0", + "winston": "^3.13.0", + "@grpc/grpc-js": "^1.10.5", "lodash": "^4.17.21" }, "devDependencies": { diff --git a/marketing-analytics/activation/common-libs/nodejs-common/src/apis/analytics.js b/marketing-analytics/activation/common-libs/nodejs-common/src/apis/analytics.js index 817d925..7f4a696 100644 --- a/marketing-analytics/activation/common-libs/nodejs-common/src/apis/analytics.js +++ b/marketing-analytics/activation/common-libs/nodejs-common/src/apis/analytics.js @@ -21,6 +21,7 @@ const stream = require('stream'); const {google} = require('googleapis'); +const { GoogleApiClient } = require('./base/google_api_client.js'); const {Schema$Upload} = google.analytics; const AuthClient = require('./auth_client.js'); const {wait, getLogger, BatchResult} = require('../components/utils.js'); @@ -55,31 +56,26 @@ let DataImportClearConfig; /** * Google Analytics API v3 stub. */ -class Analytics { +class Analytics extends GoogleApiClient { /** * @constructor * @param {!Object=} env The environment object to hold env * variables. */ constructor(env = process.env) { - this.authClient = new AuthClient(API_SCOPES, env); + super(env); + this.googleApi = 'analytics'; this.logger = getLogger('API.GA'); } - /** - * Prepares the Google Analytics instance. - * @return {!google.analytics} - * @private - */ - async getApiClient_() { - if (this.analytics) return this.analytics; - await this.authClient.prepareCredentials(); - this.logger.debug(`Initialized ${this.constructor.name} instance.`); - this.analytics = google.analytics({ - version: API_VERSION, - auth: this.authClient.getDefaultAuth(), - }); - return this.analytics; + /** @override */ + getScope() { + return API_SCOPES; + } + + /** @override */ + getVersion() { + return API_VERSION; } /** @@ -102,7 +98,7 @@ class Analytics { }, config); - const analytics = await this.getApiClient_(); + const analytics = await this.getApiClient(); const response = await analytics.management.uploads.uploadData( uploadConfig); this.logger.debug('Configuration: ', config); @@ -151,7 +147,7 @@ class Analytics { * @return {!Promise} Updated data import Job status. */ async checkJobStatus(jobConfig) { - const analytics = await this.getApiClient_(); + const analytics = await this.getApiClient(); const { data: job } = await analytics.management.uploads.get(jobConfig); if (job.status !== 'PENDING') return job; this.logger.debug( @@ -169,7 +165,7 @@ class Analytics { * @return {!Promise>} */ async listAccounts() { - const analytics = await this.getApiClient_(); + const analytics = await this.getApiClient(); const response = await analytics.management.accounts.list(); return response.data.items.map( (account) => `Account id: ${account.name}[${account.id}]` @@ -182,7 +178,7 @@ class Analytics { * @return {!Promise>} */ async listUploads(config) { - const analytics = await this.getApiClient_(); + const analytics = await this.getApiClient(); const response = await analytics.management.uploads.list(config); return response.data.items; } @@ -203,7 +199,7 @@ class Analytics { const request = Object.assign({}, config, { resource: {customDataImportUids}, }); - const analytics = await this.getApiClient_(); + const analytics = await this.getApiClient(); await analytics.management.uploads.deleteUploadData(request); this.logger.debug('Delete uploads: ', customDataImportUids); } diff --git a/marketing-analytics/activation/common-libs/nodejs-common/src/apis/base/ads_api_common.js b/marketing-analytics/activation/common-libs/nodejs-common/src/apis/base/ads_api_common.js index 421f3eb..7b9d466 100644 --- a/marketing-analytics/activation/common-libs/nodejs-common/src/apis/base/ads_api_common.js +++ b/marketing-analytics/activation/common-libs/nodejs-common/src/apis/base/ads_api_common.js @@ -19,8 +19,7 @@ const { Transform } = require('stream'); const { - extractObject, - changeObjectNamingFromLowerCamelToSnake, + getFilterAndStringifyFn, getLogger, } = require('../../components/utils.js'); @@ -37,22 +36,6 @@ const START_TAG = '"results":'; const FIELD_MASK_TAG = '"fieldMask"'; const END_TAG = '"requestId"'; -/** - * Generates a function that can convert a given JSON object to a JSON string - * with only specified fields(fieldMask), in specified naming convention. - * @param {string} fieldMask The 'fieldMask' string from response. - * @param {boolean=} snakeCase Whether or not output JSON in snake naming. - */ -function generateProcessFn(fieldMask, snakeCase = false) { - const extractor = extractObject(fieldMask.split(',')); - return (originalObject) => { - const extracted = extractor(originalObject); - const generatedObject = snakeCase - ? changeObjectNamingFromLowerCamelToSnake(extracted) : extracted; - return JSON.stringify(generatedObject); - }; -}; - /** * A stream.Transform that can extract properties and convert naming of the * reponse of Google/Search Ads report from REST interface. @@ -85,7 +68,7 @@ class RestSearchStreamTransform extends Transform { .substring(maskIndex + FIELD_MASK_TAG.length, rawString.indexOf(END_TAG)) .split('"')[1]; this.logger.debug(`Got fieldMask: ${fieldMask}`); - this.processFn = generateProcessFn(fieldMask, this.snakeCase); + this.processFn = getFilterAndStringifyFn(fieldMask, this.snakeCase); } const resultsWithTailing = rawString.substring(startIndex, maskIndex); const results = resultsWithTailing.substring( diff --git a/marketing-analytics/activation/common-libs/nodejs-common/src/apis/base/google_api_client.js b/marketing-analytics/activation/common-libs/nodejs-common/src/apis/base/google_api_client.js new file mode 100644 index 0000000..5a2c6b4 --- /dev/null +++ b/marketing-analytics/activation/common-libs/nodejs-common/src/apis/base/google_api_client.js @@ -0,0 +1,55 @@ +// Copyright 2023 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @fileoverview A base class for Google Api client library class. + */ +const { google } = require('googleapis'); +const { getLogger, getObjectByPath } = require('../../components/utils.js'); +const { AuthRestfulApi } = require('./auth_restful_api.js'); + +/** + * A Google Api client library class. + */ +class GoogleApiClient extends AuthRestfulApi { + + /** @constructor */ + constructor(env = process.env, options = {}) { + super(env, options); + this.logger = getLogger('API.default'); + } + + /** + * Returns the Api version of the Api in the current library. + * @return {string} + * @abstract + */ + getVersion() { } + + /** + * Returns the Api instance. + * @return {!Promise} The Api instance. + */ + async getApiClient() { + if (this.apiClient) return this.apiClient; + this.logger.info(`Initialized ${this.constructor.name} instance.`); + this.apiClient = google[this.googleApi]({ + version: this.getVersion(), + auth: await this.getAuth(), + }); + return this.apiClient; + } +} + +module.exports = { GoogleApiClient }; diff --git a/marketing-analytics/activation/common-libs/nodejs-common/src/apis/dfa_reporting.js b/marketing-analytics/activation/common-libs/nodejs-common/src/apis/dfa_reporting.js index 43fa6c7..d87f96a 100644 --- a/marketing-analytics/activation/common-libs/nodejs-common/src/apis/dfa_reporting.js +++ b/marketing-analytics/activation/common-libs/nodejs-common/src/apis/dfa_reporting.js @@ -19,9 +19,8 @@ 'use strict'; -const {google} = require('googleapis'); -const {request} = require('gaxios'); -const AuthClient = require('./auth_client.js'); +const { request } = require('gaxios'); +const { GoogleApiClient } = require('./base/google_api_client.js'); const { getLogger, getFilterFunction, @@ -94,7 +93,7 @@ const MAX_IDENTIFIERS_PER_USER = 5; * Google DfaReport API v3.0 stub. * see https://developers.google.com/doubleclick-advertisers/service_accounts */ -class DfaReporting { +class DfaReporting extends GoogleApiClient { /** * @constructor @@ -102,34 +101,19 @@ class DfaReporting { * variables. */ constructor(env = process.env) { - this.authClient = new AuthClient(API_SCOPES, env); + super(env); + this.googleApi = 'dfareporting'; this.logger = getLogger('API.CM'); } - /** - * Prepares the Google DfaReport API instance. - * @return {!google.dfareporting} - * @private - */ - async getApiClient_() { - if (this.dfareporting) return this.dfareporting; - this.logger.debug(`Initialized ${this.constructor.name} instance.`); - this.dfareporting = google.dfareporting({ - version: API_VERSION, - auth: await this.getAuth_(), - }); - return this.dfareporting; + /** @override */ + getScope() { + return API_SCOPES; } - /** - * Gets the auth object. - * @return {!Promise<{!OAuth2Client|!JWT|!Compute}>} - */ - async getAuth_() { - if (this.auth) return this.auth; - await this.authClient.prepareCredentials(); - this.auth = this.authClient.getDefaultAuth(); - return this.auth; + /** @override */ + getVersion() { + return API_VERSION; } /** @@ -140,7 +124,7 @@ class DfaReporting { * @return {!Promise} */ async getProfileId(accountId) { - const dfareporting = await this.getApiClient_(); + const dfareporting = await this.getApiClient(); const { data: { items } } = await dfareporting.userProfiles.list(); const profiles = items.filter( (profile) => profile.accountId === accountId @@ -237,7 +221,7 @@ class DfaReporting { numberOfLines: lines.length, }; try { - const dfareporting = await this.getApiClient_(); + const dfareporting = await this.getApiClient(); const response = await dfareporting.conversions[operation]({ profileId: config.profileId, requestBody: requestBody, @@ -304,7 +288,7 @@ class DfaReporting { * @return {!Promise>} */ async listUserProfiles() { - const dfareporting = await this.getApiClient_(); + const dfareporting = await this.getApiClient(); const { data: { items } } = await dfareporting.userProfiles.list(); return items.map(({profileId, userName, accountId, accountName}) => { return `Profile: ${profileId}[${userName}] ` @@ -349,7 +333,7 @@ class DfaReporting { */ async runReport(config) { const profileId = await this.getProfileForOperation_(config); - const dfareporting = await this.getApiClient_(); + const dfareporting = await this.getApiClient(); const { startDate, endDate } = config; if (startDate && endDate) { const { data: report } = await dfareporting.reports.get({ @@ -392,7 +376,7 @@ class DfaReporting { */ async getReportFileUrl(config) { const profileId = await this.getProfileForOperation_(config); - const dfareporting = await this.getApiClient_(); + const dfareporting = await this.getApiClient(); const response = await dfareporting.reports.files.get({ profileId, reportId: config.reportId, @@ -410,7 +394,7 @@ class DfaReporting { * @return {!Promise} */ async getReportFileStream(url) { - const auth = await this.getAuth_(); + const auth = await this.getAuth(); const headers = await auth.getRequestHeaders(); const response = await request({ method: 'GET', diff --git a/marketing-analytics/activation/common-libs/nodejs-common/src/apis/display_video.js b/marketing-analytics/activation/common-libs/nodejs-common/src/apis/display_video.js index 6cab43e..bad7f73 100644 --- a/marketing-analytics/activation/common-libs/nodejs-common/src/apis/display_video.js +++ b/marketing-analytics/activation/common-libs/nodejs-common/src/apis/display_video.js @@ -19,14 +19,8 @@ 'use strict'; -const { google } = require('googleapis'); -const AuthClient = require('./auth_client.js'); -const { - getLogger, - getObjectByPath, - SendSingleBatch, - BatchResult, -} = require('../components/utils.js'); +const { GoogleApiClient } = require('./base/google_api_client.js'); +const { getLogger } = require('../components/utils.js'); const API_SCOPES = Object.freeze([ 'https://www.googleapis.com/auth/display-video', @@ -40,7 +34,7 @@ const API_VERSION = 'v3'; * Bid Manager API. * @see https://developers.google.com/bid-manager/reference/rest */ -class DisplayVideo { +class DisplayVideo extends GoogleApiClient { /** * @constructor @@ -48,44 +42,19 @@ class DisplayVideo { * variables. */ constructor(env = process.env) { - this.authClient = new AuthClient(API_SCOPES, env); + super(env); + this.googleApi = 'displayvideo'; this.logger = getLogger('API.DV3API'); } - /** - * Prepares the Google DV3 instance. - * @return {!google.displayvideo} - * @private - */ - async getApiClient_() { - if (this.displayvideo) return this.displayvideo; - this.logger.debug(`Initialized ${this.constructor.name} instance.`); - this.displayvideo = google.displayvideo({ - version: API_VERSION, - auth: await this.getAuth_(), - }); - return this.displayvideo; - } - - /** - * Gets the auth object. - * @return {!Promise<{!OAuth2Client|!JWT|!Compute}>} - */ - async getAuth_() { - if (this.auth) return this.auth; - await this.authClient.prepareCredentials(); - this.auth = this.authClient.getDefaultAuth(); - return this.auth; + /** @override */ + getScope() { + return API_SCOPES; } - /** - * Gets the instance of function object based on Google API client library. - * @param {string|undefined} path - * @return {Object} - */ - async getFunctionObject(path) { - const instance = await this.getApiClient_(); - return getObjectByPath(instance, path); + /** @override */ + getVersion() { + return API_VERSION; } } diff --git a/marketing-analytics/activation/common-libs/nodejs-common/src/apis/doubleclick_bidmanager.js b/marketing-analytics/activation/common-libs/nodejs-common/src/apis/doubleclick_bidmanager.js index dc46f7d..bf8be0b 100644 --- a/marketing-analytics/activation/common-libs/nodejs-common/src/apis/doubleclick_bidmanager.js +++ b/marketing-analytics/activation/common-libs/nodejs-common/src/apis/doubleclick_bidmanager.js @@ -19,8 +19,7 @@ 'use strict'; -const {google} = require('googleapis'); -const AuthClient = require('./auth_client.js'); +const { GoogleApiClient } = require('./base/google_api_client.js'); const { getLogger } = require('../components/utils.js'); const API_SCOPES = Object.freeze([ @@ -28,25 +27,24 @@ const API_SCOPES = Object.freeze([ ]); const API_VERSION = 'v2'; -/** - * The returned information of get a query. - * @typedef {{ - * running:boolean, - * latestReportRunTimeMs:string, - * googleCloudStoragePathForLatestReport:string, - * }} - */ -let QueryResource; - /** * RequestBody controls the data range of reports. * see: - * https://developers.google.com/bid-manager/v1.1/queries/runquery#request-body + * https://developers.google.com/bid-manager/reference/rest/v2/queries/run#RunQueryRequest * @typedef {{ - * dataRange: string, - * reportDataStartTimeMs: long, - * reportDataEndTimeMs: long, - * timezoneCode: string + * dataRange: { + * range: Range, + * customStartDate: { + * year: integer, + * month: integer, + * day: integer, + * }, + * customEndDate: { +* year: integer, +* month: integer, +* day: integer, +* }, + * } * }} */ let RequestBody; @@ -56,31 +54,26 @@ let RequestBody; * Note: DV360 report API only support OAuth 2.0, see: * https://developers.google.com/bid-manager/how-tos/authorizing */ -class DoubleClickBidManager { +class DoubleClickBidManager extends GoogleApiClient { /** * @constructor * @param {!Object=} env The environment object to hold env * variables. */ constructor(env = process.env) { - this.authClient = new AuthClient(API_SCOPES, env); + super(env); + this.googleApi = 'doubleclickbidmanager'; this.logger = getLogger('API.DV3'); } - /** - * Prepares the Google DBM instance. - * @return {!google.doubleclickbidmanager} - * @private - */ - async getApiClient_() { - if (this.doubleclickbidmanager) return this.doubleclickbidmanager; - await this.authClient.prepareCredentials(); - this.logger.debug(`Initialized ${this.constructor.name} instance.`); - this.doubleclickbidmanager = google.doubleclickbidmanager({ - version: API_VERSION, - auth: this.authClient.getDefaultAuth(), - }); - return this.doubleclickbidmanager; + /** @override */ + getScope() { + return API_SCOPES; + } + + /** @override */ + getVersion() { + return API_VERSION; } /** @@ -91,21 +84,21 @@ class DoubleClickBidManager { * @return {!Promise} Report Id. */ async runQuery(queryId, requestBody = undefined) { - const doubleclickbidmanager = await this.getApiClient_(); + const doubleclickbidmanager = await this.getApiClient(); const response = await doubleclickbidmanager.queries.run( {queryId, requestBody}); return response.data.key.reportId; } /** - * Gets a query resource. + * Gets a query metadata. * See https://developers.google.com/bid-manager/reference/rest/v2/queries/get * @param {number} queryId Id of the query. - * @return {!Promise} Query resource, see - * https://developers.google.com/bid-manager/reference/rest/v2/queries#Query + * @return {!Promise} Query metadata, see + * https://developers.google.com/bid-manager/reference/rest/v2/queries#QueryMetadata */ async getQuery(queryId) { - const doubleclickbidmanager = await this.getApiClient_(); + const doubleclickbidmanager = await this.getApiClient(); const response = await doubleclickbidmanager.queries.get({ queryId }); return response.data.metadata; } @@ -117,14 +110,14 @@ class DoubleClickBidManager { * @return {!Promise} Id of created query. */ async createQuery(query) { - const doubleclickbidmanager = await this.getApiClient_(); + const doubleclickbidmanager = await this.getApiClient(); const response = await doubleclickbidmanager.queries.create( {requestBody: query}); return response.data.queryId; } async getQueryReport(queryId, reportId) { - const doubleclickbidmanager = await this.getApiClient_(); + const doubleclickbidmanager = await this.getApiClient(); const response = await doubleclickbidmanager.queries.reports.get( { queryId, reportId }); return response.data.metadata; @@ -136,7 +129,7 @@ class DoubleClickBidManager { * @return {!Promise} Whether the query was deleted. */ async deleteQuery(queryId) { - const doubleclickbidmanager = await this.getApiClient_(); + const doubleclickbidmanager = await this.getApiClient(); try { const { status } = await doubleclickbidmanager.queries.delete({ queryId }); return status === 200; @@ -148,7 +141,6 @@ class DoubleClickBidManager { } module.exports = { - QueryResource, RequestBody, DoubleClickBidManager, }; diff --git a/marketing-analytics/activation/common-libs/nodejs-common/src/apis/doubleclick_search.js b/marketing-analytics/activation/common-libs/nodejs-common/src/apis/doubleclick_search.js index d61e4f3..23f4e16 100644 --- a/marketing-analytics/activation/common-libs/nodejs-common/src/apis/doubleclick_search.js +++ b/marketing-analytics/activation/common-libs/nodejs-common/src/apis/doubleclick_search.js @@ -19,9 +19,8 @@ 'use strict'; -const {google} = require('googleapis'); const {request} = require('gaxios'); -const AuthClient = require('./auth_client.js'); +const { GoogleApiClient } = require('./base/google_api_client.js'); const { getLogger, SendSingleBatch, @@ -118,7 +117,7 @@ let ReportRequest; * Quota limits, see: * https://support.google.com/adsihc/answer/6346075?hl=en */ -class DoubleClickSearch { +class DoubleClickSearch extends GoogleApiClient { /** * @constructor @@ -126,34 +125,19 @@ class DoubleClickSearch { * variables. */ constructor(env = process.env) { - this.authClient = new AuthClient(API_SCOPES, env); + super(env); + this.googleApi = 'doubleclicksearch'; this.logger = getLogger('API.DS'); } - /** - * Prepares the Google SA360 instance. - * @return {!google.doubleclicksearch} - * @private - */ - async getApiClient_() { - if (this.doubleclicksearch) return this.doubleclicksearch; - this.logger.debug(`Initialized ${this.constructor.name} instance.`); - this.doubleclicksearch = google.doubleclicksearch({ - version: API_VERSION, - auth: await this.getAuth_(), - }); - return this.doubleclicksearch; + /** @override */ + getScope() { + return API_SCOPES; } - /** - * Gets the auth object. - * @return {!Promise<{!OAuth2Client|!JWT|!Compute}>} - */ - async getAuth_() { - if (this.auth) return this.auth; - await this.authClient.prepareCredentials(); - this.auth = this.authClient.getDefaultAuth(); - return this.auth; + /** @override */ + getVersion() { + return API_VERSION; } /** @@ -172,7 +156,7 @@ class DoubleClickSearch { }); this.logger.debug('Sending out availabilities', availabilities); try { - const doubleclicksearch = await this.getApiClient_(); + const doubleclicksearch = await this.getApiClient(); const response = await doubleclicksearch.conversion.updateAvailability( {requestBody: {availabilities}}); this.logger.debug('Get response: ', response); @@ -221,7 +205,7 @@ class DoubleClickSearch { numberOfLines: lines.length, }; try { - const doubleclicksearch = await this.getApiClient_(); + const doubleclicksearch = await this.getApiClient(); const response = await doubleclicksearch.conversion.insert( {requestBody: {conversion: conversions}} ); @@ -322,7 +306,7 @@ class DoubleClickSearch { * @return {!Promise} */ async requestReports(requestBody) { - const doubleclicksearch = await this.getApiClient_(); + const doubleclicksearch = await this.getApiClient(); const { status, data } = await doubleclicksearch.reports.request({ requestBody }); if (status >= 200 && status < 300) { return data.id; @@ -341,7 +325,7 @@ class DoubleClickSearch { * }>>} */ async getReportUrls(reportId) { - const doubleclicksearch = await this.getApiClient_(); + const doubleclicksearch = await this.getApiClient(); const { status, data } = await doubleclicksearch.reports.get({ reportId }); switch (status) { case 200: @@ -367,11 +351,11 @@ class DoubleClickSearch { * @return {!Promise} */ async getReportFile(reportId, reportFragment) { - const doubleclicksearch = await this.getApiClient_(); + const doubleclicksearch = await this.getApiClient(); const response = await doubleclicksearch.reports.getFile( {reportId, reportFragment}); if (response.status === 200) { - return Buffer.from(await response.data.arrayBuffer()).toString(); + return response.data; } const errorMsg = `Error in get file from reports: ${reportFragment}@${reportId}`; @@ -387,7 +371,7 @@ class DoubleClickSearch { * @return {!Promise} */ async getReportFileStream(url) { - const auth = await this.getAuth_(); + const auth = await this.getAuth(); const headers = await auth.getRequestHeaders(); const response = await request({ method: 'GET', diff --git a/marketing-analytics/activation/common-libs/nodejs-common/src/apis/google_ads_api.js b/marketing-analytics/activation/common-libs/nodejs-common/src/apis/google_ads_api.js index 431de2e..8b0f72a 100644 --- a/marketing-analytics/activation/common-libs/nodejs-common/src/apis/google_ads_api.js +++ b/marketing-analytics/activation/common-libs/nodejs-common/src/apis/google_ads_api.js @@ -1100,7 +1100,7 @@ class GoogleAdsApi { */ buildCustomerMatchUserListMetadata_(config) { const { customerId, listId, customerMatchUserListMetadata } = config; - const resourceName = `customers/${customerId}/userLists/${listId}`; + const resourceName = `customers/${getCleanCid(customerId)}/userLists/${listId}`; return new CustomerMatchUserListMetadata(Object.assign({ userList: resourceName, }, customerMatchUserListMetadata)); @@ -1521,7 +1521,7 @@ const buildConversionJsonList = (lines, config, conversionFields, conversion.customVariables = variables.map((variable) => { return new CustomVariable({ conversionCustomVariable: - `customers/${customerId}/conversionCustomVariables/${customVariables[variable]}`, + `customers/${getCleanCid(customerId)}/conversionCustomVariables/${customVariables[variable]}`, value: record[variable], }); }); diff --git a/marketing-analytics/activation/common-libs/nodejs-common/src/apis/index.js b/marketing-analytics/activation/common-libs/nodejs-common/src/apis/index.js index 821391b..ca227a0 100644 --- a/marketing-analytics/activation/common-libs/nodejs-common/src/apis/index.js +++ b/marketing-analytics/activation/common-libs/nodejs-common/src/apis/index.js @@ -88,7 +88,6 @@ exports.searchads = require('./search_ads.js'); /** * APIs integration class for DoubleClick BidManager (DV360). * @const {{ - * QueryResource:!QueryResource, * DoubleClickBidManager:!DoubleClickBidManager, * }} */ diff --git a/marketing-analytics/activation/common-libs/nodejs-common/src/apis/search_ads.js b/marketing-analytics/activation/common-libs/nodejs-common/src/apis/search_ads.js index 59ccd1b..7d336de 100644 --- a/marketing-analytics/activation/common-libs/nodejs-common/src/apis/search_ads.js +++ b/marketing-analytics/activation/common-libs/nodejs-common/src/apis/search_ads.js @@ -20,7 +20,7 @@ const { request: gaxiosRequest } = require('gaxios'); const { google } = require('googleapis'); -const AuthClient = require('./auth_client.js'); +const { GoogleApiClient } = require('./base/google_api_client.js'); const { getLogger } = require('../components/utils.js'); const { getCleanCid, RestSearchStreamTransform } = require('./base/ads_api_common.js'); @@ -35,7 +35,7 @@ const API_VERSION = 'v0'; * Search Ads 360 Reporting API stub. * See: https://developers.google.com/search-ads/reporting/api/reference/release-notes */ -class SearchAds { +class SearchAds extends GoogleApiClient { /** * @constructor @@ -43,10 +43,21 @@ class SearchAds { * variables. */ constructor(env = process.env) { - this.authClient = new AuthClient(API_SCOPES, env); + super(env); + this.googleApi = 'searchads360'; this.logger = getLogger('API.SA'); } + /** @override */ + getScope() { + return API_SCOPES; + } + + /** @override */ + getVersion() { + return API_VERSION; + } + /** * Prepares the Search Ads 360 Reporting API instance. * OAuth 2.0 application credentials is required for calling this API. @@ -59,11 +70,11 @@ class SearchAds { * @return {!google.searchads360} * @private */ - async getApiClient_(loginCustomerId) { + async getApiClient(loginCustomerId) { this.logger.debug(`Initialized SA reporting for ${loginCustomerId}`); const options = { - version: API_VERSION, - auth: await this.getAuth_(), + version: this.getVersion(), + auth: await this.getAuth(), }; if (loginCustomerId) { options.headers = { 'login-customer-id': getCleanCid(loginCustomerId) }; @@ -71,17 +82,6 @@ class SearchAds { return google.searchads360(options); } - /** - * Gets the auth object. - * @return {!Promise<{!OAuth2Client|!JWT|!Compute}>} - */ - async getAuth_() { - if (this.auth) return this.auth; - await this.authClient.prepareCredentials(); - this.auth = this.authClient.getDefaultAuth(); - return this.auth; - } - /** * Gets a report synchronously from a given Customer account. * If there is a `nextPageToken` in the response, it means the report is not @@ -96,7 +96,7 @@ class SearchAds { * @see https://developers.google.com/search-ads/reporting/api/reference/rpc/google.ads.searchads360.v0.services#searchsearchads360response */ async getPaginatedReport(customerId, loginCustomerId, query, options = {}) { - const searchads = await this.getApiClient_(loginCustomerId); + const searchads = await this.getApiClient(loginCustomerId); const requestBody = Object.assign({ query, pageSize: 10000, @@ -120,7 +120,7 @@ class SearchAds { * @see https://developers.google.com/search-ads/reporting/api/reference/rest/search */ async restStreamReport(customerId, loginCustomerId, query) { - const auth = await this.getAuth_(); + const auth = await this.getAuth(); const headers = Object.assign( await auth.getRequestHeaders(), { 'login-customer-id': getCleanCid(loginCustomerId), @@ -166,7 +166,7 @@ class SearchAds { * @see https://developers.google.com/search-ads/reporting/api/reference/rest/v0/searchAds360Fields#SearchAds360Field */ async getReportField(fieldName) { - const searchads = await this.getApiClient_(); + const searchads = await this.getApiClient(); const resourceName = `searchAds360Fields/${fieldName}`; const response = await searchads.searchAds360Fields.get({ resourceName }); @@ -185,7 +185,7 @@ class SearchAds { */ async searchReportField(adFields, metadata = ['name', 'data_type', 'is_repeated', 'type_url',]) { - const searchads = await this.getApiClient_(); + const searchads = await this.getApiClient(); const selectClause = metadata.join(','); const fields = adFields.join('","'); const query = `SELECT ${selectClause} WHERE name IN ("${fields}")`; @@ -203,7 +203,7 @@ class SearchAds { * @see https://developers.google.com/search-ads/reporting/api/reference/rest/v0/customers.customColumns#CustomColumn */ async listCustomColumns(customerId, loginCustomerId) { - const searchads = await this.getApiClient_(loginCustomerId); + const searchads = await this.getApiClient(loginCustomerId); const response = await searchads.customers.customColumns.list({ customerId }); return response.data.customColumns; } @@ -219,7 +219,7 @@ class SearchAds { */ async getCustomColumn(columnId, customerId, loginCustomerId) { const resourceName = `customers/${customerId}/customColumns/${columnId}`; - const searchads = await this.getApiClient_(loginCustomerId); + const searchads = await this.getApiClient(loginCustomerId); const response = await searchads.customers.customColumns.get({ resourceName }); return response.data; } diff --git a/marketing-analytics/activation/common-libs/nodejs-common/src/apis/spreadsheets.js b/marketing-analytics/activation/common-libs/nodejs-common/src/apis/spreadsheets.js index 976940b..e439f92 100644 --- a/marketing-analytics/activation/common-libs/nodejs-common/src/apis/spreadsheets.js +++ b/marketing-analytics/activation/common-libs/nodejs-common/src/apis/spreadsheets.js @@ -19,6 +19,7 @@ 'use strict'; const {google} = require('googleapis'); +const { GoogleApiClient } = require('./base/google_api_client.js'); const {Params$Resource$Spreadsheets$Get} = google.sheets; const AuthClient = require('./auth_client.js'); const {getLogger, BatchResult} = require('../components/utils.js'); @@ -63,7 +64,7 @@ let DimensionRange; /** * Google Spreadsheets API v4 stub. */ -class Spreadsheets { +class Spreadsheets extends GoogleApiClient { /** * Init Spreadsheets API client. * @param {string} spreadsheetId @@ -71,29 +72,21 @@ class Spreadsheets { * variables. */ constructor(spreadsheetId, env = process.env) { + super(env); + this.googleApi = 'sheets'; /** @const {string} */ this.spreadsheetId = spreadsheetId; - this.authClient = new AuthClient(API_SCOPES, env); - /** - * Logger object from 'log4js' package where this type is not exported. - */ this.logger = getLogger('API.GS'); } - /** - * Prepares the Google Sheets instance. - * @return {!google.sheets} - * @private - */ - async getApiClient_() { - if (this.sheets) return this.sheets; - await this.authClient.prepareCredentials(); - this.logger.debug(`Initialized ${this.constructor.name} instance.`); - this.sheets = google.sheets({ - version: API_VERSION, - auth: this.authClient.getDefaultAuth(), - }); - return this.sheets; + /** @override */ + getScope() { + return API_SCOPES; + } + + /** @override */ + getVersion() { + return API_VERSION; } /** @@ -108,7 +101,7 @@ class Spreadsheets { spreadsheetId: this.spreadsheetId, ranges: sheetName, }; - const sheets = await this.getApiClient_(); + const sheets = await this.getApiClient(); const response = await sheets.spreadsheets.get(request); const sheet = response.data.sheets[0]; this.logger.debug(`Get sheet[${sheetName}]: `, sheet); @@ -127,7 +120,7 @@ class Spreadsheets { range: sheetName, }; try { - const sheets = await this.getApiClient_(); + const sheets = await this.getApiClient(); const response = await sheets.spreadsheets.values.clear(request); const data = response.data; this.logger.debug(`Clear sheet[${sheetName}}]: `, data); @@ -180,7 +173,7 @@ class Spreadsheets { ranges: sheetName, }; try { - const sheets = await this.getApiClient_(); + const sheets = await this.getApiClient(); const response = await sheets.spreadsheets.get(request); const sheet = response.data.sheets[0]; const sheetId = sheet.properties.sheetId; @@ -232,7 +225,7 @@ class Spreadsheets { numberOfLines: data.trim().split('\n').length, }; try { - const sheets = await this.getApiClient_(); + const sheets = await this.getApiClient(); const response = await sheets.spreadsheets.batchUpdate(request); const data = response.data; this.logger.debug(`Batch[${batchId}] uploaded: `, data); diff --git a/marketing-analytics/activation/common-libs/nodejs-common/src/apis/youtube.js b/marketing-analytics/activation/common-libs/nodejs-common/src/apis/youtube.js index 70d5bf9..e3eea5b 100644 --- a/marketing-analytics/activation/common-libs/nodejs-common/src/apis/youtube.js +++ b/marketing-analytics/activation/common-libs/nodejs-common/src/apis/youtube.js @@ -19,6 +19,7 @@ 'use strict'; const {google} = require('googleapis'); +const { GoogleApiClient } = require('./base/google_api_client.js'); const { Schema$Channel, Schema$Video, @@ -26,8 +27,7 @@ const { Schema$Playlist, Schema$Search, } = google.youtube; -const AuthClient = require('./auth_client.js'); -const {getLogger} = require('../components/utils.js'); +const { getLogger } = require('../components/utils.js'); const API_SCOPES = Object.freeze([ 'https://www.googleapis.com/auth/youtube.force-ssl' @@ -159,34 +159,26 @@ let ListSearchConfig; * Search list type definition, see: * https://developers.google.com/youtube/v3/docs/search/list */ -class YouTube { +class YouTube extends GoogleApiClient { /** * @constructor * @param {!Object=} env The environment object to hold env * variables. */ constructor(env = process.env) { - this.authClient = new AuthClient(API_SCOPES, env); - /** - * Logger object from 'log4js' package where this type is not exported. - */ + super(env); + this.googleApi = 'youtube'; this.logger = getLogger('API.YT'); } - /** - * Prepares the Google YouTube instance. - * @return {!google.youtube} - * @private - */ - async getApiClient_() { - if (this.youtube) return this.youtube; - await this.authClient.prepareCredentials(); - this.logger.debug(`Initialized ${this.constructor.name} instance.`); - this.youtube = google.youtube({ - version: API_VERSION, - auth: this.authClient.getDefaultAuth(), - }); - return this.youtube; + /** @override */ + getScope() { + return API_SCOPES; + } + + /** @override */ + getVersion() { + return API_VERSION; } /** @@ -200,7 +192,7 @@ class YouTube { const channelListRequest = Object.assign({}, config); channelListRequest.part = channelListRequest.part.join(',') try { - const youtube = await this.getApiClient_(); + const youtube = await this.getApiClient(); const response = await youtube.channels.list(channelListRequest); this.logger.debug('Response: ', response); return response.data.items; @@ -223,7 +215,7 @@ class YouTube { const videoListRequest = Object.assign({}, config); videoListRequest.part = videoListRequest.part.join(',') try { - const youtube = await this.getApiClient_(); + const youtube = await this.getApiClient(); const response = await youtube.videos.list(videoListRequest); this.logger.debug('Response: ', response); return response.data.items; @@ -247,7 +239,7 @@ class YouTube { const commentThreadsRequest = Object.assign({}, config); commentThreadsRequest.part = commentThreadsRequest.part.join(',') try { - const youtube = await this.getApiClient_(); + const youtube = await this.getApiClient(); const response = await youtube.commentThreads.list(commentThreadsRequest); this.logger.debug('Response: ', response.data); return response.data.items; @@ -282,7 +274,7 @@ class YouTube { } try { - const youtube = await this.getApiClient_(); + const youtube = await this.getApiClient(); const response = await youtube.playlists.list(playlistsRequest); this.logger.debug('Response: ', response.data); if (response.data.nextPageToken) { @@ -324,7 +316,7 @@ class YouTube { } try { - const youtube = await this.getApiClient_(); + const youtube = await this.getApiClient(); const response = await youtube.search.list(searchRequest); this.logger.debug('Response: ', response.data); if (response.data.nextPageToken) { diff --git a/marketing-analytics/activation/common-libs/nodejs-common/src/components/firestore/datastore_mode_access.js b/marketing-analytics/activation/common-libs/nodejs-common/src/components/firestore/datastore_mode_access.js index 4104200..69e254b 100644 --- a/marketing-analytics/activation/common-libs/nodejs-common/src/components/firestore/datastore_mode_access.js +++ b/marketing-analytics/activation/common-libs/nodejs-common/src/components/firestore/datastore_mode_access.js @@ -75,7 +75,7 @@ class DatastoreModeAccess { */ getKey(id) { const keyPath = [this.kind]; - if (id) keyPath.push(isNaN(id) ? id : parseInt(id)); + if (id) keyPath.push(isNaN(id) ? id : this.datastore.int(id)); return this.datastore.key({ namespace: this.namespace, path: keyPath, @@ -116,10 +116,11 @@ class DatastoreModeAccess { const key = this.getKey(id); const apiResponse = await this.datastore.save({ key, data, excludeLargeProperties: true }); - // Default key in Datastore is a number in response like following. + // Default key in Datastore is an int as string in response. It could be + // larger than JavaScript max safe integer, so keep it as string here. // With a given id, the key in response is null. const updatedId = id !== undefined ? id - : +apiResponse[0].mutationResults[0].key.path[0].id; + : apiResponse[0].mutationResults[0].key.path[0].id; this.logger.debug(`Result of saving ${updatedId}@${this.kind}: `, JSON.stringify(apiResponse)); // Datastore has a delay to write entity. This method only returns id diff --git a/marketing-analytics/activation/common-libs/nodejs-common/src/components/utils.js b/marketing-analytics/activation/common-libs/nodejs-common/src/components/utils.js index 8c9e375..2c5339b 100644 --- a/marketing-analytics/activation/common-libs/nodejs-common/src/components/utils.js +++ b/marketing-analytics/activation/common-libs/nodejs-common/src/components/utils.js @@ -487,7 +487,7 @@ const extractObject = (paths) => { return (sourceObject) => { const output = {}; paths.forEach((path) => { - const [value, owner, property] = path.split('.') + const [value, owner, property] = path.trim().split('.') .reduce(transcribe, [sourceObject, output, undefined]); if (typeof value !== 'undefined') { owner[property] = value; @@ -509,7 +509,7 @@ const getObjectByPath = (obj, paths) => { paths.split('.').filter((key) => !!key).forEach((key) => { instance = instance[key]; if (!instance) { - console.error('Fail to get function containter', paths); + console.error('Fail to get element from path:', paths); return instance; } }); @@ -591,7 +591,24 @@ const changeObjectNamingFromLowerCamelToSnake = (obj) => { } else { return obj; } -} +}; + +/** + * Generates a function that can convert a given JSON object to a JSON string + * with only specified fields(fieldMask), in specified naming convention. + * @param {string} fieldMask The 'fieldMask' string from response. + * @param {boolean=} snakeCase Whether or not output JSON in snake naming. + */ +const getFilterAndStringifyFn = (fieldMask, snakeCase = false) => { + const extractor = extractObject( + Array.isArray(fieldMask) ? fieldMask : fieldMask.split(',')); + return (originalObject) => { + const extracted = extractor(originalObject); + const generatedObject = snakeCase + ? changeObjectNamingFromLowerCamelToSnake(extracted) : extracted; + return JSON.stringify(generatedObject); + }; +}; /** * Returns the response data for a HTTP request. It will retry the specific @@ -619,7 +636,7 @@ const requestWithRetry = async (options, logger = console, retryTimes = 3) => { logger.error(`Request ${JSON.stringify(options)}`, error); } } while (processedTimes <= retryTimes) -} +}; // noinspection JSUnusedAssignment module.exports = { @@ -641,5 +658,6 @@ module.exports = { changeNamingFromLowerCamelToSnake, changeObjectNamingFromSnakeToLowerCamel, changeObjectNamingFromLowerCamelToSnake, + getFilterAndStringifyFn, requestWithRetry, }; diff --git a/marketing-analytics/activation/data-tasks-coordinator/deploy.sh b/marketing-analytics/activation/data-tasks-coordinator/deploy.sh index de2d4e9..13bd27d 100755 --- a/marketing-analytics/activation/data-tasks-coordinator/deploy.sh +++ b/marketing-analytics/activation/data-tasks-coordinator/deploy.sh @@ -39,6 +39,7 @@ CONFIG_ITEMS=( "REGION" "GCS_BUCKET" "DATABASE_ID" + "DATABASE_MODE" "SECRET_NAME" "INBOUND" ) @@ -437,13 +438,15 @@ EOF quit_if_failed $? check_firestore_existence local auth - if [[ -f "$(pwd)/${OAUTH2_TOKEN_JSON}" ]]; then + if [[ -n "${SECRET_NAME}" ]]; then + auth="SECRET_NAME=${SECRET_NAME}" + elif [[ -f "$(pwd)/${OAUTH2_TOKEN_JSON}" ]]; then auth="OAUTH2_TOKEN_JSON=$(pwd)/${OAUTH2_TOKEN_JSON}" elif [[ -f "$(pwd)/${SA_KEY_FILE}" ]]; then auth="API_SERVICE_ACCOUNT=$(pwd)/${SA_KEY_FILE}" fi printf '%s\n' " Setting environment variable of auth: ${auth}" - env "${auth}" node -e "require('./index.js').startTaskFromLocal(\ + env "${auth}" "DEBUG=true" node -e "require('./index.js').startTaskFromLocal(\ process.argv[1], process.argv[2], '${PROJECT_NAMESPACE}')" "$@" } diff --git a/marketing-analytics/activation/data-tasks-coordinator/package.json b/marketing-analytics/activation/data-tasks-coordinator/package.json index d17b4ba..f3827a9 100644 --- a/marketing-analytics/activation/data-tasks-coordinator/package.json +++ b/marketing-analytics/activation/data-tasks-coordinator/package.json @@ -1,6 +1,6 @@ { "name": "@google-cloud/data-tasks-coordinator", - "version": "1.9.1", + "version": "2.1.0", "description": "A data task coordinator based on Cloud Functions", "author": "Google Inc.", "license": "Apache-2.0", @@ -20,13 +20,15 @@ "homepage": "https://github.com/GoogleCloudPlatform/cloud-for-marketing/blob/master/marketing-analytics/activation/data-tasks-coordinator/README.md", "main": "index.js", "dependencies": { - "@google-cloud/automl": "^4.0.0", - "@google-cloud/bigquery": "^7.2.0", - "@google-cloud/bigquery-data-transfer": "^4.0.0", - "@google-cloud/firestore": "^6.7.0", - "@google-cloud/nodejs-common": "2.0.0", - "@google-cloud/storage": "^7.0.1", - "google-auth-library": "^9.0.0", + "@google-cloud/automl": "^4.0.1", + "@google-cloud/bigquery": "^7.5.2", + "@google-cloud/bigquery-data-transfer": "^4.2.0", + "@google-cloud/firestore": "^7.5.0", + "@google-cloud/nodejs-common": "2.2.0", + "@google-cloud/storage": "^7.9.0", + "google-ads-api": "14.2.0", + "google-ads-nodejs-client": "16.0.0", + "google-auth-library": "^9.7.0", "jsdom": "^21.1.0", "lodash": "^4.17.21", "luxon": "^3.4.2", diff --git a/marketing-analytics/activation/data-tasks-coordinator/src/sentinel.js b/marketing-analytics/activation/data-tasks-coordinator/src/sentinel.js index 379f0c8..b135244 100644 --- a/marketing-analytics/activation/data-tasks-coordinator/src/sentinel.js +++ b/marketing-analytics/activation/data-tasks-coordinator/src/sentinel.js @@ -200,9 +200,6 @@ class Sentinel { if (this.isBigQueryLoggingMessage_(message)) { return this.finishBigQueryTask_(message); } - if (this.isBigQueryDataTransferMessage_(message)) { - return this.finishBigQueryDataTransferTask_(message); - } throw new Error(`Unknown message: ${getMessage(message)}`); }; return coordinateTask; @@ -280,30 +277,6 @@ class Sentinel { return this.handleBigQueryJobCompletedEvent_(event); } - /** - * Returns whether this is a message from BigQuery Data Transfer of a completed run job. - */ - isBigQueryDataTransferMessage_(message) { - try { - const attributes = message.attributes || {}; - return attributes.eventType === 'TRANSFER_RUN_FINISHED' - && attributes.payloadFormat === 'JSON_API_V1'; - } catch (error) { - this.logger.error( - 'Checking whether the message is from BigQuery Data Transfer', - error - ); - return false; - } - } - - /** Finishes the task (if any) related to the completed job in the message. */ - finishBigQueryDataTransferTask_(message) { - const data = getMessage(message); - const payload = JSON.parse(data); - return this.handleBigQueryDataTransferTask_(payload); - } - /** * Starts a task in a 'duplicated message proof' way by using * 'TaskLogDao.startTask' to check the status. @@ -413,6 +386,7 @@ class Sentinel { async prepareExternalTask_(taskConfigId, parameters) { const taskConfig = await this.taskConfigDao.load(taskConfigId); if (!taskConfig) throw new Error(`Fail to load Task ${taskConfigId}`); + parameters.taskConfigId = taskConfigId; const task = this.buildTask(taskConfig, parameters); task.injectContext(this.options); return task; @@ -505,38 +479,6 @@ class Sentinel { } this.logger.debug(`BigQuery JobId[${jobId}] is not a Sentinel Job.`); } - - /** - * Based on the incoming message, updates the TaskLog and triggers next tasks - * if there is any in TaskConfig of the current finished task. - * For Data Transfer tasks, the taskLog saves the 'name' of run job - * as 'job id' which is generated when the job starts at the beginning. - * When the job is done, the datatransfer job will be sent here with the run - * job 'name'. So we can match to TaskLogs in database - * waiting for the job is done. - * @param payload - * @return {!Promise<(!Array|number|undefined)>} The message Id array - * of the next tasks and an empty Array if there is no followed task. - * Returns taskLogId (number) when an error occurs. - * Returns undefined if there is no related taskLog. - * @private - */ - async handleBigQueryDataTransferTask_(payload) { - const jobId = payload.name; - const jobStatus = payload.state; - this.logger.debug(`Data Transfer job[${jobId}] status: ${jobStatus}`); - const filter = { property: 'jobId', value: jobId }; - const taskLogs = await this.taskLogDao.list([filter]); - if (taskLogs.length > 1) { - throw new Error(`Find more than one task with Job Id: ${jobId}`); - } - if (taskLogs.length === 1) { - return this.finishTask_(taskLogs[0].id); - } - this.logger.debug( - `BigQuery Data Transfer JobId[${jobId}] is not a Sentinel Job.` - ); - } } /** @@ -584,18 +526,53 @@ const getDatePartition = (filename) => { */ const getDefaultParameters = (parameters, timezone = 'UTC', unixMillis = Date.now()) => { + const DATE_KEYWORDS = [ + 'now', + 'today', + 'add', + 'set', + 'sub', + 'hyphenated', + 'timestamp', + 'ms', + 'last', + 'current', + 'year', + 'quarter', + 'month', + 'week', + 'start', + 'end', + ]; /** * Returns the value based on the given parameter name. * @param {string=} parameter * @return {string|number} */ const getDefaultValue = (parameter) => { - const regex = /(now)|(today)|(add)|(set)|(sub)|(hyphenated)|(timestamp)|(ms)/gi; + const regex = new RegExp(DATE_KEYWORDS.map((k) => `(${k})`).join('|'), 'ig'); let realParameter = parameter.replace(/(yesterday)/ig, 'today_sub_1') .replace(regex, (match) => match.toLowerCase()); const now = DateTime.fromMillis(unixMillis, {zone: timezone}); if (realParameter === 'now') return now.toISO(); // 'now' is a Date ISO String. if (realParameter === 'today') return now.toFormat('yyyyMMdd'); + // [last|current]_[week|month|quarter|year]_[start|end] + if (realParameter.startsWith('last_') + || realParameter.startsWith('current_')) { + const pattern = + /^(last|current)_(week|month|quarter|year)_(start|end)(_.+)?$/; + if (!pattern.test(realParameter)) + throw new Error(`Malformed of default parameter: ${parameter}`); + const periods = realParameter.split('_'); + const target = periods.slice(0, 3).join('_'); + const targetPeriod = realParameter.startsWith('last_') + ? now.minus({ [periods[1]]: 1 }) : now; + const targetDate = targetPeriod[`${periods[2]}Of`](periods[1]); + const diff = now.diff(targetDate, 'day').toObject().days; + const mathMethod = periods[2] === 'start' ? 'floor' : 'ceil'; + const replacedString = `today_sub_${Math[mathMethod](diff)}`; + realParameter = realParameter.replace(target, replacedString); + } if (!realParameter.startsWith('today')) { throw new Error(`Unknown default parameter: ${parameter}`); } @@ -680,17 +657,25 @@ const getSentinel = async (namespace, database) => { * @param {(string|undefined)=} namespace * @param {(string|undefined)=} projectId * @param {(string|undefined)=} databaseId + * @param {(string|undefined)=} databaseMode * @return {!Promise} */ const guessSentinel = async (namespace = process.env['PROJECT_NAMESPACE'], projectId = process.env['GCP_PROJECT'], - databaseId = process.env['DATABASE_ID'] || DEFAULT_DATABASE) => { + databaseId = process.env['DATABASE_ID'] || DEFAULT_DATABASE, + databaseMode = process.env['DATABASE_MODE']) => { if (!namespace) { console.warn( 'Fail to find ENV variables PROJECT_NAMESPACE, will set as `sentinel`'); namespace = 'sentinel'; } - const database = await getFirestoreDatabase(projectId, databaseId); + if (!databaseMode) { + console.warn( + 'Database mode is not set. Please consider upgrade this solution.'); + } + const database = databaseMode + ? { source: DataSource[databaseMode], id: databaseId } + : await getFirestoreDatabase(projectId, databaseId); return getSentinel(namespace, database); }; diff --git a/marketing-analytics/activation/data-tasks-coordinator/src/task_config/task_config_dao.js b/marketing-analytics/activation/data-tasks-coordinator/src/task_config/task_config_dao.js index 83d0d1a..45a0b6a 100644 --- a/marketing-analytics/activation/data-tasks-coordinator/src/task_config/task_config_dao.js +++ b/marketing-analytics/activation/data-tasks-coordinator/src/task_config/task_config_dao.js @@ -32,6 +32,7 @@ const TaskType = { LOAD: 'load', QUERY: 'query', EXPORT: 'export', + CREATE_EXTERNAL: 'create_external', DATA_TRANSFER: 'data_transfer', EXPORT_SCHEMA: 'export_schema', DELETE_DATASET: 'delete_dataset', @@ -51,6 +52,7 @@ const TaskType = { // Tasks dependencies management tasks KNOT: 'knot', MULTIPLE: 'multiple', + SPEED_CONTROLLED: 'speed_controlled', }; /** diff --git a/marketing-analytics/activation/data-tasks-coordinator/src/task_log/task_log_dao.js b/marketing-analytics/activation/data-tasks-coordinator/src/task_log/task_log_dao.js index ea0c5c6..d64d9b2 100644 --- a/marketing-analytics/activation/data-tasks-coordinator/src/task_log/task_log_dao.js +++ b/marketing-analytics/activation/data-tasks-coordinator/src/task_log/task_log_dao.js @@ -88,9 +88,7 @@ const FIELD_NAMES = { let TaskLog; /** - * Base Task Log data access class on Firestore. The underlying transactions - * in native mode and datastore mode are different, so there will be two sub- - * classes for these two different modes. + * Base Task Log data access class on Firestore. */ class TaskLogDao extends DataAccessObject { diff --git a/marketing-analytics/activation/data-tasks-coordinator/src/tasks/bigquery/create_external_table.js b/marketing-analytics/activation/data-tasks-coordinator/src/tasks/bigquery/create_external_table.js new file mode 100644 index 0000000..2f13fe5 --- /dev/null +++ b/marketing-analytics/activation/data-tasks-coordinator/src/tasks/bigquery/create_external_table.js @@ -0,0 +1,150 @@ +// Copyright 2019 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @fileoverview Load task class. + */ + +'use strict'; + +const { TableSchema: BqTableSchema } = require('@google-cloud/bigquery'); +const { TaskType, BigQueryTableConfig, } + = require('../../task_config/task_config_dao.js'); +const { BigQueryAbstractTask } = require('./bigquery_abstract_task.js'); +const { LoadTaskDestination } = require('./load_task.js'); + +/** + * @typedef {{ + * type:TaskType.CREATE_EXTERNAL, + * source:{ + * sheet:{ + * url: string, + * sheetName: string, + * skipLeadingRows: number|undefined, + * } + * }, + * destination:!LoadTaskDestination, + * appendedParameters:(Object|undefined), + * next:(string|!Array|undefined), + * }} + */ +let CreateExternalTableTaskConfig; + +/** Task to create a BigQuery external table on a Google Sheet. */ +class CreateExternalTableTask extends BigQueryAbstractTask { + + /** @override */ + getBigQueryForTask() { + /** @const {BigQueryTableConfig} */ + const destinationTable = this.config.destination.table; + return this.getBigQuery(destinationTable, true); + } + + /** + * Create a BigQuery external table based on the given Google Sheet. If the + * table exists, it will be removed first. + * @override + */ + async doTask() { + const { source, destination } = this.config; + await this.precheckTargetTable_(destination.table); + const { sheet } = source; + const { schema } = destination.tableSchema || {}; + const options = { + externalDataConfiguration: this.getSheetConfiguration_(sheet, schema), + }; + const [table] = await this.getBigQueryForTask() + .dataset(destination.table.datasetId) + .createTable(destination.table.tableId, options); + if (!table) { + throw new Error(`Fail to create ${destination.table.tableId}`); + } + this.logger.info(`Created external table ${destination.table.tableId}`); + return { + parameters: this.appendParameter({ destinationTable: destination.table }), + }; + } + + /** @override */ + async isDone() { + return true; + } + + /** @override */ + async completeTask() { } + + /** + * Prechecks for the target table. If it's an existing external table, will + * delete it; if the existing table is not an external one, will throw an + * error. + * @param {!BigQueryTableConfig} tableOptions BigQuery Table configuration. + * @private + */ + async precheckTargetTable_(tableOptions) { + const { tableId } = tableOptions; + if (!tableId) { + throw new Error(`Missing destination external table Id.`); + } + if (tableId.indexOf('$') > -1) { + throw new Error(`External table [${tableId}] can't be a partition one.`); + } + const [dataset] = await this.getBigQueryForTask() + .dataset(tableOptions.datasetId) + .get({ autoCreate: true }); + const table = dataset.table(tableId); + const [tableExists] = await table.exists(); + if (!tableExists) { + this.logger.debug(`Destination table doesn't exist: `, tableOptions); + return; + } + const { type } = (await table.get())[0].metadata; + if (type !== 'EXTERNAL') { + throw new Error(`Target table [${tableId}] is not an external table.`); + } + this.logger.debug(`Delete the existing table: `, tableOptions); + let result = await table.delete(); + this.logger.debug(`... resuslt: `, result); + } + + /** + * Gets the `externalDataConfiguration` for Google Sheet. + * @see https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externaldataconfiguration + * @see https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#GoogleSheetsOptions + * @param {!Object} sheet + * @param {!BqTableSchema|undefined} schema + * @return {Object} + */ + getSheetConfiguration_(sheet, schema) { + const config = { + sourceFormat: 'GOOGLE_SHEETS', + sourceUris: [sheet.url], + googleSheetsOptions: { + skipLeadingRows: sheet.skipLeadingRows || 1, + range: sheet.sheetName, + }, + }; + if (schema) { + config.schema = schema; + } else { + config.autodetect = true; + } + this.logger.debug(`Get sheet options: `, config); + return config; + } +} + +module.exports = { + CreateExternalTableTaskConfig, + CreateExternalTableTask, +}; diff --git a/marketing-analytics/activation/data-tasks-coordinator/src/tasks/bigquery/data_transfer_task.js b/marketing-analytics/activation/data-tasks-coordinator/src/tasks/bigquery/data_transfer_task.js index e0fcce3..8bf798e 100644 --- a/marketing-analytics/activation/data-tasks-coordinator/src/tasks/bigquery/data_transfer_task.js +++ b/marketing-analytics/activation/data-tasks-coordinator/src/tasks/bigquery/data_transfer_task.js @@ -50,6 +50,11 @@ const TRANSFER_STATE_CANCELLED = 'CANCELLED'; /** Executes BigQuery Data Transfer run job. */ class DataTransferTask extends BaseTask { + /** @override */ + isManualAsynchronous() { + return true; + } + /** * Gets a BigQuery Data Transfer service client instance. * @return {DataTransferServiceClient} diff --git a/marketing-analytics/activation/data-tasks-coordinator/src/tasks/bigquery/export_task.js b/marketing-analytics/activation/data-tasks-coordinator/src/tasks/bigquery/export_task.js index 5900363..c74fb89 100644 --- a/marketing-analytics/activation/data-tasks-coordinator/src/tasks/bigquery/export_task.js +++ b/marketing-analytics/activation/data-tasks-coordinator/src/tasks/bigquery/export_task.js @@ -32,6 +32,7 @@ const { * @typedef {{ * destinationFormat:string, * printHeader:boolean, + * allowMultiple:boolean, * }} */ let ExtractOptions; @@ -74,9 +75,10 @@ class ExportTask extends BigQueryAbstractTask { /** @const {File} */ const storageFile = this.getStorage(destination) .bucket(destination.bucket) .file(destination.name); + const { allowMultiple = true } = this.config.options || {}; return this.exportTable_(sourceTable, storageFile).catch((error) => { - if (!destination.name.includes('*') && error.message.includes( - 'too large to be exported to a single file')) { + if (!destination.name.includes('*') && allowMultiple && + error.message.includes('too large to be exported to a single file')) { this.logger.info( 'Source table is too large to be exported in one file. Trying to ' + 'add * to the output file and export again now...'); @@ -97,9 +99,11 @@ class ExportTask extends BigQueryAbstractTask { * @private */ async exportTable_(sourceTable, storageFile) { + const destination = this.config.destination; const [job] = await sourceTable.extract(storageFile, this.config.options); const errors = job.status.errors; if (errors && errors.length > 0) throw errors; + const destinationUris = job.configuration.extract.destinationUris; const fileCounts = job.statistics.extract.destinationUriFileCounts[0]; const status = job.status.state; const { jobReference } = job; @@ -108,7 +112,8 @@ class ExportTask extends BigQueryAbstractTask { `Job[${jobId}] status ${status} with ${fileCounts} files.`); return { jobId, - parameters: this.appendParameter({ jobReference }), + parameters: this.appendParameter( + { jobReference, destination, destinationFile: destinationUris.join(',') }), }; } } diff --git a/marketing-analytics/activation/data-tasks-coordinator/src/tasks/bigquery/load_task.js b/marketing-analytics/activation/data-tasks-coordinator/src/tasks/bigquery/load_task.js index b7dec61..eba01b3 100644 --- a/marketing-analytics/activation/data-tasks-coordinator/src/tasks/bigquery/load_task.js +++ b/marketing-analytics/activation/data-tasks-coordinator/src/tasks/bigquery/load_task.js @@ -268,7 +268,7 @@ class LoadTask extends BigQueryAbstractTask { */ async getSchemaFromReportTask_(task, retriedTimes = 0) { try { - const schema = await task.getReport().generateSchema(); + const schema = await task.generateSchema(); return schema; } catch (error) { if (task.getReport().isFatalError(error.toString())) { @@ -305,5 +305,6 @@ module.exports = { TableSchema, LoadOptions, LoadTaskConfig, + LoadTaskDestination, LoadTask, }; diff --git a/marketing-analytics/activation/data-tasks-coordinator/src/tasks/gmc/gmc_webpage_fetcher.js b/marketing-analytics/activation/data-tasks-coordinator/src/tasks/gmc/gmc_webpage_fetcher.js index 3ac16b8..4575a23 100644 --- a/marketing-analytics/activation/data-tasks-coordinator/src/tasks/gmc/gmc_webpage_fetcher.js +++ b/marketing-analytics/activation/data-tasks-coordinator/src/tasks/gmc/gmc_webpage_fetcher.js @@ -97,7 +97,7 @@ class GmcWebpageFetch extends BaseTask { dom.window.document.querySelector('[itemtype="http://schema.org/Offer"]'); for (let i = 0; i < offer.children.length; i++) { const key = offer.children[i].getAttribute('itemprop'); - const value = offer.children[i].innerHTML.trim(); + const value = offer.children[i].getAttribute('content').trim(); result[key] = value; } break; diff --git a/marketing-analytics/activation/data-tasks-coordinator/src/tasks/index.js b/marketing-analytics/activation/data-tasks-coordinator/src/tasks/index.js index a165a68..4c03d1c 100644 --- a/marketing-analytics/activation/data-tasks-coordinator/src/tasks/index.js +++ b/marketing-analytics/activation/data-tasks-coordinator/src/tasks/index.js @@ -27,6 +27,7 @@ const { QueryTask } = require('./bigquery/query_task.js'); const { ExportSchemaTask } = require('./bigquery/export_schema_task.js'); const { DeleteDatasetTask } = require('./bigquery/delete_dataset_task.js'); const { DataTransferTask } = require('./bigquery/data_transfer_task.js'); +const { CreateExternalTableTask } = require('./bigquery/create_external_table.js'); const { CopyGcsTask } = require('./copy_gcs_task.js'); const { GmcXmlFeedToJsonlTask } = require('./gmc/gmc_xml_feed_to_jsonl_task.js'); const { GmcWebpageFetch } = require('./gmc/gmc_webpage_fetcher.js'); @@ -35,6 +36,7 @@ const { PredictTask } = require('./predict_task.js'); const { QueryAdhTask } = require('./query_adh_task.js'); const { ReportTask } = require('./report_task.js'); const { KnotTask } = require('./knot_task.js'); +const { SpeedControlledTask } = require('./internal/speed_controlled_task.js'); const { MultipleTask } = require('./multiple_task.js'); /** @@ -48,6 +50,7 @@ const ALL_TASKS = Object.freeze({ [TaskType.EXPORT_SCHEMA]: ExportSchemaTask, [TaskType.DELETE_DATASET]: DeleteDatasetTask, [TaskType.DATA_TRANSFER]: DataTransferTask, + [TaskType.CREATE_EXTERNAL]: CreateExternalTableTask, [TaskType.COPY_GCS]: CopyGcsTask, [TaskType.DOWNLOAD]: DownloadTask, [TaskType.GMC_XML_FEED_TO_JSONL]: GmcXmlFeedToJsonlTask, @@ -57,6 +60,7 @@ const ALL_TASKS = Object.freeze({ [TaskType.REPORT]: ReportTask, [TaskType.KNOT]: KnotTask, [TaskType.MULTIPLE]: MultipleTask, + [TaskType.SPEED_CONTROLLED]: SpeedControlledTask, }); /** diff --git a/marketing-analytics/activation/data-tasks-coordinator/src/tasks/internal/speed_controlled_task.js b/marketing-analytics/activation/data-tasks-coordinator/src/tasks/internal/speed_controlled_task.js new file mode 100644 index 0000000..c4d15ad --- /dev/null +++ b/marketing-analytics/activation/data-tasks-coordinator/src/tasks/internal/speed_controlled_task.js @@ -0,0 +1,205 @@ +// Copyright 2019 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @fileoverview Task to act as a pacing speed manager for a massive number of + * requests to a target service. + */ + +'use strict'; + +const { join } = require('path'); +const { + cloudfunctions: { getIdTokenForFunction }, + utils: { requestWithRetry, replaceParameters }, +} = require('@google-cloud/nodejs-common'); +const { + TaskType, + StorageFileConfig, + TaskGroup, + ErrorOptions, +} = require('../../task_config/task_config_dao.js'); +const { BaseTask } = require('../base_task.js'); + +/** + * The configuration for submitting a file to Tentacles HTTP Cloud Functions. + * + * `service` stands for the target Tentacles instance. + * `fileUrl` or `file` are used to present a GCS file resource that Tentacles + * will handle. + * `attributes` are attributes that Tentacles will figure out from the file + * name, e.g. `api`, `config`(name), `size`, etc. + * `config` is the object for Tentacles config. If this object is present, then + * in the `attribute`, the `config` will be ingored; otherwise, Tentacles will + * try to load config from Tentacles Firestore with the `api` and `config` in + * the `attributes` object. + * + * @typedef {{ + * service: { + * projectId: (string|undefined), + * locationId: (string|undefined), + * namespace: (string|undefined), + * url: (string|undefined), + * } + * fileUrl: (string|undefined), + * file: (!StorageFileConfig|undefined), + * attributes: (!Object|undefined), + * config: (!object|undefined) + * }} + */ +let TentaclesFileOption; + +/** + * The configuration object for SpeedControlledTask. + * @see TentaclesFileOption + * `dueTime` Task execution timeout time in minutes. If task group times out, it + * will be marked with 'timeout' error code. + * + * @typedef {{ + * type:TaskType.SPEED_CONTROLLED, + * source:!TentaclesFileOption, + * dueTime:(number|undefined), + * errorOptions:(!ErrorOptions|undefined), + * appendedParameters:(Object|undefined), + * next:(!TaskGroup|undefined), + * }} + */ +let SpeedControlledTaskConfig; + +/** + * This task is used to send a massive number of requests to a target service + * with a managed pacing speed, including QPS, records per request, etc. + * It is built based on the Tentacles: + * 1. This tast starts by submitting the information of a GCS file and + * configuration to the TentaclesFile management Cloud Function to create a + * new Tentacles File and related Tentacles Tasks; + * 2. Tentacles sends the requests with the specified speed setting; + * 3. This task checks whether all related Tentacles Tasks have been done from + * time to time and mark this task as done after all are done. + */ +class SpeedControlledTask extends BaseTask { + + /** @override */ + isManualAsynchronous() { + return true; + } + + /** + * @override + * This task can have un-replaced parameters as it manages other tasks, so + * it is possible that the managed ones need to replace those parameters. + */ + setConfig_() { + this.config = JSON.parse( + replaceParameters(this.originalConfigString, this.parameters, true)); + } + + /** + * Triggers TentaclesFile job to start this 'speed controlled' task. + * It submits the file and options to Tentacles HTTP CF to start the job and + * merges the TentaclesFile Id as `tentaclesFileId` to the parameters. This + * will be used to check the status of the task. + * @override + */ + async doTask() { + const { source } = this.config; + const options = await getRequestOption(source); + const file = getFileUrl(source); + const { attributes, config } = source; + options.data = { + file, + attributes, + config, + }; + const result = await requestWithRetry(options, this.logger); + const { fileId } = result; + this.logger.info(`Start tentacles tasks with fileId: ${fileId}`); + return { + parameters: this.appendParameter({ + tentaclesFileId: fileId, + startTime: Date.now(), + }), + } + } + + /** @override */ + async isDone() { + const { source } = this.config; + const options = await getRequestOption(source); + const { tentaclesFileId: fileId } = this.parameters; + if (!fileId) { + throw new Error('Fail to find TentaclesFileId'); + } + options.data = { fileId }; + const result = await requestWithRetry(options, this.logger); + const { status, error } = result; + if (status === 'done') return true; + if (error || status === 'error') { + this.logger.error(`TentaclesFile ${fileId} error.`, result); + throw new Error(`TentaclesFile ${fileId} error: ${error}`); + } + return false; + } + +} + +/** + * Gets the request options for the TentaclesFile management Cloud Function. + * This function will prepare the url and authorization token. + * @param {!TentaclesFileOption} options + * @return {{ +* url: string, +* headers: object, +* }} A request option for Gaxios request. +*/ +async function getRequestOption(options) { + const { service } = options; + const url = getServiceUrl(service); + const token = await getIdTokenForFunction(url); + return { + url, + headers: { Authorization: `bearer ${token}` }, + }; +} + +/** +* Returns the HTTP Cloud Functions url. By default, it returns the url of the +* TentaclesFile management Cloud Function of the given Tentacles instance. +* @param {object} service +* @return {string} The url for Tentacles HTTP Cloud Functions. +*/ +function getServiceUrl(service) { + if (service.url) return service.url; + const { projectId, locationId, namespace } = service; + return `https://${locationId}-${projectId}.cloudfunctions.net/${namespace}_http`; +} + +/** +* Returns the gsutil Url of the given GCS file. +* @param {!TentaclesFileOption} options +* @return {string} The gsutil Url +*/ +function getFileUrl(options) { + if (options.fileUrl) return options.fileUrl; + const { bucket, name } = options.file; + return 'gs://' + join(bucket, name); +} + +module.exports = { + TentaclesFileOption, + SpeedControlledTaskConfig, + SpeedControlledTask, + getRequestOption, + getFileUrl, +}; diff --git a/marketing-analytics/activation/data-tasks-coordinator/src/tasks/multiple_task.js b/marketing-analytics/activation/data-tasks-coordinator/src/tasks/multiple_task.js index 6f1ba26..7393154 100644 --- a/marketing-analytics/activation/data-tasks-coordinator/src/tasks/multiple_task.js +++ b/marketing-analytics/activation/data-tasks-coordinator/src/tasks/multiple_task.js @@ -15,9 +15,10 @@ /** @fileoverview Multiple Task class file. */ 'use strict'; +const lodash = require('lodash'); const {nanoid} = require('nanoid'); const { - utils: {apiSpeedControl, getProperValue,}, + utils: { apiSpeedControl, getProperValue, replaceParameters, requestWithRetry }, storage: {StorageFile}, } = require('@google-cloud/nodejs-common'); const { @@ -28,6 +29,11 @@ const { } = require('../task_config/task_config_dao.js'); const {FIELD_NAMES} = require('../task_log/task_log_dao.js'); const { KnotTask } = require('./knot_task.js'); +const { + TentaclesFileOption, + getRequestOption, + getFileUrl, +} = require('./internal/speed_controlled_task.js'); /** * `estimateRunningTime` Number of minutes to wait before doing the first task @@ -47,11 +53,13 @@ const { KnotTask } = require('./knot_task.js'); * target:'pubsub', * qps:number|undefined, * recordSize: number|undefined, + * message: object|undefined, * }|{ * taskId:string, * target:'gcs', - * file:!StorageFileConfig, -* recordSize: number|undefined, + * recordSize: number|undefined, + * file:(!StorageFileConfig|undefined), + * http:(!TentaclesFileOption|undefined), * }, * multiple:{ * estimateRunningTime:(number|undefined), @@ -87,6 +95,17 @@ class MultipleTask extends KnotTask { return this.parameters.numberOfTasks > 0; } + /** + * @override + * This task can have un-replaced parameters as it will start other tasks, so + * it is possible that those parameters will be replaced with given data for + * the target tasks.. + */ + setConfig_() { + this.config = JSON.parse( + replaceParameters(this.originalConfigString, this.parameters, true)); + } + /** @override */ async doTask() { let records; @@ -106,8 +125,12 @@ class MultipleTask extends KnotTask { const multipleTag = nanoid(); if (this.config.destination.target === 'pubsub') { numberOfTasks = await this.startThroughPubSub_(multipleTag, records); - } else { - numberOfTasks = await this.startThroughStorage_(multipleTag, records); + } else if (this.config.destination.target === 'gcs') { + if (this.config.destination.file) { + numberOfTasks = await this.startThroughStorage_(multipleTag, records); + } else if (this.config.destination.http) { + numberOfTasks = await this.startThroughHttp_(multipleTag, records); + } } return { parameters: this.appendParameter({ @@ -129,6 +152,7 @@ class MultipleTask extends KnotTask { */ async startThroughPubSub_(tag, records) { const qps = getProperValue(this.config.destination.qps, 1, false); + const { message: configedMessage } = this.config.destination; const managedSend = apiSpeedControl( this.recordSize, 1, qps, (batchResult) => batchResult); const sendSingleMessage = async (lines, batchId) => { @@ -140,11 +164,17 @@ class MultipleTask extends KnotTask { if (this.recordSize > 1) { extraParameters.records = lines.join('\n'); } + const finalParameters = + Object.assign({}, this.parameters, extraParameters); + const message = configedMessage + ? replaceParameters(JSON.stringify(configedMessage), finalParameters) + : JSON.stringify(finalParameters); try { await this.taskManager.startTasks( this.config.destination.taskId, { [FIELD_NAMES.MULTIPLE_TAG]: tag }, - JSON.stringify(Object.assign({}, this.parameters, extraParameters))); + message + ); return true; } catch (error) { this.logger.error(`Pub/Sub message[${batchId}] failed.`, error); @@ -231,6 +261,74 @@ class MultipleTask extends KnotTask { return tasks.length; } + /** + * Sends a file to Tentacles HTTP Cloud Functions to start the multiple task. + * In this way, the Tentacles config can be embedded here and not required to + * be uploaded to Tentacles' Firestore. It also brings the flexibility to + * modify the config items based on the file to be processed. + * @param {string} tag The multiple task tag to mark task instances. + * @param {!Array} records Array of multiple task instances data. Each + * element is a JSON string of the 'appendedParameters' object for an task + * instance. + * @return {!Promise} Number of multiple tasks. + * @private + */ + async startThroughHttp_(tag, records) { + const tasks = []; + const tentaclesFile = {}; + // Generate a new file if there are more than one record for one task. + if (this.recordSize > 1) { + for (let i = 0; i < records.length; i += this.recordSize) { + const chunk = records.slice(i, i + this.recordSize); + tasks.push(JSON.stringify({ records: chunk.join('\n') })); + } + const sourceFile = this.config.source.file; + tentaclesFile.file = { + bucket: sourceFile.bucket, + name: sourceFile.name + '_for_multiple_task', + }; + const outputFile = StorageFile.getInstance( + tentaclesFile.file.bucket, tentaclesFile.file.name, { + projectId: sourceFile.projectId, + keyFilename: sourceFile.keyFilename, + }); + await outputFile.getFile().save(tasks.join('\n')); + } else { + tasks.push(...records); + tentaclesFile.file = this.config.source.file; + } + // To keep the TaskConfig simple, fill default fields here. + const tentaclesFileOption = lodash.merge({ + service: { + projectId: '${projectId}', + locationId: '${locationId}', + namespace: '${namespace}', + }, + attributes: { api: 'PB' }, + config: { + topic: this.taskManager.getMonitorTopicName(), + attributes: { + taskId: this.config.destination.taskId, + [FIELD_NAMES.MULTIPLE_TAG]: tag, + } + } + }, this.config.destination.http); + const finalOption = JSON.parse(replaceParameters( + JSON.stringify(tentaclesFileOption), this.parameters, true)); + const options = await getRequestOption(finalOption); + const file = getFileUrl(tentaclesFile); + const { attributes, config } = finalOption; + options.data = { + file, + attributes, + config, + }; + const result = await requestWithRetry(options, this.logger); + const { fileId } = result; + this.parameters.tentaclesFileId = fileId; + return tasks.length; + } + /** * Return true if the instance number is 0. * Otherwise return false if there is a 'estimateRunningTime' and the time is diff --git a/marketing-analytics/activation/data-tasks-coordinator/src/tasks/report/base_report.js b/marketing-analytics/activation/data-tasks-coordinator/src/tasks/report/base_report.js index adb3de6..508e805 100644 --- a/marketing-analytics/activation/data-tasks-coordinator/src/tasks/report/base_report.js +++ b/marketing-analytics/activation/data-tasks-coordinator/src/tasks/report/base_report.js @@ -20,12 +20,13 @@ const {TableSchema: BqTableSchema} = require('@google-cloud/bigquery'); const { api: { - doubleclicksearch: {ReportRequest: Sa360ReportConfig}, + doubleclicksearch: { ReportRequest: Sa360LegacyReportConfig }, googleads: ReportQueryConfig, doubleclickbidmanager: {RequestBody: Dv360RequestBody}, youtube: {ListChannelsConfig, ListVideosConfig}, } } = require('@google-cloud/nodejs-common'); +const { StorageFileConfig } = require('../../task_config/task_config_dao.js'); /** * Campaign Manager report configuration. @@ -50,18 +51,31 @@ let CmReportConfig; */ let Dv360ReportConfig; +/** + * SA360 (Search Ads 360) Reporting API configuration. + * @typedef {{ + * secretName: (string|undefined), + * customerId: string, + * loginCustomerId: string, + * query:(string|undefined), + * file:(!StorageFileConfig|undefined), + * }} + */ +let Sa360ReportConfig; + /** * General API result configuration. * @typedef {{ * secretName: (string|undefined), * packageName: string, - * className: string, - * functionObjectName: (string|undefined), - * functionName: string, + * api: string, + * resource: string, + * functionName: (string|undefined), * args: (Object|undefined), * limit: (number|undefined), * entityPath: string, * pageTokenPath: (string|undefined), + * fieldMask: (string|undefined), * }} */ let ApiResultConfig; @@ -73,6 +87,8 @@ let ApiResultConfig; * developerToken: string, * customerId: string|undefined, * loginCustomerId: string|undefined, + * query:(string|undefined), + * file:(!StorageFileConfig|undefined), * reportQuery: ReportQueryConfig|undefined, * }} */ @@ -99,6 +115,9 @@ let YouTubeReportConfig; * target: 'DV360', * config: Dv360ReportConfig, * } | { + * target: 'DS', + * config: Sa360LegacyReportConfig, + * } | { * target: 'SA360', * config: Sa360ReportConfig, * } | { @@ -108,6 +127,9 @@ let YouTubeReportConfig; * target: 'ADS', * config: AdsReportConfig, * } | { + * target: 'ADSL', + * config: AdsReportConfig, + * } | { * target: 'YT', * config: YouTubeReportConfig, * }} @@ -179,10 +201,11 @@ class Report { /** * Returns the schema of current report's data structure to help BigQuery load * the data into Table. + * @param {Object=} parameters Parameters of this instance. * @return {!BqTableSchema} BigQuery load schema, see: * https://cloud.google.com/bigquery/docs/schemas */ - generateSchema() { + generateSchema(parameters) { throw new Error('Unimplemented method.'); } diff --git a/marketing-analytics/activation/data-tasks-coordinator/src/tasks/report/campaign_manager_report.js b/marketing-analytics/activation/data-tasks-coordinator/src/tasks/report/campaign_manager_report.js index 73f6a67..9f9ed0e 100644 --- a/marketing-analytics/activation/data-tasks-coordinator/src/tasks/report/campaign_manager_report.js +++ b/marketing-analytics/activation/data-tasks-coordinator/src/tasks/report/campaign_manager_report.js @@ -55,48 +55,61 @@ class CampaignManagerReport extends Report { /** * Cleans up the content of report. CM reports are unable to be customized, so * use this function to get rid of unwanted lines, e.g. summary line. - * @param {stream} stream + * @param {stream} reportStream * @return {string} */ - clean(stream) { - let last = ''; + clean(reportStream) { + const START_TAG = '\nReport Fields'; + const END_TAG = '\nGrand Total:'; + + let previousPiece = ''; let started = false; + let ended = false; const streamReportTransform = new Transform({ transform(chunk, encoding, callback) { - const data = chunk.toString(); - let toCheck = last + data; + const currentPiece = chunk.toString(); + let toCheck = previousPiece + currentPiece; + if (ended) return callback(); if (!started) { - const startIndex = toCheck.indexOf('Report Fields'); + const startIndex = toCheck.indexOf(START_TAG); if (startIndex === -1) { - last = toCheck; - callback(null, ''); + previousPiece = toCheck; + return callback(null, ''); } else { - last = ''; - toCheck = toCheck.substring(startIndex + 'Report Fields'.length + 1); + previousPiece = ''; + toCheck = toCheck.substring(startIndex + START_TAG.length + 1); started = true; } } if (started) { - const endIndex = toCheck.indexOf('Grand Total:'); + const endIndex = toCheck.indexOf(END_TAG); if (endIndex === -1) { - const output = last; - last = last === '' ? toCheck : data; + let output = ''; + if (currentPiece.length < END_TAG.length) { + previousPiece = toCheck; + } else { + output = previousPiece; + previousPiece = previousPiece === '' ? toCheck : currentPiece; + } callback(null, output); } else { - callback(null, toCheck.substring(0, endIndex)); - this.end(); + ended = true; + this.push(toCheck.substring(0, endIndex + 1)); + this.push(null); + callback(); } } } }); - stream.on('error', (error) => streamReportTransform.emit('error', error)); + reportStream + .on('error', (error) => streamReportTransform.emit('error', error)); streamReportTransform.on('end', () => { if (!started) { streamReportTransform.emit('error', new Error(`Can't find 'Report Fields' line. Wrong report format?`)); } }); - return stream.pipe(streamReportTransform); + return reportStream.pipe(streamReportTransform); } } diff --git a/marketing-analytics/activation/data-tasks-coordinator/src/tasks/report/doubleclick_bidmanager_report.js b/marketing-analytics/activation/data-tasks-coordinator/src/tasks/report/doubleclick_bidmanager_report.js index 29ae151..f23c86d 100644 --- a/marketing-analytics/activation/data-tasks-coordinator/src/tasks/report/doubleclick_bidmanager_report.js +++ b/marketing-analytics/activation/data-tasks-coordinator/src/tasks/report/doubleclick_bidmanager_report.js @@ -68,31 +68,38 @@ class DoubleClickBidManagerReport extends Report { * separated in two different chunks, an extra parameter 'last' is used to * store data from the last chunk. * - * @param {stream} content + * @param {stream} reportStream * @return {string} */ - clean(content) { - let last = ''; + clean(reportStream) { + const END_TAG = '\n\n'; + let previousPiece = ''; + let ended = false; const streamReportTransform = new Transform({ transform(chunk, encoding, callback) { - const data = chunk.toString(); - const toCheck = last + data; - if (toCheck.indexOf('\n\n') === -1) { - const output = last; - last = data; + const currentPiece = chunk.toString(); + const toCheck = previousPiece + currentPiece; + if (ended) return callback(); + if (toCheck.indexOf(END_TAG) === -1) { + const output = previousPiece; + previousPiece = currentPiece; callback(null, output); } else { - const lines = toCheck.substring(0, toCheck.indexOf('\n\n')).split('\n'); - if (lines[lines.length - 1].startsWith(',')) { - lines.pop(); + ended = true; + const latest = + toCheck.substring(0, toCheck.indexOf(END_TAG)).split('\n'); + if (latest[latest.length - 1].startsWith(',')) { + latest.pop(); } - callback(null, lines.join('\n')); - this.end(); + this.push(latest.join('\n')); + this.push(null); + callback(); } } }); - content.on('error', (error) => streamReportTransform.emit('error', error)); - return content.pipe(streamReportTransform); + reportStream + .on('error', (error) => streamReportTransform.emit('error', error)); + return reportStream.pipe(streamReportTransform); } } diff --git a/marketing-analytics/activation/data-tasks-coordinator/src/tasks/report/general_api_result.js b/marketing-analytics/activation/data-tasks-coordinator/src/tasks/report/general_api_result.js index bfd2982..35fbfc5 100644 --- a/marketing-analytics/activation/data-tasks-coordinator/src/tasks/report/general_api_result.js +++ b/marketing-analytics/activation/data-tasks-coordinator/src/tasks/report/general_api_result.js @@ -17,7 +17,7 @@ 'use strict'; -const { api, utils: { getObjectByPath } } +const { api, utils: { getObjectByPath, getFilterAndStringifyFn } } = require('@google-cloud/nodejs-common'); const { Report } = require('./base_report.js'); @@ -32,8 +32,8 @@ const { Report } = require('./base_report.js'); * following conditions: * 1. It has its own class(`className`) and been exported as an object * (`packageName`) in the `api` object of nodejs-common; - * 2. The class offer a function named `getFunctionObject` to return the - * instance of function object created based on Google API client library; + * 2. The class offer a function named `getApiClient` to return the + * instance of this Api based on Google API client library; * 3. If the API supports next page token, then the proper way to use the token * is to set it as property `pageToken` in the following request. * @see ApiResultConfig in './base_report.js' @@ -42,8 +42,11 @@ class GeneralApiResult extends Report { constructor(config, apiStub) { super(config); - const { packageName, className } = this.config; - this.apiStub = apiStub || new api[packageName][className](super.getOption()); + const { packageName: configedPackage, api: className } = this.config; + const packageName = + configedPackage ? configedPackage : className.toLowerCase(); + this.apiStub = + apiStub || new api[packageName][className](super.getOption()); } /** @override */ @@ -53,7 +56,7 @@ class GeneralApiResult extends Report { /** @override */ isReady(parameters) { - return Promise.resolve(true); + return true; } /** @override */ @@ -64,29 +67,52 @@ class GeneralApiResult extends Report { /** @override */ async getContent(parameters) { const { - functionObjectName, - functionName, + resource, + functionName = 'list', args, limit = 0, entityPath, pageTokenPath, + fieldMask, } = this.config; - const functionObject = - await this.apiStub.getFunctionObject(functionObjectName); + const apiClient = await this.apiStub.getApiClient(); + const functionObject = getObjectByPath(apiClient, resource); + const transformFn = + fieldMask ? getFilterAndStringifyFn(fieldMask) : JSON.stringify; let result = []; let updatedArgs = args; let pageToken; do { const response = await functionObject[functionName](updatedArgs); - result = result.concat(getObjectByPath(response, entityPath)); + result = + result.concat(getObjectByPath(response, entityPath).map(transformFn)); if (pageTokenPath) { pageToken = getObjectByPath(response, pageTokenPath); - if (pageToken) updatedArgs = Object.assign(args, { pageToken }); + if (pageToken) updatedArgs = + Object.assign(args, getNextPageArgs(pageTokenPath, pageToken)); } } while (pageToken && (limit === 0 || limit > result.length)); result = limit > 0 ? result.slice(0, limit) : result; - return result.map(JSON.stringify).join('\n'); + return result.join('\n'); } } +/** + * Getst the arguments for next page (request) of the same Api request. + * @param {string} pageTokenPath + * @param {string} value + * @return {object} Extra arguments for next page (request). + */ +function getNextPageArgs(pageTokenPath, value) { + if (pageTokenPath.endsWith('nextPageToken')) { + return { pageToken: value }; + } + if (pageTokenPath.endsWith('nextLink')) { + const urlParams = new URL(value); + const index = urlParams.searchParams.get('start-index'); + return { 'start-index': index }; + } + throw new Error(`Unsupported pageTokenPath: ${pageTokenPath}`); +} + module.exports = { GeneralApiResult }; diff --git a/marketing-analytics/activation/data-tasks-coordinator/src/tasks/report/googleads_report.js b/marketing-analytics/activation/data-tasks-coordinator/src/tasks/report/googleads_report.js index d7a3e2a..202b8d7 100644 --- a/marketing-analytics/activation/data-tasks-coordinator/src/tasks/report/googleads_report.js +++ b/marketing-analytics/activation/data-tasks-coordinator/src/tasks/report/googleads_report.js @@ -25,7 +25,7 @@ const { GoogleAdsField, } }, - utils: {extractObject,}, + utils: { extractObject, changeObjectNamingFromSnakeToLowerCamel }, } = require('@google-cloud/nodejs-common'); const {Report} = require('./base_report.js'); const {getSchemaFields} = require('./googleads_report_helper.js'); @@ -68,12 +68,9 @@ class GoogleAdsReport extends Report { (this.config.reportQuery.segments || []) .concat(this.config.reportQuery.metrics || []) .concat(this.config.reportQuery.attributes || []); - const adsFields = await this.ads.searchMetaData(this.config.loginCustomerId, - adsFieldNames); - /** @type {{string:!GoogleAdsField}} */ const adsFieldsMap = {}; - adsFields.forEach( - (adsField) => void (adsFieldsMap[adsField.name] = adsField)); - const fields = getSchemaFields(adsFieldNames, adsFieldsMap); + const adsFields = (await this.ads.searchMetaData(this.config.loginCustomerId, + adsFieldNames)).map(changeObjectNamingFromSnakeToLowerCamel); + const fields = getSchemaFields(adsFieldNames, adsFields, true); return {fields}; } diff --git a/marketing-analytics/activation/data-tasks-coordinator/src/tasks/report/googleads_report_api.js b/marketing-analytics/activation/data-tasks-coordinator/src/tasks/report/googleads_report_api.js new file mode 100644 index 0000000..c1e68c0 --- /dev/null +++ b/marketing-analytics/activation/data-tasks-coordinator/src/tasks/report/googleads_report_api.js @@ -0,0 +1,63 @@ +// Copyright 2024 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this fileAccessObject except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +/** + * @fileoverview Interface for Google Ads Reporting. + */ + +'use strict'; + +const { buildQuery } = require('google-ads-api/build/src/query.js'); +const { api: { googleadsapi: { GoogleAdsApi } } } + = require('@google-cloud/nodejs-common'); +const { SearchAdsReport } = require('./search_ads_report.js'); + +/** + * Error messages that the task should fail directly without retry process. + * @type {Array} + */ +const FatalErrors = ['PERMISSION_DENIED: The caller does not have permission']; + +/** Google Ads Report class. */ +class GoogleAdsReport extends SearchAdsReport { + + constructor(config, apiInstance) { + super(config, apiInstance); + } + + /** @override */ + isFatalError(errorMessage) { + return FatalErrors.some( + (fatalErrorMessage) => errorMessage.indexOf(fatalErrorMessage) > -1 + ); + } + + /** @override */ + getApiInstance() { + if (!this.apiInstance) { + this.apiInstance = + new GoogleAdsApi(this.config.developerToken, false, super.getOption()); + } + return this.apiInstance; + } + + /** @override */ + async getQuery(parameters) { + if (this.config.reportQuery) { + return buildQuery(this.config.reportQuery).gaqlQuery; + } + return super.getQuery(parameters); + } +} + +module.exports = { GoogleAdsReport }; diff --git a/marketing-analytics/activation/data-tasks-coordinator/src/tasks/report/googleads_report_helper.js b/marketing-analytics/activation/data-tasks-coordinator/src/tasks/report/googleads_report_helper.js index 179d033..b267b65 100644 --- a/marketing-analytics/activation/data-tasks-coordinator/src/tasks/report/googleads_report_helper.js +++ b/marketing-analytics/activation/data-tasks-coordinator/src/tasks/report/googleads_report_helper.js @@ -18,8 +18,11 @@ */ 'use strict'; -const {api: {googleads: {GoogleAdsField}}} = require( - '@google-cloud/nodejs-common'); +const lodash = require('lodash'); +const { + api: { googleadsapi: { GoogleAdsField } }, + utils: { changeNamingFromSnakeToLowerCamel }, +} = require('@google-cloud/nodejs-common'); /** * Definition of the field in BigQuery schema. @@ -37,24 +40,24 @@ let FieldInSchema; * objects defined in Google Ads API. Those detailed properties don't show up at * the GoogleAdsFieldService. They will fail the load job to BigQuery without * detailed definition in schema. - * So we need to manually define the (required) fields. - * TODO: (question) Is there a way to get this part? + * So we need to manually define the (required) fields in snake case to align + * with the responses of Google Ads API. + * Note, only needed fields are mapped. * * @type {{string: Array}} */ const GOOGLE_ADS_MESSAGES = { - // Only map needed fields. - // https://developers.google.com/google-ads/api/reference/rpc/v7/AdTextAsset + // https://developers.google.com/google-ads/api/reference/rpc/latest/AdTextAsset AdTextAsset: [ {name: 'text', type: 'STRING',}, {name: 'pinned_field', type: 'STRING',}, ], - // https://developers.google.com/google-ads/api/reference/rpc/v7/PolicyTopicEntry + // https://developers.google.com/google-ads/api/reference/rpc/latest/PolicyTopicEntry PolicyTopicEntry: [ {name: 'topic', type: 'STRING',}, {name: 'type', type: 'STRING',}, ], - // https://developers.google.com/google-ads/api/reference/rpc/v7/AdGroupAdAssetPolicySummary + // https://developers.google.com/google-ads/api/reference/rpc/latest/AdGroupAdAssetPolicySummary AdGroupAdAssetPolicySummary: [ { name: 'policy_topic_entries', @@ -68,7 +71,7 @@ const GOOGLE_ADS_MESSAGES = { {name: 'review_status', type: 'STRING',}, {name: 'approval_status', type: 'STRING',}, ], - // https://developers.google.com/google-ads/api/reference/rpc/v7/ChangeEvent.ChangedResource + // https://developers.google.com/google-ads/api/reference/rpc/latest/ChangeEvent.ChangedResource ChangedResource: [ { name: 'campaign', @@ -89,6 +92,40 @@ const GOOGLE_ADS_MESSAGES = { ], }; +/** + * By default, the mapped fields in snake_case. This function returns the mapped + * fields in lower camel case. + * @see GOOGLE_ADS_MESSAGES + * @param {!Array} mappedFields + * @return {!Array} + */ +const convertMappedMessageCase = (mappedFields) => { + return mappedFields.map((field) => { + const newField = lodash.merge({}, field, + { name: changeNamingFromSnakeToLowerCamel(field.name) }) + if (field.fields) { + newField.fields = convertMappedMessageCase(field.fields); + } + return newField; + }) +} + +/** + * Gets the mapped messages based on specified case type. + * @param {boolean=} snakeCase + * @return {!Array} + */ +const getMappedMessages = (snakeCase = false) => { + if (snakeCase === true) return GOOGLE_ADS_MESSAGES; + const GOOGLE_ADS_MESSAGES_CAMEL_CASE = {}; + Object.keys(GOOGLE_ADS_MESSAGES).forEach((key) => { + GOOGLE_ADS_MESSAGES_CAMEL_CASE[key] = + convertMappedMessageCase(GOOGLE_ADS_MESSAGES[key]); + }); + return GOOGLE_ADS_MESSAGES_CAMEL_CASE; +} + + /** * Maps the array of AdsFields to a structured object. e.g. AdsFields array * [ @@ -162,14 +199,13 @@ const getBigQueryDataType = (dateType) => { * @return {!FieldInSchema} */ const getSingleField = (name, adsField, mappedTypes) => { - // console.log(name, adsField); - const type = getBigQueryDataType(adsField.data_type); + const type = getBigQueryDataType(adsField.dataType); const field = {name, type}; - if (adsField.is_repeated) field.mode = 'REPEATED'; + if (adsField.isRepeated) field.mode = 'REPEATED'; if (type === 'RECORD') { - const types = adsField.type_url.split('.'); + const types = adsField.typeUrl.split('.'); const fields = mappedTypes[types[types.length - 1]]; - if (!fields) throw new Error(`${adsField.type_url} isn't defined.`); + if (!fields) throw new Error(`${adsField.typeUrl} isn't defined.`); field.fields = fields; } return field; @@ -200,22 +236,30 @@ const getSingleField = (name, adsField, mappedTypes) => { * * To understand more about: * 1. Google Ads segments: - * https://developers.google.com/google-ads/api/fields/v4/segments + * https://developers.google.com/google-ads/api/fields/latest/segments * 2. Google Ads metrics: - * https://developers.google.com/google-ads/api/fields/v4/metrics + * https://developers.google.com/google-ads/api/fields/latest/metrics * 3. Google Ads resources: - * https://developers.google.com/google-ads/api/reference/rpc/v4/overview + * https://developers.google.com/google-ads/api/reference/rpc/latest/overview * 4. BigQuery data type: * https://cloud.google.com/bigquery/docs/schemas#standard_sql_data_types * - * @param {Array} adsFieldNames - * @param {{string:GoogleAdsField}} adsFieldsMap - * @param {{string:Array}=} mappedTypes - * Default value @link GOOGLE_ADS_MESSAGES + * @param {!Array} adsFieldNames + * @param {!Array} adsFields + * @param {boolean=} snakeCase Default value False * @return {!Array} */ -const getSchemaFields = (adsFieldNames, adsFieldsMap, - mappedTypes = GOOGLE_ADS_MESSAGES) => { +const getSchemaFields = (adsFieldNames, adsFields, snakeCase = false) => { + const adsFieldsMap = {}; + adsFields.forEach((adsField) => { + const key = snakeCase + ? adsField.name : changeNamingFromSnakeToLowerCamel(adsField.name); + adsFieldsMap[key] = adsField; + }); + const finalFieldNames = snakeCase + ? adsFieldNames : adsFieldNames.map(changeNamingFromSnakeToLowerCamel); + const mappedTypes = getMappedMessages(snakeCase); + /** * Map an array of GoogleAdsFields to an array of fields in BigQuery load * schema. @@ -238,7 +282,7 @@ const getSchemaFields = (adsFieldNames, adsFieldsMap, (subKey) => getSchemaFromObject(subKey, value[subKey], newPrefix)), }; }; - const structuredAdFields = mapAdsFieldsToObject(adsFieldNames); + const structuredAdFields = mapAdsFieldsToObject(finalFieldNames); return Object.keys(structuredAdFields).map( (key) => getSchemaFromObject(key, structuredAdFields[key])); } diff --git a/marketing-analytics/activation/data-tasks-coordinator/src/tasks/report/index.js b/marketing-analytics/activation/data-tasks-coordinator/src/tasks/report/index.js index 080c000..7897044 100644 --- a/marketing-analytics/activation/data-tasks-coordinator/src/tasks/report/index.js +++ b/marketing-analytics/activation/data-tasks-coordinator/src/tasks/report/index.js @@ -21,10 +21,12 @@ const {Report, ReportConfig} = require('./base_report.js'); const {CampaignManagerReport} = require('./campaign_manager_report.js'); const {DoubleClickSearchReport} = require('./doubleclick_search_report.js'); +const { SearchAdsReport } = require('./search_ads_report.js'); const {DoubleClickBidManagerReport} = require('./doubleclick_bidmanager_report.js'); const { GeneralApiResult } = require('./general_api_result.js'); -const {GoogleAdsReport} = require('./googleads_report.js'); +const { GoogleAdsReport: GoogleAdsReportLegacy } = require('./googleads_report.js'); +const { GoogleAdsReport } = require('./googleads_report_api.js'); const {YouTubeReport} = require('./youtube_report.js'); /** @@ -33,9 +35,12 @@ const {YouTubeReport} = require('./youtube_report.js'); */ const REPORTING_FACTORY = Object.freeze({ 'CM': CampaignManagerReport, - 'SA360': DoubleClickSearchReport, + 'CM360': CampaignManagerReport,// Alternative code support + 'DS': DoubleClickSearchReport, // This will be discontinued after 2024/07 + 'SA360': SearchAdsReport, 'DV360': DoubleClickBidManagerReport, 'API': GeneralApiResult, + 'ADSL': GoogleAdsReportLegacy, 'ADS': GoogleAdsReport, 'YT': YouTubeReport, }); diff --git a/marketing-analytics/activation/data-tasks-coordinator/src/tasks/report/search_ads_report.js b/marketing-analytics/activation/data-tasks-coordinator/src/tasks/report/search_ads_report.js new file mode 100644 index 0000000..c316b6a --- /dev/null +++ b/marketing-analytics/activation/data-tasks-coordinator/src/tasks/report/search_ads_report.js @@ -0,0 +1,113 @@ +// Copyright 2024 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this fileAccessObject except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +/** + * @fileoverview Interface for Search Ads 360 Reporting API. + */ + +'use strict'; + +const { + api: { searchads: { SearchAds } }, + storage: { StorageFile }, + utils: { replaceParameters, changeNamingFromSnakeToLowerCamel }, +} = require('@google-cloud/nodejs-common'); +const { Report } = require('./base_report.js'); +const { getSchemaFields } = require('./googleads_report_helper.js'); + +/** Search Ads 360 Report class. */ +class SearchAdsReport extends Report { + + /**@override */ + constructor(config, apiInstance) { + super(config); + this.apiInstance = apiInstance; + } + + /** @override */ + generate(parameters) { + return Promise.resolve(); + } + + /** @override */ + isReady(parameters) { + return Promise.resolve(true); + } + + /** @override */ + isAsynchronous() { + return false; + } + + /** + * Initializes and returns the Api instance which will handle requests to the + * target Api. + */ + getApiInstance() { + if (!this.apiInstance) { + this.apiInstance = new SearchAds(super.getOption()); + } + return this.apiInstance; + } + + /** @override */ + async generateSchema() { + const query = await this.getFinalQuery(this.parameters); + const { snakeCase = false } = this.config; + /** @type {Array} */ const adsFieldNames = + query.replaceAll('\n', '').replace(/select(.*)from.*/gi, '$1') + .split(',').map((field) => field.trim()); + const adsFields = + await this.getApiInstance().searchReportField(adsFieldNames); + const fields = getSchemaFields(adsFieldNames, adsFields, snakeCase); + return { fields }; + } + + /** + * @override + */ + async getContent(parameters) { + const query = await this.getFinalQuery(parameters); + const { customerId, loginCustomerId, snakeCase = false } = this.config; + const stream = await this.getApiInstance().cleanedRestStreamReport( + customerId, loginCustomerId, query, snakeCase); + return stream; + } + + /** + * Returns the report query with parameters replaced. + * @return {!Promise} + */ + async getFinalQuery(parameters) { + const query = await this.getQuery(); + return replaceParameters(query, parameters); + } + + /** + * Returns the report query based on configuration. + * @return {!Promise} + */ + async getQuery() { + if (this.config.query) return this.config.query; + if (this.config.file) { + /** @const {StorageFileConfig} */ + const { bucket, name, projectId } = this.config.file; + const query = await StorageFile + .getInstance(bucket, name, { projectId }).loadContent(0); + return query; + } + throw new Error(`Fail to find report query for ReportTask.`); + } +} + +module.exports = { SearchAdsReport }; diff --git a/marketing-analytics/activation/data-tasks-coordinator/src/tasks/report_task.js b/marketing-analytics/activation/data-tasks-coordinator/src/tasks/report_task.js index 9490270..448df0f 100644 --- a/marketing-analytics/activation/data-tasks-coordinator/src/tasks/report_task.js +++ b/marketing-analytics/activation/data-tasks-coordinator/src/tasks/report_task.js @@ -130,11 +130,10 @@ class ReportTask extends BaseTask { } catch (error) { if (report.isFatalError(error.toString())) { this.logger.error( - 'Fail immediately without retry for ReportTask error: ', - error.toString()); + 'Fail immediately without retry for ReportTask error: ', error); throw error; } else { - this.logger.error('Retry for ReportTask error: ', error.toString()); + this.logger.error('Retry for ReportTask error: ', error); throw new RetryableError(error.toString()); } } @@ -145,7 +144,20 @@ class ReportTask extends BaseTask { * @return {Report} Report instance. */ getReport() { - return this.options.buildReport(this.config.source); + if (!this.report) { + this.report = this.options.buildReport(this.config.source); + } + return this.report; + } + + /** + * Returns the schema of current report's data structure to help BigQuery load + * the data into Table. + * @return {!BqTableSchema} BigQuery load schema, see: + * https://cloud.google.com/bigquery/docs/schemas + */ + generateSchema() { + return this.getReport().generateSchema(this.parameters); } /** diff --git a/marketing-analytics/activation/gmp-googleads-connector/config_api.json.template b/marketing-analytics/activation/gmp-googleads-connector/config_api.json.template index 2cd4a82..ec33d32 100644 --- a/marketing-analytics/activation/gmp-googleads-connector/config_api.json.template +++ b/marketing-analytics/activation/gmp-googleads-connector/config_api.json.template @@ -173,7 +173,7 @@ "operation": "create|remove", "listId": "[YOUR-CUSTOMER-MATCH-LIST-ID]", "listName": "[YOUR-CUSTOMER-MATCH-LIST-NAME]", - "uploadKeyType": "CONTACT_INFO|CRM_ID|MOBILE_ADVERTISING_ID|CRM_ID|undefined", + "uploadKeyType": "CONTACT_INFO|CRM_ID|MOBILE_ADVERTISING_ID|undefined", "userAttribute": "UserAttribute|undefined", "customerMatchUserListMetadata": { "consent": { diff --git a/marketing-analytics/activation/gmp-googleads-connector/deploy.sh b/marketing-analytics/activation/gmp-googleads-connector/deploy.sh index 61b42ad..b821a49 100755 --- a/marketing-analytics/activation/gmp-googleads-connector/deploy.sh +++ b/marketing-analytics/activation/gmp-googleads-connector/deploy.sh @@ -39,6 +39,7 @@ CONFIG_ITEMS=( "REGION" "GCS_BUCKET" "DATABASE_ID" + "DATABASE_MODE" "SECRET_NAME" "OUTBOUND" "ENABLE_VISUALIZATION" @@ -855,13 +856,15 @@ EOF quit_if_failed $? check_firestore_existence local auth - if [[ -f "$(pwd)/${OAUTH2_TOKEN_JSON}" ]]; then + if [[ -n "${SECRET_NAME}" ]]; then + auth="SECRET_NAME=${SECRET_NAME}" + elif [[ -f "$(pwd)/${OAUTH2_TOKEN_JSON}" ]]; then auth="OAUTH2_TOKEN_JSON=$(pwd)/${OAUTH2_TOKEN_JSON}" elif [[ -f "$(pwd)/${SA_KEY_FILE}" ]]; then auth="API_SERVICE_ACCOUNT=$(pwd)/${SA_KEY_FILE}" fi printf '%s\n' " Setting environment variable of auth: ${auth}" - env "${auth}" node -e "require('./index.js').localApiRequester(\ + env "${auth}" "DEBUG=true" node -e "require('./index.js').localApiRequester(\ '${PROJECT_NAMESPACE}', process.argv[1], process.argv[2])" "$@" } diff --git a/marketing-analytics/activation/gmp-googleads-connector/package.json b/marketing-analytics/activation/gmp-googleads-connector/package.json index ba4be3e..0efeab1 100644 --- a/marketing-analytics/activation/gmp-googleads-connector/package.json +++ b/marketing-analytics/activation/gmp-googleads-connector/package.json @@ -1,6 +1,6 @@ { "name": "@google-cloud/gmp-googleads-connector", - "version": "4.3.0", + "version": "4.4.0", "description": "GMP & Google Ads connector based on Cloud Functions", "author": "Google Inc.", "license": "Apache-2.0", @@ -20,8 +20,8 @@ "homepage": "https://github.com/GoogleCloudPlatform/cloud-for-marketing/blob/master/marketing-analytics/activation/gmp-googleads-connector/README.md", "main": "index.js", "dependencies": { - "@google-cloud/nodejs-common": "2.1.0", - "@google-cloud/storage": "^7.0.1", + "@google-cloud/nodejs-common": "^2.2.0", + "@google-cloud/storage": "^7.9.0", "lodash": "^4.17.21", "nanoid": "^3.3.4", "ssh2-sftp-client": "^8.0.0" diff --git a/marketing-analytics/activation/gmp-googleads-connector/src/api_handlers/google_ads_call_conversions_upload.js b/marketing-analytics/activation/gmp-googleads-connector/src/api_handlers/google_ads_call_conversions_upload.js index 39ef5f5..4db93aa 100644 --- a/marketing-analytics/activation/gmp-googleads-connector/src/api_handlers/google_ads_call_conversions_upload.js +++ b/marketing-analytics/activation/gmp-googleads-connector/src/api_handlers/google_ads_call_conversions_upload.js @@ -21,7 +21,7 @@ const { api: { googleadsapi: { GoogleAdsApi: GoogleAds } }, - utils: { BatchResult }, + utils: { BatchResult, changeObjectNamingFromSnakeToLowerCamel }, } = require('@google-cloud/nodejs-common'); const { GoogleAdsConversionConfig, @@ -47,7 +47,8 @@ class GoogleAdsCallConversionUpload extends GoogleAdsClickConversionUpload { const result = await this.setCustomVariable(googleAds, config); if (result) return result; const managedSend = this.getManagedSendFn(config); - const { customerId, loginCustomerId, adsConfig } = config; + const { customerId, loginCustomerId, adsConfig } = + changeObjectNamingFromSnakeToLowerCamel(config); const configuredUpload = googleAds.getUploadCallConversionFn(customerId, loginCustomerId, adsConfig); return managedSend(configuredUpload, records, messageId); diff --git a/marketing-analytics/activation/gmp-googleads-connector/src/api_handlers/google_ads_click_conversions_upload.js b/marketing-analytics/activation/gmp-googleads-connector/src/api_handlers/google_ads_click_conversions_upload.js index b620565..9b3eb23 100644 --- a/marketing-analytics/activation/gmp-googleads-connector/src/api_handlers/google_ads_click_conversions_upload.js +++ b/marketing-analytics/activation/gmp-googleads-connector/src/api_handlers/google_ads_click_conversions_upload.js @@ -21,7 +21,7 @@ const { api: { googleadsapi: { GoogleAdsApi: GoogleAds, ConversionConfig } }, - utils: { getProperValue, BatchResult }, + utils: { getProperValue, changeObjectNamingFromSnakeToLowerCamel, BatchResult }, } = require('@google-cloud/nodejs-common'); const { ApiHandler } = require('./api_handler.js'); @@ -128,11 +128,12 @@ class GoogleAdsClickConversionUpload extends ApiHandler { * @return {!BatchResult|undefined} */ async setCustomVariable(googleAds, config) { - const { customerId, loginCustomerId, adsConfig } = config; - if (adsConfig.custom_variable_tags) { + const { customerId, loginCustomerId, adsConfig } = + changeObjectNamingFromSnakeToLowerCamel(config); + if (adsConfig.customVariableTags) { try { adsConfig.customVariables = await Promise.all( - adsConfig.custom_variable_tags.map(async (tag) => { + adsConfig.customVariableTags.map(async (tag) => { const id = await googleAds.getConversionCustomVariableId(tag, customerId, loginCustomerId); if (!id) throw new Error(`Couldn't find the tag named ${tag}.`); diff --git a/marketing-analytics/activation/gmp-googleads-connector/src/api_handlers/google_ads_conversion_adjustments_upload.js b/marketing-analytics/activation/gmp-googleads-connector/src/api_handlers/google_ads_conversion_adjustments_upload.js index 914d734..9ebda87 100644 --- a/marketing-analytics/activation/gmp-googleads-connector/src/api_handlers/google_ads_conversion_adjustments_upload.js +++ b/marketing-analytics/activation/gmp-googleads-connector/src/api_handlers/google_ads_conversion_adjustments_upload.js @@ -21,7 +21,7 @@ const { api: { googleadsapi: { GoogleAdsApi: GoogleAds } }, - utils: { BatchResult }, + utils: { BatchResult, changeObjectNamingFromSnakeToLowerCamel }, } = require('@google-cloud/nodejs-common'); const { GoogleAdsConversionConfig, @@ -45,7 +45,8 @@ class GoogleAdsConversionAdjustment extends GoogleAdsClickConversionUpload { */ async sendDataInternal(googleAds, records, messageId, config) { const managedSend = this.getManagedSendFn(config); - const { customerId, loginCustomerId, adsConfig } = config; + const { customerId, loginCustomerId, adsConfig } = + changeObjectNamingFromSnakeToLowerCamel(config); const configuredUpload = googleAds.getUploadConversionAdjustmentFn( customerId, loginCustomerId, adsConfig); return managedSend(configuredUpload, records, messageId); diff --git a/marketing-analytics/activation/gmp-googleads-connector/src/api_handlers/google_ads_customer_match_upload.js b/marketing-analytics/activation/gmp-googleads-connector/src/api_handlers/google_ads_customer_match_upload.js index d8ac64d..b1d23f0 100644 --- a/marketing-analytics/activation/gmp-googleads-connector/src/api_handlers/google_ads_customer_match_upload.js +++ b/marketing-analytics/activation/gmp-googleads-connector/src/api_handlers/google_ads_customer_match_upload.js @@ -21,7 +21,7 @@ const { api: { googleadsapi: { GoogleAdsApi: GoogleAds, CustomerMatchConfig } }, - utils: { getProperValue, BatchResult }, + utils: { getProperValue, changeObjectNamingFromSnakeToLowerCamel, BatchResult }, } = require('@google-cloud/nodejs-common'); const { GoogleAdsClickConversionUpload } = require('./google_ads_click_conversions_upload.js'); @@ -74,17 +74,16 @@ class GoogleAdsCustomerMatch extends GoogleAdsClickConversionUpload { * Creates the list if it doesn't exist and returns the Id. * @param {GoogleAds} googleAds Injected Google Ads instance. * @param {{ - * list_id:(string|undefined), - * list_name:(string|undefined), - * upload_key_type:('CONTACT_INFO'|'CRM_ID'|'MOBILE_ADVERTISING_ID'|undefined), + * listId:(string|undefined), + * listName:(string|undefined), + * uploadKeyType:('CONTACT_INFO'|'CRM_ID'|'MOBILE_ADVERTISING_ID'|undefined), * }} config * @return {string} User list Id. */ async getOrCreateUserList(googleAds, config) { - if (config.list_id) return config.list_id; - if (!config.list_name || !config.upload_key_type) { - throw new Error( - `Missing user list info in ${JSON.stringify(config)}`); + if (config.listId) return config.listId; + if (!config.listName || !config.uploadKeyType) { + throw new Error(`Missing user list info in ${JSON.stringify(config)}`); } const listId = await googleAds.getCustomerMatchUserListId(config); if (listId) { @@ -108,9 +107,10 @@ class GoogleAdsCustomerMatch extends GoogleAdsClickConversionUpload { * @return {!Promise} */ async sendDataInternal(googleAds, records, messageId, config) { - const { customerMatchConfig } = config; + const { customerMatchConfig } = + changeObjectNamingFromSnakeToLowerCamel(config); try { - customerMatchConfig.list_id = + customerMatchConfig.listId = await this.getOrCreateUserList(googleAds, customerMatchConfig); } catch (error) { this.logger.error('Error in UserdataService: ', error); diff --git a/marketing-analytics/activation/gmp-googleads-connector/src/api_handlers/google_ads_offline_userdata_job.js b/marketing-analytics/activation/gmp-googleads-connector/src/api_handlers/google_ads_offline_userdata_job.js index 58bc648..2d46d70 100644 --- a/marketing-analytics/activation/gmp-googleads-connector/src/api_handlers/google_ads_offline_userdata_job.js +++ b/marketing-analytics/activation/gmp-googleads-connector/src/api_handlers/google_ads_offline_userdata_job.js @@ -21,7 +21,7 @@ const { api: { googleadsapi: { GoogleAdsApi: GoogleAds, OfflineUserDataJobConfig } }, - utils: { getProperValue, BatchResult }, + utils: { getProperValue, BatchResult, changeObjectNamingFromSnakeToLowerCamel }, storage: { StorageFile }, } = require('@google-cloud/nodejs-common'); const { GoogleAdsCustomerMatch } = require('./google_ads_customer_match_upload.js'); @@ -100,9 +100,10 @@ class GoogleAdsOfflineUserDataJobUpload extends GoogleAdsCustomerMatch { records = message; } try { - const { offlineUserDataJobConfig } = config; + const { offlineUserDataJobConfig } = + changeObjectNamingFromSnakeToLowerCamel(config); if (offlineUserDataJobConfig.type.startsWith('CUSTOMER_MATCH')) { - offlineUserDataJobConfig.list_id = + offlineUserDataJobConfig.listId = await this.getOrCreateUserList(googleAds, offlineUserDataJobConfig); } const jobResourceName = @@ -110,7 +111,7 @@ class GoogleAdsOfflineUserDataJobUpload extends GoogleAdsCustomerMatch { this.logger.info('jobResourceName: ', jobResourceName); const managedSend = this.getManagedSendFn(config); const configedUpload = googleAds.getAddOperationsToOfflineUserDataJobFn( - config.offlineUserDataJobConfig, jobResourceName); + offlineUserDataJobConfig, jobResourceName); const result = await managedSend(configedUpload, records, messageId); this.logger.info('add userdata result: ', result); await googleAds.runOfflineUserDataJob( diff --git a/marketing-analytics/activation/gmp-googleads-connector/src/api_lock/api_lock_dao.js b/marketing-analytics/activation/gmp-googleads-connector/src/api_lock/api_lock_dao.js index e3beeda..927e9ec 100644 --- a/marketing-analytics/activation/gmp-googleads-connector/src/api_lock/api_lock_dao.js +++ b/marketing-analytics/activation/gmp-googleads-connector/src/api_lock/api_lock_dao.js @@ -76,7 +76,7 @@ class ApiLockDao extends DataAccessObject { * @type {number} */ this.maxTimeForLocks = 10 * 60 * 1000; - this.logger = getLogger(`LOCK.${database}`); + this.logger = getLogger(`LOCK.${database.source}.${database.id}`); } /** diff --git a/marketing-analytics/activation/gmp-googleads-connector/src/tentacles.js b/marketing-analytics/activation/gmp-googleads-connector/src/tentacles.js index a0581fc..c233fce 100644 --- a/marketing-analytics/activation/gmp-googleads-connector/src/tentacles.js +++ b/marketing-analytics/activation/gmp-googleads-connector/src/tentacles.js @@ -826,18 +826,29 @@ const getTentacles = (namespace, database) => { /** * Probes the Google Cloud Project's Firestore mode (Native or Datastore), then * uses it to create an instance of Tentacles. + * @param {(string|undefined)=} namespace + * @param {(string|undefined)=} projectId + * @param {(string|undefined)=} databaseId + * @param {(string|undefined)=} databaseMode * @return {!Promise} */ const guessTentacles = async (namespace = process.env['PROJECT_NAMESPACE'], projectId = process.env['GCP_PROJECT'], - databaseId = process.env['DATABASE_ID'] || DEFAULT_DATABASE) => { + databaseId = process.env['DATABASE_ID'] || DEFAULT_DATABASE, + databaseMode = process.env['DATABASE_MODE']) => { if (!namespace) { console.warn( 'Fail to find ENV variables PROJECT_NAMESPACE, will set as `tentacles`' ); namespace = 'tentacles'; } - const database = await getFirestoreDatabase(projectId, databaseId); + if (!databaseMode) { + console.warn( + 'Database mode is not set. Please consider upgrade this solution.'); + } + const database = databaseMode + ? { source: DataSource[databaseMode], id: databaseId } + : await getFirestoreDatabase(projectId, databaseId); return getTentacles(namespace, database); };