-
Notifications
You must be signed in to change notification settings - Fork 17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
per org messaging service #175
Changes from all commits
e5385d2
5ca0d6d
82eaf56
94deb7a
bb0a159
edbae67
f97b0fb
20c3282
5183526
509d2c7
76d05fb
2819fd9
958a5c7
5173f18
f0f7379
48c9b5a
821fc75
bcb38ee
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
require("dotenv").config(); | ||
const _ = require("lodash"); | ||
const knex = require("knex"); | ||
|
||
const config = { | ||
client: "postgresql", | ||
connection: process.env.DATABASE_URL, | ||
pool: { | ||
min: process.env.ROW_CONCURRENCY, | ||
min: process.env.ROW_CONCURRENCY, | ||
} | ||
}; | ||
|
||
const db = knex(config); | ||
const BATCH_SIZE = 5000; | ||
|
||
const getMessagingServiceSIDs = () => { | ||
// Gather multiple messaging service SIDs (may be split across multiple env vars) | ||
const envVarKeys = Object.keys(process.env).filter(key => | ||
key.startsWith(`TWILIO_MESSAGE_SERVICE_SIDS`) | ||
); | ||
envVarKeys.sort(); | ||
|
||
let messagingServiceIds = []; | ||
for (const envVarKey of envVarKeys) { | ||
const envVarValue = process.env[envVarKey]; | ||
const newServiceIds = envVarValue | ||
.split(",") | ||
.map(serviceSid => serviceSid.trim()); | ||
messagingServiceIds = messagingServiceIds.concat(newServiceIds); | ||
} | ||
|
||
return messagingServiceIds; | ||
}; | ||
|
||
const MESSAGING_SERVICE_SIDS = getMessagingServiceSIDs(); | ||
|
||
const getMessageServiceSID = cell => { | ||
// Check for single message service | ||
if (!!process.env.TWILIO_MESSAGE_SERVICE_SID) { | ||
return process.env.TWILIO_MESSAGE_SERVICE_SID; | ||
} | ||
|
||
const messagingServiceIndex = deterministicIntWithinRange( | ||
cell, | ||
MESSAGING_SERVICE_SIDS.length | ||
); | ||
const messagingServiceId = MESSAGING_SERVICE_SIDS[messagingServiceIndex]; | ||
|
||
if (!messagingServiceId) | ||
throw new Error(`Could not find Twilio message service SID for ${cell}!`); | ||
|
||
return messagingServiceId; | ||
}; | ||
|
||
const doBatch = async () => { | ||
const { rows } = await knex.raw( | ||
` | ||
select distinct campaign_contact.cell | ||
from campaign_contact | ||
left join messaging_service_stick | ||
on messaging_service_stick.cell = campaign_contact.cell | ||
where messaging_service_stick.messaging_service_sid is null | ||
limit ${BATCH_SIZE} | ||
`, | ||
[organizationId, campaignId] | ||
); | ||
|
||
const cells = rows.map(r => r.cell); | ||
|
||
console.log("Doing ", cells.length); | ||
|
||
if (cells.length === 0) { | ||
return 0; | ||
} | ||
|
||
const toInsert = cells.map(c => ({ | ||
cell: c, | ||
organization_id: 1, | ||
messaging_service_sid: getMessageServiceSID(c) | ||
})); | ||
|
||
await knex("messaging_service_stick").insert(toInsert); | ||
|
||
console.log("Did ", cells.length); | ||
|
||
return cells.length; | ||
}; | ||
|
||
async function main() { | ||
let done = 0; | ||
let did = 0; | ||
while ((did = await doBatch()) > 0) { | ||
console.log("Did ", did); | ||
done = done + did; | ||
connsole.log("Done ", done); | ||
} | ||
|
||
for (let c of campaigns) { | ||
console.log("Doing campaign: ", c.id); | ||
await ensureAllNumbersHaveMessagingServiceSIDs(c.id, 1); | ||
console.log("...done"); | ||
} | ||
} | ||
|
||
main() | ||
.then(console.log) | ||
.catch(console.error); |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
// Add index for fetching current assignment target | ||
exports.up = function(knex, Promise) { | ||
return Promise.all([ | ||
knex.schema.createTable("messaging_service", t => { | ||
t.text("messaging_service_sid").primary(); // if we choose not to have the foreign key on sticks, we won't need the index here | ||
t.integer("organization_id").references("organization(id)"); | ||
t.index("organization_id"); | ||
}), | ||
knex.schema.createTable("messaging_service_stick", t => { | ||
t.text("cell"); | ||
t.index("cell"); | ||
t.integer("organization_id").references("organization(id)"); | ||
t.text("messaging_service_sid").references( | ||
"messaging_service(messaging_service_sid)" | ||
); // for performance, we may want to skip the foreign key | ||
t.index( | ||
["cell", "organization_id"], | ||
"messaging_service_stick_cell_organization_index" | ||
); | ||
t.unique( | ||
["cell", "organization_id"], | ||
"messaging_service_stick_cell_organization_unique_constraint" | ||
); | ||
}) | ||
]); | ||
}; | ||
|
||
// Drop index for fetching current assignment target | ||
exports.down = function(knex, Promise) { | ||
return Promise.all([ | ||
knex.schema.dropTable("messaging_service"), | ||
knex.schema.dropTable("messaging_service_stick") | ||
]); | ||
}; |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -1,18 +1,93 @@ | ||||||
import { r } from "../../models"; | ||||||
|
||||||
export async function getLastMessage({ contactNumber, service }) { | ||||||
const lastMessage = await r | ||||||
.knex("message") | ||||||
.where({ | ||||||
contact_number: contactNumber, | ||||||
is_from_contact: false, | ||||||
service | ||||||
}) | ||||||
.orderBy("created_at", "desc") | ||||||
.limit(1) | ||||||
.first("assignment_id", "campaign_contact_id"); | ||||||
|
||||||
return lastMessage; | ||||||
/* | ||||||
This was changed to accommodate multiple organizationIds. There were two potential approaches: | ||||||
- option one: with campaign_id_options as select campaigns from organizationId, where campaign_id = campaign.id | ||||||
----------------------------------- | ||||||
with chosen_organization as ( | ||||||
select organization_id | ||||||
from messaging_service | ||||||
where messaging_service_sid = ? | ||||||
) | ||||||
with campaign_contact_option as ( | ||||||
select id | ||||||
from campaign_contact | ||||||
join campaign | ||||||
on campaign_contact.campaign_id = campaign.id | ||||||
where | ||||||
campaign.organization_id in ( | ||||||
select id from chosen_organization | ||||||
) | ||||||
and campaign_contact.cell = ? | ||||||
) | ||||||
select campaign_contact_id, assignment_id | ||||||
from message | ||||||
join campaign_contact_option | ||||||
on message.campaign_contact_id = campaign_contact_option.id | ||||||
where | ||||||
message.is_from_contact = false | ||||||
order by created_at desc | ||||||
limit 1 | ||||||
----------------------------------- | ||||||
|
||||||
- option two: join campaign_contact, join campaign, where campaign.org_id = org_id | ||||||
----------------------------------- | ||||||
select campaign_contact_id, assignment_id | ||||||
from message | ||||||
join campaign_contact | ||||||
on message.campaign_contact_id = campaign_contact.id | ||||||
join campaign | ||||||
on campaign.id = campaign_contact.campaign_id | ||||||
where | ||||||
campaign.organization_id = ? | ||||||
and campaign_contact.cell = ? | ||||||
and message.is_from_contact = false | ||||||
order by created_at desc | ||||||
limit 1 | ||||||
----------------------------------- | ||||||
|
||||||
- must do explain analyze | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
- both query options were pretty good – the campaign_contact.cell and message.campaign_contact_id | ||||||
index filters are fast enough and the result set to filter through small enough that the rest doesn't | ||||||
really matter | ||||||
- first one was much easier to plan, so going with that one | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
*/ | ||||||
|
||||||
export async function getCampaignContactAndAssignmentForIncomingMessage({ | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is Twilio-specific and belongs in |
||||||
contactNumber, | ||||||
service, | ||||||
messaging_service_sid | ||||||
}) { | ||||||
const { rows } = await r.knex.raw( | ||||||
` | ||||||
with chosen_organization as ( | ||||||
select organization_id | ||||||
from messaging_service | ||||||
where messaging_service_sid = ? | ||||||
), | ||||||
campaign_contact_option as ( | ||||||
select campaign_contact.id | ||||||
from campaign_contact | ||||||
join campaign | ||||||
on campaign_contact.campaign_id = campaign.id | ||||||
where | ||||||
campaign.organization_id in ( | ||||||
select organization_id from chosen_organization | ||||||
) | ||||||
and campaign_contact.cell = ? | ||||||
) | ||||||
select campaign_contact_id, assignment_id | ||||||
from message | ||||||
join campaign_contact_option | ||||||
on message.campaign_contact_id = campaign_contact_option.id | ||||||
where | ||||||
message.is_from_contact = false | ||||||
order by created_at desc | ||||||
limit 1`, | ||||||
[messaging_service_sid, contactNumber] | ||||||
); | ||||||
|
||||||
return rows[0]; | ||||||
} | ||||||
|
||||||
export async function saveNewIncomingMessage(messageInstance) { | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ensureAllNumbersHaveMessagingServiceSIDs
is not defined or imported in this file. Thisfor (let c of campaigns)
block seems redundant?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is, yes