parent a30165e5
 ... ... @@ -288,7 +288,7 @@ def scoreAlignment(aln, substmat = None, gap = -1): if gap_here: score = gap else: score = substmat.get(aln.seqs[i][pos], aln.seqs[j][pos]) score = substmat.__getitem__(aln.seqs[i][pos], aln.seqs[j][pos]) if min == None: min = score elif min > score: ... ... @@ -317,7 +317,7 @@ def align(seqA, seqB, substMatrix, gap=-1): # Calculate the optimum score at each location in the matrix, note which option that was chosen for traceback for i in range(1, lenA + 1): for j in range(1, lenB + 1): match = S[i - 1, j - 1] + substMatrix.get(stringA[i - 1], stringB[j - 1]) match = S[i - 1, j - 1] + substMatrix.__getitem__(stringA[i - 1], stringB[j - 1]) delete = S[i - 1, j] + gap insert = S[i, j - 1] + gap Traceback[i, j] = numpy.argmax([match, delete, insert]) ... ... @@ -382,7 +382,7 @@ class SubstMatrix(): G -1 -1 1 T -1 -1 -1 1 A C G T >>> sm.get('C', 'T') >>> sm.__getitem__('C', 'T') -1 """ def __init__(self, alphabet, scoremat = None): ... ...
