Skip to content

Commit

Permalink
feat: onboarding customerio segment destination (#4028)
Browse files Browse the repository at this point in the history
* feat: onboarding customerio segment  destination

* feat: added preprocessing of input to filter unavailable customer

* fix: minor issue

* fix: some changes

* feat: refactoring name to customerio_audience

* fix: revert preprocessing changes after suggestion

* fix: some minor comments

* chore: avoid forEach for better code readability

* chore: refactor customerio utils

* fix: added Config validation

* fix: added types

* fix: added tests and some minor change

* fix: minor change

* chore: simplify types and generalise existing types

* fix: minor change

* fix: addressing comments

---------

Co-authored-by: Dilip Kola <[email protected]>
Co-authored-by: Vinay Teki <[email protected]>
Co-authored-by: Dilip Kola <[email protected]>
  • Loading branch information
4 people authored Feb 3, 2025
1 parent d62ba40 commit 16b927a
Show file tree
Hide file tree
Showing 9 changed files with 787 additions and 6 deletions.
1 change: 1 addition & 0 deletions src/features.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ const defaultFeaturesConfig: FeaturesConfig = {
INTERCOM_V2: true,
LINKEDIN_AUDIENCE: true,
TOPSORT: true,
CUSTOMERIO_AUDIENCE: true,
},
regulations: [
'BRAZE',
Expand Down
16 changes: 10 additions & 6 deletions src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,11 @@ type DestinationDefinition = {
Config: FixMe;
};

type Destination = {
type Destination<DestinationConfig = FixMe> = {
ID: string;
Name: string;
DestinationDefinition: DestinationDefinition;
Config: FixMe;
Config: DestinationConfig;
Enabled: boolean;
WorkspaceID: string;
Transformations: UserTransformationInput[];
Expand Down Expand Up @@ -164,12 +164,16 @@ type ProcessorTransformationRequest = {
credentials?: Credential[];
};

type RouterTransformationRequestData = {
type RouterTransformationRequestData<
Message = object,
DestinationType = Destination,
ConnectionType = Connection,
> = {
request?: object;
message: object;
message: Message;
metadata: Metadata;
destination: Destination;
connection?: Connection;
destination: DestinationType;
connection?: ConnectionType;
};

type RouterTransformationRequest = {
Expand Down
11 changes: 11 additions & 0 deletions src/v0/destinations/customerio_audience/config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
export const MAX_ITEMS = 1000;

export const DEFAULT_ID_TYPE = 'id';

export const BASE_ENDPOINT = 'https://track.customer.io/api/v1/segments';

export const SegmentAction = {
INSERT: 'insert',
UPDATE: 'update',
DELETE: 'delete',
};
113 changes: 113 additions & 0 deletions src/v0/destinations/customerio_audience/transform.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import { ConfigurationError } from '@rudderstack/integrations-lib';
import { SegmentAction } from './config';
import { CustomerIORouterRequestType, RespList } from './type';

const { InstrumentationError } = require('@rudderstack/integrations-lib');
const { batchResponseBuilder, getEventAction } = require('./utils');
const { handleRtTfSingleEventError, getEventType } = require('../../util');
const { EventType } = require('../../../constants');

interface ProcessedEvent extends RespList {
eventAction: keyof typeof SegmentAction;
}

const createEventChunk = (event: CustomerIORouterRequestType): ProcessedEvent => {
const eventAction = getEventAction(event);
const { identifiers } = event?.message || {};
const id: string | number = Object.values(identifiers)[0];

return {
payload: { ids: [id] },
metadata: event.metadata,
eventAction,
};
};

const validateEvent = (event: CustomerIORouterRequestType): boolean => {
const eventType = getEventType(event?.message);
if (eventType !== EventType.RECORD) {
throw new InstrumentationError(`message type ${eventType} is not supported`);
}

const eventAction = getEventAction(event);
if (!Object.values(SegmentAction).includes(eventAction)) {
throw new InstrumentationError(`action ${eventAction} is not supported`);
}

const identifiers = event?.message?.identifiers;
if (!identifiers || Object.keys(identifiers).length === 0) {
throw new InstrumentationError(`identifiers cannot be empty`);
}

if (Object.keys(identifiers).length > 1) {
throw new InstrumentationError(`only one identifier is supported`);
}

const id = Object.values(identifiers)[0];
if (typeof id !== 'string' && typeof id !== 'number') {
throw new ConfigurationError(`identifier type should be a string or integer`);
}

const audienceId = event?.connection?.config?.destination?.audienceId;
if (!audienceId) {
throw new InstrumentationError('audienceId is required, aborting.');
}

const identifierMappings = event?.connection?.config?.destination?.identifierMappings;
if (!identifierMappings || Object.keys(identifierMappings).length === 0) {
throw new InstrumentationError('identifierMappings cannot be empty');
}

return true;
};

const processRouterDest = async (inputs: CustomerIORouterRequestType[], reqMetadata: any) => {
if (!inputs?.length) return [];

const { destination, connection } = inputs[0];

// Process events and separate valid and error cases
const processedEvents = inputs.map((event) => {
try {
validateEvent(event);
return {
success: true,
data: createEventChunk(event),
};
} catch (error) {
return {
success: false,
error: handleRtTfSingleEventError(event, error, reqMetadata),
};
}
});

// Separate successful and failed events
const successfulEvents = processedEvents
.filter((result) => result.success)
.map((result) => result.data as ProcessedEvent);

const errorEvents = processedEvents
.filter((result) => !result.success)
.map((result) => result.error);

// Split successful events into delete and insert/update lists
const deleteRespList = successfulEvents
.filter((event) => event.eventAction === SegmentAction.DELETE)
.map(({ payload, metadata }) => ({ payload, metadata }));

const insertOrUpdateRespList = successfulEvents
.filter((event) => event.eventAction !== SegmentAction.DELETE)
.map(({ payload, metadata }) => ({ payload, metadata }));

const batchSuccessfulRespList = batchResponseBuilder(
insertOrUpdateRespList,
deleteRespList,
destination,
connection,
);

return [...batchSuccessfulRespList, ...errorEvents];
};

export { processRouterDest };
60 changes: 60 additions & 0 deletions src/v0/destinations/customerio_audience/type.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import { Connection, Destination, Metadata, RouterTransformationRequestData } from '../../../types';

// Basic response type for audience list operations
export type RespList = {
payload: {
ids: (string | number)[];
};
metadata: Metadata;
};

// Types for API request components
export type SegmentationPayloadType = {
ids: (string | number)[];
};

export type SegmentationParamType = {
id_type: string;
};

export type SegmentationHeadersType = {
'Content-Type': string;
Authorization: string;
};

// CustomerIO specific configuration types
type CustomerIODestinationConfig = {
apiKey: string;
appApiKey: string;
siteId: string;
[key: string]: any;
};

type CustomerIOConnectionConfig = {
destination: {
audienceId: string | number;
identifierMappings: {
from: string;
to: string;
}[];
};
};

// Message type specific to CustomerIO
export type CustomerIOMessageType = {
action: string;
identifiers: Record<string, string | number>;
[key: string]: any;
};

// Final exported types using generics from base types
export type CustomerIODestinationType = Destination<CustomerIODestinationConfig>;
export type CustomerIOConnectionType = Connection & {
config: CustomerIOConnectionConfig;
};

export type CustomerIORouterRequestType = RouterTransformationRequestData<
CustomerIOMessageType,
CustomerIODestinationType,
CustomerIOConnectionType
>;
121 changes: 121 additions & 0 deletions src/v0/destinations/customerio_audience/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import { base64Convertor } from '@rudderstack/integrations-lib';
import { BatchUtils } from '@rudderstack/workflow-engine';
import { BASE_ENDPOINT, DEFAULT_ID_TYPE, MAX_ITEMS } from './config';
import {
CustomerIOConnectionType,
CustomerIODestinationType,
CustomerIORouterRequestType,
RespList,
SegmentationHeadersType,
SegmentationParamType,
SegmentationPayloadType,
} from './type';
import { Metadata } from '../../../types';

const getIdType = (connection: CustomerIOConnectionType): string =>
connection.config.destination.identifierMappings[0]?.to || DEFAULT_ID_TYPE;

const getSegmentId = (connection: CustomerIOConnectionType): string | number =>
connection.config.destination.audienceId;

const getHeaders = (destination: CustomerIODestinationType): SegmentationHeadersType => ({
'Content-Type': 'application/json',
Authorization: `Basic ${base64Convertor(`${destination.Config.siteId}:${destination.Config.apiKey}`)}`,
});

const getParams = (connection: CustomerIOConnectionType): SegmentationParamType => ({
id_type: getIdType(connection),
});

const getMergedPayload = (batch: RespList[]): SegmentationPayloadType => ({
ids: batch.flatMap((input) => input.payload.ids),
});

const getMergedMetadata = (batch: RespList[]): Metadata[] => batch.map((input) => input.metadata);

const buildBatchedResponse = (
payload: SegmentationPayloadType,
endpoint: string,
headers: SegmentationHeadersType,
params: SegmentationParamType,
metadata: Metadata[],
destination: CustomerIODestinationType,
) => ({
batchedRequest: {
body: {
JSON: payload,
JSON_ARRAY: {},
XML: {},
FORM: {},
},
version: '1',
type: 'REST',
method: 'POST',
endpoint,
headers,
params,
files: {},
},
metadata,
batched: true,
statusCode: 200,
destination,
});

const processBatch = (
respList: RespList[],
endpoint: string,
destination: CustomerIODestinationType,
connection: CustomerIOConnectionType,
): any[] => {
if (!respList?.length) {
return [];
}

const headers = getHeaders(destination);
const params = getParams(connection);
const batches = BatchUtils.chunkArrayBySizeAndLength(respList, { maxItems: MAX_ITEMS });

return batches.items.map((batch) => {
const mergedPayload = getMergedPayload(batch);
const mergedMetadata = getMergedMetadata(batch);
return buildBatchedResponse(
mergedPayload,
endpoint,
headers,
params,
mergedMetadata,
destination,
);
});
};

const batchResponseBuilder = (
insertOrUpdateRespList: RespList[],
deleteRespList: RespList[],
destination: CustomerIODestinationType,
connection: CustomerIOConnectionType,
): any[] => {
const segmentId = getSegmentId(connection);

const insertResponses = processBatch(
insertOrUpdateRespList,
`${BASE_ENDPOINT}/${segmentId}/add_customers`,
destination,
connection,
);

const deleteResponses = processBatch(
deleteRespList,
`${BASE_ENDPOINT}/${segmentId}/remove_customers`,
destination,
connection,
);

return [...insertResponses, ...deleteResponses];
};

const getEventAction = (event: CustomerIORouterRequestType): string =>
event?.message?.action?.toLowerCase() || '';

export { batchResponseBuilder, getEventAction };
Loading

0 comments on commit 16b927a

Please sign in to comment.