Skip to content

Commit

Permalink
queue alerts and retry send
Browse files Browse the repository at this point in the history
  • Loading branch information
Tucsky committed Mar 6, 2024
1 parent 2a885b9 commit 6087a43
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 52 deletions.
10 changes: 7 additions & 3 deletions src/exchange.js
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,13 @@ class Exchange extends EventEmitter {
api._pending.push(pair)

if (api.readyState === WebSocket.OPEN) {
setTimeout(() => {
this.subscribePendingPairs(api)
})
this.schedule(
() => {
this.subscribePendingPairs(api)
},
'subscribe-' + api.url,
1000
)
}

return api
Expand Down
8 changes: 0 additions & 8 deletions src/exchanges/bybit.js
Original file line number Diff line number Diff line change
Expand Up @@ -237,14 +237,6 @@ class Bybit extends Exchange {

onApiRemoved(api) {
this.stopKeepAlive(api)

if (api.freezeTimeout) {
clearTimeout(api.freezeTimeout)
}

if (api.downTimeout) {
clearTimeout(api.downTimeout)
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -850,7 +850,7 @@ class Server extends EventEmitter {
alertService.alertEndpoints[alert.endpoint] &&
alertService.alertEndpoints[alert.endpoint].user === user
) {
alertService.sendAlert(alert, alert.market, Date.now())
alertService.queueAlert(alert, alert.market, Date.now())
return `${alert.market} @${alert.price}`
}
}
Expand Down
124 changes: 84 additions & 40 deletions src/services/alert.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class AlertService extends EventEmitter {
this.enabled = false
this.alerts = {}
this.alertEndpoints = {}
this.alertsQueue = []

if (config.influxCollectors && config.collect && !config.api) {
// node is a collector: listen for toggleAlerts op
Expand All @@ -25,10 +26,13 @@ class AlertService extends EventEmitter {
this.getAlerts()
}

console.log(config.publicVapidKey)
console.log(config.privateVapidKey)

if (config.publicVapidKey && config.privateVapidKey) {
this.enabled = true
webPush.setVapidDetails(
'mailto:[email protected]',
'mailto: [email protected]',
config.publicVapidKey,
config.privateVapidKey
)
Expand Down Expand Up @@ -336,11 +340,11 @@ class AlertService extends EventEmitter {
now - this.alertEndpoints[endpoint].timestamp >
config.alertEndpointExpiresAfter
) {
/* console.warn(
console.warn(
`[alert/get] removed expired endpoint (last updated ${ago(
this.alertEndpoints[endpoint].timestamp
)} ago)`
) */
)
delete this.alertEndpoints[endpoint]
}
}
Expand Down Expand Up @@ -510,7 +514,7 @@ class AlertService extends EventEmitter {
}
}

sendAlert(alert, market, timestamp, direction) {
queueAlert(alert, market, timestamp, direction) {
if (!this.alertEndpoints[alert.endpoint]) {
console.error(
`[alert/send] attempted to send alert without matching endpoint`,
Expand All @@ -521,12 +525,6 @@ class AlertService extends EventEmitter {

const elapsedTime = timestamp - alert.timestamp

console.log(
`[alert/send/${
this.alertEndpoints[alert.endpoint].user
}] send alert ${market} @${alert.price} (${getHms(elapsedTime)} after)`
)

alert.triggered = true

let message
Expand All @@ -553,7 +551,7 @@ class AlertService extends EventEmitter {
}

const payload = JSON.stringify({
title: `${market}`,
title,
body: message,
origin: alert.origin,
price: alert.price,
Expand All @@ -569,37 +567,83 @@ class AlertService extends EventEmitter {
type: 'triggered'
})

return webPush
.sendNotification(this.alertEndpoints[alert.endpoint], payload, {
vapidDetails: {
subject: 'mailto:[email protected]',
publicKey: config.publicVapidKey,
privateKey: config.privateVapidKey
},
contentEncoding: 'aes128gcm'
})
.then(() => {
return sleep(100)
})
.catch(err => {
if (
err instanceof webPush.WebPushError
) {
console.error(
`[alert/send] ${err.statusCode}: \n${err.body}`
)
this.alertsQueue.push({
attempts: 0,
market,
price: alert.price,
elapsedTime,
subscription: this.alertEndpoints[alert.endpoint],
payload
})

if (err.statusCode > 400 && err.statusCode < 499) {
// delete user
}
if (!this.isProcessingQueue) {
this.processQueue()
}
}

} else {
console.error(
`[alert/send] failed to send push notification`,
err.message
)
async processQueue() {
if (!this.alertsQueue.length) {
this.isProcessingQueue = false
console.log(`[alert.queue] no more alerts in queue`)
return
}

this.isProcessingQueue = true

const queuedAlert = this.alertsQueue.shift()
queuedAlert.attempts++

try {
console.log(
`[alert/send/${queuedAlert.subscription.user}.${
queuedAlert.attempts
}] send alert ${queuedAlert.market} @${
queuedAlert.price
} (after ${getHms(queuedAlert.elapsedTime)})`
)
await this.sendAlert(queuedAlert.subscription, queuedAlert.payload)
queuedAlert.succeeded = true
} catch (error) {
if (error instanceof webPush.WebPushError) {
console.error(
`[alert/send/${queuedAlert.subscription.user}.${queuedAlert.attempts}] ${error.message}\n\t-> ${error.statusCode}: ${error.body}`
)

if (error.statusCode > 400 && error.statusCode < 499) {
// delete user
}
})
} else {
console.error(
`[alert/send/${queuedAlert.subscription.user}.${queuedAlert.attempts}] failed to send push notification`,
error.message
)
}

/**
* @todo: only retry on network-related error
*/
if (queuedAlert.attempts > 4) {
console.error(
`[alert/send/${queuedAlert.subscription.user}.${queuedAlert.attempts}] won't be sent (too many attempts)`
)
} else {
this.alertsQueue.push(queuedAlert)
}
}

await sleep(1000)
this.processQueue()
}

sendAlert(subscription, payload) {
return webPush.sendNotification(subscription, payload, {
vapidDetails: {
subject: 'mailto: [email protected]',
publicKey: config.publicVapidKey,
privateKey: config.privateVapidKey
},
contentEncoding: 'aes128gcm'
})
}

/**
Expand Down Expand Up @@ -651,7 +695,7 @@ class AlertService extends EventEmitter {
console.log(
`[alert/checkPriceCrossover] ${alert.price} (ajusted to ${alert.priceCompare}) ${market} ${low} <-> ${high}`
)
await this.sendAlert(alert, market, now, direction)
this.queueAlert(alert, market, now, direction)

if (this.unregisterAlert(alert, market, true)) {
i--
Expand Down

0 comments on commit 6087a43

Please sign in to comment.