Skip to content

Commit

Permalink
polish
Browse files Browse the repository at this point in the history
  • Loading branch information
max-trunk committed Jan 10, 2025
1 parent 2124056 commit ce062af
Show file tree
Hide file tree
Showing 4 changed files with 231 additions and 117 deletions.
159 changes: 158 additions & 1 deletion codeowners/src/codeowners.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::{
collections::HashMap,
fs::File,
path::{Path, PathBuf},
sync::{Arc, Mutex, RwLock},
thread,
};

use constants::CODEOWNERS_LOCATIONS;
Expand All @@ -17,7 +20,13 @@ use wasm_bindgen::prelude::*;

#[cfg(feature = "pyo3")]
use crate::{github::BindingsGitHubOwners, gitlab::BindingsGitLabOwners};
use crate::{github::GitHubOwners, gitlab::GitLabOwners, traits::FromReader};
use crate::{
github::GitHubOwners,
gitlab::GitLabOwners,
traits::{FromReader, OwnersOfPath},
};

/// A bundle upload ID paired with the raw bytes of its CODEOWNERS file, or `None` when no
/// bytes are available for that upload.
pub type BundleUploadIDAndCodeOwnersBytes = (String, Option<Vec<u8>>);

// TODO(TRUNK-13628): Implement serializing and deserializing for CodeOwners
#[derive(Default, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -86,6 +95,36 @@ impl CodeOwners {
owners: owners_result.ok(),
}
}

/// Parses many CODEOWNERS payloads concurrently, keyed by bundle upload ID.
///
/// `to_parse` is split into at most `num_threads` contiguous chunks and each chunk is
/// parsed on its own thread via [`Self::parse`]. An entry whose bytes are `None` maps to
/// `None` in the result. If the same ID appears more than once, which entry ends up in the
/// map is unspecified because chunks finish in arbitrary order.
///
/// An empty `to_parse` returns an empty map without spawning any thread, and a
/// `num_threads` of zero is treated as one.
///
/// # Panics
///
/// Panics if a worker thread panics.
pub fn parse_many_multithreaded(
    to_parse: Vec<BundleUploadIDAndCodeOwnersBytes>,
    num_threads: usize,
) -> HashMap<String, Option<Self>> {
    if to_parse.is_empty() {
        // Nothing to parse; also avoids computing a chunk size of zero below.
        return HashMap::new();
    }

    let total = to_parse.len();
    // Zero threads would divide by zero; more threads than entries would sit idle.
    let num_threads = num_threads.clamp(1, total);
    let chunk_size = (total + num_threads - 1) / num_threads;
    let mut handles = Vec::with_capacity(num_threads);
    let results_map: Arc<Mutex<HashMap<String, Option<Self>>>> =
        Arc::new(Mutex::new(HashMap::with_capacity(total)));

    // Move entries into their chunk instead of cloning the byte buffers.
    let mut remaining = to_parse.into_iter();
    loop {
        let chunk: Vec<BundleUploadIDAndCodeOwnersBytes> =
            remaining.by_ref().take(chunk_size).collect();
        if chunk.is_empty() {
            break;
        }

        let results_map = Arc::clone(&results_map);
        handles.push(thread::spawn(move || {
            // Parse the whole chunk first so the shared lock is taken once per chunk and
            // never held while parsing.
            let parsed: Vec<(String, Option<Self>)> = chunk
                .into_iter()
                .map(|(bundle_upload_id, codeowners_bytes)| {
                    (bundle_upload_id, codeowners_bytes.map(Self::parse))
                })
                .collect();
            results_map
                .lock()
                .expect("results mutex poisoned")
                .extend(parsed);
        }));
    }

    for handle in handles {
        handle.join().expect("CODEOWNERS parsing thread panicked");
    }

    // Every worker has been joined, so this is the only remaining reference.
    Arc::try_unwrap(results_map)
        .expect("all worker threads have been joined")
        .into_inner()
        .expect("results mutex poisoned")
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -123,6 +162,67 @@ impl BindingsOwners {
}
}

