[TRUNK-13980] bindings for multithreaded codeowners parsing and association #280

Open
wants to merge 9 commits into main

Conversation

@max-trunk (Contributor) commented Jan 10, 2025

TRUNK-13980

Adds bindings for new pyfunctions, intended for use in the periodic ETL, that 'batch' codeowners association and leverage Rust's multithreading in an attempt to improve the overall performance of the codeowners association step.

Summary of new pyfunctions:

  • parse_many_codeowners_multithreaded(): accepts a batch of Vec<(bundle_upload_id, codeowners_bytes)> and returns a HashMap<bundle_upload_id, CodeOwners object>, splitting work over a requested number of threads
  • associate_codeowners_multithreaded(): accepts a HashMap<bundle_upload_id, CodeOwners object> and a Vec<(bundle_upload_id, file path to associate owners for)> and returns a Vec<Vec<associated owner>>, splitting work over a requested number of threads and maintaining the order of elements in the resulting vector

See example usage in https://github.com/trunk-io/trunk/pull/20958
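For orientation, here is a minimal single-threaded sketch of the semantics described above (illustrative types and names, not this crate's actual API; the threading and the real CODEOWNERS matching are omitted):

use std::collections::HashMap;

// Stand-in for the parsed CODEOWNERS object exposed through the bindings.
struct ParsedCodeowners;

type BundleUploadId = String;

// What parse_many_codeowners_multithreaded computes, minus the threading:
// each bundle upload ID maps to its parsed CODEOWNERS (or None if no bytes).
fn parse_many(
    to_parse: Vec<(BundleUploadId, Option<Vec<u8>>)>,
) -> HashMap<BundleUploadId, Option<ParsedCodeowners>> {
    to_parse
        .into_iter()
        .map(|(id, bytes)| (id, bytes.map(|_| ParsedCodeowners)))
        .collect()
}

// What associate_codeowners_multithreaded computes, minus the threading:
// one owners list per (bundle upload ID, file path) pair, in input order.
fn associate_many(
    parsed: &HashMap<BundleUploadId, Option<ParsedCodeowners>>,
    to_associate: Vec<(BundleUploadId, Option<String>)>,
) -> Vec<Vec<String>> {
    to_associate
        .into_iter()
        .map(|(id, file)| match (parsed.get(&id), file) {
            // A real matcher would evaluate the CODEOWNERS rules against the
            // path; this placeholder only shows the shape of the lookup.
            (Some(Some(_owners)), Some(_file)) => vec!["@example-owner".to_string()],
            _ => Vec::new(),
        })
        .collect()
}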

Tried my best with the multithreaded implementation but I'm sure it's not the most idiomatic way to do it :) pls hammer away!

trunk-io bot commented Jan 10, 2025

Merging to main in this repository is managed by Trunk.

  • To merge this pull request, check the box to the left or comment /trunk merge below.

trunk-staging-io bot commented Jan 10, 2025

@max-trunk marked this pull request as ready for review January 10, 2025 17:04
@max-trunk requested a review from pv72895 January 10, 2025 17:04
@TylerJang27 (Collaborator) left a comment

LGTM. Deferring to someone who knows more rust :)

        .collect();
    let to_associate: Vec<(String, Option<String>)> = (0..num_files_to_associate_owners)
        .map(|i| {
            let mut file = "foo".to_string();
Collaborator

Is the intention here for these odd files to not be associated with any codeowners? nit: would rename to "unassociated".to_string() or something

Contributor Author

Yep!

Comment on lines 98 to 101
    pub fn parse_many_multithreaded(
        to_parse: Vec<BundleUploadIDAndCodeOwnersBytes>,
        num_threads: usize,
    ) -> HashMap<String, Option<Self>> {
Member

Why use a HashMap? We know the order of the threads we start, so we can always create a Vec with the parsed results in the same order

Contributor Author

The idea was to give back something that allowed for quick lookups of codeowners objects by bundle upload ID. Mainly had the ETL usage in mind

Member

I hear you, but this crate is pretty far removed from that logically—i.e. I think that logic is better handled outside of this function as a matter of separation of concerns. Looking at this logic only in this crate, it's hard to discern its purpose.

@tenesttang-trunk (Contributor) commented Jan 10, 2025

Just to verify -- we can guarantee that the order of parsed CodeOwners objects returned is the same as the input codeowners bytes?

Contributor Author

Yep!

Comment on lines 104 to 125
        let mut handles = Vec::with_capacity(num_threads);

        let results_map: Arc<Mutex<HashMap<String, Option<Self>>>> =
            Arc::new(Mutex::new(HashMap::new()));

        for chunk in to_parse.chunks(chunk_size) {
            let chunk = chunk.to_vec();
            let results_map = Arc::clone(&results_map);
            let handle = thread::spawn(move || {
                for (bundle_upload_id, codeowners_bytes) in chunk.into_iter() {
                    let codeowners = codeowners_bytes.map(Self::parse);
                    let mut results_map = results_map.lock().unwrap();
                    results_map.insert(bundle_upload_id, codeowners);
                }
            });

            handles.push(handle);
        }

        for handle in handles {
            handle.join().unwrap();
        }
Member

There are a few things here to go over.

For starters, we should use Tokio to handle multi-threading for us. That allows us to use worker threads which will pick up work in parallel up to the maximum we specify for the executor (by default it's the number of threads the machine has). This cleans up a lot of this code.

Secondly, if we were to use this pattern, there are a few things that are problematic. The first is that we don't need to use Arc<Mutex<T>> here, since whatever is returned by the thread::spawn closure is returned by handle.join() as a Result<T>. We can just do something like

let results = handles.into_iter().map(|handle| handle.join()).collect::<Result<Vec<Self>>>()?;

Either way, if we use Tokio, we'll use async...await which will make it easier to do this without worrying about handles.
Also, because we're using slice::chunks, we call chunk.to_vec(), which clones all of the codeowners files' bytes. This could be very expensive since the files might be very large. We won't need to worry about chunking if we use Tokio with worker threads.

Lastly, we cannot use unwrap here because that can cause panics—we should handle everything with Result
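As a standalone reference for the join-handle pattern described above, here is a minimal sketch (with a placeholder parse_one standing in for CodeOwners::parse): each spawned thread returns its own chunk of results, join() hands them back in spawn order, and panics surface as a Result instead of an unwrap.

use std::thread;

// Placeholder for CodeOwners::parse in this sketch.
fn parse_one(bytes: Vec<u8>) -> usize {
    bytes.len()
}

fn parse_many(to_parse: Vec<Vec<u8>>, num_threads: usize) -> Result<Vec<usize>, String> {
    let chunk_size = to_parse.len().div_ceil(num_threads.max(1)).max(1);

    // Move whole chunks into each thread instead of cloning them with to_vec().
    let mut chunks: Vec<Vec<Vec<u8>>> = Vec::new();
    let mut items = to_parse.into_iter().peekable();
    while items.peek().is_some() {
        chunks.push(items.by_ref().take(chunk_size).collect());
    }

    // Each closure returns its own results, so no shared Arc<Mutex<_>> accumulator is needed.
    let handles: Vec<_> = chunks
        .into_iter()
        .map(|chunk| thread::spawn(move || chunk.into_iter().map(parse_one).collect::<Vec<_>>()))
        .collect();

    // Joining in spawn order preserves input order and turns a panic into an error.
    let mut results = Vec::new();
    for handle in handles {
        results.extend(handle.join().map_err(|_| "worker thread panicked".to_string())?);
    }
    Ok(results)
}

fn main() {
    let parsed = parse_many(vec![b"a".to_vec(), b"bc".to_vec(), b"def".to_vec()], 2);
    assert_eq!(parsed, Ok(vec![1, 2, 3]));
}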

Contributor Author

As always, thanks for the info :)

we should use Tokio to handle multi-threading for us

Will do!

Contributor Author

Alrighty - refactored to use tokio. Surprise surprise, much cleaner :D

Comment on lines 175 to 176
let codeowners_matchers = Arc::new(RwLock::new(codeowners_matchers));
let to_associate = Arc::new(RwLock::new(to_associate));
Member

I believe we only read these, so you should only need Arc for that.
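For illustration, a minimal sketch of that Arc-only sharing (assuming Tokio's multi-threaded runtime and illustrative types, not this crate's): the read-only map is cloned into each task as a plain Arc with no lock.

use std::{collections::HashMap, sync::Arc};

#[tokio::main]
async fn main() {
    // Read-only lookup table shared across tasks: Arc alone is enough,
    // because no task ever mutates it.
    let matchers: Arc<HashMap<String, String>> =
        Arc::new(HashMap::from([("bundle-1".to_string(), "@team-a".to_string())]));

    let handles: Vec<_> = (0..4)
        .map(|i| {
            let matchers = Arc::clone(&matchers);
            tokio::spawn(async move { (i, matchers.get("bundle-1").cloned()) })
        })
        .collect();

    for handle in handles {
        let (i, owner) = handle.await.expect("task panicked");
        println!("task {i} read {owner:?}");
    }
}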

Comment on lines +9 to 11
use tokio::task;

use constants::CODEOWNERS_LOCATIONS;
Member

nit: no empty line needed here, these imports can be grouped

Comment on lines -76 to +87
pub fn parse(codeowners: Vec<u8>) -> Self {
pub async fn parse(codeowners: Vec<u8>) -> Self {
Member

No async needed here, there's nothing async inside this function

@@ -85,6 +96,29 @@ impl CodeOwners {
            owners: owners_result.ok(),
        }
    }

    pub async fn parse_many_multithreaded(
        to_parse: Vec<Option<Vec<u8>>>,
Member

I think it's unclear why one would pass Vec<Option<Vec<u8>>> instead of Vec<Vec<u8>>. I know it probably has something to do with how we're processing these, but perhaps we can put that logic outside of this method?

@@ -121,6 +155,58 @@ impl BindingsOwners {
    }
}

async fn associate_codeowners(
Member

This function isn't async since it doesn't have anything to await

Comment on lines +100 to +121
    pub async fn parse_many_multithreaded(
        to_parse: Vec<Option<Vec<u8>>>,
    ) -> Result<Vec<Option<Self>>> {
        let mut results = vec![None; to_parse.len()];
        let mut tasks = Vec::new();
        for (i, codeowners_bytes) in to_parse.into_iter().enumerate() {
            let task = task::spawn(async move {
                match codeowners_bytes {
                    Some(cb) => (i, Some(Self::parse(cb).await)),
                    None => (i, None),
                }
            });
            tasks.push(task);
        }

        for task in tasks {
            let (i, result) = task.await?;
            results[i] = result;
        }

        Ok(results)
    }
Member

Overall, I'd simplify to something like the following:

Suggested change
    pub async fn parse_many_multithreaded(
        to_parse: Vec<Vec<u8>>,
    ) -> Result<Vec<Self>> {
        let tasks = to_parse
            .into_iter()
            .enumerate()
            .map(|(i, codeowners_bytes)| {
                task::spawn(async move { (i, Self::parse(codeowners_bytes)) })
            })
            .collect::<Vec<_>>();
        let mut results = Vec::with_capacity(tasks.len());
        for task in tasks {
            // Tasks are awaited in spawn order, so pushing preserves the input order.
            let (_i, result) = task.await?;
            results.push(result);
        }
        Ok(results)
    }

Comment on lines +158 to +181
async fn associate_codeowners(
    codeowners_matchers: Arc<HashMap<String, Option<Owners>>>,
    bundle_upload_id_and_file_path: BundleUploadIDAndFilePath,
) -> Vec<String> {
    let (bundle_upload_id, file) = bundle_upload_id_and_file_path;
    let codeowners_matcher = codeowners_matchers.get(&bundle_upload_id);
    match (codeowners_matcher, &file) {
        (Some(Some(owners)), Some(file)) => match owners {
            Owners::GitHubOwners(gho) => gho
                .of(file)
                .unwrap_or_default()
                .iter()
                .map(ToString::to_string)
                .collect(),
            Owners::GitLabOwners(glo) => glo
                .of(file)
                .unwrap_or_default()
                .iter()
                .map(ToString::to_string)
                .collect(),
        },
        _ => Vec::new(),
    }
}
@dfrankland (Member) commented Jan 11, 2025

Same comment as before, I don't think that BundleId is related to codeowners, so we shouldn't have logic related to it here. This method can really get simplified to the following:

Suggested change
fn associate_codeowners<T: AsRef<Path>>(
    owners: &Owners,
    file: T,
) -> Vec<String> {
    match owners {
        Owners::GitHubOwners(gho) => gho
            .of(file)
            .unwrap_or_default()
            .iter()
            .map(ToString::to_string)
            .collect(),
        Owners::GitLabOwners(glo) => glo
            .of(file)
            .unwrap_or_default()
            .iter()
            .map(ToString::to_string)
            .collect(),
    }
}

Comment on lines +183 to +208
pub async fn associate_codeowners_multithreaded(
    codeowners_matchers: HashMap<String, Option<Owners>>,
    to_associate: Vec<BundleUploadIDAndFilePath>,
) -> Result<Vec<Vec<String>>> {
    let codeowners_matchers = Arc::new(codeowners_matchers);
    let mut results = vec![None; to_associate.len()];
    let mut tasks = Vec::new();

    for (i, bundle_upload_id_and_file_path) in to_associate.into_iter().enumerate() {
        let codeowners_matchers = Arc::clone(&codeowners_matchers);
        let task = task::spawn(async move {
            (
                i,
                associate_codeowners(codeowners_matchers, bundle_upload_id_and_file_path).await,
            )
        });
        tasks.push(task);
    }

    for task in tasks {
        let (i, result) = task.await?;
        results[i] = Some(result);
    }

    Ok(results.into_iter().flatten().collect())
}
@dfrankland (Member) commented Jan 11, 2025

Very similar feedback to the above. BundleId isn't related to codeowners, so we shouldn't have any logic related to it here:

Suggested change
pub async fn associate_codeowners_multithreaded<T: AsRef<Path> + Send + 'static>(
    to_associate: Vec<(Owners, T)>,
) -> Result<Vec<Vec<String>>> {
    let tasks = to_associate
        .into_iter()
        .enumerate()
        .map(|(i, (owners, file))| {
            task::spawn(async move { (i, associate_codeowners(&owners, file)) })
        })
        .collect::<Vec<_>>();
    let mut results = Vec::with_capacity(tasks.len());
    for task in tasks {
        // Tasks are awaited in spawn order, so pushing preserves the input order.
        let (_i, result) = task.await?;
        results.push(result);
    }
    Ok(results)
}

Comment on lines +194 to +216
#[gen_stub_pyfunction]
#[pyfunction]
fn parse_many_codeowners_n_threads(
    to_parse: Vec<Option<Vec<u8>>>,
    num_threads: usize,
) -> PyResult<Vec<Option<BindingsOwners>>> {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(num_threads)
        .enable_all()
        .build()?;
    parse_many_codeowners_multithreaded_impl(rt, to_parse)
}

#[gen_stub_pyfunction]
#[pyfunction]
fn parse_many_codeowners_multithreaded(
    to_parse: Vec<Option<Vec<u8>>>,
) -> PyResult<Vec<Option<BindingsOwners>>> {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()?;
    parse_many_codeowners_multithreaded_impl(rt, to_parse)
}
Member

Can we combine these by making num_threads Option<usize>? Same with the other similar function below
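For example, one possible shape for the combined binding (a sketch building on the functions already in this diff; the pyo3 signature attribute and the exact defaulting behavior are assumptions, not the final API):

#[gen_stub_pyfunction]
#[pyfunction]
#[pyo3(signature = (to_parse, num_threads = None))]
fn parse_many_codeowners_multithreaded(
    to_parse: Vec<Option<Vec<u8>>>,
    num_threads: Option<usize>,
) -> PyResult<Vec<Option<BindingsOwners>>> {
    let mut builder = tokio::runtime::Builder::new_multi_thread();
    if let Some(num_threads) = num_threads {
        // Cap the worker pool only when a count is given; otherwise Tokio
        // defaults to the number of cores.
        builder.worker_threads(num_threads);
    }
    let rt = builder.enable_all().build()?;
    parse_many_codeowners_multithreaded_impl(rt, to_parse)
}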

Comment on lines +218 to +229
fn parse_many_codeowners_multithreaded_impl(
    rt: tokio::runtime::Runtime,
    to_parse: Vec<Option<Vec<u8>>>,
) -> PyResult<Vec<Option<BindingsOwners>>> {
    let parsed_codeowners = rt
        .block_on(CodeOwners::parse_many_multithreaded(to_parse))
        .map_err(|err| PyTypeError::new_err(err.to_string()))?;
    Ok(parsed_codeowners
        .into_iter()
        .map(|codeowners| codeowners.and_then(|codeowners| codeowners.owners.map(BindingsOwners)))
        .collect())
}
Member

This top level is where I would implement the logic related to how the data is passed in from elsewhere. It can follow the very familiar pattern that we've seen in the other codeowners crate. Same goes for associate_codeowners_multithreaded_impl below.

Suggested change
fn parse_many_codeowners_multithreaded_impl(
    rt: tokio::runtime::Runtime,
    to_parse: Vec<Option<Vec<u8>>>,
) -> PyResult<Vec<Option<BindingsOwners>>> {
    let to_parse_len = to_parse.len();
    let parsed_indexes = to_parse
        .iter()
        .enumerate()
        .filter_map(|(i, bytes)| bytes.as_ref().map(|_| i))
        .collect::<Vec<_>>();
    let parsed_codeowners = rt
        .block_on(CodeOwners::parse_many_multithreaded(
            to_parse.into_iter().filter_map(|tp| tp).collect(),
        ))
        .map_err(|err| PyTypeError::new_err(err.to_string()))?;
    let mut results: Vec<Option<BindingsOwners>> = vec![None; to_parse_len];
    for (i, codeowners) in parsed_codeowners.into_iter().enumerate() {
        results[parsed_indexes[i]] = codeowners.owners.map(BindingsOwners);
    }
    Ok(results)
}
