Skip to content

Commit

Permalink
feat: added customerio group call support (#1869)
Browse files Browse the repository at this point in the history
* feat: added customerio group call support

* feat: endpoint reference added

* feat: 500kb size limit check added

* feat: object_type_id mapping support added
  • Loading branch information
mihir-4116 authored Feb 23, 2023
1 parent 5d5c1d3 commit 5e692ea
Show file tree
Hide file tree
Showing 10 changed files with 1,268 additions and 123 deletions.
5 changes: 3 additions & 2 deletions features.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
"ZENDESK": true,
"MP": true,
"TIKTOK_ADS_OFFLINE_EVENTS": true,
"CRITEO_AUDIENCE": true
"CRITEO_AUDIENCE": true,
"CUSTOMERIO": true
}
}
}
27 changes: 25 additions & 2 deletions src/v0/destinations/customerio/config.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
const { getMappingConfig } = require('../../util');

const IDENTITY_ENDPOINT = 'https://track.customer.io/api/v1/customers/:id';

const USER_EVENT_ENDPOINT = 'https://track.customer.io/api/v1/customers/:id/events';
Expand All @@ -10,11 +12,32 @@ const DEVICE_REGISTER_ENDPOINT = 'https://track.customer.io/api/v1/customers/:id

const DEVICE_DELETE_ENDPOINT = 'https://track.customer.io/api/v1/customers/:id/devices/:device_id';

const OBJECT_EVENT_ENDPOINT = 'https://track.customer.io/api/v2/batch';

const CONFIG_CATEGORIES = {
OBJECT_EVENTS: {
type: 'group',
name: 'customerIoGroup',
},
};

const MAX_BATCH_SIZE = 1000;
const DEFAULT_OBJECT_ACTION = "identify";
const OBJECT_ACTIONS = ["identify", "delete", "add_relationships", "delete_relationships"];

const MAPPING_CONFIG = getMappingConfig(CONFIG_CATEGORIES, __dirname);

module.exports = {
MAX_BATCH_SIZE,
MAPPING_CONFIG,
OBJECT_ACTIONS,
CONFIG_CATEGORIES,
IDENTITY_ENDPOINT,
MERGE_USER_ENDPOINT,
USER_EVENT_ENDPOINT,
ANON_EVENT_ENDPOINT,
DEVICE_REGISTER_ENDPOINT,
OBJECT_EVENT_ENDPOINT,
DEFAULT_OBJECT_ACTION,
DEVICE_DELETE_ENDPOINT,
MERGE_USER_ENDPOINT,
DEVICE_REGISTER_ENDPOINT,
};
49 changes: 49 additions & 0 deletions src/v0/destinations/customerio/data/customerIoGroup.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
[
{
"destKey": "object_id",
"sourceKeys": "groupId",
"sourceFromGenericMap": true,
"required": true
},
{
"destKey": "object_type_id",
"sourceKeys": "traits.objectTypeId",
"metadata": {
"defaultValue": "1"
},
"required": false
},
{
"destKey": "userId",
"sourceKeys": "userIdOnly",
"sourceFromGenericMap": true,
"required": false
},
{
"destKey": "email",
"sourceKeys": [
"context.traits.email",
"properties.email",
"context.externalId.0.id"
],
"required": false
},
{
"destKey": "action",
"sourceKeys": [
"traits.action",
"properties.action"
],
"required": false
},
{
"destKey": "attributes",
"sourceKeys": "traits",
"required": false,
"metadata": {
"excludes": [
"action"
]
}
}
]
131 changes: 117 additions & 14 deletions src/v0/destinations/customerio/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,43 @@ const set = require('set-value');
const btoa = require('btoa');
const truncate = require('truncate-utf8-bytes');
const { isAppleFamily } = require('rudder-transformer-cdk/build/utils');

const {
EventType,
SpecedTraits,
TraitsMapping,
MappedToDestinationKey,
} = require('../../../constants');

const {
adduserIdFromExternalId,
constructPayload,
getErrorRespEvents,
getSuccessRespEvents,
defaultRequestConfig,
addExternalIdToTraits,
removeUndefinedValues,
defaultPostRequestConfig,
adduserIdFromExternalId,
defaultPutRequestConfig,
defaultRequestConfig,
defaultPostRequestConfig,
getFieldValueFromMessage,
addExternalIdToTraits,
simpleProcessRouterDest,
handleRtTfSingleEventError,
} = require('../../util');

const {
MAPPING_CONFIG,
OBJECT_ACTIONS,
CONFIG_CATEGORIES,
IDENTITY_ENDPOINT,
MERGE_USER_ENDPOINT,
USER_EVENT_ENDPOINT,
ANON_EVENT_ENDPOINT,
DEVICE_REGISTER_ENDPOINT,
OBJECT_EVENT_ENDPOINT,
DEFAULT_OBJECT_ACTION,
DEVICE_DELETE_ENDPOINT,
MERGE_USER_ENDPOINT,
DEVICE_REGISTER_ENDPOINT,
} = require('./config');
const logger = require('../../../logger');
const { getEventChunks } = require('./util');
const { InstrumentationError } = require('../../util/errorTypes');