/// Resolves the owners of many files concurrently.
///
/// Each element of `to_associate` is a `(bundle upload ID, optional file path)` pair. The
/// ID selects a matcher from `codeowners_matchers`, and the file is looked up in that
/// matcher. The returned vector has one entry per input element, in input order. An entry
/// is empty when the ID is unknown, the matcher is `None`, the file is `None`, or the
/// matcher reports no owners for the file.
///
/// The input is split into at most `num_threads` contiguous ranges, one thread per
/// non-empty range. A `num_threads` of zero is treated as one, and an empty input returns
/// an empty vector without spawning any thread.
///
/// # Panics
///
/// Panics if a worker thread panics.
pub fn associate_codeowners_multithreaded(
    codeowners_matchers: HashMap<String, Option<Owners>>,
    to_associate: Vec<(String, Option<String>)>,
    num_threads: usize,
) -> Vec<Vec<String>> {
    let input_len = to_associate.len();
    // Zero threads would divide by zero; more threads than inputs would only produce
    // empty ranges.
    let num_threads = num_threads.clamp(1, input_len.max(1));
    let chunk_size = (input_len + num_threads - 1) / num_threads;
    let mut handles = Vec::with_capacity(num_threads);
    let codeowners_matchers = Arc::new(RwLock::new(codeowners_matchers));
    // Shared by reference count so no thread needs its own copy of the input.
    let to_associate = Arc::new(to_associate);

    for i in 0..num_threads {
        let start = (i * chunk_size).min(input_len);
        let end = ((i + 1) * chunk_size).min(input_len);
        if start >= end {
            // Ranges are contiguous, so every later range is empty as well.
            break;
        }

        let to_associate = Arc::clone(&to_associate);
        let codeowners_matchers = Arc::clone(&codeowners_matchers);
        let handle = thread::spawn(move || -> Vec<Vec<String>> {
            let codeowners_matchers = codeowners_matchers
                .read()
                .expect("codeowners matchers lock poisoned");
            to_associate[start..end]
                .iter()
                .map(|(bundle_upload_id, file)| -> Vec<String> {
                    match (codeowners_matchers.get(bundle_upload_id), file) {
                        (Some(Some(owners)), Some(file)) => match owners {
                            Owners::GitHubOwners(gho) => gho
                                .of(file)
                                .unwrap_or_default()
                                .iter()
                                .map(ToString::to_string)
                                .collect(),
                            Owners::GitLabOwners(glo) => glo
                                .of(file)
                                .unwrap_or_default()
                                .iter()
                                .map(ToString::to_string)
                                .collect(),
                        },
                        _ => Vec::new(),
                    }
                })
                .collect()
        });

        handles.push(handle);
    }

    // Joining in spawn order concatenates the per-range results back into input order.
    let mut all_associated_owners = Vec::with_capacity(input_len);
    for handle in handles {
        all_associated_owners.extend(
            handle
                .join()
                .expect("codeowners association thread panicked"),
        );
    }
    all_associated_owners
}

// The literal file name `CODEOWNERS`.
// NOTE(review): presumably combined with candidate directories when locating the file;
// its use is not visible in this excerpt — confirm.
const CODEOWNERS: &str = "CODEOWNERS";

fn locate_codeowners<T, U>(repo_root: T, location: U) -> Option<PathBuf>
Expand All @@ -137,3 +237,60 @@ where
None
}
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a single-rule CODEOWNERS payload that assigns `{i}.txt` to `@user{i}`.
    fn make_codeowners_bytes(i: usize) -> Vec<u8> {
        let rule = format!("{i}.txt @user{i}");
        rule.into_bytes()
    }

    /// Parses 100 CODEOWNERS payloads and resolves owners for 1000 files across 4 threads.
    ///
    /// Even-indexed inputs name the file covered by their bundle's rule and must resolve to
    /// exactly that bundle's user; odd-indexed inputs use `foo` and must resolve to nobody.
    #[test]
    pub fn test_multithreaded_parsing_and_association() {
        let num_codeowners_files = 100;
        let num_files_to_associate_owners = 1000;
        let num_threads = 4;

        let mut codeowners_files: Vec<BundleUploadIDAndCodeOwnersBytes> =
            Vec::with_capacity(num_codeowners_files);
        for i in 0..num_codeowners_files {
            codeowners_files.push((i.to_string(), Some(make_codeowners_bytes(i))));
        }

        let mut to_associate: Vec<(String, Option<String>)> =
            Vec::with_capacity(num_files_to_associate_owners);
        for i in 0..num_files_to_associate_owners {
            let bundle_index = i % num_codeowners_files;
            let file = if i % 2 == 0 {
                format!("{bundle_index}.txt")
            } else {
                "foo".to_string()
            };
            to_associate.push((bundle_index.to_string(), Some(file)));
        }

        let parsed = CodeOwners::parse_many_multithreaded(codeowners_files, num_threads);
        let codeowners_matchers = parsed
            .into_iter()
            .map(|(bundle_upload_id, codeowners)| {
                let matcher = codeowners.and_then(|codeowners| codeowners.owners);
                (bundle_upload_id, matcher)
            })
            .collect();
        let owners = crate::associate_codeowners_multithreaded(
            codeowners_matchers,
            to_associate,
            num_threads,
        );

        assert_eq!(owners.len(), num_files_to_associate_owners);
        for (i, file_owners) in owners.iter().enumerate() {
            if i % 2 != 0 {
                assert_eq!(file_owners.len(), 0);
                continue;
            }
            assert_eq!(file_owners.len(), 1);
            let user_id = i % num_codeowners_files;
            assert_eq!(file_owners[0], format!("@user{user_id}"));
        }
    }
}
5 changes: 4 additions & 1 deletion codeowners/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ mod github;
mod gitlab;
mod traits;

