From e16f835131204dfb5040e2342739b86b7b60edef Mon Sep 17 00:00:00 2001 From: Michael Lazear Date: Wed, 29 Nov 2023 15:18:06 -0800 Subject: [PATCH] fix: make psm_id required field, add to all outputs, change ordinal - fix: int32 -> int64 for psm_id in parquet --- crates/sage-cli/src/main.rs | 3 +- crates/sage-cli/src/output.rs | 31 ++++--------- crates/sage-cloudpath/src/parquet.rs | 69 ++++++++++------------------ crates/sage/src/scoring.rs | 21 +++++---- 4 files changed, 46 insertions(+), 78 deletions(-) diff --git a/crates/sage-cli/src/main.rs b/crates/sage-cli/src/main.rs index 005c179..add4c5c 100644 --- a/crates/sage-cli/src/main.rs +++ b/crates/sage-cli/src/main.rs @@ -330,7 +330,6 @@ impl Runner { &outputs.quant, &filenames, &self.database, - &self.parameters.annotate_matches, )?; let path = self.make_path("results.sage.parquet"); @@ -340,7 +339,7 @@ impl Runner { if self.parameters.annotate_matches { let bytes = sage_cloudpath::parquet::serialize_matched_fragments(&outputs.features)?; - let path = self.make_path("results.matched.fragments.sage.parquet"); + let path = self.make_path("matched_fragments.sage.parquet"); path.write_bytes_sync(bytes)?; self.parameters.output_paths.push(path.to_string()); } diff --git a/crates/sage-cli/src/output.rs b/crates/sage-cli/src/output.rs index ea52d74..7289bf7 100644 --- a/crates/sage-cli/src/output.rs +++ b/crates/sage-cli/src/output.rs @@ -16,13 +16,7 @@ impl Runner { pub fn serialize_feature(&self, feature: &Feature, filenames: &[String]) -> csv::ByteRecord { let mut record = csv::ByteRecord::new(); - if self.parameters.annotate_matches { - record.push_field( - itoa::Buffer::new() - .format(feature.psm_id.unwrap_or_default()) - .as_bytes(), - ); - } + record.push_field(itoa::Buffer::new().format(feature.psm_id).as_bytes()); let peptide = &self.database[feature.peptide_idx]; record.push_field(peptide.to_string().as_bytes()); @@ -99,7 +93,7 @@ impl Runner { pub fn serialize_fragments( &self, - psm_id: &Option, + psm_id: usize, fragments_: &Option, ) -> Vec { let mut frag_records = vec![]; @@ -107,11 +101,7 @@ impl Runner { if let Some(fragments) = fragments_ { for id in 0..fragments.fragment_ordinals.len() { let mut record = ByteRecord::new(); - record.push_field( - itoa::Buffer::new() - .format(psm_id.unwrap_or_default()) - .as_bytes(), - ); + record.push_field(itoa::Buffer::new().format(psm_id).as_bytes()); let ion_type = match fragments.kinds[id] { Kind::A => "a", Kind::B => "b", @@ -160,7 +150,8 @@ impl Runner { .delimiter(b'\t') .from_writer(vec![]); - let mut csv_headers = vec![ + let csv_headers = vec![ + "psm_id", "peptide", "proteins", "num_proteins", @@ -200,10 +191,6 @@ impl Runner { "ms2_intensity", ]; - if self.parameters.annotate_matches { - csv_headers.insert(0, "psm_id"); - } - let headers = csv::ByteRecord::from(csv_headers); wtr.write_byte_record(&headers)?; @@ -242,7 +229,7 @@ impl Runner { for record in features .into_par_iter() - .map(|feat| self.serialize_fragments(&feat.psm_id, &feat.fragments)) + .map(|feat| self.serialize_fragments(feat.psm_id, &feat.fragments)) .flatten() .collect::>() { @@ -258,7 +245,6 @@ impl Runner { fn serialize_pin( &self, re: ®ex::Regex, - idx: usize, feature: &Feature, filenames: &[String], ) -> csv::ByteRecord { @@ -270,7 +256,7 @@ impl Runner { let mut record = csv::ByteRecord::new(); let peptide = &self.database[feature.peptide_idx]; - record.push_field(itoa::Buffer::new().format(idx).as_bytes()); + record.push_field(itoa::Buffer::new().format(feature.psm_id).as_bytes()); record.push_field(itoa::Buffer::new().format(feature.label).as_bytes()); record.push_field(scannr.as_bytes()); record.push_field(ryu::Buffer::new().format(feature.expmass).as_bytes()); @@ -436,8 +422,7 @@ impl Runner { wtr.write_byte_record(&headers)?; for record in features .into_par_iter() - .enumerate() - .map(|(idx, feat)| self.serialize_pin(&re, idx, feat, filenames)) + .map(|feat| self.serialize_pin(&re, feat, filenames)) .collect::>() { wtr.write_byte_record(&record)?; diff --git a/crates/sage-cloudpath/src/parquet.rs b/crates/sage-cloudpath/src/parquet.rs index a16df1b..b01262b 100644 --- a/crates/sage-cloudpath/src/parquet.rs +++ b/crates/sage-cloudpath/src/parquet.rs @@ -11,7 +11,7 @@ use std::collections::HashMap; use std::hash::BuildHasher; -use parquet::data_type::{BoolType, ByteArray, FloatType}; +use parquet::data_type::{BoolType, ByteArray, FloatType, Int64Type}; use parquet::file::writer::SerializedColumnWriter; use parquet::{ basic::ZstdLevel, @@ -25,11 +25,10 @@ use sage_core::lfq::{Peak, PrecursorId}; use sage_core::scoring::Feature; use sage_core::tmt::TmtQuant; -pub fn build_schema(is_psm_id_enable: &bool) -> Result { - let msg_header = "message schema {"; - let msg_footer = "}"; - let psm_id_def = "required int32 psm_id;"; - let psm_default_header = r#" +pub fn build_schema() -> Result { + let msg = r#" + message schema { + required int64 psm_id; required byte_array filename (utf8); required byte_array scannr (utf8); required byte_array peptide (utf8); @@ -70,15 +69,8 @@ pub fn build_schema(is_psm_id_enable: &bool) -> Result format!( - "{} \n {} \n {} \n {}", - msg_header, psm_id_def, psm_default_header, msg_footer - ), - false => format!("{} \n {} \n {}", msg_header, psm_default_header, msg_footer), - }; - parquet::schema::parser::parse_message_type(&msg) } @@ -132,9 +124,8 @@ pub fn serialize_features( reporter_ions: &[TmtQuant], filenames: &[String], database: &IndexedDatabase, - is_psm_id_enable: &bool, ) -> Result, parquet::errors::ParquetError> { - let schema = build_schema(is_psm_id_enable)?; + let schema = build_schema()?; let options = WriterProperties::builder() .set_compression(parquet::basic::Compression::ZSTD(ZstdLevel::try_new(3)?)) @@ -171,10 +162,7 @@ pub fn serialize_features( }; } - if *is_psm_id_enable { - write_col!(|f: &Feature| f.psm_id.unwrap_or_default() as i32, Int32Type); - } - + write_col!(|f: &Feature| f.psm_id as i64, Int64Type); write_col!( |f: &Feature| filenames[f.file_id].as_str().into(), ByteArrayType @@ -245,7 +233,7 @@ pub fn serialize_features( pub fn build_matched_fragment_schema() -> parquet::errors::Result { let msg = r#" message schema { - required int32 psm_id; + required int64 psm_id; required byte_array fragment_type (utf8); required int32 fragment_ordinals; required int32 fragment_charge; @@ -278,27 +266,25 @@ pub fn serialize_matched_fragments( let psm_ids = features .iter() .map(|f| { - vec![ - f.psm_id.unwrap_or_default() as i32; + std::iter::repeat(f.psm_id as i64).take( f.fragments .as_ref() .map(|fragments| fragments.fragment_ordinals.len()) - .unwrap_or_default() - ] + .unwrap_or_default(), + ) }) .flatten() .collect::>(); - col.typed::().write_batch(&psm_ids, None, None)?; + col.typed::().write_batch(&psm_ids, None, None)?; col.close()?; } if let Some(mut col) = rg.next_column()? { let fragment_types = features .iter() - .flat_map(|future| { - future - .fragments + .flat_map(|f| { + f.fragments .as_ref() .map(|fragments| fragments.kinds.iter().copied()) }) @@ -321,9 +307,8 @@ pub fn serialize_matched_fragments( if let Some(mut col) = rg.next_column()? { let fragment_ordinals = features .iter() - .flat_map(|future| { - future - .fragments + .flat_map(|f| { + f.fragments .as_ref() .map(|fragments| fragments.fragment_ordinals.iter().copied()) }) @@ -338,9 +323,8 @@ pub fn serialize_matched_fragments( if let Some(mut col) = rg.next_column()? { let fragment_charge = features .iter() - .flat_map(|future| { - future - .fragments + .flat_map(|f| { + f.fragments .as_ref() .map(|fragments| fragments.charges.iter().copied()) }) @@ -355,9 +339,8 @@ pub fn serialize_matched_fragments( if let Some(mut col) = rg.next_column()? { let fragment_mz_experimental = features .iter() - .flat_map(|future| { - future - .fragments + .flat_map(|f| { + f.fragments .as_ref() .map(|fragments| fragments.mz_experimental.iter().copied()) }) @@ -372,9 +355,8 @@ pub fn serialize_matched_fragments( if let Some(mut col) = rg.next_column()? { let fragment_mz_calculated = features .iter() - .flat_map(|future| { - future - .fragments + .flat_map(|f| { + f.fragments .as_ref() .map(|fragments| fragments.mz_calculated.iter().copied()) }) @@ -389,9 +371,8 @@ pub fn serialize_matched_fragments( if let Some(mut col) = rg.next_column()? { let fragment_intensity = features .iter() - .flat_map(|future| { - future - .fragments + .flat_map(|f| { + f.fragments .as_ref() .map(|fragments| fragments.intensities.iter().copied()) }) diff --git a/crates/sage/src/scoring.rs b/crates/sage/src/scoring.rs index 213fd99..bc6b743 100644 --- a/crates/sage/src/scoring.rs +++ b/crates/sage/src/scoring.rs @@ -55,9 +55,9 @@ impl AddAssign for InitialHits { /// Features of a candidate peptide spectrum match pub struct Feature { #[serde(skip_serializing)] - // psm_id help to match with matched fragments table. - pub psm_id: Option, pub peptide_idx: PeptideIx, + // psm_id help to match with matched fragments table. + pub psm_id: usize, pub peptide_len: usize, /// Spectrum id pub spec_id: String, @@ -411,12 +411,9 @@ impl<'db> Scorer<'db> { for idx in 0..report_psms.min(score_vector.len()) { let score = score_vector[idx].0; - let mut fragments: Option = None; - let mut psm_id: Option = None; - if self.annotate_matches { - mem::swap(&mut score_vector[idx].1, &mut fragments); - psm_id = Option::from(increment_psm_counter()); - } + let fragments: Option = score_vector[idx].1.take(); + let psm_id = increment_psm_counter(); + let peptide = &self.db[score.peptide]; let precursor_mass = mz * score.precursor_charge as f32; @@ -616,11 +613,17 @@ impl<'db> Scorer<'db> { } if self.annotate_matches { + let idx = match frag.kind { + Kind::A | Kind::B | Kind::C => idx as i32 + 1, + Kind::X | Kind::Y | Kind::Z => { + peptide.sequence.len().saturating_sub(1) as i32 - idx as i32 + } + }; fragments_details.kinds.push(frag.kind); fragments_details.charges.push(charge as i32); fragments_details.mz_experimental.push(exp_mz); fragments_details.mz_calculated.push(calc_mz); - fragments_details.fragment_ordinals.push(idx as i32); + fragments_details.fragment_ordinals.push(idx); fragments_details.intensities.push(peak.intensity); } }