"""
Created on 18 Oct 2018
@author: jmht
"""
from collections import OrderedDict
import logging
import os
from mrparse.mr_util import run_cmd, EXE_EXT
from mrbump.seq_align.simpleSeqID import simpleSeqID
from mrbump.tools import makeSeqDB
from Bio import SearchIO
PHMMER = 'phmmer'
HHSEARCH = 'hhsearch'
logger = logging.getLogger(__name__)
[docs]class SequenceHit:
def __init__(self):
self.name = None
self.pdb_id = None
self.chain_id = None
self.rank = None
self.prob = 0.0
self.evalue = 0.0
self.pvalue = 0.0
self.score = 0.0
self.seq_ali = None
self.search_engine = None
self.alignment = ""
self.query_start = None
self.query_stop = None
self.hit_start = None
self.hit_stop = None
self.overall_sequence_identity = 0.0
self.target_alignment = None
self.alignments = dict([])
# pointers to objects
self.region = None
self.homolog = None
@property
def length(self):
return self.query_extent
@property
def hit_extent(self):
"""python list indexing so need to subtract 1"""
return (self.hit_stop - self.hit_start) - 1
@property
def hit_range(self):
return (self.hit_start, self.hit_stop)
@property
def query_extent(self):
"""python list indexing so need to add 1"""
return (self.query_stop - self.query_start) - 1
@property
def query_midpoint(self):
return ((float(self.query_stop) - float(self.query_start + 1)) / 2.0) + float(self.query_start + 1)
@property
def query_range(self):
return (self.query_start, self.query_stop)
@property
def region_id(self):
return self._get_child_attr('region', 'id')
@property
def region_index(self):
return self._get_child_attr('region', 'index')
# Non-property methods
def _get_child_attr(self, child, attr):
if hasattr(self, child):
child = getattr(self, child)
if hasattr(child, attr):
return getattr(child, attr)
return None
def __str__(self):
attrs = [k for k in self.__dict__.keys() if not k.startswith('_')]
INDENT = " "
out_str = "Class: {}\nData:\n".format(self.__class__)
for a in sorted(attrs):
out_str += INDENT + "{} : {}\n".format(a, self.__dict__[a])
return out_str
[docs]def find_hits(seq_info, search_engine=PHMMER, hhsearch_exe=None, hhsearch_db=None, phmmer_dblvl=95):
if search_engine == PHMMER:
logfile = run_phmmer(seq_info, dblvl=phmmer_dblvl)
searchio_type = 'hmmer3-text'
elif search_engine == HHSEARCH:
searchio_type = 'hhsuite2-text'
logfile = run_hhsearch(seq_info, hhsearch_exe, hhsearch_db)
else:
raise RuntimeError("Unrecognised search_engine: {}".format(search_engine))
target_sequence = seq_info.sequence
return _find_hits(logfile=logfile, searchio_type=searchio_type, target_sequence=target_sequence)
def _find_hits(logfile=None, searchio_type=None, target_sequence=None):
assert logfile and searchio_type and target_sequence
try:
io = SearchIO.read(logfile, searchio_type)
except ValueError:
# If the error is: "'NoneType' object has no attribute 'group'" then this is an error in Biopython that
# has been fixed in later versions
logger.exception("ValueError while running Biopython")
raise RuntimeError('Problem running Biopython SearchIO - you may need to update your version of Biopython.')
except:
logger.exception("Unexpected error while running Biopython")
raise
included = io.hit_filter(lambda x: x.is_included)
hitDict = OrderedDict()
rank = 0
for i, hit in enumerate(included):
rank = i + 1
for hsp in hit.hsps:
# sequence_identity = float(sum([a==b for a, b in zip(*[str(s.upper().seq) for s in hsp.aln.get_all_seqs()])])) / float(hsp.aln_span)
sh = SequenceHit()
sh.rank = rank
sh.pdb_id, sh.chain_id = hsp.hit_id.split('_')
sh.evalue = hsp.evalue # is i-Evalue - possibly evalue_cond in later BioPython
hstart = hsp.hit_start
hstop = hsp.hit_end
qstart, qstop = hsp.query_range
seq_ali = zip(range(qstart, qstop), hsp.hit.seq)
sh.seq_ali = [x[0] for x in seq_ali if x[1] != '-']
sh.query_start = qstart
sh.query_stop = qstop
sh.hit_start = hstart
sh.hit_stop = hstop
target_alignment = "".join(hsp.aln[0].upper()) # assume the first Sequence is always the target
sh.target_alignment = target_alignment
alignment = "".join(hsp.aln[1].upper()) # assume the first Sequence is always the target
sh.alignment = alignment
local, overall = simpleSeqID().getPercent(alignment, target_alignment, target_sequence)
sh.local_sequence_identity = local
sh.overall_sequence_identity = overall
if searchio_type == "hmmer3-text":
sh.score = hit.bitscore
hit_name = hit.id + "_" + str(hsp.domain_index)
sh.search_engine = "phmmer"
else:
sh.score = hit.score
hit_name = hit.id + "_" + str(hsp.output_index)
sh.search_engine = "hhsearch"
sh.name = hit_name
hitDict[hit_name] = sh
return hitDict
[docs]def sort_hits_by_size(hits, ascending=False):
reverse = not(ascending)
return OrderedDict(sorted(hits.items(), key=lambda x: x[1].length, reverse=reverse))
[docs]def run_phmmer(seq_info, dblvl=95):
logfile = "phmmer_{}.log".format(dblvl)
alnfile = "phmmerAlignment_{}.log".format(dblvl)
phmmerTblout = "phmmerTblout_{}.log".format(dblvl)
phmmerDomTblout = "phmmerDomTblout_{}.log".format(dblvl)
phmmerEXE = os.path.join(os.environ["CCP4"], "libexec", "phmmer")
delete_db = False
if dblvl == "af2":
seqdb = os.path.join(os.environ["CCP4"], "share", "mrbump", "data", "afdb.fasta")
else:
sb = makeSeqDB.sequenceDatabase()
seqdb = sb.makePhmmerFasta(RLEVEL=dblvl)
delete_db = True
cmd = [phmmerEXE + EXE_EXT,
'--notextw',
'--tblout', phmmerTblout,
'--domtblout', phmmerDomTblout,
'-A', alnfile,
seq_info.sequence_file, seqdb]
stdout = run_cmd(cmd)
if os.name == 'nt':
lines = stdout.split('\r\n')
lines[0] = "# phmmer :: search a protein sequence against a protein database"
stdout = "\n".join(lines)
with open(logfile, 'w') as f_out:
f_out.write(stdout)
if delete_db:
os.unlink(seqdb)
return logfile
[docs]def run_hhsearch(seq_info, hhsearch_exe, hhsearch_db):
logfile = "hhsearch.log"
cmd = [hhsearch_exe,
'-i', seq_info.sequence_file,
'-d', os.path.join(hhsearch_db, os.path.basename(hhsearch_db)),
'-o', logfile]
run_cmd(cmd)
return logfile