pub use codeowners::{BindingsOwners, CodeOwners, Owners};
pub use codeowners::{
associate_codeowners_multithreaded, BindingsOwners, BundleUploadIDAndCodeOwnersBytes,
CodeOwners, Owners,
};
pub use github::{BindingsGitHubOwners, GitHubOwner, GitHubOwners};
pub use gitlab::{BindingsGitLabOwners, GitLabOwner, GitLabOwners};
pub use traits::{FromPath, FromReader, OwnersOfPath};
76 changes: 34 additions & 42 deletions context-py/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::sync::{Arc, RwLock};
use std::{collections::HashMap, io::BufReader, thread};
use std::{collections::HashMap, io::BufReader};

use bundle::{
parse_meta as parse_meta_impl, parse_meta_from_tarball as parse_meta_from_tarball_impl,
BindingsVersionedBundle,
};
use codeowners::{BindingsOwners, CodeOwners};
use codeowners::{
associate_codeowners_multithreaded as associate_codeowners, BindingsOwners,
BundleUploadIDAndCodeOwnersBytes, CodeOwners,
};
use context::{env, junit, meta, repo};
use prost::Message;
use pyo3::{exceptions::PyTypeError, prelude::*};
Expand Down Expand Up @@ -186,51 +188,40 @@ fn codeowners_parse(codeowners_bytes: Vec<u8>) -> PyResult<BindingsOwners> {
}
}

// Python-facing entry point: parses every (bundle upload ID, CODEOWNERS bytes) pair with
// `CodeOwners::parse_many_multithreaded` and wraps each parsed `owners` value in
// `BindingsOwners`. An ID maps to `None` when parsing produced no `CodeOwners` for it or
// when the parsed `owners` field is `None`.
#[gen_stub_pyfunction]
#[pyfunction]
fn parse_many_codeowners_multithreaded(
    to_parse: Vec<BundleUploadIDAndCodeOwnersBytes>,
    num_threads: usize,
) -> HashMap<String, Option<BindingsOwners>> {
    let parsed = CodeOwners::parse_many_multithreaded(to_parse, num_threads);
    let mut bindings = HashMap::with_capacity(parsed.len());
    for (bundle_upload_id, parsed_codeowners) in parsed {
        let owners = parsed_codeowners
            .and_then(|parsed_codeowners| parsed_codeowners.owners)
            .map(BindingsOwners);
        bindings.insert(bundle_upload_id, owners);
    }
    bindings
}

// Python-facing wrapper around owner association.
// NOTE(review): this listing appears to interleave the pre-change and post-change versions
// of the function (the second parameter is declared twice, and a thread-spawning body is
// followed by a delegating call) — confirm against the real file before editing.
#[gen_stub_pyfunction]
#[pyfunction]
fn associate_codeowners_multithreaded(
codeowners_matchers: HashMap<String, Option<BindingsOwners>>,
to_match: Vec<(String, Option<String>)>,
to_associate: Vec<(String, Option<String>)>,
num_threads: usize,
) -> Vec<Vec<String>> {
// Presumably the earlier in-place implementation: one thread per chunk of `to_match`,
// each reading the shared matcher map and returning its chunk's owners.
let chunk_size = (to_match.len() + num_threads - 1) / num_threads;
let mut handles = Vec::with_capacity(num_threads);
let shared_map = Arc::new(RwLock::new(codeowners_matchers));

for chunk in to_match.chunks(chunk_size) {
let chunk = chunk.to_vec();
let shared_map = Arc::clone(&shared_map);
let handle = thread::spawn(move || {
let map = shared_map.read().unwrap();
chunk
.into_iter()
.map(|bundle_upload_id_and_file| -> Vec<String> {
let matcher = map.get(&bundle_upload_id_and_file.0);
match (matcher, &bundle_upload_id_and_file.1) {
(Some(Some(bo)), Some(file)) => {
if let Some(gho) = bo.get_github_owners() {
gho.of(file.to_string()).unwrap_or_default()
} else if let Some(glo) = bo.get_gitlab_owners() {
glo.of(file.to_string()).unwrap_or_default()
} else {
Vec::new()
}
}
_ => Vec::new(),
}
})
.collect::<Vec<Vec<String>>>()
});
handles.push(handle);
}

// Chunk results are appended in spawn order.
let mut result = Vec::new();
for handle in handles {
let chunk_result = handle.join().unwrap();
result.extend(chunk_result);
}

result
// Presumably the replacement body: unwrap each `BindingsOwners` to its inner value (`.0`)
// and delegate to `associate_codeowners`, the imported alias of
// `codeowners::associate_codeowners_multithreaded`.
associate_codeowners(
codeowners_matchers
.into_iter()
.map(|(bundle_upload_id, codeowners)| {
(bundle_upload_id, codeowners.map(|codeowners| codeowners.0))
})
.collect(),
to_associate,
num_threads,
)
}

#[pymodule]
Expand Down Expand Up @@ -278,6 +269,7 @@ fn context_py(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<codeowners::BindingsOwners>()?;
m.add_function(wrap_pyfunction!(codeowners_parse, m)?)?;
m.add_function(wrap_pyfunction!(associate_codeowners_multithreaded, m)?)?;
m.add_function(wrap_pyfunction!(parse_many_codeowners_multithreaded, m)?)?;

Ok(())
}
Expand Down
Loading

0 comments on commit ce062af

Please sign in to comment.