Skip to content

Commit

Permalink
feat(platform): add support for job retries (constant, linear, and ex…
Browse files Browse the repository at this point in the history
…ponential)
azasypkin committed Dec 4, 2023
1 parent ca505e6 commit f3decab
Showing 41 changed files with 4,225 additions and 1,151 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

This file was deleted.

2 changes: 1 addition & 1 deletion migrations/20231129185220_v1.0.0-alpha.4.sql
Original file line number Diff line number Diff line change
@@ -49,8 +49,8 @@ CREATE TABLE IF NOT EXISTS user_data_web_scraping_trackers
name TEXT NOT NULL COLLATE NOCASE,
url TEXT NOT NULL,
kind BLOB NOT NULL,
schedule TEXT,
job_id BLOB UNIQUE,
job_config BLOB,
data BLOB NOT NULL,
created_at INTEGER NOT NULL,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
1 change: 1 addition & 0 deletions src/api.rs
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@ use crate::{
};
use handlebars::Handlebars;

#[derive(Clone)]
pub struct Api<DR: DnsResolver, ET: EmailTransport> {
pub db: Database,
pub search_index: SearchIndex,
15 changes: 15 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -537,6 +537,21 @@ mod tests {
)
}

/// Builds a six-field cron expression whose seconds field lists, comma-separated, the
/// second-of-minute reached by adding each offset from `secs` to the current UTC time.
/// All remaining fields are wildcards.
pub fn mock_schedule_in_secs(secs: &[u64]) -> String {
    let seconds_field = secs
        .iter()
        .map(|offset| {
            // The clock is sampled once per offset.
            let fire_at = OffsetDateTime::now_utc().add(Duration::from_secs(*offset));
            fire_at.second().to_string()
        })
        .collect::<Vec<_>>()
        .join(",");

    format!("{seconds_field} * * * * *")
}