hca.py 0 → 100644
This diff is collapsed.
heap.py 0 → 100644
 import numpy as np class LabelHeap: """ Min and max heap: data structure for keeping a list of labels, sorted by a value associated with each. Based on max heap in Necaise, "Data structures and algorithms in Python" (Ch 13); fixed a bunch of bugs though... """ def __init__(self, maxsize, reverse = False): """ Initialise a heap. :param maxsize: the maximum size of the heap :param reverse: heap in descending order if true, else ascending """ self.reverse = reverse self._elements = np.array([None for _ in range(maxsize)]) self._idx2val = dict() self._count = 0 def __len__(self): """ The number of elements in the heap currently. :return: the number of added elements """ return self._count def __str__(self): """ String representation of heap. A list of labels in a binary tree (first element is the smallest/greatest value) :return: heap as a string """ return str([y for y in self._elements[:self._count]]) def __repr__(self): return self.__str__() def capacity(self): """ Maximum size allocated to heap :return: the number of elements that this heap can store """ return len(self._elements) def __getitem__(self, i): """ Retrieve the value by tree index (index 0 is the root and contains the smallest/greatest value) :param i: index in tree :return: the value at this index """ return self._idx2val[self._elements[i]] def add(self, label, value): """ Add a label with value to heap :param label: :param value: """ assert self._count < self.capacity(), "Cannot add to a full heap" assert not label in self._idx2val, "Cannot add a duplicate label" self._elements[self._count] = label self._idx2val[label] = value self._count += 1 self._siftUp(self._count - 1) def pop(self): """ Pop the (label, value) pair with minimum/maximum value; removes the entry :return: tuple with label and value """ assert self._count > 0, "Cannot extract from an empty heap" label = self._elements self._count -= 1 self._elements = self._elements[self._count] self._siftDown(0) return (label, self._idx2val[label]) def peek(self): """ Peek the (label, value) pair with minimum/maximum value; does not change the heap :return: tuple with label and value """ assert self._count > 0, "Cannot peek in an empty heap" return (self._elements, self._idx2val[self._elements]) def _delete(self, i): """ Delete by internal, binary tree index :param i: index :return: """ assert self._count > i, "Cannot delete index" + str(i) self._count -= 1 self._elements[i] = self._elements[self._count] self._siftDown(i) def _siftUp(self, i): if i > 0: parent = (i-1) // 2 if (self[i] > self[parent] if self.reverse else self[i] < self[parent]): # swap tmp = self._elements[i] self._elements[i] = self._elements[parent] self._elements[parent] = tmp self._siftUp(parent) def _siftDown(self, i): left = 2 * i + 1 right = 2 * i + 2 extremist = i if left < self._count and (self[left] >= self[extremist] if self.reverse else self[left] <= self[extremist]): extremist = left if right < self._count and (self[right] >= self[extremist] if self.reverse else self[right] <= self[extremist]): extremist = right if extremist != i: # swap tmp = self._elements[i] self._elements[i] = self._elements[extremist] self._elements[extremist] = tmp self._siftDown(extremist) \ No newline at end of file
 ... ... @@ -97,7 +97,7 @@ class IntervalTree: def putAll(self, tree): for i in tree: self.put(i.getInterval(), tree.get(i.getInterval())) self.put(i.getInterval(), tree.__getitem__(i.getInterval())) def _randomizedInsert(self, node, ival, value): if node == None: return IntervalNode(ival, value) ... ...
 ... ... @@ -150,6 +150,8 @@ class PhyloNode: A number of methods are named with a _ prefix. These can be, but are not intended to be used from outside the class. """ _verbose = True def __init__(self, parent = None, label=''): """ Initialise a node. Set its parent (another PhyloNode), parent can be None. ... ... @@ -183,7 +185,8 @@ class PhyloNode: for i in range(self.nChildren()): stubs[i] = str(self.children[i]) if self.dist or self.dist == 0.0: dist = ':' + str(self.dist) if self.dist == 0.0: dist = '' else: dist = ':' + '%5.3f' % self.dist if self.label != None: label = str(self.label) if self.nChildren() == 0: ... ... @@ -277,6 +280,7 @@ class PhyloNode: else: self.seqscores = [[0 if a == sym else 999999 for a in aln.alphabet] for sym in self.sequence] # if we want to weight scores, this would need to change if self._verbose: print('Forward:', self.label, '\n\t', self.seqscores) return self.seqscores def _backwardParsimony(self, aln, seq=None): ... ... @@ -314,6 +318,7 @@ class PhyloNode: col += 1 for i in range(self.nChildren()): self.children[i]._backwardParsimony(aln, sequence.Sequence(childbuf[i], aln.alphabet, self.children[i].label or "Child of "+self.label, gappy=True)) if self._verbose: print('Backward:', self.label, '\n\t', self.backptr) return self.sequence def getSequence(self): ... ... @@ -394,7 +399,6 @@ class PhyloNode: Methods for generating a single tree by clustering, here UPGMA Zvelebil and Baum p. 278 ----------------------------------------------------------------------------------------""" def runUPGMA(aln, measure, absoluteDistances=False): """ Generate an ultra-metric, bifurcating, rooted tree from an alignment based on pairwise distances. Use specified distance metric (see sequence.calcDistances). ... ... @@ -403,6 +407,7 @@ def runUPGMA(aln, measure, absoluteDistances=False): D = {} N = {} # The number of sequences in each node M = aln.calcDistances(measure) # determine all pairwise distances print(M) nodes = [PhyloNode(label=seq.name) for seq in aln.seqs] # construct all leaf nodes """ For each node-pair, assign the distance between them. """ for i in range(len(nodes)): ... ... @@ -411,16 +416,17 @@ def runUPGMA(aln, measure, absoluteDistances=False): N[nodes[i]] = 1 # each cluster contains a single sequence for j in range(0, i): D[frozenset([nodes[i], nodes[j]])] = M[i, j] """ Now: treat each node as a cluster, until there is only one cluster left, find the *closest* pair of clusters, and merge that pair into a new cluster (to replace the two that merged). """ Treat each node as a cluster, until there is only one cluster left, find the *closest* pair of clusters, and merge that pair into a new cluster (to replace the two that merged). In each case, the new cluster is represented by the (phylo)node that is formed. """ while len(N) > 1: # N will contain all "live" clusters, to be reduced to a signle below while len(N) > 1: # N will contain all "live" clusters, to be reduced to a single below closest_pair = (None, None) # The two nodes that are closest to one another according to supplied metric closest_dist = None # The distance between them print(len(N), 'nodes remain') for pair in D: # check all pairs which should be merged dist = D[pair] pair_as_list = list(pair) print('Inspecting \"' + str(pair_as_list) + '\" and \"' + str(pair_as_list) + '\" at distance %5.3f' % D[pair]) if closest_dist == None or dist < closest_dist: closest_dist = dist closest_pair = list(pair) ... ... @@ -428,21 +434,23 @@ def runUPGMA(aln, measure, absoluteDistances=False): x = closest_pair # See Zvelebil and Baum p. 278 for notation y = closest_pair z = PhyloNode() # create a new node for the cluster z z.dist = D.pop(frozenset([x, y])) / 2.0 # assign the absolute distance, travelled so far, note: this will change to relative distance later z.dist = D.pop(frozenset([x, y])) / 2.0 # assign the absolute distance, change to relative distance later Nx = N.pop(x) # find number of sequences in x, remove the cluster from list N Ny = N.pop(y) # find number of sequences in y, remove the cluster from list N dz = {} # new distances to cluster z x.parent = z y.parent = z z.children = [x, y] print('Closest pair is \"' + str(x) + '\" ('+str(Nx)+') and \"' + str(y) + '\" ('+str(Ny)+') at distance %5.3f' % (z.dist * 2), 'form new node ' + str(z)) for w in N: # for each node w ... # we will merge x and y into a new cluster z, so need to consider w (which is not x or y) dxw = D.pop(frozenset([x, w])) # retrieve and remove distance from D: x to w dyw = D.pop(frozenset([y, w])) # retrieve and remove distance from D: y to w dz[w] = (Nx * dxw + Ny * dyw) / (Nx + Ny) # distance: z to w print(str(z) + ' gets distance to \"' + str(w) + '\": (', Nx, '* %5.3f' % dxw, '+', Ny, '* %5.3f' % dyw, ') / (', Nx, '+', Ny, ') = %5.3f' % dz[w]) N[z] = Nx + Ny # total number of sequences in new cluster, insert new cluster in list N for w in dz: # we have to run through the nodes again, now not including the removed x and y D[frozenset([z, w])] = dz[w] # for each "other" cluster, update distance per EQ8.16 (Z&B p. 278) x.parent = z y.parent = z z.children = [x, y] nodes.append(z) if not absoluteDistances: x._propagateDistance(z.dist) # convert absolute distances to relative by recursing down left path ... ... @@ -595,7 +603,17 @@ if __name__ == '__main__1': print(tree) if __name__ == '__main__': tree = readNewick('/Users/mikael/simhome/ASR/parsitest.nwk') tree.putAlignment(sequence.Alignment(sequence.readFastaFile('/Users/mikael/simhome/ASR/parsitest.aln', sequence.DNA_Alphabet))) aln = sequence.readFastaFile('/Users/mikael/Documents/Teaching/SCIE2100/Exams/pdistupgma.aln', sequence.Protein_Alphabet) tree = runUPGMA(sequence.Alignment(aln), "fractional") writeNewickFile('/Users/mikael/Documents/Teaching/SCIE2100/Exams/pdistupgma.nwk', tree) if __name__ == '__main__3': aln = sequence.readClustalFile('/Users/mikael/simhome/ASR/dp16_example.aln', sequence.Protein_Alphabet) tree = runUPGMA(aln, "poisson") writeNewickFile('/Users/mikael/simhome/ASR/dp16_example_UPGMA.nwk', tree) if __name__ == '__main__4': tree = readNewick('/Users/mikael/simhome/ASR/parsitest2.nwk') tree.putAlignment(sequence.Alignment(sequence.readFastaFile('/Users/mikael/simhome/ASR/parsitest2.aln', sequence.DNA_Alphabet))) tree.parsimony() print(tree.strSequences()) \ No newline at end of file
 ... ... @@ -269,12 +269,13 @@ def parseDefline(string): """ if len(string) == 0: return ('', '', '', '') s = string.split() if re.match("^sp\|[A-Z][A-Z0-9]{5}\|\S+", s): arg = s.split('|'); return (arg, arg, arg, '') if re.match("^sp\|[A-Z][A-Z0-9]*\|\S+", s): arg = s.split('|'); return (arg, arg, arg, '') elif re.match("^tr\|[A-Z][A-Z0-9]*\|\S+", s): arg = s.split('|'); return (arg, arg, arg, '') elif re.match("^gi\|[0-9]*\|\S+\|\S+", s): arg = s.split('|'); return (arg, arg, arg, arg) elif re.match("gb\|\S+\|\S+", s): arg = s.split('|'); return (arg, arg, arg, '') elif re.match("emb\|\S+\|\S+", s): arg = s.split('|'); return (arg, arg, arg, '') elif re.match("^refseq\|\S+\|\S+", s): arg = s.split('|'); return (arg, arg, arg, '') elif re.match("[A-Z][A-Z0-9]*\|\S+", s): arg = s.split('|'); return (arg, arg, 'UniProt', '') # assume this is UniProt else: return (s, '', '', '') def readFastaFile(filename, alphabet = None, ignore = False, gappy = False, parse_defline = True): ... ... @@ -849,7 +850,7 @@ def alignGlobal(seqA, seqB, substMatrix, gap = -1): # that ends at sequence indices i and j, for A and B, resp.) for i in range(1, lenA + 1): for j in range(1, lenB + 1): match = S[i-1, j-1] + substMatrix.get(seqA[i-1], seqB[j-1]) match = S[i-1, j-1] + substMatrix.__getitem__(seqA[i - 1], seqB[j - 1]) fromTop = S[i-1, j ] + gap fromLeft = S[i , j-1] + gap S[i, j] = max([match, fromTop, fromLeft]) ... ... @@ -908,7 +909,7 @@ def alignLocal(seqA, seqB, substMatrix, gap = -1): # that ends at sequence indices i and j, for A and B, resp.) for i in range(1, lenA + 1): for j in range(1, lenB + 1): match = S[i-1, j-1] + substMatrix.get(seqA[i-1], seqB[j-1]) match = S[i-1, j-1] + substMatrix.__getitem__(seqA[i - 1], seqB[j - 1]) fromTop = S[i-1, j ] + gap fromLeft = S[i , j-1] + gap S[i, j] = max([match, fromTop, fromLeft, 0]) # Local: add option that we re-start alignment from "0" ... ... @@ -967,12 +968,12 @@ def tripletAlignGlobal(seqA, seqB, seqC, subsMatrix, gap = -1): for j in range(1, lenB+1): for k in range(1, lenC+1): # Scored using sum-of-pairs matchABC = S[i-1, j-1, k-1] + subsMatrix.get(seqA[i-1], seqB[j-1]) \ + subsMatrix.get(seqA[i-1], seqC[k-1]) \ + subsMatrix.get(seqB[j-1], seqC[k-1]) matchAB = S[i-1, j-1, k] + 2*gap + subsMatrix.get(seqA[i-1], seqB[j-1]) matchBC = S[i, j-1, k-1] + 2*gap + subsMatrix.get(seqB[j-1], seqC[k-1]) matchAC = S[i-1, j, k-1] + 2*gap + subsMatrix.get(seqA[i-1], seqC[k-1]) matchABC = S[i-1, j-1, k-1] + subsMatrix.__getitem__(seqA[i - 1], seqB[j - 1]) \ + subsMatrix.__getitem__(seqA[i - 1], seqC[k - 1]) \ + subsMatrix.__getitem__(seqB[j - 1], seqC[k - 1]) matchAB = S[i-1, j-1, k] + 2*gap + subsMatrix.__getitem__(seqA[i - 1], seqB[j - 1]) matchBC = S[i, j-1, k-1] + 2*gap + subsMatrix.__getitem__(seqB[j - 1], seqC[k - 1]) matchAC = S[i-1, j, k-1] + 2*gap + subsMatrix.__getitem__(seqA[i - 1], seqC[k - 1]) gapAB = S[i, j, k-1] + 3*gap gapBC = S[i-1, j, k] + 3*gap gapAC = S[i, j-1, k] + 3*gap ... ...
 ... ... @@ -26,8 +26,6 @@ cf_dict = { # Chou-Fasman table 'T': ( 83, 119, 96, 0.086, 0.108, 0.065, 0.079 ), # Threonine 'W': ( 108, 137, 96, 0.077, 0.013, 0.064, 0.167 ), # Tryptophan 'Y': ( 69, 147, 114, 0.082, 0.065, 0.114, 0.125 ), # Tyrosine 'V': ( 106, 170, 50, 0.062, 0.048, 0.028, 0.053 ), # Valine 'Y': ( 69, 147, 114, 0.082, 0.065, 0.114, 0.125 ), # Tyrosine 'V': ( 106, 170, 50, 0.062, 0.048, 0.028, 0.053 ),} # Valine prot_alpha = sym.Protein_Alphabet ... ...
test_hca.py 0 → 100644
 import unittest from hca import * import random class MyTestCase(unittest.TestCase): N = 8 def setUp(self): """ Set up for each test """ self.pairidxs1 = dict() y = 0 for i in range(self.N): for j in range(i + 1, self.N): self.pairidxs1[(i, j)] = y y += 1 self.pairidxs2 = dict() for i in range(self.N): for j in range(0, i): self.pairidxs2[(i, j)] = self.pairidxs1[(j, i)] def test_PairArray1(self): pa1 = PairArray(self.N) pa2 = PairArray(self.N) for p in self.pairidxs1: pa1[p] = self.pairidxs1[p] for p in self.pairidxs2: pa2[p] = self.pairidxs2[p] for (i, j) in self.pairidxs1: self.assertEqual(pa1[(j, i)], self.pairidxs1[(i, j)]) for (i, j) in self.pairidxs2: self.assertEqual(pa2[(j, i)], pa1[(j, i)]) def test_DNode1(self): layer0 = [DNode(i) for i in range(0, 10)] layer1 = [] for i in range(0, len(layer0) // 2): layer1.append(DNode(i + len(layer0), children=[layer0[i * 2], layer0[i * 2 + 1]], dist = random.randint(1, 10))) root = DNode(len(layer0) + len(layer1), layer1, dist = 100) self.assertEquals(root.nChildren(), len(layer1)) self.assertEquals(len(root.getLeaves()), len(layer0)) for i in range(len(layer1)): self.assertEquals(layer1[i].nChildren(), 2) for i in range(len(layer0)): self.assertEquals(layer0[i].nChildren(), 0) def test_DNode2(self): layer0 = [DNode(i) for i in range(0, 10)] layer1 = [] for i in range(0, len(layer0) // 2): layer1.append(DNode(i + len(layer0), children=[layer0[i * 2], layer0[i * 2 + 1]], dist = random.randint(1, 10))) root1 = DNode(len(layer0) + len(layer1), layer1, dist = 100) s1 = str(root1) root2 = parse(s1) self.assertEquals(root2.nChildren(), root1.nChildren()) self.assertEquals(len(root2.getLeaves()), len(root1.getLeaves())) s2 = str(root2) root3 = parse(s2) self.assertEquals(str(root3), s2) def test_DNode3(self): layer0 = [DNode(i) for i in range(0, 8)] layer1 = [] for i in range(0, len(layer0) // 2): layer1.append(DNode(i + len(layer0), children=[layer0[i * 2], layer0[i * 2 + 1]], dist = random.randint(1, 10))) layer2 = [] for i in range(0, len(layer1) // 2): layer2.append(DNode(i + len(layer0) + len(layer1), children=[layer1[i * 2], layer1[i * 2 + 1]], dist = random.randint(11, 20))) root = DNode(len(layer0) + len(layer1) + len(layer2), layer2, dist = 30) chars = 'ABCDEFGHIJKLMNOP' labels_list = [ch for ch in chars] root1 = parse(root.newick(labels_list)) labels_rev = [ch for ch in chars[::-1]] labels_off1 = [ch for ch in chars[1:]] labels_dict = {} for i in range(len(labels_list)): labels_dict[i] = labels_list[i] root2 = parse(root.newick(labels_dict)) self.assertEquals(len(parse(root.newick(labels_rev)).getLeaves()), len(root.getLeaves())) self.assertEquals(root.newick(labels_dict), root.newick(labels_list)) for ch in chars[:-1]: # all chars except last one node1 = root1.findNode(ch) node2 = root2.findNode(ch) self.assertIsNotNone(node1) self.assertIsNotNone(node2) self.assertEquals(len(node1.getLeaves()), len(node2.getLeaves())) self.assertEquals(str(root1.findNode(ch)), str(root2.findNode(ch))) def test_DNode4(self): pass if __name__ == '__main__': unittest.main()
test_heap.py 0 → 100644
 import unittest from heap import * import random class MyTestCase(unittest.TestCase): def setUp(self): """ Set up for each test """ idxs = [i for i in range(random.randint(0, 10), random.randint(10, 50))] random.shuffle(idxs) self.a = [(idx, random.random()) for idx in idxs] self.mh = LabelHeap(len(self.a)) self.maxh = LabelHeap(len(self.a), reverse = True) for (address, value) in self.a: self.mh.add(address, value) self.maxh.add(address, value) def test_MinHeap1(self): self.assertEqual(len(self.mh), len(self.a)) def test_MinHeap2(self): minidx = 0 for i in range(1, len(self.a)): if self.a[i] < self.a[minidx]: minidx = i #print(self.mh._elements, self.mh) (address, value) = self.mh.pop() self.assertEqual(address, self.a[minidx]) self.assertEqual(value, self.a[minidx]) def test_MinHeap3(self): ys = [y for y in self.a] ys.sort(reverse=False) for y in ys: self.assertEqual(y, self.mh) self.mh.pop() def test_MaxHeap3(self): ys = [y for y in self.a] ys.sort(reverse=True) for y in ys: self.assertEqual(y, self.maxh) self.maxh.pop() def test_MinHeap4(self): mh1 = LabelHeap(10) self.assertEquals(len(mh1), 0) mh1.add('a', 2) self.assertEquals(len(mh1), 1) mh1.add('b', 1) self.assertEquals(len(mh1), 2) (label, y) = mh1.pop() self.assertEquals(label, 'b') self.assertEquals(len(mh1), 1) mh1.add('c', 3) self.assertEquals(len(mh1), 2) if __name__ == '__main__': unittest.main()
 ... ... @@ -176,7 +176,7 @@ def getGODef(goterm): goterm: the identifier, e.g. 'GO:0002080' """ # first turn off server certificate verification if (not os.environ.get('PYTHONHTTPSVERIFY', '') and getattr(ssl, '_create_unverified_context', None)): if (not os.environ.__getitem__('PYTHONHTTPSVERIFY', '') and getattr(ssl, '_create_unverified_context', None)): ssl._create_default_https_context = ssl._create_unverified_context # Construct URL with query term url = __ebiGOUrl__ + 'ontology/go/search?query=' + goterm ... ... @@ -225,7 +225,7 @@ def getGOTerms(genes): # Construct URL # Get the entry: fill in the fields specified below # first turn off server certificate verification if (not os.environ.get('PYTHONHTTPSVERIFY', '') and getattr(ssl, '_create_unverified_context', None)): if (not os.environ.__getitem__('PYTHONHTTPSVERIFY', '') and getattr(ssl, '_create_unverified_context', None)): ssl._create_default_https_context = ssl._create_unverified_context page = 1 try: ... ... @@ -234,7 +234,7 @@ def getGOTerms(genes): urlreq = urllib.request.Request(url) urlreq.add_header('Accept-encoding', 'gzip') response = urllib.request.urlopen(urlreq) if response.info().get('Content-Encoding') == 'gzip': if response.info().__getitem__('Content-Encoding') == 'gzip': buf = StringIO(response.read()) f = gzip.GzipFile(fileobj=buf) data = f.read().decode("utf-8") ... ... @@ -285,7 +285,7 @@ def getGenes(goterms, taxo=None): term = termbatch[i] uri_string += term + "," if i < len(termbatch) - 1 else term # first turn off server certificate verification if (not os.environ.get('PYTHONHTTPSVERIFY', '') and getattr(ssl, '_create_unverified_context', None)): if (not os.environ.__getitem__('PYTHONHTTPSVERIFY', '') and getattr(ssl, '_create_unverified_context', None)): ssl._create_default_https_context = ssl._create_unverified_context page = 1 try: ... ... @@ -294,7 +294,7 @@ def getGenes(goterms, taxo=None): urlreq = urllib.request.Request(url) urlreq.add_header('Accept-encoding', 'gzip') response = urllib.request.urlopen(urlreq) if response.info().get('Content-Encoding') == 'gzip': if response.info().__getitem__('Content-Encoding') == 'gzip': buf = StringIO(response.read()) f = gzip.GzipFile(fileobj=buf) data = f.read().decode("utf-8") ... ... @@ -534,7 +534,7 @@ def getUniProtDict(ids, cols="", db='uniprot', identities=None): request = urllib.request.Request(url, data) opener = urllib.request.build_opener() response = opener.open(request) page = response.read(200000).decode('utf-8') page = response.read(20000000).decode('utf-8') up_dict = {} # For each record we retrieve, split the line by tabs and build up the UniProt dict ... ...
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!