#########################################################################
# MacSyFinder - Detection of macromolecular systems in protein dataset #
# using systems modelling and similarity search. #
# Authors: Sophie Abby, Bertrand Neron #
# Copyright (c) 2014-2023 Institut Pasteur (Paris) and CNRS. #
# See the COPYRIGHT file for details #
# #
# This file is part of MacSyFinder package. #
# #
# MacSyFinder is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# MacSyFinder is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details . #
# #
# You should have received a copy of the GNU General Public License #
# along with MacSyFinder (COPYING). #
# If not, see <https://www.gnu.org/licenses/>. #
#########################################################################
"""
Module to build and manage Clusters of Hit
A cluster is a set of hits each of which hits less than inter_gene_max_space from its neighbor
"""
import itertools
import logging
import macsypy.gene
from .error import MacsypyError
from .gene import GeneStatus
from .hit import Loner, LonerMultiSystem, get_best_hit_4_func
_log = logging.getLogger(__name__)
def _colocates(h1, h2, rep_info):
"""
compute the distance (in number of gene between) between 2 hits
:param :class:`macsypy.hit.ModelHit` h1: the first hit to compute inter hit distance
:param :class:`macsypy.hit.ModelHit` h2: the second hit to compute inter hit distance
:return: True if the 2 hits spaced by lesser or equal genes than inter_gene_max_space.
Managed circularity.
"""
# compute the number of genes between h1 and h2
dist = h2.get_position() - h1.get_position() - 1
g1 = h1.gene_ref
g2 = h2.gene_ref
model = g1.model
d1 = g1.inter_gene_max_space
d2 = g2.inter_gene_max_space
if d1 is None and d2 is None:
inter_gene_max_space = model.inter_gene_max_space
elif d1 is None:
inter_gene_max_space = d2
elif d2 is None:
inter_gene_max_space = d1
else: # d1 and d2 are defined
inter_gene_max_space = min(d1, d2)
if 0 <= dist <= inter_gene_max_space:
return True
elif dist <= 0 and rep_info.topology == 'circular':
# h1 and h2 overlap the ori
dist = rep_info.max - h1.get_position() + h2.get_position() - rep_info.min
return dist <= inter_gene_max_space
return False
def _clusterize(hits, model, hit_weights, rep_info):
"""
clusterize hit regarding the distance between them
:param hits: the hits to clusterize
:type hits: list of :class:`macsypy.model.ModelHit` objects
:param model: the model to consider
:type model: :class:`macsypy.model.Model` object
:param hit_weights: the hit weight to compute the score
:type hit_weights: :class:`macsypy.hit.HitWeight` object
:type rep_info: :class:`macsypy.Indexes.RepliconInfo` object
:return: the clusters
:rtype: list of :class:`macsypy.cluster.Cluster` objects.
"""
def do_clst(cluster_scaffold, model, hit_weights):
"""
:param cluster_scaffold:
:param model:
:param hit_weights:
:return:
"""
gene_types = {hit.gene_ref.name for hit in cluster_scaffold}
if len(gene_types) > 1:
if all([_hit.gene_ref.status == GeneStatus.NEUTRAL for _hit in cluster_scaffold]):
# we do not consider a group of neutral as a cluster
_log.debug(f"({', '.join([h.id for h in cluster_scaffold])}) "
f"is composed of only neutral. It's not a cluster.")
else:
cluster = Cluster(cluster_scaffold, model, hit_weights)
clusters.append(cluster)
else:
cluster = Cluster(cluster_scaffold, model, hit_weights)
if cluster.loner:
# it's a group of one loner add as cluster
# it will be squashed at next step (_get_true_loners )
clusters.append(cluster)
else:
_log.debug(f"({', '.join([h.id for h in cluster_scaffold])}) "
f"is composed of only type of gene {cluster_scaffold[0].gene_ref.name}. It's not a cluster.")
clusters = []
cluster_scaffold = []
# sort hits by increasing position and then descending score
hits.sort(key=lambda h: (h.position, - h.score))
# remove duplicates hits (several hits for the same sequence),
# keep the first one, this with the best score
# position == sequence rank in replicon
hits = [next(group) for pos, group in itertools.groupby(hits, lambda h: h.position)]
if hits:
hit = hits[0]
cluster_scaffold.append(hit)
previous_hit = cluster_scaffold[0]
for m_hit in hits[1:]:
if _colocates(previous_hit, m_hit, rep_info):
cluster_scaffold.append(m_hit)
else:
if len(cluster_scaffold) > 1:
# close the current scaffold if it contains at least 2 hits
do_clst(cluster_scaffold, model, hit_weights)
elif model.min_genes_required == 1:
# close the current scaffold if it contains 1 hit
# but it's allowed by the model
cluster = Cluster(cluster_scaffold, model, hit_weights)
clusters.append(cluster)
elif model.get_gene(cluster_scaffold[0].gene.name).loner:
# close the current scaffold it contains 1 hit but the model gene is tag as a loner
cluster = Cluster(cluster_scaffold, model, hit_weights)
clusters.append(cluster)
# the hit transformation in loner is performed at the end when circularity and merging is done
# open new scaffold
cluster_scaffold = [m_hit]
previous_hit = m_hit
# close the last current cluster
len_scaffold = len(cluster_scaffold)
if len_scaffold > 1:
do_clst(cluster_scaffold, model, hit_weights)
elif len_scaffold == 1:
# handle circularity
# if there are clusters
# may be the hit collocate with the first hit of the first cluster
if clusters and _colocates(cluster_scaffold[0], clusters[0].hits[0], rep_info):
new_cluster = Cluster(cluster_scaffold, model, hit_weights)
clusters[0].merge(new_cluster, before=True)
elif cluster_scaffold[0].gene_ref.loner:
# the hit does not collocate, but it's a loner
# handle clusters containing only one loner
new_cluster = Cluster(cluster_scaffold, model, hit_weights)
clusters.append(new_cluster)
elif model.min_genes_required == 1:
# the hit does not collocate but the model required only one gene
# handle clusters containing only one gene
new_cluster = Cluster(cluster_scaffold, model, hit_weights)
clusters.append(new_cluster)
# handle circularity
if len(clusters) > 1:
if _colocates(clusters[-1].hits[-1], clusters[0].hits[0], rep_info):
clusters[0].merge(clusters[-1], before=True)
clusters = clusters[:-1]
return clusters
def _get_true_loners(clusters):
"""
We call a True Loner a Cluster composed of one or several hit related to the same gene tagged as loner
(by opposition with hit representing a gene tagged loner but include in cluster with several other genes)
:param clusters: the clusters
:type clusters: list of :class:`macsypy.cluster.Cluster` objects.
:return: tuple of 2 elts
* dict containing true clusters {str func_name : :class:`macsypy.hit.Loner | :class:`macsypy.hit.LonerMultiSystem` object}
* list of :class:`macsypy.cluster.Cluster` objects
"""
def add_true_loner(clstr):
hits = clstr.hits
clstr_len = len(hits)
if clstr_len > 1:
_log.warning(f"Squash cluster of {clstr_len} {clstr[0].gene_ref.name} loners "
f"({hits[0].position} -> {hits[-1].position})")
func_name = clstr[0].gene_ref.alternate_of().name
if func_name in true_loners:
true_loners[func_name].extend(hits)
else:
true_loners[func_name] = hits
###################
# get True Loners #
###################
# true_loner is a hit which encode for a gene tagged as loner
# and which does NOT clusterize with some other hits of interest
true_clusters = []
true_loners = {}
if clusters:
model = clusters[0].model
hit_weights = clusters[0].hit_weights
for clstr in clusters:
if clstr.loner:
# it's a true Loner or a stretch of same loner
add_true_loner(clstr)
else:
# it's a cluster of 1 hit
# but it's NOT a loner
# min_genes_required == 1
true_clusters.append(clstr)
for func_name, loners in true_loners.items():
# transform ModelHit in Loner
true_loners[func_name] = []
for i, _ in enumerate(loners):
if loners[i].multi_system:
# the counterpart have been already computed during the MS hit instanciation
# instead of the Loner not multisystem it include the hits which clusterize
true_loners[func_name].append(LonerMultiSystem(loners[i]))
else:
counterpart = loners[:]
hit = counterpart.pop(i)
true_loners[func_name].append(Loner(hit, counterpart=counterpart))
# replace List of Loners/MultiSystem by the best hit
best_loner = get_best_hit_4_func(func_name, true_loners[func_name], key='score')
true_loners[func_name] = best_loner
true_loners = {func_name: Cluster([loner], model, hit_weights) for func_name, loner in true_loners.items()}
return true_loners, true_clusters
[docs]def build_clusters(hits, rep_info, model, hit_weights):
"""
From a list of filtered hits, and replicon information (topology, length),
build all lists of hits that satisfied the constraints:
* max_gene_inter_space
* loner
* multi_system
If Yes create a cluster
A cluster contains at least two hits separated by less or equal than max_gene_inter_space
Except for loner genes which are allowed to be alone in a cluster
:param hits: list of filtered hits
:type hits: list of :class:`macsypy.hit.ModelHit` objects
:param rep_info: the replicon to analyse
:type rep_info: :class:`macsypy.Indexes.RepliconInfo` object
:param model: the model to study
:type model: :class:`macsypy.model.Model` object
:return: list of regular clusters,
the special clusters (loners not in cluster and multi systems)
:rtype: tuple with 2 elements
* true_clusters which is list of :class:`Cluster` objects
* true_loners: a dict { str function: :class:macsypy.hit.Loner | :class:macsypy.hit.LonerMultiSystem object}
"""
if hits:
clusters = _clusterize(hits, model, hit_weights, rep_info)
# The hits in clusters are either ModelHit or MultiSystem
# (they are cast during model.filter(hits) method)
# the MultiSystem have no yet counterpart
# which will compute once System will be computed
# to take in account only hits in true system candidates
# whereas the counterpart for loner & LonerMultiSystems during get_true_loners
true_loners, true_clusters = _get_true_loners(clusters)
else: # there is not hits
true_clusters = []
true_loners = {}
return true_clusters, true_loners
[docs]class Cluster:
"""
Handle hits relative to a model which collocates
"""
_id = itertools.count(1)
[docs] def __init__(self, hits, model, hit_weights):
"""
:param hits: the hits constituting this cluster
:type hits: [ :class:`macsypy.hit.CoreHit` | :class:`macsypy.hit.ModelHit`, ... ]
:param model: the model associated to this cluster
:type model: :class:`macsypy.model.Model`
"""
self.hits = hits
self.model = model
self._check_replicon_consistency()
self._score = None
self._genes_roles = None
self._hit_weights = hit_weights
self.id = f"c{next(self._id)}"
def __len__(self):
return len(self.hits)
def __getitem__(self, item):
return self.hits[item]
@property
def hit_weights(self):
"""
:return: the different weight for the hits used to compute the score
:rtype: :class:`macsypy.hit.HitWeight`
"""
return self._hit_weights
@property
def loner(self):
"""
:return: True if this cluster is made of only some hits representing the same gene and this gene is tag as loner
False otherwise:
- contains several hits coding for different genes
- contains one hit but gene is not tag as loner (max_gene_required = 1)
"""
# need this method in build_cluster before to transform ModelHit in Loner
# so cannot rely on Loner type
# beware return True if several hits of same gene composed this cluster
return len({h.gene_ref.name for h in self.hits}) == 1 and self.hits[0].gene_ref.loner
@property
def multi_system(self):
"""
:return: True if this cluster is made of only one hit representing a multi_system gene
False otherwise:
- contains several hits
- contains one hit but gene is not tag as loner (max_gene_required = 1)
"""
# by default gene_ref.multi_system == gene_ref.alternate_of().multi_system
return len(self) == 1 and self.hits[0].gene_ref.multi_system
[docs] def _check_replicon_consistency(self):
"""
:raise: MacsypyError if all hits of a cluster are NOT related to the same replicon
"""
rep_name = self.hits[0].replicon_name
if not all([h.replicon_name == rep_name for h in self.hits]):
msg = "Cannot build a cluster from hits coming from different replicons"
_log.error(msg)
raise MacsypyError(msg)
[docs] def __contains__(self, v_hit):
"""
:param v_hit: The hit to test
:type v_hit: :class:`macsypy.hit.ModelHit` object
:return: True if the hit is in the cluster hits, False otherwise
"""
return v_hit in self.hits
@property
def functions(self):
"""
:return: The set of functions encoded by this cluster
*function* mean gene name or reference gene name for exchangeables genes
for instance
<model vers="2.0">
<gene a presence="mandatory"/>
<gene b presence="accessory"/>
<exchangeable>
<gene c />
</exchangeable>
<gene/>
</model>
the functions for a cluster corresponding to this model wil be {'a' , 'b'}
:rtype: frozenset
"""
if self._genes_roles is None:
self._genes_roles = frozenset({h.gene_ref.alternate_of().name for h in self.hits})
return self._genes_roles
[docs] def fulfilled_function(self, *genes):
"""
:param gene: The genes which must be tested.
:type genes: :class:`macsypy.gene.ModelGene` object or string representing the gene name
:return: the common functions between genes and this cluster.
:rtype: set of string
"""
# we do not filter out neutral from the model
genes_roles = self.functions
functions = set()
for gene in genes:
if isinstance(gene, macsypy.gene.ModelGene):
function = gene.name
else:
# gene is a string
function = gene
functions.add(function)
return genes_roles.intersection(functions)
[docs] def merge(self, cluster, before=False):
"""
merge the cluster in this one. (do it in place)
:param cluster:
:type cluster: :class:`macsypy.cluster.Cluster` object
:param bool before: If False the hits of the cluster will be add at the end of this one,
Otherwise the cluster hits will be inserted before the hits of this one.
:return: None
:raise MacsypyError: if the two clusters have not the same model
"""
if cluster.model != self.model:
raise MacsypyError("Try to merge Clusters from different model")
else:
if before:
self.hits = cluster.hits + self.hits
else:
self.hits.extend(cluster.hits)
@property
def replicon_name(self):
"""
:return: The name of the replicon where this cluster is located
:rtype: str
"""
return self.hits[0].replicon_name
@property
def score(self):
"""
:return: The score for this cluster
:rtype: float
"""
if self._score is not None:
return self._score
else:
seen_hits = {}
_log.debug("===================== compute score for cluster =====================")
for m_hit in self.hits:
_log.debug(f"-------------- test model hit {m_hit.gene.name} --------------")
# attribute a score for this hit
# according to status of the gene_ref in the model: mandatory/accessory
if m_hit.status == GeneStatus.MANDATORY:
hit_score = self._hit_weights.mandatory
elif m_hit.status == GeneStatus.ACCESSORY:
hit_score = self._hit_weights.accessory
elif m_hit.status == GeneStatus.NEUTRAL:
hit_score = self._hit_weights.neutral
else:
raise MacsypyError(f"a Cluster contains hit {m_hit.gene.name} {m_hit.position}"
f" which is neither mandatory nor accessory: {m_hit.status}")
_log.debug(f"{m_hit.id} is {m_hit.status} hit score = {hit_score}")
# weighted the hit score according to the hit match the gene or
# is an exchangeable
if m_hit.gene_ref.is_exchangeable:
hit_score *= self._hit_weights.exchangeable
_log.debug(f"{m_hit.id} is exchangeable hit score = {hit_score}")
else:
hit_score *= self._hit_weights.itself
if self.loner or self.multi_system:
hit_score *= self._hit_weights.out_of_cluster
_log.debug(f"{m_hit.id} is out of cluster (Loner) score = {hit_score}")
# funct is the name of the gene if it code for itself
# or the name of the reference gene if it's an exchangeable
funct = m_hit.gene_ref.alternate_of().name
if funct in seen_hits:
# count only one occurrence of each function per cluster
# the score use is the max of hit score for this function
if hit_score > seen_hits[funct]:
seen_hits[funct] = hit_score
_log.debug(f"{m_hit.id} code for {funct} update hit_score to {hit_score}")
else:
_log.debug(f"{m_hit.id} code for {funct} which is already take in count in cluster")
else:
_log.debug(f"{m_hit.id} {m_hit.gene_ref.name} is not already in cluster")
seen_hits[funct] = hit_score
hits_scores = seen_hits.values()
score = sum(hits_scores)
_log.debug(f"cluster score = sum({list(hits_scores)}) = {score}")
_log.debug("===============================================================")
self._score = score
return score
[docs] def __str__(self):
"""
:return: a string representation of this cluster
"""
rep = f"""Cluster:
- model = {self.model.name}
- replicon = {self.replicon_name}
- hits = {', '.join([f"({h.id}, {h.gene.name}, {h.position})" for h in self.hits])}"""
return rep
[docs] def replace(self, old, new):
"""
replace hit old in this cluster by new one. (work in place)
:param old: the hit to replace
:type old: :class:`macsypy.hit.ModelHit` object.
:param new: the new hit
:type new: :class:`macsypy.hit.ModelHit` object.
:return: None
"""
idx = self.hits.index(old)
self.hits[idx] = new