diff --git a/backend/prisma/schema.prisma b/backend/prisma/schema.prisma index c1815e13..9d5749a4 100644 --- a/backend/prisma/schema.prisma +++ b/backend/prisma/schema.prisma @@ -109,3 +109,13 @@ model aqi_units { update_user_id String @db.VarChar(200) update_utc_timestamp DateTime @db.Timestamp(6) } + +model ftp_users { + id Int @id @default(autoincrement()) + username String @unique @db.VarChar(200) + email String @db.VarChar(200) + create_user_id String @db.VarChar(200) + create_utc_timestamp DateTime @db.Timestamp(6) + update_user_id String @db.VarChar(200) + update_utc_timestamp DateTime @db.Timestamp(6) +} diff --git a/backend/src/cron-job/cron-job.service.ts b/backend/src/cron-job/cron-job.service.ts index 111bc388..9b1b8883 100644 --- a/backend/src/cron-job/cron-job.service.ts +++ b/backend/src/cron-job/cron-job.service.ts @@ -112,7 +112,7 @@ export class CronJobService { } } - @Cron("0 0 */2 * * *") + @Cron("0 0 */2 * * *") // every 2 hours private async fetchLocations() { this.logger.log(`#######################################################`); this.logger.log(`Starting Code Table Cron Job`); diff --git a/backend/src/ftp/ftp.module.ts b/backend/src/ftp/ftp.module.ts index c99db29c..b1f49aa2 100644 --- a/backend/src/ftp/ftp.module.ts +++ b/backend/src/ftp/ftp.module.ts @@ -2,8 +2,10 @@ import { Module } from "@nestjs/common"; import { FtpService } from "./ftp.service"; import { FtpController } from "./ftp.controller"; import { FtpFileValidationService } from "./ftp_file_validation.service"; +import { NotificationsModule } from "src/notifications/notifications.module"; @Module({ + imports: [NotificationsModule], providers: [FtpService, FtpFileValidationService], controllers: [FtpController], }) diff --git a/backend/src/ftp/ftp.service.ts b/backend/src/ftp/ftp.service.ts index 06034f0b..2e29fa97 100644 --- a/backend/src/ftp/ftp.service.ts +++ b/backend/src/ftp/ftp.service.ts @@ -5,6 +5,8 @@ import * as path from "path"; import * as dotenv from "dotenv"; import { Writable } from "stream"; import { FtpFileValidationService } from "./ftp_file_validation.service"; +import { NotificationsService } from "src/notifications/notifications.service"; +import { PrismaService } from "nestjs-prisma"; dotenv.config(); @@ -14,7 +16,11 @@ export class FtpService { private client: ftp.Client; private remoteBasePath: string; - constructor(private ftpFileValidationService: FtpFileValidationService) { + constructor( + private ftpFileValidationService: FtpFileValidationService, + private notificationService: NotificationsService, + private prisma: PrismaService, + ) { this.client = new ftp.Client(); this.client.ftp.verbose = true; this.remoteBasePath = process.env.FTP_PATH; @@ -71,39 +77,47 @@ export class FtpService { const filePath: string = path.join(folderPath, file.name); this.logger.log(`Processing file: ${filePath}`); - const fileExtension = path - .extname(filePath) - .toLowerCase() - .replace(".", ""); - - const dataBuffer = []; - - const writableStream = new Writable({ - write(chunk, encoding, callback) { - dataBuffer.push(chunk); - callback(); - }, - }); try { - await this.client.downloadTo(writableStream, filePath); - const fileBuffer = Buffer.concat(dataBuffer); - // pass file buffer to validation - const errors: string[] = - await this.ftpFileValidationService.processFile( + let errors: string[] = []; + // if the file is > 10MB, don't download it + if (file.size > 10 * 1024 * 1024) { + errors = ["File size exceeds the limit of 10MB."]; + } else { + const dataBuffer = []; + // download file to a stream that puts chunks into an array + const writableStream = new Writable({ + write(chunk, encoding, callback) { + dataBuffer.push(chunk); + callback(); + }, + }); + // download file to the writable stream + await this.client.downloadTo(writableStream, filePath); + // convert chunk array to buffer + const fileBuffer = Buffer.concat(dataBuffer); + // pass file buffer to validation + errors = await this.ftpFileValidationService.processFile( fileBuffer, - fileExtension, + filePath, ); + } if (errors.length > 0) { this.logger.log(`Validation failure for: ${filePath}`); errors.forEach((error) => this.logger.log(error)); this.logger.log(``); // send out a notification to the file submitter & ministry contact outlining the errors - const username = folder.name; - this.logger.log(`Notifying ${username} of error`); + const ministryContact = ""; // should be obtained from file somehow + await this.notifyUserOfError( + folder.name, + file.name, + errors, + ministryContact, + ); } else { this.logger.log(`Validation success for: ${filePath}`); this.logger.log(``); // pass to file validation service + // await this.validationService.handleFile(file); // made up function call } // this.logger.log(`Cleaning up file: ${filePath}`); // await this.client.remove(filePath); @@ -124,7 +138,53 @@ export class FtpService { } } - // @Cron("* */5 * * * *") // every 5 minutes + /** + * Notifies the Data Submitter & Ministry Contact of the file validation errors. + * @param username + * @param fileName + * @param errors + * @param ministryContact + */ + async notifyUserOfError( + username: string, + fileName: string, + errors: string[], + ministryContact: string, + ) { + const ftpUser = await this.prisma.ftp_users.findUnique({ + where: { username: username }, + }); + const notificationVars = { + file_name: fileName, + user_account_name: username, + location_ids: [], + file_status: "FAILED", + errors: errors.join(","), + warnings: "", + }; + + // Notify the Data Submitter + if (this.isValidEmail(ftpUser.email)) { + await this.notificationService.sendDataSubmitterNotification( + ftpUser.email, + notificationVars, + ); + } + // Notify the Ministry Contact (if they have not disabled notifications) + if (this.isValidEmail(ministryContact)) { + await this.notificationService.sendContactNotification( + ministryContact, + notificationVars, + ); + } + } + + isValidEmail(email: string): boolean { + const emailRegex = /^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$/; + return emailRegex.test(email); + } + + // @Cron("0 */5 * * * *") // every 5 minutes // @Cron("0,30 * * * * *") // every 30s async handleCron() { this.logger.log("START ################"); diff --git a/backend/src/ftp/ftp_file_validation.service.ts b/backend/src/ftp/ftp_file_validation.service.ts index 780cfc6f..a7cb0434 100644 --- a/backend/src/ftp/ftp_file_validation.service.ts +++ b/backend/src/ftp/ftp_file_validation.service.ts @@ -1,24 +1,23 @@ import { Injectable, Logger } from "@nestjs/common"; +import path from "path"; import * as XLSX from "xlsx"; @Injectable() export class FtpFileValidationService { private readonly logger = new Logger(FtpFileValidationService.name); - async processFile( - fileBuffer: Buffer, - fileExtension: string, - ): Promise { + async processFile(fileBuffer: Buffer, filePath: string): Promise { const errors = []; // Required column headers const requiredHeaders = ["test1", "test2", "test3"]; // Check file size is under 10 MB if (fileBuffer.length > 10 * 1024 * 1024) { - errors.push("File size exceeds 10 MB."); + errors.push("File size exceeds the limit of 10 MB."); } // Determine file type using headers let fileType = await this.getFileType(fileBuffer); // csv and txt are not detected using file headers, use file extension as fallback + const fileExtension = path.extname(filePath).toLowerCase().replace(".", ""); if (!fileType) fileType = fileExtension; // Check row count and headers let headerError: string | null = null; diff --git a/backend/src/notifications/notifications.module.ts b/backend/src/notifications/notifications.module.ts index 9ac1198b..c80e3860 100644 --- a/backend/src/notifications/notifications.module.ts +++ b/backend/src/notifications/notifications.module.ts @@ -7,5 +7,6 @@ import { HttpModule } from "@nestjs/axios"; imports: [HttpModule], controllers: [NotificationsController], providers: [NotificationsService], + exports: [NotificationsService], }) export class NotificationsModule {} diff --git a/migrations/sql/V1.0.5__ftp_users_table.sql b/migrations/sql/V1.0.5__ftp_users_table.sql new file mode 100644 index 00000000..0fa351f7 --- /dev/null +++ b/migrations/sql/V1.0.5__ftp_users_table.sql @@ -0,0 +1,25 @@ +CREATE TABLE IF NOT EXISTS enmods.ftp_users ( + id SERIAL PRIMARY KEY, + username varchar(200) NOT NULL UNIQUE, + email varchar(200) NOT NULL, + create_user_id varchar(200) NOT NULL, + create_utc_timestamp timestamp NOT NULL, + update_user_id varchar(200) NOT NULL, + update_utc_timestamp timestamp NOT NULL +); +INSERT INTO enmods.ftp_users ( + username, + email, + create_user_id, + create_utc_timestamp, + update_user_id, + update_utc_timestamp + ) +VALUES ( + 'mtennant', + 'mtennant@salussystems.com', + 'system', + (now() at time zone 'utc'), + 'system', + (now() at time zone 'utc') + ); \ No newline at end of file