Commit e1db8f84 authored by Mikael Boden's avatar Mikael Boden


parent eda971ca
......@@ -165,9 +165,31 @@ class Sequence(object):
symbolCounts[symbol] = self.count(symbol)
return symbolCounts
def find(self, findme):
def getDegapped(self):
""" Create the sequence excluding gaps, and provide the corresponding indices for the gapped version, e.g.
>>> gappy = Sequence('AC--TA-GA', DNA_Alphabet, name = 'myseq', gappy = True)
>>> degapped, indices = gappy.getDegapped()
>>> print(degapped)
myseq: ACTAGA
>>> print(indices)
[0, 1, 4, 5, 7, 8]
idxs = []
newseq = []
for i in range(len(self.sequence)):
if not self.sequence[i] == '-':
return Sequence(newseq, self.alphabet,,, gappy = False), idxs
def find(self, findme, gappy = False):
    """ Find the position of the specified symbol or sub-sequence.
    :param findme: the symbol or sub-sequence to locate
    :param gappy: if True and this sequence is gappy, search the de-gapped
                  sequence and map the hit back to gapped coordinates
    :return: index of the first match (in gapped coordinates), or -1 if not found
    """
    if gappy == False or self.gappy == False:
        return ''.join(self.sequence).find(findme)
    else: # if the sequence is gappy AND the function is called with gappy = True THEN run the find on the de-gapped sequence
        degapped, idxs = self.getDegapped()
        idx = ''.join(degapped).find(findme)
        # map the de-gapped hit position back to an index in the gapped sequence
        return idxs[idx] if idx >= 0 else -1
Below are some useful methods for loading data from strings and files.
......@@ -590,6 +612,51 @@ class Alignment():
s.set(a, b, int(round(sab)))
return s
def outliers(self, cap = None):
    """ Score the extent to which each sequence in the alignment is an outlier.
    :param cap: the number of sequences that need to share the state of a position for it to be optimally aligned
    :return: a tuple of three lists, each with a score for each sequence, in order of the alignment;
    the first list contains an entropy-based score accumulated over the whole sequence;
    the second list has a gap-continuity score (the greatest entropy-based score collated for a single, continuous gap, most probably a "deletion");
    the third list has a character-continuity score (the greatest entropy-based score collated for a single, continuous character string, most probably an "insertion");
    for all three scores, higher means outlier, zero means it is optimally aligned
    """
    nseqs = len(self.seqs)
    if not cap:
        cap = nseqs
    gapmat = numpy.zeros((nseqs, self.alignlen))   # 1 where a sequence has a gap at a column
    ngaps = numpy.zeros((self.alignlen))           # number of gaps per column
    entscore = [0 for _ in range(nseqs)] # cumulative entropy based score
    gapscore = [0 for _ in range(nseqs)] # highest gap score
    insscore = [0 for _ in range(nseqs)] # highest insert score
    for c in range(self.alignlen):
        for r in range(nseqs):
            gapmat[r, c] = 1 if self.seqs[r][c] == '-' else 0
            ngaps[c] += gapmat[r, c]
    for r in range(nseqs):
        curgap = 0 # current gap score (cumulative from previous non-gap position)
        curchr = 0 # current insertion score (cumulative from previous gap position)
        in_gap = False
        for c in range(self.alignlen):
            # number of sequences that agree with this sequence's state (gap or character) at column c
            agree_cnt = ngaps[c] if gapmat[r, c] == 1 else (nseqs - ngaps[c])
            # entropy-like penalty; zero once agreement reaches the cap (small epsilon avoids log(0))
            logent = math.log(math.log(agree_cnt, nseqs) + 0.000001) if agree_cnt < cap else 0.0
            if gapmat[r, c] == 1:
                if not in_gap: # first gap position in a run: restart the gap accumulator
                    curgap = 0
                curgap -= logent
                if curgap > gapscore[r]:
                    gapscore[r] = curgap
            else: # gapmat[r, c] == 0, i.e. character
                if in_gap: # first character in a string: restart the insertion accumulator
                    curchr = 0
                curchr -= logent
                if curchr > insscore[r]:
                    insscore[r] = curchr
            entscore[r] -= logent
            in_gap = gapmat[r, c] == 1
    return entscore, gapscore, insscore
def calcDistances(self, measure, a=1.0):
""" Calculate the evolutionary distance between all pairs of sequences
in this alignment, using the given measure. Measure can be one of
......@@ -1118,12 +1185,11 @@ class Regexp(object):
def search(self, sequence):
""" Find matches to the motif in the specified sequence. Returns a list
of triples, of the form (position, matched string, score). Note that
the score is always 1.0 because a consensus sequence either matches
the score is always 1.0 because a regexp either matches
or doesn't. """
if not type(sequence) is Sequence:
sequence = Sequence(sequence)
sequenceString = sequence[:]
results = []
for match in self.regex.finditer(sequenceString):
results.append((match.start(),, 1.0))
......@@ -1335,5 +1401,13 @@ def runBLAST(sequence, program='blastp', database='uniprotkb', exp='1e-1'):
return ids
if __name__ == '__main__':
    # Demo: read a gappy alignment, score outliers and exercise the de-gapping utilities.
    # NOTE(review): hard-coded local path — will only run on the author's machine.
    aln = readClustalFile('/Users/mikael/simhome/ASR/gappy.aln', Protein_Alphabet)
    x, g, i = aln.outliers()
    for s in range(len(aln)):
        print(aln[s].name, x[s], g[s], i[s])
        ngs, idxs = aln[s].getDegapped()
        print('\t', ngs, idxs)
        idx = aln[s].find('FFVK', gappy = True)
        if idx >= 0:
            print('\t', aln[s].sequence[idx:])
    print(('Read', len(aln), 'sequences'))
