Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: onboarding customerio segment destination #4028

Merged
merged 20 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/v0/destinations/customerio_audience/config.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
export const MAX_ITEMS = 1000;

export const DEFAULT_ID_TYPE = 'id';

export const BASE_ENDPOINT = 'https://track.customer.io/api/v1/segments';

export const SegmentAction = {
Expand Down
33 changes: 27 additions & 6 deletions src/v0/destinations/customerio_audience/transform.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { ConfigurationError, isDefinedAndNotNullAndNotEmpty } from '@rudderstack/integrations-lib';
import { SegmentAction } from './config';
import { EventStructure, RespList } from './type';
import { CustomerIORouterRequestType, RespList } from './type';

const { InstrumentationError } = require('@rudderstack/integrations-lib');
const { batchResponseBuilder, getEventAction } = require('./utils');
Expand All @@ -10,7 +11,7 @@ interface ProcessedEvent extends RespList {
eventAction: keyof typeof SegmentAction;
}

const createEventChunk = (event: EventStructure): ProcessedEvent => {
const createEventChunk = (event: CustomerIORouterRequestType): ProcessedEvent => {
const eventAction = getEventAction(event);
const { identifiers } = event?.message || {};
const id: string | number = Object.values(identifiers)[0];
Expand All @@ -22,7 +23,7 @@ const createEventChunk = (event: EventStructure): ProcessedEvent => {
};
};

const validateEvent = (event: EventStructure): boolean => {
const validateEvent = (event: CustomerIORouterRequestType): boolean => {
const eventType = getEventType(event?.message);
if (eventType !== EventType.RECORD) {
throw new InstrumentationError(`message type ${eventType} is not supported`);
Expand All @@ -33,15 +34,35 @@ const validateEvent = (event: EventStructure): boolean => {
throw new InstrumentationError(`action ${eventAction} is not supported`);
}

const identifiers = event?.message?.identifiers || {};
if (Object.entries(identifiers).length === 0) {
const identifiers = event?.message?.identifiers;
if (!identifiers || Object.keys(identifiers).length === 0) {
throw new InstrumentationError(`identifiers cannot be empty`);
}

const id = Object.values(identifiers)[0];
if (typeof id !== 'string' && typeof id !== 'number') {
throw new ConfigurationError(`identifier type should be a string or integer`);
}

const siteId = event?.destination?.Config?.siteId;
manish339k marked this conversation as resolved.
Show resolved Hide resolved
if (!isDefinedAndNotNullAndNotEmpty(siteId) || typeof siteId !== 'string') {
manish339k marked this conversation as resolved.
Show resolved Hide resolved
throw new ConfigurationError('siteId is required and must be a string, aborting.');
}

const apiKey = event?.destination?.Config?.apiKey;
if (!isDefinedAndNotNullAndNotEmpty(apiKey) || typeof apiKey !== 'string') {
throw new ConfigurationError('apiKey is required and must be a string, aborting.');
}

const audienceId = event?.connection?.config?.destination?.audienceId;
if (!audienceId) {
throw new InstrumentationError('audienceId is required, aborting.');
}

return true;
};

const processRouterDest = async (inputs: any[], reqMetadata: any) => {
const processRouterDest = async (inputs: CustomerIORouterRequestType[], reqMetadata: any) => {
if (!inputs?.length) return [];

const { destination, connection } = inputs[0];
Expand Down
94 changes: 63 additions & 31 deletions src/v0/destinations/customerio_audience/type.ts
Original file line number Diff line number Diff line change
@@ -1,38 +1,11 @@
import { Connection, Destination, Metadata } from '../../../types';
import { FixMe } from '../../../util/types';

export type RespList = {
payload: {
ids: (string | number)[];
};
metadata: Record<string, unknown>;
};

export type EventStructure = {
message: {
action: string;
identifiers: Record<string, string | number>;
};
metadata: Record<string, unknown>;
destination: DestinationStructure;
connection: ConnectionStructure;
};

export type ConnectionStructure = {
config: {
destination: {
audienceId: string | number;
identifierMappings: {
from: string;
to: string;
}[];
};
};
};

export type DestinationStructure = {
Config: {
apiKey: string;
appApiKey: string;
siteId: string;
};
metadata: CustomerIOMetadataType;
};

export type SegmentationPayloadType = {
Expand All @@ -47,3 +20,62 @@ export type SegmentationHeadersType = {
'Content-Type': string;
Authorization: string;
};

type CIOConnectionConfigType = {
destination: {
audienceId: string | number;
identifierMappings: {
from: string;
to: string;
}[];
[key: string]: any;
};
[key: string]: any;
};

type CIODestinationConfigType = {
apiKey: string;
appApiKey: string;
siteId: string;
[key: string]: any;
};

type GenericRouterRequestData<
CIOMessage = object,
CIODestination = Destination,
CIOConnection = Connection,
> = {
message: CIOMessage;
metadata: Metadata;
destination: CIODestination;
connection: CIOConnection;
[key: string]: any;
};

type CIODestination<CIODestinationConfig = FixMe> = {
Config: CIODestinationConfig;
[key: string]: any;
};

type CIOConnection<CIOConnectionConfig = Record<string, unknown>> = {
config: CIOConnectionConfig;
[key: string]: any;
};

export type CustomerIOMessageType = {
action: string;
identifiers: Record<string, string | number>;
[key: string]: any;
};

export type CustomerIOMetadataType = Metadata;

export type CustomerIODestinationType = CIODestination<CIODestinationConfigType>;

export type CustomerIOConnectionType = CIOConnection<CIOConnectionConfigType>;

export type CustomerIORouterRequestType = GenericRouterRequestData<
CustomerIOMessageType,
CustomerIODestinationType,
CustomerIOConnectionType
>;
35 changes: 18 additions & 17 deletions src/v0/destinations/customerio_audience/utils.ts
Original file line number Diff line number Diff line change
@@ -1,45 +1,46 @@
import { base64Convertor } from '@rudderstack/integrations-lib';
import { BatchUtils } from '@rudderstack/workflow-engine';
import { BASE_ENDPOINT, MAX_ITEMS } from './config';
import { BASE_ENDPOINT, DEFAULT_ID_TYPE, MAX_ITEMS } from './config';
import {
ConnectionStructure,
DestinationStructure,
EventStructure,
CustomerIOConnectionType,
CustomerIODestinationType,
CustomerIOMetadataType,
CustomerIORouterRequestType,
RespList,
SegmentationHeadersType,
SegmentationParamType,
SegmentationPayloadType,
} from './type';

const getIdType = (connection: ConnectionStructure): string =>
connection.config.destination.identifierMappings[0]?.to || 'id';
const getIdType = (connection: CustomerIOConnectionType): string =>
connection.config.destination.identifierMappings[0]?.to || DEFAULT_ID_TYPE;

const getSegmentId = (connection: ConnectionStructure): string | number =>
const getSegmentId = (connection: CustomerIOConnectionType): string | number =>
connection.config.destination.audienceId;

const getHeaders = (destination: DestinationStructure): SegmentationHeadersType => ({
const getHeaders = (destination: CustomerIODestinationType): SegmentationHeadersType => ({
'Content-Type': 'application/json',
Authorization: `Basic ${base64Convertor(`${destination.Config.siteId}:${destination.Config.apiKey}`)}`,
manish339k marked this conversation as resolved.
Show resolved Hide resolved
});

const getParams = (connection: ConnectionStructure): SegmentationParamType => ({
const getParams = (connection: CustomerIOConnectionType): SegmentationParamType => ({
id_type: getIdType(connection),
});

const getMergedPayload = (batch: RespList[]): SegmentationPayloadType => ({
ids: batch.flatMap((input) => input.payload.ids),
});

const getMergedMetadata = (batch: RespList[]): Record<string, unknown>[] =>
const getMergedMetadata = (batch: RespList[]): CustomerIOMetadataType[] =>
batch.map((input) => input.metadata);

const buildBatchedResponse = (
payload: SegmentationPayloadType,
endpoint: string,
headers: SegmentationHeadersType,
params: SegmentationParamType,
metadata: Record<string, unknown>[],
destination: DestinationStructure,
metadata: CustomerIOMetadataType[],
destination: CustomerIODestinationType,
) => ({
batchedRequest: {
body: {
Expand All @@ -65,8 +66,8 @@ const buildBatchedResponse = (
const processBatch = (
respList: RespList[],
endpoint: string,
destination: DestinationStructure,
connection: ConnectionStructure,
destination: CustomerIODestinationType,
connection: CustomerIOConnectionType,
): any[] => {
if (!respList?.length) {
return [];
Expand All @@ -93,8 +94,8 @@ const processBatch = (
const batchResponseBuilder = (
insertOrUpdateRespList: RespList[],
deleteRespList: RespList[],
destination: DestinationStructure,
connection: ConnectionStructure,
destination: CustomerIODestinationType,
connection: CustomerIOConnectionType,
): any[] => {
const segmentId = getSegmentId(connection);

Expand All @@ -115,7 +116,7 @@ const batchResponseBuilder = (
return [...insertResponses, ...deleteResponses];
};

const getEventAction = (event: EventStructure): string =>
const getEventAction = (event: CustomerIORouterRequestType): string =>
event?.message?.action?.toLowerCase() || '';

export { batchResponseBuilder, getEventAction };
11 changes: 11 additions & 0 deletions test/integrations/destinations/customerio_audience/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ const connection: Connection = {
},
};

const inValidConnection: Connection = {
...connection,
config: {
...connection.config,
destination: {
audienceId: '',
},
},
};

const insertOrUpdateEndpoint =
'https://track.customer.io/api/v1/segments/test-segment-id/add_customers';

Expand Down Expand Up @@ -77,6 +87,7 @@ export {
channel,
destination,
connection,
inValidConnection,
processorInstrumentationErrorStatTags,
RouterInstrumentationErrorStatTags,
headers,
Expand Down
Loading
Loading