[TRUNK-13980] bindings for multithreaded codeowners parsing and association #280

1 change: 1 addition & 0 deletions Cargo.lock


8 changes: 8 additions & 0 deletions codeowners/Cargo.toml
@@ -25,6 +25,7 @@ serde_json = "1.0.133"
tsify-next = { version = "0.5.4", optional = true }
wasm-bindgen = { version = "0.2.95", optional = true }
pyo3-stub-gen = { version = "0.6.0", optional = true }
tokio = { version = "*", default-features = false, features = ["rt", "macros"] }

[target.'cfg(target_os = "linux")'.dependencies]
pyo3 = { version = "0.22.5", optional = true, features = [
@@ -35,6 +36,13 @@ pyo3 = { version = "0.22.5", optional = true, features = [
[target.'cfg(target_os = "macos")'.dependencies]
pyo3 = { version = "0.22.5", optional = true, features = ["abi3-py39"] }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio = { version = "*", default-features = false, features = [
"rt-multi-thread",
"macros",
"test-util",
] }

[features]
bindings = []
wasm = ["bindings", "dep:wasm-bindgen", "dep:js-sys", "dep:tsify-next"]
123 changes: 121 additions & 2 deletions codeowners/src/codeowners.rs
@@ -1,22 +1,29 @@
use std::{
fs::File,
path::{Path, PathBuf},
sync::Arc,
};

use anyhow::Result;
use constants::CODEOWNERS_LOCATIONS;
#[cfg(feature = "pyo3")]
use pyo3::prelude::*;
#[cfg(feature = "pyo3")]
use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods};
use serde::{Deserialize, Serialize};
use tokio::task;
#[cfg(feature = "wasm")]
use tsify_next::Tsify;
#[cfg(feature = "wasm")]
use wasm_bindgen::prelude::*;

#[cfg(feature = "pyo3")]
use crate::{github::BindingsGitHubOwners, gitlab::BindingsGitLabOwners};
use crate::{github::GitHubOwners, gitlab::GitLabOwners, traits::FromReader};
use crate::{
github::GitHubOwners,
gitlab::GitLabOwners,
traits::{FromReader, OwnersOfPath},
};

// TODO(TRUNK-13628): Implement serializing and deserializing for CodeOwners
#[derive(Default, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
@@ -85,6 +92,24 @@ impl CodeOwners {
owners: owners_result.ok(),
}
}

pub async fn parse_many_multithreaded(to_parse: Vec<Vec<u8>>) -> Result<Vec<Self>> {
let tasks = to_parse
.into_iter()
.enumerate()
.map(|(i, codeowners_bytes)| {
task::spawn(async move { (i, Self::parse(codeowners_bytes)) })
})
.collect::<Vec<_>>();

let mut results = vec![None; tasks.len()];
for task in tasks {
let (i, result) = task.await?;
results[i] = Some(result);
}

Ok(results.into_iter().flatten().collect())
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
@@ -95,7 +120,7 @@ pub enum Owners {

// TODO(TRUNK-13784): Make this smarter and return only an object with a .of method
// instead of forcing the ETL to try GitHub or GitLab
#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "pyo3", gen_stub_pyclass, pyclass)]
pub struct BindingsOwners(pub Owners);

@@ -121,6 +146,43 @@ impl BindingsOwners {
}
}

fn associate_codeowners<T: AsRef<Path>>(owners: &Owners, file: T) -> Vec<String> {
match owners {
Owners::GitHubOwners(gho) => gho
.of(file)
.unwrap_or_default()
.iter()
.map(ToString::to_string)
.collect(),
Owners::GitLabOwners(glo) => glo
.of(file)
.unwrap_or_default()
.iter()
.map(ToString::to_string)
.collect(),
}
}

pub async fn associate_codeowners_multithreaded<T: AsRef<Path> + Send + Sync + 'static>(
to_associate: Vec<(Arc<Owners>, T)>,
) -> Result<Vec<Vec<String>>> {
let tasks = to_associate
.into_iter()
.enumerate()
.map(|(i, (owners, file))| {
task::spawn(async move { (i, associate_codeowners(owners.as_ref(), file)) })
})
.collect::<Vec<_>>();

let mut results = vec![None; tasks.len()];
for task in tasks {
let (i, result) = task.await?;
results[i] = Some(result);
}

Ok(results.into_iter().flatten().collect())
}

const CODEOWNERS: &str = "CODEOWNERS";

fn locate_codeowners<T, U>(repo_root: T, location: U) -> Option<PathBuf>
@@ -135,3 +197,60 @@ where
None
}
}

#[cfg(test)]
mod tests {
use super::*;

fn make_codeowners_bytes(i: usize) -> Vec<u8> {
format!("{i}.txt @user{i}").into_bytes()
}

#[tokio::test]
pub async fn test_multithreaded_parsing_and_association() {
let num_codeowners_files = 100;
let num_files_to_associate_owners = 1000;

let codeowners_files: Vec<Vec<u8>> = (0..num_codeowners_files)
.map(make_codeowners_bytes)
.collect();

let codeowners_matchers = CodeOwners::parse_many_multithreaded(codeowners_files)
.await
.unwrap();

let to_associate: Vec<(Arc<Owners>, String)> = (0..num_files_to_associate_owners)
.map(|i| {
let mut file = "unassociated".to_string();
if i % 2 == 0 {
let file_prefix = i % num_codeowners_files;
file = format!("{file_prefix}.txt");
}
(
Arc::new(
codeowners_matchers[i % num_codeowners_files]
.owners
.clone()
.unwrap(),
),
file,
)
})
.collect();

let owners = crate::associate_codeowners_multithreaded(to_associate)
.await
.unwrap();

assert_eq!(owners.len(), num_files_to_associate_owners);
for (i, owners) in owners.iter().enumerate() {
if i % 2 == 0 {
assert_eq!(owners.len(), 1);
let user_id = i % num_codeowners_files;
assert_eq!(owners[0], format!("@user{user_id}"));
} else {
assert_eq!(owners.len(), 0);
}
}
}
}
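
For reference, a minimal sketch (not part of this diff) of how a synchronous caller in another crate might drive the two new async entry points, mirroring what the Python bindings below do with an explicitly built tokio runtime. It assumes the caller depends on tokio (with the rt-multi-thread feature) and anyhow; the CODEOWNERS contents and the file path are illustrative only.

// Sketch only: drive the async codeowners APIs from synchronous code.
use std::sync::Arc;

use codeowners::{associate_codeowners_multithreaded, CodeOwners, Owners};

fn parse_and_associate() -> anyhow::Result<Vec<Vec<String>>> {
    // Build a multithreaded runtime by hand, as the Python bindings below do.
    let rt = tokio::runtime::Builder::new_multi_thread().build()?;

    // Parse two CODEOWNERS files concurrently; results preserve input order.
    let parsed = rt.block_on(CodeOwners::parse_many_multithreaded(vec![
        b"*.rs @rust-reviewers".to_vec(),
        b"docs/ @docs-reviewers".to_vec(),
    ]))?;

    // Pair each successfully parsed matcher with a file path to resolve.
    let to_associate: Vec<(Arc<Owners>, String)> = parsed
        .into_iter()
        .filter_map(|codeowners| codeowners.owners.map(Arc::new))
        .map(|owners| (owners, "src/main.rs".to_string()))
        .collect();

    // Owner lists come back in the same order as `to_associate`.
    rt.block_on(associate_codeowners_multithreaded(to_associate))
}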
2 changes: 1 addition & 1 deletion codeowners/src/lib.rs
@@ -3,7 +3,7 @@ mod github;
mod gitlab;
mod traits;

pub use codeowners::{BindingsOwners, CodeOwners, Owners};
pub use codeowners::{associate_codeowners_multithreaded, BindingsOwners, CodeOwners, Owners};
pub use github::{BindingsGitHubOwners, GitHubOwner, GitHubOwners};
pub use gitlab::{BindingsGitLabOwners, GitLabOwner, GitLabOwners};
pub use traits::{FromPath, FromReader, OwnersOfPath};
139 changes: 137 additions & 2 deletions context-py/src/lib.rs
@@ -1,10 +1,12 @@
use std::{collections::HashMap, io::BufReader};
use std::{collections::HashMap, io::BufReader, sync::Arc};

use bundle::{
parse_meta as parse_meta_impl, parse_meta_from_tarball as parse_meta_from_tarball_impl,
BindingsVersionedBundle,
};
use codeowners::{BindingsOwners, CodeOwners};
use codeowners::{
associate_codeowners_multithreaded as associate_codeowners, BindingsOwners, CodeOwners, Owners,
};
use context::{env, junit, meta, repo};
use prost::Message;
use pyo3::{exceptions::PyTypeError, prelude::*};
@@ -185,6 +187,135 @@ fn codeowners_parse(codeowners_bytes: Vec<u8>) -> PyResult<BindingsOwners> {
}
}

#[gen_stub_pyfunction]
#[pyfunction]
fn parse_many_codeowners_n_threads(
to_parse: Vec<Option<Vec<u8>>>,
num_threads: usize,
) -> PyResult<Vec<Option<BindingsOwners>>> {
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(num_threads)
.enable_all()
.build()?;
parse_many_codeowners_multithreaded_impl(rt, to_parse)
}

#[gen_stub_pyfunction]
#[pyfunction]
fn parse_many_codeowners_multithreaded(
to_parse: Vec<Option<Vec<u8>>>,
) -> PyResult<Vec<Option<BindingsOwners>>> {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;
parse_many_codeowners_multithreaded_impl(rt, to_parse)
}

Comment on lines +190 to +212

Member:
Can we combine these by making num_threads an Option<usize>? Same with the other similar function below.

Contributor Author:
This is what I initially tried, but was unsuccessful. All pyo3 function arguments are required by default, but they can be made optional via the #[pyo3(signature = (...))] macro (https://pyo3.rs/main/function/signature). That part worked; however, on the Python consumer side the function arguments came through as Unknown and untyped. I couldn't find a way to declare argument types with the #[pyo3(signature = (...))] macro and have them carry over to the generated bindings, so this is what I settled for.
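
For completeness, a hypothetical sketch of the single-function variant discussed in this thread, making num_threads an Option<usize> via #[pyo3(signature = ...)]; as the author notes, the generated Python stubs did not carry the argument types over, which is why the PR keeps separate functions. The name parse_many_codeowners is illustrative.

// Hypothetical combined variant (not in this PR): optional worker-thread count.
#[gen_stub_pyfunction]
#[pyfunction]
#[pyo3(signature = (to_parse, num_threads = None))]
fn parse_many_codeowners(
    to_parse: Vec<Option<Vec<u8>>>,
    num_threads: Option<usize>,
) -> PyResult<Vec<Option<BindingsOwners>>> {
    let mut builder = tokio::runtime::Builder::new_multi_thread();
    if let Some(num_threads) = num_threads {
        // Only pin the worker count when the caller provides one.
        builder.worker_threads(num_threads);
    }
    let rt = builder.enable_all().build()?;
    parse_many_codeowners_multithreaded_impl(rt, to_parse)
}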

fn parse_many_codeowners_multithreaded_impl(
rt: tokio::runtime::Runtime,
to_parse: Vec<Option<Vec<u8>>>,
) -> PyResult<Vec<Option<BindingsOwners>>> {
let to_parse_len = to_parse.len();
let parsed_indexes = to_parse
.iter()
.enumerate()
.filter_map(|(i, bytes)| -> Option<usize> { bytes.as_ref().map(|_bytes| i) })
.collect::<Vec<_>>();
let parsed_codeowners = rt
.block_on(CodeOwners::parse_many_multithreaded(
to_parse.into_iter().flatten().collect(),
))
.map_err(|err| PyTypeError::new_err(err.to_string()))?;

let mut results: Vec<Option<BindingsOwners>> = vec![None; to_parse_len];
for (i, codeowners) in parsed_codeowners.into_iter().enumerate() {
results[parsed_indexes[i]] = codeowners.owners.map(BindingsOwners);
}
Ok(results)
}

#[gen_stub_pyfunction]
#[pyfunction]
fn associate_codeowners_n_threads(
codeowners_matchers: HashMap<String, Option<BindingsOwners>>,
to_associate: Vec<(String, Option<String>)>,
num_threads: usize,
) -> PyResult<Vec<Vec<String>>> {
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(num_threads)
.enable_all()
.build()?;
associate_codeowners_multithreaded_impl(rt, codeowners_matchers, to_associate)
}

#[gen_stub_pyfunction]
#[pyfunction]
fn associate_codeowners_multithreaded(
codeowners_matchers: HashMap<String, Option<BindingsOwners>>,
to_associate: Vec<(String, Option<String>)>,
) -> PyResult<Vec<Vec<String>>> {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;
associate_codeowners_multithreaded_impl(rt, codeowners_matchers, to_associate)
}

fn associate_codeowners_multithreaded_impl(
rt: tokio::runtime::Runtime,
codeowners_matchers: HashMap<String, Option<BindingsOwners>>,
to_associate: Vec<(String, Option<String>)>,
) -> PyResult<Vec<Vec<String>>> {
let to_associate_len = to_associate.len();
let associated_indexes = to_associate
.iter()
.enumerate()
.filter_map(|(i, (bundle_upload_id, file))| {
file.as_ref().map(|_file| (i, bundle_upload_id))
})
.filter_map(|(i, bundle_upload_id)| {
codeowners_matchers
.get(bundle_upload_id)
.map(|codeowners_matcher| (i, codeowners_matcher))
})
.filter_map(|(i, codeowners_matcher)| {
codeowners_matcher.as_ref().map(|_codeowners_matcher| i)
})
.collect::<Vec<_>>();
let codeowners_matchers: HashMap<String, Option<Arc<Owners>>> = codeowners_matchers
.into_iter()
.map(|(key, value)| {
(
key,
value.map(|bindings_owners| Arc::new(bindings_owners.0)),
)
})
.collect();
let associated_codeowners = rt
.block_on(associate_codeowners(
to_associate
.into_iter()
.filter_map(|(bundle_upload_id, file)| file.map(|file| (bundle_upload_id, file)))
.filter_map(|(bundle_upload_id, file)| {
codeowners_matchers
.get(&bundle_upload_id)
.map(|codeowners_matcher| (codeowners_matcher, file))
})
.filter_map(|(codeowners_matcher, file)| {
codeowners_matcher
.as_ref()
.map(|codeowners_matcher| (Arc::clone(codeowners_matcher), file))
})
.collect(),
))
.map_err(|err| PyTypeError::new_err(err.to_string()))?;

let mut results: Vec<Vec<String>> = vec![Vec::new(); to_associate_len];
for (i, owners) in associated_codeowners.into_iter().enumerate() {
results[associated_indexes[i]] = owners;
}
Ok(results)
}

#[pymodule]
fn context_py(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<env::parser::CIInfo>()?;
@@ -229,6 +360,10 @@ fn context_py(m: &Bound<'_, PyModule>) -> PyResult<()> {

m.add_class::<codeowners::BindingsOwners>()?;
m.add_function(wrap_pyfunction!(codeowners_parse, m)?)?;
m.add_function(wrap_pyfunction!(associate_codeowners_multithreaded, m)?)?;
m.add_function(wrap_pyfunction!(associate_codeowners_n_threads, m)?)?;
m.add_function(wrap_pyfunction!(parse_many_codeowners_multithreaded, m)?)?;
m.add_function(wrap_pyfunction!(parse_many_codeowners_n_threads, m)?)?;

Ok(())
}