Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: move hubspot to transformer proxy to enable partial batch handling #3308

Merged
merged 32 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
722f3d1
feat: move hubspot to transformer proxy to enable partial batch handling
ItsSudip Apr 24, 2024
a09afd6
Merge branch 'develop' into feat.hs-handle-partial-error
ItsSudip Apr 26, 2024
3993401
Merge branch 'develop' into feat.hs-handle-partial-error
ItsSudip May 21, 2024
d66755c
Merge branch 'develop' into feat.hs-handle-partial-error
ItsSudip May 22, 2024
effc2e1
chore: refactor code
ItsSudip May 22, 2024
f622710
chore: add unit tests
ItsSudip May 24, 2024
8d428e8
Merge branch 'develop' into feat.hs-handle-partial-error
ItsSudip May 24, 2024
43c1174
Merge branch 'develop' into feat.hs-handle-partial-error
ItsSudip May 27, 2024
1b9c83b
chore: refactor code
ItsSudip May 27, 2024
0927f2a
chore: update error message
ItsSudip May 27, 2024
85de831
Merge branch 'develop' into feat.hs-handle-partial-error
ItsSudip Jun 5, 2024
3d896d4
Merge branch 'develop' of github.com:rudderlabs/rudder-transformer in…
ItsSudip Jun 11, 2024
cded119
fix: update network handler for legacy api
ItsSudip Jun 12, 2024
a2852d7
Merge branch 'develop' into feat.hs-handle-partial-error
ItsSudip Jun 12, 2024
4f1915f
chore: address comment
ItsSudip Jun 12, 2024
dfa4008
Merge branch 'develop' into feat.hs-handle-partial-error
ItsSudip Jun 14, 2024
06d60ae
Merge branch 'develop' into feat.hs-handle-partial-error
ItsSudip Jun 20, 2024
e87d031
chore: update failed tests
ItsSudip Jun 20, 2024
cc3c7c6
Merge branch 'develop' into feat.hs-handle-partial-error
ItsSudip Jun 21, 2024
f88bb52
Merge branch 'develop' into feat.hs-handle-partial-error
ItsSudip Jul 4, 2024
7eadc12
Merge branch 'develop' into feat.hs-handle-partial-error
ItsSudip Jul 12, 2024
722bfbc
fix: update network handler
ItsSudip Jul 15, 2024
2fe5fbd
feat: add verification of single event
ItsSudip Jul 17, 2024
e1606a0
Merge branch 'develop' into feat.hs-handle-partial-error
ItsSudip Jul 18, 2024
4d175c1
chore: resolve conflicts
ItsSudip Jul 22, 2024
571842c
Merge branch 'develop' into feat.hs-handle-partial-error
ItsSudip Jul 23, 2024
cedb653
chore: resolve conflicts
ItsSudip Jul 30, 2024
653e25c
chore: add test cases
ItsSudip Jul 31, 2024
63c03e7
Merge branch 'develop' into feat.hs-handle-partial-error
ItsSudip Jul 31, 2024
8246fcb
Merge branch 'develop' into feat.hs-handle-partial-error
ItsSudip Jul 31, 2024
e1ecb95
Merge branch 'develop' into feat.hs-handle-partial-error
ItsSudip Aug 5, 2024
f82f877
Merge branch 'hotfix/12082024' into feat.hs-handle-partial-error
ItsSudip Aug 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 26 additions & 5 deletions src/v0/destinations/hs/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
fetchFinalSetOfTraits,
getProperties,
validateDestinationConfig,
convertToResponseFormat,
} = require('./util');

const processSingleMessage = async ({ message, destination, metadata }, propertyMap) => {
Expand Down Expand Up @@ -147,20 +148,38 @@
}),
);

