fix: make psm_id required field, add to all outputs, change ordinal
- fix: int32 -> int64 for psm_id in parquet
lazear committed Nov 29, 2023
1 parent 0e4498c commit e16f835
Showing 4 changed files with 46 additions and 78 deletions.
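
Note: after this commit, `psm_id` is emitted unconditionally as a required `int64` key in every output (TSV, pin, and both parquet files), so rows in `results.sage.parquet` can be joined against `matched_fragments.sage.parquet`. Below is a minimal reader-side sketch of such a join — not code from this commit — assuming a recent `parquet` crate with the row-iterator API; file names and column positions follow the schemas shown in the diff:

```rust
use std::collections::HashMap;
use std::fs::File;

use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::record::RowAccessor;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // In the PSM schema below, psm_id is column 0 and peptide is column 3.
    let mut peptide_by_psm: HashMap<i64, String> = HashMap::new();
    let psms = SerializedFileReader::new(File::open("results.sage.parquet")?)?;
    for row in psms.get_row_iter(None)? {
        let row = row?;
        peptide_by_psm.insert(row.get_long(0)?, row.get_string(3)?.clone());
    }

    // Every fragment row carries the psm_id of its parent PSM in column 0.
    let frags = SerializedFileReader::new(File::open("matched_fragments.sage.parquet")?)?;
    for row in frags.get_row_iter(None)? {
        let row = row?;
        let psm_id = row.get_long(0)?;
        if let Some(peptide) = peptide_by_psm.get(&psm_id) {
            println!("{psm_id}\t{peptide}");
        }
    }
    Ok(())
}
```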
3 changes: 1 addition & 2 deletions crates/sage-cli/src/main.rs
@@ -330,7 +330,6 @@ impl Runner {
&outputs.quant,
&filenames,
&self.database,
&self.parameters.annotate_matches,
)?;

let path = self.make_path("results.sage.parquet");
@@ -340,7 +339,7 @@
if self.parameters.annotate_matches {
let bytes =
sage_cloudpath::parquet::serialize_matched_fragments(&outputs.features)?;
let path = self.make_path("results.matched.fragments.sage.parquet");
let path = self.make_path("matched_fragments.sage.parquet");
path.write_bytes_sync(bytes)?;
self.parameters.output_paths.push(path.to_string());
}
31 changes: 8 additions & 23 deletions crates/sage-cli/src/output.rs
@@ -16,13 +16,7 @@ impl Runner {
pub fn serialize_feature(&self, feature: &Feature, filenames: &[String]) -> csv::ByteRecord {
let mut record = csv::ByteRecord::new();

if self.parameters.annotate_matches {
record.push_field(
itoa::Buffer::new()
.format(feature.psm_id.unwrap_or_default())
.as_bytes(),
);
}
record.push_field(itoa::Buffer::new().format(feature.psm_id).as_bytes());

let peptide = &self.database[feature.peptide_idx];
record.push_field(peptide.to_string().as_bytes());
@@ -99,19 +93,15 @@ impl Runner {

pub fn serialize_fragments(
&self,
psm_id: &Option<usize>,
psm_id: usize,
fragments_: &Option<Fragments>,
) -> Vec<ByteRecord> {
let mut frag_records = vec![];

if let Some(fragments) = fragments_ {
for id in 0..fragments.fragment_ordinals.len() {
let mut record = ByteRecord::new();
record.push_field(
itoa::Buffer::new()
.format(psm_id.unwrap_or_default())
.as_bytes(),
);
record.push_field(itoa::Buffer::new().format(psm_id).as_bytes());
let ion_type = match fragments.kinds[id] {
Kind::A => "a",
Kind::B => "b",
@@ -160,7 +150,8 @@ impl Runner {
.delimiter(b'\t')
.from_writer(vec![]);

let mut csv_headers = vec![
let csv_headers = vec![
"psm_id",
"peptide",
"proteins",
"num_proteins",
@@ -200,10 +191,6 @@ impl Runner {
"ms2_intensity",
];

if self.parameters.annotate_matches {
csv_headers.insert(0, "psm_id");
}

let headers = csv::ByteRecord::from(csv_headers);

wtr.write_byte_record(&headers)?;
@@ -242,7 +229,7 @@ impl Runner {

for record in features
.into_par_iter()
.map(|feat| self.serialize_fragments(&feat.psm_id, &feat.fragments))
.map(|feat| self.serialize_fragments(feat.psm_id, &feat.fragments))
.flatten()
.collect::<Vec<_>>()
{
@@ -258,7 +245,6 @@ impl Runner {
fn serialize_pin(
&self,
re: &regex::Regex,
idx: usize,
feature: &Feature,
filenames: &[String],
) -> csv::ByteRecord {
@@ -270,7 +256,7 @@ impl Runner {

let mut record = csv::ByteRecord::new();
let peptide = &self.database[feature.peptide_idx];
record.push_field(itoa::Buffer::new().format(idx).as_bytes());
record.push_field(itoa::Buffer::new().format(feature.psm_id).as_bytes());
record.push_field(itoa::Buffer::new().format(feature.label).as_bytes());
record.push_field(scannr.as_bytes());
record.push_field(ryu::Buffer::new().format(feature.expmass).as_bytes());
@@ -436,8 +422,7 @@ impl Runner {
wtr.write_byte_record(&headers)?;
for record in features
.into_par_iter()
.enumerate()
.map(|(idx, feat)| self.serialize_pin(&re, idx, feat, filenames))
.map(|feat| self.serialize_pin(&re, feat, filenames))
.collect::<Vec<_>>()
{
wtr.write_byte_record(&record)?;
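
Two things to note in this file: the pin output's first field now carries the globally unique `feature.psm_id` instead of a per-batch `enumerate` index, so pin rows line up with the TSV and parquet outputs; and the serializers lean on the same allocation-free number formatting throughout. An illustrative standalone sketch of that idiom (not code from this diff): `itoa` and `ryu` format into stack buffers and hand back string slices, avoiding a heap allocation per field.

```rust
// Hypothetical helpers mirroring the pattern in serialize_feature /
// serialize_pin above; they are not part of the repository.
fn push_int(record: &mut csv::ByteRecord, value: i64) {
    // itoa formats the integer into a stack buffer and returns a &str.
    record.push_field(itoa::Buffer::new().format(value).as_bytes());
}

fn push_float(record: &mut csv::ByteRecord, value: f32) {
    // ryu does the same for floating-point values.
    record.push_field(ryu::Buffer::new().format(value).as_bytes());
}
```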
69 changes: 25 additions & 44 deletions crates/sage-cloudpath/src/parquet.rs
@@ -11,7 +11,7 @@
use std::collections::HashMap;
use std::hash::BuildHasher;

use parquet::data_type::{BoolType, ByteArray, FloatType};
use parquet::data_type::{BoolType, ByteArray, FloatType, Int64Type};
use parquet::file::writer::SerializedColumnWriter;
use parquet::{
basic::ZstdLevel,
@@ -25,11 +25,10 @@ use sage_core::lfq::{Peak, PrecursorId};
use sage_core::scoring::Feature;
use sage_core::tmt::TmtQuant;

pub fn build_schema(is_psm_id_enable: &bool) -> Result<Type, parquet::errors::ParquetError> {
let msg_header = "message schema {";
let msg_footer = "}";
let psm_id_def = "required int32 psm_id;";
let psm_default_header = r#"
pub fn build_schema() -> Result<Type, parquet::errors::ParquetError> {
let msg = r#"
message schema {
required int64 psm_id;
required byte_array filename (utf8);
required byte_array scannr (utf8);
required byte_array peptide (utf8);
@@ -70,15 +69,8 @@ pub fn build_schema(is_psm_id_enable: &bool) -> Result<Type, parquet::errors::ParquetError> {
optional float element;
}
}
}
"#;
let msg = match is_psm_id_enable {
true => format!(
"{} \n {} \n {} \n {}",
msg_header, psm_id_def, psm_default_header, msg_footer
),
false => format!("{} \n {} \n {}", msg_header, psm_default_header, msg_footer),
};

parquet::schema::parser::parse_message_type(&msg)
}

@@ -132,9 +124,8 @@ pub fn serialize_features(
reporter_ions: &[TmtQuant],
filenames: &[String],
database: &IndexedDatabase,
is_psm_id_enable: &bool,
) -> Result<Vec<u8>, parquet::errors::ParquetError> {
let schema = build_schema(is_psm_id_enable)?;
let schema = build_schema()?;

let options = WriterProperties::builder()
.set_compression(parquet::basic::Compression::ZSTD(ZstdLevel::try_new(3)?))
@@ -171,10 +162,7 @@
};
}

if *is_psm_id_enable {
write_col!(|f: &Feature| f.psm_id.unwrap_or_default() as i32, Int32Type);
}

write_col!(|f: &Feature| f.psm_id as i64, Int64Type);
write_col!(
|f: &Feature| filenames[f.file_id].as_str().into(),
ByteArrayType
@@ -245,7 +233,7 @@ pub fn serialize_features(
pub fn build_matched_fragment_schema() -> parquet::errors::Result<Type> {
let msg = r#"
message schema {
required int32 psm_id;
required int64 psm_id;
required byte_array fragment_type (utf8);
required int32 fragment_ordinals;
required int32 fragment_charge;
@@ -278,27 +266,25 @@ pub fn serialize_matched_fragments(
let psm_ids = features
.iter()
.map(|f| {
vec![
f.psm_id.unwrap_or_default() as i32;
std::iter::repeat(f.psm_id as i64).take(
f.fragments
.as_ref()
.map(|fragments| fragments.fragment_ordinals.len())
.unwrap_or_default()
]
.unwrap_or_default(),
)
})
.flatten()
.collect::<Vec<_>>();

col.typed::<Int32Type>().write_batch(&psm_ids, None, None)?;
col.typed::<Int64Type>().write_batch(&psm_ids, None, None)?;
col.close()?;
}

if let Some(mut col) = rg.next_column()? {
let fragment_types = features
.iter()
.flat_map(|future| {
future
.fragments
.flat_map(|f| {
f.fragments
.as_ref()
.map(|fragments| fragments.kinds.iter().copied())
})
@@ -321,9 +307,8 @@ if let Some(mut col) = rg.next_column()? {
if let Some(mut col) = rg.next_column()? {
let fragment_ordinals = features
.iter()
.flat_map(|future| {
future
.fragments
.flat_map(|f| {
f.fragments
.as_ref()
.map(|fragments| fragments.fragment_ordinals.iter().copied())
})
@@ -338,9 +323,8 @@ if let Some(mut col) = rg.next_column()? {
if let Some(mut col) = rg.next_column()? {
let fragment_charge = features
.iter()
.flat_map(|future| {
future
.fragments
.flat_map(|f| {
f.fragments
.as_ref()
.map(|fragments| fragments.charges.iter().copied())
})
@@ -355,9 +339,8 @@ if let Some(mut col) = rg.next_column()? {
if let Some(mut col) = rg.next_column()? {
let fragment_mz_experimental = features
.iter()
.flat_map(|future| {
future
.fragments
.flat_map(|f| {
f.fragments
.as_ref()
.map(|fragments| fragments.mz_experimental.iter().copied())
})
@@ -372,9 +355,8 @@ if let Some(mut col) = rg.next_column()? {
if let Some(mut col) = rg.next_column()? {
let fragment_mz_calculated = features
.iter()
.flat_map(|future| {
future
.fragments
.flat_map(|f| {
f.fragments
.as_ref()
.map(|fragments| fragments.mz_calculated.iter().copied())
})
@@ -389,9 +371,8 @@ if let Some(mut col) = rg.next_column()? {
if let Some(mut col) = rg.next_column()? {
let fragment_intensity = features
.iter()
.flat_map(|future| {
future
.fragments
.flat_map(|f| {
f.fragments
.as_ref()
.map(|fragments| fragments.intensities.iter().copied())
})
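
Because `serialize_matched_fragments` writes parquet columns one at a time, per-PSM values must be flattened to one entry per fragment row; the new code does this with `std::iter::repeat(id).take(n)` instead of a `vec![id; n]` temporary. A minimal standalone sketch of that pattern (hypothetical helper, not in the repo):

```rust
/// Repeat each PSM's id once per matched fragment, so the psm_id column
/// ends up with exactly one entry per fragment row.
fn flatten_psm_ids(psm_ids: &[i64], fragments_per_psm: &[usize]) -> Vec<i64> {
    psm_ids
        .iter()
        .zip(fragments_per_psm)
        .flat_map(|(&id, &n)| std::iter::repeat(id).take(n))
        .collect()
}

fn main() {
    // Two PSMs with 2 and 1 matched fragments respectively.
    assert_eq!(flatten_psm_ids(&[7, 8], &[2, 1]), vec![7, 7, 8]);
}
```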
21 changes: 12 additions & 9 deletions crates/sage/src/scoring.rs
@@ -55,9 +55,9 @@ impl AddAssign<InitialHits> for InitialHits {
/// Features of a candidate peptide spectrum match
pub struct Feature {
#[serde(skip_serializing)]
// psm_id is used to match against the matched fragments table.
pub psm_id: Option<usize>,
pub peptide_idx: PeptideIx,
// psm_id is used to match against the matched fragments table.
pub psm_id: usize,
pub peptide_len: usize,
/// Spectrum id
pub spec_id: String,
@@ -411,12 +408,9 @@ impl<'db> Scorer<'db> {

for idx in 0..report_psms.min(score_vector.len()) {
let score = score_vector[idx].0;
let mut fragments: Option<Fragments> = None;
let mut psm_id: Option<usize> = None;
if self.annotate_matches {
mem::swap(&mut score_vector[idx].1, &mut fragments);
psm_id = Option::from(increment_psm_counter());
}
let fragments: Option<Fragments> = score_vector[idx].1.take();
let psm_id = increment_psm_counter();

let peptide = &self.db[score.peptide];
let precursor_mass = mz * score.precursor_charge as f32;
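
The definition of `increment_psm_counter` is not shown in this diff; for the unconditional call above to make `psm_id` a usable global key, it needs to hand out unique values even across rayon-parallelized scoring. One way to do that is a process-wide atomic counter — a sketch of an assumed implementation, not the repository's actual code:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical: a global counter that yields a unique, monotonically
// increasing id even when called concurrently from parallel workers.
static PSM_COUNTER: AtomicUsize = AtomicUsize::new(0);

fn increment_psm_counter() -> usize {
    // fetch_add returns the previous value, so ids start at 0; Relaxed
    // ordering is enough when uniqueness is the only requirement.
    PSM_COUNTER.fetch_add(1, Ordering::Relaxed)
}
```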

Expand Down Expand Up @@ -616,11 +613,17 @@ impl<'db> Scorer<'db> {
}

if self.annotate_matches {
let idx = match frag.kind {
Kind::A | Kind::B | Kind::C => idx as i32 + 1,
Kind::X | Kind::Y | Kind::Z => {
peptide.sequence.len().saturating_sub(1) as i32 - idx as i32
}
};
fragments_details.kinds.push(frag.kind);
fragments_details.charges.push(charge as i32);
fragments_details.mz_experimental.push(exp_mz);
fragments_details.mz_calculated.push(calc_mz);
fragments_details.fragment_ordinals.push(idx as i32);
fragments_details.fragment_ordinals.push(idx);
fragments_details.intensities.push(peak.intensity);
}
}
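
The hunk above is the "change ordinal" part of the commit title: previously the raw fragment index `idx` was stored, which does not match conventional ion numbering for C-terminal series. With the new `match`, a/b/c ions are numbered from the N-terminus and x/y/z ions from the C-terminus. A standalone restatement with a worked example (the free function itself is hypothetical; it mirrors the `match` above):

```rust
#[derive(Clone, Copy)]
enum Kind { A, B, C, X, Y, Z }

// N-terminal series (a/b/c) count residues from the N-terminus;
// C-terminal series (x/y/z) count from the C-terminus.
fn fragment_ordinal(kind: Kind, peptide_len: usize, idx: usize) -> i32 {
    match kind {
        Kind::A | Kind::B | Kind::C => idx as i32 + 1,
        Kind::X | Kind::Y | Kind::Z => {
            peptide_len.saturating_sub(1) as i32 - idx as i32
        }
    }
}

fn main() {
    // For a 7-residue peptide, cleavage after the third residue (idx = 2)
    // gives b3 on the N-terminal side and the complementary y4 ion.
    assert_eq!(fragment_ordinal(Kind::B, 7, 2), 3);
    assert_eq!(fragment_ordinal(Kind::Y, 7, 2), 4);
}
```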
