Commit 6b05a37e authored by Mikael Boden's avatar Mikael Boden

updated_webservice.py_with_new_protocols_for_accessing_uniprot_and_GO

parent eff5f374
......@@ -7,7 +7,7 @@ matplotlib_colours={'aliceblue':'#F0F8FF','aqua':'#00FFFF','aquamarine':'#7FFFD4
twenty_colours ={'dodgerblue':'#1E90FF', 'orangered':'#FF4500', 'greenyellow':'#ADFF2F', 'orchid':'#DA70D6'}
symbols = ["*", "^", "!", "#", "~", "+", ":", "<", ">", "@", "%", "=", "-"]
# Pool of marker symbols that can be assigned to annotations: punctuation
# characters first, then single lowercase letters, then doubled lowercase
# letters. Every entry is a separate string literal followed by a comma, so
# that adjacent literals are never silently concatenated by Python.
symbols = [
    "*", "^", "!", "#", "~", "+", ":", "<", ">", "@", "%", "=", "-",
    "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m",
    "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z",
    "aa", "bb", "cc", "dd", "ee", "ff", "gg", "hh", "ii", "jj", "kk", "ll", "mm",
    "nn", "oo", "pp", "qq", "rr", "ss", "tt", "uu", "vv", "ww", "xx", "yy", "zz",
]
class NexusAnnotations():
"""
......@@ -59,6 +59,10 @@ class NexusAnnotations():
if annot not in self.annotation_symbols:
symbol = self.generate_annotation_symbol(annot)
self.add_annotation_symbol(annot, symbol)
else:
symbol = self.generate_annotation_symbol(annot)
self.add_annotation_symbol(annot, symbol)
# Add in a symbol to represent this annotation if one doesn't already exist
......@@ -113,6 +117,8 @@ class NexusAnnotations():
symbol = random.choice(self.symbol_list)
if symbol not in self.annotation_symbols.values():
return symbol
else:
return symbol
i+=1
def add_annotation_symbol(self, symbol, annotation):
......@@ -127,13 +133,16 @@ class NexusAnnotations():
:return: A unique colour
"""
i = 0
while i < len(self.colour_dict.values()):
colour = random.choice(list(self.colour_dict.values()))
if colour not in self.used_colours:
return colour
i+=1
# i = 0
# while i < len(self.colour_dict.values()):
# colour = random.choice(list(self.colour_dict.values()))
# if colour not in self.used_colours:
# return colour
# else:
# return colour
# i+=1
colour = random.choice(list(self.colour_dict.values()))
return colour
def generate_colour_list(self, num):
return num
......@@ -4,6 +4,8 @@ from time import sleep
import stats
from io import StringIO
import gzip
import ssl
import json
""" This module is a collection of functions for accessing the EBI REST web services,
including sequence retrieval, searching, gene ontology, BLAST and ClustalW.
......@@ -11,14 +13,13 @@ import gzip
performing BLAST and ClustalW queries.
See
http://www.ebi.ac.uk/Tools/webservices/tutorials/01_intro and
http://www.ebi.ac.uk/Tools/webservices/tutorials/02_rest
http://www.ebi.ac.uk/Tools/webservices/tutorials/06_programming/python/rest/urllib
http://www.ebi.ac.uk/Tools/webservices/tutorials
"""
# Base URLs of the remote services; the functions in this module append
# service-specific paths and query strings to them.
# The first three assignments below are superseded by the re-assignments of
# the same names that follow them, so the later values are the ones in effect.
__ebiUrl__ = 'http://www.ebi.ac.uk/Tools/' # Use UQ mirror when available
__ebiGOUrl__ = 'http://www.ebi.ac.uk/QuickGO-Old/' # Use UQ mirror when available
__uniprotUrl__ = 'http://www.uniprot.org/' #
__ebiUrl__ = 'http://www.ebi.ac.uk/Tools/'
__ebiGOUrl__ = 'https://www.ebi.ac.uk/QuickGO/services/'
__uniprotUrl__ = 'http://www.uniprot.org/'
__ebiSearchUrl__ = 'http://www.ebi.ac.uk/ebisearch/'
def fetch(entryId, dbName='uniprotkb', format='fasta'):
"""
......@@ -27,16 +28,17 @@ def fetch(entryId, dbName='uniprotkb', format='fasta'):
dbName: name of database e.g. 'uniprotkb' or 'pdb' or 'refseqn'; see http://www.ebi.ac.uk/Tools/dbfetch/dbfetch/dbfetch.databases for available databases
format: file format specific to database e.g. 'fasta' or 'uniprot' for uniprotkb (see http://www.ebi.ac.uk/Tools/dbfetch/dbfetch/dbfetch.databases)
See http://www.ebi.ac.uk/Tools/dbfetch/syntax.jsp for more info re URL syntax
http://www.ebi.ac.uk/Tools/dbfetch/dbfetch?db=uniprotkb&id=P63166&format=fasta&style=raw&Retrieve=Retrieve
"""
# Construct URL
url = __ebiUrl__ + 'dbfetch/dbfetch?style=raw&db=' + dbName + '&format=' + format + '&id=' + entryId
url = __ebiUrl__ + 'dbfetch/dbfetch?style=raw&Retrieve=Retrieve&db=' + dbName + '&format=' + format + '&id=' + entryId
# Get the entry
try:
data = urllib.request.urlopen(url).read().decode("utf-8")
if data.startswith("ERROR"):
raise RuntimeError(data)
return data
except urllib.error.HTTPError as ex:
raise RuntimeError(ex.read())
......@@ -95,53 +97,13 @@ authorised_database_tag = {9606: ['Homo sapiens', 'ACC', 'ID'],
4932: ['Saccharomyces cerevisiae', 'SGD_ID', 'CYGD_ID'],
10090: ['Mus musculus', 'MGI_ID']}
def idmap(identifiers, frm='ACC', to='P_REFSEQ_AC', format='tab', reverse=False):
    """
    Map identifiers between databases (based on UniProtKB; see http://www.uniprot.org/faq/28)
    identifiers: a single identifier (string) or a collection of identifiers (strings)
    frm: the tag/abbreviation for the identifier FROM which to idmap
    to: the tag/abbreviation for the identifier TO which to idmap
    format: the results format to use
    reverse: reverse the returned mapping key (to) -> value (from)
    Returns a dictionary with key (from) -> value (to)
    Set reverse to True if dictionary should contain the reverse mapping, useful if the mapping is non-unique
    An empty dictionary is returned, without contacting the service, when no
    (non-blank) identifier is given.
    """
    # construct query by joining the identifiers with single spaces
    if isinstance(identifiers, str):
        query = identifiers.strip()
    else:  # assume it is a collection of strings
        query = ' '.join(ident.strip() for ident in identifiers).strip()
    if not query:
        return dict()
    url = __uniprotUrl__ + 'mapping/'
    params = {
        'from': frm,
        'to': to,
        'format': format,
        'query': query
    }
    # The request body must be bytes in Python 3, so encode the form data
    payload = urllib.parse.urlencode(params).encode('utf-8')
    request = urllib.request.Request(url, payload)
    # Decode the response so that rows can be split on a str separator;
    # the with-statement makes sure the connection is closed
    with urllib.request.urlopen(request) as response:
        text = response.read().decode('utf-8')
    mapping = dict()
    for row in text.splitlines()[1:]:  # we ignore first (header) row
        pair = row.split('\t')
        if len(pair) < 2:  # skip blank or malformed rows
            continue
        if not reverse:
            mapping[pair[0]] = pair[1]
        else:
            mapping[pair[1]] = pair[0]
    return mapping
"""
Gene Ontology service (QuickGO)
http://www.ebi.ac.uk/QuickGO/WebServices.html
Note that this service can be slow for queries involving a large number of entries.
"""
def getGOReport(positives, background = None, database = 'UniProtKB'):
def getGOReport(positives, background = None):
""" Generate a complete GO term report for a set of genes (positives).
Each GO term is also assigned an enrichment p-value (on basis of background, if provided).
Returns a list of tuples (GO_Term_ID[str], Foreground_no[int], Term_description[str]) with no background, OR
......@@ -149,7 +111,7 @@ def getGOReport(positives, background = None, database = 'UniProtKB'):
E-value is a Bonferroni-corrected p-value.
"""
pos = set(positives)
fg_map = getGOTerms(pos, database)
fg_map = getGOTerms(pos)
fg_list = []
for id in fg_map:
for t in fg_map[id]:
......@@ -159,7 +121,7 @@ def getGOReport(positives, background = None, database = 'UniProtKB'):
neg = set()
if background != None:
neg = set(background).difference(pos)
bg_map = getGOTerms(neg, database)
bg_map = getGOTerms(neg)
for id in bg_map:
for t in bg_map[id]:
bg_list.append(t)
......@@ -195,129 +157,108 @@ def getGODef(goterm):
Retrieve information about a GO term
goterm: the identifier, e.g. 'GO:0002080'
"""
# Construct URL
url = __ebiGOUrl__ + 'GTerm?format=obo&id=' + goterm
# first turn off server certificate verification
if (not os.environ.get('PYTHONHTTPSVERIFY', '') and getattr(ssl, '_create_unverified_context', None)):
ssl._create_default_https_context = ssl._create_unverified_context
# Construct URL with query term
url = __ebiGOUrl__ + 'ontology/go/search?query=' + goterm
# Get the entry: fill in the fields specified below
try:
entry={'id': None, 'name': None, 'def': None}
entry={'id': None, 'name': None, 'aspect': None}
data = urllib.request.urlopen(url).read().decode("utf-8")
for row in data.splitlines():
index = row.find(':')
if index > 0 and len(row[index:]) > 1:
field = row[0:index].strip()
value = row[index+1:].strip(' "') # remove spaces and quotation marks
if field in list(entry.keys()): # check if we need this field
if entry[field] == None: # check if not yet assigned
entry[field] = value
ret = json.loads(data)
for row in ret['results']:
for key in entry:
try:
entry[key] = row[key]
except:
entry[key] = None
entry['def'] = row['definition']['text']
return entry
except urllib.error.HTTPError as ex:
raise RuntimeError(ex.read())
def getGOTerms(genes):
    """
    Retrieve all GO terms for a given set of genes (or single gene).
    genes: a single gene product identifier (string), or a list, set or tuple of them
    The result is given as a map (key=gene name as reported by the service,
    e.g. "UniProtKB:A0A140VJQ9", value=set of unique GO term identifiers,
    e.g. "GO:0002080").
    Genes for which the service returns no annotation have no entry in the map;
    an empty map is returned, without contacting the service, for empty input.
    Raises RuntimeError if the service answers with an HTTP error.
    """
    if not isinstance(genes, (list, set, tuple)):
        genes = [genes]
    genes = list(genes)  # sets cannot be indexed/sliced, so work on a list
    termsmap = dict()
    batchsize = 100  # size of query batch
    for start in range(0, len(genes), batchsize):
        genebatch = genes[start:start + batchsize]
        # Construct URL afresh for every batch (comma-separated identifiers)
        url = __ebiGOUrl__ + 'annotation/search?geneProductId=' + ','.join(genebatch)
        # Get the entry: fill in the fields specified below
        try:
            # first turn off server certificate verification
            # NOTE(review): this replaces the process-wide default HTTPS context,
            # i.e. it disables certificate checks for all later urllib requests too
            if (not os.environ.get('PYTHONHTTPSVERIFY', '') and getattr(ssl, '_create_unverified_context', None)):
                ssl._create_default_https_context = ssl._create_unverified_context
            urlreq = urllib.request.Request(url)
            urlreq.add_header('Accept-encoding', 'gzip')
            response = urllib.request.urlopen(urlreq)
            raw = response.read()
            if response.info().get('Content-Encoding') == 'gzip':
                raw = gzip.decompress(raw)  # payload is bytes, not text
            ret = json.loads(raw.decode("utf-8"))
            # NOTE(review): only the results contained in this single response are
            # read; confirm whether the service pages its results
            for row in ret['results']:
                genename = row['geneProductId']  # would look like "UniProtKB:A0A140VJQ9"
                gotermid = row['goId']  # would look like "GO:0002080"
                termsmap.setdefault(genename, set()).add(gotermid)
        except urllib.error.HTTPError as ex:
            raise RuntimeError(ex.read()) from ex
    return termsmap
def getGenes(goterms, taxo=None):
    """
    Retrieve all genes/proteins for a given set of GO terms (or single GO term).
    goterms: a single GO term identifier (string), or a list, set or tuple of them
    taxo: use specific taxonomic identifier, e.g. 9606 (human); default is all
    The result is given as a map (key=GO term identifier as reported by the
    service, e.g. "GO:0002080", value=set of unique gene names,
    e.g. "UniProtKB:A0A140VJQ9").
    An empty map is returned, without contacting the service, for empty input.
    Raises RuntimeError if the service answers with an HTTP error.
    NOTE(review): earlier documentation states that genes annotated with a more
    specific GO term than those given are included; that depends on the service
    and cannot be confirmed from this code.
    """
    if not isinstance(goterms, (list, set, tuple)):
        goterms = [goterms]
    termlist = [goterm.strip() for goterm in goterms]  # also makes sets indexable/joinable
    genesmap = dict()
    if not termlist:
        return genesmap
    uri_string = 'annotation/search?'
    if taxo:
        # taxo may be given as an int (e.g. 9606), so convert before concatenating
        uri_string += 'taxonId=' + str(taxo) + '&'
    uri_string += 'goId=' + ','.join(termlist)
    # Get the entry: fill in the fields specified below
    try:
        # first turn off server certificate verification
        # NOTE(review): this replaces the process-wide default HTTPS context,
        # i.e. it disables certificate checks for all later urllib requests too
        if (not os.environ.get('PYTHONHTTPSVERIFY', '') and getattr(ssl, '_create_unverified_context', None)):
            ssl._create_default_https_context = ssl._create_unverified_context
        data = urllib.request.urlopen(__ebiGOUrl__ + uri_string).read().decode("utf-8")
        ret = json.loads(data)
        # NOTE(review): only the results contained in this single response are
        # read; confirm whether the service pages its results
        for row in ret['results']:
            genename = row['geneProductId']  # would look like "UniProtKB:A0A140VJQ9"
            gotermid = row['goId']  # would look like "GO:0002080"
            genesmap.setdefault(gotermid, set()).add(genename)
    except urllib.error.HTTPError as ex:
        raise RuntimeError(ex.read()) from ex
    return genesmap
class EBI(object):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment