gibbs.py 14.7 KB
Newer Older
Mikael Boden's avatar
Mikael Boden committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
"""
Motif discovery using Gibb's sampling
@author: mikael
"""

import math
import random

import sym
import prob
import sequence

class GibbsMotif():
    """
    A class for discovering linear motifs in sequence data.
    Uses Gibb's sampling (Lawrence et al., Science 262:208-214 1993).

    Also see http://bayesweb.wadsworth.org/gibbs/content.html which has info
    on "site sampling", "motif sampling", "recursive sampling" and "centroid
    sampling". The first is implemented (roughly) below.
    """
    def __init__(self, seqs, length, alignment = None):
        """ Construct a "discovery" session by providing the sequences that will be used.
            seqs: sequences in which the motif is sought
            length: length of sought pattern (W)
            alignment: positions in each sequence for the initial alignment (use only if the alignment
            has been determined from a previous run).
            """
        self.seqs = seqs
        self.length = length # length of motif 1..W
        seqs = self.seqs
        self.alphabet = None
        k = 0
        for s in seqs:
            if self.alphabet != None and self.alphabet != s.alphabet:
                raise RuntimeError("Sequences invalid: different alphabets")
            self.alphabet = s.alphabet
            if alignment:
                if alignment[k] < 0 or alignment[k] >= len(s):
                    raise RuntimeError("Initial alignment invalid: does not match sequence " + s.name)
            k += 1
        """ Initialise parameters that are part of the setup (below) """
        self.alignment = alignment or [ random.randint(0, len(s) - length) for s in seqs ] # starting positions defining alignment

    def discover(self, pseudocount = None, niter = None):
        """ Find the most probable common pattern represented by a
            position weight matrix (PWM), based on W+1 distributions
            pseudocount: the distribution used for pseudo-counts (default is uniform)
            niter: number of iterations (if None, 100*N is used; where N is number of seqs).
        """
        """ Initialise parameters necessary for the discovery run (below) """
        N = len(self.seqs) # number of sequences 1..N
        seqs = self.seqs
        W = self.length    # motif width
        """ background that will be used as pseudo-counts """
        pseudocount = pseudocount or prob.Distrib(self.alphabet, 1.0)
        """ q: the foreground distribution (specifying the W distributions in aligned columns)
            p: the background distribution (for non-aligned positions in all sequences) """
        q = [ prob.Distrib(self.alphabet, pseudocount) for _ in range(W) ]
        p = prob.Distrib(self.alphabet, pseudocount)
        a = self.alignment

        new_z = random.randint(0, N-1) # pick a random sequence to withhold
        for k in range(N):
            if k != new_z:
                k_len = len(seqs[k]) # length of current seq
                offset = 0
                for i in range(k_len):
                    if i >= a[k] and i < a[k] + W: # within pattern
                        q[offset].observe(seqs[k][i])
                        offset += 1
                    else: # outside pattern
                        p.observe(seqs[k][i])

        """ Main loop: predictive update step THEN sampling step, repeat... """
        niter = niter or 100 * N # use specified number of iterations or default
        for round in range(niter):

            """ Predictive update step:
                One of the N sequences are chosen at random: z.
                We will not use it in the profile, nor background so we
                exclude it from our counts. """
            prev_z = new_z
            new_z = random.randint(0, N - 1)
            # q's and p's are updated from current a's and all sequences except z,
            # which is the same as use old q's and p's and subtract z's contribs...
            offset = 0
            for i in range(len(seqs[new_z])):
                if i >= a[new_z] and i < a[new_z] + W: # within pattern
                    q[offset].observe(seqs[new_z][i], -1) # subtract the count
                    offset += 1
                else: # outside pattern
                    p.observe(seqs[new_z][i], -1) # subtract the count
            # ... and add back the previous and now updated z
            offset = 0
            for i in range(len(seqs[prev_z])):
                if i >= a[prev_z] and i < a[prev_z] + W: # within pattern
                    q[offset].observe(seqs[prev_z][i], +1) # add the count
                    offset += 1
                else: # outside pattern
                    p.observe(seqs[prev_z][i], +1) # add the count

            """ Sampling step:
                Consider each position x in z as a match: find a weight Ax """
            z_len = len(seqs[new_z]) # length of seq z
            A = [ 0.0 for _ in range(z_len) ]
            Asum = 0.0
            for x in range(z_len - W + 1): # look at all starts for a W-wide pattern
                Px = 1.0; Qx = 1.0
                for w in range(W):
                    Px *= p[seqs[new_z][x+w]]
                    Qx *= q[w][seqs[new_z][x+w]]
                try:
                    A[x] = Qx / Px
                except ZeroDivisionError:
                    pass
                Asum += A[x]
            for x in range(z_len - W + 1): # score all starts for a W-wide pattern
                A[x] /= Asum               # normalise so that all Ax's sum to 1.0
            # Pick the next a[z], with a probability proportional to Ax
            pick = random.random()         # any value between 0 and 1
            cumul = 0.0                    # cumulative probability
            for x in range(z_len - W + 1): # check starts for a W-wide pattern
                cumul += A[x]
                if pick <= cumul:          # check if our random pick is smaller than the cumulative prob
                    a[new_z] = x
                    break

            """ Evaluate data log-likelihood """
            if round % 100 == 0: # but only every 100th round
                LL = 0.0
                for k in range(N):
                    Pk = 1.0; Qk = 1.0
                    for w in range(W):
                        Pk *= p[seqs[k][a[k]+w]]
                        Qk *= q[w][seqs[k][a[k]+w]]
                    try:
                        LL += math.log(Qk / Pk)
                    except ZeroDivisionError:
                        pass