if (successRespList.length > 0) {
const dontBatchTrueResponses = [];
const dontBatchFalseOrUndefinedResponses = [];
// segregating successRepList depending on dontbatch value
successRespList.forEach((successResp) => {
if (successResp.metadata?.dontBatch) {
dontBatchTrueResponses.push(successResp);
} else {
dontBatchFalseOrUndefinedResponses.push(successResp);
}
});

// batch implementation
if (dontBatchFalseOrUndefinedResponses.length > 0) {
if (destination.Config.apiVersion === API_VERSION.v3) {
batchedResponseList = batchEvents(successRespList);
batchedResponseList = batchEvents(dontBatchFalseOrUndefinedResponses);
} else {
batchedResponseList = legacyBatchEvents(successRespList);
batchedResponseList = legacyBatchEvents(dontBatchFalseOrUndefinedResponses);
}
}
return { batchedResponseList, errorRespList };
return {
batchedResponseList,
errorRespList,
// if there are any events where dontbatch set to true we need to update them according to the response format
dontBatchEvents: convertToResponseFormat(dontBatchTrueResponses),
};
};
// we are batching by default at routerTransform
const processRouterDest = async (inputs, reqMetadata) => {
const tempNewInputs = batchEventsInOrder(inputs);
const batchedResponseList = [];
const errorRespList = [];
const dontBatchEvents = [];
const promises = tempNewInputs.map(async (inputEvents) => {
const response = await processBatchRouter(inputEvents, reqMetadata);
return response;
Expand All @@ -171,8 +190,10 @@
results.forEach((response) => {
errorRespList.push(...response.errorRespList);
batchedResponseList.push(...response.batchedResponseList);
dontBatchEvents.push(...response.dontBatchEvents);
});
return [...batchedResponseList, ...errorRespList];
console.log(JSON.stringify([...batchedResponseList, ...errorRespList, ...dontBatchEvents]));

Check warning on line 195 in src/v0/destinations/hs/transform.js

View workflow job for this annotation

GitHub Actions / Check for formatting & lint errors

Unexpected console statement

Check warning on line 195 in src/v0/destinations/hs/transform.js

View workflow job for this annotation

GitHub Actions / Code Coverage

Unexpected console statement
return [...batchedResponseList, ...errorRespList, ...dontBatchEvents];
};

module.exports = { process, processRouterDest };
32 changes: 32 additions & 0 deletions src/v0/destinations/hs/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ const {
getValueFromMessage,
isNull,
validateEventName,
defaultBatchRequestConfig,
defaultPostRequestConfig,
getSuccessRespEvents,
} = require('../../util');
const {
CONTACT_PROPERTY_MAP_ENDPOINT,
Expand Down Expand Up @@ -844,6 +847,34 @@ const addExternalIdToHSTraits = (message) => {
set(getFieldValueFromMessage(message, 'traits'), externalIdObj.identifierType, externalIdObj.id);
};

const convertToResponseFormat = (successRespListWithDontBatchTrue) => {
const response = [];
if (Array.isArray(successRespListWithDontBatchTrue)) {
successRespListWithDontBatchTrue.forEach((event) => {
const { message, metadata, destination } = event;
const endpoint = get(message, 'endpoint');

const batchedResponse = defaultBatchRequestConfig();
batchedResponse.batchedRequest.headers = message.headers;
batchedResponse.batchedRequest.endpoint = endpoint;
batchedResponse.batchedRequest.body = message.body;
batchedResponse.batchedRequest.params = message.params;
batchedResponse.batchedRequest.method = defaultPostRequestConfig.requestMethod;
batchedResponse.metadata = [metadata];
batchedResponse.destination = destination;

response.push(
getSuccessRespEvents(
batchedResponse.batchedRequest,
batchedResponse.metadata,
batchedResponse.destination,
),
);
});
}
return response;
};

module.exports = {
validateDestinationConfig,
addExternalIdToHSTraits,
Expand All @@ -863,4 +894,5 @@ module.exports = {
getObjectAndIdentifierType,
extractIDsForSearchAPI,
getRequestData,
convertToResponseFormat,
};
147 changes: 147 additions & 0 deletions src/v1/destinations/hs/networkHandler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
import { TransformerProxyError } from '../../../v0/util/errorTypes';
import { prepareProxyRequest, proxyRequest } from '../../../adapters/network';
import { isHttpStatusSuccess, getAuthErrCategoryFromStCode } from '../../../v0/util/index';
import { DeliveryV1Response, DeliveryJobState } from '../../../types/index';

import { processAxiosResponse, getDynamicErrorType } from '../../../adapters/utils/networkUtils';

const tags = require('../../../v0/util/tags');

/**
*
* @param results
* @param rudderJobMetadata
* @param destinationConfig
* @returns boolean
*/

const findFeatureandVersion = (response, rudderJobMetadata, destinationConfig) => {
const { results, errors } = response;
if (Array.isArray(rudderJobMetadata) && rudderJobMetadata.length === 1) {
return 'singleEvent';
}
if (destinationConfig?.apiVersion === 'legacyApi') {
return 'legacyApiWithMultipleEvents';
}
if (destinationConfig?.apiVersion === 'newApi') {
if (Array.isArray(results) && results.length === rudderJobMetadata.length)
return 'newApiWithMultipleEvents';

if (
Array.isArray(results) &&
results.length !== rudderJobMetadata.length &&
results.length + errors.length === rudderJobMetadata.length

Check warning on line 33 in src/v1/destinations/hs/networkHandler.ts

View check run for this annotation

Codecov / codecov/patch

src/v1/destinations/hs/networkHandler.ts#L32-L33

Added lines #L32 - L33 were not covered by tests
)
return 'newApiWithMultipleEventsAndErrors';

Check warning on line 35 in src/v1/destinations/hs/networkHandler.ts

View check run for this annotation

Codecov / codecov/patch

src/v1/destinations/hs/networkHandler.ts#L35

Added line #L35 was not covered by tests
}
return 'unableToFindVersionWithMultipleEvents';
};

const populateResponseWithDontBatch = (rudderJobMetadata, response) => {
const errorMessage = JSON.stringify(response);
const responseWithIndividualEvents: DeliveryJobState[] = [];

rudderJobMetadata.forEach((metadata) => {
responseWithIndividualEvents.push({
statusCode: 500,
metadata: { ...metadata, dontBatch: true },
error: errorMessage,
});
});
return responseWithIndividualEvents;
};

type Response = {
status?: string;
results?: Array<object>;
errors?: Array<object>;
startedAt?: Date;
completedAt?: Date;
message?: string;
correlationId?: string;
failureMessages?: Array<object>;
};

const responseHandler = (responseParams) => {
const { destinationResponse, rudderJobMetadata, destinationRequest } = responseParams;
const successMessage = `[HUBSPOT Response V1 Handler] - Request Processed Successfully`;
const failureMessage =
'HUBSPOT: Error in transformer proxy v1 during HUBSPOT response transformation';
const responseWithIndividualEvents: DeliveryJobState[] = [];
const { response, status } = destinationResponse;

if (isHttpStatusSuccess(status)) {
// populate different response for each event
const destResponse = response as Response;
let proxyOutputObj: DeliveryJobState;
const featureAndVersion = findFeatureandVersion(
destResponse,
rudderJobMetadata,
destinationRequest?.destinationConfig,
);
switch (featureAndVersion) {
case 'singleEvent':
proxyOutputObj = {
statusCode: status,
metadata: rudderJobMetadata[0],
error: JSON.stringify(destResponse),
};
responseWithIndividualEvents.push(proxyOutputObj);
break;
case 'newApiWithMultipleEvents':
rudderJobMetadata.forEach((metadata: any, index: string | number) => {
proxyOutputObj = {
statusCode: 200,
metadata,
error: JSON.stringify(destResponse.results?.[index]),
};
responseWithIndividualEvents.push(proxyOutputObj);
});
break;
default:
rudderJobMetadata.forEach((metadata) => {
proxyOutputObj = {
statusCode: 200,
metadata,
error: 'success',
};
responseWithIndividualEvents.push(proxyOutputObj);
});
break;
}
return {
status,
message: successMessage,
response: responseWithIndividualEvents,
} as DeliveryV1Response;
}

// At least one event in the batch is invalid.
if (status === 400 && Array.isArray(rudderJobMetadata) && rudderJobMetadata.length > 1) {
// sending back 500 for retry only when events came in a batch
return {
status: 500,
message: failureMessage,
response: populateResponseWithDontBatch(rudderJobMetadata, response),
} as DeliveryV1Response;
}
throw new TransformerProxyError(
failureMessage,
status,
{
[tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(status),
},
destinationResponse,
getAuthErrCategoryFromStCode(status),
response,
);
};

function networkHandler(this: any) {
this.prepareProxy = prepareProxyRequest;
this.proxy = proxyRequest;
this.processAxiosResponse = processAxiosResponse;
this.responseHandler = responseHandler;
}

module.exports = { networkHandler };
Loading
Loading