"""
Motif discovery using Gibb's sampling
@author: mikael
"""
import math
import random
import sym
import prob
import sequence
class GibbsMotif():
"""
A class for discovering linear motifs in sequence data.
Uses Gibb's sampling (Lawrence et al., Science 262:208-214 1993).
Also see http://bayesweb.wadsworth.org/gibbs/content.html which has info
on "site sampling", "motif sampling", "recursive sampling" and "centroid
sampling". The first is implemented (roughly) below.
"""
def __init__(self, seqs, length, alignment = None):
""" Construct a "discovery" session by providing the sequences that will be used.
seqs: sequences in which the motif is sought
length: length of sought pattern (W)
alignment: positions in each sequence for the initial alignment (use only if the alignment
has been determined from a previous run).
"""
self.seqs = seqs
self.length = length # length of motif 1..W
seqs = self.seqs
self.alphabet = None
k = 0
for s in seqs:
if self.alphabet != None and self.alphabet != s.alphabet:
raise RuntimeError("Sequences invalid: different alphabets")
self.alphabet = s.alphabet
if alignment:
if alignment[k] < 0 or alignment[k] >= len(s):
raise RuntimeError("Initial alignment invalid: does not match sequence " + s.name)
k += 1
""" Initialise parameters that are part of the setup (below) """
self.alignment = alignment or [ random.randint(0, len(s) - length) for s in seqs ] # starting positions defining alignment
def discover(self, pseudocount = None, niter = None):
""" Find the most probable common pattern represented by a
position weight matrix (PWM), based on W+1 distributions
pseudocount: the distribution used for pseudo-counts (default is uniform)
niter: number of iterations (if None, 100*N is used; where N is number of seqs).
"""
""" Initialise parameters necessary for the discovery run (below) """
N = len(self.seqs) # number of sequences 1..N
seqs = self.seqs
W = self.length # motif width
""" background that will be used as pseudo-counts """
pseudocount = pseudocount or prob.Distrib(self.alphabet, 1.0)
""" q: the foreground distribution (specifying the W distributions in aligned columns)
p: the background distribution (for non-aligned positions in all sequences) """
q = [ prob.Distrib(self.alphabet, pseudocount) for _ in range(W) ]
p = prob.Distrib(self.alphabet, pseudocount)
a = self.alignment
new_z = random.randint(0, N-1) # pick a random sequence to withhold
for k in range(N):
if k != new_z:
k_len = len(seqs[k]) # length of current seq
offset = 0
for i in range(k_len):
if i >= a[k] and i < a[k] + W: # within pattern
q[offset].observe(seqs[k][i])
offset += 1
else: # outside pattern
p.observe(seqs[k][i])
""" Main loop: predictive update step THEN sampling step, repeat... """
niter = niter or 100 * N # use specified number of iterations or default
for round in range(niter):
""" Predictive update step:
One of the N sequences are chosen at random: z.
We will not use it in the profile, nor background so we
exclude it from our counts. """
prev_z = new_z
new_z = random.randint(0, N - 1)
# q's and p's are updated from current a's and all sequences except z,
# which is the same as use old q's and p's and subtract z's contribs...
offset = 0
for i in range(len(seqs[new_z])):
if i >= a[new_z] and i < a[new_z] + W: # within pattern
q[offset].observe(seqs[new_z][i], -1) # subtract the count
offset += 1
else: # outside pattern
p.observe(seqs[new_z][i], -1) # subtract the count
# ... and add back the previous and now updated z
offset = 0
for i in range(len(seqs[prev_z])):
if i >= a[prev_z] and i < a[prev_z] + W: # within pattern
q[offset].observe(seqs[prev_z][i], +1) # add the count
offset += 1
else: # outside pattern
p.observe(seqs[prev_z][i], +1) # add the count
""" Sampling step:
Consider each position x in z as a match: find a weight Ax """
z_len = len(seqs[new_z]) # length of seq z
A = [ 0.0 for _ in range(z_len) ]
Asum = 0.0
for x in range(z_len - W + 1): # look at all starts for a W-wide pattern
Px = 1.0; Qx = 1.0
for w in range(W):
Px *= p[seqs[new_z][x+w]]
Qx *= q[w][seqs[new_z][x+w]]
try:
A[x] = Qx / Px
except ZeroDivisionError:
pass
Asum += A[x]
for x in range(z_len - W + 1): # score all starts for a W-wide pattern
A[x] /= Asum # normalise so that all Ax's sum to 1.0
# Pick the next a[z], with a probability proportional to Ax
pick = random.random() # any value between 0 and 1
cumul = 0.0 # cumulative probability
for x in range(z_len - W + 1): # check starts for a W-wide pattern
cumul += A[x]
if pick <= cumul: # check if our random pick is smaller than the cumulative prob
a[new_z] = x
break
""" Evaluate data log-likelihood """
if round % 100 == 0: # but only every 100th round
LL = 0.0
for k in range(N):
Pk = 1.0; Qk = 1.0
for w in range(W):
Pk *= p[seqs[k][a[k]+w]]
Qk *= q[w][seqs[k][a[k]+w]]
try:
LL += math.log(Qk / Pk)
except ZeroDivisionError:
pass
print("LL @ %5d=\t%5.2f" % (round, LL))
# end main for-loop
self.q = q
self.p = p
self.alignment = a
return q
def getForeground(self):
""" Return the probability distributions for columns in the discovered alignment. """
return self.q
def getBackground(self):
""" Return the probability distributions for the background used in the discovery. """
return self.p
def getAlignment(seqs, motif, background):
""" Retrieve the best alignment (positions) in provided sequences defined by the specified
motif params.
seqs: sequence data
motif: the foreground distribution (specifying the W distributions in aligned columns)
background: the background distribution (for non-aligned positions in all sequences)
Note that this is similar but not the same as the stochastically selected alignment that
is kept while training. It can be implemented using a PWM constructed from a previous session.
Note also that this alignment can be used as input to continue an earlier discovery session
when motif distributions had been saved. """
N = len(seqs)
q = motif
p = background
W = len(q)
a = [0 for _ in range(N)] # start positions unknown
for k in range(N):
k_len = len(seqs[k]) # length of seq k
Amax = None
xmax = 0
for x in range(k_len - W + 1):
Px = 1.0; Qx = 1.0
for w in range(W):
Px *= p[seqs[k][x+w]]
Qx *= q[w][seqs[k][x+w]]
try:
Atmp = math.log(Qx / Px)
except ZeroDivisionError:
pass
if Amax == None or Amax < Atmp:
Amax = Atmp
xmax = x
a[k] = xmax
return a
class GibbsAlign():
""" A class for performing ungapped sequence alignment.
Uses Gibb's sampling (Lawrence et al., Science 262:208-214 1993).
"""
def __init__(self, seqs, length, alignment = None):
""" Construct a "discover" session by providing the sequences that will be aligned.
seqs: sequences that will be aligned
length: maximum length of alignment (must be equal or greater than max sequence length)
alignment: positions in each sequence for the initial alignment (use only if the alignment
has been determined from a previous run).
"""
self.seqs = seqs
self.length = length # length of motif 1..W
seqs = self.seqs
self.alphabet = None
k = 0
for s in seqs:
if self.alphabet != None and self.alphabet != s.alphabet:
raise RuntimeError("Sequences invalid: different alphabets")
self.alphabet = s.alphabet
if alignment:
if alignment[k] < 0 or alignment[k] >= len(s):
raise RuntimeError("Initial alignment invalid: does not match sequence " + s.name)
k += 1
""" Initialise parameters that are part of the setup (below) """
self.alignment = alignment or [ random.randint(0, length - len(s)) for s in seqs ] # starting offsets defining alignment
def discover(self, pseudocount = None, niter = None):
""" Find the most probable common pattern represented by a
position weight matrix (PWM), based on W+1 distributions
pseudocount: the distribution used for pseudo-counts (default is uniform)
niter: number of iterations (if None, 100*N is used; where N is number of seqs).
"""
""" Initialise parameters necessary for the discovery run (below) """
N = len(self.seqs) # number of sequences 1..N
seqs = self.seqs
W = self.length # alignment width
""" background that will be used as pseudo-counts """
pseudocount = pseudocount or prob.Distrib(self.alphabet, 1.0)
""" q: the foreground distribution (specifying the W distributions in aligned columns)
p: the background distribution (for non-aligned positions in all sequences) """
q = [ prob.Distrib(self.alphabet, pseudocount) for _ in range(W) ]
p = prob.Distrib(self.alphabet, pseudocount)
a = self.alignment
new_z = random.randint(0, N-1) # pick a random sequence to withhold
for k in range(N):
if k != new_z:
k_len = len(seqs[k]) # length of current seq
offset = 0
for i in range(k_len):
if i >= a[k] and i < a[k] + W: # within pattern
q[offset].observe(seqs[k][i])
offset += 1
else: # outside pattern
p.observe(seqs[k][i])
""" Main loop: predictive update step THEN sampling step, repeat... """
niter = niter or 100 * N # use specified number of iterations or default
for round in range(niter):
""" Predictive update step:
One of the N sequences are chosen at random: z.
We will not use it in the profile, nor background so we
exclude it from our counts. """
prev_z = new_z
new_z = random.randint(0, N - 1)
# q's and p's are updated from current a's and all sequences except z,
# which is the same as use old q's and p's and subtract z's contribs...
offset = 0
for i in range(len(seqs[new_z])):
if i >= a[new_z] and i < a[new_z] + W: # within pattern
q[offset].observe(seqs[new_z][i], -1) # subtract the count
offset += 1
else: # outside pattern
p.observe(seqs[new_z][i], -1) # subtract the count
# ... and add back the previous and now updated z
offset = 0
for i in range(len(seqs[prev_z])):
if i >= a[prev_z] and i < a[prev_z] + W: # within pattern
q[offset].observe(seqs[prev_z][i], +1) # add the count
offset += 1
else: # outside pattern
p.observe(seqs[prev_z][i], +1) # add the count
""" Sampling step:
Consider each position x in z as a match: find a weight Ax """
z_len = len(seqs[new_z]) # length of seq z
A = [ 0.0 for _ in range(z_len) ]
Asum = 0.0
for x in range(z_len - W + 1): # look at all starts for a W-wide pattern
Px = 1.0; Qx = 1.0
for w in range(W):
Px *= p[seqs[new_z][x+w]]
Qx *= q[w][seqs[new_z][x+w]]
try:
A[x] = Qx / Px
except ZeroDivisionError:
pass
Asum += A[x]
for x in range(z_len - W + 1): # score all starts for a W-wide pattern
A[x] /= Asum # normalise so that all Ax's sum to 1.0
# Pick the next a[z], with a probability proportional to Ax
pick = random.random() # any value between 0 and 1
cumul = 0.0 # cumulative probability
for x in range(z_len - W + 1): # check starts for a W-wide pattern
cumul += A[x]
if pick <= cumul: # check if our random pick is smaller than the cumulative prob
a[new_z] = x
break
""" Evaluate data log-likelihood """
if round % 100 == 0: # but only every 100th round
LL = 0.0
for k in range(N):
Pk = 1.0; Qk = 1.0
for w in range(W):
Pk *= p[seqs[k][a[k]+w]]
Qk *= q[w][seqs[k][a[k]+w]]
try:
LL += math.log(Qk / Pk)
except ZeroDivisionError:
pass
print("LL @ %5d=\t%5.2f" % (round, LL))
# end main for-loop
self.q = q
self.p = p
self.alignment = a
return q
def getForeground(self):
""" Return the probability distributions for columns in the discovered alignment. """
return self.q
def getBackground(self):
""" Return the probability distributions for the background used in the discovery. """
return self.p