Mikael Boden's avatar
Mikael Boden committed
141
                print("LL @ %5d=\t%5.2f" % (round, LL))
Mikael Boden's avatar
Mikael Boden committed
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314

        # end main for-loop
        self.q = q
        self.p = p
        self.alignment = a
        return q

    def getForeground(self):
        """ Return the probability distributions for columns in the discovered alignment. """
        return self.q

    def getBackground(self):
        """ Return the probability distributions for the background used in the discovery. """
        return self.p

def getAlignment(seqs, motif, background):
    """ Retrieve the best alignment (positions) in provided sequences defined by the specified
        motif params.
        seqs: sequence data
        motif: the foreground distribution (specifying the W distributions in aligned columns)
        background: the background distribution (for non-aligned positions in all sequences)
        Note that this is similar but not the same as the stochastically selected alignment that
        is kept while training. It can be implemented using a PWM constructed from a previous session.
        Note also that this alignment can be used as input to continue an earlier discovery session
        when motif distributions had been saved.  """
    N = len(seqs)
    q = motif
    p = background
    W = len(q)
    a = [0 for _ in range(N)] # start positions unknown
    for k in range(N):
        k_len = len(seqs[k]) # length of seq k
        Amax = None
        xmax = 0
        for x in range(k_len - W + 1):
            Px = 1.0; Qx = 1.0
            for w in range(W):
                Px *= p[seqs[k][x+w]]
                Qx *= q[w][seqs[k][x+w]]
            try:
                Atmp = math.log(Qx / Px)
            except ZeroDivisionError:
                pass
            if Amax == None or Amax < Atmp:
                Amax = Atmp
                xmax = x
        a[k] = xmax
    return a

