Commit 06f131e6 authored by Mikael Boden's avatar Mikael Boden

markov_chain

parent 0f8a8558
......@@ -718,3 +718,54 @@ class NaiveBayes():
prob *= condprob[i][key[i]] or 0.0
out.observe(outsym, prob)
return out
class MarkovChain():
""" Markov Chain in a very simple form
"""
def __init__(self, alpha, order = 1):
self.order = order
self.startsym = '^'
self.endsym = '$'
self.alpha = Alphabet(alpha.symbols + tuple([self.startsym, self.endsym]))
self.transit = TupleStore([self.alpha for _ in range(order)]) # transition probs, i.e. given key (prev state/s) what is the prob of current state
def _terminate(self, unterm_seq):
""" Terminate sequence with start and end symbols """
term_seq = [self.startsym for _ in range(self.order)]
term_seq.extend(unterm_seq)
term_seq.append(self.endsym)
return term_seq
def _getpairs(self, term_seq):
""" Return a tuple of all (tuple) Markov pairs from a sequence """
ret = []
for i in range(len(term_seq) - self.order):
past = tuple(term_seq[i:i + self.order])
present = term_seq[i + self.order]
ret.append(tuple([past, present]))
return tuple(ret)
def observe(self, wholeseq):
""" Set parameters by counting transitions """
myseq = self._terminate(wholeseq)
for (past, present) in self._getpairs(myseq):
d = self.transit[past]
if not d: # no distrib
d = Distrib(self.alpha)
self.transit[past] = d
d.observe(present)
def __getitem__(self, wholeseq):
""" Determine the log probability of a given sequence """
myseq = self._terminate(wholeseq)
logp = 0
for (past, present) in self._getpairs(myseq):
d = self.transit[past]
if not d:
return None
p = d[present]
if p == 0:
return None
logp += math.log(p)
return logp
\ No newline at end of file
......@@ -139,7 +139,10 @@ class Sequence(object):
def writeFasta(self):
""" Write one sequence in FASTA format to a string and return it. """
fasta = '>' + self.name + ' ' + self.info + '\n'
if parseDefline(self.info)[0] == self.name: # this sequence was previously "parsed" and info should hold the original header
fasta = '>' + self.info + '\n'
else:
fasta = '>' + self.name + ' ' + self.info + '\n'
data = ''.join(self.sequence)
nlines = int(math.ceil((len(self.sequence) - 1) / 60 + 1))
for i in range(nlines):
......@@ -290,7 +293,7 @@ def getMarkov(seqs, order = 0):
else:
if seq.alphabet != myalpha:
raise RuntimeError('Sequence ' + seq.name + ' uses an invalid alphabet ')
jp = Joint([myalpha for _ in range(order + 1)])
jp = Joint([myalpha for _ in range(order)])
for seq in myseqs:
for i in range(len(seq) - order):
sub = seq[i:i + order + 1]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment