lists gene names to include in new gaf-file-dest")
sys.exit(1)
if sys.argv[1].upper() == 'INCLUDE' and len(sys.argv) > 4:
f = open(sys.argv[4])
include_genes = set()
for line in f:
include_genes.add(line.strip())
reduceAnnotFile(sys.argv[2], sys.argv[3], include_genes)
else:
print('Unknown command \"' + sys.argv[1] + '\" or arguments:'),
for arg in sys.argv[2:]:
print(arg),
# ===== gtf.py =====
import shlex
import ival
class GtfEntry():
'''
GFF fields:
seqname - The name of the sequence. Must be a chromosome or scaffold.
source - The program that generated this feature.
feature - The name of this type of feature. Some examples of standard feature types are "CDS", "start_codon", "stop_codon" and "exon".
start - The starting position of the feature in the sequence. The first base is numbered 1.
end - The ending position of the feature (inclusive).
score - A score between 0 and 1000. If the track line useScore attribute is set to 1 for this annotation data set, the score value will determine the level of gray in which this feature is displayed (higher numbers = darker gray). If there is no score value, enter ".".
strand - Valid entries include "+", "-", or "." (for don't know/don't care).
frame - If the feature is a coding exon, frame should be a number between 0-2 that represents the reading frame of the first base. If the feature is not a coding exon, the value should be ".".
group - All lines with the same group are linked together into a single item.
'''
def __init__(self, chrom, start, end, feature, score = ".", source = "unknown", strand = ".", frame = ".", group = None):
self.seqname = chrom
self.start = start
self.end = end
self.feature = feature
self.score = score
self.strand = strand
self.source = source
self.frame = frame
self.group = group
self.attr = {}
if self.group:
fields = self.group.split(';')
for f in fields:
pair = shlex.split(f.strip())
if len(pair) == 2:
self.attr[pair[0]] = pair[1]
def __getitem__(self, item):
return self.attr[item]
def __contains__(self, item):
return item in self.attr
def __str__(self):
return str((self.seqname, self.start, self.end))
def __len__(self):
return self.end - self.start
def getInterval(self):
return ival.Interval(self.start, self.end)
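# A minimal usage sketch (not part of the original module; values invented):
# constructing a GtfEntry by hand and querying the attributes parsed from its
# group field by shlex.
#
#   entry = GtfEntry('chr1', 1000, 2000, 'exon', group='gene_id "ABC1"; transcript_id "ABC1.1"')
#   entry['gene_id']            # 'ABC1' (via __getitem__)
#   'transcript_id' in entry    # True (via __contains__)
#   len(entry)                  # 1000 (end - start)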
def dist(entry1, entry2, signed = False, centre2centre = False):
""" Calculate and return the BedEntry with the closest distance (from one end of the interval of this to the end of the interval of that).
If centre2centre is True, use the centre-to-centre distance instead.
If signed is True, the distance is negative if this interval is after the that.
"""
if isinstance(entry1, GtfEntry) and isinstance(entry2, GtfEntry):
if (entry1.seqname == entry2.seqname):
return ival.dist(entry1.getInterval(), entry2.getInterval(), signed, centre2centre)
return None
class GtfFile:
""" Read GTF/GFF file.
See http://genome.ucsc.edu/FAQ/FAQformat#format1
"""
def __init__(self, entries):
"""
Create a GtfFile instance.
:param entries: an iterable of entries or a filename
"""
if isinstance(entries, str): # filename
self.chroms = readGtfFile(entries)
else:
self.chroms = dict()
for entry in entries:
# check if the chromosome has been seen before
tree = self.chroms.get(entry.seqname)
if not tree:
tree = ival.IntervalTree()
self.chroms[entry.seqname] = tree
# put the entry in the interval tree for the appropriate chromosome
iv = ival.Interval(entry.start, entry.end)
tree.put(iv, entry)
def __len__(self):
n = 0
for c in self.chroms:
n += len(self.chroms[c])
return n
def generate(self, chrom):
mytree = self.chroms.get(chrom)
if mytree != None:
for e in mytree:
for entry in e.values:
yield entry
def __iter__(self):
self.chromqueue = ival.Stack()
for c in sorted(self.chroms.keys())[::-1]:
self.chromqueue.push(self.generate(c))
self.current = self.chromqueue.pop()
return self
def __next__(self):
try:
ret = next(self.current)
except StopIteration:
if not self.chromqueue.isEmpty():
self.current = self.chromqueue.pop()
ret = next(self.current)
else:
raise StopIteration
return ret
def __contains__(self, item):
if isinstance(item, GtfEntry):
tree = self.chroms.get(item.seqname)
if tree == None: return False
else: return ival.Interval(item.start, item.end) in tree
else:
return False
def getOverlap(self, item):
if isinstance(item, GtfEntry):
tree = self.chroms.get(item.seqname)
if tree == None: return None
else:
iv = ival.Interval(item.start, item.end)
res = tree.isectall(iv)
ret = []
for r in res:
ret.extend(r.values)
return ret
else: return None
def getClosest(self, item):
if isinstance(item, GtfEntry):
tree = self.chroms.get(item.seqname)
if tree == None: return None
else:
iv = ival.Interval(item.start, item.end)
node = tree.closest(iv)
if node != None: return node.values
else: return None
else: return None
def getOneOfClosest(self, item):
all = self.getClosest(item)
if all == None: return None
else: return next(iter(all))
def getOneOfOverlap(self, item):
all = self.getOverlap(item)
if all == None: return None
elif len(all) == 0: return None
else: return next(iter(all))
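# Usage sketch (hypothetical file name): load a GTF file and probe it with an
# ad hoc entry; getOverlap collects every stored entry whose interval
# intersects the query on the same sequence.
#
#   gtf = GtfFile('annotations.gtf')
#   query = GtfEntry('chr1', 11873, 12227, 'exon')
#   for hit in gtf.getOverlap(query) or []:
#       print(hit)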
def readGtfFile(filename):
""" Read a GTF/GFF file.
"""
f = open(filename)
row = 0
acceptHeaderRows = 1
headerRow = None
chroms = dict()
for line in f:
row += 1
words = line.strip().split('\t')
if len(words) == 0:
continue # ignore empty lines
if words[0].strip().startswith('#'):
continue # comment
if words[0].strip().startswith('browser'):
continue # ignore
if words[0].strip().startswith('track'):
continue # ignore
try:
seqname = words[0]
source = words[1]
feature = words[2]
start = int(words[3])
end = int(words[4])
score = None
if words[5].isnumeric():
score = int(words[5])
strand = '.'
if words[6] == '+' or words[6] == '-':
strand = words[6]
frame = None
if words[7].isdigit():
frame = int(words[7])
group = None
if len(words) > 8:
group = words[8]
entry = GtfEntry(seqname, start, end, feature, score, source, strand, frame, group)
# check if the chromosome has been seen before
tree = chroms.get(seqname)
if not tree:
tree = ival.IntervalTree()
chroms[seqname] = tree
# put the entry in the interval tree for the appropriate chromosome
iv = ival.Interval(entry.start, entry.end)
tree.put(iv, entry)
except (RuntimeError, ValueError, IndexError) as e:
if not acceptHeaderRows:
raise RuntimeError('Error in GTF/GFF file at row %d (%s)' % (row, str(e)))
else:
headerRow = words
acceptHeaderRows -= 1 # count down the number of header rows that can occur
f.close()
return chroms
def writeGtfFile(entries, filename, header = None):
""" Save the GTF entries to a file.
"""
f = open(filename, 'w')
if header:
f.write(header + '\n')
for row in entries:
f.write("%s\t%s\t%s\t%d\t%d\t%d\t%s\t%s\t%s" % (row.chrom, row.source, row.feature, row.start, row.end, row.score, row.strand, row.frame, row.group))
f.write("\n")
f.close()
if __name__ == '__main__':
bf = GtfFile('/Users/mikael/simhome/NFIX/WT1677.gtf')
print(bf.chroms.keys())
g = bf.generate('chr12')
print(next(g))
print(next(g))
print(next(g))
cnt = 0
for entry in bf:
cnt += 1
print(str(cnt) + '\t' + str(entry))
if cnt == 100:
break
# ===== guide.py =====
###################################################
# This module is a supplement to the Python guide #
# Version 2017.3 (10/03/2017) #
###################################################
'''
This module contains code that can help solve bioinformatics problems.
See the accompanying Python guide document for more explanations and examples.
Alphabet is a class that defines valid symbols that we then use to make up valid
biological sequences. Note that we also define variables corresponding to
DNA, RNA and Protein sequences that can be used directly.
Sequence is a class that defines basic parts and operations on biological sequences.
Alignment is a class that defines an alignment of sequences (how symbols in different sequences line
up when placed on-top of one another). Alignment methods should generate instances of this class.
SubstMatrix is a class that defines a substitution matrix, i.e. a scoring system for performing
alignments. You can read these from files or construct them manually.
GeneProfile is a class that defines parts and operations for gene expression profiles. Essentially,
the class will help to index expression data by gene name (rows) and by sample name (columns).
There are several methods not tied to a particular class because they construct new instances,
e.g. reading from file, retrieving from the internet, creating an alignment from sequences etc.
You need to have numpy installed (see http://www.numpy.org/).
Should work with Python v3.5 (see http://www.python.org/).
The code may contain bugs--please report to m.boden@uq.edu.au
'''
import math, numpy, urllib.request, urllib.parse, urllib.error
###############################################################################
# Alphabet #
###############################################################################
class Alphabet():
""" A minimal class for alphabets
Alphabets include DNA, RNA and Protein """
def __init__(self, symbolString):
self.symbols = symbolString
def __len__(self): # implements the "len" operator, e.g. "len(Alphabet('XYZ'))" results in 3
return len(self.symbols) # will tell you the length of the symbols in an Alphabet instance
def __contains__(self, sym): # implements the "in" operator, e.g. "'A' in Alphabet('ACGT')" results in True
return sym in self.symbols # will tell you if 'A' is in the symbols in an Alphabet instance
def __iter__(self): # method that allows us to iterate over all symbols, e.g. "for sym in Alphabet('ACGT'): print sym" prints A, C, G and T on separate lines
tsyms = tuple(self.symbols)
return tsyms.__iter__()
def __getitem__(self, ndx):
""" Retrieve the symbol(s) at the specified index (or slice of indices) """
return self.symbols[ndx]
def index(self, sym):
""" Retrieve the index of the given symbol in the alphabet. """
return self.symbols.index(sym)
def __str__(self):
return self.symbols
""" Below we declare alphabet variables that are going to be available when
this module (this .py file) is imported """
DNA_Alphabet = Alphabet('ACGT')
RNA_Alphabet = Alphabet('ACGU')
Protein_Alphabet = Alphabet('ACDEFGHIKLMNPQRSTVWY')
Protein_wX = Alphabet('ACDEFGHIKLMNPQRSTVWYX')
Protein_wGAP = Alphabet('ACDEFGHIKLMNPQRSTVWY-')
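# The operators defined above make an Alphabet behave like a small, ordered
# container (illustrative only):
#
#   len(DNA_Alphabet)         # 4
#   'A' in DNA_Alphabet       # True
#   DNA_Alphabet.index('G')   # 2
#   str(Protein_Alphabet)     # 'ACDEFGHIKLMNPQRSTVWY'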
###############################################################################
# Sequence #
###############################################################################
class Sequence():
""" A biological sequence class. Stores the sequence itself,
the alphabet and a name.
Usage:
Create an instance of Sequence - have to pass in sequence, alphabet and name - gappy is an optional argument
>>> seq1 = Sequence('ACGGGAGAGG', DNA_Alphabet, 'ABC')
>>> print(seq1)
ABC: ACGGGAGAGG
>>> 'C' in seq1
True
>>> for sym in seq1:
... print(sym)
"""
def __init__(self, sequence, alphabet, name = '', gappy = False, annot = ''):
""" Construct a sequence from a string, an alphabet (gappy or not) and a name.
The parameter gappy is for sequences when used in alignments. """
for sym in sequence:
if not sym in alphabet and (sym != '-' or not gappy): # error check: bail out
raise RuntimeError('Invalid symbol: ' + sym)
self.sequence = sequence # Store sequence
self.alphabet = alphabet # Store alphabet
self.name = name # Store name
self.gappy = gappy
self.annot = annot # some annotation, e.g. species
def __len__(self): # the "len" operator
return len(self.sequence)
def __iter__(self): # method that allows us to iterate over a sequence
tsyms = tuple(self.sequence)
return tsyms.__iter__()
def __contains__(self, item): # test for membership (the "in" operator)
for sym in self.sequence:
if sym == item:
return True
return False
def __getitem__(self, ndx): # [ndx] operator (retrieve a specified index (or a "slice" of indices) of the sequence data.
return self.sequence[ndx]
def writeFasta(self):
""" Write one sequence in FASTA format to a string and return it. """
fasta = '>' + self.name + ' ' + self.annot + '\n'
data = self.sequence
nlines = (len(self.sequence) - 1) // 60 + 1
for i in range(nlines):
lineofseq = ''.join(data[i*60 : (i+1)*60]) + '\n'
fasta += lineofseq
return fasta
def __str__(self): # "pretty" print sequence
str = self.name + ': '
for sym in self:
str += sym
return str
def count(self, findme):
""" Get the number of occurrences of specified symbol """
cnt = 0
for sym in self.sequence:
if findme == sym:
cnt = cnt + 1
return cnt
def find(self, findme):
""" Find the position of the specified symbol or sub-sequence """
return self.sequence.find(findme)
###############################################################################
# Alignment #
###############################################################################
class Alignment():
""" A sequence alignment class. Stores two or more sequences of equal length where
one symbol is gap '-'. The number of columns in the alignment is given by alignlen.
Example usage:
>>> seqs = [Sequence('THIS-LI-NE', Protein_Alphabet, gappy = True), Sequence('--ISALIGNED', Protein_Alphabet, gappy = True)]
>>> print(Alignment(seqs))
THIS-LI-NE-
--ISALIGNED """
def __init__(self, seqs):
self.alphabet = None
self.alignlen = -1
self.seqs = seqs
self.namelen = 0
for s in seqs:
if self.alphabet == None:
self.alphabet = s.alphabet
elif self.alphabet != s.alphabet:
raise RuntimeError("Alignment invalid: contains a mix of alphabets")
if self.alignlen == -1:
self.alignlen = len(s)
elif self.alignlen != len(s):
raise RuntimeError("Alignment invalid: lengths vary")
self.namelen = max(len(s.name), self.namelen)
def __str__(self):
string = u''
for seq in self.seqs:
string += seq.name.ljust(self.namelen+1)
for sym in seq:
string += sym
string += '\n'
return string
def __len__(self):
""" Defines what the "len" operator returns for an instance of Alignment: the number of sequences. """
return len(self.seqs)
def __getitem__(self, ndx):
return self.seqs[ndx]
def calcDistances(self, measure = 'fractional', a=1.0):
""" Calculate the evolutionary distance between all pairs of sequences
in this alignment, using the given measure. Measure can be one of
'fractional', 'poisson', 'gamma', 'jc' or 'k2p'. If 'gamma' or 'k2p' is
given, then the parameter a must also be specified (or else it will use
the default value of 1.0).
Definitions of each distance metric are found in Zvelebil and Baum p268-276.
These are mostly intended for DNA, but adapted for protein (as below).
Note however that there are alternative distance matrices for proteins (p276).
"""
measure = measure.lower()
if not measure in ['fractional', 'poisson', 'gamma', 'jc', 'k2p']:
raise RuntimeError('Unsupported evolutionary distance measure: %s' % measure)
a = float(a)
distmat = numpy.zeros((len(self.seqs), len(self.seqs)))
# Loop through each pair of sequences
for i in range(len(self.seqs)):
for j in range(i + 1, len(self.seqs)):
seqA = self.seqs[i]
seqB = self.seqs[j]
# Calculate the fractional distance (p) first
# The two sequences of interest are in seqA and seqB
L = 0
D = 0
for k in range(self.alignlen):
# For every non-gapped column, put to L
# For every non-gapped column where the sequences are
# different, put to D
if seqA[k] != '-' and seqB[k] != '-':
L += 1
if seqA[k] != seqB[k]:
D += 1
p = D / L
# Now calculate the specified measure based on p
if measure == 'fractional':
dist = p
else:
raise RuntimeError('Not implemented: %s' % measure)
distmat[i, j] = distmat[j, i] = dist
return distmat
def writeClustal(self):
""" Write the alignment to a string using the Clustal file format. """
symbolsPerLine = 60
maxNameLength = self.namelen + 1
mystring = u''
wholeRows = self.alignlen // symbolsPerLine
for i in range(wholeRows):
for j in range(len(self.seqs)):
mystring += self.seqs[j].name.ljust(maxNameLength) + u' '
mystring += self.seqs[j][i*symbolsPerLine:(i+1)*symbolsPerLine] + u'\n'
mystring += u'\n'
# Possible last row
lastRowLength = self.alignlen - wholeRows*symbolsPerLine
if lastRowLength > 0:
for j in range(len(self.seqs)):
if maxNameLength > 0:
mystring += self.seqs[j].name.ljust(maxNameLength) + u' '
mystring += self.seqs[j][-lastRowLength:] + '\n'
return mystring
def writeHTML(self, filename):
""" Generate HTML that displays the alignment in colour. """
fh = open(filename, 'wt')
fh.write(u'<html><head><title>Sequence Alignment</title></head><body><pre>\n')
html = u''.ljust(self.namelen) + u' '
for i in range(self.alignlen - 1):
if (i+1) % 10 == 0:
html += str(i//10+1)[-1]
else:
html += ' '
html += u'%s\n' % (self.alignlen)
fh.write(html)
if self.alignlen > 10:
html = u''.ljust(self.namelen) + u' '
for i in range(self.alignlen - 1):
if (i+1) % 10 == 0:
html += u'0'
else:
html += u' '
html += u'\n'
fh.write(html)
if len(self.alphabet) <= 5: # DNA or RNA
colours = {'A':u'green','C':u'orange','G':u'red','T':u'#66bbff','U':u'#66bbff'}
else: # amino acids
colours = {'G':u'orange','P':u'orange','S':u'orange','T':u'orange','H':u'red','K':u'red','R':u'red','F':u'#66bbff','Y':u'#66bbff','W':u'#66bbff','I':u'green','L':u'green','M':u'green','V':u'green'}
for seq in self.seqs:
html = seq.name.ljust(self.namelen) + u' '
for sym in seq:
try:
colour = colours[sym]
except KeyError:
colour = u'white'
html += u'<font style="BACKGROUND-COLOR: %s">%s</font>' % (colour, sym)
html += u'\n'
fh.write(html)
fh.write(u'</pre></body></html>\n')
fh.close()
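# Worked example of the fractional distance used by calcDistances (made-up
# sequences): columns where either sequence has a gap are skipped, so for
# 'ACGT-A' versus 'ACCTTA' we have L = 5 aligned columns, of which D = 1
# differs (G vs C), giving p = D / L = 0.2.
#
#   seqs = [Sequence('ACGT-A', DNA_Alphabet, 's1', gappy = True),
#           Sequence('ACCTTA', DNA_Alphabet, 's2', gappy = True)]
#   Alignment(seqs).calcDistances('fractional')   # off-diagonal entries are 0.2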
def scoreAlignment(aln, substmat = None, gap = -1):
"""Score an alignment (aln) using a substitution matrix (substmat).
If the alignment consists of more than two sequences, the minimum
score of each column is used.
If substmat is not specified (None), the count of matches is returned.
"""
nseqs = len(aln.seqs)
total = 0
for pos in range(aln.alignlen):
min = None
for i in range(nseqs):
for j in range(i+1, nseqs):
gap_here = aln.seqs[i][pos] == '-' or aln.seqs[j][pos] == '-'
score = 0
if substmat == None:
if aln.seqs[i][pos] == aln.seqs[j][pos]:
score = 1
else: # we have a substitution matrix
if gap_here:
score = gap
else:
score = substmat.get(aln.seqs[i][pos], aln.seqs[j][pos])
if min == None:
min = score
elif min > score:
min = score
total += min
return total
###############################################################################
# Methods to create instances of Alignment #
###############################################################################
def align(seqA, seqB, substMatrix, gap=-1):
""" Align seqA with seqB using the Needleman-Wunsch
(global) algorithm. substMatrix is the substitution matrix to use and
gap is the linear gap penalty to use. """
stringA, stringB = seqA.sequence, seqB.sequence
lenA, lenB = len(seqA), len(seqB)
# Create the scoring matrix (S) and a matrix for traceback
S = numpy.zeros((lenA + 1, lenB + 1))
Traceback = numpy.zeros((lenA + 1, lenB + 1))
# Fill the first row and column of S with multiples of the gap penalty
for i in range(lenA + 1):
S[i, 0] = i * gap
for j in range(lenB + 1):
S[0, j] = j * gap
# Calculate the optimum score at each location in the matrix, note which option that was chosen for traceback
for i in range(1, lenA + 1):
for j in range(1, lenB + 1):
match = S[i - 1, j - 1] + substMatrix.get(stringA[i - 1], stringB[j - 1])
delete = S[i - 1, j] + gap
insert = S[i, j - 1] + gap
Traceback[i, j] = numpy.argmax([match, delete, insert])
S[i, j] = max([match, delete, insert])
# Trace back the optimal alignment
alignA = ''
alignB = ''
# Start at the end
i = lenA
j = lenB
# Stop when we hit the end of a sequence
while i > 0 and j > 0:
if Traceback[i, j] == 1:
# Got here by a gap in sequence B (go up)
alignA = stringA[i - 1] + alignA
alignB = '-' + alignB
i -= 1
elif Traceback[i, j] == 2:
# Got here by a gap in sequence A (go left)
alignA = "-" + alignA
alignB = stringB[j - 1] + alignB
j -= 1
else:
# Got here by aligning the bases (go diagonally)
alignA = stringA[i - 1] + alignA
alignB = stringB[j - 1] + alignB
i -= 1
j -= 1
# Fill in the rest of the alignment if it begins with gaps
# (i.e., trace back all the way to S[0, 0])
while i > 0:
# Go up
alignA = stringA[i - 1] + alignA
alignB = '-' + alignB
i -= 1
while j > 0:
# Go left
alignA = '-' + alignA
alignB = stringB[j - 1] + alignB
j -= 1
return Alignment([Sequence(alignA, seqA.alphabet, seqA.name, gappy=True),
Sequence(alignB, seqB.alphabet, seqB.name, gappy=True)])
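# Usage sketch for the Needleman-Wunsch routine above (toy +1/-1 scoring,
# illustrative sequences): build a substitution matrix, then align two short
# DNA sequences with a linear gap penalty of -1.
#
#   sm = SubstMatrix(DNA_Alphabet)
#   for a in DNA_Alphabet:
#       for b in DNA_Alphabet:
#           sm.set(a, b, +1 if a == b else -1)
#   aln = align(Sequence('GATTACA', DNA_Alphabet, 'x'),
#               Sequence('GCATGC', DNA_Alphabet, 'y'), sm, gap = -1)
#   print(aln)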
###############################################################################
# SubstMatrix #
###############################################################################
class SubstMatrix():
""" Create a substitution matrix for an alphabet.
Example usage:
>>> sm = SubstMatrix(DNA_Alphabet)
>>> for a in DNA_Alphabet:
... for b in DNA_Alphabet:
... if a > b:
... sm.set(a, b, -1)
... elif a == b:
... sm.set(a, b, +1)
...
>>> print(sm)
A 1
C -1 1
G -1 -1 1
T -1 -1 -1 1
A C G T
>>> sm.get('C', 'T')
-1
"""
def __init__(self, alphabet, scoremat = None):
self.scoremat = scoremat or {} # start with empty dictionary
self.alphabet = alphabet
def _getkey(self, sym1, sym2):
""" Construct canonical (unordered) key for two symbols """
if sym1 <= sym2:
return tuple([sym1, sym2])
else:
return tuple([sym2, sym1])
def set(self, sym1, sym2, score):
""" Add a score to the substitution matrix """
self.scoremat[self._getkey(sym1, sym2)] = score
def get(self, sym1, sym2):
return self.scoremat[self._getkey(sym1, sym2)]
def __str__(self):
symbols = self.alphabet.symbols # what symbols are in the alphabet
i = len(symbols)
string = u''
for a in symbols:
string += a + ' '
for b in symbols[:len(symbols)-i+1]:
score = self.scoremat.get(self._getkey(a, b))
if score != None:
string += str(score).rjust(3) + ' '
else:
string += "?".rjust(3) + ' '
string += '\n'
i -= 1
string += ' ' + ' '.join(self.alphabet.symbols)
return string
def writeFile(self, filename):
""" Write this substitution matrix to the given file. """
fh = open(filename, 'w')
file = u''
for key in self.scoremat:
file += ''.join(key) + ': ' + str(self.scoremat[key]) + '\n'
fh.write(file)
fh.close()
###############################################################################
# Below are some useful methods for loading data from strings and files. #
# They recognize the FASTA and Clustal formats (nothing fancy). #
###############################################################################
def readSubstMatrix(filename, alphabet):
""" Read in the substitution matrix stored in the given file. """
mat = SubstMatrix(alphabet)
fh = open(filename, 'r')
data = fh.read()
fh.close()
lines = data.splitlines()
for line in lines:
if len(line.strip()) == 0:
continue
symbols, score = line.split(':')
score = int(score)
mat.set(symbols[0], symbols[1], score)
return mat
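# The file format expected by readSubstMatrix (and produced by
# SubstMatrix.writeFile) is one symbol pair and score per line, e.g.:
#
#   AA: 1
#   AC: -1
#   CC: 1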
def readFastaString(string, alphabet, gappy = False):
""" Read the given string as FASTA formatted data and return the list of
sequences contained within it. """
seqlist = [] # list of sequences contained in the string
seqname = '' # name of *current* sequence
seqannot = '' # annotation of *current* sequence
seqdata = [] # sequence data for *current* sequence
for line in string.splitlines(): # read every line
if len(line) == 0: # ignore empty lines
continue
if line[0] == '>': # start of new sequence
if seqname: # check if we've got one current
current = Sequence(''.join(seqdata), alphabet, seqname, gappy, seqannot)
seqlist.append(current)
# now collect data about the new sequence
parts = line[1:].split() # skip first char
seqname = '' # name of *current* sequence
seqannot = '' # annotation of *current* sequence
if len(parts) > 0: seqname = parts[0]
if len(parts) > 1: seqannot = line[len(seqname) + 2:] # the rest of the line
seqdata = []
else: # we assume this is (more) data for current
cleanline = line.split()
for thisline in cleanline:
seqdata.extend(tuple(thisline.strip('*')))
# we're done reading the file, but the last sequence remains
if seqname:
lastseq = Sequence(''.join(seqdata), alphabet, seqname, gappy, seqannot)
seqlist.append(lastseq)
return seqlist
def readFastaFile(filename, alphabet):
""" Read the given FASTA formatted file and return the list of sequences
contained within it. """
fh = open(filename)
data = fh.read()
fh.close()
seqlist = readFastaString(data, alphabet)
return seqlist
def writeFastaFile(filename, seqs):
""" Write the specified sequences to a FASTA file. """
fh = open(filename, 'wt')
for seq in seqs:
fh.write(seq.writeFasta())
fh.close()
def readClustalString(string, alphabet):
""" Read a ClustalW2 alignment in the given string and return as an
Alignment object. """
seqs = {} # sequence data
for line in string.splitlines():
if line.startswith('CLUSTAL') or line.startswith('STOCKHOLM') \
or line.startswith('#'):
continue
if len(line.strip()) == 0:
continue
if line[0] == ' ' or '*' in line or ':' in line:
continue
sections = line.split()
name, seq = sections[0:2]
if name in seqs:
seqs[name] += seq
else:
seqs[name] = seq
sequences = []
for name, seq in seqs.items():
sequences.append(Sequence(seq, alphabet, name, gappy = True))
return Alignment(sequences)
def readClustalFile(filename, alphabet):
""" Read a ClustalW2 alignment file and return an Alignment object
containing the alignment. """
fh = open(filename)
data = fh.read()
fh.close()
aln = readClustalString(data, alphabet)
return aln
def writeClustalFile(filename, aln):
""" Write the specified alignment to a Clustal file. """
fh = open(filename, 'wt')
fh.write('CLUSTAL W (1.83) multiple sequence alignment\n\n\n') # fake header so that clustal believes it
fh.write(aln.writeClustal())
fh.close()
###############################################################################
# GeneProfile #
###############################################################################
class GeneProfile():
""" A class for gene expression data.
Example usage:
>>> gp = GeneProfile('MyMicroarray', ['Exp1', 'Exp2'])
>>> gp['gene1'] = [0.1, 0.5]
>>> gp['gene2'] = [2, 1]
>>> gp.getSample('Exp2')
{'gene1': [0.5], 'gene2': [1.0]}
"""
def __init__(self, dataset_name='', sample_names=None, profiles = None):
""" Create a gene profile set. """
self.name = dataset_name
self.samples = sample_names or [] # avoid sharing a mutable default list between instances
self.genes = profiles or {} # dictionary for storing all gene--measurement pairs
def __setitem__(self, name, probevalues):
if len(probevalues) == len(self.samples):
self.genes[name] = [float(y) for y in probevalues]
else:
raise RuntimeError('Invalid number of measurements for probe ' + name)
def __getitem__(self, name):
return self.genes[name]
def getSorted(self, index, descending=True):
"""Get a list of (gene, value) tuples in descending order by value"""
key_fn = lambda v: v[1][index]
return sorted(list(self.genes.items()), key=key_fn, reverse=descending)
def addSample(self, sample_name, sample_dict):
"""Add a sample to the current data set.
sample_dict is a dictionary with the same keys as the current gene set.
Only values for genes in the current set will be added. """
self.samples.append(sample_name)
if not self.genes:
self.genes = sample_dict
else:
for gene in self.genes:
values = sample_dict.get(gene)
if values:
self.genes[gene].extend([float(y) for y in values])
else:
self.genes[gene].append(0.0)
return self.genes
def getSample(self, sample_name):
"""Construct a gene dictionary including only named samples. """
mygenes = {}
if isinstance(sample_name, str): # a single sample-name
mysamples = [sample_name]
else: # a list of sample-names
mysamples = sample_name
for gene in self.genes:
mygenes[gene] = []
for name in mysamples:
mygenes[gene].append(self.genes[gene][self.samples.index(name)])
return mygenes
def getRatio(self, sample1, sample2):
"""Get the ratio of two samples in the data set. """
mygenes = {}
index1 = self.samples.index(sample1)
index2 = self.samples.index(sample2)
for gene in self.genes:
mygenes[gene] = []
mygenes[gene].append(self.genes[gene][index1] / self.genes[gene][index2])
return mygenes
def __str__(self):
""" Render the data as a truncated GEO SOFT formatted string. """
line = '^DATASET = ' + self.name + '\n'
line += '!dataset_table_begin\nID_REF\t'
for header in self.samples:
line += header + '\t'
line += '\n'
for gene in self.genes:
line += gene + '\t'
values = self.genes[gene]
for value in values:
line += format(value, '5.3f') + '\t'
line += '\n'
line += '!dataset_table_end\n'
return line
def writeGeoFile(self, filename):
fh = open(filename, 'w')
fh.write(str(self))
fh.close()
def getLog(genedict, base=2):
"""Get the log-transformed value of a sample/column. """
mygenes = {}
for gene in genedict:
mygenes[gene] = []
for sample in genedict[gene]:
mygenes[gene].append(math.log(sample, base))
return mygenes
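# Usage sketch combining getRatio and getLog (invented values): a log2
# fold-change between two samples.
#
#   gp = GeneProfile('Demo', ['Ctrl', 'Treat'])
#   gp['geneA'] = [2.0, 8.0]
#   ratios = gp.getRatio('Treat', 'Ctrl')   # {'geneA': [4.0]}
#   getLog(ratios, base = 2)                # {'geneA': [2.0]}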
def readGeoFile(filename, id_column = 0):
""" Read a Gene Expression Omnibus SOFT file. """
dataset = None
fh = open(filename, "rU")
manylines = fh.read()
fh.close()
data_rows = False # Indicates whether we're reading the data section or metadata
name = u'Unknown'
cnt_data = 0
for line in manylines.splitlines():
if line.startswith('^DATASET'):
name = line.split('= ')[1]
continue
if line.startswith('!dataset_table_begin'):
data_rows = True
continue
if line.startswith('!dataset_table_end'):
data_rows = False
continue
if len(line.strip()) == 0 or line.startswith('!') or line.startswith('#') or line.startswith('^'):
continue
if data_rows:
cnt_data += 1
if (cnt_data == 1): # First line contains the headers
headers = line.split('\t')
dataset = GeneProfile(name, headers[2:]) # Create the data set
continue
ignore = (dataset == None) # ignore the row if the dataset is not initialised
id = line.split('\t')[id_column]
values = []
cnt_word = 0
for word in line.split('\t'):
cnt_word += 1
if cnt_word <= (id_column + 1): # ignore the gene names
continue
if word == 'null':
ignore = True # ignore gene if a value is null
continue
try:
values.append(float(word))
except: # ignore values that are not "float"
continue
if not ignore:
dataset[id] = tuple(values)
print('Data set %s contains %d genes' % (name, len(dataset.genes)))
return dataset
###############################################################################
# Web service methods that find data in online databases.
# Our implementations are mainly serviced by EBI.
###############################################################################
def getSequence(entryId, dbName = 'uniprotkb', alphabet = Protein_Alphabet, format = 'fasta', debug: bool = True):
""" Retrieve a single entry from a database
entryId: ID for entry e.g. 'P63166' or 'SUMO1_MOUSE'
dbName: name of database e.g. 'uniprotkb' or 'pdb' or 'refseqn'; see http://www.ebi.ac.uk/Tools/dbfetch/dbfetch/dbfetch.databases for available databases
format: file format specific to database e.g. 'fasta' or 'uniprot' for uniprotkb (see http://www.ebi.ac.uk/Tools/dbfetch/dbfetch/dbfetch.databases)
See http://www.ebi.ac.uk/Tools/dbfetch/syntax.jsp for more info re URL syntax
"""
if not isinstance(entryId, str):
entryId = entryId.decode("utf-8")
url ='http://www.ebi.ac.uk/Tools/dbfetch/dbfetch?style=raw&db=' + dbName + '&format=' + format + '&id=' + entryId
try:
if debug:
print('DEBUG: Querying URL: {0}'.format(url))
data = urllib.request.urlopen(url).read()
if format == 'fasta':
return readFastaString(data.decode("utf-8"), alphabet)[0]
else:
return data.decode("utf-8")
except urllib.error.HTTPError as ex:
raise RuntimeError(ex.read())
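# Usage sketch (requires network access; the accession is the example from
# the docstring above):
#
#   seq = getSequence('P63166')   # a Sequence parsed from the FASTA reply
#   print(seq.name, len(seq))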
def searchSequences(query, dbName = 'uniprot'):
"""
Retrieve multiple entries matching query from a database currently only via UniProtKB
query: search term(s) e.g. 'organism:9606+AND+antigen'
dbName: name of database e.g. 'uniprot', "refseq:protein", "refseq:pubmed"
See http://www.uniprot.org/faq/28 for more info re UniprotKB's URL syntax
See http://www.ncbi.nlm.nih.gov/books/NBK25499/ for more on NCBI's E-utils
"""
if dbName.startswith('uniprot'):
# Construct URL
url = 'http://www.uniprot.org/uniprot/?format=list&query=' + query
try:
data = urllib.request.urlopen(url).read()
return data.decode("utf-8").splitlines()
except urllib.error.HTTPError as ex:
raise RuntimeError(ex.read())
elif dbName.startswith('refseq'):
dbs = dbName.split(":")
if len(dbs) > 1:
dbName = dbs[1]
base = u'http://eutils.ncbi.nlm.nih.gov/entrez/eutils/'
url = base + "esearch.fcgi?db=" + dbName + "&term=" + query
# Get the entries
try:
data = urllib.request.urlopen(url).read()
words = data.decode("utf-8").split("")
words = [w[w.find("")+4:] for w in words[:-1]]
return words
except urllib.error.HTTPError as ex:
raise RuntimeError(ex.read())
return
def idmap(identifiers, frm='ACC', to='P_REFSEQ_AC'):
"""
Map identifiers between databases (based on UniProtKB;
see http://www.uniprot.org/help/programmatic_access)
identifiers: a list of identifiers (list of strings)
frm: the abbreviation for the identifier FROM which to idmap
to: the abbreviation for the identifier TO which to idmap
Returns a dictionary with key (from) -> value (to) """
url = u'http://www.uniprot.org/uploadlists/'
# construct query by concatenating the list of identifiers
if isinstance(identifiers, str):
query = identifiers.strip()
else: # assume it is a list of strings
query = u''
for id in identifiers:
query = query + id + ' '
query = query.strip() # remove trailing spaces
params = {
'from' : frm,
'to' : to,
'format' : 'tab',
'query' : query
}
if len(query) > 0:
data = urllib.parse.urlencode(params).encode("utf-8")
request = urllib.request.Request(url, data)
response = urllib.request.urlopen(request).read()
d = dict()
for row in response.decode("utf-8").splitlines()[1:]:
pair = row.split('\t')
d[pair[0]] = pair[1]
return d
else:
return dict()
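# Usage sketch (requires network access; accession for illustration): map
# UniProt accessions to RefSeq protein identifiers.
#
#   mapping = idmap(['P63166'], frm = 'ACC', to = 'P_REFSEQ_AC')
#   # a dict keyed by each input accession, e.g. {'P63166': '<refseq id>'}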
###############################################################################
# Gene Ontology services.
# See http://www.ebi.ac.uk/QuickGO/WebServices.html for more info
###############################################################################
def getGODef(goterm):
"""
Retrieve information about a GO term
goterm: the identifier, e.g. 'GO:0002080'
"""
# Construct URL
url = u'http://www.ebi.ac.uk/QuickGO/GTerm?format=obo&id=' + goterm
# Get the entry: fill in the fields specified below
try:
entry={'id': None, 'name': None, 'def': None}
data = urllib.request.urlopen(url).read()
for row in data.decode("utf-8").splitlines():
index = row.find(u':')
if index > 0 and len(row[index:]) > 1:
field = row[0:index].strip()
value = row[index+1:].strip(u' "') # remove spaces
if field in list(entry.keys()): # check if we need field
if entry[field] == None: # check if assigned
entry[field] = value
return entry
except urllib.error.HTTPError as ex:
raise RuntimeError(ex.read())
def getGOTerms(genes, db='UniProtKB'):
"""
Retrieve all GO terms for a given set of genes (or single gene).
db: use specified database, e.g. 'UniProtKB', 'UniGene',
or 'Ensembl'.
The result is given as a map (key=gene name, value=list of unique
terms) OR in the case of a single gene as a list of unique terms.
"""
if type(genes) != list and type(genes) != set and type(genes) != tuple:
genes = [genes] # if 'genes' is a single gene, we make a single item list
map = dict()
uri = u'http://www.ebi.ac.uk/QuickGO/GAnnotation?format=tsv&db=' + db + u'&protein='
for gene in genes:
terms = set() # empty result set
url = uri + gene # Construct URL
try: # Get the entry: fill in the fields specified below
data = urllib.request.urlopen(url).read()
for row in data.decode("utf-8").splitlines()[1:]: # we ignore header row
values = row.split('\t')
if len(values) >= 7:
terms.add(values[6]) # add term to result set
map[gene] = list(terms) # make a list of the set
except urllib.error.HTTPError as ex:
raise RuntimeError(ex.read())
if len(genes) == 1:
return map[genes[0]]
else:
return map
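# Usage sketch (requires network access; accessions for illustration): a
# single gene yields a plain list of GO terms, a collection yields a map.
#
#   getGOTerms('P63166')              # ['GO:...', ...]
#   getGOTerms(['P63166', 'P63165'])  # {'P63166': [...], 'P63165': [...]}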
def getGenes(goterms, db='UniProtKB', taxo=None):
"""
Retrieve all genes/proteins for a given set of GO terms
(or single GO term).
db: use specified database, e.g. 'UniProtKB', 'UniGene',
or 'Ensembl'
taxo: use specific taxonomic identifier, e.g. 9606 (human)
The result is given as a map (key=gene name, value=list of unique
terms) OR in the case of a single gene as a list of unique terms.
"""
if type(goterms) != list and type(goterms) != set and type(goterms) != tuple:
goterms = [goterms]
map = dict()
if taxo == None:
uri = u'http://www.ebi.ac.uk/QuickGO/GAnnotation?format=tsv&db=' + db + u'&term='
else:
uri = u'http://www.ebi.ac.uk/QuickGO/GAnnotation?format=tsv&db=' + db + u'&tax=' + str(taxo) + u'&term='
for goterm in goterms:
genes = set() # start with empty result set
url = uri + goterm # Construct URL
try: # Get the entry: fill in the fields specified below
data = urllib.request.urlopen(url).read()
for row in data.decode("utf-8").splitlines()[1:]: # we ignore first (header) row
values = row.split('\t')
if len(values) >= 7:
genes.add(values[1]) # add gene name to result set
map[goterm] = list(genes)
except urllib.error.HTTPError as ex:
raise RuntimeError(ex.read())
if len(goterms) == 1:
return map[goterms[0]]
else:
return map
###############################################################################
# PhyloTree #
###############################################################################
class PhyloTree:
""" Rooted, binary (bifurcating) tree for representing phylogenetic relationships.
Functionality includes labelling and traversing nodes; reading and writing to Newick format;
association with sequence alignment; maximum parsimony inference of ancestral sequence;
generation of single, bifurcating rooted tree by UPGMA.
Known issues: Binary only; Parsimony does not handle gaps in alignment.
Programmers should note that almost all functionality is implemented through recursion. """
def __init__(self, root):
""" Create a tree from a node that is "root" in the tree."""
self.root = root
self.aln = None # set by putAlignment; checked before use in strSequences
def putAlignment(self, aln):
""" Associate the tree with a set of sequences/alignment.
Involves assigning the sequence to the leaf nodes. """
self.aln = aln
self.root._assignAlignment(aln)
def __str__(self):
""" Produce a printable representation of the tree, specifically the root of the tree. """
return str(self.root)
def strSequences(self, start=None, end=None):
""" Produce a sequence representation of the tree, specifically the root of the tree.
Specify the start and end positions in the alignment for the sequence to be printed
(if None the min and max positions will be used). """
if self.aln != None:
my_start = start or 0
my_end = end or self.aln.alignlen
return self.root._printSequences(my_start, my_end)
def findLabel(self, label):
""" Retrieve/return the node with the specified label.
Returns None if not found."""
return self.root._findLabel(label)
def getDescendantsOf(self, node, transitive=False):
""" Retrieve and return the (list of) descendants (children) of a specified node.
Node can be the label or the instance.
transitive indicates if only the direct descendants (False) or if all descendants
should be returned.
If node does not exist, None is returned.
If node has no descendants, an empty list will be returned."""
if not isinstance(node, PhyloNode):
node = self.root.findLabel(node)
if node:
return node.getDescendants(transitive)
return None
def getAncestorsOf(self, node, transitive=False):
""" Retrieve and return the ancestor (transitive=False) or
ancestors (transitive=True) of a specified node.
Node can be the label or the instance.
If node does not exist, None is returned.
If node is the root of the tree, None is returned."""
if not isinstance(node, PhyloNode):
node = self.root.findLabel(node)
if node:
myroot = self.root
found = False
branching = []
while not found and myroot != None:
branching.append(myroot)
if myroot.left == node or myroot.right == node:
found = True
break
if myroot.left:
if myroot.left.isAncestorOf(node, transitive=True):
myroot = myroot.left
else: # must be right branch then...
myroot = myroot.right
else: # must be right branch then...
myroot = myroot.right
if found and transitive:
return branching
elif found and len(branching) > 0:
return branching[len(branching) - 1]
return None
def parsimony(self):
""" Solve the "small parsimony problem",
i.e. find the sequences on each of the internal nodes.
See Jones and Pevzner, p. 368 and onwards, for details. """
self.root._forwardParsimony(self.aln) # setup and compute scores for all nodes
self.root._backwardParsimony(self.aln) # use scores to determine sequences
return self.root.getSequence() # return the sequence found at the root
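# Usage sketch of the parsimony machinery (toy data): leaf labels in the
# Newick string must match the sequence names in the alignment.
#
#   tree = parseNewick('((A,B)ab,C)abc')
#   aln = Alignment([Sequence('ACGT', DNA_Alphabet, 'A', gappy = True),
#                    Sequence('ACGA', DNA_Alphabet, 'B', gappy = True),
#                    Sequence('CCGT', DNA_Alphabet, 'C', gappy = True)])
#   tree.putAlignment(aln)
#   print(tree.parsimony())   # most parsimonious sequence at the root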
###############################################################################
# PhyloNode #
###############################################################################
class PhyloNode:
""" A class for a node in a rooted, binary (bifurcating) tree.
Contains pointers to descendants/daughters (left and right),
optional fields include data, label, sequence and dist.
If parsimony is used scores and traceback pointers are available.
A number of methods are named with a _ prefix. These can be, but
are not intended to be used from outside the class. """
def __init__(self, label=''):
""" Initialise an initially unlinked node.
Populate fields left and right to link it with other nodes.
Set label to name it.
Use field data for any type of information associated with node.
Use dist to indicate the distance to its parent (if any).
Other fields are used internally, including sequence for associated alignment,
seqscores, backleft and backright for maximum parsimony. """
self.left = None
self.right = None
self.data = None
self.label = label
self.dist = None
self.sequence = None # The sequence after an alignment has been mapped (leaf) or the most parsimonious sequence (ancestral)
self.seqscores = None # The scores propagated from leaves via children
self.backleft = None # Pointers back to left child: what symbol rendered current/parent symbols
self.backright = None # Pointers back to right child: what symbol rendered current/parent symbols
def __str__(self):
""" Returns string with node (incl descendants) in a Newick style. """
left = right = label = dist = ''
if self.left:
left = str(self.left)
if self.right:
right = str(self.right)
if self.dist or self.dist == 0.0:
dist = ':' + str(self.dist)
if self.label != None:
label = str(self.label)
if not self.left and not self.right:
return label + dist
else:
return '(' + left + ',' + right + ')' + label + dist
else: # there is no label
if not self.left and self.right:
return ',' + right
elif self.left and not self.right:
return left + ','
elif self.left and self.right:
return '(' + left + ',' + right + ')' + dist
def _printSequences(self, start, end):
""" Returns string with node (incl descendants) in a Newick style. """
left = right = label = dist = ''
if self.left:
left = self.left._printSequences(start, end)
if self.right:
right = self.right._printSequences(start, end)
if self.dist:
dist = ':' + str(self.dist)
if self.sequence != None:
label = "".join(self.sequence[start:end]) + ""
if not self.left and not self.right:
return label + dist
else:
return '(' + left + ',' + right + ')' + label + dist
else: # there is no label
if not self.left and self.right:
return ',' + right
elif self.left and not self.right:
return left + ','
elif self.left and self.right:
return '(' + left + ',' + right + ')' + dist
def _findLabel(self, label):
""" Find a node by label at this node or in any descendants (recursively). """
if self.label == label:
return self
else:
if self.left:
foundLeft = self.left._findLabel(label)
if foundLeft:
return foundLeft
if self.right:
return self.right._findLabel(label)
return None
def _propagateDistance(self, parent_dist):
""" Convert absolute distances to relative.
The only parameter is the absolute distance to the parent of this node. """
travelled = self.dist # absolute distance to this node
self.dist = parent_dist - self.dist # relative distance to this node
if self.left != None: # if there is a child node...
self.left._propagateDistance(travelled) # pass absolute distance to this node
if self.right != None:
self.right._propagateDistance(travelled)
def _assignAlignment(self, aln):
""" Assign an alignment to the node, which implies assigning a sequence to it if one is
available in the alignment. """
self.sequence = None
if self.left != None:
self.left._assignAlignment(aln)
if self.right != None:
self.right._assignAlignment(aln)
for seq in aln.seqs:
if seq.name == self.label:
self.sequence = seq
break
def _forwardParsimony(self, aln):
""" Internal function that operates recursively to first initialise each node (forward),
stopping only once a sequence has been assigned to the node,
then to propagate scores from sequence assigned nodes to root (backward). """
if self.sequence == None: # no sequence has been assigned
if self.left == None and self.right == None: # no children, so terminal, cannot propagate scores
raise RuntimeError("No sequence assigned to leaf node:", self.label)
scoresleft = scoresright = None
if self.left != None:
scoresleft = self.left._forwardParsimony(aln)
if self.right != None:
scoresright = self.right._forwardParsimony(aln)
# for each position in the alignment,
# introduce (initially zero) score for each symbol in alphabet
self.seqscores = [[0 for _ in aln.alphabet] for col in range(aln.alignlen)]
# for each position in the alignment,
# allocate a position to put the left child symbol from which each current node symbol score was determined
self.backleft = [[None for _ in aln.alphabet] for _ in range(aln.alignlen)]
# allocate a position to put the right child symbol from which each current node symbol score was determined
self.backright = [[None for _ in aln.alphabet] for _ in range(aln.alignlen)]
for col in range(aln.alignlen):
# left child will contribute first
for a_parent in range(len(aln.alphabet)):
best_score_left = +9999999
best_symb_left = 0
for a in range(len(aln.alphabet)):
score = (scoresleft[col][a] + (
1 if a != a_parent else 0)) # if we want to weight scores, this would need to change
if score < best_score_left:
best_symb_left = a
best_score_left = score
self.seqscores[col][a_parent] = best_score_left
self.backleft[col][a_parent] = best_symb_left
# right child will contribute next
for a_parent in range(len(aln.alphabet)):
best_score_right = +9999999
best_symb_right = 0
for a in range(len(aln.alphabet)):
score = (scoresright[col][a] + (
1 if a != a_parent else 0)) # if we want to weight scores, this would need to change
if score < best_score_right:
best_symb_right = a
best_score_right = score
self.seqscores[col][a_parent] += best_score_right
self.backright[col][a_parent] = best_symb_right
else:
self.seqscores = [[0 if a == sym else 999999 for a in aln.alphabet] for sym in
self.sequence] # if we want to weight scores, this would need to change
return self.seqscores
def _backwardParsimony(self, aln, seq=None):
""" Internal function that operates recursively to inspect scores to determine
most parsimonious sequence, from root to leaves. """
if self.sequence == None: # no sequence has been assigned
leftbuf = []
rightbuf = []
if self.left == None and self.right == None: # no children, so terminal, cannot propagate scores
raise RuntimeError("No sequence assigned to leaf node:", self.label)
if seq == None: # Only root can do this, no parents to consider, so we pick the lowest scoring symbol
currbuf = []
for col in range(aln.alignlen):
min_score = 999999
min_symb = None
left_symb = None
right_symb = None
for a_parent in range(len(aln.alphabet)):
if self.seqscores[col][a_parent] < min_score:
min_score = self.seqscores[col][a_parent]
min_symb = a_parent
left_symb = self.backleft[col][a_parent]
right_symb = self.backright[col][a_parent]
currbuf.append(aln.alphabet[min_symb])
leftbuf.append(aln.alphabet[left_symb])
rightbuf.append(aln.alphabet[right_symb])
self.sequence = Sequence(currbuf, aln.alphabet, self.label, gappy=True)
else: # Non-root, but not leaf
self.sequence = seq
col = 0
for sym_parent in self.sequence:
a_parent = aln.alphabet.index(sym_parent)
left_symb = self.backleft[col][a_parent]
right_symb = self.backright[col][a_parent]
leftbuf.append(aln.alphabet[left_symb])
rightbuf.append(aln.alphabet[right_symb])
col += 1
self.left._backwardParsimony(aln, Sequence(leftbuf, aln.alphabet, self.label, gappy=True))
self.right._backwardParsimony(aln, Sequence(rightbuf, aln.alphabet, self.label, gappy=True))
return self.sequence
def getSequence(self):
""" Get the sequence for the node. Return None if no sequence is assigned.
Requires that an alignment is associated with the tree, and that sequence names match node labels.
If the explored node is not a leaf, the sequence can be determined by parsimony. """
if self.sequence != None: # a sequence has been assigned
return self.sequence
elif self.seqscores != None: # inferred by parsimony but not yet assigned
return None # determine most parsimonious sequence, not yet implemented
def isAncestorOf(self, node, transitive=True):
""" Decide if this node is the ancestor of specified node.
If transitive is True (default), all descendants are included.
If transitive is False, only direct descendants are included. """
if node == self.left or node == self.right:
return True
elif transitive:
if self.left:
statusLeft = self.left.isAncestorOf(node, transitive)
if statusLeft: return True
if self.right:
return self.right.isAncestorOf(node, transitive)
else:
return False
def getDescendants(self, transitive=False):
""" Retrieve and return (list of) nodes descendant of this.
If transitive is False (default), only direct descendants are included.
If transitive is True, all descendants are (recursively) included. """
children = []
if self.left:
children.append(self.left)
if self.right:
children.append(self.right)
if not transitive:
return children
else:
grandchildren = []
for c in children:
d = c.getDescendants(transitive)
if d:
grandchildren.extend(d)
children.extend(grandchildren)
return children
###############################################################################
# Methods for generating a single tree by clustering, here UPGMA Zvelebil and Baum p. 278
# Methods for processing files of trees on the Newick format
###############################################################################
def runUPGMA(aln, measure, absoluteDistances=False):
""" Generate an ultra-metric, bifurcating, rooted tree from an alignment based on pairwise distances.
Use specified distance metric (see sequence.calcDistances).
If absoluteDistances is True, the tree will be assigned the total distance from provided species.
Otherwise, the relative addition at each path will be assigned."""
D = {}
N = {} # The number of sequences in each node
M = aln.calcDistances(measure) # determine all pairwise distances
nodes = [PhyloNode(seq.name) for seq in aln.seqs] # construct all leaf nodes
""" For each node-pair, assign the distance between them. """
for i in range(len(nodes)):
nodes[i].sequence = aln.seqs[i]
nodes[i].dist = 0.0
N[nodes[i]] = 1 # each cluster contains a single sequence
for j in range(0, i):
D[frozenset([nodes[i], nodes[j]])] = M[i, j]
""" Now: treat each node as a cluster,
until there is only one cluster left,
find the *closest* pair of clusters, and
merge that pair into a new cluster (to replace the two that merged).
In each case, the new cluster is represented by the (phylo)node that is formed. """
while len(N) > 1: # N will contain all "live" clusters, to be reduced to a single one below
closest_pair = (None, None) # The two nodes that are closest to one another according to supplied metric
closest_dist = None # The distance between them
for pair in D: # check all pairs which should be merged
dist = D[pair]
if closest_dist == None or dist < closest_dist:
closest_dist = dist
closest_pair = list(pair)
# So we know the closest, now we need to merge...
x = closest_pair[0] # See Zvelebil and Baum p. 278 for notation
y = closest_pair[1]
z = PhyloNode() # create a new node for the cluster z
z.dist = D.pop(frozenset([x, y])) / 2.0 # assign the absolute distance, travelled so far, note: this will change to relative distance later
Nx = N.pop(x) # find number of sequences in x, remove the cluster from list N
Ny = N.pop(y) # find number of sequences in y, remove the cluster from list N
dz = {} # new distances to cluster z
for w in N: # for each node w ...
# we will merge x and y into a new cluster z, so need to consider w (which is not x or y)
dxw = D.pop(frozenset([x, w])) # retrieve and remove distance from D: x to w
dyw = D.pop(frozenset([y, w])) # retrieve and remove distance from D: y to w
dz[w] = (Nx * dxw + Ny * dyw) / (Nx + Ny) # distance: z to w
N[z] = Nx + Ny # total number of sequences in new cluster, insert new cluster in list N
for w in dz: # we have to run through the nodes again, now not including the removed x and y
D[frozenset([z, w])] = dz[w] # for each "other" cluster, update distance per EQ8.16 (Z&B p. 278)
z.left = x # link the phylogenetic tree
z.right = y
nodes.append(z)
if not absoluteDistances:
x._propagateDistance(z.dist) # convert absolute distances to relative by recursing down left path
y._propagateDistance(z.dist) # convert absolute distances to relative by recursing down right path
z.dist = 0.0 # root z is at distance 0 from merged x and y
return PhyloTree(z) # make it to tree, return
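# Usage sketch (toy alignment): cluster sequences by fractional distance and
# print the resulting tree in Newick style (with relative branch lengths).
#
#   aln = Alignment([Sequence('ACGT', DNA_Alphabet, 'A', gappy = True),
#                    Sequence('ACGA', DNA_Alphabet, 'B', gappy = True),
#                    Sequence('CCGA', DNA_Alphabet, 'C', gappy = True)])
#   tree = runUPGMA(aln, 'fractional')
#   print(tree)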
def _findComma(string, level=0):
""" Find first comma at specified level of embedding """
mylevel = 0
for i in range(len(string)):
if string[i] == '(':
mylevel += 1
elif string[i] == ')':
mylevel -= 1
elif string[i] == ',' and mylevel == level:
return i
return -1
def parseNewickNode(string):
""" Utility function that recursively parses embedded string using Newick format. """
first = string.find('(')
last = string[::-1].find(')') # look from the back
if first == -1 and last == -1: # we are at leaf
y = string.split(':')
node = PhyloNode(y[0])
if len(y) >= 2:
node.dist = float(y[1])
return node
elif first >= 0 and last >= 0:
# remove parentheses
last = len(string) - last - 1 # correct index to refer from start instead of end of string
embed = string[first + 1:last]
tail = string[last + 1:]
# find where corresp comma is
comma = _findComma(embed)
if comma == -1:
raise RuntimeError('Invalid format: invalid placement of "," in sub-string "' + embed + '"')
left = embed[0:comma].strip()
right = embed[comma + 1:].strip()
y = tail.split(':')
node = PhyloNode(y[0])
if len(y) >= 2:
node.dist = float(y[1])
node.left = parseNewickNode(left)
node.right = parseNewickNode(right)
return node
else:
raise RuntimeError('Invalid format: unbalanced parentheses in sub-string "' + string + '"')
def parseNewick(string):
""" Main method for parsing a Newick string into a (phylogenetic) tree.
Handles labels (on both leaves and internal nodes), and includes distances (if provided).
Returns an instance of a PhyloTree. """
if string.find(';') != -1:
string = string[:string.find(';')]
return PhyloTree(parseNewickNode(string))
def readNewickFile(filename):
""" Read file on Newick format.
Returns an instance of a PhyloTree."""
f = open(filename)
string = ''.join(f)
return parseNewick(string)
def writeNewickFile(filename, tree):
""" Write the specified tree to a Newick file. """
fh = open(filename, 'w')
fh.write(tree.__str__())
fh.close()
# ===== ival.py =====
import random
class IntervalTree:
"""
Binary search tree for storing long integer intervals, and for performing queries on them.
See https://en.wikipedia.org/wiki/Interval_tree, specifically the Augmented kind.
The present implementation balances the tree by using randomisation.
"""
root = None # pointer to the root node of the binary search tree
stack = None
def __iter__(self):
"""
Create an iterator for the tree; this will iterate through nodes of the tree. Note that a node has one interval and (potentially) multiple values.
:return: iterator of IntervalNodes stored in this tree
"""
self.current = self.root
self.stack = Stack()
return self
def __next__(self):
while self.current != None:
self.stack.push(self.current)
self.current = self.current.left
if self.stack.isEmpty():
raise StopIteration
self.current = self.stack.pop()
ret = self.current
self.current = self.current.right
return ret
def __len__(self):
if self.root != None:
return self.root.N
else:
return 0
def __contains__(self, ival):
return self.get(ival) != None
def get(self, ival):
return self._get(self.root, ival)
def _get(self, node, ival):
if node == None: return None
if ival < node.ival:
return self._get(node.left, ival)
elif ival > node.ival:
return self._get(node.right, ival)
else:
return node
def isect(self, ival, node = None):
""" Look for intersecting interval in subtree rooted at specified node (root by default).
Returns node of intersecting interval. """
if self.root == None: return None
if node == None: return self.isect(ival, self.root)
while node != None:
if isect(ival, node.ival): return node
elif node.left == None: node = node.right
elif node.left.max < ival.min: node = node.right
else: node = node.left
return None
def isectall(self, ival):
""" Look for all intersecting intervals in the subtree rooted at specified node (root by default).
Returns nodes of intersecting intervals. """
return _isectall(ival, self.root)
def closest(self, query):
""" Retrieve the interval Y stored in the tree that is closest to the given interval X.
If the given interval overlaps with one or more stored intervals, one is returned:
the interval Y with the greatest Jaccard index to X. If multiple intervals are equally close,
only one is returned (the one before I think).
:param query: the interval for which the closest is sought
:return: the interval closest to the given query interval
"""
ovlap = self.isectall(query)
        if len(ovlap) == 0: # no overlapping intervals are stored in the tree
return _closest(query, self.root)
else:
best_iv = None
best_ji = 0
for node in ovlap:
ji = jaccard(node.ival, query)
if best_iv == None or ji > best_ji:
best_iv = node
best_ji = ji
return best_iv
def put(self, ival, value = None):
nodex = self.get(ival)
if nodex:
nodex.values.add(value)
else:
self.root = self._randomizedInsert(self.root, ival, value)
    def putAll(self, tree):
        """ Add all intervals from the given tree to this tree, including their associated values. """
        for node in tree:
            if node.values:
                for value in node.values:
                    self.put(node.getInterval(), value)
            else:
                self.put(node.getInterval())
def _randomizedInsert(self, node, ival, value):
if node == None: return IntervalNode(ival, value)
if random.uniform(0,1) * node.N < 1.0: return self._rootInsert(node, ival, value)
if ival < node.ival:
node.left = self._randomizedInsert(node.left, ival, value)
else:
node.right = self._randomizedInsert(node.right, ival, value)
_fix(node)
return node
def _rootInsert(self, node, ival, value):
if node == None: return IntervalNode(ival, value)
if ival < node.ival:
node.left = self._rootInsert(node.left, ival, value)
node = _rotR(node)
else:
node.right = self._rootInsert(node.right, ival, value)
node = _rotL(node)
return node
def _isectall(ival, node):
""" Look for all intersecting intervals in the subtree rooted at specified node (root by default).
Returns nodes of intersecting intervals. """
if node == None: return []
found = []
if isect(ival, node.ival):
found = [node]
if node.left and node.left.max >= ival.min:
found.extend(_isectall(ival, node.left))
if len(found) > 0 or node.left == None or node.left.max < ival.min:
found.extend(_isectall(ival, node.right))
return found
def _closest(query, cand):
""" Recursively find the interval with the minimum distance to that given.
This internal function does not guarantee that distances are sensible when overlapping
intervals exist; essentially it assumes that overlaps have been eliminated prior.
:param query: interval
:param cand: node from which search starts
:return: closest interval """
fav = None
favdist = -1
while cand != None:
if query == cand.ival: return cand
distx = query.dist(cand.ival)
if fav == None or distx <= favdist:
fav = cand
favdist = distx
if cand.left == None: cand = cand.right
elif cand.right == None: cand = cand.left
elif cand.ival.min > query.max: cand = cand.left # the smallest, indexed value (on left) is AFTER the query min
else: # no way to choose without looking in the intervals below
favleft = None
distleft = query.dist(Interval(cand.left.min, cand.left.max))
if distleft < favdist:
favleft = _closest(query, cand.left)
distleft = query.dist(favleft.ival) if favleft != None else MAX_VALUE
distright = query.dist(Interval(cand.right.min, cand.right.max))
if distright < favdist:
favright = _closest(query, cand.right)
distright = query.dist(favright.ival) if favright != None else MAX_VALUE
if distleft < distright:
return favleft if distleft < favdist else fav
else:
return favright if distright < favdist else fav
return fav
class IntervalNode:
"""
Defines the node of the interval search tree.
Manages values associated with intervals.
"""
ival = None # the actual interval
values = None # values associated with the interval
left = None # subtree on the left (lesser)
right = None # subtree on the right (greater)
N = 1 # number of nodes under (and including) this one
min = 0 # min point of subtree rooted at this node
max = 0 # max point of subtree rooted at this node
def __init__(self, interval, value = None):
self.ival = interval
self.min = interval.min
self.max = interval.max
self.values = set()
if value != None:
self.values.add(value)
def getInterval(self):
"""
Retrieve the interval that defines this node
:return: the interval
"""
return self.ival
def getMin(self):
"""
Retrieve the min value for the interval of this node
:return: the min value of the interval
"""
return self.ival.min
def getMax(self):
"""
Retrieve the max value for the interval of this node
:return: the max value of the interval
"""
return self.ival.max
def add(self, value):
"""
Add a new value to this node, to the set of values associated with the interval
:param value:
:return:
"""
if value:
self.values.add(value)
def __str__(self):
leftstr = 'o' if self.left else 'x'
rightstr = 'o' if self.right else 'x'
return leftstr + self.ival.__str__() + rightstr
def __unicode__(self):
leftstr = 'o' if self.left else 'x'
rightstr = 'o' if self.right else 'x'
return leftstr + self.ival.__unicode__() + rightstr
def size(node):
if node == None: return 0
else: return node.N
def _fix(node):
if node == None: return
node.N = 1 + size(node.left) + size(node.right)
node.min = _min3(node.ival.min, _min(node.left), _min(node.right))
node.max = _max3(node.ival.max, _max(node.left), _max(node.right))
MAX_VALUE = 9E30
MIN_VALUE = -9E30
def _min(node):
return MAX_VALUE if node == None else node.min
def _max(node):
return MIN_VALUE if node == None else node.max
def _min3(a, b, c):
return min(a, min(b, c))
def _max3(a, b, c):
return max(a, max(b, c))
def _rotR(node):
y = node.left
node.left = y.right
y.right = node
_fix(node)
_fix(y)
return y
def _rotL(node):
y = node.right
node.right = y.left
y.left = node
_fix(node)
_fix(y)
return y
class Stack:
""" A stack to support an iterator over IntervalNodes in the IntervalTree. """
def __init__(self):
self.items = []
def isEmpty(self):
return self.items == []
def push(self, item):
self.items.append(item)
def pop(self):
return self.items.pop()
def peek(self):
return self.items[len(self.items) - 1]
def size(self):
return len(self.items)
class Interval:
"""
Define a one-dimensional interval.
"""
    def __init__(self, min, max):
        if (min <= max):
            self.min = min
            self.max = max
        else:
            raise RuntimeError('Invalid interval: min must not be greater than max')
def isect(self, that):
if (that.max < self.min): return False
if (self.max < that.min): return False
return True
    def contains(self, x):
        return (self.min <= x) and (x <= self.max)
    def __eq__(self, other):
        if not isinstance(other, Interval): return False
        return self.min == other.min and self.max == other.max
    def __ne__(self, other):
        return not self.__eq__(other)
    def __lt__(self, other):
        if not isinstance(other, Interval): return False
        return self.min < other.min or (self.min == other.min and self.max < other.max)
    def __gt__(self, other):
        if not isinstance(other, Interval): return False
        return self.min > other.min or (self.min == other.min and self.max > other.max)
def __str__(self):
return '[' + str(self.min) + ', ' + str(self.max) +']'
def __unicode__(self):
return '[' + str(self.min) + ', ' + str(self.max) +']'
    def __sizeof__(self):
        return self.max - self.min # note: returns the width of the interval, not its memory footprint
def __len__(self):
return self.max - self.min
def dist(self, that, signed = False, centre2centre = False):
""" Calculate and return the closest distance (from one end of the interval of this to the end of the interval of that).
If centre2centre is True, use the centre-to-centre distance instead.
If signed is True, the distance is negative if this interval is after the that.
"""
if not centre2centre:
if not signed:
if (self.min > that.max): return self.min - that.max # that interval is BEFORE this
if (self.max < that.min): return that.min - self.max # that interval is AFTER this
else: # distance is signed
if (self.min > that.max): return that.max - self.min # that interval is BEFORE this
if (self.max < that.min): return that.min - self.max # that interval is AFTER this
return 0
else:
thiscentre = (self.max - self.min) / 2 + self.min
thatcentre = (that.max - that.min) / 2 + that.min
return thatcentre - thiscentre if signed else abs(thatcentre - thiscentre)
def dist(first, second, signed = False, centre2centre = False):
    """ Calculate and return the closest distance (between the nearest ends of the two intervals).
        If centre2centre is True, use the centre-to-centre distance instead.
        If signed is True, the distance is negative if the first interval is after the second.
        """
    if isinstance(first, Interval) and isinstance(second, Interval):
        return first.dist(second, signed, centre2centre)
    raise RuntimeError('Both arguments to dist must be of type Interval')
def union(first, second):
if (first.isect(second)):
min = first.min if (first.min < second.min) else second.min
max = second.max if (first.max < second.max) else first.max
return Interval(min, max)
else:
raise RuntimeError
def isect(first, second):
if (first.isect(second)):
min = first.min if (first.min > second.min) else second.min
max = second.max if (first.max > second.max) else first.max
return Interval(min, max)
else:
return None
def jaccard(first, second):
if (isect(first, second)):
isect_min = first.min if (first.min > second.min) else second.min
isect_max = second.max if (first.max > second.max) else first.max
union_min = first.min if (first.min < second.min) else second.min
union_max = second.max if (first.max < second.max) else first.max
denom = union_max - union_min
if (denom > 0):
return (isect_max - isect_min) / denom
return 0
else:
return 0
if __name__ == '__main__':
a = Interval(13, 20)
b = Interval(25, 30)
c = Interval(27, 33)
d = Interval(40, 50)
e = Interval(21, 22)
f = Interval(36, 38)
g = Interval(16, 19)
h = Interval(28, 31)
i = Interval(55, 66)
j = Interval(-3, 0)
k = Interval(24, 24)
l = Interval(52, 55)
print('dist(b,a,signed=False,centre2centre=False)=', dist(b, a, signed = False, centre2centre=False))
print('dist(b,a,signed=True,centre2centre=False)=', dist(b, a, signed = True, centre2centre=False))
print('dist(b,a,signed=False,centre2centre=True)=', dist(b, a, signed = False, centre2centre=True))
print('dist(b,a,signed=True,centre2centre=True)=', dist(b, a, signed = True, centre2centre=True))
t = IntervalTree()
t.put(a, 'A')
t.put(b, 'B')
t.put(c, 'C')
t.put(d, 'D')
t.put(e, 'E')
t.put(b, 123)
t.put(b, 'blah')
t.get(d).add('x999')
t.put(i)
t.put(j)
t.put(g)
t.put(k)
print(c in t)
print(e in t)
print(t.get(a).values)
print(t.get(d).values)
print(t.get(b).values)
print(t.isect(f))
print(t.isect(g))
tryme = f
all = t.isectall(tryme)
print("Intersect with " + str(tryme) + ": ")
for n in all:
print('\t' + str(n))
print("Closest to " + str(tryme) + ": ")
print(t.closest(tryme))
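    # The module-level union/isect/jaccard functions operate on pairs of the
    # Interval instances defined above, e.g. with b = [25, 30] and c = [27, 33]:
    print('union(b, c) =', union(b, c))      # [25, 33]
    print('isect(b, c) =', isect(b, c))      # [27, 30]
    print('jaccard(b, c) =', jaccard(b, c))  # 3 / 8 = 0.375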
print('Iterate through tree: ')
for n in t:
print('\t' + str(n)) binfpy-e1db8f84908d7f3d2146551c9451d88557b82827/ml.py 0000664 0000000 0000000 00000032051 13400670236 0020445 0 ustar 00root root 0000000 0000000 import numpy
import numpy.random
import math
import random
class NN():
"""
A basic implementation of a standard, multi-layer, feed-forward neural network
and back-propagation learning.
"""
def __init__(self, nInput, nHidden, nOutput):
""" Constructs a neural network and initializes its weights to small random values.
nInput Number of input nodes
nHidden Number of hidden nodes
nOutput Number of output nodes
"""
self.ninput = nInput
self.hidden = numpy.empty(nHidden) # hidden nodes
self.output = numpy.empty(nOutput) # output nodes
self.w_hid = numpy.random.randn(nHidden, nInput) # weights in -> hid
self.b_hid = numpy.random.randn(nHidden) # biases hidden layer
self.w_out = numpy.random.randn(nOutput, nHidden) # weights hid -> out
self.b_out = numpy.random.randn(nOutput) # biases output layer
print("Constructed NN with %d inputs, %d hidden and %d output nodes." % (self.ninput, len(self.hidden), len(self.output)))
def writeFile(self, filename):
""" Save NN to a file. """
f = open(filename, 'w')
f.write(str(self.ninput)+'\n')
f.write(str(len(self.hidden))+'\n')
f.write(str(len(self.output))+'\n')
for row in self.w_hid:
for w in row:
f.write(str(w)+'\n')
for b in self.b_hid:
f.write(str(b)+'\n')
for row in self.w_out:
for w in row:
f.write(str(w)+'\n')
for b in self.b_out:
f.write(str(b)+'\n')
f.close()
def _fLogistic(self, net):
""" The logistic output function.
Computes the output value of a node given the summed incoming activation,
values bounded between 0 and 1.
net: The summed incoming activation. """
return 1.0 / (1.0 + numpy.exp(-net))
    def _fSoftmax(self, net):
        """ The softmax output function.
            Computes the output value of a node given the summed incoming activation,
            values bounded between 0 and 1, where all add to 1.0.
            net: The summed incoming activation for each output (must be the full layer). """
        tmp = numpy.exp(net - numpy.max(net)) # subtract the max for numerical stability
        return tmp / numpy.sum(tmp)
def _fprimeLogistic(self, y):
""" The derivative of the logistic output function.
y: The value by which the gradient is determined.
returns the gradient at output y. """
return y * (1.0 - y)
def feedforward(self, input):
""" Computes the output values of the output nodes in the network given input values.
input: the one-dim array of input values
returns the one-dim array of computed output values. """
# compute the activation of each hidden node (depends on supplied input values)
self.hidden = self._fLogistic(self.w_hid.dot(input) + self.b_hid)
# compute the activation of each output node (depends on hidden node activations computed above)
if len(self.output) == 1:
self.output = self._fLogistic(self.w_out.dot(self.hidden) + self.b_out)
else:
self.output = self._fSoftmax(self.w_out.dot(self.hidden) + self.b_out)
return self.output
    def test(self, inputs, targets):
        """ Create a confusion matrix for all predictions with known target classes
            (rows index the target class, columns the predicted class). """
cm = numpy.zeros((len(self.output), len(self.output))) # confusion matrix
for p in range(len(inputs)):
input = inputs[p]
target = targets[p]
# present the input and calculate the outputs
output = self.feedforward(input)
# which class?
c_targ = maxIndex(target)
c_pred = maxIndex(output)
cm[c_targ, c_pred] += 1
return cm
    def train(self, input, target, eta = 0.1, niter = 1, shuffle = True):
        """ Adapts weights in the network given the values that should appear at the output (target)
            when the input has been presented. The procedure is known as error back-propagation.
            This implementation is "online" rather than "batched", that is, the change is not based
            on the gradient of the global error, merely the local, pattern-specific error.
            input: The input values (a single pattern, or an array with one pattern per row)
            target: The desired output values
            eta: The learning rate, always between 0 and 1, typically a small value (default 0.1)
            niter: The number of iterations (epochs) through the given patterns (default 1)
            shuffle: If true, input rows are shuffled before training (reduces bias imposed by order
            in online training)
            returns an error value (the root-mean-squared-error). """
try:
len(input[0])
multi_input = input
multi_targ = target
except TypeError:
multi_input = [ input ]
multi_targ = [ target ]
for i in range(niter):
mse = 0.0
entries = list(range(len(multi_input)))
if shuffle:
random.shuffle(entries)
for p in entries:
input = multi_input[p]
target = multi_targ[p]
# present the input and calculate the outputs
self.feedforward(input)
# compute the error of output nodes (explicit target is available -- so quite simple)
# also, calculate the root-mean-squared-error to indicate progress
dif_out = (target - self.output)
if len(self.output) == 1:
err_out = dif_out * self._fprimeLogistic(self.output)
else:
err_out = dif_out #* self._fprimeSoftmax(self.output)
# compute the error of hidden nodes (indirect contribution to error at output layer)
err_hid = self.w_out.T.dot(err_out) * self._fprimeLogistic(self.hidden)
# change weights according to errors
self.w_out += numpy.outer(err_out, self.hidden) * eta
self.b_out += err_out * eta
self.w_hid += numpy.outer(err_hid, input) * eta
self.b_hid += err_hid * eta
if i == niter - 1: # last round
mse += float(numpy.mean(numpy.square(dif_out)))
return math.sqrt(mse / len(entries)) # Root of mean squared error (RMSE)
def readNNFile(filename):
""" Load a NN from a file. """
f = open(filename, 'r')
nInput = int(f.readline())
nHidden = int(f.readline())
nOutput = int(f.readline())
nn = NN(nInput, nHidden, nOutput)
for i in range(nHidden):
for j in range(nInput):
nn.w_hid[i, j] = float(f.readline())
for i in range(nHidden):
nn.b_hid[i] = float(f.readline())
for i in range(nOutput):
for j in range(nHidden):
nn.w_out[i, j] = float(f.readline())
for i in range(nOutput):
nn.b_out[i] = float(f.readline())
f.close()
return nn
def maxIndex(output):
""" Figure out the index of the largest value in the specified array/list. """
if len(output) > 1: # multi-class
max = 0
for i in range(len(output)):
if output[i] > output[max]:
max = i
else: # two-class, single output 0/1
max = int(round(output[0]))
return max
def Qk(cm, alpha):
""" Compute the Q accuracy from a confusion matrix (see test method above) """
Q = {}
for a in alpha:
i = alpha.index(a)
Q[a] = (cm[i, i] / numpy.sum(cm[i])) * 100
tp = 0; pos = 0
for a in alpha:
i = alpha.index(a)
tp += cm[i, i]
pos += sum(cm[i])
return (float(tp) / float(pos)) * 100, Q
def readDenseDataFile(filename):
    """ Read data from file for training a neural network.
        The file follows the "dense" row-format:
        i1 i2 ... im | o1 o2 ... on
        where i1..im are m input values and o1..on are n output values """
# first check format
ninputs = None
noutputs = None
nexamples = 0
f = open(filename)
cnt = 0
for row in f:
cnt += 1
inp, outp = row.split('|')
indata = [ float(token) for token in inp.split() ]
if ninputs:
if len(indata) != ninputs:
raise RuntimeError('Error reading file: Invalid input at row %d' % cnt)
ninputs = len(indata)
outdata = [ float(token) for token in outp.split() ]
if noutputs:
if len(outdata) != noutputs:
raise RuntimeError('Error reading file: Invalid output at row %d' % cnt)
noutputs = len(outdata)
f.close()
nexamples = cnt
inm = numpy.zeros((nexamples, ninputs))
outm = numpy.zeros((nexamples, noutputs))
f = open(filename)
cnt = 0
for row in f:
inp, outp = row.split('|')
inm[cnt] = [ float(token) for token in inp.split() ]
outm[cnt] = [ float(token) for token in outp.split() ]
cnt += 1
f.close()
return inm, outm
def fGaussian(x, mu = 0.0, sigma2 = 1.0):
""" Gaussian PDF for numpy arrays """
num = (x - mu) ** 2
den = 2 * sigma2
expon = numpy.exp(-num/den)
return expon / numpy.sqrt(2.0 * numpy.pi * sigma2)
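# For example, fGaussian(numpy.array([0.0, 1.0])) evaluates the standard normal
# density elementwise, giving approximately [0.3989, 0.2420].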
class KMeans():
"""
K-means clustering is a special case of Expectation-Maximization (EM).
    In K-means clustering we consider samples
    x1,...,xn labeled with z1,...,zn, with xt vectors in R^D and zt \in {1,...,K}.
    In other words, zt is a class label, or cluster label, for the data point xt.
    We can define a K-means probability model as follows, where N(mu, I) denotes the
    D-dimensional Gaussian distribution with mean mu \in R^D and with the
    identity covariance matrix:
    theta = (mu_1, ..., mu_K), mu_k \in R^D
    P(x1, ..., xn, z1, ..., zn) = PROD_t P(zt) P(xt|zt) = PROD_t 1/K N(mu_zt, I)(xt)
    We now consider the optimization problem defined for this model. For
    this model,
    (mu_1,...,mu_K)* = argmin_mu min_z SUM_t || mu_zt - xt ||^2
The optimization problem defines K-means clustering (under quadratic distortion).
This problem is non-convex and in fact is NP-hard. The K-means algorithm is coordinate
descent applied to this objective and is equivalent to EM under the above probability
model. The K-means clustering algorithm can be written as follows where we specify a
typical initialization step.
1. Initialize mu_z to be equal to a randomly selected point xt.
2. Repeat the following until (z1, . . . zn) stops changing.
(a) zt := argmin_z || mu_z - xt || ^2
(b) Nz := |{t: zt = z}|
(c) mu_z := 1 / Nz SUM_t:zt=z xt
In words, the K-means algorithm first assigns a class center mu_z for each class z.
It then repeatedly classifies each point xt as belonging to the class whose center is
nearest xt and then recomputes the class centers to be the mean of the point placed in that class.
Because it is a coordinate descent algorithm, the sum of squares of the difference
between each point and its class center is reduced by each update. This implies that the
classification must eventually stabilize.
The procedure terminates when the class labels stop changing.
"""
def __init__(self, data):
""" Construct a K-means classifier using the provided data.
data: a two-dim numpy array, with one row corresponding to a data point.
If training is not performed, the provided data is used as the "means". """
assert len(data) > 0, "Data must be supplied"
self.data = data
self.means = data.copy()
self.samplelen = len(data[0])
self.vars = numpy.empty((len(data), self.samplelen))
def classify(self, sample):
assert len(sample) == self.samplelen, "Sample vector has invalid length: " + str(len(sample))
sqrdist = numpy.sum((self.means - sample) ** 2, 1)
return sqrdist.argmin(0)
def train(self, K):
data = self.data
N = len(data)
clusters = numpy.zeros((N, 1))
self.means = self.data[numpy.random.randint(N, size = K),:] # pick K random samples
while True:
previous = clusters.copy()
""" Compute cluster memberships GIVEN means """
kdist = numpy.empty((len(data), K))
for i in range(K):
kdist[:,i] = numpy.sum((data - self.means[i]) ** 2, 1)
clusters[:,0] = kdist.argmin(1)
nsame = numpy.sum(previous[:,0] == clusters[:,0])
if nsame == N:
break
""" Compute means GIVEN cluster memberships """
for i in range(K):
members = data[clusters[:,0] == i]
self.means[i] = members.mean(0) # mean over rows per column
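# Usage sketch: cluster two well-separated 2-D blobs with the KMeans class above.
# The seed, blob offsets and K are arbitrary choices for this toy example; note
# that the simple random initialisation can occasionally leave a cluster empty,
# so treat this as an illustration of the API rather than a robust test.
if __name__ == '__main__':
    numpy.random.seed(1)
    points = numpy.vstack((numpy.random.randn(20, 2), numpy.random.randn(20, 2) + 10.0))
    km = KMeans(points)
    km.train(K = 2)
    print('Cluster means:\n', km.means)
    print('Point (9, 9) belongs to cluster', km.classify(numpy.array([9.0, 9.0])))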
def eucdist(v1, v2):
diff = 0
for i in range(len(v1)):
diff += (v1[i] - v2[i])**2
return math.sqrt(diff)
def cosdist(v1, v2):
if len(v1) != len(v2):
return None
sum0 = 0
sum1 = 0
sum2 = 0
for i in range(len(v1)):
sum0 += v1[i]*v2[i]
sum1 += v1[i]*v1[i]
sum2 += v2[i]*v2[i]
return 1 - (sum0 / (math.sqrt(sum1*sum2)))
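# Usage sketch: train the NN class above on the XOR problem. The layer sizes,
# learning rate and iteration count are arbitrary choices for this toy example;
# online back-propagation is not guaranteed to converge from every random
# initialisation, so treat this as an illustration of the API rather than a test.
if __name__ == '__main__':
    xor_in = numpy.array([[0.0, 0.0], [0.0, 1.0], [1.0, 0.0], [1.0, 1.0]])
    xor_out = numpy.array([[0.0], [1.0], [1.0], [0.0]])
    nn = NN(2, 4, 1)
    rmse = nn.train(xor_in, xor_out, eta = 0.5, niter = 5000)
    print('RMSE after training: %.4f' % rmse)
    for x in xor_in:
        print(x, '->', nn.feedforward(x))
    # Weights can be round-tripped via the file format implemented above:
    #   nn.writeFile('xor.nn'); nn2 = readNNFile('xor.nn')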
binfpy-e1db8f84908d7f3d2146551c9451d88557b82827/phylo.py 0000664 0000000 0000000 00000074577 13400670236 0021213 0 ustar 00root root 0000000 0000000 '''
Module with methods and classes for phylogeny.
@author: mikael
'''
import sequence
from collections import defaultdict
import annotations
class PhyloTree:
""" Rooted, binary (bifurcating) tree for representing phylogenetic relationships.
Functionality includes labelling and traversing nodes; reading and writing to Newick format;
association with sequence alignment; maximum parsimony inference of ancestral sequence;
generation of single, bifurcating rooted tree by UPGMA.
Known issues: Binary only; Parsimony does not handle gaps in alignment.
Programmers should note that almost all functionality is implemented through recursion. """
def __init__(self, root):
""" Create a tree from a node that is "root" in the tree."""
self.root = root
def putAlignment(self, aln):
""" Associate the tree with a set of sequences/alignment.
Involves assigning the sequence to the leaf nodes. """
self.aln = aln
self.root._assignAlignment(aln)
def putAnnotations(self, nexus_annotations):
self.nexus_annotations = nexus_annotations
# Update the annotations dictionary so that it contains PhyloNode objects as keys, not text labels
for node in self.getNodes():
if node.label in self.nexus_annotations.leaf_annotations:
self.nexus_annotations.leaf_annotations[node] = self.nexus_annotations.leaf_annotations[node.label]
self.nexus_annotations.leaf_annotations.pop(node.label)
def __str__(self):
""" Produce a printable representation of the tree, specifically the root of the tree. """
return str(self.root)
def strSequences(self, start=None, end=None):
""" Produce a sequence representation of the tree, specifically the root of the tree.
Specify the start and end positions in the alignment for the sequence to be printed
(if None the min and max positions will be used). """
if self.aln != None:
my_start = start or 0
my_end = end or self.aln.alignlen
return self.root._printSequences(my_start, my_end)
def findLabel(self, label):
""" Retrieve/return the node with the specified label.
Returns None if not found."""
return self.root._findLabel(label)
    def getNodes(self, strategy = 'DEPTH-FIRST'):
        """ Returns all nodes as a list (depth-first order; the strategy parameter is currently not used) """
nodes = []
queue = [self.root]
while len(queue) > 0:
node = queue.pop()
nodes.append(node)
# if strategy.upper().startswith('DEPTH'):
if node.left: queue.append(node.left)
if node.right: queue.append(node.right)
return nodes
def getDescendantsOf(self, node, transitive=False):
""" Retrieve and return the (list of) descendants (children) of a specified node.
Node can be the label or the instance.
transitive indicates if only the direct descendants (False) or if all descendants
should be returned.
If node does not exist, None is returned.
If node has no descendants, an empty list will be returned."""
if not isinstance(node, PhyloNode):
node = self.findLabel(node)
if node:
return node.getDescendants(transitive)
return None
def getAncestorsOf(self, node, transitive=False):
""" Retrieve and return the ancestor (transitive=False) or
ancestors (transitive=True) of a specified node.
Node can be the label or the instance.
If node does not exist, None is returned.
If node is the root of the tree, None is returned."""
if not isinstance(node, PhyloNode):
node = self.findLabel(node)
if node:
myroot = self.root
found = False
branching = []
while not found and myroot != None:
branching.append(myroot)
# check if "myroot" is a leaf node, i.e. does not have children
if myroot.left == node or myroot.right == node:
found = True
break
if myroot.left != None: # myroot has a "left" child
# check if the "left" child of "myroot" is the ancestor of "node"
if myroot.left.isAncestorOf(node, transitive=True): # if yes,
myroot = myroot.left # move to the "left" child
else: # if not,
myroot = myroot.right # move to the "right" child
else: # myroot does NOT have a "left" child, so let's move "right"
myroot = myroot.right
if found and transitive:
return branching
            elif found and len(branching) > 0:
                return branching[-1]
return None
def parsimony(self):
""" Solve the "small parsimony problem",
i.e. find the sequences on each of the internal nodes.
See Jones and Pevzner, p. 368 and onwards, for details. """
self.root._forwardParsimony(self.aln) # setup and compute scores for all nodes
self.root._backwardParsimony(self.aln) # use scores to determine sequences
return self.root.getSequence() # return the sequence found at the root
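    # Usage sketch (comments only; "aln" is an assumed sequence.Alignment whose
    # sequence names match the leaf labels A, B and C):
    #   tree = parseNewick('((A,B)ab,C)root;')
    #   tree.putAlignment(aln)
    #   print(tree.parsimony())   # the most parsimonious sequence inferred at the root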
def canonise(self):
self.root._canonise()
    def swap_annotations(self, annotation_key):
        try:
            for node in self.getNodes():
                if node.isLeaf():
                    node.label = self.nexus_annotations.leaf_annotations[node][annotation_key]
        except (AttributeError, KeyError): # no annotations associated, or annotation key missing
            return
def write_to_nexus(self, out_path, write_annotations=True, nexus_annotations=None, exclude_annotations=[], use_symbols=False):
"""
Writes out the tree to NEXUS format, with any annotations stored in nexus_annotations added to the file
:param out_path: The path to write the NEXUS file to
:param nexus_annotations: The NexusAnnotations containing the annotations
"""
if write_annotations and not nexus_annotations:
if not self.nexus_annotations:
raise RuntimeError("This tree file has no associated annotation file. Either associate or supply one as a parameter.")
nexus_annotations = self.nexus_annotations
        if nexus_annotations:
            for node in self.getNodes():
                if node in nexus_annotations.node_annotations:
                    node.annotate_node(nexus_annotations.node_annotations, nexus_annotations.annotation_symbols, exclude_annotations, use_symbols)
            tree_annotation = str(self) + ";"
            self.swap_annotations("Original")
            for node in self.getNodes():
                if node in nexus_annotations.leaf_annotations:
                    node.annotate_node(nexus_annotations.leaf_annotations, exclude_annotations=exclude_annotations) # keyword arg: the second positional parameter is annotation_symbols
leaves = []
for node in self.getNodes():
if node.isLeaf():
leaves.append(node.label)
leaf_annotation = ""
for leaf in leaves:
leaf_annotation += "\n\t%s" % (leaf)
with open(out_path, "w+") as file:
file.write(
"#NEXUS\nbegin taxa;\n\tdimensions ntax=%d;\n\ttaxlabels%s\n;\nend;\n\nbegin trees;\n\ttree tree_1 = "
"[&R] %s\nend;" % (len(leaves), leaf_annotation, tree_annotation))
class PhyloNode:
""" A class for a node in a rooted, binary (bifurcating) tree.
Contains pointers to descendants/daughters (left and right),
optional fields include data, label, sequence and dist.
If parsimony is used scores and traceback pointers are available.
A number of methods are named with a _ prefix. These can be, but
are not intended to be used from outside the class. """
def __init__(self, label=''):
""" Initialise an initially unlinked node.
Populate fields left and right to link it with other nodes.
Set label to name it.
Use field data for any type of information associated with node.
Use dist to indicate the distance to its parent (if any).
Other fields are used internally, including sequence for associated alignment,
seqscores, backleft and backright for maximum parsimony. """
self.left = None
self.right = None
self.data = None
self.label = label
self.dist = None
        self.sequence = None # The sequence after an alignment has been mapped (leaf) or the most parsimonious sequence (ancestral)
self.seqscores = None # The scores propagated from leaves via children
self.backleft = None # Pointers back to left child: what symbol rendered current/parent symbols
self.backright = None # Pointers back to right child: what symbol rendered current/parent symbols
def isLeaf(self):
return self.left == self.right == None
def __str__(self):
""" Returns string with node (incl descendants) in a Newick style. """
left = right = label = dist = ''
if self.left:
left = str(self.left)
if self.right:
right = str(self.right)
if self.dist or self.dist == 0.0:
dist = ':' + str(self.dist)
if self.label != None:
label = str(self.label)
if not self.left and not self.right:
return label + dist
else:
return '(' + left + ',' + right + ')' + label + dist
else: # there is no label
if not self.left and self.right:
return ',' + right
elif self.left and not self.right:
return left + ','
elif self.left and self.right:
return '(' + left + ',' + right + ')' + dist
# def __le__(self, other):
# """ Returns indication of less than other node. """
# return other and self.__hash__() <= other.__hash__()
#
# def __eq__(self, other):
# """ Returns indication of equivalence to other node. """
# return other and self.__hash__() == other.__hash__()
#
# def __hash__(self):
# """ Returns hash of object. """
# return hash((self.label, self.dist, self.sequence))
def _printSequences(self, start, end):
""" Returns string with node (incl descendants) in a Newick style. """
left = right = label = dist = ''
if self.left:
left = self.left._printSequences(start, end)
if self.right:
right = self.right._printSequences(start, end)
if self.dist:
dist = ':' + str(self.dist)
if self.sequence != None:
label = "".join(self.sequence[start:end]) + ""
if not self.left and not self.right:
return label + dist
else:
return '(' + left + ',' + right + ')' + label + dist
else: # there is no label
if not self.left and self.right:
return ',' + right
elif self.left and not self.right:
return left + ','
elif self.left and self.right:
return '(' + left + ',' + right + ')' + dist
def _findLabel(self, label):
""" Find a node by label at this node or in any descendants (recursively). """
if self.label == label:
return self
else:
if self.left:
foundLeft = self.left._findLabel(label)
if foundLeft:
return foundLeft
if self.right:
return self.right._findLabel(label)
return None
def _propagateDistance(self, parent_dist):
""" Convert absolute distances to relative.
The only parameter is the absolute distance to the parent of this node. """
travelled = self.dist # absolute distance to this node
self.dist = parent_dist - self.dist # relative distance to this node
if self.left != None: # if there is a child node...
self.left._propagateDistance(travelled) # pass absolute distance to this node
if self.right != None:
self.right._propagateDistance(travelled)
def _assignAlignment(self, aln):
""" Assign an alignment to the node, which implies assigning a sequence to it if one is
available in the alignment. """
self.sequence = None
if self.left != None:
self.left._assignAlignment(aln)
if self.right != None:
self.right._assignAlignment(aln)
for seq in aln.seqs:
if seq.name == self.label:
self.sequence = seq
break
def _canonise(self):
if self.left == None and self.right == None: # at leaf
return self.label
myleft = self.left._canonise()
        myright = self.right._canonise()
if myleft > myright:
tmpnode = self.left
self.left = self.right
self.right = tmpnode
return myright
return myleft
def _forwardParsimony(self, aln):
""" Internal function that operates recursively to first initialise each node (forward),
stopping only once a sequence has been assigned to the node,
then to propagate scores from sequence assigned nodes to root (backward). """
if self.sequence == None: # no sequence has been assigned
if self.left == None and self.right == None: # no children, so terminal, cannot propagate scores
raise RuntimeError("No sequence assigned to leaf node:", self.label)
scoresleft = scoresright = None
if self.left != None:
scoresleft = self.left._forwardParsimony(aln)
if self.right != None:
scoresright = self.right._forwardParsimony(aln)
# for each position in the alignment,
# introduce (initially zero) score for each symbol in alphabet
self.seqscores = [[0 for _ in aln.alphabet] for col in range(aln.alignlen)]
# for each position in the alignment,
# allocate a position to put the left child symbol from which each current node symbol score was determined
self.backleft = [[None for _ in aln.alphabet] for _ in range(aln.alignlen)]
# allocate a position to put the right child symbol from which each current node symbol score was determined
self.backright = [[None for _ in aln.alphabet] for _ in range(aln.alignlen)]
for col in range(aln.alignlen):
# left child will contribute first
for a_parent in range(len(aln.alphabet)):
best_score_left = +9999999
best_symb_left = 0
for a in range(len(aln.alphabet)):
score = (scoresleft[col][a] + (
1 if a != a_parent else 0)) # if we want to weight scores, this would need to change
if score < best_score_left:
best_symb_left = a
best_score_left = score
self.seqscores[col][a_parent] = best_score_left
self.backleft[col][a_parent] = best_symb_left
# right child will contribute next
for a_parent in range(len(aln.alphabet)):
best_score_right = +9999999
best_symb_right = 0
for a in range(len(aln.alphabet)):
score = (scoresright[col][a] + (
1 if a != a_parent else 0)) # if we want to weight scores, this would need to change
if score < best_score_right:
best_symb_right = a
best_score_right = score
self.seqscores[col][a_parent] += best_score_right
self.backright[col][a_parent] = best_symb_right
else:
self.seqscores = [[0 if a == sym else 999999 for a in aln.alphabet] for sym in
self.sequence] # if we want to weight scores, this would need to change
return self.seqscores
def _backwardParsimony(self, aln, seq=None):
""" Internal function that operates recursively to inspect scores to determine
most parsimonious sequence, from root to leaves. """
if self.sequence == None: # no sequence has been assigned
leftbuf = []
rightbuf = []
if self.left == None and self.right == None: # no children, so terminal, cannot propagate scores
raise RuntimeError("No sequence assigned to leaf node:", self.label)
if seq == None: # Only root can do this, no parents to consider, so we pick the lowest scoring symbol
currbuf = []
for col in range(aln.alignlen):
min_score = 999999
min_symb = None
left_symb = None
right_symb = None
for a_parent in range(len(aln.alphabet)):
if self.seqscores[col][a_parent] < min_score:
min_score = self.seqscores[col][a_parent]
min_symb = a_parent
left_symb = self.backleft[col][a_parent]
right_symb = self.backright[col][a_parent]
currbuf.append(aln.alphabet[min_symb])
leftbuf.append(aln.alphabet[left_symb])
rightbuf.append(aln.alphabet[right_symb])
self.sequence = sequence.Sequence(currbuf, aln.alphabet, self.label, gappy=True)
else: # Non-root, but not leaf
self.sequence = seq
col = 0
for sym_parent in self.sequence:
a_parent = aln.alphabet.index(sym_parent)
left_symb = self.backleft[col][a_parent]
right_symb = self.backright[col][a_parent]
leftbuf.append(aln.alphabet[left_symb])
rightbuf.append(aln.alphabet[right_symb])
col += 1
            self.left._backwardParsimony(aln, sequence.Sequence(leftbuf, aln.alphabet, self.left.label, gappy=True)) # name each inferred sequence after the node it is assigned to
            self.right._backwardParsimony(aln, sequence.Sequence(rightbuf, aln.alphabet, self.right.label, gappy=True))
return self.sequence
def getSequence(self):
""" Get the sequence for the node. Return None if no sequence is assigned.
Requires that an alignment is associated with the tree, and that sequence names match node labels.
If the explored node is not a leaf, the sequence can be determined by parsimony. """
if self.sequence != None: # a sequence has been assigned
return self.sequence
elif self.seqscores != None: # inferred by parsimony but not yet assigned
return None # determine most parsimonous sequence, not yet implemented
    def isAncestorOf(self, node, transitive=True):
        """ Decide if this node is the ancestor of specified node.
            If transitive is True (default), all descendants are included.
            If transitive is False, only direct descendants are included. """
        if node == self.left or node == self.right:
            return True
        elif transitive:
            if self.left and self.left.isAncestorOf(node, transitive):
                return True
            if self.right:
                return self.right.isAncestorOf(node, transitive)
        return False
def getDescendants(self, transitive=False):
""" Retrieve and return (list of) nodes descendant of this.
If transitive is False (default), only direct descendants are included.
If transitive is True, all descendants are (recursively) included. """
children = []
if self.left:
children.append(self.left)
if self.right:
children.append(self.right)
if not transitive:
return children
else:
grandchildren = []
for c in children:
d = c.getDescendants(transitive)
if d:
grandchildren.extend(d)
children.extend(grandchildren)
return children
    def annotate_node(self, annotations, annotation_symbols = None, exclude_annotations = [], use_symbols = False):
annotation_string = "[&"
for key, val_list in annotations[self].items():
if type(val_list) != list:
val_list = [val_list]
            if key not in exclude_annotations:
                # If we are using annotation symbols and every value has an associated symbol,
                # write the (sorted) symbols once; otherwise write the values themselves
                if use_symbols and annotation_symbols and all(val in annotation_symbols for val in val_list):
                    sorted_symbols = sorted([annotation_symbols[val] for val in val_list])
                    annotation_string += '%s="%s",' % (key, ' '.join(['%s' % (val,) for val in sorted_symbols]))
                else:
                    annotation_string += '%s="%s",' % (key, ' '.join(['%s' % (val,) for val in val_list]))
        # Remove the final comma and add a closing bracket
        annotation_string = annotation_string[:-1] + "]"
if len(annotation_string) > 2:
if ":" in self.label:
self.label = self.label.split(":")[0] + annotation_string + self.label.split(":")[1]
else:
self.label = self.label + annotation_string
""" ----------------------------------------------------------------------------------------
Methods for generating a single tree by clustering, here UPGMA Zvelebil and Baum p. 278
----------------------------------------------------------------------------------------"""
def runUPGMA(aln, measure, absoluteDistances=False):
""" Generate an ultra-metric, bifurcating, rooted tree from an alignment based on pairwise distances.
Use specified distance metric (see sequence.calcDistances).
If absoluteDistances is True, the tree will be assigned the total distance from provided species.
Otherwise, the relative addition at each path will be assigned."""
D = {}
N = {} # The number of sequences in each node
M = aln.calcDistances(measure) # determine all pairwise distances
nodes = [PhyloNode(seq.name) for seq in aln.seqs] # construct all leaf nodes
""" For each node-pair, assign the distance between them. """
for i in range(len(nodes)):
nodes[i].sequence = aln.seqs[i]
nodes[i].dist = 0.0
N[nodes[i]] = 1 # each cluster contains a single sequence
for j in range(0, i):
D[frozenset([nodes[i], nodes[j]])] = M[i, j]
""" Now: treat each node as a cluster,
until there is only one cluster left,
find the *closest* pair of clusters, and
merge that pair into a new cluster (to replace the two that merged).
In each case, the new cluster is represented by the (phylo)node that is formed. """
    while len(N) > 1: # N will contain all "live" clusters, to be reduced to a single one below
closest_pair = (None, None) # The two nodes that are closest to one another according to supplied metric
closest_dist = None # The distance between them
for pair in D: # check all pairs which should be merged
dist = D[pair]
if closest_dist == None or dist < closest_dist:
closest_dist = dist
closest_pair = list(pair)
# So we know the closest, now we need to merge...
x = closest_pair[0] # See Zvelebil and Baum p. 278 for notation
y = closest_pair[1]
z = PhyloNode() # create a new node for the cluster z
z.dist = D.pop(frozenset([x, y])) / 2.0 # assign the absolute distance, travelled so far, note: this will change to relative distance later
Nx = N.pop(x) # find number of sequences in x, remove the cluster from list N
Ny = N.pop(y) # find number of sequences in y, remove the cluster from list N
dz = {} # new distances to cluster z
for w in N: # for each node w ...
# we will merge x and y into a new cluster z, so need to consider w (which is not x or y)
dxw = D.pop(frozenset([x, w])) # retrieve and remove distance from D: x to w
dyw = D.pop(frozenset([y, w])) # retrieve and remove distance from D: y to w
dz[w] = (Nx * dxw + Ny * dyw) / (Nx + Ny) # distance: z to w
N[z] = Nx + Ny # total number of sequences in new cluster, insert new cluster in list N
for w in dz: # we have to run through the nodes again, now not including the removed x and y
D[frozenset([z, w])] = dz[w] # for each "other" cluster, update distance per EQ8.16 (Z&B p. 278)
z.left = x # link the phylogenetic tree
z.right = y
nodes.append(z)
if not absoluteDistances:
x._propagateDistance(z.dist) # convert absolute distances to relative by recursing down left path
y._propagateDistance(z.dist) # convert absolute distances to relative by recursing down right path
z.dist = 0.0 # root z is at distance 0 from merged x and y
return PhyloTree(z) # make it to tree, return
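# Usage sketch (comments only): "aln" is an assumed alignment from the companion
# sequence module, and 'fractional' stands in for whatever measure names
# aln.calcDistances accepts (an assumption, not verified here):
#   tree = runUPGMA(aln, 'fractional')
#   print(tree)   # the ultra-metric tree in Newick style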
""" ----------------------------------------------------------------------------------------
Methods for processing files of trees on the Newick format
----------------------------------------------------------------------------------------"""
def _findComma(string, level=0):
""" Find first comma at specified level of embedding """
mylevel = 0
for i in range(len(string)):
if string[i] == '(':
mylevel += 1
elif string[i] == ')':
mylevel -= 1
elif string[i] == ',' and mylevel == level:
return i
return -1
def parseNewickNode(string):
""" Utility function that recursively parses embedded string using Newick format. """
first = string.find('(')
last = string[::-1].find(')') # look from the back
if first == -1 and last == -1: # we are at leaf
y = string.split(':')
node = PhyloNode(y[0])
if len(y) >= 2:
node.dist = float(y[1])
return node
elif first >= 0 and last >= 0:
# remove parentheses
last = len(string) - last - 1 # correct index to refer from start instead of end of string
embed = string[first + 1:last]
tail = string[last + 1:]
# find where corresp comma is
comma = _findComma(embed)
if comma == -1:
raise RuntimeError('Invalid format: invalid placement of "," in sub-string "' + embed + '"')
left = embed[0:comma].strip()
right = embed[comma + 1:].strip()
y = tail.split(':')
node = PhyloNode(y[0]) # node is an instance of the PhyloNode() class
if len(y) >= 2:
node.dist = float(y[1])
node.left = parseNewickNode(left)
node.right = parseNewickNode(right)
return node
else:
raise RuntimeError('Invalid format: unbalanced parentheses in sub-string "' + string + '"')
def parseNewick(string):
""" Main method for parsing a Newick string into a (phylogenetic) tree.
Handles labels (on both leaves and internal nodes), and includes distances (if provided).
Returns an instance of a PhyloTree. """
if string.find(';') != -1:
string = string[:string.find(';')]
return PhyloTree(parseNewickNode(string))
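# Parsing example (comments only): parseNewick handles labels on both leaves and
# internal nodes, with optional distances, e.g.
#   tree = parseNewick('((A:0.1,B:0.2)ab:0.05,C:0.3)root;')
#   print(tree)                      # re-serialises the tree in Newick style
#   print(tree.findLabel('A').dist)  # 0.1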
def readNewick(filename):
    """ Read file on Newick format.
        Returns an instance of a PhyloTree."""
    with open(filename) as f:
        string = ''.join(f)
    return parseNewick(string)
def writeNewickFile(filename, my_tree):
with open(filename, 'w') as fh:
print(my_tree, end="", file=fh)
def read_nexus(filename):
"""
    Read a file in NEXUS format
    :param filename: the name of the NEXUS file
    :return: a PhyloTree with NexusAnnotations attached
    """
    with open(filename) as f:
        return parse_nexus(f)
def parse_nexus(fh):
    """ Parse NEXUS content from an open file handle; returns a PhyloTree with NexusAnnotations. """
    string = fh.read()
lines = string.split("\n")
annotation_dict = defaultdict(dict)
for num, line in enumerate(lines):
# print (line)
if line.strip().startswith("dimensions ntax="):
taxon_number = line.strip().split("dimensions ntax=")[1].split(";")[0]
if line.strip().startswith("taxlabels"):
taxon_num = num + 1
while not lines[taxon_num].strip().startswith(";"):
taxon_name = lines[taxon_num].split("[")[0].strip()
for annot_line in lines[taxon_num].split("[&")[1].split(","):
#TODO: Make these regex calls
# print ("Annotation Key is ", annot_line.split("=")[0])
annot_key = annot_line.split("=")[0]
# print (annot_line.split("=")[1])
if '"' in annot_line:
annot_val = annot_line.split("=")[1].split("\"")[1]
else:
annot_val = annot_line.split("=")[1].split("]")[0]
annotation_dict[taxon_name][annot_key.strip()] = annot_val
taxon_num +=1
if line.strip().startswith("begin trees"):
tree_num = num + 1
tree = (lines[tree_num].split("[&R]")[1])
phylo_tree = parseNewick(tree)
nexus_annotations = annotations.NexusAnnotations(tree=phylo_tree)
nexus_annotations.add_annotations(annotation_dict)
# print (nexus_annotations.annotations)
phylo_tree.putAnnotations(nexus_annotations)
## Extract all of the annotations from the tree and add them to the NexusAnnotations object
print ("Number of taxons is %s " % (taxon_number))
return phylo_tree
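# Round-trip sketch (comments only; NexusAnnotations is provided by the
# companion annotations module imported above):
#   tree = read_nexus('my.nexus')        # parse the tree and its taxon annotations
#   tree.write_to_nexus('copy.nexus')    # re-serialise the tree with annotations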
""" ----------------------------------------------------------------------------------------
Method for generating a PhyloTree with unique tip names
----------------------------------------------------------------------------------------"""
def get_unique_tree(tree):
    """ Ensure all leaf labels in the tree are unique by appending a counter to duplicates.
        Note: the tree is modified in place and also returned. """
    unique_tree = tree
unique_labels = {}
for node in unique_tree.getNodes():
if node.isLeaf() and node.label in unique_labels:
unique_labels[node.label] = unique_labels[node.label] + 1
node.label = node.label + str(unique_labels[node.label])
elif node.isLeaf():
unique_labels[node.label] = 1
return unique_tree
def unpack_list(lst):
    """ Format the elements of a list as a space-separated string, terminated by '!'. """
    return (" ".join(["%s"] * len(lst)) + "!") % tuple(lst) # %-formatting requires a tuple, not a generator
binfpy-e1db8f84908d7f3d2146551c9451d88557b82827/prob.py 0000664 0000000 0000000 00000114236 13400670236 0021005 0 ustar 00root root 0000000 0000000 '''
Module for classes and functions that are representing and processing basic probabilities.
Also includes Markov chain and hidden Markov model.
Uses and depends on "Alphabet" that is used to define discrete random variables.
'''
import random
from sym import *
from copy import deepcopy
import math
#################################################################################################
# Generic utility functions
#################################################################################################
def _getMeTuple(alphas, str):
""" Handy function that resolves what entries that are being referred to in the case
of written wildcards etc.
Example y = _getMeTuple([DNA_Alphabet, Protein_Alphabet], '*R') gives y = (None, 'R')
alphas: the alphabets
str: the string that specifies entries (may include '*' and '-' signifying any symbol) """
assert len(str) == len(alphas), "Entry invalid"
if not type(str) is tuple:
list = []
for ndx in range(len(alphas)):
if str[ndx] == '*' or str[ndx] == '-':
list.append(None)
else:
list.append(str[ndx])
return tuple(list)
else:
return str
#################################################################################################
# Distrib class
#################################################################################################
class Distrib():
""" A class for a discrete probability distribution, defined over a specified "Alphabet"
TODO: Fix pseudo counts
Exclude from counts, specify in constructor,
include only when computing probabilities by standard formula (n_a + pseudo_a * N^(1/2)) / (N + N^(1/2))
Exclude from filesaves, include with filereads (optional)
"""
def __init__(self, alpha, pseudo = 0.0):
""" Construct a new distribution for a specified alphabet, using an optional pseudo-count.
alpha: alphabet
pseudo: either a single "count" that applies to all symbols, OR a distribution/dictionary with counts.
"""
self.pseudo = pseudo or 0.0
self.alpha = alpha
self.cnt = [0.0 for _ in alpha]
try: # assume pseudo is a dictionary or a Distrib itself
self.tot = 0
symndx = 0
for sym in alpha:
cnt = float(pseudo[sym])
self.cnt[symndx] = cnt
self.tot = self.tot + cnt
symndx += 1
except TypeError: # assume pseudo is a single count for each symbol
self.cnt = [float(self.pseudo) for _ in alpha]
self.tot = float(self.pseudo) * len(alpha) # track total counts (for efficiency)
def observe(self, sym, cntme = 1.0):
""" Make an observation of a symbol
sym: symbol that is being observed
cntme: number/weight of observation (default is 1)
"""
ndx = self.alpha.symbols.index(sym)
self.cnt[ndx] = self.cnt[ndx] + cntme
self.tot = self.tot + cntme
return
def reset(self):
""" Re-set the counts of this distribution. Pseudo-counts are re-applied. """
try:
self.tot = 0
symndx = 0
for sym in self.alpha: # assume it is a Distribution
cnt = float(self.pseudo[sym])
self.cnt[symndx] = cnt
self.tot = self.tot + cnt
symndx += 1
except TypeError: # assume pseudo is a single count for each symbol
self.cnt = [float(self.pseudo) for _ in self.alpha]
self.tot = float(self.pseudo) * len(self.alpha) # track total counts (for efficiency)
def reduce(self, new_alpha):
""" Create new distribution from self, using (smaller) alphabet new_alpha. """
d = Distrib(new_alpha, self.pseudo)
for sym in new_alpha:
d.observe(sym, self.cnt[self.alpha.index(sym)])
return d
def count(self, sym = None):
""" Return the absolute count(s) of the distribution
or the count for a specified symbol. """
if sym != None:
ndx = self.alpha.symbols.index(sym)
return self.cnt[ndx]
else:
d = {}
index = 0
for a in self.alpha:
d[a] = self.cnt[index]
index += 1
return d
def add(self, distrib):
""" Add the counts for the provided distribution to the present. """
for i in range(len(self.cnt)):
cnt = distrib.count(self.alpha[i])
self.cnt[i] += cnt
self.tot += cnt
def subtract(self, distrib):
""" Subtract the counts for the provided distribution from the present. """
for i in range(len(self.cnt)):
cnt = distrib.count(self.alpha[i])
self.cnt[i] -= cnt
self.tot -= cnt
def getSymbols(self):
return self.alpha.symbols
def __getitem__(self, sym):
""" Retrieve the probability of a symbol (ascertained by counts incl pseudo-counts) """
if self.tot > 0.0:
return self.count(sym) / self.tot
else:
return 1.0 / len(self.alpha) # uniform
def prob(self, sym = None):
""" Retrieve the probability of a symbol OR the probabilities of all symbols
(listed in order of the alphabet index). """
if sym != None:
return self.__getitem__(sym)
elif self.tot > 0:
return [ s / self.tot for s in self.cnt ]
else:
return [ 1.0 / len(self.alpha) for _ in self.cnt ]
    def __iter__(self):
        return iter(self.alpha)
def __str__(self):
""" Return a readable representation of the distribution """
str = '< '
for s in self.alpha:
str += (s + ("=%4.2f " % self[s]))
return str + ' >'
def swap(self, sym1, sym2):
""" Swap the entries for specified symbols. Useful for reverse complement etc.
Note that changes are made to the current instance. Use swapxcopy if you
want to leave this instance intact. """
sym1ndx = self.alpha.index(sym1)
sym2ndx = self.alpha.index(sym2)
tmpcnt = self.cnt[sym1ndx]
self.cnt[sym1ndx] = self.cnt[sym2ndx]
self.cnt[sym2ndx] = tmpcnt
def swapxcopy(self, sym1, sym2):
""" Create a new instance with swapped entries for specified symbols.
Useful for reverse complement etc.
Note that changes are NOT made to the current instance.
Use swap if you want to modify this instance. """
newdist = Distrib(self.alpha, self.count())
newdist.swap(sym1, sym2)
return newdist
def writeDistrib(self, filename = None):
""" Write the distribution to a file or string.
Note that the total number of counts is also saved, e.g.
* 1000 """
str = ''
for s in self.alpha:
str += (s + ("\t%f\n" % self[s]))
str += "*\t%d\n" % self.tot
if filename != None:
fh = open(filename, 'w')
fh.write(str)
fh.close()
return str
    def generate(self):
        """ Generate and return a symbol from the distribution using assigned probabilities. """
        alpha = self.alpha
        p = random.random() # get a random value between 0 and 1
        q = 0.0
        for sym in alpha: # pick a symbol with a frequency proportional to its probability
            q = q + self[sym]
            if p < q:
                return sym
        return alpha[len(alpha) - 1] # fall-back for rounding errors: return the last symbol
    def getmax(self):
        """ Return the symbol with the largest probability. """
        maxprob = 0.0
        maxsym = None
        for sym in self.alpha:
            if maxsym == None or self[sym] > maxprob:
                maxsym = sym
                maxprob = self[sym]
        return maxsym
def getsort(self):
""" Return the list of symbols, in order of their probability. """
symlist = [sym for (sym, _) in self.getProbsort()]
return symlist
def getProbsort(self):
""" Return the list of symbol-probability pairs, in order of their probability. """
s = [(sym, self.prob(sym)) for sym in self.alpha]
ss = sorted(s, key=lambda y: y[1], reverse=True)
return ss
def divergence(self, distrib2):
""" Calculate the Kullback-Leibler divergence between two discrete distributions.
Note that when self.prob(x) is 0, the divergence for x is 0.
When distrib2.prob(x) is 0, it is replaced by 0.0001.
"""
assert self.alpha == distrib2.alpha
        sum = 0.0
for sym in self.alpha:
if self[sym] > 0:
if distrib2[sym] > 0:
sum += math.log(self[sym] / distrib2[sym]) * self[sym]
else:
sum += math.log(self[sym] / 0.0001) * self[sym]
return sum
def entropy(self):
""" Calculate the information (Shannon) entropy of the distribution.
Note that the base is the size of the alphabet, so maximum entropy is by definition 1.
Also note that if the probability is exactly zero, it is replaced by a small value to
avoid numerical issues with the logarithm. """
sum = 0.0
base = len(self.alpha)
for sym in self.alpha:
p = self.__getitem__(sym)
if p == 0:
p = 0.000001
sum += p * math.log(p, base)
return -sum
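# Usage sketch: build a nucleotide distribution with pseudo-counts, observe a few
# symbols, and query it (Alphabet is provided by sym, imported above):
if __name__ == '__main__':
    dna = Alphabet('ACGT')
    d = Distrib(dna, pseudo = 1.0)  # one pseudo-count per symbol
    for sym in 'ACGGGT':
        d.observe(sym)
    print(d)              # probabilities including pseudo-counts, e.g. G=0.40
    print(d.prob('G'))    # 0.4
    print('Entropy:', d.entropy())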
def writeDistribs(distribs, filename):
""" Write a list/set of distributions to a single file. """
str = ''
k = 0
for d in distribs:
str += "[%d]\n%s" % (k, d.writeDistrib())
k += 1
fh = open(filename, 'w')
fh.write(str)
fh.close()
def _readDistrib(linelist):
    """ Extract distribution from a pre-processed list of strings. """
symstr = ''
d = {}
for line in linelist:
line = line.strip()
if len(line) == 0 or line.startswith('#'):
continue
sections = line.split()
sym, value = sections[0:2]
if len(sym) == 1:
if sym != '*':
symstr += sym
else:
raise RuntimeError("Invalid symbol in distribution: " + sym)
try:
d[sym] = float(value)
except ValueError:
raise RuntimeError("Invalid value in distribution for symbol " + sym + ": " + value)
if len(d) == 0:
return None
alpha = Alphabet(symstr)
if '*' in list(d.keys()): # tot provided
for sym in d:
if sym != '*':
d[sym] = d[sym] * d['*']
distrib = Distrib(alpha, d)
return distrib
def readDistribs(filename):
""" Load a list of distributions from file.
Note that if a row contains '* ' then it is assumed that each probability
associated with the specific distribution is based on counts. """
fh = open(filename)
string = fh.read()
distlist = []
linelist = []
for line in string.splitlines():
line = line.strip()
if line.startswith('['):
if len(linelist) != 0:
distlist.append(_readDistrib(linelist))
linelist = []
elif len(line) == 0 or line.startswith('#'):
pass # comment or blank line --> ignore
else:
linelist.append(line)
# end for-loop, reading the file
if len(linelist) != 0:
distlist.append(_readDistrib(linelist))
fh.close()
return distlist
def readDistrib(filename):
""" Load a distribution from file.
Note that if a row contains '* ' then it is assumed that each probability
is based on counts. """
dlist = readDistribs(filename)
if len(dlist) > 0: # if at least one distribution was in the file...
return dlist[0] # return the first
import re
def _readMultiCount(linelist, format = 'JASPAR'):
ncol = 0
symcount = {}
if format == 'JASPAR2010':
for line in linelist:
line = line.strip()
if len(line) > 0:
name = line.split()[0]
counts = []
for txt in re.findall(r'\w+', line):
try:
y = float(txt)
counts.append(y)
except ValueError:
pass # ignore non-numeric entries
if len(counts) != ncol and ncol != 0:
raise RuntimeError('Invalid row in file: ' + line)
ncol = len(counts)
if len(name) == 1: # proper symbol
symcount[name] = counts
alpha = Alphabet(''.join(list(symcount.keys())))
distribs = []
for col in range(ncol):
d = dict([(sym, symcount[sym][col]) for sym in symcount])
distribs.append(Distrib(alpha, d))
elif format == 'JASPAR':
alpha_str = 'ACGT'
alpha = Alphabet(alpha_str)
cnt = 0
for sym in alpha_str:
line = linelist[cnt].strip()
counts = []
for txt in re.findall(r'\w+', line):
try:
y = float(txt)
counts.append(y)
except ValueError:
pass # ignore non-numeric entries
if len(counts) != ncol and ncol != 0:
raise RuntimeError('Invalid row in file: ' + line)
ncol = len(counts)
symcount[sym] = counts
cnt += 1
distribs = []
for col in range(ncol):
d = dict([(sym, symcount[sym][col]) for sym in symcount])
distribs.append(Distrib(alpha, d))
else:
raise RuntimeError('Unsupported format: ' + format)
return distribs
def readMultiCounts(filename, format = 'JASPAR'):
""" Read a file of raw counts for multiple distributions over the same set of symbols
for (possibly) multiple (named) entries.
filename: name of file
format: format of file, default is 'JASPAR' exemplified below
>MA0001.1 SEP4
0 3 79 40 66 48 65 11 65 0
94 75 4 3 1 2 5 2 3 3
1 0 3 4 1 0 5 3 28 88
2 19 11 50 29 47 22 81 1 6
    returns a dictionary of Distrib's, keyed by entry name (e.g. MA0001.1)
"""
fh = open(filename)
linelist = []
entryname = ''
entries = {}
for row in fh:
row = row.strip()
if len(row) < 1: continue
if row.startswith('>'):
if len(linelist) > 0:
entries[entryname] = _readMultiCount(linelist, format=format)
linelist = []
entryname = row[1:].split()[0]
else:
linelist.append(row)
if len(linelist) > 0:
entries[entryname] = _readMultiCount(linelist, format=format)
fh.close()
return entries
def readMultiCount(filename, format = 'JASPAR'):
""" Read a file of raw counts for multiple distributions over the same set of symbols.
filename: name of file
format: format of file, default is 'JASPAR' exemplified below
0 3 79 40 66 48 65 11 65 0
94 75 4 3 1 2 5 2 3 3
1 0 3 4 1 0 5 3 28 88
2 19 11 50 29 47 22 81 1 6
returns a list of Distrib's
"""
d = readMultiCounts(filename, format=format)
if len(d) > 0:
return list(d.values())[0]
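# A usage sketch (hypothetical filename; the file would follow the JASPAR layout
# shown in the docstrings above):
# > columns = readMultiCount('my_motif_counts.txt')  # one Distrib per motif column
# > for i, d in enumerate(columns):
# >     print(i + 1, d)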
#################################################################################################
# Joint class
#################################################################################################
class Joint(object):
""" A joint probability class.
The JP is represented as a distribution over n-tuples where n is the number of variables.
        Variables can be from any defined alphabet. The size of each alphabet determines the
number of entries in the table (with probs that add up to 1.0) """
def __init__(self, alphas):
""" A distribution of n-tuples.
alphas: Alphabet(s) over which the distribution is defined
"""
if type(alphas) is Alphabet:
self.alphas = tuple( [alphas] )
elif type(alphas) is tuple:
self.alphas = alphas
else:
self.alphas = tuple( alphas )
self.store = TupleStore(self.alphas)
self.totalCnt = 0
def getN(self):
""" Retrieve the number of distributions/random variables. """
return len(self.alphas)
def __iter__(self):
return self.store.__iter__()
def reset(self):
""" Re-set the counts of this joint distribution. Pseudo-counts are re-applied. """
for entry in self.store:
self.store[entry] = None
self.totalCnt = 0
def observe(self, key, cnt = 1):
""" Make an observation of a tuple/key
key: tuple that is being observed
cnt: number/weight of observation (default is 1)
"""
key = _getMeTuple(self.alphas, key)
if not None in key:
score = self.store[key]
if (score == None):
score = 0
self.totalCnt += cnt
self.store[key] = score + cnt
else: # there are wildcards in the key
allkeys = [mykey for mykey in self.store.getAll(key)]
mycnt = float(cnt)/float(len(allkeys))
self.totalCnt += cnt
for mykey in allkeys:
score = self.store[mykey]
if (score == None):
score = 0
self.store[mykey] = score + mycnt
return
def count(self, key):
""" Return the absolute count that is used for the joint probability table. """
key = _getMeTuple(self.alphas, key)
score = self.store[key]
if (score == None):
score = 0.0
for match in self.store.getAll(key):
y = self.store[match]
if y != None:
score += y
return score
def __getitem__(self, key):
""" Determine and return the probability of a specified expression of the n-tuple
which can involve "wildcards"
Note that no assumptions are made regarding independence. """
key = _getMeTuple(self.alphas, key)
score = self.store[key]
if (score == None):
score = 0.0
for match in self.store.getAll(key):
y = self.store[match]
if y != None:
score += y
if self.totalCnt == 0:
return 0.0
return float(score) / float(self.totalCnt)
def __str__(self):
""" Return a textual representation of the JP. """
str = '< '
if self.totalCnt == 0.0:
return str + 'None >'
for s in self.store:
if self[s] == None:
y = 0.0
else:
y = self[s]
str += (''.join(s) + ("=%4.2f " % y))
return str + ' >'
def items(self, sort = False):
""" In a dictionary-like way return all entries as a list of 2-tuples (key, prob).
If sort is True, entries are sorted in descending order of probability.
Note that this function should NOT be used for big (>5 variables) tables."""
if self.totalCnt == 0.0:
return []
ret = []
for s in self.store:
if self[s] != None:
ret.append((s, self[s]))
if sort:
return sorted(ret, key=lambda v: v[1], reverse=True)
return ret
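# A usage sketch for Joint (hedged: assumes _getMeTuple accepts plain strings and
# maps '*' to a wildcard, as the observe/__getitem__ code above suggests):
# > dna = Alphabet('ACGT')
# > jp = Joint([dna, dna])                      # a joint table over symbol pairs
# > jp.observe('AT'); jp.observe('AA'); jp.observe('AT')
# > jp['AT']   # 2/3: probability of the exact pair
# > jp['A*']   # 1.0: the wildcard sums over the second position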
class IndepJoint(Joint):
def __init__(self, alphas, pseudo = 0.0):
""" A distribution of n-tuples.
All positions are assumed to be independent.
alphas: Alphabet(s) over which the distribution is defined
"""
self.pseudo = pseudo
if type(alphas) is Alphabet:
self.alphas = tuple( [alphas] )
elif type(alphas) is tuple:
self.alphas = alphas
else:
self.alphas = tuple( alphas )
self.store = [Distrib(alpha, pseudo) for alpha in self.alphas]
def getN(self):
""" Retrieve the number of distributions/random variables. """
return len(self.alphas)
def __iter__(self):
return TupleStore(self.alphas).__iter__()
def reset(self):
""" Re-set the counts of each distribution. Pseudo-counts are re-applied. """
self.store = [Distrib(alpha, self.pseudo) for alpha in self.alphas]
def observe(self, key, cnt = 1, countGaps = True):
""" Make an observation of a tuple/key
key: tuple that is being observed
cnt: number/weight of observation (default is 1)
"""
assert len(key) == len(self.store), "Number of symbols must agree with the number of positions"
        for i in range(len(self.store)):
            subkey = key[i]
            if subkey == '-' and countGaps == False:
                continue
            if subkey == '*' or subkey == '-':
                # spread the observation evenly over all symbols at this position
                for sym in self.alphas[i]:
                    self.store[i].observe(sym, float(cnt)/float(len(self.alphas[i])))
            else:
                self.store[i].observe(subkey, cnt)
def __getitem__(self, key):
""" Determine and return the probability of a specified expression of the n-tuple
which can involve "wildcards"
Note that variables are assumed to be independent. """
assert len(key) == len(self.store), "Number of symbols must agree with the number of positions"
prob = 1.0
for i in range(len(self.store)):
mykey = key[i]
if mykey == '*' or mykey == '-':
pass # same as multiplying with 1.0 (all symbols possible)
else:
prob *= self.store[i][mykey]
return prob
def get(self, sym, pos):
""" Retrieve the probability of a specific symbol at a specified position. """
mystore = self.store[pos]
return mystore[sym]
def getColumn(self, column, count = False):
""" Retrieve all the probabilities (or counts) for a specified position.
Returns values as a dictionary, with symbol as key."""
d = {}
for a in self.alphas[column]:
if count: # absolute count
d[a] = self.store[column].count(a)
else: # probability
d[a] = self.store[column][a]
return d
def getRow(self, sym, count = False):
""" Retrieve the probabilities (or counts) for a specific symbol over all columns/positions.
Returns a list of values in the order of the variables/alphabets supplied to the constructor. """
d = []
for store in self.store:
if count: # absolute count
d.append(store.count(sym))
else: # probability
d.append(store[sym])
return d
def getMatrix(self, count = False):
""" Retrieve the full matrix of probabilities (or counts) """
d = {}
for a in self.alphas[0]:
d[a] = self.getRow(a, count)
return d
def displayMatrix(self, count = False):
""" Pretty-print matrix """
print((" \t%s" % (''.join("\t%5d" % (i + 1) for i in range(len(self.alphas))))))
for a in self.alphas[0]:
if count:
print(("%s\t%s" % (a, ''.join("\t%5d" % (y) for y in self.getRow(a, True)))))
else:
print(("%s\t%s" % (a, ''.join("\t%5.3f" % (y) for y in self.getRow(a)))))
def __str__(self):
""" Text representation of the table. Note that size is an issue so big tables
will not be retrieved and displayed. """
        if len(self.alphas) > 5:  # comparing the tuple itself with an int fails in Python 3
return '< ... too large to process ... >'
tstore = TupleStore(self.alphas)
str = '< '
for key in tstore:
p = 1.0
for i in range(len(self.store)):
value = self.store[i][key[i]]
if value != None and value != 0.0:
p *= value
else:
                    p = 0
                    break
str += (''.join(key) + ("=%4.2f " % p))
return str + ' >'
def items(self, sort = False):
""" In a dictionary-like way return all entries as a list of 2-tuples (key, prob).
If sort is True, entries are sorted in descending order of probability.
Note that this function should NOT be used for big (>5 variables) tables."""
tstore = TupleStore(self.alphas)
ret = []
for key in tstore:
p = 1.0
for i in range(len(self.store)):
value = self.store[i][key[i]]
if value != None and value != 0.0:
p *= value
else:
                    p = 0
                    break
if p > 0.0:
ret.append((key, p))
if sort:
return sorted(ret, key=lambda v: v[1], reverse=True)
return ret
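# A usage sketch for IndepJoint, treating each column of an invented set of
# equal-length strings as an independent position:
# > dna = Alphabet('ACGT')
# > pwm = IndepJoint([dna, dna, dna], pseudo = 1.0)
# > for seq in ['ACG', 'ACG', 'ATG']:
# >     pwm.observe(seq)
# > pwm.displayMatrix()   # per-position symbol probabilities
# > pwm['A*G']            # the wildcard skips the middle position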
#################################################################################################
# Naive Bayes' classifier
#################################################################################################
class NaiveBayes():
""" NaiveBayes implements a classifier: a model defined over a class variable
and conditional on a list of discrete feature variables.
Note that feature variables are assumed to be independent. """
def __init__(self, inputs, output, pseudo_input = 0.0, pseudo_output = 0.0):
""" Initialise a classifier.
inputs: list of alphabets that define the values that input variables can take.
output: alphabet that defines the possible values the output variable takes
pseudo_input: pseudo-count used for each input variable (default is 0.0)
pseudo_output: pseudo-count used for the output variable (default is 0.0) """
if type(inputs) is Alphabet:
self.inputs = tuple( [inputs] )
elif type(inputs) is tuple:
self.inputs = inputs
else:
self.inputs = tuple( inputs )
self.condprobs = {} # store conditional probabilities as a dictionary (class is key)
for outsym in output: # GIVEN the class
# for each input variable initialise a conditional probability
self.condprobs[outsym] = [ Distrib(input, pseudo_input) for input in self.inputs ]
self.classprob = Distrib(output, pseudo_output) # the class prior
def observe(self, inpseq, outsym):
""" Record an observation of an input sequence of feature values that belongs to a class.
inpseq: sequence/list of feature values, e.g. 'ATG'
outsym: the class assigned to these feature values. """
condprob = self.condprobs[outsym]
for i in range(len(inpseq)):
condprob[i].observe(inpseq[i])
self.classprob.observe(outsym)
def __getitem__(self, key):
""" Determine and return the class probability GIVEN a specified n-tuple of feature values
The class probability is given as an instance of Distrib. """
out = Distrib(self.classprob.alpha)
for outsym in self.classprob.getSymbols():
condprob = self.condprobs[outsym]
prob = self.classprob[outsym]
for i in range(len(key)):
prob *= condprob[i][key[i]] or 0.0
out.observe(outsym, prob)
return out
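# A usage sketch for NaiveBayes (alphabets and training pairs are invented):
# > feature_alpha = Alphabet('ACGT')
# > class_alpha = Alphabet('PN')   # positive / negative
# > nb = NaiveBayes([feature_alpha] * 3, class_alpha, pseudo_input = 1.0, pseudo_output = 1.0)
# > nb.observe('ACG', 'P')
# > nb.observe('TTT', 'N')
# > print(nb['ACG'])   # a Distrib over 'P' and 'N', i.e. the class posterior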
#################################################################################################
# Markov chain
#################################################################################################
class MarkovChain():
""" Markov Chain in a simple form; supports higher-orders and can determine (joint) probability of sequence.
"""
def __init__(self, alpha, order = 1, startsym = '^', endsym = '$'):
""" Construct a new Markov chain based on a given alphabet of characters.
alpha: alphabet of allowed characters and states
order: the number of states to include in memory (default is 1)
startsym: the symbol to mark the first character in the internal sequence, and the first state
endsym: the symbol to mark the termination of the internal sequence (and the last state)
"""
self.order = order
self.startsym = startsym
self.endsym = endsym
self.alpha = getTerminatedAlphabet(alpha, self.startsym, self.endsym)
self.transit = TupleStore([self.alpha for _ in range(order)]) # transition probs, i.e. given key (prev state/s) what is the prob of current state
def _getpairs(self, term_seq):
""" Return a tuple of all (tuple) Markov pairs from a sequence. Used internally. """
ret = []
for i in range(len(term_seq) - self.order):
past = tuple(term_seq[i:i + self.order])
present = term_seq[i + self.order]
ret.append(tuple([past, present]))
return tuple(ret)
def observe(self, wholeseq):
""" Set parameters of Markov chain by counting transitions, as observed in the sequence.
wholeseq: the sequence not including the termination symbols.
"""
myseq = _terminate(wholeseq, self.order, self.startsym, self.endsym)
for (past, present) in self._getpairs(myseq):
d = self.transit[past]
if not d: # no distrib
d = Distrib(self.alpha)
self.transit[past] = d
d.observe(present)
def __getitem__(self, wholeseq):
""" Determine the log probability of a given sequence.
wholeseq: the sequence not including the termination symbols.
returns the joint probability
"""
myseq = _terminate(wholeseq, self.order, self.startsym, self.endsym)
logp = 0
for (past, present) in self._getpairs(myseq):
d = self.transit[past]
if not d:
return None
p = d[present]
if p == 0:
return None
logp += math.log(p)
return logp
def _terminate(unterm_seq, order = 1, startsym = '^', endsym = '$'):
""" Terminate sequence with start and end symbols """
term_seq = [startsym for _ in range(order)]
term_seq.extend(unterm_seq)
term_seq.append(endsym)
return term_seq
def getTerminatedAlphabet(alpha, startsym = '^', endsym = '$'):
""" Amend the given alphabet with termination symbols """
return Alphabet(alpha.symbols + tuple([startsym, endsym]))
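# A usage sketch for MarkovChain (training sequences are invented):
# > dna = Alphabet('ACGT')
# > mc = MarkovChain(dna, order = 1)
# > for seq in ['ACGT', 'ACGA', 'ACTT']:
# >     mc.observe(seq)
# > mc['ACGT']   # log joint probability, or None for an unobserved transition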
#################################################################################################
# Hidden Markov model (HMM)
#################################################################################################
class HMM():
""" Basic, first-order HMM.
Has functionality to set up HMM, and query it with Viterbi and Forward algorithms."""
def __init__(self, states, symbols, startstate = '^', endstate = '$'):
""" Construct HMM with states and symbols, here given as strings of characters.
> cpg_hmm = prob.HMM('HL','ACGT')
"""
if isinstance(states, str):
states = Alphabet(states)
self.mystates = getTerminatedAlphabet(states, startstate, endstate)
if isinstance(symbols, str):
symbols = Alphabet(symbols)
self.mysymbols = getTerminatedAlphabet(symbols, startstate, endstate)
self.a = dict() # transition probabilities
self.e = dict() # emission probabilities
self.startsym = startstate
self.endsym = endstate
def transition(self, fromstate, distrib):
""" Add a transition to the HMM, determining with the probability of transitions, e.g.
> cpg_hmm.transition('^',{'^':0,'$':0,'H':0.5,'L':0.5})
> cpg_hmm.transition('H',{'^':0,'$':0.001,'H':0.5,'L':0.5})
> cpg_hmm.transition('L',{'^':0,'$':0.001,'H':0.4,'L':0.6})
> cpg_hmm.transition('$',{'^':1,'$':0,'H':0,'L':0})
"""
if not isinstance(distrib, Distrib):
distrib = Distrib(self.mystates, distrib)
self.a[fromstate] = distrib
def emission(self, state, distrib):
""" Add an emission probability to the HMM, e.g.
> cpg_hmm.emission('^',{'^':1,'$':0,'A':0,'C':0,'G':0,'T':0})
> cpg_hmm.emission('H',{'^':0,'$':0,'A':0.2,'C':0.3,'G':0.3,'T':0.2})
> cpg_hmm.emission('L',{'^':0,'$':0,'A':0.3,'C':0.2,'G':0.2,'T':0.3})
> cpg_hmm.emission('$',{'^':0,'$':1,'A':0,'C':0,'G':0,'T':0})
"""
if not isinstance(distrib, Distrib):
distrib = Distrib(self.mysymbols, distrib)
self.e[state] = distrib
def joint(self, symseq, stateseq):
"""
Determine the joint probability of the sequence and the given path.
:param symseq: sequence of characters
:param stateseq: sequence of states
:return: the probability
"""
X = _terminate(symseq, 1, self.startsym, self.endsym)
P = _terminate(stateseq, 1, self.startsym, self.endsym)
p = 1
for i in range(len(X) - 1):
p = p * self.e[P[i]][X[i]] * self.a[P[i]][P[i + 1]]
return p
    def viterbi(self, symseq, V = None, trace = None):
"""
Determine the Viterbi path (the most probable sequence of states) given a sequence of symbols
:param symseq: sequence of symbols
        :param V: the Viterbi dynamic programming variable as a matrix (optional; pass a dict to retrieve it)
        :param trace: the traceback (optional; pass a dict to retrieve it)
:return: the Viterbi path as a string of characters
> X = 'GGCACTGAA' # sequence of characters
> states = cpg_hmm.viterbi(X)
> print(states)
"""
        if V is None: V = dict()          # fresh per call; a mutable default dict would leak state between calls
        if trace is None: trace = dict()
        X = _terminate(symseq, 1, self.startsym, self.endsym) # put start and end symbols on sequence
# Initialise state scores for each index in X
for state in self.mystates:
# Fill in emission probabilities in V for each index of X
# (only the first position is really needed)
V[state] = [self.e[state][x] for x in X]
trace[state] = []
# Next loop through the sequence
for j in range(len(X) - 1):
            i = j + 1 # the sequence index that we're processing starts with 1, not 0
for tostate in self.mystates: # check each state for i = 1, ...
tracemax = 0
beststate = None # the state v with max[Vv(i-1) * t(v,u)]
for fromstate in self.mystates:
# determine the best score propagated forward from previous state
score = V[fromstate][i - 1] * self.a[fromstate][tostate]
if score > tracemax:
beststate = fromstate
tracemax = score
# record the transition that will appear in the traceback
trace[tostate].append(beststate)
# finalise the dynamic programming score for current i in state u
V[tostate][i] = self.e[tostate][X[i]] * tracemax
# finally, assemble the string that describes the most probable path
ret = ''
traced = '$'
for j in range(len(X)):
i = len(X) - 2 - j
traced = trace[traced][i]
if j > 0 and j < len(X) - 2:
ret = traced + ret
return ret
    def forward(self, symseq, F = None):
"""
Determine the probability of the sequence, summing over all possible state paths
:param symseq: sequence of symbols
        :param F: the Forward dynamic programming variable as a matrix (optional; pass a dict to retrieve it)
:return: the probability
> X = 'GGCACTGAA' # sequence of characters
> prob = cpg_hmm.forward(X)
> print(prob)
"""
        if F is None: F = dict()          # avoid a shared mutable default argument
        X = _terminate(symseq, 1, self.startsym, self.endsym)
# Initialise state scores for each index in X
for state in self.mystates:
# Fill in emission probabilities for each index in X
F[state] = [self.e[state][x] for x in X]
for j in range(len(X) - 1):
i = j + 1 # sequence index that we're processing
for tostate in self.mystates:
mysum = 0
for fromstate in self.mystates:
mysum += F[fromstate][i - 1] * self.a[fromstate][tostate]
F[tostate][i] = self.e[tostate][X[i]] * mysum
traced = '$'
return F[traced][len(X) - 1]
def writeHTML(self, X, Viterbi, Trace = None, filename = None):
""" Generate HTML that displays a DP matrix from Viterbi (or Forward) algorithms.
> from IPython.core.display import HTML
> X = 'GGCACTGAA' # sequence of characters
> V = dict()
> T = dict()
> cpg_hmm.viterbi(X, V, T)
> HTML(cpg_hmm.writeHTML(X, V, T))
"""
        html = '<html><head><title>HMM dynamic programming matrix</title></head><body>\n'
        html += '<table border="1">\n'
        html += '<tr>\n'
        html += '<th>X</th>\n'
        for state in Viterbi:
            html += '<th>%s</th>\n' % str(state)
        html += '</tr>\n'
        # process each sequence symbol
        X = _terminate(X, 1, self.startsym, self.endsym)
        for row in range(len(X)):
            html += '<tr>\n'
            html += '<td>x%d=%s</td>\n' % (row, str(X[row]))
            for state in Viterbi:
                if Trace and row > 0:
                    html += '<td>e%s(%s)=%4.2f<br>V%s=%3.1e<br>&uarr;%s[%4.2f]</td>\n' % (str(state), X[row], self.e[state][X[row]], str(state), Viterbi[state][row], Trace[state][row - 1], self.a[Trace[state][row - 1]][state] if Trace[state][row - 1] != None else 0)
                else:
                    html += '<td>e%s(%s)=%4.2f<br>V%s=%3.1e</td>\n' % (str(state), X[row], self.e[state][X[row]], str(state), Viterbi[state][row])
            html += '</tr>\n'
        html += '</table>\n'
        html += '</body></html>'
if filename:
fh = open(filename, 'w')
fh.write(html)
fh.close()
return html
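# Putting the HMM pieces together, using the CpG-island example from the
# docstrings above (all probabilities are taken from those docstrings):
# > cpg_hmm = HMM('HL', 'ACGT')
# > cpg_hmm.transition('^', {'^':0, '$':0, 'H':0.5, 'L':0.5})
# > cpg_hmm.transition('H', {'^':0, '$':0.001, 'H':0.5, 'L':0.5})
# > cpg_hmm.transition('L', {'^':0, '$':0.001, 'H':0.4, 'L':0.6})
# > cpg_hmm.transition('$', {'^':1, '$':0, 'H':0, 'L':0})
# > cpg_hmm.emission('^', {'^':1, '$':0, 'A':0, 'C':0, 'G':0, 'T':0})
# > cpg_hmm.emission('H', {'^':0, '$':0, 'A':0.2, 'C':0.3, 'G':0.3, 'T':0.2})
# > cpg_hmm.emission('L', {'^':0, '$':0, 'A':0.3, 'C':0.2, 'G':0.2, 'T':0.3})
# > cpg_hmm.emission('$', {'^':0, '$':1, 'A':0, 'C':0, 'G':0, 'T':0})
# > cpg_hmm.viterbi('GGCACTGAA')   # most probable state path
# > cpg_hmm.forward('GGCACTGAA')   # total probability of the sequence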
# File: binfpy-e1db8f84908d7f3d2146551c9451d88557b82827/rcdict.py
import sym
class RCDict(dict):
""" Class that extends a standard dictionary to accept only fixed-length DNA symbol strings as keys.
Additionally, it maps the reverse complement to the same value. """
def __init__(self, alpha = sym.DNA_Alphabet):
""" Initialise a reverse-complement dictionary to accept strings of a given alphabet (DNA by default) """
self.alpha = alpha
self.length = None
def __setitem__(self, key, value):
""" Set the value for a key.
Checks to see that if
(a) previous keys have been used that the length is the same, and
(b) the key consists only of valid symbols. """
if self.length == None:
self.length = len(key)
elif len(key) != self.length:
raise RuntimeError("Invalid key: " + str(key))
        for i in range(len(key)):
            if not key[i] in self.alpha:   # validate against the alphabet this dictionary was created with
                raise RuntimeError("Invalid symbol in key: " + str(key[i]))
super(RCDict, self).__setitem__(self.canonical(key), value)
def canonical(self, key):
""" Figures out the canonical key (original or its reverse complement).
            Note that if you use an alphabet other than DNA you may need to modify this code. """
if self.length == None:
return key
alpha = self.alpha
rcindx = [0 for _ in range(self.length)]
fwindx = [alpha.index(sym) for sym in key]
undecided = True
for forw in range(self.length):
backw = self.length - forw - 1
rcindx[forw] = 3 - fwindx[backw] # formula for converting A <-> T, C <-> G
if undecided and rcindx[forw] > fwindx[forw]: # RC is "higher" than original
return key
undecided = rcindx[forw] == fwindx[forw]
return ''.join([alpha.symbols[indx] for indx in rcindx])
def __getitem__(self, key):
""" Retrieve the value associated with a specified key. """
return super(RCDict, self).__getitem__(self.canonical(key))
def getSum(self, IUPAC_key):
""" Retrieve the sum of all the entries that match the specified IUPAC key. """
# TODO
pass
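# A usage sketch (keys invented): a key and its reverse complement share one entry.
# > d = RCDict()
# > d['AAAA'] = 2
# > d['TTTT']   # returns 2, since canonical('TTTT') == 'AAAA'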
# File: binfpy-e1db8f84908d7f3d2146551c9451d88557b82827/sam.py
"""
This python module reads in sam files from an RNA-seq experiment, processes
the alignments, and summarises the RNA-seq count data.
"""
from collections import Counter
import matplotlib.pyplot as plt
import numpy as np
import itertools
import operator
import math
from scipy import stats
from numpy import array, empty
import scipy.cluster.hierarchy as sch
def sam_reader(filename):
"""Mandatory fields are QNAME,FLAG,RNAME,POS,MAPQ,CIGAR,RNEXT,PNEXT,TLEN,SEQ,QUAL
for more info http://samtools.github.io/hts-specs/SAMv1.pdf """
data=[]
f= open(filename,'r')
for row in f:
if row.startswith('@'): # skip the header
pass
else:
info=row.strip().split('\t')
data.append(info)
return data
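# A usage sketch (hypothetical path):
# reads = sam_reader('sample.sam')
# print(reads[0][9])   # SEQ field (0-based column 9) of the first alignment line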
def base_percentages(reads):
"reports base percentage %A,%T,%C,%G "
all_seqs=[]
for read in reads:
seq=read[9]
seq=[seq[i:i+1] for i in range(0,len(seq),1)]
for nuc in seq:
all_seqs.append(nuc)
counts=dict(Counter(all_seqs))
nucs=list(counts.keys())
freqs={}
for nuc in nucs:
freqs[nuc]=float(counts[nuc])/sum(counts.values())
return freqs
def numberofreads(reads):
"""Incremented for every sequence-containing line in the sam file, regardless of whether it represents an alignment.
for some files, this is not actually the number of reads. indeed, this may be a poor name for this stat"""
return len(reads)
def mapped_reads(reads,paired_end=True):
"""If duplicate tracking was enabled via -D, then this attempts to recapitulate the number of unique, mapped, probe-id's in the original sam file. It is multiplied by 2 for paired-end data with duplicate read id's.
The idea is that if you divide this by the number of reads in the fastq you aligned (possibly from the output of fastq-stats),
you will get an accurate "percentage of reads aligned" statistic.
"mapped" is something with a non-negative position, and a "non-asterisk" cigar string."""
mapped_reads=[]
store_reads=[]
for read in reads:
        if int(read[3])>0 and read[5]!='*':   # SAM fields are strings, so convert POS before comparing
mapped_reads.append(read[0])
store_reads.append(read)
    mapped=set(mapped_reads)
    if paired_end==True:
        mapped=2*len(mapped)   # two reads share each id in paired-end data
    else:
        mapped=len(mapped)
print("number of mapped reads",mapped)
return store_reads
def mappedBases(mapped_reads):
"""Total number of mapped bases in sam file"""
seq=""
for read in mapped_reads:
seq=seq+read[9]
return len(seq)
def forward(mapped_reads):
    """The lines in the sam file that were aligned to the "forward" strand
    (FLAG bit 0x10 clear). No accounting is done on duplicates."""
    forward=[read for read in mapped_reads if int(read[1]) & 16 == 0]
    return forward
def reverse(mapped_reads):
    """The lines in the sam file that were aligned to the "reverse" strand
    (FLAG bit 0x10 set). No accounting is done on duplicates."""
    reverse=[read for read in mapped_reads if int(read[1]) & 16 != 0]
    return reverse
########Qualities and STATS
def subgroups(mapped_reads):
"""form groups p<1e-3 one group,1e-3<=p<1e-2 one group,1e-2<=p<1 one group a total of three groups"""
group1=[]
group2=[]
group3=[]
for read in mapped_reads:
if int(read[4])>29:
group1.append(read)
elif int(read[4])<=29 and int(read[4])>17:
group2.append(read)
elif int(read[4])<=17:
group3.append(read)
else:
pass
print(len(group1),"in p<1e-3 group")
print(len(group2),"in 1e-3<=p<1e-2 group")
print(len(group3),"in 1e-2<=p<1 group")
return group1,group2,group3
def dinuc_freq(mapped_reads):
"reports dinucleotide composition using p(Rho) statistics for overrepresentation"
all_seqs=[]
for read in mapped_reads:
seq=read[9]
seq=[seq[i:i+1] for i in range(0,len(seq),1)]
for nuc in seq:
all_seqs.append(nuc)
counts=dict(Counter(all_seqs))
nucs=list(counts.keys())
freqs={}
for nuc in nucs:
freqs[nuc]=float(counts[nuc])/sum(counts.values())
all_seqs=[]
for read in mapped_reads:
seq=read[9]
        seq=[seq[i:i+2] for i in range(0,len(seq)-1,2)]   # non-overlapping dinucleotides; drop any trailing single base
for nuc in seq:
all_seqs.append(nuc)
counts=dict(Counter(all_seqs))
dinucs=list(counts.keys())
dinuc_counts={}
for i in dinucs:
val=float(counts[i])/sum(counts.values())
        dinuc_counts[i]=val/(freqs[i[0]]*freqs[i[1]]) # rho statistic: observed over expected under independence (not a p-value)
return dinuc_counts
def PercentReadsAligned(group1,group2,group3,numfastq):
"""Provide a list of mapped_reads and the number of reads in the fastq file"""
mapped_reads=group1+group2+group3
Mapped=len(mapped_reads)/float(numfastq)
Unmapped=1-float(Mapped)
## print "Mapping stats"
## print"p<1e-3", len(group1)/float(numfastq)
## print"1e-3<=p<1e-2",len(group2)/float(numfastq)
## print "1e-2<=p<1",len(group3)/float(numfastq)
## print "Unmapped",Unmapped
labels="p<1e-3","1e-3<=p<1e-2","1e-2<=p<1","Unmapped"
x=[len(group1)/float(numfastq),len(group2)/float(numfastq),len(group3)/float(numfastq),Unmapped]
plt.figure(1, figsize=(8,8))
ax = plt.axes([0.1, 0.1, 0.8, 0.8])
plt.pie(x,labels=labels,autopct='%1.1f%%', shadow=True)
plt.title('Mapping stats')
plt.show()
return Mapped
def length_stats(group1,group2,group3):
"""returns basic stats relating to the lengths of the reads
Calculations are based on the the length of the (possibly hard-clipped) sequence in the sam file."""
reads=[group1,group2,group3]
data=[]
for i in range(0,len(reads)):
lengths=[]
for read in reads[i]:
if int(read[8])<0:
length=-1*int(read[8])
else:
length=int(read[8])
lengths.append(length)
mean_len=np.mean(lengths)
print("group"+str(i+1)+"mean",mean_len)
max_len=np.max(lengths)
print("group"+str(i+1)+"max length",max_len)
min_len=np.min(lengths)
print("group"+str(i+1)+"min length",min_len)
data.append(["group"+str(i+1),mean_len,max_len,min_len])
return data
def plot_length_distrib(group,name):
"""distribution of lengths of all the sam reads"""
lengths=[]
for read in group:
if int(read[8])<0:
length=-1*int(read[8])
else:
length=int(read[8])
lengths.append(length)
##Visualize length distribution
plt.figure(1, figsize=(8,8))
ax = plt.axes([0.1, 0.1, 0.8, 0.8])
    n, bins, patches = plt.hist(lengths, 100, density=False, facecolor='g')   # 'normed' was removed in Matplotlib 3.x
plt.xlabel("lengths")
plt.ylabel("number of mapped reads")
plt.title(name)
plt.show()
def inv_logit(p):
    """Convert a Phred-scaled quality p into an error probability (despite the name, this is the Phred transform, not the inverse logit)."""
    return 10**(p/-10)
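# For example, inv_logit(30) = 10**(-3) = 0.001, which is how the MAPQ thresholds
# in subgroups() above map onto the p-value groups named in its docstring.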
def plot_base_composition(reads,sym):
"reports nucelotide frequencies at each position in the sam sequences"
#DNA_Alphabet=["A","C","T","G","N"]
all_nucs=[]
for read in reads:
nucs={}#dictionary to store nucleotide data
seq=read[9]
for i in range(0,len(seq)):
nucs[str(i+1)]=seq[i]
all_nucs.append(nucs)
all_items=[]
counts=[]
pos=list(range(1,len(seq)+1))
for dicts in all_nucs:
for item in list(dicts.items()):
all_items.append(item)
    all_items.sort(key=lambda item: int(item[0]))   # numeric sort; string keys would order '10' before '2'
groups= [list(map(operator.itemgetter(1),list(group))) for key, group in itertools.groupby(all_items, operator.itemgetter(0))]
for group in groups:
counts.append(group.count(sym))
print(counts)
plt.figure(1, figsize=(8,8))
ax = plt.axes([0.1, 0.1, 0.8, 0.8])
plt.bar(pos,counts,facecolor='g')
plt.xlabel("Position")
plt.ylabel("number of mapped reads")
plt.title(sym)
plt.show()
return counts
#####################################################
#Transcript reader
def raw_count_reader(filename):
"""Count the raw counts in the file"""
data={}
f= open(filename,'r')
for row in f:
if row.startswith('t1'): # skip the header
pass
else:
info=row.strip().split('\t')
data[info[0]]=[int(info[1]),int(info[2]),int(info[3]),int(info[4]),float(info[5])] #t1,rept1,t10,rept10,len
return data
#####Normalisation methods
def get_RPKM(data,num_map1,num_map2,num_map3,num_map4):
"""provide number of mapped reads for the two groups of interest and raw count data .This method provides length normalisation to prevent length and total count bias"""
all_rpkms=[];final={}
for i,s,ii,ss,v in list(data.values()):
rpkms=[]
num_mapped_reads=[num_map1,num_map2,num_map3,num_map4]
vals=[i,s,ii,ss]
lengths=[v,v,v,v]
for n in range(0,len(vals)):
if vals[n]==0:
rpkm=0
rpkms.append(rpkm)
else:
#perform RPKM calc
rpkm= float(vals[n])/(lengths[n]*(float(num_mapped_reads[n])/10**6))
rpkms.append(rpkm)
all_rpkms.append(rpkms)
#return gene names and rpkms
for i in range(0,len(list(data.keys()))):
final[list(data.keys())[i]]=[float(all_rpkms[i][0]),float(all_rpkms[i][1]),float(all_rpkms[i][2]),float(all_rpkms[i][3])]
return final
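# Note on the formula above: rpkm = count / (length * mapped/1e6) is reads per
# million mapped reads per unit of length; it matches the conventional
# reads-per-kilobase-per-million only if the stored lengths are in kilobases.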
def write_RPKM_data(RPKM_data,filename):
"""write RPKM data to a file"""
f=open(filename,'w')
for i in range(0,len(RPKM_data)):
f.write("%s\t%d\t%d\t%d\t%d\n"%(list(RPKM_data.keys())[i],int(list(RPKM_data.values())[i][0]),int(list(RPKM_data.values())[i][1]),int(list(RPKM_data.values())[i][2]),int(list(RPKM_data.values())[i][3])))
f.close()
###############Visualize replicates to determine degree of biological variation
def pearson_def(x, y):
"""Pearson correlation coefficient R"""
assert len(x) == len(y)
n = len(x)
assert n > 0
avg_x = np.mean(x)
avg_y = np.mean(y)
diffprod = 0
xdiff2 = 0
ydiff2 = 0
for idx in range(n):
xdiff = x[idx] - avg_x
ydiff = y[idx] - avg_y
diffprod += xdiff * ydiff
xdiff2 += xdiff * xdiff
ydiff2 += ydiff * ydiff
return diffprod / math.sqrt(xdiff2 * ydiff2)
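# Sanity check: for non-constant x and y this agrees with numpy, i.e.
# pearson_def(x, y) == np.corrcoef(x, y)[0, 1] up to floating-point error.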
def plotreprpkm(rpkm_data,timepoint):
"""plot showing level of agreement between technical replicates for RPKM between replicates and plots coefficient of determination"""
one=[]
two=[]
if timepoint=="t1":
for i in range(0,len(list(rpkm_data.values()))):
one.append(int(list(rpkm_data.values())[i][0]))
two.append(int(list(rpkm_data.values())[i][1]))
else:
for i in range(0,len(list(rpkm_data.values()))):
one.append(int(list(rpkm_data.values())[i][2]))
two.append(int(list(rpkm_data.values())[i][3]))
plt.plot(one,two,'o')
pcc=pearson_def(one,two)
R2=pcc**2
name="""Technical Replicates
R2="""+str(R2)
m,b= np.polyfit(one,two,1)
plt.figure(1, figsize=(8,8))
plt.plot(one, np.array(one)*m +b,'r-')
plt.text(3000, max(two)-1000,name , fontsize=12)
plt.xlabel("RPKM replicate 1")
plt.ylabel("RPKM replicate 2")
plt.title(timepoint)
plt.show()
def plotMAreprpkm(rpkm_data,timepoint):
"""MA Plot of log(RPKM) vs Average log(RPKM) of replicates"""
m=[]
a=[]
if timepoint=="t1":
for i in range(0,len(list(rpkm_data.values()))):
y=np.log2(list(rpkm_data.values())[i][0]+1)-np.log2(list(rpkm_data.values())[i][1]+1)
x=(np.log2(list(rpkm_data.values())[i][0]+1)+np.log2(list(rpkm_data.values())[i][1]+1))/2
m.append(y)
a.append(x)
else:
for i in range(0,len(list(rpkm_data.values()))):
y=np.log2(list(rpkm_data.values())[i][2]+1)-np.log2(list(rpkm_data.values())[i][3]+1)
x=(np.log2(list(rpkm_data.values())[i][2]+1)+np.log2(list(rpkm_data.values())[i][3]+1))/2
m.append(y)
a.append(x)
plt.figure(1, figsize=(8,8))
ax = plt.axes([0.1, 0.1, 0.8, 0.8])
plt.plot(a,m,'o')
plt.axhline(np.mean(m)+1.96*np.std(m),color="green",label="avg diff +1.96(std diff)")
plt.axhline(np.mean(m)-1.96*np.std(m),color="green",label="avg diff -1.96(std diff)")
plt.xlabel("Average log(RPKM) of replicates")
plt.ylabel("Difference in log(RPKM) of replicates")
plt.legend(loc="lower right")
plt.title(timepoint)
plt.show()
def get_cv(data1,condition):
    """Per-gene variability between replicates, computed below as (mean+1)/(std+1);
    note this is the reciprocal of the conventional coefficient of variation, with
    pseudo-counts added to avoid division by zero."""
cvs=[]
if condition=="t1":
for i in range(0,len(list(data1.values()))):
mean = np.mean([list(data1.values())[i][0],list(data1.values())[i][1]])
std=np.std([list(data1.values())[i][0],list(data1.values())[i][1]])
if mean==0.0 and std==0.0:
pass
else:
cv=float(mean+1)/(std+1)
cvs.append(cv)
else:
for i in range(0,len(list(data1.values()))):
mean = np.mean([list(data1.values())[i][2],list(data1.values())[i][3]])
std=np.std([list(data1.values())[i][2],list(data1.values())[i][3]])
if mean==0.0 and std==0.0:
pass
else:
cv=float(mean+1)/(std+1)
cvs.append(cv)
return cvs
def get_boxplots(norm,original):
"""distribution of the coeficient of variation across samples (replicates) normalised using the methods provided"""
bp=plt.boxplot([norm,original],notch=False, patch_artist=True)
    colors=["red","blue"]   # one colour per box: RPKM, then raw counts
    for box,color in zip(bp['boxes'],colors):
        box.set(color=color)
plt.ylabel("coefficient of variation")
plt.xlabel("Methods")
my_xticks = ['RPKM','raw counts']
x=[1,2]
plt.xticks(x,my_xticks)
plt.ylim(0,400)
plt.show()
def plotavg_cv(norm,original):
"""distribution of the coeficient of variation across samples (replicates) normalised using the methods provided"""
x=[1,2]
y=[np.mean(norm),np.mean(original)]
plt.figure(1, figsize=(8,8))
ax = plt.axes([0.1, 0.1, 0.8, 0.8])
plt.bar(x[0],y[0],color="red",label="RPKM")
plt.bar(x[1],y[1],color="blue",label="Raw counts")
plt.ylabel("Average coefficient of variation")
plt.xlabel("Methods")
ax.xaxis.set_ticklabels([])
plt.legend(loc="upper right")
plt.show()
def plotMA(rpkm_data,cutoff=[-1.5,1.5]):
"""Produce MA plot using logfold as cutoff"""
logfc=[]
avg_rpkm=[]
sig_logfc=[]
sig_avg_rpkm=[]
logfc2=[]
avg_rpkm2=[]
sig_logfc2=[]
sig_avg_rpkm2=[]
for i,ii,s,ss in list(rpkm_data.values()):
fc=np.log2(float(s+1)/(i+1))
        if fc<cutoff[0] or fc>cutoff[1]:
            sig_logfc.append(fc)
            sig_avg_rpkm.append((np.log2(s+1)+np.log2(i+1))/2)
        else:
            logfc.append(fc)
            avg_rpkm.append((np.log2(s+1)+np.log2(i+1))/2)
    for i,ii,s,ss in list(rpkm_data.values()):
        fc2=np.log2(float(ss+1)/(ii+1))
        if fc2<cutoff[0] or fc2>cutoff[1]:
            sig_logfc2.append(fc2)
            sig_avg_rpkm2.append((np.log2(ss+1)+np.log2(ii+1))/2)
        else:
            logfc2.append(fc2)
            avg_rpkm2.append((np.log2(ss+1)+np.log2(ii+1))/2)
plt.figure(1, figsize=(8,8))
ax = plt.axes([0.1, 0.1, 0.8, 0.8])
plt.plot(avg_rpkm,logfc,'o',color="blue",label="rep1")
plt.plot(avg_rpkm2,logfc2,'x',color="blue",label="rep2")
plt.plot(sig_avg_rpkm,sig_logfc,'o',color="red",label="sig rep1")
plt.plot(sig_avg_rpkm2,sig_logfc2,'x',color="red",label="sig rep2")
plt.axhline(cutoff[0],color="orange")
plt.axhline(cutoff[1],color="orange")
plt.ylabel("Fold Change (log2)")
plt.xlabel("Average RPKM (log2)")
plt.title("MA plot")
plt.legend(loc="upper left")
plt.show()
def plotMA_pval(rpkm_data,cutoff=0.05):
"""Produce MA plot using the pvalue as cutoff"""
logfc=[]
avg_rpkm=[]
sig_logfc=[]
sig_avg_rpkm=[]
logfc2=[]
avg_rpkm2=[]
sig_logfc2=[]
sig_avg_rpkm2=[]
for i,ii,s,ss,pval in list(rpkm_data.values()):
fc=np.log2(float(s+1)/(i+1))
if float(pval)