diff --git a/Cargo.lock b/Cargo.lock index bb237d03..ebefba50 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -843,6 +843,7 @@ dependencies = [ "serde", "serde_json", "thiserror 1.0.69", + "tokio", "tsify-next", "wasm-bindgen", ] diff --git a/codeowners/Cargo.toml b/codeowners/Cargo.toml index ecfdce74..0da88941 100644 --- a/codeowners/Cargo.toml +++ b/codeowners/Cargo.toml @@ -25,6 +25,7 @@ serde_json = "1.0.133" tsify-next = { version = "0.5.4", optional = true } wasm-bindgen = { version = "0.2.95", optional = true } pyo3-stub-gen = { version = "0.6.0", optional = true } +tokio = { version = "*", default-features = false, features = ["rt", "macros"] } [target.'cfg(target_os = "linux")'.dependencies] pyo3 = { version = "0.22.5", optional = true, features = [ @@ -35,6 +36,13 @@ pyo3 = { version = "0.22.5", optional = true, features = [ [target.'cfg(target_os = "macos")'.dependencies] pyo3 = { version = "0.22.5", optional = true, features = ["abi3-py39"] } +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +tokio = { version = "*", default-features = false, features = [ + "rt-multi-thread", + "macros", + "test-util", +] } + [features] bindings = [] wasm = ["bindings", "dep:wasm-bindgen", "dep:js-sys", "dep:tsify-next"] diff --git a/codeowners/src/codeowners.rs b/codeowners/src/codeowners.rs index 669f8912..bcb85253 100644 --- a/codeowners/src/codeowners.rs +++ b/codeowners/src/codeowners.rs @@ -1,14 +1,17 @@ use std::{ fs::File, path::{Path, PathBuf}, + sync::Arc, }; +use anyhow::Result; use constants::CODEOWNERS_LOCATIONS; #[cfg(feature = "pyo3")] use pyo3::prelude::*; #[cfg(feature = "pyo3")] use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods}; use serde::{Deserialize, Serialize}; +use tokio::task; #[cfg(feature = "wasm")] use tsify_next::Tsify; #[cfg(feature = "wasm")] @@ -16,7 +19,11 @@ use wasm_bindgen::prelude::*; #[cfg(feature = "pyo3")] use crate::{github::BindingsGitHubOwners, gitlab::BindingsGitLabOwners}; -use crate::{github::GitHubOwners, gitlab::GitLabOwners, traits::FromReader}; +use crate::{ + github::GitHubOwners, + gitlab::GitLabOwners, + traits::{FromReader, OwnersOfPath}, +}; // TODO(TRUNK-13628): Implement serializing and deserializing for CodeOwners #[derive(Default, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] @@ -85,6 +92,24 @@ impl CodeOwners { owners: owners_result.ok(), } } + + pub async fn parse_many_multithreaded(to_parse: Vec>) -> Result> { + let tasks = to_parse + .into_iter() + .enumerate() + .map(|(i, codeowners_bytes)| { + task::spawn(async move { (i, Self::parse(codeowners_bytes)) }) + }) + .collect::>(); + + let mut results = vec![None; tasks.len()]; + for task in tasks { + let (i, result) = task.await?; + results[i] = Some(result); + } + + Ok(results.into_iter().flatten().collect()) + } } #[derive(Debug, Clone, PartialEq, Eq)] @@ -95,7 +120,7 @@ pub enum Owners { // TODO(TRUNK-13784): Make this smarter and return only an object with a .of method // instead of forcing the ETL to try GitHub or GitLab -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq)] #[cfg_attr(feature = "pyo3", gen_stub_pyclass, pyclass)] pub struct BindingsOwners(pub Owners); @@ -121,6 +146,43 @@ impl BindingsOwners { } } +fn associate_codeowners>(owners: &Owners, file: T) -> Vec { + match owners { + Owners::GitHubOwners(gho) => gho + .of(file) + .unwrap_or_default() + .iter() + .map(ToString::to_string) + .collect(), + Owners::GitLabOwners(glo) => glo + .of(file) + .unwrap_or_default() + .iter() + .map(ToString::to_string) + .collect(), + } +} + +pub async fn associate_codeowners_multithreaded + Send + Sync + 'static>( + to_associate: Vec<(Arc, T)>, +) -> Result>> { + let tasks = to_associate + .into_iter() + .enumerate() + .map(|(i, (owners, file))| { + task::spawn(async move { (i, associate_codeowners(owners.as_ref(), file)) }) + }) + .collect::>(); + + let mut results = vec![None; tasks.len()]; + for task in tasks { + let (i, result) = task.await?; + results[i] = Some(result); + } + + Ok(results.into_iter().flatten().collect()) +} + const CODEOWNERS: &str = "CODEOWNERS"; fn locate_codeowners(repo_root: T, location: U) -> Option @@ -135,3 +197,60 @@ where None } } + +#[cfg(test)] +mod tests { + use super::*; + + fn make_codeowners_bytes(i: usize) -> Vec { + format!("{i}.txt @user{i}").into_bytes() + } + + #[tokio::test] + pub async fn test_multithreaded_parsing_and_association() { + let num_codeowners_files = 100; + let num_files_to_associate_owners = 1000; + + let codeowners_files: Vec> = (0..num_codeowners_files) + .map(make_codeowners_bytes) + .collect(); + + let codeowners_matchers = CodeOwners::parse_many_multithreaded(codeowners_files) + .await + .unwrap(); + + let to_associate: Vec<(Arc, String)> = (0..num_files_to_associate_owners) + .map(|i| { + let mut file = "unassociated".to_string(); + if i % 2 == 0 { + let file_prefix = i % num_codeowners_files; + file = format!("{file_prefix}.txt"); + } + ( + Arc::new( + codeowners_matchers[i % num_codeowners_files] + .owners + .clone() + .unwrap(), + ), + file, + ) + }) + .collect(); + + let owners = crate::associate_codeowners_multithreaded(to_associate) + .await + .unwrap(); + + assert_eq!(owners.len(), num_files_to_associate_owners); + for (i, owners) in owners.iter().enumerate() { + if i % 2 == 0 { + assert_eq!(owners.len(), 1); + let user_id = i % num_codeowners_files; + assert_eq!(owners[0], format!("@user{user_id}")); + } else { + assert_eq!(owners.len(), 0); + } + } + } +} diff --git a/codeowners/src/lib.rs b/codeowners/src/lib.rs index a313c783..8deaa975 100644 --- a/codeowners/src/lib.rs +++ b/codeowners/src/lib.rs @@ -3,7 +3,7 @@ mod github; mod gitlab; mod traits; -pub use codeowners::{BindingsOwners, CodeOwners, Owners}; +pub use codeowners::{associate_codeowners_multithreaded, BindingsOwners, CodeOwners, Owners}; pub use github::{BindingsGitHubOwners, GitHubOwner, GitHubOwners}; pub use gitlab::{BindingsGitLabOwners, GitLabOwner, GitLabOwners}; pub use traits::{FromPath, FromReader, OwnersOfPath}; diff --git a/context-py/src/lib.rs b/context-py/src/lib.rs index 0c310405..a037362f 100644 --- a/context-py/src/lib.rs +++ b/context-py/src/lib.rs @@ -1,10 +1,12 @@ -use std::{collections::HashMap, io::BufReader}; +use std::{collections::HashMap, io::BufReader, sync::Arc}; use bundle::{ parse_meta as parse_meta_impl, parse_meta_from_tarball as parse_meta_from_tarball_impl, BindingsVersionedBundle, }; -use codeowners::{BindingsOwners, CodeOwners}; +use codeowners::{ + associate_codeowners_multithreaded as associate_codeowners, BindingsOwners, CodeOwners, Owners, +}; use context::{env, junit, meta, repo}; use prost::Message; use pyo3::{exceptions::PyTypeError, prelude::*}; @@ -185,6 +187,135 @@ fn codeowners_parse(codeowners_bytes: Vec) -> PyResult { } } +#[gen_stub_pyfunction] +#[pyfunction] +fn parse_many_codeowners_n_threads( + to_parse: Vec>>, + num_threads: usize, +) -> PyResult>> { + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(num_threads) + .enable_all() + .build()?; + parse_many_codeowners_multithreaded_impl(rt, to_parse) +} + +#[gen_stub_pyfunction] +#[pyfunction] +fn parse_many_codeowners_multithreaded( + to_parse: Vec>>, +) -> PyResult>> { + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build()?; + parse_many_codeowners_multithreaded_impl(rt, to_parse) +} + +fn parse_many_codeowners_multithreaded_impl( + rt: tokio::runtime::Runtime, + to_parse: Vec>>, +) -> PyResult>> { + let to_parse_len = to_parse.len(); + let parsed_indexes = to_parse + .iter() + .enumerate() + .filter_map(|(i, bytes)| -> Option { bytes.as_ref().map(|_bytes| i) }) + .collect::>(); + let parsed_codeowners = rt + .block_on(CodeOwners::parse_many_multithreaded( + to_parse.into_iter().flatten().collect(), + )) + .map_err(|err| PyTypeError::new_err(err.to_string()))?; + + let mut results: Vec> = vec![None; to_parse_len]; + for (i, codeowners) in parsed_codeowners.into_iter().enumerate() { + results[parsed_indexes[i]] = codeowners.owners.map(BindingsOwners); + } + Ok(results) +} + +#[gen_stub_pyfunction] +#[pyfunction] +fn associate_codeowners_n_threads( + codeowners_matchers: HashMap>, + to_associate: Vec<(String, Option)>, + num_threads: usize, +) -> PyResult>> { + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(num_threads) + .enable_all() + .build()?; + associate_codeowners_multithreaded_impl(rt, codeowners_matchers, to_associate) +} + +#[gen_stub_pyfunction] +#[pyfunction] +fn associate_codeowners_multithreaded( + codeowners_matchers: HashMap>, + to_associate: Vec<(String, Option)>, +) -> PyResult>> { + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build()?; + associate_codeowners_multithreaded_impl(rt, codeowners_matchers, to_associate) +} + +fn associate_codeowners_multithreaded_impl( + rt: tokio::runtime::Runtime, + codeowners_matchers: HashMap>, + to_associate: Vec<(String, Option)>, +) -> PyResult>> { + let to_associate_len = to_associate.len(); + let associated_indexes = to_associate + .iter() + .enumerate() + .filter_map(|(i, (bundle_upload_id, file))| { + file.as_ref().map(|_file| (i, bundle_upload_id)) + }) + .filter_map(|(i, bundle_upload_id)| { + codeowners_matchers + .get(bundle_upload_id) + .map(|codeowners_matcher| (i, codeowners_matcher)) + }) + .filter_map(|(i, codeowners_matcher)| { + codeowners_matcher.as_ref().map(|_codeowners_matcher| i) + }) + .collect::>(); + let codeowners_matchers: HashMap>> = codeowners_matchers + .into_iter() + .map(|(key, value)| { + ( + key, + value.map(|bindings_owners| Arc::new(bindings_owners.0)), + ) + }) + .collect(); + let associated_codeowners = rt + .block_on(associate_codeowners( + to_associate + .into_iter() + .filter_map(|(bundle_upload_id, file)| file.map(|file| (bundle_upload_id, file))) + .filter_map(|(bundle_upload_id, file)| { + codeowners_matchers + .get(&bundle_upload_id) + .map(|codeowners_matcher| (codeowners_matcher, file)) + }) + .filter_map(|(codeowners_matcher, file)| { + codeowners_matcher + .as_ref() + .map(|codeowners_matcher| (Arc::clone(codeowners_matcher), file)) + }) + .collect(), + )) + .map_err(|err| PyTypeError::new_err(err.to_string()))?; + + let mut results: Vec> = vec![Vec::new(); to_associate_len]; + for (i, owners) in associated_codeowners.into_iter().enumerate() { + results[associated_indexes[i]] = owners; + } + Ok(results) +} + #[pymodule] fn context_py(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; @@ -229,6 +360,10 @@ fn context_py(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_function(wrap_pyfunction!(codeowners_parse, m)?)?; + m.add_function(wrap_pyfunction!(associate_codeowners_multithreaded, m)?)?; + m.add_function(wrap_pyfunction!(associate_codeowners_n_threads, m)?)?; + m.add_function(wrap_pyfunction!(parse_many_codeowners_multithreaded, m)?)?; + m.add_function(wrap_pyfunction!(parse_many_codeowners_n_threads, m)?)?; Ok(()) } diff --git a/context-py/tests/test_parse_codeowners.py b/context-py/tests/test_parse_codeowners.py index 125b99d4..56209a53 100644 --- a/context-py/tests/test_parse_codeowners.py +++ b/context-py/tests/test_parse_codeowners.py @@ -148,3 +148,46 @@ def test_parse_codeowners_from_bytes_gitlab_sections(): "@ruby-owner", "@dev-team", ] + + +def test_parse_and_associate_multithreaded(): + from context_py import ( + associate_codeowners_n_threads, + parse_many_codeowners_n_threads, + ) + + def make_codeowners_bytes(i: int) -> bytes: + return f"{i}.txt @user{i}".encode() + + num_codeowners_files = 100 + num_files_to_associate_owners = 1000 + num_threads = 4 + + codeowners_files = [ + make_codeowners_bytes(i) for i in range(0, num_codeowners_files) + ] + to_associate = [ + ( + f"{i % num_codeowners_files}", + f"{i % num_codeowners_files if i % 2 == 0 else 'foo'}.txt", + ) + for i in range(0, num_files_to_associate_owners) + ] + + parsed_codeowners = parse_many_codeowners_n_threads(codeowners_files, num_threads) + codeowners_matchers = { + f"{i}": codeowners_matcher + for i, codeowners_matcher in enumerate(parsed_codeowners) + } + owners = associate_codeowners_n_threads( + codeowners_matchers, to_associate, num_threads + ) + + assert len(owners) == num_files_to_associate_owners + + for i in range(0, num_files_to_associate_owners): + if i % 2 == 0: + assert len(owners[i]) == 1 + assert owners[i][0] == f"@user{i % num_codeowners_files}" + else: + assert len(owners[i]) == 0