"""
Write spectrum files from MS²PIP prediction results.
Examples
--------
The simplest way to write MS²PIP predictions to a file is to use the :py:func:`write_spectra`
function:
>>> from ms2pip import predict_single, write_spectra
>>> results = [predict_single("ACDE/2")]
>>> write_spectra("/path/to/output/filename", results, "mgf")
Specific writer classes can also be used directly. Writer classes should be used in a context
manager to ensure the file is properly closed after writing. The following example writes MS²PIP
predictions to a TSV file:
>>> from ms2pip import predict_single
>>> results = [predict_single("ACDE/2")]
>>> with TSV("output.tsv") as writer:
... writer.write(results)
Results can be written to the same file sequentially:
>>> results_2 = [predict_single("PEPTIDEK/2")]
>>> with TSV("output.tsv", write_mode="a") as writer:
... writer.write(results)
... writer.write(results_2)
Results can be written to an existing file using the append mode:
>>> with TSV("output.tsv", write_mode="a") as writer:
... writer.write(results_2)
"""
from __future__ import annotations
import csv
import itertools
import logging
import re
import warnings
from abc import ABC, abstractmethod
from collections import defaultdict
from io import StringIO
from pathlib import Path
from time import localtime, strftime
from typing import Any, Dict, Generator, List, Optional, Union
import numpy as np
from psm_utils import PSM, Peptidoform
from pyteomics import proforma
from sqlalchemy import engine, select
from ms2pip._utils import dlib
from ms2pip.result import ProcessingResult
LOGGER = logging.getLogger(__name__)
[docs]def write_spectra(
filename: Union[str, Path],
processing_results: List[ProcessingResult],
file_format: str = "tsv",
write_mode: str = "w",
):
"""
Write MS2PIP processing results to a supported spectrum file format.
Parameters
----------
filename
Output filename without file extension.
processing_results
List of :py:class:`ms2pip.result.ProcessingResult` objects.
file_format
File format to write. See :py:attr:`FILE_FORMATS` for available formats.
write_mode
Write mode for file. Default is ``w`` (write). Use ``a`` (append) to add to existing file.
"""
with SUPPORTED_FORMATS[file_format](filename, write_mode) as writer:
LOGGER.info(f"Writing to {writer.filename}")
writer.write(processing_results)
class _Writer(ABC):
"""Abstract base class for writing spectrum files."""
suffix = ""
def __init__(self, filename: Union[str, Path], write_mode: str = "w"):
self.filename = Path(filename).with_suffix(self.suffix)
self.write_mode = write_mode
self._open_file = None
def __enter__(self):
"""Open file in context manager."""
self.open()
return self
def __exit__(self, exc_type, exc_value, traceback):
"""Close file in context manager."""
self.close()
def __repr__(self):
return f"{self.__class__.__name__}({self.filename, self.write_mode})"
def open(self):
"""Open file."""
if self._open_file:
self.close()
self._open_file = open(self.filename, self.write_mode)
def close(self):
"""Close file."""
if self._open_file:
self._open_file.close()
self._open_file = None
@property
def _file_object(self):
"""Get open file object."""
if self._open_file:
return self._open_file
else:
warnings.warn(
"Opening file outside of context manager. Manually close file after use."
)
self.open()
return self._open_file
def write(self, processing_results: List[ProcessingResult]):
"""Write multiple processing results to file."""
for result in processing_results:
self._write_result(result)
@abstractmethod
def _write_result(self, result: ProcessingResult):
"""Write single processing result to file."""
...
[docs]class TSV(_Writer):
"""Write TSV files from MS2PIP processing results."""
suffix = ".tsv"
field_names = [
"psm_index",
"ion_type",
"ion_number",
"mz",
"predicted",
"observed",
"rt",
"im",
]
[docs] def write(self, processing_results: List[ProcessingResult]):
"""Write multiple processing results to file."""
writer = csv.DictWriter(
self._file_object, fieldnames=self.field_names, delimiter="\t", lineterminator="\n"
)
if self.write_mode == "w":
writer.writeheader()
for result in processing_results:
self._write_result(result, writer)
def _write_result(self, result: ProcessingResult, writer: csv.DictWriter):
"""Write single processing result to file."""
# Only write results with predictions or observations
if not result.theoretical_mz:
return
for ion_type in result.theoretical_mz:
for i in range(len(result.theoretical_mz[ion_type])):
writer.writerow(self._write_row(result, ion_type, i))
@staticmethod
def _write_row(result: ProcessingResult, ion_type: str, ion_index: int):
"""Write single row for TSV file."""
return {
"psm_index": result.psm_index,
"ion_type": ion_type,
"ion_number": ion_index + 1,
"mz": "{:.8f}".format(result.theoretical_mz[ion_type][ion_index]),
"predicted": "{:.8f}".format(result.predicted_intensity[ion_type][ion_index])
if result.predicted_intensity
else None,
"observed": "{:.8f}".format(result.observed_intensity[ion_type][ion_index])
if result.observed_intensity
else None,
"rt": result.psm.retention_time if result.psm.retention_time else None,
"im": result.psm.ion_mobility if result.psm.ion_mobility else None,
}
[docs]class MSP(_Writer):
"""Write MSP files from MS2PIP processing results."""
suffix = ".msp"
[docs] def write(self, results: List[ProcessingResult]):
"""Write multiple processing results to file."""
for result in results:
self._write_result(result)
def _write_result(self, result: ProcessingResult):
"""Write single processing result to file."""
predicted_spectrum = result.as_spectra()[0]
intensity_normalized = _basepeak_normalize(predicted_spectrum.intensity) * 1e4
peaks = zip(predicted_spectrum.mz, intensity_normalized, predicted_spectrum.annotations)
# Header
lines = [
f"Name: {result.psm.peptidoform.sequence}/{result.psm.get_precursor_charge()}",
f"MW: {result.psm.peptidoform.theoretical_mass}",
self._format_comment_line(result.psm),
f"Num peaks: {len(predicted_spectrum.mz)}",
]
# Peaks
lines.extend(
f"{mz:.8f}\t{intensity:.8f}\t{annotation}/0.0" for mz, intensity, annotation in peaks
)
# Write to file
self._file_object.writelines(line + "\n" for line in lines)
self._file_object.write("\n")
@staticmethod
def _format_modifications(peptidoform: Peptidoform):
"""Format modifications in MSP-style string, e.g. ``Mods=1/0,E,Glu->pyro-Glu``."""
def _format_single_modification(
amino_acid: str,
position: int,
modifications: Optional[List[proforma.ModificationBase]],
) -> Union[str, None]:
"""Get modification label from :py:class:`proforma.ModificationBase` list."""
if not modifications:
return None
if len(modifications) > 1:
raise ValueError("Multiple modifications per amino acid not supported in MSP.")
modification = modifications[0]
try:
return f"{position},{amino_acid},{modification.name}"
except AttributeError: # MassModification has no attribute `name`
return f"{position},{amino_acid},{modification.value}"
sequence_mods = [
_format_single_modification(aa, pos + 1, mods)
for pos, (aa, mods) in enumerate(peptidoform.parsed_sequence)
]
n_term = _format_single_modification(
peptidoform.sequence[0], 0, peptidoform.properties["n_term"]
)
c_term = _format_single_modification(
peptidoform.sequence[-1], -1, peptidoform.properties["c_term"]
)
mods = [mod for mod in [n_term] + sequence_mods + [c_term] if mod is not None]
if not mods:
return "Mods=0"
else:
return f"Mods={len(mods)}/{'/'.join(mods)}"
@staticmethod
def _format_parent_mass(peptidoform: Peptidoform) -> str:
"""Format parent mass as string."""
return f"Parent={peptidoform.theoretical_mz}"
@staticmethod
def _format_protein_string(psm: PSM) -> Union[str, None]:
"""Format protein list as string."""
if psm.protein_list:
return f"Protein={','.join(psm.protein_list)}"
else:
return None
@staticmethod
def _format_retention_time(psm: PSM) -> Union[str, None]:
"""Format retention time as string."""
if psm.retention_time:
return f"RetentionTime={psm.retention_time}"
else:
return None
@staticmethod
def _format_ion_mobility(psm: PSM) -> Union[str, None]:
"""Format ion mobility as string."""
if psm.ion_mobility:
return f"IonMobility={psm.ion_mobility}"
else:
return None
@staticmethod
def _format_identifier(psm: PSM) -> str:
"""Format MS2PIP ID as string."""
return f"SpectrumIdentifier={psm.spectrum_id}"
@staticmethod
def _format_comment_line(psm: PSM) -> str:
"""Format comment line for MSP file."""
comments = " ".join(
filter(
None,
[
MSP._format_modifications(psm.peptidoform),
MSP._format_parent_mass(psm.peptidoform),
MSP._format_protein_string(psm),
MSP._format_retention_time(psm),
MSP._format_ion_mobility(psm),
MSP._format_identifier(psm),
],
)
)
return f"Comment: {comments}"
[docs]class MGF(_Writer):
"""
Write MGF files from MS2PIP processing results.
See http://www.matrixscience.com/help/data_file_help.html for documentation on the MGF format.
"""
suffix = ".mgf"
[docs] def write(self, results: List[ProcessingResult]):
"""Write multiple processing results to file."""
for result in results:
self._write_result(result)
def _write_result(self, result: ProcessingResult):
"""Write single processing result to file."""
predicted_spectrum = result.as_spectra()[0]
intensity_normalized = _basepeak_normalize(predicted_spectrum.intensity) * 1e4
peaks = zip(predicted_spectrum.mz, intensity_normalized)
# Header
lines = [
"BEGIN IONS",
f"TITLE={result.psm.peptidoform}",
f"PEPMASS={result.psm.peptidoform.theoretical_mz}",
f"CHARGE={result.psm.get_precursor_charge()}+",
f"SCANS={result.psm.spectrum_id}",
f"RTINSECONDS={result.psm.retention_time}" if result.psm.retention_time else None,
f"ION_MOBILITY={result.psm.ion_mobility}" if result.psm.ion_mobility else None,
]
# Peaks
lines.extend(f"{mz:.8f} {intensity:.8f}" for mz, intensity in peaks)
# Write to file
self._file_object.writelines(line + "\n" for line in lines if line)
self._file_object.write("END IONS\n\n")
[docs]class Spectronaut(_Writer):
"""Write Spectronaut files from MS2PIP processing results."""
suffix = ".spectronaut.tsv"
field_names = [
"ModifiedPeptide",
"StrippedPeptide",
"PrecursorCharge",
"PrecursorMz",
"IonMobility",
"iRT",
"ProteinId",
"RelativeFragmentIntensity",
"FragmentMz",
"FragmentType",
"FragmentNumber",
"FragmentCharge",
"FragmentLossType",
]
[docs] def write(self, processing_results: List[ProcessingResult]):
"""Write multiple processing results to file."""
writer = csv.DictWriter(
self._file_object, fieldnames=self.field_names, delimiter="\t", lineterminator="\n"
)
if self.write_mode == "w":
writer.writeheader()
for result in processing_results:
self._write_result(result, writer)
def _write_result(self, result: ProcessingResult, writer: csv.DictWriter):
"""Write single processing result to file."""
# Only write results with predictions
if result.predicted_intensity is None:
return
psm_info = self._process_psm(result.psm)
for fragment_info in self._yield_fragment_info(result):
writer.writerow({**psm_info, **fragment_info})
@staticmethod
def _process_psm(psm: PSM) -> Dict[str, Any]:
"""Process PSM to Spectronaut format."""
return {
"ModifiedPeptide": _peptidoform_str_without_charge(psm.peptidoform),
"StrippedPeptide": psm.peptidoform.sequence,
"PrecursorCharge": psm.get_precursor_charge(),
"PrecursorMz": f"{psm.peptidoform.theoretical_mz:.8f}",
"IonMobility": f"{psm.ion_mobility:.8f}" if psm.ion_mobility else None,
"iRT": f"{psm.retention_time:.8f}" if psm.retention_time else None,
"ProteinId": "".join(psm.protein_list) if psm.protein_list else None,
}
@staticmethod
def _yield_fragment_info(result: ProcessingResult) -> Generator[Dict[str, Any], None, None]:
"""Yield fragment information for a processing result."""
# Normalize intensities
intensities = {
ion_type: _unlogarithmize(intensities)
for ion_type, intensities in result.predicted_intensity.items()
}
max_intensity = max(itertools.chain(*intensities.values()))
intensities = {
ion_type: _basepeak_normalize(intensities[ion_type], basepeak=max_intensity)
for ion_type in intensities
}
for ion_type in result.predicted_intensity:
fragment_type = ion_type[0].lower()
fragment_charge = ion_type[1:] if len(ion_type) > 1 else "1"
for ion_index, (intensity, mz) in enumerate(
zip(intensities[ion_type], result.theoretical_mz[ion_type])
):
yield {
"RelativeFragmentIntensity": f"{intensity:.8f}",
"FragmentMz": f"{mz:.8f}",
"FragmentType": fragment_type,
"FragmentNumber": ion_index + 1,
"FragmentCharge": fragment_charge,
"FragmentLossType": "noloss",
}
[docs]class Bibliospec(_Writer):
"""
Write Bibliospec SSL and MS2 files from MS2PIP processing results.
Bibliospec SSL and MS2 files are also compatible with Skyline. See
https://skyline.ms/wiki/home/software/BiblioSpec/page.view?name=BiblioSpec%20input%20and%20output%20file%20formats
for documentation on the Bibliospec file formats.
"""
ssl_suffix = ".ssl"
ms2_suffix = ".ms2"
ssl_field_names = [
"file",
"scan",
"charge",
"sequence",
"score-type",
"score",
"retention-time",
"ion-mobility",
]
def __init__(self, filename: Union[str, Path], write_mode: str = "w"):
super().__init__(filename, write_mode)
self.ssl_file = self.filename.with_suffix(self.ssl_suffix)
self.ms2_file = self.filename.with_suffix(self.ms2_suffix)
self._open_ssl_file = None
self._open_ms2_file = None
[docs] def open(self):
"""Open files."""
self._open_ssl_file = open(self.ssl_file, self.write_mode)
self._open_ms2_file = open(self.ms2_file, self.write_mode)
[docs] def close(self):
"""Close files."""
if self._open_ssl_file:
self._open_ssl_file.close()
self._open_ssl_file = None
if self._open_ms2_file:
self._open_ms2_file.close()
self._open_ms2_file = None
@property
def _ssl_file_object(self):
"""Get open SSL file object."""
if self._open_ssl_file:
return self._open_ssl_file
else:
warnings.warn(
"Opening file outside of context manager. Manually close file after use."
)
self.open()
return self._open_ssl_file
@property
def _ms2_file_object(self):
"""Get open MS2 file object."""
if self._open_ms2_file:
return self._open_ms2_file
else:
warnings.warn(
"Opening file outside of context manager. Manually close file after use."
)
self.open()
return self._open_ms2_file
[docs] def write(self, processing_results: List[ProcessingResult]):
"""Write multiple processing results to file."""
# Create CSV writer
ssl_dict_writer = csv.DictWriter(
self._ssl_file_object,
fieldnames=self.ssl_field_names,
delimiter="\t",
lineterminator="\n",
)
# Write headers
if self.write_mode == "w":
ssl_dict_writer.writeheader()
self._write_ms2_header()
start_scan_number = 0
elif self.write_mode == "a":
start_scan_number = self._get_last_ssl_scan_number(self.ssl_file) + 1
else:
raise ValueError(f"Unsupported write mode: {self.write_mode}")
# Write results
for i, result in enumerate(processing_results):
scan_number = start_scan_number + i
modified_sequence = self._format_modified_sequence(result.psm.peptidoform)
self._write_result(result, modified_sequence, scan_number, ssl_dict_writer)
def _write_ms2_header(self):
"""Write header to MS2 file."""
self._ms2_file_object.write(
f"H\tCreationDate\t{strftime('%Y-%m-%d %H:%M:%S', localtime())}\n"
)
self._ms2_file_object.write("H\tExtractor\tMS2PIP predictions\n")
def _write_result(
self,
result: ProcessingResult,
modified_sequence: str,
scan_number: int,
writer: csv.DictWriter,
):
"""Write single processing result to files."""
self._write_result_to_ssl(result, modified_sequence, scan_number, writer)
self._write_result_to_ms2(result, modified_sequence, scan_number)
def _write_result_to_ssl(
self,
result: ProcessingResult,
modified_sequence: str,
scan_number: int,
writer: csv.DictWriter,
):
"""Write single processing result to the SSL file."""
writer.writerow(
{
"file": self.ms2_file.name if isinstance(self.ms2_file, Path) else "file.ms2",
"scan": scan_number,
"charge": result.psm.get_precursor_charge(),
"sequence": modified_sequence,
"score-type": None,
"score": None,
"retention-time": result.psm.retention_time if result.psm.retention_time else None,
"ion-mobility": result.psm.ion_mobility if result.psm.ion_mobility else None,
}
)
def _write_result_to_ms2(
self, result: ProcessingResult, modified_sequence: str, scan_number: int
):
"""Write single processing result to the MS2 file."""
predicted_spectrum = result.as_spectra()[0]
intensity_normalized = _basepeak_normalize(predicted_spectrum.intensity) * 1e4
peaks = zip(predicted_spectrum.mz, intensity_normalized)
# Header
lines = [
f"S\t{scan_number}\t{result.psm.peptidoform.theoretical_mz}",
f"Z\t{result.psm.get_precursor_charge()}\t{result.psm.peptidoform.theoretical_mass}",
f"D\tseq\t{result.psm.peptidoform.sequence}",
f"D\tmodified seq\t{modified_sequence}",
]
# Peaks
lines.extend(f"{mz:.8f}\t{intensity:.8f}" for mz, intensity in peaks)
# Write to file
self._ms2_file_object.writelines(line + "\n" for line in lines)
self._ms2_file_object.write("\n")
@staticmethod
def _format_modified_sequence(peptidoform: Peptidoform) -> str:
"""Format modified sequence as string for Spectronaut."""
modification_dict = defaultdict(list)
for term, position in [("n_term", 0), ("c_term", len(peptidoform) - 1)]:
if peptidoform.properties[term]:
modification_dict[position].extend(peptidoform.properties[term])
for position, (_, mods) in enumerate(peptidoform.parsed_sequence):
if mods:
modification_dict[position].extend(mods)
return "".join(
[
f"{aa}{''.join([f'[{mod.mass:+.1f}]' for mod in modification_dict[position]])}"
for position, aa in enumerate(peptidoform.sequence)
]
)
@staticmethod
def _get_last_ssl_scan_number(ssl_file: Union[str, Path, StringIO]):
"""Read scan number of last line in a Bibliospec SSL file."""
if isinstance(ssl_file, StringIO):
ssl_file.seek(0)
for line in ssl_file:
last_line = line
elif isinstance(ssl_file, (str, Path)):
with open(ssl_file, "rt") as ssl:
for line in ssl:
last_line = line
else:
raise TypeError("Unsupported type for `ssl_file`.")
return int(last_line.split("\t")[1])
[docs]class DLIB(_Writer):
"""
Write DLIB files from MS2PIP processing results.
See `EncyclopeDIA File Formats <https://bitbucket.org/searleb/encyclopedia/wiki/EncyclopeDIA%20File%20Formats>`_
for documentation on the DLIB format.
"""
suffix = ".dlib"
[docs] def open(self):
"""Open file."""
if self.write_mode == "w":
self._open_file = self.filename.unlink(missing_ok=True)
self._open_file = dlib.open_sqlite(self.filename)
[docs] def write(self, processing_results: List[ProcessingResult]):
"""Write MS2PIP predictions to a DLIB SQLite file."""
connection = self._file_object
dlib.metadata.create_all()
self._write_metadata(connection)
self._write_entries(processing_results, connection, self.filename)
self._write_peptide_to_protein(processing_results, connection)
def _write_result(self, result: ProcessingResult): ...
@staticmethod
def _format_modified_sequence(peptidoform: Peptidoform) -> str:
"""Format modified sequence as string for DLIB."""
# Sum all sequential mass shifts for each position
masses = [
sum(mod.mass for mod in mods) if mods else 0 for _, mods in peptidoform.parsed_sequence
]
# Add N- and C-terminal modifications
for term, position in [("n_term", 0), ("c_term", len(peptidoform) - 1)]:
if peptidoform.properties[term]:
masses[position] += sum(mod.mass for mod in peptidoform.properties[term])
# Format modified sequence
return "".join(
[
f"{aa}[{mass:+.6f}]" if mass else aa
for aa, mass in zip(peptidoform.sequence, masses)
]
)
@staticmethod
def _write_metadata(connection: engine.Connection):
"""Write metadata to DLIB SQLite file."""
with connection.begin():
version = connection.execute(
select([dlib.Metadata.c.Value]).where(dlib.Metadata.c.Key == "version")
).scalar()
if version is None:
connection.execute(
dlib.Metadata.insert().values(
Key="version",
Value=dlib.DLIB_VERSION,
)
)
@staticmethod
def _write_entries(
processing_results: List[ProcessingResult],
connection: engine.Connection,
output_filename: str,
):
"""Write spectra to DLIB SQLite file."""
with connection.begin():
for result in processing_results:
if not result.psm.retention_time:
raise ValueError("Retention time required to write DLIB file.")
spectrum = result.as_spectra()[0]
intensity_normalized = _basepeak_normalize(spectrum.intensity) * 1e4
n_peaks = len(spectrum.mz)
connection.execute(
dlib.Entry.insert().values(
PrecursorMz=result.psm.peptidoform.theoretical_mz,
PrecursorCharge=result.psm.get_precursor_charge(),
PeptideModSeq=DLIB._format_modified_sequence(result.psm.peptidoform),
PeptideSeq=result.psm.peptidoform.sequence,
Copies=1,
RTInSeconds=result.psm.retention_time,
Score=0,
MassEncodedLength=n_peaks,
MassArray=spectrum.mz.tolist(),
IntensityEncodedLength=n_peaks,
IntensityArray=intensity_normalized.tolist(),
SourceFile=str(output_filename),
)
)
@staticmethod
def _write_peptide_to_protein(results: List[ProcessingResult], connection: engine.Connection):
"""Write peptide-to-protein mappings to DLIB SQLite file."""
peptide_to_proteins = {
(result.psm.peptidoform.sequence, protein)
for result in results
if result.psm.protein_list
for protein in result.psm.protein_list
}
with connection.begin():
sql_peptide_to_proteins = set()
proteins = {protein for _, protein in peptide_to_proteins}
for peptide_to_protein in connection.execute(
dlib.PeptideToProtein.select().where(
dlib.PeptideToProtein.c.ProteinAccession.in_(proteins)
)
):
sql_peptide_to_proteins.add(
(
peptide_to_protein.PeptideSeq,
peptide_to_protein.ProteinAccession,
)
)
peptide_to_proteins.difference_update(sql_peptide_to_proteins)
for seq, protein in peptide_to_proteins:
connection.execute(
dlib.PeptideToProtein.insert().values(
PeptideSeq=seq, isDecoy=False, ProteinAccession=protein
)
)
SUPPORTED_FORMATS = {
"tsv": TSV,
"msp": MSP,
"mgf": MGF,
"spectronaut": Spectronaut,
"bibliospec": Bibliospec,
"dlib": DLIB,
}
def _peptidoform_str_without_charge(peptidoform: Peptidoform) -> str:
"""Get peptidoform string without charge."""
return re.sub(r"\/\d+$", "", str(peptidoform))
def _unlogarithmize(intensities: np.array) -> np.array:
"""Undo logarithmic transformation of intensities."""
return (2**intensities) - 0.001
def _basepeak_normalize(intensities: np.array, basepeak: Optional[float] = None) -> np.array:
"""Normalize intensities to most intense peak."""
if not basepeak:
basepeak = intensities.max()
return intensities / basepeak