Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug 1656934 - Scan pending pings directories after dealing with upload status #1205

Merged
merged 2 commits into from
Sep 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

* General
* Track the size of the database file at startup ([#1141](https://github.com/mozilla/glean/pull/1141)).
* BUGFIX: scan the pending pings directories **after** dealing with upload status on initialization. This is important, because in case upload is disabled we delete any outstanding non-deletion ping file, and if we scan the pending pings folder before doing that we may end up sending pings that should have been discarded. ([#1205](https://github.com/mozilla/glean/pull/1205))
* iOS
* Disabled code coverage in release builds ([#1195](https://github.com/mozilla/glean/issues/1195)).
* Python
Expand Down
2 changes: 1 addition & 1 deletion glean-core/ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ pub unsafe extern "C" fn glean_initialize_for_subprocess(cfg: *const FfiConfigur
// 2. We're not holding on to it beyond this function
// and we copy out all data when needed.
let glean_cfg = glean_core::Configuration::try_from(&*cfg)?;
let glean = Glean::new_for_subprocess(&glean_cfg)?;
let glean = Glean::new_for_subprocess(&glean_cfg, true)?;
glean_core::setup_glean(glean)?;
log::info!("Glean initialized for subprocess");
Ok(true)
Expand Down
22 changes: 16 additions & 6 deletions glean-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ impl Glean {
///
/// Importantly, this will not send any pings at startup, since that
/// sort of management should only happen in the main process.
pub fn new_for_subprocess(cfg: &Configuration) -> Result<Self> {
pub fn new_for_subprocess(cfg: &Configuration, scan_directories: bool) -> Result<Self> {
log::info!("Creating new Glean v{}", GLEAN_VERSION);

let application_id = sanitize_application_id(&cfg.application_id);
Expand All @@ -201,12 +201,17 @@ impl Glean {
let event_data_store = EventDatabase::new(&cfg.data_path)?;

// Create an upload manager with rate limiting of 10 pings every 60 seconds.
let mut upload_manager =
PingUploadManager::new(&cfg.data_path, &cfg.language_binding_name, false);
let mut upload_manager = PingUploadManager::new(&cfg.data_path, &cfg.language_binding_name);
upload_manager.set_rate_limiter(
/* seconds per interval */ 60, /* max tasks per interval */ 15,
);

// We only scan the pending ping sdirectories when calling this from a subprocess,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having this makes me question the name of this function. If you have better ideas I'd be up for changing it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops: pending ping sdirectories

// when calling this from ::new we need to scan the directories after dealing with the upload state.
if scan_directories {
let _scanning_thread = upload_manager.scan_pending_pings_directories();
}

Ok(Self {
upload_enabled: cfg.upload_enabled,
data_store,
Expand All @@ -230,7 +235,7 @@ impl Glean {
/// This will create the necessary directories and files in `data_path`.
/// This will also initialize the core metrics.
pub fn new(cfg: Configuration) -> Result<Self> {
let mut glean = Self::new_for_subprocess(&cfg)?;
let mut glean = Self::new_for_subprocess(&cfg, false)?;

// The upload enabled flag may have changed since the last run, for
// example by the changing of a config file.
Expand Down Expand Up @@ -263,6 +268,12 @@ impl Glean {
}
}

// We only scan the pendings pings directories **after** dealing with the upload state.
// If upload is disabled, we delete all pending pings files
// and we need to do that **before** scanning the pending pings folder
// to ensure we don't enqueue pings before their files are deleted.
let _scanning_thread = glean.upload_manager.scan_pending_pings_directories();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eugh. So this was an actual bug and not a test error. We should add a changelog entry. Do we have any way to estimate how frequently we sent pings out?

I'd suspect that's not super frequent, but it's important to better understand. We could also draft a PSA email, if we find the impact being big.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have any way to estimate how frequently we sent pings out?

Hm, I need to think about this. When we have this bug it is usually followed by an error on deleting the ping file. Because we send the request to the caller -> it uploads succesfully (usually) -> and then it tries to delete an already deleted file, which logs an error. But that is all we have, from what I remember on the top of my head.

I'll think a bit further, if you have any ideas, let me know.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have an idea to validate this: check if any pings are sent from a client_id after this client_id sent a deletion-request ping. Will come back to this thread with results when I have them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


Ok(glean)
}

Expand All @@ -285,8 +296,7 @@ impl Glean {
let mut glean = Self::new(cfg).unwrap();

// Disable all upload manager policies for testing
// and make the upload manager scan the pings directories synchronously.
glean.upload_manager = PingUploadManager::no_policy(data_path, true);
glean.upload_manager = PingUploadManager::no_policy(data_path);

glean
}
Expand Down
138 changes: 45 additions & 93 deletions glean-core/src/upload/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,54 +163,20 @@ pub struct PingUploadManager {
impl PingUploadManager {
/// Creates a new PingUploadManager.
///
/// Spawns a new thread and processes the pending pings directory,
/// filling up the queue with whatever pings are in there.
///
/// # Arguments
///
/// * `data_path` - Path to the pending pings directory.
/// * `language_binding_name` - The name of the language binding calling this managers instance.
/// * `sync_scan` - Whether or not ping directory scanning should be synchronous.
///
/// # Panics
///
/// Will panic if unable to spawn a new thread.
pub fn new<P: Into<PathBuf>>(
data_path: P,
language_binding_name: &str,
sync_scan: bool,
) -> Self {
let queue = RwLock::new(VecDeque::new());
let directory_manager = PingDirectoryManager::new(data_path);

let processed_pending_pings = Arc::new(AtomicBool::new(false));
let cached_pings = Arc::new(RwLock::new(PingPayloadsByDirectory::default()));

let local_manager = directory_manager.clone();
let local_cached_pings = cached_pings.clone();
let local_flag = processed_pending_pings.clone();
let ping_scanning_thread = thread::Builder::new()
.name("glean.ping_directory_manager.process_dir".to_string())
.spawn(move || {
let mut local_cached_pings = local_cached_pings
.write()
.expect("Can't write to pending pings cache.");
local_cached_pings.extend(local_manager.process_dirs());
local_flag.store(true, Ordering::SeqCst);
})
.expect("Unable to spawn thread to process pings directories.");

if sync_scan {
ping_scanning_thread
.join()
.expect("Unable to wait for startup ping processing to finish.");
}

pub fn new<P: Into<PathBuf>>(data_path: P, language_binding_name: &str) -> Self {
Self {
queue,
directory_manager,
processed_pending_pings,
cached_pings,
queue: RwLock::new(VecDeque::new()),
directory_manager: PingDirectoryManager::new(data_path),
processed_pending_pings: Arc::new(AtomicBool::new(false)),
cached_pings: Arc::new(RwLock::new(PingPayloadsByDirectory::default())),
recoverable_failure_count: AtomicU32::new(0),
wait_attempt_count: AtomicU32::new(0),
rate_limiter: None,
Expand All @@ -220,10 +186,32 @@ impl PingUploadManager {
}
}

/// Spawns a new thread and processes the pending pings directories,
/// filling up the queue with whatever pings are in there.
///
/// # Returns
///
/// The `JoinHandle` to the spawned thread
pub fn scan_pending_pings_directories(&self) -> std::thread::JoinHandle<()> {
let local_manager = self.directory_manager.clone();
let local_cached_pings = self.cached_pings.clone();
let local_flag = self.processed_pending_pings.clone();
thread::Builder::new()
.name("glean.ping_directory_manager.process_dir".to_string())
.spawn(move || {
let mut local_cached_pings = local_cached_pings
.write()
.expect("Can't write to pending pings cache.");
local_cached_pings.extend(local_manager.process_dirs());
local_flag.store(true, Ordering::SeqCst);
})
.expect("Unable to spawn thread to process pings directories.")
}

/// Creates a new upload manager with no limitations, for tests.
#[cfg(test)]
pub fn no_policy<P: Into<PathBuf>>(data_path: P, sync_scan: bool) -> Self {
let mut upload_manager = Self::new(data_path, "Test", sync_scan);
pub fn no_policy<P: Into<PathBuf>>(data_path: P) -> Self {
let mut upload_manager = Self::new(data_path, "Test");

// Disable all policies for tests, if necessary individuals tests can re-enable them.
upload_manager.policy.set_max_recoverable_failures(None);
Expand All @@ -233,6 +221,12 @@ impl PingUploadManager {
.policy
.set_max_pending_pings_directory_size(None);

// When building for tests, always scan the pending pings directories and do it sync.
upload_manager
.scan_pending_pings_directories()
.join()
.unwrap();

upload_manager
}

Expand Down Expand Up @@ -715,9 +709,7 @@ mod test {
fn returns_ping_request_when_there_is_one() {
let (glean, dir) = new_glean(None);

// Create a new upload manager so that we have access to its functions directly,
// make it synchronous so we don't have to manually wait for the scanning to finish.
let upload_manager = PingUploadManager::no_policy(dir.path(), /* sync_scan */ true);
let upload_manager = PingUploadManager::no_policy(dir.path());

// Enqueue a ping
upload_manager.enqueue_ping(&glean, &Uuid::new_v4().to_string(), PATH, "", None);
Expand All @@ -734,9 +726,7 @@ mod test {
fn returns_as_many_ping_requests_as_there_are() {
let (glean, dir) = new_glean(None);

// Create a new upload manager so that we have access to its functions directly,
// make it synchronous so we don't have to manually wait for the scanning to finish.
let upload_manager = PingUploadManager::no_policy(dir.path(), /* sync_scan */ true);
let upload_manager = PingUploadManager::no_policy(dir.path());

// Enqueue a ping multiple times
let n = 10;
Expand All @@ -763,10 +753,7 @@ mod test {
fn limits_the_number_of_pings_when_there_is_rate_limiting() {
let (glean, dir) = new_glean(None);

// Create a new upload manager so that we have access to its functions directly,
// make it synchronous so we don't have to manually wait for the scanning to finish.
let mut upload_manager =
PingUploadManager::no_policy(dir.path(), /* sync_scan */ true);
let mut upload_manager = PingUploadManager::no_policy(dir.path());

// Add a rate limiter to the upload mangager with max of 10 pings every 3 seconds.
let secs_per_interval = 3;
Expand Down Expand Up @@ -808,9 +795,7 @@ mod test {
fn clearing_the_queue_works_correctly() {
let (glean, dir) = new_glean(None);

// Create a new upload manager so that we have access to its functions directly,
// make it synchronous so we don't have to manually wait for the scanning to finish.
let upload_manager = PingUploadManager::no_policy(dir.path(), /* sync_scan */ true);
let upload_manager = PingUploadManager::no_policy(dir.path());

// Enqueue a ping multiple times
for _ in 0..10 {
Expand Down Expand Up @@ -875,7 +860,7 @@ mod test {
}

// Create a new upload manager pointing to the same data_path as the glean instance.
let upload_manager = PingUploadManager::no_policy(dir.path(), /* sync_scan */ true);
let upload_manager = PingUploadManager::no_policy(dir.path());

// Verify the requests were properly enqueued
for _ in 0..n {
Expand Down Expand Up @@ -1018,12 +1003,7 @@ mod test {
fn new_pings_are_added_while_upload_in_progress() {
let (glean, dir) = new_glean(None);

// Create a new upload manager so that we have access to its functions directly,
// make it synchronous so we don't have to manually wait for the scanning to finish.
let upload_manager = PingUploadManager::no_policy(dir.path(), /* sync_scan */ true);
while upload_manager.get_upload_task(&glean, false) == PingUploadTask::Wait {
thread::sleep(Duration::from_millis(10));
}
let upload_manager = PingUploadManager::no_policy(dir.path());

let doc1 = Uuid::new_v4().to_string();
let path1 = format!("/submit/app_id/test-ping/1/{}", doc1);
Expand Down Expand Up @@ -1064,27 +1044,6 @@ mod test {
);
}

#[test]
fn async_ping_directories_scanning_works() {
let (glean, dir) = new_glean(None);

// Create a new upload_manager, with a asynchronous ping dir scan,
// all other tests do this synchronously.
let upload_manager = PingUploadManager::no_policy(dir.path(), /* sync_scan */ false);

// Wait for processing of pending pings directory to finish.
while upload_manager.get_upload_task(&glean, false) == PingUploadTask::Wait {
thread::sleep(Duration::from_millis(10));
}

// Since the scan was synchronous and the directory was empty,
// we expect the upload task to always be `Done`.
assert_eq!(
PingUploadTask::Done,
upload_manager.get_upload_task(&glean, false)
)
}

#[test]
fn adds_debug_view_header_to_requests_when_tag_is_set() {
let (mut glean, _) = new_glean(None);
Expand Down Expand Up @@ -1113,7 +1072,7 @@ mod test {

// Create a new upload manager so that we have access to its functions directly,
// make it synchronous so we don't have to manually wait for the scanning to finish.
let upload_manager = PingUploadManager::no_policy(dir.path(), /* sync_scan */ true);
let upload_manager = PingUploadManager::no_policy(dir.path());

let doc_id = Uuid::new_v4().to_string();
let path = format!("/submit/app_id/test-ping/1/{}", doc_id);
Expand Down Expand Up @@ -1149,10 +1108,7 @@ mod test {
glean.submit_ping(&ping_type, None).unwrap();
}

// Create a new upload manager so that we have access to its functions directly,
// make it synchronous so we don't have to manually wait for the scanning to finish.
let mut upload_manager =
PingUploadManager::no_policy(dir.path(), /* sync_scan */ true);
let mut upload_manager = PingUploadManager::no_policy(dir.path());

// Set a policy for max recoverable failures, this is usually disabled for tests.
let max_recoverable_failures = 3;
Expand Down Expand Up @@ -1210,8 +1166,7 @@ mod test {
let (newest_ping_id, _, _, _) = &newest_ping;

// Create a new upload manager pointing to the same data_path as the glean instance.
let mut upload_manager =
PingUploadManager::no_policy(dir.path(), /* sync_scan */ true);
let mut upload_manager = PingUploadManager::no_policy(dir.path());

// Set the quota to just a little over the size on an empty ping file.
// This way we can check that one ping is kept and all others are deleted.
Expand Down Expand Up @@ -1253,10 +1208,7 @@ mod test {
fn maximum_wait_attemps_is_enforced() {
let (glean, dir) = new_glean(None);

// Create a new upload manager so that we have access to its functions directly,
// make it synchronous so we don't have to manually wait for the scanning to finish.
let mut upload_manager =
PingUploadManager::no_policy(dir.path(), /* sync_scan */ true);
let mut upload_manager = PingUploadManager::no_policy(dir.path());

// Define a max_wait_attemps policy, this is disabled for tests by default.
let max_wait_attempts = 3;
Expand Down