const deviceRelatedEventNames = [
Expand Down Expand Up @@ -149,6 +160,25 @@ function responseBuilder(message, evType, evName, destination, messageType) {
rawPayload.primary.id = userId;
rawPayload.secondary = {};
rawPayload.secondary.id = message.previousId;
} else if (evType === EventType.GROUP) {
endpoint = OBJECT_EVENT_ENDPOINT;
const payload = constructPayload(message, MAPPING_CONFIG[CONFIG_CATEGORIES.OBJECT_EVENTS.name]);
rawPayload.identifiers = {
object_id: payload.object_id,
object_type_id: payload.object_type_id,
};
rawPayload.type = 'object';
rawPayload.action =
payload.action && OBJECT_ACTIONS.includes(payload.action)
? payload.action
: DEFAULT_OBJECT_ACTION;
rawPayload.attributes = payload.attributes || {};
rawPayload.cio_relationships = [];
if (payload.userId) {
rawPayload.cio_relationships.push({ identifiers: { id: payload.userId } });
} else if (payload.email) {
rawPayload.cio_relationships.push({ identifiers: { email: payload.email } });
}
} else {
// any other event type except identify
const token = get(message, 'context.device.token');
Expand Down Expand Up @@ -261,6 +291,9 @@ function processSingleMessage(message, destination) {
case EventType.ALIAS:
evType = 'alias';
break;
case EventType.GROUP:
evType = 'group';
break;
default:
logger.error(`could not determine type ${messageType}`);
throw new InstrumentationError(`could not determine type ${messageType}`);
Expand All @@ -276,20 +309,90 @@ function processSingleMessage(message, destination) {
}

function process(event) {
const respList = [];
const { message, destination } = event;
const result = processSingleMessage(message, destination);
if (!result.statusCode) {
result.statusCode = 200;
}
respList.push(result);

return respList;
return result;
}

const processRouterDest = async (inputs, reqMetadata) => {
const respList = await simpleProcessRouterDest(inputs, process, reqMetadata);
return respList;
const batchEvents = (successRespList) => {
const batchedResponseList = [];
const groupEvents = [];
// Filtering out group calls to process batching
successRespList.forEach((resp) => {
if (resp.message.endpoint !== OBJECT_EVENT_ENDPOINT) {
batchedResponseList.push(
getSuccessRespEvents(resp.message, [resp.metadata], resp.destination),
);
} else {
groupEvents.push(resp);
}
});

if (groupEvents.length > 0) {
// Extracting metadata, destination and message from the first event in a batch
const { destination, message } = groupEvents[0];
const { headers, endpoint } = message;

// eventChunks = [[e1,e2,e3,..batchSize],[e1,e2,e3,..batchSize]..]
const eventChunks = getEventChunks(groupEvents);

/**
* Ref : https://www.customer.io/docs/api/track/#operation/batch
*/
eventChunks.forEach((chunk) => {
const request = defaultRequestConfig();
request.endpoint = endpoint;
request.headers = { ...headers, 'Content-Type': 'application/json' };
// Setting the request body to an object with a single property called "batch" containing the batched data
request.body.JSON = { batch: chunk.data };

batchedResponseList.push(getSuccessRespEvents(request, chunk.metadata, destination));
});
}
return batchedResponseList;
};

const processRouterDest = (inputs, reqMetadata) => {
if (!Array.isArray(inputs) || inputs.length <= 0) {
const respEvents = getErrorRespEvents(null, 400, 'Invalid event array');
return [respEvents];
}
let batchResponseList = [];
const batchErrorRespList = [];
const successRespList = [];
const { destination } = inputs[0];
inputs.forEach((event) => {
try {
if (event.message.statusCode) {
// already transformed event
successRespList.push({
message: event.message,
metadata: event.metadata,
destination,
});
} else {
// if not transformed
const transformedPayload = {
message: process(event),
metadata: event.metadata,
destination,
};
successRespList.push(transformedPayload);
}
} catch (error) {
const errRespEvent = handleRtTfSingleEventError(event, error, reqMetadata);
batchErrorRespList.push(errRespEvent);
}
});

if (successRespList.length > 0) {
batchResponseList = batchEvents(successRespList);
}

return [...batchResponseList, ...batchErrorRespList];
};

module.exports = { process, processRouterDest };
43 changes: 43 additions & 0 deletions src/v0/destinations/customerio/util.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
const { MAX_BATCH_SIZE } = require('./config');

const getSizeInBytes = (obj) => {
let str = null;
if (typeof obj === 'string') {
// If obj is a string, then use it
str = obj;
} else {
// Else, make obj into a string
str = JSON.stringify(obj);
}
// Get the length of the Uint8Array
const bytes = new TextEncoder().encode(str).length;
return bytes;
};

const getEventChunks = (groupEvents) => {
const eventChunks = [];
let batchedData = [];
let metadata = [];
let size = 0;

groupEvents.forEach((events) => {
const objSize = getSizeInBytes(events);
size += objSize;
if (batchedData.length === MAX_BATCH_SIZE || size > 500000) {
eventChunks.push({ data: batchedData, metadata });
batchedData = [];
metadata = [];
size = 0;
}
metadata.push(events.metadata);
batchedData.push(events.message.body.JSON);
});

if (batchedData.length > 0) {
eventChunks.push({ data: batchedData, metadata });
}

return eventChunks;
};

module.exports = { getEventChunks };
2 changes: 1 addition & 1 deletion test/__tests__/customerio.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ for (let index = 0; index < inputData.length; index++) {
let output, expected;
try {
output = transformer.process(inputData[index]);
expected = [expectedData[index]];
expected = expectedData[index];
} catch (error) {
output = error.message;
expected = expectedData[index].message;
Expand Down
Loading

0 comments on commit 5e692ea

Please sign in to comment.