Skip to content

Commit

Permalink
Merge pull request #1346 from MoveOnOrg/load-messages
Browse files Browse the repository at this point in the history
add job to load messages in from a csv and fix some crucial bugs in twilio.js
  • Loading branch information
ibrand authored Jan 2, 2020
2 parents f3703ee + bb8d304 commit 70b51af
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 11 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"knex": "knex --knexfile ./knexfile.env.js",
"clean": "rm -rf $OUTPUT_DIR",
"lint": "eslint --fix --ext js --ext jsx src",
"process-message-csv": "./dev-tools/babel-run-with-env.js ./src/workers/process-message-csv.js",
"prod-build-client": "webpack --config ./webpack/config.js",
"prod-build-server": "babel ./src -d ./build/server --source-maps --copy-files; babel ./migrations -d ./build/server/migrations/ --source-maps --copy-files",
"prod-build": "npm run clean && npm run prod-build-client && npm run prod-build-server",
Expand Down
17 changes: 8 additions & 9 deletions src/server/api/lib/message-sending.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@ export async function getLastMessage({ contactNumber, service }) {

export async function saveNewIncomingMessage(messageInstance) {
if (messageInstance.service_id) {
const countResult = await r.getCount(
r.knex("message").where("service_id", messageInstance.service_id)
);
if (countResult) {
console.error(
"DUPLICATE MESSAGE SAVED",
countResult.count,
messageInstance
);
const [duplicateMessage] = await r
.knex("message")
.where("service_id", messageInstance.service_id)
.select("id")
.limit(1);
if (duplicateMessage) {
console.error("DUPLICATE MESSAGE", duplicateMessage, messageInstance);
return;
}
}
await messageInstance.save();
Expand Down
9 changes: 7 additions & 2 deletions src/server/api/lib/twilio.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,16 @@ async function convertMessagePartsToMessage(messageParts) {
const lastMessage = await getLastMessage({
contactNumber
});
if (!lastMessage) {
return;
}
return new Message({
contact_number: contactNumber,
user_number: userNumber,
is_from_contact: true,
text,
error_code: null,
service_id: serviceMessages[0].MessagingServiceSid,
service_id: firstPart.service_id,
assignment_id: lastMessage.assignment_id,
service: "twilio",
send_status: "DELIVERED"
Expand Down Expand Up @@ -345,7 +348,6 @@ async function handleIncomingMessage(message) {
user_number: userNumber,
contact_number: contactNumber
});

if (!process.env.JOBS_SAME_PROCESS) {
// If multiple processes, just insert the message part and let another job handle it
await r.knex("pending_message_part").insert(pendingMessagePart);
Expand All @@ -355,6 +357,9 @@ async function handleIncomingMessage(message) {
pendingMessagePart
]);
if (finalMessage) {
if (message.spokeCreatedAt) {
finalMessage.created_at = message.spokeCreatedAt;
}
await saveNewIncomingMessage(finalMessage);
}
}
Expand Down
39 changes: 39 additions & 0 deletions src/workers/jobs.js
Original file line number Diff line number Diff line change
Expand Up @@ -1251,6 +1251,45 @@ export async function handleIncomingMessageParts() {
}
}

export async function loadMessages(csvFile) {
return new Promise((resolve, reject) => {
Papa.parse(csvFile, {
header: true,
complete: ({ data, meta, errors }, file) => {
const fields = meta.fields;
console.log("FIELDS", fields);
console.log("FIRST LINE", data[0]);
const promises = [];
data.forEach(row => {
if (!row.contact_number) {
return;
}
const twilioMessage = {
From: `+1${row.contact_number}`,
To: `+1${row.user_number}`,
Body: row.text,
MessageSid: row.service_id,
MessagingServiceSid: row.service_id,
FromZip: row.zip, // unused at the moment
spokeCreatedAt: row.created_at
};
promises.push(serviceMap.twilio.handleIncomingMessage(twilioMessage));
});
console.log("Started all promises for CSV");
Promise.all(promises)
.then(doneDid => {
console.log(`Processed ${doneDid.length} rows for CSV`);
resolve(doneDid);
})
.catch(err => {
console.error("Error processing for CSV", err);
reject(err);
});
}
});
});
}

// Temporary fix for orgless users
// See https://github.com/MoveOnOrg/Spoke/issues/934
// and job-processes.js
Expand Down
20 changes: 20 additions & 0 deletions src/workers/process-message-csv.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { loadMessages } from "./jobs";
import fs from "fs";

const csvFilename = process.argv.filter(f => /\.csv/.test(f))[0];

new Promise((resolve, reject) => {
fs.readFile(csvFilename, "utf8", function(err, contents) {
loadMessages(contents)
.then(msgs => {
resolve(msgs);
process.exit();
})
.catch(err => {
console.log(err);
reject(err);
console.log("Error", err);
process.exit();
});
});
});

0 comments on commit 70b51af

Please sign in to comment.