pub mod webauthn {
pub const SERIALIZED_PASSKEY: &str = r#"{
"cred": {
114 changes: 63 additions & 51 deletions src/scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
mod api_ext;
mod database_ext;
mod job_ext;
mod schedule_ext;
mod scheduler_job;
mod scheduler_job_config;
mod scheduler_job_metadata;
mod scheduler_job_retry_state;
mod scheduler_job_retry_strategy;
mod scheduler_jobs;
mod scheduler_store;

@@ -9,6 +16,12 @@ use std::{collections::HashSet, sync::Arc};
use tokio_cron_scheduler::{JobScheduler, SimpleJobCode, SimpleNotificationCode};
use uuid::Uuid;

pub use self::{
schedule_ext::ScheduleExt, scheduler_job::SchedulerJob,
scheduler_job_config::SchedulerJobConfig, scheduler_job_metadata::SchedulerJobMetadata,
scheduler_job_retry_state::SchedulerJobRetryState,
scheduler_job_retry_strategy::SchedulerJobRetryStrategy,
};
use crate::{
api::Api,
network::{DnsResolver, EmailTransport, EmailTransportError},
@@ -17,7 +30,6 @@ use crate::{
WebPageTrackersTriggerJob,
},
};
pub use scheduler_job::SchedulerJob;
use scheduler_store::SchedulerStore;

/// Defines a maximum number of jobs that can be retrieved from the database at once.
@@ -89,13 +101,13 @@ where
.map(Uuid::from)
.ok_or_else(|| anyhow!("The job does not have ID: `{:?}`", job_data))?;

let job_type = match SchedulerJob::try_from(job_data.extra.as_ref()) {
Ok(job_type) if unique_resumed_jobs.contains(&job_type) => {
let job_meta = match SchedulerJobMetadata::try_from(job_data.extra.as_ref()) {
Ok(job_meta) if unique_resumed_jobs.contains(&job_meta.job_type) => {
// There can only be one job of each type. If we detect that there are multiple, we log
// an error and remove the duplicated job, keeping only the first one.
log::error!(
"Found multiple jobs of type `{:?}`. All duplicated jobs except for the first one will be removed.",
job_type
job_meta.job_type
);
db.remove_scheduler_job(job_id).await?;
continue;
@@ -110,12 +122,12 @@ where
db.remove_scheduler_job(job_id).await?;
continue;
}
Ok(job_type) => job_type,
Ok(job_meta) => job_meta,
};

// First try to resume the job, and if it's not possible, the job will be removed and
// re-scheduled at a later step if needed.
let job = match &job_type {
let job = match &job_meta.job_type {
SchedulerJob::WebPageTrackersTrigger { kind } => {
WebPageTrackersTriggerJob::try_resume(self.api.clone(), job_id, job_data, *kind)
.await?
@@ -134,17 +146,17 @@ where

match job {
Some(job) => {
log::debug!("Resumed job (`{:?}`): {}.", job_type, job_id);
log::debug!("Resumed job (`{:?}`): {}.", job_meta.job_type, job_id);
self.inner_scheduler.add(job).await?;

if job_type.is_unique() {
unique_resumed_jobs.insert(job_type);
if job_meta.job_type.is_unique() {
unique_resumed_jobs.insert(job_meta.job_type);
}
}
None => {
log::warn!(
"Failed to resume job (`{:?}`): {}. The job will be removed and re-scheduled if needed.",
job_type,
job_meta.job_type,
job_id
);
db.remove_scheduler_job(job_id).await?;
@@ -158,7 +170,7 @@ where

#[cfg(test)]
mod tests {
use super::Scheduler;
use super::{Scheduler, SchedulerJobConfig, SchedulerJobMetadata};
use crate::{
scheduler::scheduler_job::SchedulerJob,
tests::{mock_api, mock_user},
@@ -172,7 +184,7 @@ mod tests {

fn mock_job_data(
job_id: JobId,
typ: SchedulerJob,
job_type: SchedulerJob,
schedule: impl Into<String>,
) -> JobStoredData {
JobStoredData {
@@ -184,7 +196,7 @@ mod tests {
ran: false,
stopped: false,
last_updated: None,
extra: typ.try_into().unwrap(),
extra: SchedulerJobMetadata::new(job_type).try_into().unwrap(),
job: Some(JobStored::CronJob(CronJob {
schedule: schedule.into(),
})),
@@ -212,12 +224,15 @@ mod tests {
url: "https://localhost:1234/my/app?q=2".parse()?,
settings: WebPageTrackerSettings {
revisions: 1,
schedule: Some("1 2 3 4 5 6 2030".to_string()),
delay: Default::default(),
scripts: Default::default(),
headers: Default::default(),
enable_notifications: true,
},
job_config: Some(SchedulerJobConfig {
schedule: "1 2 3 4 5 6 2030".to_string(),
retry_strategy: None,
notifications: true,
}),
},
)
.await?;
@@ -230,12 +245,15 @@ mod tests {
url: "https://localhost:1234/my/app?q=2".parse()?,
settings: WebPageTrackerSettings {
revisions: 1,
schedule: Some("1 2 3 4 5 6 2030".to_string()),
delay: Default::default(),
scripts: Default::default(),
headers: Default::default(),
enable_notifications: true,
},
job_config: Some(SchedulerJobConfig {
schedule: "1 2 3 4 5 6 2030".to_string(),
retry_strategy: None,
notifications: true,
}),
},
)
.await?;
@@ -319,27 +337,18 @@ mod tests {

let mut jobs = jobs
.into_iter()
.map(|job_result| {
job_result.and_then(|job| {
Ok((
job.job_type,
SchedulerJob::try_from(job.extra.as_ref())?,
job.job,
))
})
})
.map(|job_result| job_result.map(|job| (job.job_type, job.extra, job.job)))
.collect::<anyhow::Result<Vec<(_, _, _)>>>()?;
jobs.sort_by(|job_a, job_b| {
Vec::try_from(job_a.1)
.unwrap()
.cmp(&Vec::try_from(job_b.1).unwrap())
});
jobs.sort_by(|job_a, job_b| job_a.1.cmp(&job_b.1));

assert_debug_snapshot!(jobs, @r###"
[
(
0,
WebPageTrackersSchedule,
[
1,
0,
],
Some(
CronJob(
CronJob {
@@ -350,7 +359,10 @@ mod tests {
),
(
0,
WebPageTrackersFetch,
[
2,
0,
],
Some(
CronJob(
CronJob {
@@ -361,7 +373,10 @@ mod tests {
),
(
0,
NotificationsSend,
[
3,
0,
],
Some(
CronJob(
CronJob {
@@ -426,27 +441,18 @@ mod tests {

let mut jobs = jobs
.into_iter()
.map(|job_result| {
job_result.and_then(|job| {
Ok((
job.job_type,
SchedulerJob::try_from(job.extra.as_ref())?,
job.job,
))
})
})
.map(|job_result| job_result.map(|job| (job.job_type, job.extra, job.job)))
.collect::<anyhow::Result<Vec<(_, _, _)>>>()?;
jobs.sort_by(|job_a, job_b| {
Vec::try_from(job_a.1)
.unwrap()
.cmp(&Vec::try_from(job_b.1).unwrap())
});
jobs.sort_by(|job_a, job_b| job_a.1.cmp(&job_b.1));

assert_debug_snapshot!(jobs, @r###"
[
(
0,
WebPageTrackersSchedule,
[
1,
0,
],
Some(
CronJob(
CronJob {
@@ -457,7 +463,10 @@ mod tests {
),
(
0,
WebPageTrackersFetch,
[
2,
0,
],
Some(
CronJob(
CronJob {
@@ -468,7 +477,10 @@ mod tests {
),
(
0,
NotificationsSend,
[
3,
0,
],
Some(
CronJob(
CronJob {
152 changes: 152 additions & 0 deletions src/scheduler/api_ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
use crate::{
api::Api,
network::{DnsResolver, EmailTransport},
scheduler::{SchedulerJobMetadata, SchedulerJobRetryState, SchedulerJobRetryStrategy},
};
use std::ops::Add;
use time::OffsetDateTime;
use tokio_cron_scheduler::JobId;

/// Scheduler-specific view over a borrowed [`Api`].
pub struct SchedulerApiExt<'a, DR: DnsResolver, ET: EmailTransport> {
    api: &'a Api<DR, ET>,
}

impl<'a, DR: DnsResolver, ET: EmailTransport> SchedulerApiExt<'a, DR, ET> {
    /// Creates Scheduler API.
    pub fn new(api: &'a Api<DR, ET>) -> Self {
        Self { api }
    }

    /// Tries to schedule a retry for a specified job.
    ///
    /// Reads the job metadata from the database, decides whether another attempt is allowed by
    /// `retry_strategy`, and writes the metadata back with the new retry state. When the retry
    /// limit has been reached the stored retry state is cleared and `None` is returned.
    ///
    /// # Errors
    ///
    /// Fails if the job has no stored metadata or if a database call fails.
    pub async fn schedule_retry(
        &self,
        job_id: JobId,
        retry_strategy: &SchedulerJobRetryStrategy,
    ) -> anyhow::Result<Option<SchedulerJobRetryState>> {
        let db = &self.api.db;
        let SchedulerJobMetadata { job_type, retry } =
            match db.get_scheduler_job_meta(job_id).await? {
                Some(job_meta) => job_meta,
                None => {
                    return Err(anyhow::anyhow!(
                        "Could not find a job state for a scheduler job ('{}').",
                        job_id
                    ));
                }
            };

        // A job that has never been retried has no retry state yet, i.e. zero attempts.
        let attempts_so_far = retry.map_or(0, |state| state.attempts);

        let retry_state = if attempts_so_far < retry_strategy.max_attempts() {
            let retry_interval = retry_strategy.interval(attempts_so_far);
            log::debug!(
                "Scheduling a retry for job ('{job_id}') in {}.",
                humantime::format_duration(retry_interval),
            );

            Some(SchedulerJobRetryState {
                attempts: attempts_so_far + 1,
                next_at: OffsetDateTime::now_utc().add(retry_interval),
            })
        } else {
            log::warn!(
                "Retry limit reached ('{}') for a scheduler job ('{job_id}').",
                attempts_so_far
            );
            None
        };

        // Persist the outcome in both cases: either the next attempt or a cleared retry state.
        let updated_meta = SchedulerJobMetadata {
            job_type,
            retry: retry_state,
        };
        db.update_scheduler_job_meta(job_id, updated_meta).await?;

        Ok(retry_state)
    }
}

impl<DR: DnsResolver, ET: EmailTransport> Api<DR, ET> {
    /// Returns an API to work with scheduler jobs.
    ///
    /// The returned value borrows this `Api`; the `'_` in the return type makes that borrow
    /// visible in the signature instead of leaving the lifetime silently elided.
    pub fn scheduler(&self) -> SchedulerApiExt<'_, DR, ET> {
        SchedulerApiExt::new(self)
    }
}

#[cfg(test)]
mod tests {
    use crate::{
        scheduler::{SchedulerJob, SchedulerJobMetadata, SchedulerJobRetryStrategy},
        tests::mock_api,
    };
    use std::{ops::Add, time::Duration};
    use time::OffsetDateTime;
    use tokio_cron_scheduler::{CronJob, JobStored, JobStoredData, JobType};
    use uuid::uuid;

    /// Two retries are allowed by the strategy; the third request must be rejected.
    #[tokio::test]
    async fn properly_schedules_retry() -> anyhow::Result<()> {
        let api = mock_api().await?;
        let scheduler = api.scheduler();

        let job_id = uuid!("67e55044-10b1-426f-9247-bb680e5fe0c8");
        api.db
            .upsert_scheduler_job(&JobStoredData {
                id: Some(job_id.into()),
                last_updated: Some(946720800u64),
                last_tick: Some(946720700u64),
                next_tick: 946720900u64,
                count: 3,
                job_type: JobType::Cron as i32,
                extra: SchedulerJobMetadata::new(SchedulerJob::NotificationsSend).try_into()?,
                ran: true,
                stopped: false,
                job: Some(JobStored::CronJob(CronJob {
                    schedule: "0 0 0 1 1 * *".to_string(),
                })),
            })
            .await?;

        let retry_interval = Duration::from_secs(120);
        let retry_strategy = SchedulerJobRetryStrategy::Constant {
            interval: retry_interval,
            max_attempts: 2,
        };

        // Every retry is scheduled after this point, so it cannot be due any earlier.
        let earliest_retry = OffsetDateTime::now_utc().add(retry_interval);

        for expected_attempts in 1..=2 {
            let retry_state = scheduler
                .schedule_retry(job_id, &retry_strategy)
                .await?
                .unwrap();
            assert_eq!(retry_state.attempts, expected_attempts);
            assert!(retry_state.next_at >= earliest_retry);
        }

        let exhausted = scheduler.schedule_retry(job_id, &retry_strategy).await?;
        assert!(exhausted.is_none());

        Ok(())
    }
}
489 changes: 315 additions & 174 deletions src/scheduler/database_ext.rs

Large diffs are not rendered by default.

62 changes: 62 additions & 0 deletions src/scheduler/job_ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use crate::scheduler::{SchedulerJob, SchedulerJobMetadata};
use tokio_cron_scheduler::{Job, JobStoredData};

/// Extension trait for attaching a [`SchedulerJob`] type to a job.
pub trait JobExt {
    /// Populates job's `extra` field with the job metadata that includes type.
    ///
    /// Returns an error if the metadata cannot be stored on the job.
    fn set_job_type(&mut self, job_type: SchedulerJob) -> anyhow::Result<()>;
}

impl JobExt for Job {
    /// Replaces the job's `extra` payload with freshly created metadata for `job_type`, leaving
    /// every other stored field as it was.
    fn set_job_type(&mut self, job_type: SchedulerJob) -> anyhow::Result<()> {
        let mut job_data = self.job_data()?;
        // New metadata never carries a retry state.
        job_data.extra = SchedulerJobMetadata::new(job_type).try_into()?;
        self.set_job_data(job_data)?;

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::JobExt;
    use crate::scheduler::{SchedulerJob, SchedulerJobMetadata};
    use std::time::Duration;
    use tokio_cron_scheduler::{Job, JobStoredData};

    /// Setting the type repeatedly overwrites the metadata and touches nothing else.
    #[tokio::test]
    async fn can_set_job_type() -> anyhow::Result<()> {
        let mut job = Job::new_one_shot(Duration::from_secs(10), |_, _| {})?;
        let original_job_data = job.job_data()?;
        assert!(original_job_data.extra.is_empty());

        for job_type in [
            SchedulerJob::WebPageTrackersSchedule,
            SchedulerJob::NotificationsSend,
        ] {
            job.set_job_type(job_type)?;

            let job_data = job.job_data()?;
            assert_eq!(
                SchedulerJobMetadata::try_from(job_data.extra.as_slice())?,
                SchedulerJobMetadata::new(job_type)
            );
        }

        // Other fields should not be affected.
        let expected_job_data = JobStoredData {
            extra: Vec::try_from(SchedulerJobMetadata::new(SchedulerJob::NotificationsSend))?,
            ..original_job_data
        };
        assert_eq!(job.job_data()?, expected_job_data);

        Ok(())
    }
}
55 changes: 55 additions & 0 deletions src/scheduler/schedule_ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use cron::Schedule;
use std::time::Duration;

/// Extension trait that adds interval calculations to a schedule.
pub trait ScheduleExt {
    /// Returns the minimum interval between occurrences.
    ///
    /// Returns an error if the interval cannot be calculated.
    fn min_interval(&self) -> anyhow::Result<Duration>;
}

impl ScheduleExt for Schedule {
    /// Returns the minimum interval between occurrences. To calculate it, we take the first 100
    /// upcoming occurrences (in UTC) and calculate the interval between each consecutive pair.
    /// Then we take the smallest interval.
    ///
    /// If the schedule has fewer than two upcoming occurrences there is no interval to measure,
    /// and `Duration::MAX` is returned.
    ///
    /// # Errors
    ///
    /// Returns an error if the difference between two consecutive occurrences cannot be
    /// converted to a standard `Duration`.
    fn min_interval(&self) -> anyhow::Result<Duration> {
        let next_occurrences = self.upcoming(chrono::Utc).take(100).collect::<Vec<_>>();

        let mut minimum_interval = Duration::MAX;
        // `windows(2)` yields every pair of neighbours without manual index arithmetic.
        for pair in next_occurrences.windows(2) {
            let interval = (pair[1] - pair[0]).to_std()?;
            minimum_interval = minimum_interval.min(interval);
        }

        Ok(minimum_interval)
    }
}

#[cfg(test)]
mod tests {
    use super::ScheduleExt;
    use cron::Schedule;
    use std::{str::FromStr, time::Duration};

    /// Checks minutely, hourly, daily and weekly schedules, both as explicit cron expressions
    /// and as shorthand aliases.
    #[test]
    fn can_calculate_min_interval() -> anyhow::Result<()> {
        const HOUR: u64 = 3600;
        const DAY: u64 = 24 * 3600;
        const WEEK: u64 = 7 * 24 * 3600;

        let cases = [
            ("0 * * * * * *", 60),
            ("0 0 * * * * *", HOUR),
            ("@hourly", HOUR),
            ("0 0 0 * * * *", DAY),
            ("@daily", DAY),
            ("0 0 0 * * 1 *", WEEK),
            ("@weekly", WEEK),
        ];

        for (expression, expected_secs) in cases {
            let schedule = Schedule::from_str(expression)?;
            assert_eq!(
                schedule.min_interval()?,
                Duration::from_secs(expected_secs),
                "schedule: {expression}"
            );
        }

        Ok(())
    }
}
81 changes: 0 additions & 81 deletions src/scheduler/scheduler_job.rs
Original file line number Diff line number Diff line change
@@ -22,27 +22,10 @@ impl SchedulerJob {
}
}

impl TryFrom<&[u8]> for SchedulerJob {
type Error = anyhow::Error;

fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
Ok(postcard::from_bytes(value)?)
}
}

impl TryFrom<SchedulerJob> for Vec<u8> {
type Error = anyhow::Error;

fn try_from(value: SchedulerJob) -> Result<Self, Self::Error> {
Ok(postcard::to_stdvec(&value)?)
}
}

#[cfg(test)]
mod tests {
use super::SchedulerJob;
use crate::utils::WebPageTrackerKind;
use insta::assert_debug_snapshot;

#[test]
fn properly_determines_unique_jobs() -> anyhow::Result<()> {
@@ -60,68 +43,4 @@ mod tests {

Ok(())
}

#[test]
fn serialize() -> anyhow::Result<()> {
assert_eq!(
Vec::try_from(SchedulerJob::WebPageTrackersTrigger {
kind: WebPageTrackerKind::WebPageResources
})?,
vec![0, 0]
);
assert_eq!(
Vec::try_from(SchedulerJob::WebPageTrackersTrigger {
kind: WebPageTrackerKind::WebPageContent
})?,
vec![0, 1]
);
assert_eq!(
Vec::try_from(SchedulerJob::WebPageTrackersSchedule)?,
vec![1]
);
assert_eq!(Vec::try_from(SchedulerJob::WebPageTrackersFetch)?, vec![2]);
assert_eq!(Vec::try_from(SchedulerJob::NotificationsSend)?, vec![3]);

Ok(())
}

#[test]
fn deserialize() -> anyhow::Result<()> {
assert_eq!(
SchedulerJob::try_from([0, 0].as_ref())?,
SchedulerJob::WebPageTrackersTrigger {
kind: WebPageTrackerKind::WebPageResources
}
);

assert_eq!(
SchedulerJob::try_from([0, 1].as_ref())?,
SchedulerJob::WebPageTrackersTrigger {
kind: WebPageTrackerKind::WebPageContent
}
);

assert_eq!(
SchedulerJob::try_from([1].as_ref())?,
SchedulerJob::WebPageTrackersSchedule
);

assert_eq!(
SchedulerJob::try_from([2].as_ref())?,
SchedulerJob::WebPageTrackersFetch
);

assert_eq!(
SchedulerJob::try_from([3].as_ref())?,
SchedulerJob::NotificationsSend
);

assert_debug_snapshot!(SchedulerJob::try_from([4].as_ref()), @r###"
Err(
SerdeDeCustom,
)
"###);

Ok(())
}
}
16 changes: 16 additions & 0 deletions src/scheduler/scheduler_job_config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use crate::scheduler::SchedulerJobRetryStrategy;
use serde::{Deserialize, Serialize};

/// Represents a job configuration that can be scheduled.
///
/// Field names are (de)serialized in camelCase (e.g. `retryStrategy`).
#[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct SchedulerJobConfig {
    /// Defines a schedule for the job.
    pub schedule: String,
    /// Defines a retry strategy for the job. The field is omitted from the serialized output
    /// when it is `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub retry_strategy: Option<SchedulerJobRetryStrategy>,
    /// Indicates whether the job result should produce a notification. If a retry strategy is
    /// defined, the error notification will be sent only if the job fails after all the retries.
    pub notifications: bool,
}
146 changes: 146 additions & 0 deletions src/scheduler/scheduler_job_metadata.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
use crate::scheduler::{SchedulerJob, SchedulerJobRetryState};
use serde::{Deserialize, Serialize};

/// Secutils.dev specific metadata of the scheduler job.
///
/// Converted to and from bytes with `postcard` via the `TryFrom` implementations in this module.
#[derive(Serialize, Deserialize, Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub struct SchedulerJobMetadata {
    /// The type of the job.
    pub job_type: SchedulerJob,
    /// The state of the job if it is being retried, `None` otherwise.
    pub retry: Option<SchedulerJobRetryState>,
}

impl SchedulerJobMetadata {
    /// Creates metadata for a job of the given type that is not being retried.
    pub fn new(job_type: SchedulerJob) -> Self {
        let retry = None;
        Self { job_type, retry }
    }
}

impl TryFrom<&[u8]> for SchedulerJobMetadata {
type Error = anyhow::Error;

fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
Ok(postcard::from_bytes(value)?)
}
}

impl TryFrom<SchedulerJobMetadata> for Vec<u8> {
type Error = anyhow::Error;

fn try_from(value: SchedulerJobMetadata) -> Result<Self, Self::Error> {
Ok(postcard::to_stdvec(&value)?)
}
}

#[cfg(test)]
mod tests {
    use super::SchedulerJob;
    use crate::scheduler::{SchedulerJobMetadata, SchedulerJobRetryState};
    use insta::assert_debug_snapshot;
    use time::OffsetDateTime;

    /// Metadata for `job_type` carrying the retry state used by the (de)serialization tests.
    fn retried(job_type: SchedulerJob) -> anyhow::Result<SchedulerJobMetadata> {
        Ok(SchedulerJobMetadata {
            job_type,
            retry: Some(SchedulerJobRetryState {
                attempts: 10,
                next_at: OffsetDateTime::from_unix_timestamp(946720800)?,
            }),
        })
    }

    #[test]
    fn properly_creates_metadata() -> anyhow::Result<()> {
        for job_type in [
            SchedulerJob::WebPageTrackersSchedule,
            SchedulerJob::NotificationsSend,
        ] {
            assert_eq!(
                SchedulerJobMetadata::new(job_type),
                SchedulerJobMetadata {
                    job_type,
                    retry: None
                }
            );
        }

        Ok(())
    }

    #[test]
    fn serialize() -> anyhow::Result<()> {
        let schedule_job = SchedulerJob::WebPageTrackersSchedule;
        assert_eq!(
            Vec::try_from(SchedulerJobMetadata::new(schedule_job))?,
            vec![1, 0]
        );
        assert_eq!(
            Vec::try_from(retried(schedule_job)?)?,
            vec![1, 1, 10, 160, 31, 1, 10, 0, 0, 0, 0, 0, 0]
        );

        let notifications_job = SchedulerJob::NotificationsSend;
        assert_eq!(
            Vec::try_from(SchedulerJobMetadata::new(notifications_job))?,
            vec![3, 0]
        );
        assert_eq!(
            Vec::try_from(retried(notifications_job)?)?,
            vec![3, 1, 10, 160, 31, 1, 10, 0, 0, 0, 0, 0, 0]
        );

        Ok(())
    }

    #[test]
    fn deserialize() -> anyhow::Result<()> {
        let schedule_job = SchedulerJob::WebPageTrackersSchedule;
        assert_eq!(
            SchedulerJobMetadata::try_from([1, 0].as_ref())?,
            SchedulerJobMetadata::new(schedule_job)
        );
        assert_eq!(
            SchedulerJobMetadata::try_from([1, 1, 10, 160, 31, 1, 10, 0, 0, 0, 0, 0, 0].as_ref())?,
            retried(schedule_job)?
        );

        let notifications_job = SchedulerJob::NotificationsSend;
        assert_eq!(
            SchedulerJobMetadata::try_from([3, 0].as_ref())?,
            SchedulerJobMetadata::new(notifications_job)
        );
        assert_eq!(
            SchedulerJobMetadata::try_from([3, 1, 10, 160, 31, 1, 10, 0, 0, 0, 0, 0, 0].as_ref())?,
            retried(notifications_job)?
        );

        // Input that cannot be decoded must surface as an error.
        assert_debug_snapshot!(SchedulerJobMetadata::try_from([4].as_ref()), @r###"
        Err(
            SerdeDeCustom,
        )
        "###);

        Ok(())
    }
}
11 changes: 11 additions & 0 deletions src/scheduler/scheduler_job_retry_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;

/// Describes the state of a job that is being retried.
#[derive(Serialize, Deserialize, Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub struct SchedulerJobRetryState {
    /// How many times the job has been retried.
    pub attempts: u32,
    /// The point in time at which the job will be retried.
    pub next_at: OffsetDateTime,
}
210 changes: 210 additions & 0 deletions src/scheduler/scheduler_job_retry_strategy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DurationMilliSeconds};
use std::time::Duration;

/// Describes how long to wait between retry attempts of a job and how many attempts are allowed.
///
/// Serialized as an internally tagged value (the variant is stored in a `type` field) with
/// camelCase names. Durations are (de)serialized through `DurationMilliSeconds<u64>`.
#[serde_as]
#[derive(Serialize, Deserialize, Debug, Copy, Clone, Hash, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
#[serde(tag = "type")]
pub enum SchedulerJobRetryStrategy {
    /// The job will be retried with a constant interval (1s -> 1s -> 1s).
    #[serde(rename_all = "camelCase")]
    Constant {
        /// Interval used for every retry attempt.
        #[serde_as(as = "DurationMilliSeconds<u64>")]
        interval: Duration,
        /// Maximum number of retry attempts.
        max_attempts: u32,
    },
    /// The job will be retried with an exponential interval (1s -> 2s -> 4s -> 8s).
    #[serde(rename_all = "camelCase")]
    Exponential {
        /// Interval used for the first retry attempt.
        #[serde_as(as = "DurationMilliSeconds<u64>")]
        initial_interval: Duration,
        /// Factor the interval is multiplied by for each subsequent attempt.
        multiplier: u32,
        /// Upper bound for the calculated interval.
        #[serde_as(as = "DurationMilliSeconds<u64>")]
        max_interval: Duration,
        /// Maximum number of retry attempts.
        max_attempts: u32,
    },
    /// The job will be retried with a linear interval (1s -> 2s -> 3s).
    #[serde(rename_all = "camelCase")]
    Linear {
        /// Interval used for the first retry attempt.
        #[serde_as(as = "DurationMilliSeconds<u64>")]
        initial_interval: Duration,
        /// Amount added to the interval for each subsequent attempt.
        #[serde_as(as = "DurationMilliSeconds<u64>")]
        increment: Duration,
        /// Upper bound for the calculated interval.
        #[serde_as(as = "DurationMilliSeconds<u64>")]
        max_interval: Duration,
        /// Maximum number of retry attempts.
        max_attempts: u32,
    },
}

impl SchedulerJobRetryStrategy {
    /// Calculates the interval for the next retry attempt.
    ///
    /// `attempt` is zero-based: `0` yields the initial interval. Growing strategies are capped
    /// at their `max_interval`, which is also the result when the calculation overflows.
    pub fn interval(&self, attempt: u32) -> Duration {
        // `None` means the arithmetic overflowed before the cap could be applied.
        let (uncapped, cap) = match self {
            Self::Constant { interval, .. } => return *interval,
            Self::Exponential {
                initial_interval,
                multiplier,
                max_interval,
                ..
            } => {
                let scaled = multiplier
                    .checked_pow(attempt)
                    .and_then(|factor| initial_interval.checked_mul(factor));
                (scaled, *max_interval)
            }
            Self::Linear {
                initial_interval,
                increment,
                max_interval,
                ..
            } => {
                let grown = increment
                    .checked_mul(attempt)
                    .and_then(|total_increment| initial_interval.checked_add(total_increment));
                (grown, *max_interval)
            }
        };

        uncapped.map_or(cap, |interval| interval.min(cap))
    }

    /// Returns the maximum number of attempts.
    pub fn max_attempts(&self) -> u32 {
        match self {
            Self::Constant { max_attempts, .. }
            | Self::Exponential { max_attempts, .. }
            | Self::Linear { max_attempts, .. } => *max_attempts,
        }
    }

    /// Returns the minimum retry interval: the constant interval, or the initial interval of a
    /// growing strategy.
    // NOTE(review): `max_interval` is not consulted here, so a strategy whose `max_interval` is
    // below `initial_interval` reports a larger minimum than `interval(0)` returns — confirm
    // such configurations are rejected elsewhere.
    pub fn min_interval(&self) -> &Duration {
        match self {
            Self::Constant { interval, .. } => interval,
            Self::Exponential {
                initial_interval, ..
            } => initial_interval,
            Self::Linear {
                initial_interval, ..
            } => initial_interval,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::SchedulerJobRetryStrategy;
    use std::time::Duration;

    /// Shorthand for a whole number of seconds.
    fn secs(value: u64) -> Duration {
        Duration::from_secs(value)
    }

    #[test]
    fn properly_detects_max_number_of_attempts() {
        let constant = SchedulerJobRetryStrategy::Constant {
            interval: secs(1),
            max_attempts: 10,
        };
        assert_eq!(constant.max_attempts(), 10);

        let exponential = SchedulerJobRetryStrategy::Exponential {
            initial_interval: secs(1),
            multiplier: 2,
            max_interval: secs(10),
            max_attempts: 15,
        };
        assert_eq!(exponential.max_attempts(), 15);

        let linear = SchedulerJobRetryStrategy::Linear {
            initial_interval: secs(1),
            increment: secs(1),
            max_interval: secs(10),
            max_attempts: 20,
        };
        assert_eq!(linear.max_attempts(), 20);
    }

    #[test]
    fn properly_detects_min_interval() {
        let constant = SchedulerJobRetryStrategy::Constant {
            interval: secs(1),
            max_attempts: 10,
        };
        assert_eq!(constant.min_interval(), &secs(1));

        let exponential = SchedulerJobRetryStrategy::Exponential {
            initial_interval: secs(2),
            multiplier: 2,
            max_interval: secs(10),
            max_attempts: 15,
        };
        assert_eq!(exponential.min_interval(), &secs(2));

        let linear = SchedulerJobRetryStrategy::Linear {
            initial_interval: secs(3),
            increment: secs(1),
            max_interval: secs(10),
            max_attempts: 20,
        };
        assert_eq!(linear.min_interval(), &secs(3));
    }

    #[test]
    fn properly_calculates_constant_interval() {
        let retry_strategy = SchedulerJobRetryStrategy::Constant {
            interval: secs(1),
            max_attempts: 10,
        };

        for attempt in [0, 1, 2, u32::MAX] {
            assert_eq!(retry_strategy.interval(attempt), secs(1));
        }
    }

    #[test]
    fn properly_calculates_linear_interval() {
        let retry_strategy = SchedulerJobRetryStrategy::Linear {
            initial_interval: secs(1),
            increment: secs(1),
            max_interval: secs(5),
            max_attempts: 10,
        };

        // (attempt, expected interval in seconds); the interval is capped at 5s.
        let expectations = [
            (0, 1),
            (1, 2),
            (2, 3),
            (3, 4),
            (4, 5),
            (5, 5),
            (6, 5),
            (100, 5),
            (u32::MAX, 5),
        ];
        for (attempt, expected_secs) in expectations {
            assert_eq!(retry_strategy.interval(attempt), secs(expected_secs));
        }
    }

    #[test]
    fn properly_calculates_exponential_interval() {
        let retry_strategy = SchedulerJobRetryStrategy::Exponential {
            initial_interval: secs(1),
            multiplier: 2,
            max_interval: secs(100),
            max_attempts: 10,
        };

        // (attempt, expected interval in seconds); the interval is capped at 100s.
        let expectations = [
            (0, 1),
            (1, 2),
            (2, 4),
            (3, 8),
            (4, 16),
            (5, 32),
            (6, 64),
            (7, 100),
            (100, 100),
            (u32::MAX, 100),
        ];
        for (attempt, expected_secs) in expectations {
            assert_eq!(retry_strategy.interval(attempt), secs(expected_secs));
        }
    }
}
18 changes: 10 additions & 8 deletions src/scheduler/scheduler_jobs/notifications_send_job.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
api::Api,
network::{DnsResolver, EmailTransport, EmailTransportError},
scheduler::scheduler_job::SchedulerJob,
scheduler::{job_ext::JobExt, scheduler_job::SchedulerJob},
};
use std::{sync::Arc, time::Instant};
use tokio_cron_scheduler::{Job, JobId, JobScheduler, JobStoredData};
@@ -50,11 +50,7 @@ impl NotificationsSendJob {
},
)?;

let job_data = job.job_data()?;
job.set_job_data(JobStoredData {
extra: SchedulerJob::NotificationsSend.try_into()?,
..job_data
})?;
job.set_job_type(SchedulerJob::NotificationsSend)?;

Ok(job)
}
@@ -104,7 +100,9 @@ mod tests {
use super::{NotificationsSendJob, MAX_NOTIFICATIONS_TO_SEND};
use crate::{
notifications::{NotificationContent, NotificationDestination},
scheduler::{scheduler_job::SchedulerJob, scheduler_store::SchedulerStore},
scheduler::{
scheduler_job::SchedulerJob, scheduler_store::SchedulerStore, SchedulerJobMetadata,
},
tests::{mock_api_with_config, mock_config, mock_schedule_in_sec, mock_user},
};
use cron::Schedule;
@@ -128,7 +126,9 @@ mod tests {
ran: false,
stopped: false,
last_updated: None,
extra: SchedulerJob::NotificationsSend.try_into().unwrap(),
extra: SchedulerJobMetadata::new(SchedulerJob::NotificationsSend)
.try_into()
.unwrap(),
job: Some(JobStored::CronJob(CronJob {
schedule: "0 0 * * * *".to_string(),
})),
@@ -151,6 +151,7 @@ mod tests {
0,
[
3,
0,
],
Some(
CronJob(
@@ -185,6 +186,7 @@ mod tests {
0,
[
3,
0,
],
Some(
CronJob(
1,186 changes: 1,075 additions & 111 deletions src/scheduler/scheduler_jobs/web_page_trackers_fetch_job.rs

Large diffs are not rendered by default.

74 changes: 46 additions & 28 deletions src/scheduler/scheduler_jobs/web_page_trackers_schedule_job.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::{
api::Api,
network::{DnsResolver, EmailTransport},
scheduler::{scheduler_job::SchedulerJob, scheduler_jobs::WebPageTrackersTriggerJob},
scheduler::{
job_ext::JobExt, scheduler_job::SchedulerJob, scheduler_jobs::WebPageTrackersTriggerJob,
},
utils::{WebPageTracker, WebPageTrackerTag},
};
use std::sync::Arc;
@@ -45,11 +47,7 @@ impl WebPageTrackersScheduleJob {
},
)?;

let job_data = job.job_data()?;
job.set_job_data(JobStoredData {
extra: SchedulerJob::WebPageTrackersSchedule.try_into()?,
..job_data
})?;
job.set_job_type(SchedulerJob::WebPageTrackersSchedule)?;

Ok(job)
}
@@ -99,8 +97,8 @@ impl WebPageTrackersScheduleJob {
continue;
}

let schedule = if let Some(schedule) = tracker.settings.schedule {
schedule
let schedule = if let Some(job_config) = tracker.job_config {
job_config.schedule
} else {
log::error!(
"Found an unscheduled tracker ({}) that doesn't have tracking schedule, skipping…",
@@ -126,7 +124,10 @@ impl WebPageTrackersScheduleJob {
mod tests {
use super::WebPageTrackersScheduleJob;
use crate::{
scheduler::{scheduler_job::SchedulerJob, scheduler_store::SchedulerStore},
scheduler::{
scheduler_job::SchedulerJob, scheduler_store::SchedulerStore, SchedulerJobConfig,
SchedulerJobMetadata,
},
tests::{mock_api_with_config, mock_config, mock_user},
utils::{WebPageTrackerCreateParams, WebPageTrackerKind, WebPageTrackerSettings},
};
@@ -151,7 +152,9 @@ mod tests {
ran: false,
stopped: false,
last_updated: None,
extra: SchedulerJob::WebPageTrackersSchedule.try_into().unwrap(),
extra: SchedulerJobMetadata::new(SchedulerJob::WebPageTrackersSchedule)
.try_into()
.unwrap(),
job: Some(JobStored::CronJob(CronJob {
schedule: "0 0 * * * *".to_string(),
})),
@@ -174,6 +177,7 @@ mod tests {
0,
[
1,
0,
],
Some(
CronJob(
@@ -209,6 +213,7 @@ mod tests {
0,
[
1,
0,
],
Some(
CronJob(
@@ -259,11 +264,14 @@ mod tests {
settings: WebPageTrackerSettings {
revisions: 1,
delay: Duration::from_millis(2000),
enable_notifications: true,
schedule: Some("1 2 3 4 5 6 2030".to_string()),
scripts: Default::default(),
headers: Default::default(),
},
job_config: Some(SchedulerJobConfig {
schedule: "1 2 3 4 5 6 2030".to_string(),
retry_strategy: None,
notifications: true,
}),
},
)
.await?;
@@ -277,11 +285,14 @@ mod tests {
settings: WebPageTrackerSettings {
revisions: 1,
delay: Duration::from_millis(2000),
enable_notifications: true,
schedule: Some("1 2 3 4 5 6 2035".to_string()),
scripts: Default::default(),
headers: Default::default(),
},
job_config: Some(SchedulerJobConfig {
schedule: "1 2 3 4 5 6 2035".to_string(),
retry_strategy: None,
notifications: true,
}),
},
)
.await?;
@@ -295,11 +306,14 @@ mod tests {
settings: WebPageTrackerSettings {
revisions: 1,
delay: Duration::from_millis(2000),
enable_notifications: true,
schedule: Some("1 2 3 4 5 6 2040".to_string()),
scripts: Default::default(),
headers: Default::default(),
},
job_config: Some(SchedulerJobConfig {
schedule: "1 2 3 4 5 6 2040".to_string(),
retry_strategy: None,
notifications: true,
}),
},
)
.await?;
@@ -372,9 +386,9 @@ mod tests {
let resources_jobs = jobs
.iter()
.filter_map(|job_data| {
let job_type = SchedulerJob::try_from(job_data.extra.as_ref()).unwrap();
let job_meta = SchedulerJobMetadata::try_from(job_data.extra.as_ref()).unwrap();
if matches!(
job_type,
job_meta.job_type,
SchedulerJob::WebPageTrackersTrigger {
kind: WebPageTrackerKind::WebPageResources
}
@@ -394,9 +408,9 @@ mod tests {
let content_jobs = jobs
.iter()
.filter_map(|job_data| {
let job_type = SchedulerJob::try_from(job_data.extra.as_ref()).unwrap();
let job_meta = SchedulerJobMetadata::try_from(job_data.extra.as_ref()).unwrap();
if matches!(
job_type,
job_meta.job_type,
SchedulerJob::WebPageTrackersTrigger {
kind: WebPageTrackerKind::WebPageContent
}
@@ -437,11 +451,10 @@ mod tests {
settings: WebPageTrackerSettings {
revisions: 1,
delay: Duration::from_millis(2000),
enable_notifications: true,
schedule: None,
scripts: Default::default(),
headers: Default::default(),
},
job_config: None,
},
)
.await?;
@@ -456,11 +469,10 @@ mod tests {
settings: WebPageTrackerSettings {
revisions: 1,
delay: Duration::from_millis(2000),
enable_notifications: true,
schedule: None,
scripts: Default::default(),
headers: Default::default(),
},
job_config: None,
},
)
.await?;
@@ -536,11 +548,14 @@ mod tests {
settings: WebPageTrackerSettings {
revisions: 0,
delay: Duration::from_millis(2000),
enable_notifications: true,
schedule: Some("1 2 3 4 5 6 2030".to_string()),
scripts: Default::default(),
headers: Default::default(),
},
job_config: Some(SchedulerJobConfig {
schedule: "1 2 3 4 5 6 2030".to_string(),
retry_strategy: None,
notifications: true,
}),
},
)
.await?;
@@ -554,11 +569,14 @@ mod tests {
settings: WebPageTrackerSettings {
revisions: 0,
delay: Duration::from_millis(2000),
enable_notifications: true,
schedule: Some("1 2 3 4 5 6 2030".to_string()),
scripts: Default::default(),
headers: Default::default(),
},
job_config: Some(SchedulerJobConfig {
schedule: "1 2 3 4 5 6 2030".to_string(),
retry_strategy: None,
notifications: true,
}),
},
)
.await?;
64 changes: 39 additions & 25 deletions src/scheduler/scheduler_jobs/web_page_trackers_trigger_job.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
api::Api,
network::{DnsResolver, EmailTransport},
scheduler::scheduler_job::SchedulerJob,
scheduler::{job_ext::JobExt, scheduler_job::SchedulerJob},
utils::WebPageTrackerKind,
};
use std::sync::Arc;
@@ -23,15 +23,15 @@ impl WebPageTrackersTriggerJob {
) -> anyhow::Result<Option<Job>> {
// First, check if the tracker job exists.
let web_scraping = api.web_scraping();
let Some((tracker_id, tracker_settings)) = (match tracker_kind {
let Some((tracker_id, tracker_settings, tracker_job_config)) = (match tracker_kind {
WebPageTrackerKind::WebPageResources => web_scraping
.get_resources_tracker_by_job_id(job_id)
.await?
.map(|tracker| (tracker.id, tracker.settings)),
.map(|tracker| (tracker.id, tracker.settings, tracker.job_config)),
WebPageTrackerKind::WebPageContent => web_scraping
.get_content_tracker_by_job_id(job_id)
.await?
.map(|tracker| (tracker.id, tracker.settings)),
.map(|tracker| (tracker.id, tracker.settings, tracker.job_config)),
}) else {
log::warn!(
"Web page tracker job reference doesn't exist, the job ('{job_id}') will be removed."
@@ -52,9 +52,9 @@ impl WebPageTrackersTriggerJob {
};

// Then, check if the tracker still has a schedule.
let Some(schedule) = tracker_settings.schedule else {
let Some(job_config) = tracker_job_config else {
log::warn!(
"Web page tracker ('{}') no longer has a schedule, the job ('{job_id}') will be removed.",
"Web page tracker ('{}') no longer has a job config, the job ('{job_id}') will be removed.",
tracker_id
);
web_scraping
@@ -64,7 +64,7 @@ impl WebPageTrackersTriggerJob {
};

// If we changed the job parameters, we need to remove the old job and create a new one.
let mut new_job = Self::create(api.clone(), schedule, tracker_kind).await?;
let mut new_job = Self::create(api.clone(), job_config.schedule, tracker_kind).await?;
Ok(if new_job.job_data()?.job == existing_job_data.job {
new_job.set_job_data(existing_job_data)?;
Some(new_job)
@@ -90,7 +90,7 @@ impl WebPageTrackersTriggerJob {
// up stopped jobs, processes them, and then un-stops. Stopped flag is basically
// serving as a pending processing flag. Eventually we might need to add a separate
// table for pending jobs.
if let Err(err) = db.set_scheduler_job_stopped_state(uuid, true).await {
if let Err(err) = db.reset_scheduler_job_state(uuid, true).await {
log::error!(
"Error marking web page tracker trigger job as pending: {}",
err
@@ -101,11 +101,7 @@ impl WebPageTrackersTriggerJob {
})
})?;

let job_data = job.job_data()?;
job.set_job_data(JobStoredData {
extra: SchedulerJob::WebPageTrackersTrigger { kind: tracker_kind }.try_into()?,
..job_data
})?;
job.set_job_type(SchedulerJob::WebPageTrackersTrigger { kind: tracker_kind })?;

Ok(job)
}
@@ -115,7 +111,10 @@ impl WebPageTrackersTriggerJob {
mod tests {
use super::WebPageTrackersTriggerJob;
use crate::{
scheduler::{scheduler_job::SchedulerJob, scheduler_store::SchedulerStore},
scheduler::{
scheduler_job::SchedulerJob, scheduler_store::SchedulerStore, SchedulerJobConfig,
SchedulerJobMetadata,
},
tests::{mock_api, mock_user},
utils::{
WebPageTracker, WebPageTrackerCreateParams, WebPageTrackerKind, WebPageTrackerSettings,
@@ -139,7 +138,7 @@ mod tests {
ran: false,
stopped: false,
last_updated: None,
extra: job_type.try_into().unwrap(),
extra: SchedulerJobMetadata::new(job_type).try_into().unwrap(),
job: Some(JobStored::CronJob(CronJob {
schedule: "0 0 * * * *".to_string(),
})),
@@ -174,6 +173,7 @@ mod tests {
[
0,
0,
0,
],
Some(
CronJob(
@@ -189,6 +189,7 @@ mod tests {
[
0,
1,
0,
],
Some(
CronJob(
@@ -223,12 +224,15 @@ mod tests {
url: "https://localhost:1234/my/app?q=2".parse()?,
settings: WebPageTrackerSettings {
revisions: 4,
schedule: Some("0 0 * * * *".to_string()),
delay: Default::default(),
scripts: Default::default(),
headers: Default::default(),
enable_notifications: true,
},
job_config: Some(SchedulerJobConfig {
schedule: "0 0 * * * *".to_string(),
retry_strategy: None,
notifications: true,
}),
},
)
.await?;
@@ -259,6 +263,7 @@ mod tests {
[
0,
0,
0,
],
Some(
CronJob(
@@ -306,12 +311,15 @@ mod tests {
url: "https://localhost:1234/my/app?q=2".parse()?,
settings: WebPageTrackerSettings {
revisions: 4,
schedule: Some("0 0 * * * *".to_string()),
delay: Default::default(),
scripts: Default::default(),
headers: Default::default(),
enable_notifications: true,
},
job_config: Some(SchedulerJobConfig {
schedule: "0 0 * * * *".to_string(),
retry_strategy: None,
notifications: true,
}),
},
)
.await?;
@@ -342,6 +350,7 @@ mod tests {
[
0,
1,
0,
],
Some(
CronJob(
@@ -389,12 +398,15 @@ mod tests {
url: "https://localhost:1234/my/app?q=2".parse()?,
settings: WebPageTrackerSettings {
revisions: 4,
schedule: Some("1 0 * * * *".to_string()),
delay: Default::default(),
scripts: Default::default(),
headers: Default::default(),
enable_notifications: true,
},
job_config: Some(SchedulerJobConfig {
schedule: "1 0 * * * *".to_string(),
retry_strategy: None,
notifications: true,
}),
},
)
.await?;
@@ -455,12 +467,11 @@ mod tests {
url: "https://localhost:1234/my/app?q=2".parse()?,
settings: WebPageTrackerSettings {
revisions: 4,
schedule: None,
delay: Default::default(),
scripts: Default::default(),
headers: Default::default(),
enable_notifications: true,
},
job_config: None,
},
)
.await?;
@@ -517,12 +528,15 @@ mod tests {
url: "https://localhost:1234/my/app?q=2".parse()?,
settings: WebPageTrackerSettings {
revisions: 0,
schedule: Some("0 0 * * * *".to_string()),
delay: Default::default(),
scripts: Default::default(),
headers: Default::default(),
enable_notifications: true,
},
job_config: Some(SchedulerJobConfig {
schedule: "0 0 * * * *".to_string(),
retry_strategy: None,
notifications: true,
}),
},
)
.await?;
2 changes: 1 addition & 1 deletion src/utils.rs
Original file line number Diff line number Diff line change
@@ -36,7 +36,7 @@ pub use self::{
WebPageTrackerUpdateParams, WebScraperContentRequest, WebScraperContentRequestScripts,
WebScraperContentResponse, WebScraperErrorResponse, WebScraperResource,
WebScraperResourcesRequest, WebScraperResourcesRequestScripts, WebScraperResourcesResponse,
WEB_PAGE_CONTENT_TRACKER_EXTRACT_SCRIPT_NAME,
WebScrapingApiExt, WEB_PAGE_CONTENT_TRACKER_EXTRACT_SCRIPT_NAME,
WEB_PAGE_RESOURCES_TRACKER_FILTER_SCRIPT_NAME,
},
web_security::{
12 changes: 6 additions & 6 deletions src/utils/api_ext.rs
Original file line number Diff line number Diff line change
@@ -6,11 +6,11 @@ use crate::{
};
use std::borrow::Cow;

pub struct UtilsApi<'a> {
pub struct UtilsApiExt<'a> {
db: Cow<'a, Database>,
}

impl<'a> UtilsApi<'a> {
impl<'a> UtilsApiExt<'a> {
/// Creates Utils API.
pub fn new(db: &'a Database) -> Self {
Self {
@@ -26,21 +26,21 @@ impl<'a> UtilsApi<'a> {

impl<DR: DnsResolver, ET: EmailTransport> Api<DR, ET> {
/// Returns an API to retrieve available utils.
pub fn utils(&self) -> UtilsApi {
UtilsApi::new(&self.db)
pub fn utils(&self) -> UtilsApiExt {
UtilsApiExt::new(&self.db)
}
}

#[cfg(test)]
mod tests {
use super::UtilsApi;
use super::UtilsApiExt;
use crate::tests::mock_db;
use insta::assert_debug_snapshot;

#[actix_rt::test]
async fn can_get_all_utils() -> anyhow::Result<()> {
let mock_db = mock_db().await?;
let api = UtilsApi::new(&mock_db);
let api = UtilsApiExt::new(&mock_db);

assert_debug_snapshot!(api.get_all().await?, @r###"
[
160 changes: 113 additions & 47 deletions src/utils/web_scraping.rs

Large diffs are not rendered by default.

1,262 changes: 1,032 additions & 230 deletions src/utils/web_scraping/api_ext.rs

Large diffs are not rendered by default.

90 changes: 55 additions & 35 deletions src/utils/web_scraping/api_ext/web_page_tracker_create_params.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::utils::WebPageTrackerSettings;
use crate::{scheduler::SchedulerJobConfig, utils::WebPageTrackerSettings};
use serde::Deserialize;
use url::Url;

@@ -11,13 +11,18 @@ pub struct WebPageTrackerCreateParams {
pub url: Url,
/// Settings of the web page tracker.
pub settings: WebPageTrackerSettings,
/// Configuration for a job, if tracker needs to be scheduled for automatic change detection.
pub job_config: Option<SchedulerJobConfig>,
}

#[cfg(test)]
mod tests {
use crate::utils::{
web_scraping::api_ext::WEB_PAGE_RESOURCES_TRACKER_FILTER_SCRIPT_NAME,
WebPageTrackerCreateParams, WebPageTrackerSettings,
use crate::{
scheduler::{SchedulerJobConfig, SchedulerJobRetryStrategy},
utils::{
web_scraping::api_ext::WEB_PAGE_RESOURCES_TRACKER_FILTER_SCRIPT_NAME,
WebPageTrackerCreateParams, WebPageTrackerSettings,
},
};
use std::time::Duration;
use url::Url;
@@ -27,58 +32,64 @@ mod tests {
assert_eq!(
serde_json::from_str::<WebPageTrackerCreateParams>(
r#"
{
"name": "pk",
"url": "https://secutils.dev",
"settings": {
"revisions": 3,
"delay": 2000,
"enableNotifications": true
{
"name": "tck",
"url": "https://secutils.dev",
"settings": {
"revisions": 3,
"delay": 2000
}
}
}
"#
"#
)?,
WebPageTrackerCreateParams {
name: "pk".to_string(),
name: "tck".to_string(),
url: Url::parse("https://secutils.dev")?,
settings: WebPageTrackerSettings {
revisions: 3,
schedule: None,
delay: Duration::from_millis(2000),
scripts: Default::default(),
headers: Default::default(),
enable_notifications: true,
},
job_config: None,
}
);

assert_eq!(
serde_json::from_str::<WebPageTrackerCreateParams>(
r#"
{
"name": "pk",
"url": "https://secutils.dev",
"settings": {
"revisions": 3,
"delay": 2000,
"schedule": "0 0 * * *",
"scripts": {
"resourceFilterMap": "return resource;"
},
"headers": {
"cookie": "my-cookie"
{
"name": "tck",
"url": "https://secutils.dev",
"settings": {
"revisions": 3,
"delay": 2000,
"scripts": {
"resourceFilterMap": "return resource;"
},
"headers": {
"cookie": "my-cookie"
}
},
"enableNotifications": true
"jobConfig": {
"schedule": "0 0 * * *",
"retryStrategy": {
"type": "exponential",
"initialInterval": 1234,
"multiplier": 2,
"maxInterval": 120000,
"maxAttempts": 5
},
"notifications": true
}
}
}
"#
"#
)?,
WebPageTrackerCreateParams {
name: "pk".to_string(),
name: "tck".to_string(),
url: Url::parse("https://secutils.dev")?,
settings: WebPageTrackerSettings {
revisions: 3,
schedule: Some("0 0 * * *".to_string()),
delay: Duration::from_millis(2000),
scripts: Some(
[(
@@ -93,9 +104,18 @@ mod tests {
[("cookie".to_string(), "my-cookie".to_string())]
.into_iter()
.collect(),
),
enable_notifications: true,
)
},
job_config: Some(SchedulerJobConfig {
schedule: "0 0 * * *".to_string(),
retry_strategy: Some(SchedulerJobRetryStrategy::Exponential {
initial_interval: Duration::from_millis(1234),
multiplier: 2,
max_interval: Duration::from_secs(120),
max_attempts: 5,
}),
notifications: true,
}),
}
);

149 changes: 120 additions & 29 deletions src/utils/web_scraping/api_ext/web_page_tracker_update_params.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,39 @@
use crate::utils::WebPageTrackerSettings;
use serde::Deserialize;
use crate::{scheduler::SchedulerJobConfig, utils::WebPageTrackerSettings};
use serde::{Deserialize, Deserializer};
use url::Url;

/// Parameters accepted when updating an existing web page tracker.
///
/// Field names are read in `camelCase` (e.g. `jobConfig`). Because of `#[serde(default)]`, any
/// field that is absent from the input is left at its default (`None`), so an update only needs
/// to carry the properties it mentions.
#[derive(Deserialize, Debug, Default, Clone, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
#[serde(default)]
pub struct WebPageTrackerUpdateParams {
/// Arbitrary name of the web page tracker.
pub name: Option<String>,
/// URL of the web page to track.
pub url: Option<Url>,
/// Settings of the web page tracker.
pub settings: Option<WebPageTrackerSettings>,
/// Configuration for a job, if tracker needs to be scheduled for automatic change detection.
/// We use nested `Option` to distinguish between `null` and `undefined` values.
///
/// * `None` - `jobConfig` was not present in the input at all.
/// * `Some(None)` - `jobConfig` was explicitly set to `null`.
/// * `Some(Some(config))` - `jobConfig` carried a job configuration.
#[serde(deserialize_with = "deserialize_optional_field")]
pub job_config: Option<Option<SchedulerJobConfig>>,
}

/// Deserializes a value that may be `null` and always wraps the outcome in an outer `Some`.
///
/// The inner `Option<T>` is `None` when the input value is `null` and `Some(value)` otherwise.
/// The outer `Some` records that this function was invoked at all, i.e. that a value was
/// supplied for the field. Any error reported by the deserializer is returned unchanged.
fn deserialize_optional_field<'de, T, D>(deserializer: D) -> Result<Option<Option<T>>, D::Error>
where
    D: Deserializer<'de>,
    T: Deserialize<'de>,
{
    // Parse the possibly-`null` value first, then mark it as "explicitly provided".
    Option::<T>::deserialize(deserializer).map(Some)
}

#[cfg(test)]
mod tests {
use crate::utils::{
WebPageTrackerSettings, WebPageTrackerUpdateParams,
WEB_PAGE_RESOURCES_TRACKER_FILTER_SCRIPT_NAME,
use crate::{
scheduler::{SchedulerJobConfig, SchedulerJobRetryStrategy},
utils::{
WebPageTrackerSettings, WebPageTrackerUpdateParams,
WEB_PAGE_RESOURCES_TRACKER_FILTER_SCRIPT_NAME,
},
};
use std::time::Duration;
use url::Url;
@@ -27,45 +43,111 @@ mod tests {
assert_eq!(
serde_json::from_str::<WebPageTrackerUpdateParams>(
r#"
{
"name": "pk"
}
"#
{
"name": "tck"
}
"#
)?,
WebPageTrackerUpdateParams {
name: Some("pk".to_string()),
name: Some("tck".to_string()),
url: None,
settings: None,
job_config: None,
}
);

assert_eq!(
serde_json::from_str::<WebPageTrackerUpdateParams>(
r#"
{
"name": "pk",
"url": "https://secutils.dev",
"settings": {
"revisions": 3,
"delay": 2000,
"schedule": "0 0 * * *",
"scripts": {
"resourceFilterMap": "return resource;"
},
"headers": {
"cookie": "my-cookie"
{
"settings": {
"revisions": 3,
"delay": 2000,
"scripts": {
"resourceFilterMap": "return resource;"
},
"headers": {
"cookie": "my-cookie"
}
}
}
"#
)?,
WebPageTrackerUpdateParams {
name: None,
url: None,
settings: Some(WebPageTrackerSettings {
revisions: 3,
delay: Duration::from_millis(2000),
scripts: Some(
[(
WEB_PAGE_RESOURCES_TRACKER_FILTER_SCRIPT_NAME.to_string(),
"return resource;".to_string()
)]
.into_iter()
.collect()
),
headers: Some(
[("cookie".to_string(), "my-cookie".to_string())]
.into_iter()
.collect(),
)
}),
job_config: None
}
);

assert_eq!(
serde_json::from_str::<WebPageTrackerUpdateParams>(
r#"
{
"jobConfig": null
}
"#
)?,
WebPageTrackerUpdateParams {
name: None,
url: None,
settings: None,
job_config: Some(None)
}
);

assert_eq!(
serde_json::from_str::<WebPageTrackerUpdateParams>(
r#"
{
"name": "tck",
"url": "https://secutils.dev",
"settings": {
"revisions": 3,
"delay": 2000,
"scripts": {
"resourceFilterMap": "return resource;"
},
"headers": {
"cookie": "my-cookie"
}
},
"enableNotifications": true
"jobConfig": {
"schedule": "0 0 * * *",
"retryStrategy": {
"type": "exponential",
"initialInterval": 1234,
"multiplier": 2,
"maxInterval": 120000,
"maxAttempts": 5
},
"notifications": true
}
}
}
"#
"#
)?,
WebPageTrackerUpdateParams {
name: Some("pk".to_string()),
name: Some("tck".to_string()),
url: Some(Url::parse("https://secutils.dev")?),
settings: Some(WebPageTrackerSettings {
revisions: 3,
schedule: Some("0 0 * * *".to_string()),
delay: Duration::from_millis(2000),
scripts: Some(
[(
@@ -79,9 +161,18 @@ mod tests {
[("cookie".to_string(), "my-cookie".to_string())]
.into_iter()
.collect(),
),
enable_notifications: true,
)
}),
job_config: Some(Some(SchedulerJobConfig {
schedule: "0 0 * * *".to_string(),
retry_strategy: Some(SchedulerJobRetryStrategy::Exponential {
initial_interval: Duration::from_millis(1234),
multiplier: 2,
max_interval: Duration::from_secs(120),
max_attempts: 5,
}),
notifications: true,
})),
}
);

344 changes: 334 additions & 10 deletions src/utils/web_scraping/database_ext.rs

Large diffs are not rendered by default.

166 changes: 140 additions & 26 deletions src/utils/web_scraping/database_ext/raw_web_page_tracker.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::utils::{WebPageTracker, WebPageTrackerSettings, WebPageTrackerTag};
use crate::{
scheduler::{SchedulerJobConfig, SchedulerJobRetryStrategy},
utils::{WebPageTracker, WebPageTrackerSettings, WebPageTrackerTag},
};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, time::Duration};
use time::OffsetDateTime;
@@ -10,9 +13,9 @@ pub(super) struct RawWebPageTracker {
pub name: String,
pub url: String,
pub kind: Vec<u8>,
pub schedule: Option<String>,
pub user_id: i64,
pub job_id: Option<Vec<u8>>,
pub job_config: Option<Vec<u8>>,
pub data: Vec<u8>,
pub created_at: i64,
}
@@ -23,15 +26,66 @@ pub(super) struct RawWebPageTrackerData<Tag: WebPageTrackerTag> {
pub delay: u64,
pub scripts: Option<HashMap<String, String>>,
pub headers: Option<HashMap<String, String>>,
pub enable_notifications: bool,
pub meta: Option<Tag::TrackerMeta>,
}

/// Compact tuple form of `SchedulerJobConfig` that is encoded with `postcard` into the raw
/// tracker's `job_config` bytes.
///
/// Tuple positions, in order: the job schedule, the optional retry strategy, and the
/// notifications flag.
// NOTE(review): the encoding looks positional, so reordering these elements would presumably
// change the stored format - confirm before modifying.
#[derive(Serialize, Deserialize)]
struct RawSchedulerJobConfig(String, Option<RawSchedulerJobRetryStrategy>, bool);

/// Compact tuple form of `SchedulerJobRetryStrategy` stored as part of `RawSchedulerJobConfig`.
///
/// Each variant mirrors the variant of the same name in `SchedulerJobRetryStrategy`, with the
/// named fields flattened into tuple positions.
#[derive(Serialize, Deserialize)]
enum RawSchedulerJobRetryStrategy {
// (interval, max_attempts)
Constant(Duration, u32),
// (initial_interval, multiplier, max_interval, max_attempts)
Exponential(Duration, u32, Duration, u32),
// (initial_interval, increment, max_interval, max_attempts)
Linear(Duration, Duration, Duration, u32),
}

impl<Tag: WebPageTrackerTag> TryFrom<RawWebPageTracker> for WebPageTracker<Tag> {
type Error = anyhow::Error;

fn try_from(raw: RawWebPageTracker) -> Result<Self, Self::Error> {
let raw_data = postcard::from_bytes::<RawWebPageTrackerData<Tag>>(&raw.data)?;

let job_config = if let Some(job_config) = raw.job_config {
let RawSchedulerJobConfig(schedule, retry_strategy, notifications) =
postcard::from_bytes(&job_config)?;
Some(SchedulerJobConfig {
schedule,
retry_strategy: retry_strategy.map(|retry_strategy| match retry_strategy {
RawSchedulerJobRetryStrategy::Constant(interval, max_attempts) => {
SchedulerJobRetryStrategy::Constant {
interval,
max_attempts,
}
}
RawSchedulerJobRetryStrategy::Exponential(
initial_interval,
multiplier,
max_interval,
max_attempts,
) => SchedulerJobRetryStrategy::Exponential {
initial_interval,
multiplier,
max_interval,
max_attempts,
},
RawSchedulerJobRetryStrategy::Linear(
initial_interval,
increment,
max_interval,
max_attempts,
) => SchedulerJobRetryStrategy::Linear {
initial_interval,
increment,
max_interval,
max_attempts,
},
}),
notifications,
})
} else {
None
};

Ok(WebPageTracker {
id: Uuid::from_slice(raw.id.as_slice())?,
name: raw.name,
@@ -41,13 +95,12 @@ impl<Tag: WebPageTrackerTag> TryFrom<RawWebPageTracker> for WebPageTracker<Tag>
.job_id
.map(|job_id| Uuid::from_slice(job_id.as_slice()))
.transpose()?,
job_config,
settings: WebPageTrackerSettings {
revisions: raw_data.revisions,
delay: Duration::from_millis(raw_data.delay),
schedule: raw.schedule,
scripts: raw_data.scripts,
headers: raw_data.headers,
enable_notifications: raw_data.enable_notifications,
},
created_at: OffsetDateTime::from_unix_timestamp(raw.created_at)?,
meta: raw_data.meta,
@@ -64,19 +117,59 @@ impl<Tag: WebPageTrackerTag> TryFrom<&WebPageTracker<Tag>> for RawWebPageTracker
delay: item.settings.delay.as_millis() as u64,
scripts: item.settings.scripts.clone(),
headers: item.settings.headers.clone(),
enable_notifications: item.settings.enable_notifications,
meta: item.meta.clone(),
};

let job_config = if let Some(SchedulerJobConfig {
schedule,
retry_strategy,
notifications,
}) = &item.job_config
{
Some(postcard::to_stdvec(&RawSchedulerJobConfig(
schedule.to_string(),
retry_strategy.map(|retry_strategy| match retry_strategy {
SchedulerJobRetryStrategy::Constant {
interval,
max_attempts,
} => RawSchedulerJobRetryStrategy::Constant(interval, max_attempts),
SchedulerJobRetryStrategy::Exponential {
initial_interval,
multiplier,
max_interval,
max_attempts,
} => RawSchedulerJobRetryStrategy::Exponential(
initial_interval,
multiplier,
max_interval,
max_attempts,
),
SchedulerJobRetryStrategy::Linear {
initial_interval,
increment,
max_interval,
max_attempts,
} => RawSchedulerJobRetryStrategy::Linear(
initial_interval,
increment,
max_interval,
max_attempts,
),
}),
*notifications,
))?)
} else {
None
};

Ok(RawWebPageTracker {
id: item.id.into(),
name: item.name.clone(),
url: item.url.to_string(),
kind: Tag::KIND.try_into()?,
// Move schedule to a dedicated database table field to allow searching.
schedule: item.settings.schedule.clone(),
user_id: *item.user_id,
job_id: item.job_id.as_ref().map(|job_id| (*job_id).into()),
job_config,
data: postcard::to_stdvec(&raw_data)?,
created_at: item.created_at.unix_timestamp(),
})
@@ -87,6 +180,7 @@ impl<Tag: WebPageTrackerTag> TryFrom<&WebPageTracker<Tag>> for RawWebPageTracker
mod tests {
use super::RawWebPageTracker;
use crate::{
scheduler::{SchedulerJobConfig, SchedulerJobRetryStrategy},
tests::mock_user,
utils::{
WebPageResourcesTrackerTag, WebPageTracker, WebPageTrackerSettings,
@@ -108,10 +202,10 @@ mod tests {
name: "tk".to_string(),
url: "https://secutils.dev".to_string(),
kind: vec![0],
schedule: None,
user_id: *mock_user()?.id,
job_id: None,
data: vec![1, 0, 0, 0, 0, 0],
job_config: None,
data: vec![1, 0, 0, 0, 0],
// January 1, 2000 10:00:00
created_at: 946720800,
})?,
@@ -121,13 +215,12 @@ mod tests {
url: Url::parse("https://secutils.dev")?,
user_id: mock_user()?.id,
job_id: None,
job_config: None,
settings: WebPageTrackerSettings {
revisions: 1,
schedule: None,
delay: Default::default(),
scripts: Default::default(),
headers: Default::default(),
enable_notifications: false,
headers: Default::default()
},
created_at: OffsetDateTime::from_unix_timestamp(946720800)?,
meta: None
@@ -142,18 +235,21 @@ mod tests {
name: "tk".to_string(),
url: "https://secutils.dev".to_string(),
kind: vec![0],
schedule: Some("0 0 * * *".to_string()),
user_id: *mock_user()?.id,
job_id: Some(
uuid!("00000000-0000-0000-0000-000000000002")
.as_bytes()
.to_vec()
),
job_config: Some(vec![
9, 48, 32, 48, 32, 42, 32, 42, 32, 42, 1, 1, 1, 128, 157, 202, 111, 2, 120, 0,
5, 1
]),
data: vec![
1, 208, 15, 1, 1, 17, 114, 101, 115, 111, 117, 114, 99, 101, 70, 105, 108, 116,
101, 114, 77, 97, 112, 16, 114, 101, 116, 117, 114, 110, 32, 114, 101, 115,
111, 117, 114, 99, 101, 59, 1, 1, 6, 99, 111, 111, 107, 105, 101, 9, 109, 121,
45, 99, 111, 111, 107, 105, 101, 1, 0
45, 99, 111, 111, 107, 105, 101, 0
],
// January 1, 2000 10:00:00
created_at: 946720800,
@@ -164,9 +260,18 @@ mod tests {
url: Url::parse("https://secutils.dev")?,
user_id: mock_user()?.id,
job_id: Some(uuid!("00000000-0000-0000-0000-000000000002")),
job_config: Some(SchedulerJobConfig {
schedule: "0 0 * * *".to_string(),
retry_strategy: Some(SchedulerJobRetryStrategy::Exponential {
initial_interval: Duration::from_millis(1234),
multiplier: 2,
max_interval: Duration::from_secs(120),
max_attempts: 5,
}),
notifications: true
}),
settings: WebPageTrackerSettings {
revisions: 1,
schedule: Some("0 0 * * *".to_string()),
delay: Duration::from_millis(2000),
scripts: Some(
[(
@@ -180,8 +285,7 @@ mod tests {
[("cookie".to_string(), "my-cookie".to_string())]
.into_iter()
.collect()
),
enable_notifications: true,
)
},
created_at: OffsetDateTime::from_unix_timestamp(946720800)?,
meta: None
@@ -200,13 +304,12 @@ mod tests {
url: Url::parse("https://secutils.dev")?,
user_id: mock_user()?.id,
job_id: None,
job_config: None,
settings: WebPageTrackerSettings {
revisions: 1,
schedule: None,
delay: Default::default(),
scripts: Default::default(),
headers: Default::default(),
enable_notifications: false,
},
created_at: OffsetDateTime::from_unix_timestamp(946720800)?,
meta: None
@@ -218,10 +321,10 @@ mod tests {
name: "tk".to_string(),
url: "https://secutils.dev/".to_string(),
kind: vec![0],
schedule: None,
user_id: *mock_user()?.id,
job_id: None,
data: vec![1, 0, 0, 0, 0, 0],
job_config: None,
data: vec![1, 0, 0, 0, 0],
// January 1, 2000 10:00:00
created_at: 946720800,
}
@@ -234,9 +337,18 @@ mod tests {
url: Url::parse("https://secutils.dev")?,
user_id: mock_user()?.id,
job_id: Some(uuid!("00000000-0000-0000-0000-000000000002")),
job_config: Some(SchedulerJobConfig {
schedule: "0 0 * * *".to_string(),
retry_strategy: Some(SchedulerJobRetryStrategy::Exponential {
initial_interval: Duration::from_millis(1234),
multiplier: 2,
max_interval: Duration::from_secs(120),
max_attempts: 5,
}),
notifications: true
}),
settings: WebPageTrackerSettings {
revisions: 1,
schedule: Some("0 0 * * *".to_string()),
delay: Duration::from_millis(2000),
scripts: Some(
[(
@@ -251,7 +363,6 @@ mod tests {
.into_iter()
.collect()
),
enable_notifications: true,
},
created_at: OffsetDateTime::from_unix_timestamp(946720800)?,
meta: None
@@ -263,18 +374,21 @@ mod tests {
name: "tk".to_string(),
url: "https://secutils.dev/".to_string(),
kind: vec![0],
schedule: Some("0 0 * * *".to_string()),
user_id: *mock_user()?.id,
job_id: Some(
uuid!("00000000-0000-0000-0000-000000000002")
.as_bytes()
.to_vec()
),
job_config: Some(vec![
9, 48, 32, 48, 32, 42, 32, 42, 32, 42, 1, 1, 1, 128, 157, 202, 111, 2, 120, 0,
5, 1
]),
data: vec![
1, 208, 15, 1, 1, 17, 114, 101, 115, 111, 117, 114, 99, 101, 70, 105, 108, 116,
101, 114, 77, 97, 112, 16, 114, 101, 116, 117, 114, 110, 32, 114, 101, 115,
111, 117, 114, 99, 101, 59, 1, 1, 6, 99, 111, 111, 107, 105, 101, 9, 109, 121,
45, 99, 111, 111, 107, 105, 101, 1, 0
45, 99, 111, 111, 107, 105, 101, 0
],
// January 1, 2000 10:00:00
created_at: 946720800,
4 changes: 1 addition & 3 deletions src/utils/web_scraping/web_page_trackers.rs
Original file line number Diff line number Diff line change
@@ -21,9 +21,7 @@ pub use self::{
},
web_page_tracker::WebPageTracker,
web_page_tracker_kind::WebPageTrackerKind,
web_page_tracker_settings::{
WebPageTrackerSettings, MAX_WEB_PAGE_TRACKER_DELAY, MAX_WEB_PAGE_TRACKER_REVISIONS,
},
web_page_tracker_settings::WebPageTrackerSettings,
web_page_tracker_tag::WebPageTrackerTag,
web_scraper::WebScraperErrorResponse,
};
57 changes: 41 additions & 16 deletions src/utils/web_scraping/web_page_trackers/web_page_tracker.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{
scheduler::SchedulerJobConfig,
users::UserId,
utils::{WebPageTrackerSettings, WebPageTrackerTag},
};
@@ -19,10 +20,13 @@ pub struct WebPageTracker<Tag: WebPageTrackerTag> {
/// Id of the user who owns the tracker.
#[serde(skip_serializing)]
pub user_id: UserId,
/// ID of the optional job that triggers web page checking. If `None` when `schedule` is set,
/// ID of the optional job that triggers web page checking. If `None` when `job_config` is set,
/// then the job has not been scheduled yet.
#[serde(skip_serializing)]
pub job_id: Option<Uuid>,
/// Configuration of the job that triggers web page checking, if configured.
#[serde(skip_serializing_if = "Option::is_none")]
pub job_config: Option<SchedulerJobConfig>,
/// Settings of the web page tracker.
pub settings: WebPageTrackerSettings,
/// Optional meta data of the web page tracker.
@@ -36,10 +40,12 @@ pub struct WebPageTracker<Tag: WebPageTrackerTag> {
#[cfg(test)]
mod tests {
use crate::{
scheduler::{SchedulerJobConfig, SchedulerJobRetryStrategy},
tests::MockWebPageTrackerBuilder,
utils::{WebPageResourcesTrackerTag, WEB_PAGE_RESOURCES_TRACKER_FILTER_SCRIPT_NAME},
};
use insta::assert_json_snapshot;
use std::time::Duration;
use uuid::uuid;

#[test]
@@ -59,8 +65,7 @@ mod tests {
"url": "http://localhost:1234/my/app?q=2",
"settings": {
"revisions": 3,
"delay": 2500,
"enableNotifications": true
"delay": 2500
},
"createdAt": 946720800
}
@@ -80,11 +85,13 @@ mod tests {
"id": "00000000-0000-0000-0000-000000000001",
"name": "some-name",
"url": "http://localhost:1234/my/app?q=2",
"jobConfig": {
"schedule": "0 0 * * *",
"notifications": false
},
"settings": {
"revisions": 3,
"schedule": "0 0 * * *",
"delay": 2500,
"enableNotifications": true
"delay": 2500
},
"createdAt": 946720800
}
@@ -112,14 +119,16 @@ mod tests {
"id": "00000000-0000-0000-0000-000000000001",
"name": "some-name",
"url": "http://localhost:1234/my/app?q=2",
"jobConfig": {
"schedule": "0 0 * * *",
"notifications": false
},
"settings": {
"revisions": 3,
"schedule": "0 0 * * *",
"delay": 2500,
"scripts": {
"resourceFilterMap": "return resource;"
},
"enableNotifications": true
}
},
"createdAt": 946720800
}
@@ -140,12 +149,14 @@ mod tests {
"id": "00000000-0000-0000-0000-000000000001",
"name": "some-name",
"url": "http://localhost:1234/my/app?q=2",
"jobConfig": {
"schedule": "0 0 * * *",
"notifications": false
},
"settings": {
"revisions": 3,
"schedule": "0 0 * * *",
"delay": 2500,
"scripts": {},
"enableNotifications": true
"scripts": {}
},
"createdAt": 946720800
}
@@ -160,19 +171,33 @@ mod tests {
.with_delay_millis(2500)
.with_schedule("0 0 * * *")
.with_scripts(Default::default())
.without_notifications()
.with_job_config(SchedulerJobConfig {
schedule: "0 0 * * *".to_string(),
notifications: false,
retry_strategy: Some(SchedulerJobRetryStrategy::Constant {
interval: Duration::from_secs(1000),
max_attempts: 10,
}),
})
.build();
assert_json_snapshot!(tracker, @r###"
{
"id": "00000000-0000-0000-0000-000000000001",
"name": "some-name",
"url": "http://localhost:1234/my/app?q=2",
"jobConfig": {
"schedule": "0 0 * * *",
"retryStrategy": {
"type": "constant",
"interval": 1000000,
"maxAttempts": 10
},
"notifications": false
},
"settings": {
"revisions": 3,
"schedule": "0 0 * * *",
"delay": 2500,
"scripts": {},
"enableNotifications": false
"scripts": {}
},
"createdAt": 946720800
}
132 changes: 4 additions & 128 deletions src/utils/web_scraping/web_page_trackers/web_page_tracker_settings.rs
Original file line number Diff line number Diff line change
@@ -2,21 +2,12 @@ use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DurationMilliSeconds};
use std::{collections::HashMap, time::Duration};

/// Maximum number of revisions of the web page content that we currently support (10).
pub const MAX_WEB_PAGE_TRACKER_REVISIONS: usize = 10;

/// Maximum time we currently wait before starting to track the web page (60 seconds).
pub const MAX_WEB_PAGE_TRACKER_DELAY: Duration = Duration::from_secs(60);

#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct WebPageTrackerSettings {
/// The number of revisions of the web page content to track.
pub revisions: usize,
/// Optional schedule to track web page on.
#[serde(skip_serializing_if = "Option::is_none")]
pub schedule: Option<String>,
/// Number of milliseconds to wait after the web page enters the "idle" state before tracking starts.
#[serde_as(as = "DurationMilliSeconds<u64>")]
pub delay: Duration,
@@ -26,8 +17,6 @@ pub struct WebPageTrackerSettings {
/// Optional list of HTTP headers that should be sent with the tracker requests.
#[serde(skip_serializing_if = "Option::is_none")]
pub headers: Option<HashMap<String, String>>,
/// Indicates that web page change notifications are enabled for this tracker.
pub enable_notifications: bool,
}

#[cfg(test)]
@@ -43,40 +32,19 @@ mod tests {
fn serialization() -> anyhow::Result<()> {
let settings = WebPageTrackerSettings {
revisions: 3,
schedule: None,
delay: Duration::from_millis(2500),
scripts: Default::default(),
headers: Default::default(),
enable_notifications: true,
};
assert_json_snapshot!(settings, @r###"
{
"revisions": 3,
"delay": 2500,
"enableNotifications": true
}
"###);

let settings = WebPageTrackerSettings {
revisions: 3,
schedule: Some("0 0 * * *".to_string()),
delay: Duration::from_millis(2500),
scripts: Default::default(),
headers: Default::default(),
enable_notifications: true,
};
assert_json_snapshot!(settings, @r###"
{
"revisions": 3,
"schedule": "0 0 * * *",
"delay": 2500,
"enableNotifications": true
"delay": 2500
}
"###);

let settings = WebPageTrackerSettings {
revisions: 3,
schedule: Some("0 0 * * *".to_string()),
delay: Duration::from_millis(2500),
scripts: Some(
[(
@@ -91,54 +59,17 @@ mod tests {
.into_iter()
.collect(),
),
enable_notifications: true,
};
assert_json_snapshot!(settings, @r###"
{
"revisions": 3,
"schedule": "0 0 * * *",
"delay": 2500,
"scripts": {
"resourceFilterMap": "return resource;"
},
"headers": {
"cookie": "my-cookie"
},
"enableNotifications": true
}
"###);

let settings = WebPageTrackerSettings {
revisions: 3,
schedule: Some("0 0 * * *".to_string()),
delay: Duration::from_millis(2500),
scripts: Default::default(),
headers: Default::default(),
enable_notifications: true,
};
assert_json_snapshot!(settings, @r###"
{
"revisions": 3,
"schedule": "0 0 * * *",
"delay": 2500,
"enableNotifications": true
}
"###);

let settings = WebPageTrackerSettings {
revisions: 3,
schedule: Some("0 0 * * *".to_string()),
delay: Duration::from_millis(2500),
scripts: Default::default(),
headers: Default::default(),
enable_notifications: false,
};
assert_json_snapshot!(settings, @r###"
{
"revisions": 3,
"schedule": "0 0 * * *",
"delay": 2500,
"enableNotifications": false
}
}
"###);

@@ -149,71 +80,19 @@ mod tests {
fn deserialization() -> anyhow::Result<()> {
let settings = WebPageTrackerSettings {
revisions: 3,
schedule: None,
delay: Duration::from_millis(2000),
scripts: Default::default(),
headers: Default::default(),
enable_notifications: true,
};
assert_eq!(
serde_json::from_str::<WebPageTrackerSettings>(
&json!({ "revisions": 3, "delay": 2000, "enableNotifications": true }).to_string()
)?,
settings
);

let settings = WebPageTrackerSettings {
revisions: 3,
schedule: Some("0 0 * * *".to_string()),
delay: Duration::from_millis(2000),
scripts: Default::default(),
headers: Default::default(),
enable_notifications: true,
};
assert_eq!(
serde_json::from_str::<WebPageTrackerSettings>(
&json!({ "revisions": 3, "delay": 2000, "schedule": "0 0 * * *", "enableNotifications": true }).to_string()
)?,
settings
);

let settings = WebPageTrackerSettings {
revisions: 3,
schedule: Some("0 0 * * *".to_string()),
delay: Duration::from_millis(2000),
scripts: Some(
[(
WEB_PAGE_RESOURCES_TRACKER_FILTER_SCRIPT_NAME.to_string(),
"return resource;".to_string(),
)]
.into_iter()
.collect(),
),
headers: Some(
[("cookie".to_string(), "my-cookie".to_string())]
.into_iter()
.collect(),
),
enable_notifications: true,
};
assert_eq!(
serde_json::from_str::<WebPageTrackerSettings>(
&json!({
"revisions": 3,
"delay": 2000,
"schedule": "0 0 * * *",
"scripts": { "resourceFilterMap": "return resource;" },
"headers": { "cookie": "my-cookie" },
"enableNotifications": true
})
.to_string()
&json!({ "revisions": 3, "delay": 2000 }).to_string()
)?,
settings
);

let settings = WebPageTrackerSettings {
revisions: 3,
schedule: Some("0 0 * * *".to_string()),
delay: Duration::from_millis(2000),
scripts: Some(
[(
@@ -228,17 +107,14 @@ mod tests {
.into_iter()
.collect(),
),
enable_notifications: false,
};
assert_eq!(
serde_json::from_str::<WebPageTrackerSettings>(
&json!({
"revisions": 3,
"delay": 2000,
"schedule": "0 0 * * *",
"scripts": { "resourceFilterMap": "return resource;" },
"headers": { "cookie": "my-cookie" },
"enableNotifications": false
"headers": { "cookie": "my-cookie" }
})
.to_string()
)?,

0 comments on commit f3decab

Please sign in to comment.