class GibbsAlign():
    """ A class for performing ungapped sequence alignment.
        Uses Gibb's sampling (Lawrence et al., Science 262:208-214 1993).
    """

    def __init__(self, seqs, length, alignment = None):
        """ Construct a "discover" session by providing the sequences that will be aligned.
            seqs: sequences that will be aligned
            length: maximum length of alignment (must be equal or greater than max sequence length)
            alignment: positions in each sequence for the initial alignment (use only if the alignment
            has been determined from a previous run).
            """
        self.seqs = seqs
        self.length = length # length of motif 1..W
        seqs = self.seqs
        self.alphabet = None
        k = 0
        for s in seqs:
            if self.alphabet != None and self.alphabet != s.alphabet:
                raise RuntimeError("Sequences invalid: different alphabets")
            self.alphabet = s.alphabet
            if alignment:
                if alignment[k] < 0 or alignment[k] >= len(s):
                    raise RuntimeError("Initial alignment invalid: does not match sequence " + s.name)
            k += 1
        """ Initialise parameters that are part of the setup (below) """
        self.alignment = alignment or [ random.randint(0, length - len(s)) for s in seqs ] # starting offsets defining alignment

    def discover(self, pseudocount = None, niter = None):
        """ Find the most probable common pattern represented by a
            position weight matrix (PWM), based on W+1 distributions
            pseudocount: the distribution used for pseudo-counts (default is uniform)
            niter: number of iterations (if None, 100*N is used; where N is number of seqs).
        """
        """ Initialise parameters necessary for the discovery run (below) """
        N = len(self.seqs) # number of sequences 1..N
        seqs = self.seqs
        W = self.length    # alignment width
        """ background that will be used as pseudo-counts """
        pseudocount = pseudocount or prob.Distrib(self.alphabet, 1.0)
        """ q: the foreground distribution (specifying the W distributions in aligned columns)
            p: the background distribution (for non-aligned positions in all sequences) """
        q = [ prob.Distrib(self.alphabet, pseudocount) for _ in range(W) ]
        p = prob.Distrib(self.alphabet, pseudocount)
        a = self.alignment

        new_z = random.randint(0, N-1) # pick a random sequence to withhold
        for k in range(N):
            if k != new_z:
                k_len = len(seqs[k]) # length of current seq
                offset = 0
                for i in range(k_len):
                    if i >= a[k] and i < a[k] + W: # within pattern
                        q[offset].observe(seqs[k][i])
                        offset += 1
                    else: # outside pattern
                        p.observe(seqs[k][i])

        """ Main loop: predictive update step THEN sampling step, repeat... """
        niter = niter or 100 * N # use specified number of iterations or default
        for round in range(niter):

            """ Predictive update step:
                One of the N sequences are chosen at random: z.
                We will not use it in the profile, nor background so we
                exclude it from our counts. """
            prev_z = new_z
            new_z = random.randint(0, N - 1)
            # q's and p's are updated from current a's and all sequences except z,
            # which is the same as use old q's and p's and subtract z's contribs...
            offset = 0
            for i in range(len(seqs[new_z])):
                if i >= a[new_z] and i < a[new_z] + W: # within pattern
                    q[offset].observe(seqs[new_z][i], -1) # subtract the count
                    offset += 1
                else: # outside pattern
                    p.observe(seqs[new_z][i], -1) # subtract the count
            # ... and add back the previous and now updated z
            offset = 0
            for i in range(len(seqs[prev_z])):
                if i >= a[prev_z] and i < a[prev_z] + W: # within pattern
                    q[offset].observe(seqs[prev_z][i], +1) # add the count
                    offset += 1
                else: # outside pattern
                    p.observe(seqs[prev_z][i], +1) # add the count

            """ Sampling step:
                Consider each position x in z as a match: find a weight Ax """
            z_len = len(seqs[new_z]) # length of seq z
            A = [ 0.0 for _ in range(z_len) ]
            Asum = 0.0
            for x in range(z_len - W + 1): # look at all starts for a W-wide pattern
                Px = 1.0; Qx = 1.0
                for w in range(W):
                    Px *= p[seqs[new_z][x+w]]
                    Qx *= q[w][seqs[new_z][x+w]]
                try:
                    A[x] = Qx / Px
                except ZeroDivisionError:
                    pass
                Asum += A[x]
            for x in range(z_len - W + 1): # score all starts for a W-wide pattern
                A[x] /= Asum               # normalise so that all Ax's sum to 1.0
            # Pick the next a[z], with a probability proportional to Ax
            pick = random.random()         # any value between 0 and 1
            cumul = 0.0                    # cumulative probability
            for x in range(z_len - W + 1): # check starts for a W-wide pattern
                cumul += A[x]
                if pick <= cumul:          # check if our random pick is smaller than the cumulative prob
                    a[new_z] = x
                    break

            """ Evaluate data log-likelihood """
            if round % 100 == 0: # but only every 100th round
                LL = 0.0
                for k in range(N):
                    Pk = 1.0; Qk = 1.0
                    for w in range(W):
                        Pk *= p[seqs[k][a[k]+w]]
                        Qk *= q[w][seqs[k][a[k]+w]]
                    try:
                        LL += math.log(Qk / Pk)
                    except ZeroDivisionError:
                        pass
Mikael Boden's avatar
Mikael Boden committed
315
                print("LL @ %5d=\t%5.2f" % (round, LL))
Mikael Boden's avatar
Mikael Boden committed
316 317 318 319 320 321 322 323 324 325 326 327 328 329

        # end main for-loop
        self.q = q
        self.p = p
        self.alignment = a
        return q

    def getForeground(self):
        """ Return the probability distributions for columns in the discovered alignment. """
        return self.q

    def getBackground(self):
        """ Return the probability distributions for the background used in the discovery. """
        return self.p