......@@ -67,7 +67,7 @@ def search(query, dbName='uniprot', format='list', limit=100, columns=""):
return data
except urllib.error.HTTPError as ex:
raise RuntimeError(
raise RuntimeError("utf-8"))
elif dbName.startswith('refseq'):
dbs = dbName.split(":")
if len(dbs) > 1:
......@@ -358,12 +358,12 @@ class EBI(object):
databaseData = ''
for db in databaseList:
databaseData += '&database=' + db
encodedParams = urllib.parse.urlencode(params)
encodedParams += databaseData
encodedParams = urllib.parse.urlencode(params).encode('utf-8')
encodedParams += databaseData.encode('utf-8')
encodedParams = urllib.parse.urlencode(params)
encodedParams = urllib.parse.urlencode(params).encode('utf-8')
self.jobId = urllib.request.urlopen(url, encodedParams).read()
self.jobId = urllib.request.urlopen(url, encodedParams).read().decode('utf-8')
return self.jobId
......@@ -373,20 +373,20 @@ class EBI(object):
if jobId is None:
jobId = self.jobId
url = self.__ebiServiceUrl__ + self.service + '/status/%s' % jobId
status = urllib.request.urlopen(url).read()
status = urllib.request.urlopen(url).read().decode('utf-8')
return status
def resultTypes(self):
    """ Get the available result types. Will only work on a finished job. """
    url = self.__ebiServiceUrl__ + self.service + '/resulttypes/%s' % self.jobId
    # decode to str so callers can parse the response as text
    resultTypes = urllib.request.urlopen(url).read().decode('utf-8')
    return resultTypes
def result(self, resultType):
    """ Get the result of the given job of the specified type.
    :param resultType: one of the types reported by resultTypes()
    :raises RuntimeError: if the service reports an error result
    """
    url = self.__ebiServiceUrl__ + self.service + '/result/%s/%s' % (self.jobId, resultType)
    result = urllib.request.urlopen(url).read().decode('utf-8')
    if resultType == 'error':
        raise RuntimeError('An error occurred: %s' % result)
    return result  # NOTE(review): final return reconstructed — original tail was truncated in the diff
......@@ -422,3 +422,108 @@ class EBI(object):
return results[0]
return results
def getUniProtDict(ids, cols="", db='uniprot', identities=None):
:param ids: The list of UniProt IDs
:param cols: The list of UniProt database columns
:param db: The database to search - uniprot or uniref
:param identity: The identity to search uniref with
:return: A dictionary mapping each UniProt ID to another dictionary where the keys are database columns and the
values are the information stored within those columns
Get a list of UniProt IDs and a list of UniProt columns you're interested in.
Full list of UniProt column names -
uniprot_names = ['Q9LIR4', 'Q1JUQ1', 'P05791', 'P0ADF6']
cols = ["lineage(SUPERKINGDOM)", "genes", "lineage(KINGDOM)"]
up_dict = getUniProtDict(uniprot_names, cols)
for record in up_dict:
print (record, up_dict[record].get("lineage(SUPERKINGDOM)"))
for record in up_dict:
print (record, up_dict[record].get("genes"))
If a record doesn't have an entry in UniProt for that column it'll just return None
print (up_dict['Q1JUQ1'])
print (up_dict['Q1JUQ1']['lineage(KINGDOM)'])
up_dict = getUniProtDict(["Q9LIR4", "P99999"], cols=["members"], db="uniref", identities = 1.0)
You can either pass a list of identities for each UniProt identifier (in which case the list of identities must be
the same size as the list of identifiers. Or you can just pass a single identity to search Uniref at.
# Format the lists of IDs and columns correctly
cols = ",".join(cols)
if db == 'uniprot':
updated_ids = ' or '.join(ids)
elif db == 'uniref':
# If we just got a single value for an identity, create a list the same size as the queries with just this value
if type(identities) != list:
identities = [identities] * len(ids)
elif len(identities) != len(ids):
raise RuntimeError('Either supply a single identity threshold or supply one for each identifier in the list')
# Check that the identity thresholds are valid values
for x in identities:
if x not in [1.0, 0.9, 0.5]:
raise RuntimeError("UniRef threshold values must be either 1.0, 0.9, or 0.5. Supplied value was - " + str(x))
# Add the query syntax around the identifiers
updated_ids = ""
for query, identity in zip(ids, identities):
updated_ids += "(member:" + query + "+AND+identity:" + str(identity) + ")+OR+"
updated_ids = updated_ids[0:-4]
raise RuntimeError('Database should be either uniprot or uniref')
url = '' + db + '/'
params = {
'format': 'tab',
'query': updated_ids,
'columns': "id," + cols
data = urllib.parse.urlencode(params).encode('utf-8')
request = urllib.request.Request(url, data)
opener = urllib.request.build_opener()
response =
page ='utf-8')
up_dict = {}
# For each record we retrieve, split the line by tabs and build up the UniProt dict
for line in page.split("\n")[1:]:
if line:
splitlines= line.split("\t")
id_dict = {}
pos = 1
for col in cols.split(","):
id_dict[col] = None if splitlines[pos] == "" else splitlines[pos]
pos +=1
up_dict[splitlines[0]] = id_dict
return up_dict
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment