Skip to content

Commit

Permalink
Add performBatch in upsert profile (segmentio#1741)
Browse files Browse the repository at this point in the history
* Add performBatch in upsert profile

* Add enable batching

* Update error handling

* Update error handling

* Remove unused import
  • Loading branch information
harsh-joshi99 authored Dec 4, 2023
1 parent f45356f commit 6ae4522
Show file tree
Hide file tree
Showing 6 changed files with 225 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { APIError, RequestClient, DynamicFieldResponse } from '@segment/actions-core'
import { API_URL, REVISION_DATE } from './config'
import { KlaviyoAPIError, ListIdResponse, ProfileData, listData } from './types'
import { ImportJobPayload, KlaviyoAPIError, ListIdResponse, ProfileData, listData } from './types'
import { Payload } from './upsertProfile/generated-types'

export async function getListIdDynamicData(request: RequestClient): Promise<DynamicFieldResponse> {
try {
Expand Down Expand Up @@ -99,3 +100,36 @@ export function buildHeaders(authKey: string) {
'Content-Type': 'application/json'
}
}

export const createImportJobPayload = (profiles: Payload[], listId?: string): { data: ImportJobPayload } => ({
data: {
type: 'profile-bulk-import-job',
attributes: {
profiles: {
data: profiles.map(({ list_id, enable_batching, ...attributes }) => ({
type: 'profile',
attributes
}))
}
},
...(listId
? {
relationships: {
lists: {
data: [{ type: 'list', id: listId }]
}
}
}
: {})
}
})

export const sendImportJobRequest = async (request: RequestClient, importJobPayload: { data: ImportJobPayload }) => {
return await request(`${API_URL}/profile-bulk-import-jobs/`, {
method: 'POST',
headers: {
revision: '2023-10-15.pre'
},
json: importJobPayload
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const destination: AudienceDestinationDefinition<Settings> = {
scheme: 'custom',
fields: {
api_key: {
type: 'string',
type: 'password',
label: 'API Key',
description: `You can find this by going to Klaviyo's UI and clicking Account > Settings > API Keys > Create API Key`,
required: true
Expand Down
45 changes: 45 additions & 0 deletions packages/destination-actions/src/destinations/klaviyo/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,48 @@ export interface GetListResultContent {
}
}[]
}

export interface Location {
address1?: string | null
address2?: string | null
city?: string | null
region?: string | null
zip?: string | null
latitude?: string | null
longitude?: string | null
country?: string | null
}

export interface ProfileAttributes {
email?: string
phone_number?: string
external_id?: string
first_name?: string
last_name?: string
organization?: string
title?: string
image?: string
location?: Location | null
properties?: Record<string, any>
list_id?: string
}

export interface ImportJobPayload {
type: string
attributes: {
profiles: {
data: {
type: string
attributes: ProfileAttributes
}[]
}
}
relationships?: {
lists: {
data: {
type: string
id: string
}[]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ describe('Upsert Profile', () => {

await expect(
testDestination.testAction('upsertProfile', { event, settings, useDefaultMappings: true })
).rejects.toThrowError('An error occurred while processing the request')
).rejects.toThrowError('Internal Server Error')
})

it('should add a profile to a list if list_id is provided', async () => {
Expand Down Expand Up @@ -263,3 +263,108 @@ describe('Upsert Profile', () => {
expect(Functions.addProfileToList).toHaveBeenCalledWith(expect.anything(), profileId, listId)
})
})

describe('Upsert Profile Batch', () => {
beforeEach(() => {
nock.cleanAll()
jest.resetAllMocks()
})

it('should discard profiles without email, phone_number, or external_id', async () => {
const events = [createTestEvent({ traits: { first_name: 'John', last_name: 'Doe' } })]

const response = await testDestination.testBatchAction('upsertProfile', {
settings,
events,
useDefaultMappings: true
})

expect(response).toEqual([])
})

it('should process profiles with and without list_ids separately', async () => {
const eventWithListId = createTestEvent({
traits: { first_name: 'John', last_name: 'Doe', email: '[email protected]', list_id: 'abc123' }
})
const eventWithoutListId = createTestEvent({
traits: { first_name: 'Jane', last_name: 'Smith', email: '[email protected]' }
})

nock(API_URL).post('/profile-bulk-import-jobs/').reply(200, { success: true, withList: true })
nock(API_URL).post('/profile-bulk-import-jobs/').reply(200, { success: true, withoutList: true })

const responseWithList = await testDestination.testBatchAction('upsertProfile', {
settings,
events: [eventWithListId],
mapping: { list_id: 'abc123' },
useDefaultMappings: true
})

const responseWithoutList = await testDestination.testBatchAction('upsertProfile', {
settings,
events: [eventWithoutListId],
mapping: {},
useDefaultMappings: true
})

expect(responseWithList[0]).toMatchObject({
data: { success: true, withList: true }
})

expect(responseWithoutList[0]).toMatchObject({
data: { success: true, withoutList: true }
})
})

it('should process profiles with list_ids only', async () => {
const events = [createTestEvent({ traits: { email: '[email protected]', list_id: 'abc123' } })]

nock(API_URL).post('/profile-bulk-import-jobs/').reply(200, { success: true, withList: true })

const response = await testDestination.testBatchAction('upsertProfile', {
settings,
events,
mapping: { list_id: 'abc123' },
useDefaultMappings: true
})

expect(response[0].data).toMatchObject({
success: true,
withList: true
})
expect(response).toHaveLength(1)
})

it('should process profiles without list_ids only', async () => {
const events = [createTestEvent({ traits: { email: '[email protected]' } })]

nock(API_URL).post('/profile-bulk-import-jobs/').reply(200, { success: true, withoutList: true })

const response = await testDestination.testBatchAction('upsertProfile', {
settings,
events,
mapping: {},
useDefaultMappings: true
})

expect(response[0].data).toMatchObject({
success: true,
withoutList: true
})
expect(response).toHaveLength(1)
})

it('should handle errors when sending profiles to Klaviyo', async () => {
const events = [createTestEvent({ traits: { email: '[email protected]' } })]

nock(API_URL).post('/profile-bulk-import-jobs/').reply(500, { error: 'Server error' })

await expect(
testDestination.testBatchAction('upsertProfile', {
settings,
events,
useDefaultMappings: true
})
).rejects.toThrow()
})
})

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ import type { Settings } from '../generated-types'
import type { Payload } from './generated-types'

import { API_URL } from '../config'
import { APIError, PayloadValidationError } from '@segment/actions-core'
import { PayloadValidationError } from '@segment/actions-core'
import { KlaviyoAPIError, ProfileData } from '../types'
import { addProfileToList, getListIdDynamicData } from '../functions'
import { addProfileToList, createImportJobPayload, getListIdDynamicData, sendImportJobRequest } from '../functions'

const action: ActionDefinition<Settings, Payload> = {
title: 'Upsert Profile',
Expand All @@ -19,6 +19,11 @@ const action: ActionDefinition<Settings, Payload> = {
format: 'email',
default: { '@path': '$.traits.email' }
},
enable_batching: {
type: 'boolean',
label: 'Batch Data to Klaviyo',
description: 'When enabled, the action will use the klaviyo batch API.'
},
phone_number: {
label: 'Phone Number',
description: `Individual's phone number in E.164 format. If SMS is not enabled and if you use Phone Number as identifier, then you have to provide one of Email or External ID.`,
Expand Down Expand Up @@ -135,7 +140,7 @@ const action: ActionDefinition<Settings, Payload> = {
}
},
perform: async (request, { payload }) => {
const { email, external_id, phone_number, list_id, ...otherAttributes } = payload
const { email, external_id, phone_number, list_id, enable_batching, ...otherAttributes } = payload

if (!email && !phone_number && !external_id) {
throw new PayloadValidationError('One of External ID, Phone Number and Email is required.')
Expand Down Expand Up @@ -186,7 +191,32 @@ const action: ActionDefinition<Settings, Payload> = {
}
}

throw new APIError('An error occurred while processing the request', 400)
throw error
}
},

performBatch: async (request, { payload }) => {
payload = payload.filter((profile) => profile.email || profile.external_id || profile.phone_number)
const profilesWithList = payload.filter((profile) => profile.list_id)
const profilesWithoutList = payload.filter((profile) => !profile.list_id)

let importResponseWithList
let importResponseWithoutList

if (profilesWithList.length > 0) {
const listId = profilesWithList[0].list_id
const importJobPayload = createImportJobPayload(profilesWithList, listId)
importResponseWithList = await sendImportJobRequest(request, importJobPayload)
}

if (profilesWithoutList.length > 0) {
const importJobPayload = createImportJobPayload(profilesWithoutList)
importResponseWithoutList = await sendImportJobRequest(request, importJobPayload)
}

return {
withList: importResponseWithList,
withoutList: importResponseWithoutList
}
}
}
Expand Down

0 comments on commit 6ae4522

Please sign in to comment.