Skip to content

Commit

Permalink
STRATCONN 2557 - LiveRamp SFTP (segmentio#1265)
Browse files Browse the repository at this point in the history
* Initial scaffolding for LiveRamp

* temporarily remove warnings from scaffold

* STRATCONN-2553 Add mapping and destination settings for LiveRamp

* STRATCONN-2556 Add support for an S3 upload module

* improve formatting

* add testcases

* fix documentation

* last fixes

* refactor S3 into its own action

* add support for SFTP

* Fix merge

* fix outdated references

* restore snapshots

* last bug fixes

* generate types

* add testAuthentication for SFTP

* resolve dependencies

* update snapshots

* add package to the correct location

* move types to inner package

* DRY SFTP code

* remove awaits
  • Loading branch information
rhall-twilio authored May 18, 2023
1 parent e070029 commit 687a561
Show file tree
Hide file tree
Showing 11 changed files with 191 additions and 56 deletions.
4 changes: 3 additions & 1 deletion packages/destination-actions/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
"devDependencies": {
"@types/google-libphonenumber": "^7.4.23",
"@types/jest": "^27.0.0",
"@types/ssh2-sftp-client": "^9.0.0",
"jest": "^27.3.1",
"nock": "^13.1.4"
},
Expand All @@ -48,7 +49,8 @@
"escape-goat": "^3",
"google-libphonenumber": "^3.2.31",
"liquidjs": "^9.37.0",
"lodash": "^4.17.21"
"lodash": "^4.17.21",
"ssh2-sftp-client": "^9.1.0"
},
"jest": {
"preset": "ts-jest",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ exports[`Testing snapshot for LiverampAudiences's audienceEntered destination ac
Headers {
Symbol(map): Object {
"authorization": Array [
"AWS4-HMAC-SHA256 Credential=PywYAY/19700101/PywYAY/s3/aws4_request, SignedHeaders=content-length;content-type;host;x-amz-content-sha256;x-amz-date, Signature=3273c678e9edaf86b444eeadfd6021f8814141ee94adcb7409c7931c1f286dff",
"AWS4-HMAC-SHA256 Credential=PywYAY/19700101/PywYAY/s3/aws4_request, SignedHeaders=content-length;content-type;host;x-amz-content-sha256;x-amz-date, Signature=0e2838c897a25891722f065e3c21b3592e67626b4a50f2f1ad0f78ad1a7ad524",
],
"content-length": Array [
"39",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { createTestEvent, createTestIntegration } from '@segment/actions-core'
import { generateTestData } from '../../../../lib/test-data'
import destination from '../../index'
import { generateTestData } from '../../../lib/test-data'
import destination from '../index'
import nock from 'nock'
import { enquoteIdentifier } from '../operations'

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { ActionDefinition } from '@segment/actions-core'
import { ActionDefinition, InvalidAuthenticationError, RequestClient } from '@segment/actions-core'
import type { Settings } from '../generated-types'
import type { Payload } from './generated-types'
import { processData } from './operations'
import { uploadS3, validateS3 } from './s3'
import { uploadSFTP, validateSFTP, Client as ClientSFTP } from './sftp'
import { generateFile } from '../operations'

const action: ActionDefinition<Settings, Payload> = {
title: 'Audience Entered',
Expand Down Expand Up @@ -30,19 +32,12 @@ const action: ActionDefinition<Settings, Payload> = {
required: true,
default: ','
},
audience_name: {
label: 'Audience name',
description: `Name of the audience the user has entered.`,
filename: {
label: 'Filename',
description: `Name of the CSV file to upload for LiveRamp ingestion.`,
type: 'string',
required: true,
default: { '@path': '$.properties.audience_key' }
},
received_at: {
label: 'Received At',
description: `Datetime at which the event was received. Used to disambiguate the resulting file.`,
type: 'datetime',
required: true,
default: { '@path': '$.receivedAt' }
default: { '@template': '{{properties.audience_key}}_PII_{{receivedAt}}.csv' }
}
},
perform: async (request, { settings, payload }) => {
Expand All @@ -53,4 +48,29 @@ const action: ActionDefinition<Settings, Payload> = {
}
}

async function processData(request: RequestClient, settings: Settings, payloads: Payload[]) {
// STRATCONN-2584: error if less than 25 elements in payload
switch (settings.upload_mode) {
case 'S3':
validateS3(settings)
break
case 'SFTP':
validateSFTP(settings)
break
default:
throw new InvalidAuthenticationError(`Unexpected upload mode: ${settings.upload_mode}`)
}

const { filename, fileContent } = generateFile(payloads)

switch (settings.upload_mode) {
case 'S3':
return uploadS3(settings, filename, fileContent, request)
case 'SFTP': {
const sftpClient = new ClientSFTP()
return uploadSFTP(sftpClient, settings, filename, fileContent)
}
}
}

export default action
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import generateS3RequestOptions from '../../lib/AWS/s3'
import generateS3RequestOptions from '../../../lib/AWS/s3'
import { InvalidAuthenticationError, ModifiedResponse, RequestOptions } from '@segment/actions-core'
import type { Settings } from './generated-types'
import { Settings } from '../generated-types'

function validateS3(settings: Settings) {
if (!settings.s3_aws_access_key) {
Expand All @@ -23,7 +23,7 @@ function validateS3(settings: Settings) {
async function uploadS3(
settings: Settings,
filename: string,
fileContent: string,
fileContent: Buffer,
request: <Data = unknown>(url: string, options?: RequestOptions) => Promise<ModifiedResponse<Data>>
) {
const method = 'PUT'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { InvalidAuthenticationError } from '@segment/actions-core'
import Client from 'ssh2-sftp-client'
import path from 'path'
import { Settings } from '../generated-types'

const LIVERAMP_SFTP_SERVER = 'files.liveramp.com'
const LIVERAMP_SFTP_PORT = 22

function validateSFTP(settings: Settings) {
if (!settings.sftp_username) {
throw new InvalidAuthenticationError('Selected SFTP upload mode, but missing credentials (Username)')
}

if (!settings.sftp_password) {
throw new InvalidAuthenticationError('Selected SFTP upload mode, but missing credentials (Password)')
}

if (!settings.sftp_folder_path) {
throw new InvalidAuthenticationError('Selected SFTP upload mode, but missing SFTP folder path.')
}
}

async function uploadSFTP(sftp: Client, settings: Settings, filename: string, fileContent: Buffer) {
return doSFTP(sftp, settings, async (sftp) => {
const targetPath = path.join(settings.sftp_folder_path as string, filename)
return sftp.put(fileContent, targetPath)
})
}

async function doSFTP(sftp: Client, settings: Settings, action: { (sftp: Client): Promise<unknown> }) {
await sftp.connect({
host: LIVERAMP_SFTP_SERVER,
port: LIVERAMP_SFTP_PORT,
username: settings.sftp_username,
password: settings.sftp_password
})

await action(sftp)
await sftp.end()
}

async function testAuthenticationSFTP(sftp: Client, settings: Settings) {
return doSFTP(sftp, settings, async (sftp) => {
return sftp.list(settings.sftp_folder_path as string)
})
}

export { validateSFTP, uploadSFTP, testAuthenticationSFTP, Client }

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { DestinationDefinition } from '@segment/actions-core'
import type { Settings } from './generated-types'

import audienceEntered from './audienceEntered'
import { testAuthenticationSFTP, Client as ClientSFTP } from './audienceEntered/sftp'

const destination: DestinationDefinition<Settings> = {
name: 'Liveramp Audiences',
Expand Down Expand Up @@ -54,15 +55,18 @@ const destination: DestinationDefinition<Settings> = {
},
sftp_folder_path: {
label: 'Folder Path (SFTP only)',
description: 'Path within the SFTP server to upload the files to.',
type: 'string'
description:
'Path within the LiveRamp SFTP server to upload the files to. This path must exist and all subfolders must be pre-created.',
type: 'string',
format: 'uri-reference'
}
},
testAuthentication: (_) => {
// Return a request that tests/validates the user's credentials.
// If you do not have a way to validate the authentication fields safely,
// you can remove the `testAuthentication` function, though discouraged.
// TODO: Validate SFTP
testAuthentication: async (_, { settings }) => {
// S3 authentication is skipped to avoid requiring a GetObject permission on the IAM role.
if (settings.upload_mode == 'SFTP') {
const sftpClient = new ClientSFTP()
await testAuthenticationSFTP(sftpClient, settings)
}
}
},
actions: {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,10 @@
import { PayloadValidationError, RequestClient } from '@segment/actions-core'
import type { Settings } from '../generated-types'
import type { Payload } from './generated-types'
import { uploadS3, validateS3 } from '../s3'
import type { Payload } from './audienceEntered/generated-types'

async function processData(request: RequestClient, settings: Settings, payloads: Payload[]) {
// STRATCONN-2554: Add support for SFTP
if (settings.upload_mode == 'S3') {
validateS3(settings)
} else {
throw new PayloadValidationError(`Unrecognized upload mode: ${settings.upload_mode}`)
}
// STRATCONN-2584: error if less than 25 elements in payload

// Prepare header row. Expected format:
// liveramp_audience_key[1],identifier_data[0..n]
/*
Generates the LiveRamp ingestion file. Expected format:
liveramp_audience_key[1],identifier_data[0..n]
*/
function generateFile(payloads: Payload[]) {
const rows = []
const headers = ['audience_key']
if (payloads[0].identifier_data) {
Expand All @@ -36,12 +27,10 @@ async function processData(request: RequestClient, settings: Settings, payloads:
}

// STRATCONN-2584: verify multiple emails are handled
const filename = `${payloads[0].audience_name}_PII_${payloads[0].received_at}.csv`
const fileContent = rows.join('\n')
const filename = payloads[0].filename
const fileContent = Buffer.from(rows.join('\n'))

if (settings.upload_mode == 'S3') {
return await uploadS3(settings, filename, fileContent, request)
}
return { filename, fileContent }
}

/*
Expand All @@ -56,4 +45,4 @@ function enquoteIdentifier(identifier: string) {
return `"${identifier.replace(/"/g, '""')}"`
}

export { processData, enquoteIdentifier }
export { generateFile, enquoteIdentifier }
Loading

0 comments on commit 687a561

Please sign in to comment.