Source code for ms2pip.core

#!/usr/bin/env python
from __future__ import annotations

import itertools
import logging
import multiprocessing
import multiprocessing.dummy
import re
from collections import defaultdict
from math import ceil
from pathlib import Path
from typing import Any, Callable, Generator, Iterable, List, Optional, Tuple, Union

import numpy as np
import pandas as pd
from psm_utils import PSM, Peptidoform, PSMList
from rich.progress import track

import ms2pip.exceptions as exceptions
from ms2pip import spectrum_output
from ms2pip._cython_modules import ms2pip_pyx
from ms2pip._utils.encoder import Encoder
from ms2pip._utils.feature_names import get_feature_names
from ms2pip._utils.psm_input import read_psms
from ms2pip._utils.retention_time import RetentionTime
from ms2pip._utils.ion_mobility import IonMobility
from ms2pip._utils.xgb_models import get_predictions_xgb, validate_requested_xgb_model
from ms2pip.constants import MODELS
from ms2pip.result import ProcessingResult, calculate_correlations
from ms2pip.search_space import ProteomeSearchSpace
from ms2pip.spectrum_input import read_spectrum_file
from ms2pip.spectrum_output import SUPPORTED_FORMATS

logger = logging.getLogger(__name__)


[docs]def predict_single( peptidoform: Union[Peptidoform, str], model: Optional[str] = "HCD", model_dir: Optional[Union[str, Path]] = None, ) -> ProcessingResult: """ Predict fragmentation spectrum for a single peptide.\f """ if isinstance(peptidoform, str): peptidoform = Peptidoform(peptidoform) psm = PSM(peptidoform=peptidoform, spectrum_id=0) model_dir = model_dir if model_dir else Path.home() / ".ms2pip" ion_types = [it.lower() for it in MODELS[model]["ion_types"]] with Encoder.from_peptidoform(peptidoform) as encoder: ms2pip_pyx.ms2pip_init(*encoder.encoder_files) result = _process_peptidoform(0, psm, model, encoder, ion_types=ion_types) if "xgboost_model_files" in MODELS[model].keys(): validate_requested_xgb_model( MODELS[model]["xgboost_model_files"], MODELS[model]["model_hash"], model_dir, ) predictions = np.array( get_predictions_xgb( result.feature_vectors, [len(peptidoform.parsed_sequence) - 1], MODELS[model], model_dir, ) ) result.predicted_intensity = predictions[0] # Only one spectrum in predictions result.feature_vectors = None return result
[docs]def predict_batch( psms: Union[PSMList, str, Path], add_retention_time: bool = False, add_ion_mobility: bool = False, psm_filetype: Optional[str] = None, model: Optional[str] = "HCD", model_dir: Optional[Union[str, Path]] = None, processes: Optional[int] = None, ) -> List[ProcessingResult]: """ Predict fragmentation spectra for a batch of peptides.\f Parameters ---------- psms PSMList or path to PSM file that is supported by psm_utils. psm_filetype Filetype of the PSM file. By default, None. Should be one of the supported psm_utils filetypes. See https://psm-utils.readthedocs.io/en/stable/#supported-file-formats. add_retention_time Add retention time predictions with DeepLC (Requires optional DeepLC dependency). add_ion_mobility Add ion mobility predictions with IM2Deep (Requires optional IM2Deep dependency). model Model to use for prediction. Default: "HCD". model_dir Directory where XGBoost model files are stored. Default: `~/.ms2pip`. processes Number of parallel processes for multiprocessing steps. By default, all available. Returns ------- predictions: List[ProcessingResult] Predicted spectra with theoretical m/z and predicted intensity values. """ if isinstance(psms, list): psms = PSMList(psm_list=psms) psm_list = read_psms(psms, filetype=psm_filetype) if add_retention_time: logger.info("Adding retention time predictions") rt_predictor = RetentionTime(processes=processes) rt_predictor.add_rt_predictions(psm_list) if add_ion_mobility: logger.info("Adding ion mobility predictions") im_predictor = IonMobility(processes=processes) im_predictor.add_im_predictions(psm_list) with Encoder.from_psm_list(psm_list) as encoder: ms2pip_parallelized = _Parallelized( encoder=encoder, model=model, model_dir=model_dir, processes=processes, ) logger.info("Processing peptides...") results = ms2pip_parallelized.process_peptides(psm_list) return results
[docs]def predict_library( fasta_file: Optional[Union[str, Path]] = None, config: Optional[Union[ProteomeSearchSpace, dict, str, Path]] = None, add_retention_time: bool = False, add_ion_mobility: bool = False, model: Optional[str] = "HCD", model_dir: Optional[Union[str, Path]] = None, batch_size: int = 100000, processes: Optional[int] = None, ) -> Generator[ProcessingResult, None, None]: """ Predict spectral library from protein FASTA file.\f Parameters ---------- fasta_file Path to FASTA file with protein sequences. Required if `search-space-config` is not provided. config ProteomeSearchSpace, or a dictionary or path to JSON file with proteome search space parameters. Required if `fasta_file` is not provided. add_retention_time Add retention time predictions with DeepLC (Requires optional DeepLC dependency). add_ion_mobility Add ion mobility predictions with IM2Deep (Requires optional IM2Deep dependency). model Model to use for prediction. Default: "HCD". model_dir Directory where XGBoost model files are stored. Default: `~/.ms2pip`. batch_size Number of peptides to process in each batch. processes Number of parallel processes for multiprocessing steps. By default, all available. Yields ------ predictions: List[ProcessingResult] Predicted spectra with theoretical m/z and predicted intensity values. """ if fasta_file and config: # Use provided proteome, but overwrite fasta_file config = ProteomeSearchSpace.from_any(config) config.fasta_file = fasta_file elif fasta_file and not config: # Default proteome search space with provided fasta_file config = ProteomeSearchSpace(fasta_file=fasta_file) elif not fasta_file and config: # Use provided proteome config = ProteomeSearchSpace.from_any(config) else: raise ValueError("Either `fasta_file` or `config` must be provided.") search_space = ProteomeSearchSpace.from_any(config) search_space.build() for batch in track( _into_batches(search_space, batch_size=batch_size), description="Predicting spectra...", total=ceil(len(search_space) / batch_size), ): logging.disable(logging.CRITICAL) yield predict_batch( search_space.filter_psms_by_mz(PSMList(psm_list=list(batch))), add_retention_time=add_retention_time, add_ion_mobility=add_ion_mobility, model=model, model_dir=model_dir, processes=processes, ) logging.disable(logging.NOTSET)
[docs]def correlate( psms: Union[PSMList, str, Path], spectrum_file: Union[str, Path], psm_filetype: Optional[str] = None, spectrum_id_pattern: Optional[str] = None, compute_correlations: bool = False, add_retention_time: bool = False, add_ion_mobility: bool = False, model: Optional[str] = "HCD", model_dir: Optional[Union[str, Path]] = None, ms2_tolerance: float = 0.02, processes: Optional[int] = None, ) -> List[ProcessingResult]: """ Compare predicted and observed intensities and optionally compute correlations.\f Parameters ---------- psms PSMList or path to PSM file that is supported by psm_utils. spectrum_file Path to spectrum file with target intensities. psm_filetype Filetype of the PSM file. By default, None. Should be one of the supported psm_utils filetypes. See https://psm-utils.readthedocs.io/en/stable/#supported-file-formats. spectrum_id_pattern Regular expression pattern to apply to spectrum titles before matching to peptide file ``spec_id`` entries. compute_correlations Compute correlations between predictions and targets. add_retention_time Add retention time predictions with DeepLC (Requires optional DeepLC dependency). add_ion_mobility Add ion mobility predictions with IM2Deep (Requires optional IM2Deep dependency). model Model to use for prediction. Default: "HCD". model_dir Directory where XGBoost model files are stored. Default: `~/.ms2pip`. ms2_tolerance MS2 tolerance in Da for observed spectrum peak annotation. By default, 0.02 Da. processes Number of parallel processes for multiprocessing steps. By default, all available. Returns ------- results: List[ProcessingResult] Predicted spectra with theoretical m/z and predicted intensity values, and optionally, correlations. """ psm_list = read_psms(psms, filetype=psm_filetype) spectrum_id_pattern = spectrum_id_pattern if spectrum_id_pattern else "(.*)" if add_retention_time: logger.info("Adding retention time predictions") rt_predictor = RetentionTime(processes=processes) rt_predictor.add_rt_predictions(psm_list) if add_ion_mobility: logger.info("Adding ion mobility predictions") im_predictor = IonMobility(processes=processes) im_predictor.add_im_predictions(psm_list) with Encoder.from_psm_list(psm_list) as encoder: ms2pip_parallelized = _Parallelized( encoder=encoder, model=model, model_dir=model_dir, ms2_tolerance=ms2_tolerance, processes=processes, ) logger.info("Processing spectra and peptides...") results = ms2pip_parallelized.process_spectra(psm_list, spectrum_file, spectrum_id_pattern) # Correlations also requested if compute_correlations: logger.info("Computing correlations") calculate_correlations(results) logger.info(f"Median correlation: {np.median(list(r.correlation for r in results))}") return results
[docs]def get_training_data( psms: Union[PSMList, str, Path], spectrum_file: Union[str, Path], psm_filetype: Optional[str] = None, spectrum_id_pattern: Optional[str] = None, model: Optional[str] = "HCD", ms2_tolerance: float = 0.02, processes: Optional[int] = None, ): """ Extract feature vectors and target intensities from observed spectra for training.\f Parameters ---------- psms PSMList or path to PSM file that is supported by psm_utils. spectrum_file Path to spectrum file with target intensities. psm_filetype Filetype of the PSM file. By default, None. Should be one of the supported psm_utils filetypes. See https://psm-utils.readthedocs.io/en/stable/#supported-file-formats. spectrum_id_pattern Regular expression pattern to apply to spectrum titles before matching to peptide file ``spec_id`` entries. model Model to use as reference for the ion types that are extracted from the observed spectra. Default: "HCD", which results in the extraction of singly charged b- and y-ions. ms2_tolerance MS2 tolerance in Da for observed spectrum peak annotation. By default, 0.02 Da. processes Number of parallel processes for multiprocessing steps. By default, all available. Returns ------- features :py:class:`pandas.DataFrame` with feature vectors and targets. """ psm_list = read_psms(psms, filetype=psm_filetype) spectrum_id_pattern = spectrum_id_pattern if spectrum_id_pattern else "(.*)" with Encoder.from_psm_list(psm_list) as encoder: ms2pip_parallelized = _Parallelized( encoder=encoder, model=model, ms2_tolerance=ms2_tolerance, processes=processes, ) logger.info("Processing spectra and peptides...") results = ms2pip_parallelized.process_spectra( psm_list, spectrum_file, spectrum_id_pattern, vector_file=True ) logger.info("Assembling training data in DataFrame...") training_data = _assemble_training_data(results, model) return training_data
[docs]def annotate_spectra( psms: Union[PSMList, str, Path], spectrum_file: Union[str, Path], psm_filetype: Optional[str] = None, spectrum_id_pattern: Optional[str] = None, model: Optional[str] = "HCD", ms2_tolerance: float = 0.02, processes: Optional[int] = None, ): """ Annotate observed spectra.\f Parameters ---------- psms PSMList or path to PSM file that is supported by psm_utils. spectrum_file Path to spectrum file with target intensities. psm_filetype Filetype of the PSM file. By default, None. Should be one of the supported psm_utils filetypes. See https://psm-utils.readthedocs.io/en/stable/#supported-file-formats. spectrum_id_pattern Regular expression pattern to apply to spectrum titles before matching to peptide file ``spec_id`` entries. model Model to use as reference for the ion types that are extracted from the observed spectra. Default: "HCD", which results in the extraction of singly charged b- and y-ions. ms2_tolerance MS2 tolerance in Da for observed spectrum peak annotation. By default, 0.02 Da. processes Number of parallel processes for multiprocessing steps. By default, all available. Returns ------- results: List[ProcessingResult] List of ProcessingResult objects with theoretical m/z and observed intensity values. """ psm_list = read_psms(psms, filetype=psm_filetype) spectrum_id_pattern = spectrum_id_pattern if spectrum_id_pattern else "(.*)" with Encoder.from_psm_list(psm_list) as encoder: ms2pip_parallelized = _Parallelized( encoder=encoder, model=model, ms2_tolerance=ms2_tolerance, processes=processes, ) logger.info("Processing spectra and peptides...") results = ms2pip_parallelized.process_spectra( psm_list, spectrum_file, spectrum_id_pattern, vector_file=False, annotations_only=True ) return results
[docs]def download_models( models: Optional[List[str]] = None, model_dir: Optional[Union[str, Path]] = None ): """ Download all specified models to the specified directory. Parameters ---------- models List of models to download. If not specified, all models will be downloaded. model_dir Directory where XGBoost model files are to be stored. Default: ``~/.ms2pip``. """ model_dir = model_dir if model_dir else Path.home() / ".ms2pip" model_dir = Path(model_dir).expanduser() model_dir.mkdir(parents=True, exist_ok=True) if not models: models = list(MODELS.keys()) for model in models: try: if "xgb_model_files" in MODELS[model].keys(): continue except KeyError: raise exceptions.UnknownModelError(model) logger.debug("Downloading %s model files", model) validate_requested_xgb_model( MODELS[model]["xgboost_model_files"], MODELS[model]["model_hash"], model_dir, )
class _Parallelized: """Implementations of common multiprocessing functionality across MS²PIP usage modes.""" def __init__( self, encoder: Encoder = None, model: Optional[str] = None, model_dir: Optional[Union[str, Path]] = None, ms2_tolerance: float = 0.02, processes: Optional[int] = None, ): """ Implementations of common multiprocessing functionality across MS²PIP usage modes. Parameters ---------- encoding Configured encoding class instance. Required if input peptides contain modifications. model Name of the model to use for predictions. Overrides configuration file. model_dir Custom directory for downloaded XGBoost model files. By default, `~/.ms2pip` is used. ms2_tolerance MS2 tolerance in Da for observed spectrum peak annotation. By default, 0.02 Da. processes Number of parallel processes for multiprocessing steps. By default, all available. """ # Input parameters self.encoder = encoder self.model = model self.model_dir = model_dir if model_dir else Path.home() / ".ms2pip" self.ms2_tolerance = ms2_tolerance self.processes = processes if processes else multiprocessing.cpu_count() # Setup encoder if not configured if not self.encoder: self.encoder = Encoder() self.encoder.write_encoder_files() # Validate requested model if self.model in MODELS.keys(): logger.debug("Using %s model", self.model) if "xgboost_model_files" in MODELS[self.model].keys(): validate_requested_xgb_model( MODELS[self.model]["xgboost_model_files"], MODELS[self.model]["model_hash"], self.model_dir, ) else: raise exceptions.UnknownModelError(self.model) def _get_pool(self): """Get multiprocessing pool.""" logger.debug(f"Starting workers (processes={self.processes})...") if multiprocessing.current_process().daemon: logger.warn( "MS²PIP is running in a daemon process. Disabling multiprocessing as daemonic " "processes cannot have children." ) return multiprocessing.dummy.Pool(1) elif self.processes == 1: logger.debug("Using dummy multiprocessing pool.") return multiprocessing.dummy.Pool(1) else: return multiprocessing.get_context("spawn").Pool(self.processes) def _validate_output_formats(self, output_formats: List[str]) -> List[str]: """Validate requested output formats.""" if not output_formats: self.output_formats = ["csv"] else: for output_format in output_formats: if output_format not in SUPPORTED_FORMATS: raise exceptions.UnknownOutputFormatError(output_format) self.output_formats = output_formats def _execute_in_pool(self, psm_list: PSMList, func: Callable, args: tuple): """Execute function in multiprocessing pool.""" def get_chunk_size(n_items, n_processes): """Get optimal chunk size for multiprocessing.""" if n_items < 5000: return n_items else: max_chunk_size = 50000 n_chunks = ceil(ceil(n_items / n_processes) / max_chunk_size) * n_processes return ceil(n_items / n_chunks) def to_chunks(_list, chunk_size): """Split _list into chunks of size chunk_size.""" def _generate_chunks(): for i in range(0, len(_list), chunk_size): yield _list[i : i + chunk_size] _list = list(_list) return list(_generate_chunks()) def _enumerated_psm_list_by_spectrum_id(psm_list, spectrum_ids_chunk): selected_indices = np.flatnonzero(np.isin(psm_list["spectrum_id"], spectrum_ids_chunk)) return [(i, psm_list.psm_list[i]) for i in selected_indices] with self._get_pool() as pool: if not psm_list: logger.warning("No PSMs to process.") return [] # Split PSMList into chunks if func == _process_spectra: # Split by spectrum_id to keep PSMs for same spectrum together spectrum_ids = set(psm_list["spectrum_id"]) chunk_size = get_chunk_size(len(spectrum_ids), pool._processes) chunks = [ _enumerated_psm_list_by_spectrum_id(psm_list, spectrum_ids_chunk) for spectrum_ids_chunk in to_chunks(spectrum_ids, chunk_size) ] else: # Simple split by PSM chunk_size = get_chunk_size(len(psm_list), pool._processes) chunks = to_chunks(list(enumerate(psm_list)), chunk_size) logger.debug(f"Processing {len(chunks)} chunk(s) of ~{chunk_size} entries each.") # Add jobs to pool mp_results = [] for psm_list_chunk in chunks: mp_results.append(pool.apply_async(func, args=(psm_list_chunk, *args))) # Gather results # results = [ # r.get() # for r in track( # mp_results, # disable=len(chunks) == 1, # description="Processing chunks...", # transient=True, # show_speed=False, # ) # ] results = [r.get() for r in mp_results] # Sort results by input order results = list( sorted( itertools.chain.from_iterable(results), key=lambda result: result.psm_index, ) ) return results def process_peptides(self, psm_list: PSMList) -> List[ProcessingResult]: """Process peptides in parallel.""" results = self._execute_in_pool( psm_list, _process_peptides, (self.encoder, self.model), ) logger.debug(f"Gathered data for {len(results)} peptides.") # Add XGBoost predictions if required if "xgboost_model_files" in MODELS[self.model].keys(): results = self._add_xgboost_predictions(results) return results def process_spectra( self, psm_list: PSMList, spectrum_file: Union[str, Path], spectrum_id_pattern: str, vector_file: bool = False, annotations_only: bool = False, ) -> List[ProcessingResult]: """ Process PSMs and observed spectra in parallel Parameters ---------- psm_list psm_utils.PSMList instance with PSMs to process spectrum_file Filename of spectrum file spectrum_id_pattern Regular expression pattern to apply to spectrum titles before matching to peptide file entries vector_file If feature vectors should be extracted instead of predictions annotations_only If only peak annotations should be extracted from the spectrum file """ # Validate runs and collections if not len(psm_list.collections) == 1 or not len(psm_list.runs) == 1: raise exceptions.InvalidInputError("PSMs should be for a single run and collection.") args = ( spectrum_file, vector_file, self.encoder, self.model, self.ms2_tolerance, spectrum_id_pattern, annotations_only, ) results = self._execute_in_pool(psm_list, _process_spectra, args) # Validate number of results if not results: raise exceptions.NoMatchingSpectraFound( "No spectra matching spectrum IDs from PSM list could be found in provided file." ) logger.debug(f"Gathered data for {len(results)} PSMs.") # Add XGBoost predictions if required if ( not (vector_file or annotations_only) and "xgboost_model_files" in MODELS[self.model].keys() ): results = self._add_xgboost_predictions(results) return results def _add_xgboost_predictions(self, results: List[ProcessingResult]) -> List[ProcessingResult]: """ Add XGBoost predictions to results. Notes ----- This functions is applied after the parallel processing, as XGBoost implements its own multiprocessing. """ if "xgboost_model_files" not in MODELS[self.model].keys(): raise ValueError("XGBoost model files not found in MODELS dictionary.") logger.debug("Converting feature vectors to XGBoost DMatrix...") import xgboost as xgb results_to_predict = [r for r in results if r.feature_vectors is not None] if not results_to_predict: return results num_ions = [len(r.psm.peptidoform.parsed_sequence) - 1 for r in results_to_predict] xgb_vector = xgb.DMatrix(np.vstack(list(r.feature_vectors for r in results_to_predict))) predictions = get_predictions_xgb( xgb_vector, num_ions, MODELS[self.model], self.model_dir, processes=self.processes, ) logger.debug("Adding XGBoost predictions to results...") for result, preds in zip(results_to_predict, predictions): result.predicted_intensity = preds result.feature_vectors = None return results # TODO IMPLEMENT def write_predictions( self, all_preds: pd.DataFrame, peptides: pd.DataFrame, output_filename: str ): raise NotImplementedError spec_out = spectrum_output.SpectrumOutput( all_preds, peptides, self.params["ms2pip"], output_filename=output_filename, ) spec_out.write_results(self.output_formats) def _process_peptidoform( psm_index: int, psm: PSM, model: str, encoder: Encoder, ion_types: Optional[List[str]] = None, ) -> ProcessingResult: """ Process a single peptidoform from a PSM. Get theoretical m/z and predicted intensities (from C model) or feature vectors (for XGBoost model) for a single peptidoform from a PSM. Notes ----- - ``ms2pip_pyx.init()`` must be called before this function is called. - Optionally, lowercase version of ``ion_types`` from the model configuration can be provided to save computational time. """ peptidoform = psm.peptidoform if not ion_types: ion_types = [it.lower() for it in MODELS[model]["ion_types"]] enc_peptide = encoder.encode_peptide(peptidoform) enc_peptidoform = encoder.encode_peptidoform(peptidoform) # Get ion mzs and map to ion types mz = ms2pip_pyx.get_mzs(enc_peptidoform, MODELS[model]["peaks_version"]) mz = {i: np.array(mz, dtype=np.float32) for i, mz in zip(ion_types, mz)} # Get predictions from XGBoost models. if "xgboost_model_files" in MODELS[model].keys(): predictions = None feature_vectors = np.array( ms2pip_pyx.get_vector(enc_peptide, enc_peptidoform, peptidoform.precursor_charge), dtype=np.uint16, ) # Or get predictions from C models. else: predictions = ms2pip_pyx.get_predictions( enc_peptide, enc_peptidoform, peptidoform.precursor_charge, MODELS[model]["id"], MODELS[model]["peaks_version"], 30.0, # TODO: Remove CE feature ) predictions = { i: np.array(p, dtype=np.float32).clip(min=np.log2(0.001)) # Clip negative intensities for i, p in zip(ion_types, predictions) } feature_vectors = None return ProcessingResult( psm_index=psm_index, psm=psm, theoretical_mz=mz, predicted_intensity=predictions, observed_intensity=None, feature_vectors=feature_vectors, ) def _process_peptides( enumerated_psm_list: List[Tuple[int, PSM]], encoder: Encoder, model: str, ) -> List[ProcessingResult]: """ Predict spectrum for each entry in PeptideRecord DataFrame. Parameters ---------- enumerated_psm_list List of tuples of (index, PSM) for each PSM in the input file. encoder Configured encoder to use for peptide and peptidoform encoding model Name of prediction model to be used """ ms2pip_pyx.ms2pip_init(*encoder.encoder_files) results = [] ion_types = [it.lower() for it in MODELS[model]["ion_types"]] for psm_index, psm in enumerated_psm_list: try: result = _process_peptidoform(psm_index, psm, model, encoder, ion_types) except ( exceptions.InvalidPeptidoformError, exceptions.InvalidAminoAcidError, ): result = ProcessingResult(psm_index=psm_index, psm=psm) results.append(result) return results def _process_spectra( enumerated_psm_list: List[Tuple[int, PSM]], spec_file: str, vector_file: bool, encoder: Encoder, model: str, ms2_tolerance: float, spectrum_id_pattern: str, annotations_only: bool = False, ) -> List[ProcessingResult, None]: """ Perform requested tasks for each spectrum in spectrum file. Parameters ---------- enumerated_psm_list List of tuples of (index, PSM) for each PSM in the input file. spec_file Filename of spectrum file vector_file If feature vectors should be extracted instead of predictions encoder: Encoder Configured encoder to use for peptide and peptidoform encoding model Name of prediction model to be used ms2_tolerance Fragmentation spectrum m/z error tolerance in Dalton spectrum_id_pattern Regular expression pattern to apply to spectrum titles before matching to peptide file entries annotations_only If only peak annotations should be extracted from the spectrum file """ ms2pip_pyx.ms2pip_init(*encoder.encoder_files) results = [] ion_types = [it.lower() for it in MODELS[model]["ion_types"]] try: spectrum_id_regex = re.compile(spectrum_id_pattern) except TypeError: spectrum_id_regex = re.compile(r"(.*)") # Restructure PeptideRecord entries as spec_id -> [(id, psm_1), (id, psm_2), ...] psms_by_specid = defaultdict(list) for psm_index, psm in enumerated_psm_list: psms_by_specid[str(psm.spectrum_id)].append((psm_index, psm)) for spectrum in read_spectrum_file(spec_file): # Match spectrum ID with provided regex, use first match group as new ID match = spectrum_id_regex.search(spectrum.identifier) try: spectrum_id = match[1] except (TypeError, IndexError): raise exceptions.TitlePatternError( f"Spectrum title pattern `{spectrum_id_pattern}` could not be matched to " f"spectrum ID `{spectrum.identifier}`. " " Are you sure that the regex contains a capturing group?" ) if spectrum_id not in psms_by_specid: continue # Spectrum preprocessing: # Remove reporter ions and precursor peak, normalize, transform for label_type in ["iTRAQ", "TMT"]: if label_type in model: spectrum.remove_reporter_ions(label_type) # spectrum.remove_precursor() # TODO: Decide to implement this or not spectrum.tic_norm() spectrum.log2_transform() for psm_index, psm in psms_by_specid[spectrum_id]: try: enc_peptidoform = encoder.encode_peptidoform(psm.peptidoform) except exceptions.InvalidAminoAcidError: result = ProcessingResult(psm_index=psm_index, psm=psm) results.append(result) continue targets = ms2pip_pyx.get_targets( enc_peptidoform, spectrum.mz.astype(np.float32), spectrum.intensity.astype(np.float32), float(ms2_tolerance), MODELS[model]["peaks_version"], ) targets = {i: np.array(t, dtype=np.float32) for i, t in zip(ion_types, targets)} if not psm.peptidoform.precursor_charge: psm.peptidoform.precursor_charge = spectrum.precursor_charge if vector_file: enc_peptide = encoder.encode_peptide(psm.peptidoform) feature_vectors = np.array( ms2pip_pyx.get_vector( enc_peptide, enc_peptidoform, psm.peptidoform.precursor_charge ), dtype=np.uint16, ) result = ProcessingResult( psm_index=psm_index, psm=psm, theoretical_mz=None, predicted_intensity=None, observed_intensity=targets, correlation=None, feature_vectors=feature_vectors, ) elif annotations_only: # Only return mz and targets mz = ms2pip_pyx.get_mzs(enc_peptidoform, MODELS[model]["peaks_version"]) mz = {i: np.array(mz, dtype=np.float32) for i, mz in zip(ion_types, mz)} result = ProcessingResult( psm_index=psm_index, psm=psm, theoretical_mz=mz, predicted_intensity=None, observed_intensity=targets, correlation=None, feature_vectors=None, ) else: # Predict with C model or get feature vectors for XGBoost try: result = _process_peptidoform(psm_index, psm, model, encoder, ion_types) except ( exceptions.InvalidPeptidoformError, exceptions.InvalidAminoAcidError, ): result = ProcessingResult(psm_index=psm_index, psm=psm) else: result.observed_intensity = targets results.append(result) return results def _assemble_training_data(results: List[ProcessingResult], model: str) -> pd.DataFrame: """Assemble training data from results list to single pandas DataFrame.""" # Get ion types ion_types = [it.lower() for it in MODELS[model]["ion_types"]] # Assemble feature vectors, PSM indices, and targets training_data = pd.DataFrame( np.vstack([r.feature_vectors for r in results if r.feature_vectors is not None]), columns=get_feature_names(), ) training_data["psm_index"] = np.concatenate( [ np.repeat(r.psm_index, r.feature_vectors.shape[0]) for r in results if r.feature_vectors is not None ] ) for ion_type in ion_types: if ion_type in ["a", "b", "b2", "c"]: training_data[f"target_{ion_type}"] = np.concatenate( [r.observed_intensity[ion_type] for r in results if r.feature_vectors is not None] ) elif ion_type in ["x", "y", "y2", "z"]: training_data[f"target_{ion_type}"] = np.concatenate( [ r.observed_intensity[ion_type][::-1] for r in results if r.feature_vectors is not None ] ) # Reorder columns training_data = training_data[ ["psm_index"] + get_feature_names() + [f"target_{it}" for it in ion_types] ] return training_data def _into_batches(iterable: Iterable[Any], batch_size: int) -> Generator[List[Any], None, None]: """Accumulate iterator elements into batches of a given size.""" batch = [] for item in iterable: batch.append(item) if len(batch) == batch_size: yield batch batch = [] if batch: yield batch