diff --git a/CHANGELOG.md b/CHANGELOG.md index c6ab7a4..63562f8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,18 @@ -## 1.0.0 (October 8, 2019) +# 1.1.0 (December 5, 2019) -* Initial release -* Add custom port field instead of defaulting to 22 -* Add more unit tests -* Fix integration test using `path.resolve` based on test machine -* Format repository according to Airbnb styling +## General Changes + * Add `Delete file` action + * Add `Lookup file by name` action + * Add `Get new and updated files` trigger + +## Actions + +### Upload files + * Add custom name for uploaded file + +# 1.0.0 (October 8, 2019) + * Initial release + * Add custom port field instead of defaulting to 22 + * Add more unit tests + * Fix integration test using `path.resolve` based on test machine + * Format repository according to Airbnb styling diff --git a/README.md b/README.md index d22e594..d757725 100644 --- a/README.md +++ b/README.md @@ -9,9 +9,12 @@ * [Host](#host) * [Port](#port) * [Triggers](#triggers) - * [Read](#read) + * [Read files](#read-files) + * [Get new and updated files](#get-new-and-updated-files) * [Actions](#actions) - * [Upload](#upload) + * [Upload files](#upload-files) + * [Delete file](#delete-file) + * [Lookup file by name](#lookup-file-by-name) * [Known limitations](#known-limitations) * [SSH2 SFTP Client API and Documentation links](#ssh2-sftp-client-api-and-documentation-links) @@ -35,7 +38,7 @@ Optional, port of SFTP server. Defaults to 22 if not set. ## Triggers -### Read +### Read files The following configuration fields are available: * **Directory**: The directory of the files to read from. @@ -65,19 +68,230 @@ The next component may read from `url` in `attachments` for a memory-efficient w * Note: you may need to consider cleaning up the `.elasticio_processed` directory manually +### Get new and updated files +Triggers to get all new and updated files since last polling. + +The following configuration fields are available: +* **Directory**: The directory of the files to read from. +* **Emit Behaviour**: Options are: default is `Emit Individually` emits each object in separate message, `Fetch All` emits all objects in one message +* **Start Time**: Start datetime of polling. Default min date:`-271821-04-20T00:00:00.000Z` +* **End Time**: End datetime of polling. Default max date: `+275760-09-13T00:00:00.000Z` + + +#### Expected output metadata +```json +{ + "type": "object", + "properties": { + "filename": { + "title": "File Name", + "type": "string", + "required": true + }, + "size": { + "title": "File Size", + "type": "number", + "required": true + }, + "type": { + "title": "File Type", + "type": "string", + "required": true + }, + "modifyTime": { + "title": "Last Modification Time", + "type": "number", + "required": true + }, + "accessTime": { + "title": "Last Access Time", + "type": "number", + "required": true + }, + "directory": { + "title": "Directory", + "type": "string", + "required": true + }, + "path": { + "title": "Full Path", + "type": "string", + "required": true + } + } +} + +``` + ## Actions -### Upload +### Upload files The following configuration fields are available: -|* **Directory**: The directory where the file will be uploaded to. +- **Directory**: The directory where the file will be uploaded to. * Note: if the directory does not exist, it will create it at the risk of possibly overwriting any files that may have the same name. +Input metadata: + +- **Filename**: Custom name for uploaded file. + +Notes: +* Uploaded file name will get filename of income file if new `Filename` doesn't provided +* `Filename` will be added at the beggining of attachment name if income message contains multiple attachments: `[SpecifiedFilename]_[NameOfExistedFile]` +* File will be overwrited in case when file with specified name already exists in directory + +### Delete file +Action to delete file by provided full file path. + +#### Expected input metadata +```json +{ + "type": "object", + "properties": { + "path": { + "title": "Full Path", + "type": "string", + "required": true + } + } +} +``` + +#### Expected output metadata +```json +{ + "type": "object", + "properties": { + "id": { + "title": "Full Path", + "type": "string", + "required": true + } + } +} + +``` + +### Lookup file by name +Finds a file by name in the provided directory and uploads (streams) to the attachment storage (a.k.a. steward). +After the upload, the READ-URL of the file will be used to generate a message with content like below: + +```json +{ + "id": "0c196dca-4187-4b49-bf90-5cfe9030955b", + "attachments": { + "1.txt": { + "url": "http://steward-service.platform.svc.cluster.local:8200/files/99999-6613-410a-9da8-c5f6d529b683", + "size": 7 + } + }, + "body": { + "type": "-", + "name": "1.txt", + "size": 7, + "modifyTime": "2019-12-02T13:05:42.000Z", + "accessTime": "2019-12-04T14:14:54.000Z", + "rights": { + "user": "rw", + "group": "r", + "other": "r" + }, + "owner": 1002, + "group": 1002, + "attachment_url": "http://steward-service.platform.svc.cluster.local:8200/files/99999-6613-410a-9da8-c5f6d529b683", + "directory": "/www/olhav", + "path": "/www/olhav/1.txt" + } +} +``` + +The next component may read from `url` in `attachments` for a memory-efficient way to read/parse data. + +#### List of Expected Config fields +##### Allow Empty Result +Default `No`. In case `No` is selected - an error will be thrown when no objects were found, +If `Yes` is selected - an empty object will be returned instead of throwing an error. + +##### Allow ID to be Omitted +Default `No`. In case `No` is selected - an error will be thrown when object id is missing in metadata, if `Yes` is selected - an empty object will be returned instead of throwing an error. + +#### Expected input metadata +```json +{ + "type": "object", + "properties": { + "path": { + "title": "Path and File Name", + "type": "string" + } + } +} +``` + +#### Expected output metadata + +
+Output metadata + +```json + +{ + "type": "object", + "properties": { + "type": { + "title": "Type", + "type": "string", + "required": true + }, + "name": { + "title": "File Name", + "type": "string", + "required": true + }, + "size": { + "title": "File Size", + "type": "number", + "required": true + }, + "modifyTime": { + "title": "modifyTime", + "type": "string", + "required": true + }, + "accessTime": { + "title": "accessTime", + "type": "string", + "required": true + }, + "directory": { + "title": "directory", + "type": "string", + "required": true + }, + "path": { + "title": "path", + "type": "string", + "required": true + }, + "attachment_url": { + "title": "File Size", + "type": "number", + "required": true + } + } +} + +``` +
+ ## Known limitations -* The maximum file size accepted by the SFTP component is limited to 100 MiB (Mebibytes) +* The maximum file size accepted by the SFTP component is limited to 100 MB. * The attachments mechanism does not work with [Local Agent Installation](https://support.elastic.io/support/solutions/articles/14000076461-announcing-the-local-agent-) +* `Get new and updated files` trigger mechanism is based on SFTP file `modifyTime` metadata field. For correct processing the trigger requires correct time configuration on the SFTP server. +* `Get new and updated files` trigger does not support empty files processing. +* `Get new and updated files` trigger does not support `fetch page` Emit Behaviour ## SSH2 SFTP Client API and Documentation links diff --git a/component.json b/component.json index 147a80a..116758a 100644 --- a/component.json +++ b/component.json @@ -34,7 +34,7 @@ "upload": { "main": "./lib/actions/upload.js", "title": "Upload files", - "description": "Upload files (attachments) to SFTP directory", + "description": "Upload files in a defined SFTP directory", "fields": { "directory": { "viewClass": "TextFieldView", @@ -43,6 +43,161 @@ "placeholder": "/foo/bar", "note": "Directory will be created if not exists, attachments with the same name would be overwritten" } + }, + "metadata": { + "in": { + "type": "object", + "properties": { + "filename": { + "title": "File Name", + "type": "string", + "required": false + } + } + }, + "out": { + "type": "object", + "properties": { + "result": { + "title": "Result", + "type": "object", + "required": true, + "properties": { + "results": { + "type": "array", + "items": { + "type": "object", + "properties": { + "attachment": { + "title": "Attachment Name", + "type": "string", + "required": true + }, + "uploadedOn": { + "title": "Uploaded on", + "type": "string", + "required": true + }, + "fileName": { + "title": "File Name", + "type": "string", + "required": true + } + } + } + } + } + } + } + } + } + }, + "delete": { + "main": "./lib/actions/delete.js", + "title": "Delete file", + "description": "Delete file by name in provided path", + "metadata": { + "in": { + "type": "object", + "properties": { + "path": { + "title": "Full Path", + "type": "string", + "required": true + } + } + }, + "out": { + "type": "object", + "properties": { + "id": { + "title": "Full Path", + "type": "string", + "required": true + } + } + } + } + }, + "lookupObject": { + "main": "./lib/actions/lookupObject.js", + "title": "Lookup file by name", + "description": "Lookup file by name in a defined SFTP directory", + "fields": { + "allowEmptyResult": { + "viewClass": "SelectView", + "label": "Allow Empty Result", + "model": { + "Yes": "Yes", + "No": "No" + }, + "prompt": "Allow empty result if object not found? Default: No." + }, + "allowCriteriaToBeOmitted": { + "viewClass": "SelectView", + "label": "Allow File name to be Omitted", + "model": { + "Yes": "Yes", + "No": "No" + }, + "prompt": "Allow File name to be omitted? Default: No." + } + }, + "metadata": { + "in": { + "type": "object", + "properties": { + "path": { + "title": "Path and File Name", + "type": "string" + } + } + }, + "out": { + "type": "object", + "properties": { + "type": { + "title": "Type", + "type": "string", + "required": true + }, + "name": { + "title": "File Name", + "type": "string", + "required": true + }, + "size": { + "title": "File Size", + "type": "number", + "required": true + }, + "modifyTime": { + "title": "modifyTime", + "type": "string", + "required": true + }, + "accessTime": { + "title": "accessTime", + "type": "string", + "required": true + }, + "directory": { + "title": "directory", + "type": "string", + "required": true + }, + "path": { + "title": "path", + "type": "string", + "required": true + }, + "attachment_url": { + "title": "File Size", + "type": "number", + "required": true + } + } + } } } }, @@ -66,6 +221,61 @@ "placeholder": "Pattern" } }, + "metadata": { + "out": { + "type": "object", + "properties": { + "path": { + "title": "Full Path", + "type": "string", + "required": true + }, + "size": { + "title": "File Size", + "type": "number", + "required": true + } + } + } + } + }, + "pollingTrigger": { + "main": "./lib/triggers/polling.js", + "title": "Get new and updated files", + "type": "polling", + "description": "Will continuously poll remote SFTP location for files that match given pattern. Found files will be transferred as attachments to the next component.", + "fields": { + "directory": { + "viewClass": "TextFieldView", + "label": "Directory Name", + "required": true, + "placeholder": "Directory" + }, + "emitBehaviour": { + "label": "Emit Behaviour", + "viewClass": "SelectView", + "required": false, + "prompt": "Default Emit Individually", + "model": { + "emitIndividually": "Emit Individually", + "fetchAll": "Fetch All" + } + }, + "startTime": { + "label": "Start Time", + "viewClass": "TextFieldView", + "required": false, + "note": "Default: minimum time", + "placeholder": "Format ISO 8601: 2019-11-21T04:08:01Z" + }, + "endTime": { + "label": "End Time", + "viewClass": "TextFieldView", + "required": false, + "note": "Default: maximum time", + "placeholder": "Format ISO 8601: 2019-11-21T04:08:01Z" + } + }, "metadata": { "out": { "type": "object", @@ -79,6 +289,31 @@ "title": "File Size", "type": "number", "required": true + }, + "type": { + "title": "File Type", + "type": "string", + "required": true + }, + "modifyTime": { + "title": "Last Modification Time", + "type": "number", + "required": true + }, + "accessTime": { + "title": "Last Access Time", + "type": "number", + "required": true + }, + "directory": { + "title": "Directory", + "type": "string", + "required": true + }, + "path": { + "title": "Full Path", + "type": "string", + "required": true } } } diff --git a/lib/actions/delete.js b/lib/actions/delete.js new file mode 100644 index 0000000..d0434b4 --- /dev/null +++ b/lib/actions/delete.js @@ -0,0 +1,11 @@ +const { SftpDelete } = require('../utils/deleteUtil'); +const Sftp = require('../Sftp'); + +async function process(msg, cfg, snapshot = {}) { + const sftpClient = new Sftp(this.logger, cfg); + await sftpClient.connect(); + const deleteAction = new SftpDelete(this.logger, sftpClient); + return deleteAction.process(msg, cfg, snapshot); +} + +module.exports.process = process; diff --git a/lib/actions/lookupObject.js b/lib/actions/lookupObject.js new file mode 100644 index 0000000..0a28a31 --- /dev/null +++ b/lib/actions/lookupObject.js @@ -0,0 +1,13 @@ +const { SftpLookupObject } = require('../utils/lookupObjectUtil'); +const Sftp = require('../Sftp'); + +async function process(msg, cfg, snapshot = {}) { + const sftpClient = new Sftp(this.logger, cfg); + await sftpClient.connect(); + const lookupObjectAction = new SftpLookupObject(this.logger, sftpClient); + const result = await lookupObjectAction.process(msg, cfg, snapshot); + await sftpClient.end(); + return result; +} + +module.exports.process = process; diff --git a/lib/actions/upload.js b/lib/actions/upload.js index 12368d3..a1a7965 100644 --- a/lib/actions/upload.js +++ b/lib/actions/upload.js @@ -11,6 +11,7 @@ const Sftp = require('../Sftp'); * @param cfg configuration that is account information and configuration field values */ exports.process = async function processAction(msg, cfg) { + this.logger.info('Connecting to sftp server...'); const sftp = new Sftp(this.logger, cfg); await sftp.connect(); @@ -18,15 +19,24 @@ exports.process = async function processAction(msg, cfg) { results: [], }; const dir = cfg.directory || '/'; + // eslint-disable-next-line no-use-before-define + const filename = prepareFilename(msg); + this.logger.info(`Prepared filename: ${filename}`); + const isExists = await sftp.exists(dir); if (!isExists) { await sftp.mkdir(dir, true); } + + this.logger.info(`Found ${Object.keys(msg.attachments).length} attachments`); // eslint-disable-next-line no-restricted-syntax for (const key of Object.keys(msg.attachments)) { const attachment = msg.attachments[key]; const cur = await sftp.cwd(); - const targetPath = (cur.charAt(0) === '/') ? path.posix.resolve(dir, key) : path.resolve(dir, key); + // eslint-disable-next-line no-use-before-define + const keyName = prepareKeyname(key, filename, msg); + const targetPath = (cur.charAt(0) === '/') ? path.posix.resolve(dir, keyName) : path.resolve(dir, keyName); + this.logger.info(`Writing attachment to ${targetPath}`); this.logger.info(`Getting attachment for ${key}`); const file = await new AttachmentProcessor().getAttachment(attachment.url, 'stream'); @@ -37,9 +47,29 @@ exports.process = async function processAction(msg, cfg) { result.results.push({ attachment: key, uploadedOn: new Date().toISOString(), - fileName: targetPath, + path: targetPath, }); } await sftp.end(); return eioUtils.newMessageWithBody(result); }; + +function prepareFilename(msg) { + if (msg.body.filename) { + if (Object.keys(msg.attachments).length > 1) { + return msg.body.filename.split('.')[0]; + } + return msg.body.filename; + } + return null; +} + +function prepareKeyname(key, filename, msg) { + if (filename) { + if (Object.keys(msg.attachments).length > 1) { + return `${filename}_${key}`; + } + return filename; + } + return key; +} diff --git a/lib/attachments.js b/lib/attachments.js index 6fa6d98..3f3c983 100644 --- a/lib/attachments.js +++ b/lib/attachments.js @@ -1,9 +1,13 @@ +/* eslint-disable no-param-reassign */ + const { AttachmentProcessor } = require('@elastic.io/component-commons-library'); +const { Transform, Readable } = require('stream'); +const path = require('path'); +const { unixTimeToIsoDate } = require('../lib/utils/utils'); async function addAttachment(msg, name, stream, contentLength) { try { const result = await new AttachmentProcessor().uploadAttachment(stream, 'stream'); - // eslint-disable-next-line no-param-reassign msg.attachments[name] = { url: result.config.url, size: contentLength, @@ -13,4 +17,64 @@ async function addAttachment(msg, name, stream, contentLength) { } } +async function uploadFromSftpToAttachment(context, body, dir) { + const { logger, client } = context; + const filePath = path.join(dir, body.name); + const transform = new Transform({ + writableObjectMode: true, + readableObjectMode: true, + transform: (chunk, _, cb) => { + cb(null, chunk); + }, + }); + logger.info('About to start saving file: %s', filePath); + await client.get(filePath, transform); + + const attachmentProcessor = new AttachmentProcessor(); + const uploadResult = await attachmentProcessor.uploadAttachment(transform); + const attachmentUrl = uploadResult.config.url; + logger.info('File %s successfully uploaded to URL: %s', filePath, attachmentUrl); + const attachments = { + [body.name]: { + url: uploadResult.config.url, + size: body.size, + }, + }; + body.attachment_url = attachmentUrl; + body.directory = dir; + body.path = filePath; + body.modifyTime = unixTimeToIsoDate(body.modifyTime); + body.accessTime = unixTimeToIsoDate(body.accessTime); + return { body, attachments }; +} + +async function uploadFromSftpToAttachmentBuffer(context, body, dir) { + const { logger, client } = context; + const filePath = path.join(dir, body.name); + const buffer = await client.get(filePath); + const readStream = new Readable(); + readStream.push(buffer); + readStream.push(null); + logger.info('About to start saving file: %s', filePath); + + const attachmentProcessor = new AttachmentProcessor(); + const uploadResult = await attachmentProcessor.uploadAttachment(readStream); + const attachmentUrl = uploadResult.config.url; + logger.info('File %s successfully uploaded to URL: %s', filePath, attachmentUrl); + const attachments = { + [body.name]: { + url: uploadResult.config.url, + size: body.size, + }, + }; + body.attachment_url = attachmentUrl; + body.directory = dir; + body.path = filePath; + body.modifyTime = unixTimeToIsoDate(body.modifyTime); + body.accessTime = unixTimeToIsoDate(body.accessTime); + return { body, attachments }; +} + exports.addAttachment = addAttachment; +exports.uploadFromSftpToAttachment = uploadFromSftpToAttachment; +exports.uploadFromSftpToAttachmentBuffer = uploadFromSftpToAttachmentBuffer; diff --git a/lib/triggers/polling.js b/lib/triggers/polling.js new file mode 100644 index 0000000..db20ab7 --- /dev/null +++ b/lib/triggers/polling.js @@ -0,0 +1,12 @@ +const { SftpPolling } = require('../utils/pollingUtil'); +const Sftp = require('../Sftp'); + +async function process(msg, cfg, snapshot = {}) { + const sftpClient = new Sftp(this.logger, cfg); + await sftpClient.connect(); + const pollingTrigger = new SftpPolling(this.logger, this, sftpClient, cfg); + await pollingTrigger.process(cfg, snapshot); + return sftpClient.end(); +} + +module.exports.process = process; diff --git a/lib/utils/deleteUtil.js b/lib/utils/deleteUtil.js new file mode 100644 index 0000000..b18aeac --- /dev/null +++ b/lib/utils/deleteUtil.js @@ -0,0 +1,22 @@ +const { DeleteById } = require('@elastic.io/oih-standard-library/lib/actions/delete'); + +class SftpDelete extends DeleteById { + constructor(logger, client) { + super(logger); + this.client = client; + } + + // eslint-disable-next-line class-methods-use-this, no-unused-vars + getId(msg, cfg) { + return msg.body.path; + } + + async deleteObject(path) { + this.logger.info(`Deleting file by path: ${path}`); + await this.client.delete(path); + await this.client.end(); + return path; + } +} + +exports.SftpDelete = SftpDelete; diff --git a/lib/utils/lookupObjectUtil.js b/lib/utils/lookupObjectUtil.js new file mode 100644 index 0000000..5e2456c --- /dev/null +++ b/lib/utils/lookupObjectUtil.js @@ -0,0 +1,86 @@ +const { messages } = require('elasticio-node'); +const path = require('path'); +const { LookupObjectById } = require('@elastic.io/oih-standard-library/lib/actions/lookupObject'); +const attachments = require('../attachments'); + +class SftpLookupObject extends LookupObjectById { + constructor(logger, client) { + super(logger); + this.client = client; + } + + // eslint-disable-next-line class-methods-use-this, no-unused-vars + getId(msg, cfg) { + return msg.body.path; + } + + async getFile(dir, filename) { + this.logger.info(`Starting verify is file with name: ${filename} is exists in directory ${dir}`); + const list = await this.client.list(dir, new RegExp(filename)); + const files = list.filter((file) => file.name === filename && file.type === '-'); + if (files.length !== 1) { + if (files.length === 0) { + this.logger.info(`File with name ${filename} was not found`); + } else { + this.logger.info('More than one file were found'); + } + return null; + } + const [file] = files; + this.logger.trace(`File with name: ${filename}: ${JSON.stringify(file)}`); + this.logger.info(`File with name: ${filename} is exists in directory ${dir}`); + return file; + } + + async lookupObject(filePath) { + const directory = path.posix.dirname(filePath); + const filename = path.basename(filePath); + const file = await this.getFile(directory, filename); + if (!file) { + return null; + } + const uploadResult = await attachments.uploadFromSftpToAttachmentBuffer(this, file, directory); + const result = messages.newMessageWithBody(uploadResult.body); + result.attachments = uploadResult.attachments; + return result; + } + + async process(msg, cfg, snapshot) { + try { + this.logger.info('Starting processing lookupObjectById action'); + this.logger.trace('Incoming configuration: %j', cfg); + this.logger.trace('Incoming message: %j', msg); + this.logger.trace('Incoming snapshot: %j', snapshot); + const id = this.getId(msg, cfg); + if (id === undefined || id === null) { + this.logger.trace('filename is empty'); + if (this.isOmittedCriteriaAllowed(cfg, msg)) { + this.logger.trace('Empty filename allowed, returning empty object'); + this.logger.info('Finished processing lookupObjectById action'); + return messages.newEmptyMessage(); + } + this.logger.trace('Empty filename is not allowed throwing error'); + throw new Error('Empty filename is not allowed.'); + } + const result = await this.lookupObject(id, cfg); + this.logger.trace('Result of lookup: %j', result); + if (result === null) { + this.logger.trace('Object not found for filename: %j', id); + if (this.isEmptyResultAllowed(cfg, msg)) { + this.logger.trace('Empty result allowed, returning empty object'); + this.logger.info('Finished processing lookupObjectById action'); + return messages.newEmptyMessage(); + } + this.logger.trace('Empty result not allowed, throwing error'); + throw new Error(`Object with filename: ${JSON.stringify(id)} not found. Empty result is not allowed.`); + } + this.logger.info('Finished processing lookupObjectById action'); + return result; + } catch (e) { + this.logger.error('Unexpected error while processing lookupObjectById call for message: %j, cfg: %j', msg, cfg, e); + throw e; + } + } +} + +exports.SftpLookupObject = SftpLookupObject; diff --git a/lib/utils/pollingUtil.js b/lib/utils/pollingUtil.js new file mode 100644 index 0000000..62b247d --- /dev/null +++ b/lib/utils/pollingUtil.js @@ -0,0 +1,116 @@ +const { PollingTrigger } = require('@elastic.io/oih-standard-library/lib/triggers/getNewAndUpdated'); +const { AttachmentProcessor } = require('@elastic.io/component-commons-library'); +const { Readable } = require('stream'); +const { messages } = require('elasticio-node'); + +const Sftp = require('../Sftp'); +const { getDirectory } = require('./utils'); + +class SftpPolling extends PollingTrigger { + constructor(logger, context, client, cfg) { + super(logger, context); + this.client = client; + this.cfg = cfg; + } + + async getObjects(objectType, startTime, endTime, cfg) { + const formattedStartTime = new Date(startTime); + const formattedEndTime = new Date(endTime); + const fileList = await this.client.list(cfg.directory); + return fileList + .filter((file) => file.type !== 'd') + .filter((file) => new Date(file.modifyTime) >= formattedStartTime) + .filter((file) => new Date(file.modifyTime) < formattedEndTime); + } + + async emitIndividually(results) { + this.logger.debug('Start emitting data'); + const attachmentProcessor = new AttachmentProcessor(); + for (let i = 0; i < results.length; i += 1) { + const r = results[i]; + if (r === null || r === undefined) { + this.logger.trace('Not emitting result with empty body, result was: %j', r); + } else { + try { + this.logger.debug('Processing file with name: %s, size: %d', r.name, r.size); + // eslint-disable-next-line no-await-in-loop + const attachmentResult = await this.uploadFileAsAttachment(attachmentProcessor, r); + + const resultMessage = messages.newMessageWithBody(this.prepareMessageDescription(r)); + + resultMessage.attachments[r.name] = { + url: attachmentResult.config.url, + size: r.size, + }; + + this.logger.trace('Emitting new message with body: %j', resultMessage.body); + // eslint-disable-next-line no-await-in-loop + await this.context.emit('data', resultMessage); + } catch (e) { + // eslint-disable-next-line no-await-in-loop + await this.context.emit('error', e); + } + } + } + this.logger.debug('Finished emitting data'); + } + + async emitAll(results) { + this.logger.debug('Start emitting data'); + const attachmentProcessor = new AttachmentProcessor(); + if (results === null || results === undefined || results.length === 0) { + this.logger.trace('Not emitting result with empty body, results was: %j', results); + return; + } + + const resultMessage = messages.newMessageWithBody({ results: [] }); + for (let i = 0; i < results.length; i += 1) { + const r = results[i]; + + try { + this.logger.debug('Processing file with name: %s, size: %d', r.name, r.size); + // eslint-disable-next-line no-await-in-loop + const attachmentResult = await this.uploadFileAsAttachment(attachmentProcessor, r); + resultMessage.attachments[r.name] = { + url: attachmentResult.config.url, + size: r.size, + }; + + resultMessage.body.results.push(this.prepareMessageDescription(r)); + } catch (e) { + // eslint-disable-next-line no-await-in-loop + await this.context.emit('error', e); + } + } + + this.logger.trace('Emitting new message with body: %j', resultMessage.body); + await this.context.emit('data', resultMessage); + this.logger.debug('Finished emitting data'); + } + + // eslint-disable-next-line class-methods-use-this + prepareMessageDescription(file) { + const dir = getDirectory(this.cfg); + return { + type: file.type, + filename: file.name, + size: file.size, + modifyTime: new Date(file.modifyTime).toISOString(), + accessTime: new Date(file.accessTime).toISOString(), + directory: dir, + path: `${dir}/${file.name}`, + }; + } + + async uploadFileAsAttachment(attachmentProcessor, file) { + const path = Sftp.createPath(this.cfg.directory, file.name); + + const buffer = await this.client.get(path); + const readStream = new Readable(); + readStream.push(buffer); + readStream.push(null); + return attachmentProcessor.uploadAttachment(readStream); + } +} + +exports.SftpPolling = SftpPolling; diff --git a/lib/utils/utils.js b/lib/utils/utils.js new file mode 100644 index 0000000..9009274 --- /dev/null +++ b/lib/utils/utils.js @@ -0,0 +1,15 @@ +const moment = require('moment'); + +function getDirectory(cfg) { + const { directory } = cfg; + return directory.substring(directory.length - 1) === '/' + ? directory.substring(0, directory.length - 1) + : directory; +} + +function unixTimeToIsoDate(unixTime) { + return moment.utc(unixTime, 'x', true).toISOString(); +} + +exports.getDirectory = getDirectory; +exports.unixTimeToIsoDate = unixTimeToIsoDate; diff --git a/logo.png b/logo.png index 3a65f0c..9f083e5 100644 Binary files a/logo.png and b/logo.png differ diff --git a/package-lock.json b/package-lock.json index c9173c1..7df5156 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "sftp-component", - "version": "1.0.0", + "version": "1.1.0", "lockfileVersion": 1, "requires": true, "dependencies": { @@ -43,11 +43,46 @@ "typescript": "3.4.4" } }, + "@elastic.io/component-logger": { + "version": "0.0.1", + "resolved": "https://registry.npmjs.org/@elastic.io/component-logger/-/component-logger-0.0.1.tgz", + "integrity": "sha512-H+xdlkjoHsikPkdTGB7mskkzGY36i8zj0F6IXkGjdz98Hz7HzO0iE5ORNGgTw6naFgDhAe7lmKEnDjre3ENUkQ==", + "requires": { + "bunyan": "1.8.12", + "bunyan-format": "^0.2.1", + "bunyan-serializers": "0.0.2" + } + }, "@elastic.io/jsonata-moment": { "version": "1.1.3", "resolved": "https://registry.npmjs.org/@elastic.io/jsonata-moment/-/jsonata-moment-1.1.3.tgz", "integrity": "sha512-2m2jTvrAxYHRW2ktlQnEsQf2iMETppSgbw6xUAvUzkW1wgPM4xJlZ+BhIEVbjzlgeCO1XbVbNMUapZRUwmtV9Q==" }, + "@elastic.io/oih-standard-library": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/@elastic.io/oih-standard-library/-/oih-standard-library-1.0.0.tgz", + "integrity": "sha512-zPN7prReAiaUEYELELGE+qS6Cj2G0680hMUU5tmt8+3Ws5uQlTs5WGVUPuSHaxudfcsWLkcz+FWJaunH7U1dWQ==", + "requires": { + "@elastic.io/component-logger": "0.0.1", + "@types/mocha": "5.2.7", + "@types/node": "12.11.7", + "elasticio-node": "0.0.9", + "elasticio-rest-node": "1.2.3", + "typescript": "3.6.4" + }, + "dependencies": { + "@types/node": { + "version": "12.11.7", + "resolved": "https://registry.npmjs.org/@types/node/-/node-12.11.7.tgz", + "integrity": "sha512-JNbGaHFCLwgHn/iCckiGSOZ1XYHsKFwREtzPwSGCVld1SGhOlmZw2D4ZI94HQCrBHbADzW9m4LER/8olJTRGHA==" + }, + "typescript": { + "version": "3.6.4", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-3.6.4.tgz", + "integrity": "sha512-unoCll1+l+YK4i4F8f22TaNVPRHcD9PA3yCuZ8g5e0qGqlVlJ/8FSateOLLSagn+Yg5+ZwuPkL8LFUc0Jcvksg==" + } + } + }, "@sinonjs/commons": { "version": "1.6.0", "resolved": "https://registry.npmjs.org/@sinonjs/commons/-/commons-1.6.0.tgz", @@ -92,6 +127,11 @@ "integrity": "sha512-+iTbntw2IZPb/anVDbypzfQa+ay64MW0Zo8aJ8gZPWMMK6/OubMVb6lUPMagqjOPnmtauXnFCACVl3O7ogjeqQ==", "dev": true }, + "@types/mocha": { + "version": "5.2.7", + "resolved": "https://registry.npmjs.org/@types/mocha/-/mocha-5.2.7.tgz", + "integrity": "sha512-NYrtPht0wGzhwe9+/idPaBB+TqkY9AhTvOLMkThm0IoEfLaiVQZwBwyJ5puCkO3AUCWrmcoePjp2mbFocKy4SQ==" + }, "@types/node": { "version": "8.10.51", "resolved": "https://registry.npmjs.org/@types/node/-/node-8.10.51.tgz", @@ -751,6 +791,20 @@ "type-detect": "^4.0.0" } }, + "deep-equal": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/deep-equal/-/deep-equal-1.1.1.tgz", + "integrity": "sha512-yd9c5AdiqVcR+JjcwUQb9DkhJc8ngNr0MahEBGvDiJw8puWab2yZlh+nkasOnZP+EGTAP6rRp2JzJhJZzvNF8g==", + "dev": true, + "requires": { + "is-arguments": "^1.0.4", + "is-date-object": "^1.0.1", + "is-regex": "^1.0.4", + "object-is": "^1.0.1", + "object-keys": "^1.1.1", + "regexp.prototype.flags": "^1.2.0" + } + }, "deep-is": { "version": "0.1.3", "resolved": "https://registry.npmjs.org/deep-is/-/deep-is-0.1.3.tgz", @@ -1984,6 +2038,12 @@ } } }, + "is-arguments": { + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/is-arguments/-/is-arguments-1.0.4.tgz", + "integrity": "sha512-xPh0Rmt8NE65sNzvyUmWgI1tz3mKq74lGA0mL8LYZcoIzKOzDh6HmrYm3d18k60nHerC8A9Km8kYu87zfSFnLA==", + "dev": true + }, "is-arrayish": { "version": "0.2.1", "resolved": "https://registry.npmjs.org/is-arrayish/-/is-arrayish-0.2.1.tgz", @@ -2565,6 +2625,23 @@ "path-to-regexp": "^1.7.0" } }, + "nock": { + "version": "9.6.1", + "resolved": "https://registry.npmjs.org/nock/-/nock-9.6.1.tgz", + "integrity": "sha512-EDgl/WgNQ0C1BZZlASOQkQdE6tAWXJi8QQlugqzN64JJkvZ7ILijZuG24r4vCC7yOfnm6HKpne5AGExLGCeBWg==", + "dev": true, + "requires": { + "chai": "^4.1.2", + "debug": "^3.1.0", + "deep-equal": "^1.0.0", + "json-stringify-safe": "^5.0.1", + "lodash": "^4.17.5", + "mkdirp": "^0.5.0", + "propagate": "^1.0.0", + "qs": "^6.5.1", + "semver": "^5.5.0" + } + }, "node-environment-flags": { "version": "1.0.4", "resolved": "https://registry.npmjs.org/node-environment-flags/-/node-environment-flags-1.0.4.tgz", @@ -2643,6 +2720,12 @@ "integrity": "sha512-GJzfBZ6DgDAmnuaM3104jR4s1Myxr3Y3zfIyN4z3UdqN69oSRacNK8UhnobDdC+7J2AHCjGwxQubNJfE70SXXQ==", "dev": true }, + "object-is": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/object-is/-/object-is-1.0.1.tgz", + "integrity": "sha1-CqYOyZiaCz7Xlc9NBvYs8a1lObY=", + "dev": true + }, "object-keys": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/object-keys/-/object-keys-1.1.1.tgz", @@ -2976,6 +3059,12 @@ "integrity": "sha512-7PiHtLll5LdnKIMw100I+8xJXR5gW2QwWYkT6iJva0bXitZKa/XMrSbdmg3r2Xnaidz9Qumd0VPaMrZlF9V9sA==", "dev": true }, + "propagate": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/propagate/-/propagate-1.0.0.tgz", + "integrity": "sha1-AMLa7t2iDofjeCs0Stuhzd1q1wk=", + "dev": true + }, "psl": { "version": "1.1.31", "resolved": "https://registry.npmjs.org/psl/-/psl-1.1.31.tgz", @@ -3093,6 +3182,15 @@ "safe-regex": "^1.1.0" } }, + "regexp.prototype.flags": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/regexp.prototype.flags/-/regexp.prototype.flags-1.2.0.tgz", + "integrity": "sha512-ztaw4M1VqgMwl9HlPpOuiYgItcHlunW0He2fE6eNfT6E/CF2FtYi9ofOYe4mKntstYk0Fyh/rDRBdS3AnxjlrA==", + "dev": true, + "requires": { + "define-properties": "^1.1.2" + } + }, "regexpp": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/regexpp/-/regexpp-2.0.1.tgz", diff --git a/package.json b/package.json index 6425fba..d08cf9e 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "sftp-component", - "version": "1.0.0", + "version": "1.1.0", "description": "elastic.io SFTP component", "scripts": { "pretest": "eslint spec spec-integration lib verifyCredentials.js --fix", @@ -15,9 +15,11 @@ "license": "BSD-2-Clause", "dependencies": { "@elastic.io/component-commons-library": "0.0.6", + "@elastic.io/oih-standard-library": "1.0.0", "elasticio-node": "0.0.9", "elasticio-rest-node": "1.2.3", "elasticio-sailor-nodejs": "2.5.1", + "moment": "2.24.0", "ssh2-sftp-client": "4.1.0" }, "engines": { @@ -31,6 +33,7 @@ "eslint-config-airbnb-base": "14.0.0", "eslint-plugin-import": "2.18.2", "mocha": "6.0.2", + "nock": "9.6.1", "sinon": "7.4.2" } } diff --git a/spec-integration/integration.spec.js b/spec-integration/integration.spec.js index 56a3e61..2fed9a6 100644 --- a/spec-integration/integration.spec.js +++ b/spec-integration/integration.spec.js @@ -1,9 +1,13 @@ const { expect } = require('chai'); const EventEmitter = require('events'); const bunyan = require('bunyan'); +const sinon = require('sinon'); +const { AttachmentProcessor } = require('@elastic.io/component-commons-library'); const Sftp = require('../lib/Sftp'); +const deleteAction = require('../lib/actions/delete'); const upload = require('../lib/actions/upload'); const read = require('../lib/triggers/read'); +const lookupObject = require('../lib/actions/lookupObject'); require('dotenv').config(); const PROCESSED_FOLDER_NAME = '.elasticio_processed'; @@ -36,9 +40,9 @@ describe('SFTP integration test - upload then download', function () { const testNumber = Math.floor(Math.random() * 10000); before(() => { - if (!process.env.HOSTNAME) { throw new Error('Please set HOSTNAME env variable to proceed'); } - host = process.env.HOSTNAME; - username = process.env.USER; + if (!process.env.SFTP_HOSTNAME) { throw new Error('Please set SFTP_HOSTNAME env variable to proceed'); } + host = process.env.SFTP_HOSTNAME; + username = process.env.USERNAME; password = process.env.PASSWORD; port = process.env.PORT; directory = `/home/eiotesti/www/integration-test/test-${testNumber}/`; @@ -65,7 +69,7 @@ describe('SFTP integration test - upload then download', function () { }, }; const result = await upload.process.call(sender, msg, cfg); - + expect(result.body.results).to.be.an('array'); expect(result.body.results.length).to.equal(1); expect(result.body.results[0].attachment).to.equal('logo.svg'); @@ -116,6 +120,93 @@ describe('SFTP integration test - upload then download', function () { await sftp.rmdir(cfg.directory, false); }); + it('Uploads and reads attachments with custom name', async () => { + const cfg = { + host, + username, + password, + port, + directory, + }; + sftp = new Sftp(bunyan.createLogger({ name: 'dummy' }), cfg); + await sftp.connect(); + + await upload.process.call(new TestEmitter(), { + body: { filename: 'custom.svg' }, + attachments: { + 'logo.svg': { + url: 'https://app.elastic.io/img/logo.svg', + }, + 'logo2.svg': { + url: 'https://app.elastic.io/img/logo.svg', + }, + }, + }, cfg); + + const receiver = new TestEmitter(); + const msg = {}; + await read.process.call(receiver, msg, cfg); + expect(receiver.data.length).to.equal(2); + expect(receiver.data[0].body.filename).to.equal('custom_logo.svg'); + expect(receiver.data[0].body.size).to.equal(4379); + expect(receiver.data[1].body.filename).to.equal('custom_logo2.svg'); + expect(receiver.data[1].body.size).to.equal(4379); + const logoFilename = (await sftp.list(`${cfg.directory}${PROCESSED_FOLDER_NAME}`))[0].name; + const logo2Filename = (await sftp.list(`${cfg.directory}${PROCESSED_FOLDER_NAME}`))[1].name; + await sftp.delete(`${cfg.directory}${PROCESSED_FOLDER_NAME}/${logoFilename}`); + await sftp.delete(`${cfg.directory}${PROCESSED_FOLDER_NAME}/${logo2Filename}`); + await sftp.rmdir(`${cfg.directory}${PROCESSED_FOLDER_NAME}`, false); + await sftp.rmdir(cfg.directory, false); + }); + + it('Uploads, read and deletes attachments with custom name', async () => { + const cfg = { + host, + username, + password, + port, + directory, + }; + sftp = new Sftp(bunyan.createLogger({ name: 'dummy' }), cfg); + await sftp.connect(); + + await upload.process.call(new TestEmitter(), { + body: { filename: 'custom.svg' }, + attachments: { + 'logo.svg': { + url: 'https://app.elastic.io/img/logo.svg', + }, + 'logo2.svg': { + url: 'https://app.elastic.io/img/logo.svg', + }, + }, + }, cfg); + + const receiver = new TestEmitter(); + const msg = {}; + await read.process.call(receiver, msg, cfg); + expect(receiver.data.length).to.equal(2); + expect(receiver.data[0].body.filename).to.equal('custom_logo.svg'); + expect(receiver.data[0].body.size).to.equal(4379); + expect(receiver.data[1].body.filename).to.equal('custom_logo2.svg'); + expect(receiver.data[1].body.size).to.equal(4379); + + const logoFilename = (await sftp.list(`${cfg.directory}${PROCESSED_FOLDER_NAME}`))[0].name; + const logo2Filename = (await sftp.list(`${cfg.directory}${PROCESSED_FOLDER_NAME}`))[1].name; + + const dir = `${cfg.directory}${PROCESSED_FOLDER_NAME}`; + const deleteResult = await deleteAction.process.call(receiver, + { body: { path: `${dir}/${logoFilename}` } }, cfg); + const deleteResult2 = await deleteAction.process.call(receiver, + { body: { path: `${dir}/${logo2Filename}` } }, cfg); + + expect(deleteResult.body.id).to.equal(`${dir}/${logoFilename}`); + expect(deleteResult2.body.id).to.equal(`${dir}/${logo2Filename}`); + + await sftp.rmdir(`${cfg.directory}${PROCESSED_FOLDER_NAME}`, false); + await sftp.rmdir(cfg.directory, false); + }); + it('Uploads, reads, and filters files by pattern match', async () => { const cfg = { host, @@ -161,6 +252,48 @@ describe('SFTP integration test - upload then download', function () { await sftp.rmdir(cfg.directory, false); }); + it('Uploads and lookup', async () => { + const attachmentProcessorStub = sinon.stub(AttachmentProcessor.prototype, 'uploadAttachment'); + const callAttachmentProcessor = attachmentProcessorStub.returns({ config: { url: 'https://url' } }); + const cfg = { + host, + username, + password, + port, + directory, + }; + sftp = new Sftp(bunyan.createLogger({ name: 'dummy' }), cfg); + await sftp.connect(); + + await upload.process.call(new TestEmitter(), { + body: { + filename: 'logo.svg', + }, + attachments: { + 'logo.svg': { + url: 'https://app.elastic.io/img/logo.svg', + }, + }, + }, cfg); + + const list = await sftp.list(cfg.directory); + expect(list.length).to.equal(1); + expect(list[0].name).to.equal('logo.svg'); + + const receiver = new TestEmitter(); + const msg = { + body: { + path: `${directory}/logo.svg`, + }, + }; + const result = await lookupObject.process.call(receiver, msg, cfg); + expect(result.body.name).to.equal('logo.svg'); + expect(callAttachmentProcessor.calledOnce).to.be.equal(true); + await sftp.delete(`${cfg.directory}logo.svg`); + await sftp.rmdir(cfg.directory, false); + attachmentProcessorStub.restore(); + }); + afterEach(async () => { await sftp.end(); }); diff --git a/spec-integration/pollingIntegration.spec.js b/spec-integration/pollingIntegration.spec.js new file mode 100644 index 0000000..3b8ac7d --- /dev/null +++ b/spec-integration/pollingIntegration.spec.js @@ -0,0 +1,94 @@ +const { expect } = require('chai'); +const EventEmitter = require('events'); +const bunyan = require('bunyan'); +const nock = require('nock'); +const Sftp = require('../lib/Sftp'); +const upload = require('../lib/actions/upload'); +const poll = require('../lib/triggers/polling'); +require('dotenv').config(); + +class TestEmitter extends EventEmitter { + constructor() { + super(); + this.data = []; + this.end = 0; + this.error = []; + this.logger = bunyan.createLogger({ name: 'dummy' }); + + this.on('data', (value) => this.data.push(value)); + this.on('error', (value) => this.error.push(value)); + this.on('end', () => { + this.end += 1; + }); + } +} + +// eslint-disable-next-line func-names +describe('SFTP integration test - polling', function () { + this.timeout(2000000); + let sftp; + let host; + let username; + let password; + let port; + let directory; + const testNumber = Math.floor(Math.random() * 10000); + + before(() => { + if (!process.env.SFTP_HOSTNAME) { throw new Error('Please set SFTP_HOSTNAME env variable to proceed'); } + host = process.env.SFTP_HOSTNAME; + username = process.env.USERNAME; + password = process.env.PASSWORD; + port = process.env.PORT; + directory = `/home/eiotesti/www/integration-test/test-${testNumber}/`; + }); + + it('Uploads and poll attachment', async () => { + nock('https://api.elastic.io/', { encodedQueryParams: true }) + .post('/v2/resources/storage/signed-url') + .reply(200, { put_url: 'http://api.io/some', get_url: 'http://api.io/some' }); + nock('http://api.io/', { encodedQueryParams: true }) + .put('/some').reply(200, { signedUrl: { put_url: 'http://api.io/some' } }); + + const cfg = { + host, + username, + password, + port, + directory, + }; + sftp = new Sftp(bunyan.createLogger({ name: 'dummy' }), cfg); + await sftp.connect(); + + const sender = new TestEmitter(); + const msg = { + body: {}, + attachments: { + 'logo.svg': { + url: 'https://app.elastic.io/img/logo.svg', + }, + }, + }; + const result = await upload.process.call(sender, msg, cfg); + + expect(result.body.results).to.be.an('array'); + expect(result.body.results.length).to.equal(1); + expect(result.body.results[0].attachment).to.equal('logo.svg'); + const list = await sftp.list(cfg.directory); + expect(list.length).to.equal(1); + expect(list[0].name).to.equal('logo.svg'); + expect(list[0].size).to.equal(4379); + + await poll.process.call(sender, {}, cfg); + + expect(sender.data[0].body.path).to.equal(`${cfg.directory}logo.svg`); + expect(sender.data[0].body.size).to.equal(4379); + + await sftp.delete(`${cfg.directory}logo.svg`); + await sftp.rmdir(cfg.directory, false); + }); + + afterEach(async () => { + await sftp.end(); + }); +}); diff --git a/spec-integration/verifyCredentials.spec.js b/spec-integration/verifyCredentials.spec.js index 1131620..f591ec2 100644 --- a/spec-integration/verifyCredentials.spec.js +++ b/spec-integration/verifyCredentials.spec.js @@ -5,15 +5,17 @@ const sinon = require('sinon'); const verifyCredentials = require('../verifyCredentials'); require('dotenv').config(); -describe('verifyCredentials', () => { +describe('verifyCredentials', function () { + this.timeout(100000); + const spy = sinon.spy(); let credentials; before(() => { credentials = { - host: process.env.HOSTNAME, + host: process.env.SFTP_HOSTNAME, port: Number(process.env.PORT), - username: process.env.USER, + username: process.env.USERNAME, password: process.env.PASSWORD, }; }); diff --git a/spec/action/lookupObject.spec.js b/spec/action/lookupObject.spec.js new file mode 100644 index 0000000..46cce50 --- /dev/null +++ b/spec/action/lookupObject.spec.js @@ -0,0 +1,100 @@ +const { expect } = require('chai'); +const sinon = require('sinon'); +const bunyan = require('bunyan'); +const { AttachmentProcessor } = require('@elastic.io/component-commons-library'); +const Sftp = require('../../lib/Sftp'); +const { SftpLookupObject } = require('../../lib/utils/lookupObjectUtil'); + +const logger = bunyan.createLogger({ name: 'dummy' }); + +describe('SFTP test - lookup file by file name', () => { + const buffer = Buffer.from('Hello'); + const res = { config: { url: 'https://url' } }; + const cfg = { + directory: 'www/test', + }; + const sftpClient = new Sftp(logger, cfg); + const lookupObjectAction = new SftpLookupObject(logger, sftpClient); + it('Lookup file by name process successful', async () => { + const msg = { + body: { + path: 'www/olhav/1.txt', + }, + }; + const list = [ + { + type: 'd', + name: '.elasticio_processed', + size: 4096, + }, + { + type: '-', + name: '1.txt', + size: 7, + accessTime: '1575379317000', + modifyTime: '1575291942000', + }, + ]; + const sftpClientListStub = sinon.stub(Sftp.prototype, 'list').returns(list); + const sftpClientGetStub = sinon.stub(Sftp.prototype, 'get').returns(buffer); + const attachStub = sinon.stub(AttachmentProcessor.prototype, 'uploadAttachment').returns(res); + + const expectedAttachments = { + '1.txt': { + size: 7, + url: 'https://url', + }, + }; + const expectedBody = { + type: '-', + name: '1.txt', + size: 7, + attachment_url: 'https://url', + accessTime: '2019-12-03T13:21:57.000Z', + modifyTime: '2019-12-02T13:05:42.000Z', + directory: 'www/olhav', + path: 'www/olhav/1.txt', + }; + + + const result = await lookupObjectAction.process(msg, cfg, {}); + + expect(result.body).to.deep.equal(expectedBody); + expect(result.attachments).to.deep.equal(expectedAttachments); + expect(sftpClientListStub.calledOnce).to.be.equal(true); + expect(sftpClientGetStub.calledOnce).to.be.equal(true); + expect(attachStub.calledOnce).to.be.equal(true); + sftpClientListStub.restore(); + sftpClientGetStub.restore(); + attachStub.restore(); + }); + + it('lookupObject Action getFile', async () => { + const dir = 'www/test'; + const filename = '1.txt'; + const list = [ + { + type: 'd', + name: '.elasticio_processed', + size: 4096, + }, + { + type: '-', + name: '1.txt', + size: 7, + }, + ]; + const sftpClientListStub = sinon.stub(Sftp.prototype, 'list'); + sftpClientListStub.withArgs(dir, new RegExp(filename)).returns(list); + sftpClientListStub.withArgs(dir, new RegExp('nonexists')).returns([]); + const result = await lookupObjectAction.getFile(dir, filename); + expect(result).to.deep.equal({ + type: '-', + name: '1.txt', + size: 7, + }); + const result2 = await lookupObjectAction.getFile(dir, 'nonexists'); + expect(result2).to.equal(null); + sftpClientListStub.restore(); + }); +}); diff --git a/spec/attachments.spec.js b/spec/attachments.spec.js index 9277643..a4b1920 100644 --- a/spec/attachments.spec.js +++ b/spec/attachments.spec.js @@ -7,7 +7,6 @@ const attachments = require('../lib/attachments'); // stub things const result = { config: { url: '/hello/world' } }; const self = { emit: sinon.spy() }; -let uploadAttachment = sinon.stub(AttachmentProcessor.prototype, 'uploadAttachment').resolves(result); // parameters const msg = { attachments: {} }; @@ -17,24 +16,26 @@ const contentLength = 10; describe('Attachment tests', () => { afterEach(() => { - uploadAttachment.restore(); self.emit.resetHistory(); }); it('Adds an attachment correctly and returns the correct message', async () => { + const uploadAttachment = sinon.stub(AttachmentProcessor.prototype, 'uploadAttachment').resolves(result); await attachments.addAttachment.call(self, msg, name, stream, contentLength); expect(uploadAttachment.calledOnceWithExactly(stream, 'stream')).to.be.equal(true); expect(msg).to.be.deep.equal({ attachments: { file: { url: '/hello/world', size: 10 } } }); + uploadAttachment.restore(); }); it('Emits an error upon failure', async () => { - uploadAttachment = sinon.stub(AttachmentProcessor.prototype, 'uploadAttachment').throws(new Error('This input should be rejected')); + const uploadAttachment = sinon.stub(AttachmentProcessor.prototype, 'uploadAttachment').throws(new Error('This input should be rejected')); await attachments.addAttachment.call(self, msg, name, 'not a stream', contentLength) .catch((e) => { expect(e.message).to.be.equal('This input should be rejected'); expect(uploadAttachment.getCall(0).args[0]).to.be.equal('not a stream'); expect(uploadAttachment.getCall(0).args[1]).to.be.equal('stream'); + uploadAttachment.restore(); }); }); }); diff --git a/spec/triggers/read.spec.js b/spec/triggers/read.spec.js index 7e36b2f..4e3f99b 100644 --- a/spec/triggers/read.spec.js +++ b/spec/triggers/read.spec.js @@ -12,9 +12,9 @@ require('dotenv').config(); describe('SFTP', () => { const sftp = new Sftp(bunyan.createLogger({ name: 'dummy' }), { - host: process.env.HOSTNAME, - port: process.env.PORT, - username: process.env.USER, + host: process.env.SFTP_HOSTNAME, + port: Number(process.env.PORT), + username: process.env.USERNAME, password: process.env.PASSWORD, }); let endStub;