Commit f5bf266d authored by Mikael Boden's avatar Mikael Boden

replace_seqdata_with_bed_twobit_and_ival

parent a236ba40
import ival
class BedEntry():
def __init__(self, chrom, chromStart, chromEnd):
self.chrom = chrom
self.chromStart = chromStart
self.chromEnd = chromEnd
self.blockCount = None
self.usestrand = False
self.strand = None
self.name = ''
def addOption(self,
name = None,
score = None,
strand = None,
thickStart = None,
thickEnd = None,
itemRgb = None,
blockCount = None,
blockSizes = None,
blockStarts = None,
signalValue = None,
pValue = None,
qValue = None,
peak = None,
tags = None,
summit = None,
fold = None,
fdr = None,
zscore = None,
bg = None):
if name: self.name = name
if score: self.score = score
if strand:
self.strand = strand
self.usestrand = True # use reverse complement when sequence is requested from genome
if thickStart: self.thickStart = thickStart
if thickEnd: self.thickEnd = thickEnd
if itemRgb: self.itemRgb = [int(color) for color in itemRgb.split(',')]
if blockCount:
self.blockCount = max(0, blockCount)
if blockCount > 0:
self.blockSizes = [int(sizeword) for sizeword in blockSizes.rstrip(',').split(',')]
self.blockStarts = [int(startword) for startword in blockStarts.rstrip(',').split(',')]
if len(self.blockSizes) != blockCount or len(self.blockStarts) != blockCount:
raise RuntimeError('Blockcount is incorrect in BED entry \"%s\"' % str(self))
if signalValue: self.signalValue = signalValue
if pValue: self.pValue = pValue
if qValue: self.qValue = qValue
if peak: self.peak = peak
if tags: self.tags = tags
if summit: self.summit = summit
if fold: self.fold = fold
if fdr: self.fdr = fdr
if bg: self.bg = bg
if zscore: self.zscore = zscore
def __str__(self):
return str((self.chrom, self.chromStart, self.chromEnd))
def __getitem__(self, i):
if self.blockCount:
return (self.chrom, self.chromStart + self.blockStarts[i], self.chromStart + self.blockStarts[i] + self.blockSizes[i])
def __iter__(self):
if self.blockCount:
for i in range(self.blockCount):
if self.blockSizes[i] > 0:
yield (self.chrom, self.chromStart + self.blockStarts[i], self.chromStart + self.blockStarts[i] + self.blockSizes[i])
def __len__(self):
return self.blockCount
def loc(self, genome = None, fixedwidth = None, usesummit = False, useshift = None):
""" Retrieve the genomic location for BED entry, or sequence if genome is provided
genome: a dictionary with keys for sequence names, e.g. 'chr1', 'chrX', etc, and values with indexed/sliceable strings
fixedwidth: the width of the location/sequence if the width in the BED entry is ignored, and only its centre is used
usesummit: centre a fixedwidth window around an assigned "summit"
useshift: centre a fixedwidth window around a shifted centre point, e.g. useshift=-125 will shiftcentre point 125bp upstream,
to say capture a fixedwidth=350bp window with 350/2-125=50bp downstream
"""
otherstrand = False
if (self.usestrand):
if (self.strand == '-'):
otherstrand = True
if (otherstrand == False):
end = self.chromEnd
start = self.chromStart
mywidth = fixedwidth or (self.chromEnd - self.chromStart)
mycentre = start + (self.chromEnd - self.chromStart) // 2
if usesummit:
mycentre = self.summit
if useshift:
mycentre = mycentre + useshift
if fixedwidth: # we need to re-calculate start and end
if genome:
end = min(len(genome[self.chrom]), mycentre + (mywidth // 2))
else:
end = mycentre + (mywidth // 2)
start = max(0, mycentre - (mywidth // 2))
else: # other strand
start = self.chromEnd
end = self.chromStart
mywidth = fixedwidth or (self.chromEnd - self.chromStart)
mycentre = self.chromStart + (self.chromEnd - self.chromStart) // 2
if usesummit:
mycentre = self.summit
if useshift:
mycentre = mycentre - useshift # shift is reversed on other strand
if fixedwidth: # we need to re-calculate start and end
end = max(0, mycentre - (mywidth // 2))
if genome:
start = min(len(genome[self.chrom]), mycentre + (mywidth // 2))
else:
start = mycentre + (mywidth // 2)
if genome: # refer to the genome sequence
return genome[self.chrom][start : end]
else:
return (self.chrom, start, end)
def setwidth(self, fixedwidth = None, usesummit = False):
if fixedwidth:
if usesummit:
diff = self.summit - fixedwidth // 2
else:
diff = (self.chromEnd - self.chromStart) // 2 - fixedwidth // 2
self.chromStart += diff
self.chromStart += diff + fixedwidth
return (self.chrom, self.chromStart, self.chromEnd)
def getInterval(self):
return ival.Interval(self.chromStart, self.chromEnd)
def dist(entry1, entry2):
if isinstance(entry1, BedEntry) and isinstance(entry2, BedEntry):
if (entry1.chrom == entry2.chrom):
return ival.dist(entry1.getInterval(), entry2.getInterval())
return None
class BedFile:
""" Read BED file.
See http://genome.ucsc.edu/FAQ/FAQformat#format1
The first three required BED fields are (part of all supported sub-formats):
chrom - The name of the chromosome (e.g. chr3, chrY, chr2_random) or scaffold (e.g. scaffold10671).
chromStart - The starting position of the feature in the chromosome or scaffold. The first base in a chromosome is numbered 0.
chromEnd - The ending position of the feature in the chromosome or scaffold. The chromEnd base is not included in the display of the feature. For example, the first 100 bases of a chromosome are defined as chromStart=0, chromEnd=100, and span the bases numbered 0-99.
The 9 additional optional BED fields are (part of sub-format "Optional"):
name - Defines the name of the BED line. This label is displayed to the left of the BED line in the Genome Browser window when the track is open to full display mode or directly to the left of the item in pack mode.
score - A score between 0 and 1000. If the track line useScore attribute is set to 1 for this annotation data set, the score value will determine the level of gray in which this feature is displayed (higher numbers = darker gray). This table shows the Genome Browser's translation of BED score values into shades of gray:
shade
strand - Defines the strand - either '+' or '-'.
thickStart - The starting position at which the feature is drawn thickly (for example, the start codon in gene displays).
thickEnd - The ending position at which the feature is drawn thickly (for example, the stop codon in gene displays).
itemRgb - An RGB value of the form R,G,B (e.g. 255,0,0). If the track line itemRgb attribute is set to "On", this RBG value will determine the display color of the data contained in this BED line. NOTE: It is recommended that a simple color scheme (eight colors or less) be used with this attribute to avoid overwhelming the color resources of the Genome Browser and your Internet browser.
blockCount - The number of blocks (exons) in the BED line.
blockSizes - A comma-separated list of the block sizes. The number of items in this list should correspond to blockCount.
blockStarts - A comma-separated list of block starts. All of the blockStart positions should be calculated relative to chromStart. The number of items in this list should correspond to blockCount.
ENCODE also defines broadpeaks and narrowpeaks format (part of our "Peaks" sub-format):
name - Defines the name of the BED line. This label is displayed to the left of the BED line in the Genome Browser window when the track is open to full display mode or directly to the left of the item in pack mode.
score - Indicates how dark the peak will be displayed in the browser (0-1000). If all scores were '0' when the data were submitted to the DCC, the DCC assigned scores 1-1000 based on signal value. Ideally the average signalValue per base spread is between 100-1000.
strand - +/- to denote strand or orientation (whenever applicable). Use '.' if no orientation is assigned.
signalValue - Measurement of overall (usually, average) enrichment for the region.
pValue - Measurement of statistical significance (-log10). Use -1 if no pValue is assigned.
qValue - Measurement of statistical significance using false discovery rate (-log10). Use -1 if no qValue is assigned.
peak - Point-source called for this peak; 0-based offset from chromStart. Use -1 if no point-source called.
MACS also defines a "summit" peaks format (part of our "Summit" sub-format)
It contains the peak summits locations for every peaks. The 5th column in this file is the .
In addition to the required three, the following fields follow:
length [redundant, ignored]
summit summit height of fragment pileup
tags
pValue [-10*log10(pvalue)]
fold [enrichment]
FDR [%; optional]
"CCAT" BED-like file format:
chromosome,
peakcenter [converted to summit],
regionstart,
regionend,
tags [tagcount],
bg [bgcount],
zscore,
fdr
"""
def __init__(self, entries, format = 'Limited'):
"""
Create a BedFile instance.
:param entries: an iterable of entries or a filename
:param format: the format of the BED file
"""
self.format = format
if isinstance(entries, str): # filename
self.chroms = readBedFile(entries, format)
else:
self.chroms = dict()
for entry in entries:
# check if the chromosome has been seen before
tree = self.chroms.get(entry.chrom)
if not tree:
tree = ival.IntervalTree()
self.chroms[entry.chrom] = tree
# put the entry in the interval tree for the appropriate chromosome
iv = ival.Interval(entry.chromStart, entry.chromEnd)
tree.put(iv, entry)
def __len__(self):
n = 0
for c in self.chroms:
n += len(self.chroms[c])
return n
def generate(self, chrom):
mytree = self.chroms.get(chrom)
if mytree != None:
for e in mytree:
for entry in e.values:
yield entry
def __iter__(self):
self.chromqueue = ival.Stack()
for c in sorted(self.chroms.keys())[::-1]:
self.chromqueue.push(self.generate(c))
self.current = self.chromqueue.pop()
return self
def __next__(self):
try:
ret = next(self.current)
except StopIteration:
if not self.chromqueue.isEmpty():
self.current = self.chromqueue.pop()
ret = next(self.current)
else:
raise StopIteration
return ret
def __contains__(self, item):
if isinstance(item, BedEntry):
tree = self.chroms.get(item.chrom)
if tree == None: return False
else: return ival.Interval(item.chromStart, item.chromEnd) in tree
else:
return False
def getOverlap(self, item):
if isinstance(item, BedEntry):
tree = self.chroms.get(item.chrom)
if tree == None: return None
else:
iv = ival.Interval(item.chromStart, item.chromEnd)
res = tree.isectall(iv)
ret = []
for r in res:
ret.extend(r.values)
return ret
else: return None
def getClosest(self, item):
if isinstance(item, BedEntry):
tree = self.chroms.get(item.chrom)
if tree == None: return None
else:
iv = ival.Interval(item.chromStart, item.chromEnd)
node = tree.closest(iv)
if node != None: return node.values
else: return None
else: return None
def readBedFile(filename, format = 'Limited'):
""" Read a BED file.
format: specifies the format of the file,
"Limited", e.g.
chr22 1000 5000
chr22 2000 6000
"Optional", e.g.
track name=pairedReads description="Clone Paired Reads" useScore=1
chr22 1000 5000 cloneA 960 + 1000 5000 0 2 567,488, 0,3512
chr22 2000 6000 cloneB 900 - 2000 6000 0 2 433,399, 0,3601
...
(also handles the Limited + score, and BED6 format)
"Peaks", e.g.
chr1 569780 569930 . 0 . 19 6.07811 -1 -1
chr1 713300 713450 . 0 . 54 49.1167 -1 -1
"Strand", e.g.
chr4 185772359 185772424 -
chr18 20513381 20513401 +
also supports a 5th label field
chr5 20611949 20611949 + ENSG00000251629_20611949
chr3 42187863 42187863 - ENSG00000234562_42187863
"Summit", e.g.
# d = 130
chr start end length summit tags -10*log10(pvalue) fold_enrichment FDR(%)
chr1 8250 8671 422 286 46 145.84 11.68 0.51
chr1 36382 36984 603 405 46 315.23 27.05 0.24
"CCAT", e.g.
chr8 94747805 94747070 94749250 525 3 21.519196 0.002000
chr17 55277895 55277070 55279280 560 18 21.283333 0.002000
"Cropped", e.g.
chr1 851602 10
chr1 921184 18
chr1 931838 9
"""
f = open(filename)
row = 0
acceptHeaderRows = 1
headerRow = None
chroms = dict()
for line in f:
row += 1
words = line.strip().split()
if len(words) == 0:
continue # ignore empty lines
if words[0].strip().startswith('#'):
continue # comment
if words[0].strip().startswith('browser'):
continue # ignore
if words[0].strip().startswith('track'):
continue # ignore
try:
chrom = words[0]
if format.lower().startswith('ccat'):
chromStart = int(words[2])
chromEnd = int(words[3])
else: # all other standard BED formats
chromStart = int(words[1])
chromEnd = int(words[2])
entry = BedEntry(chrom, chromStart, chromEnd)
if format.lower().startswith('opt'):
if len(words) >= 12:
entry.addOption(name = words[3], score = float(words[4]), strand = words[5], thickStart = int(words[6]), thickEnd = int(words[7]), itemRgb = words[8], blockCount = int(words[9]), blockSizes = words[10], blockStarts = words[11])
elif len(words) >= 9:
entry.addOption(name = words[3], score = float(words[4]), strand = words[5], thickStart = int(words[6]), thickEnd = int(words[7]), itemRgb = words[8])
elif len(words) >= 6:
entry.addOption(name = words[3], score = float(words[4]), strand = words[5])
elif len(words) >= 5:
entry.addOption(name = words[3], score = float(words[4]))
elif len(words) >= 4:
entry.addOption(name = words[3])
else:
entry.addOption(name = '.', score = int(words[3]), strand = '.')
elif format.lower().startswith('bed6'):
entry.addOption(name=words[3], score=float(words[4]), strand=words[5])
elif format.lower().startswith('strand'):
if len(words) >= 4: # properly formatted
entry.addOption(strand = words[3])
if len(words) >= 5:
entry.addOption(name = words[4])
elif format.lower().startswith('peak'):
if len(words) >= 10: # narrowpeaks
entry.addOption(name = words[3], score = int(words[4]), strand = words[5], signalValue = float(words[6]), pValue = float(words[7]), qValue = float(words[8]), peak = int(words[9]))
else: # broadpeaks
entry.addOption(name = words[3], score = int(words[4]), strand = words[5], signalValue = float(words[6]), pValue = float(words[7]), qValue = float(words[8]))
elif format.lower().startswith('summit'):
if len(words) >= 9:
entry.addOption(summit = int(words[4]), tags = int(words[5]), pValue = float(words[6]), fold = float(words[7]), fdr = float(words[8]))
else:
entry.addOption(summit = int(words[4]), tags = int(words[5]), pValue = float(words[6]), fold = float(words[7]))
elif format.lower().startswith('ccat'):
entry.addOption(summit = int(words[1]) - entry.chromStart, tags = int(words[4]), bg = int(words[5]), zscore = float(words[6]), fdr = float(words[7]), name = '.', score = int(words[4]), strand = '.')
elif format.lower().startswith('crop'):
entry.addOption(score = int(words[2]), name = '.', strand = '.')
entry.chromEnd = entry.chromStart + 1
# check if the chromosome has been seen before
tree = chroms.get(chrom)
if not tree:
tree = ival.IntervalTree()
chroms[chrom] = tree
# put the entry in the interval tree for the appropriate chromosome
iv = ival.Interval(entry.chromStart, entry.chromEnd)
tree.put(iv, entry)
except RuntimeError as e:
if not acceptHeaderRows:
raise RuntimeError('Error in BED file at row %d (%s)' % (row, e.strerror))
else:
headerRow = words
acceptHeaderRows -= 1 # count down the number of header rows that can occur
f.close()
return chroms
def writeBedFile(entries, filename, format = 'BED6', header = None):
""" Save the BED entries to a BED file.
format - the format to use for WRITING, currently only BED6 ('Optional' 6-col format) is supported.
"""
f = open(filename, 'w')
if header:
f.write(header + '\n')
for row in entries:
if format == 'Peaks':
f.write("%s\t%d\t%d\t%s\t%d\t%s\t%f" % (row.chrom, row.chromStart, row.chromEnd, row.name, row.score, row.strand, row.signalValue))
elif format == 'Limited':
f.write("%s\t%d\t%d" % (row.chrom, row.chromStart, row.chromEnd))
elif format == 'Strand':
f.write("%s\t%d\t%d" % (row.chrom, row.chromStart, row.chromEnd, row.strand, row.name))
else:
f.write("%s\t%d\t%d\t%s\t%d\t%s" % (row.chrom, row.chromStart, row.chromEnd, row.name, row.score, row.strand))
f.write("\n")
f.close()
if __name__ == '__main__':
bf = BedFile('/Users/mikael/binfpy/BIOL3014/Week7/mm10_genes.bed', 'optional')
print(bf.chroms.keys())
g = bf.generate('chr1')
print(next(g))
print(next(g))
print(next(g))
cnt = 0
for entry in bf:
cnt += 1
print(str(cnt) + '\t' + str(entry))
if cnt == 100:
break
entry1 = BedEntry('chrX', 3858266, 3858530)
print(entry1 in bf)
entry2 = BedEntry('chrX', 10047550, 10067694)
for x in bf.getOverlap(entry2):
print(x)
entry3 = BedEntry('chr9', 102699903, 102700167)
for x in bf.getClosest(entry3):
print(x)
for y in x:
print(y)
import random
class IntervalTree:
"""
Binary search tree for storing long integer intervals, and for performing queries on them.
See https://en.wikipedia.org/wiki/Interval_tree, specifically the Augmented kind.
The present implementation balances the tree by using randomisation.
"""
root = None # pointer to the root node of the binary search tree
stack = None
def __iter__(self):
self.current = self.root
self.stack = Stack()
return self
def __next__(self):
while self.current != None:
self.stack.push(self.current)
self.current = self.current.left
if self.stack.isEmpty():
raise StopIteration
self.current = self.stack.pop()
ret = self.current
self.current = self.current.right
return ret
def __len__(self):
return self.root.N
def __contains__(self, ival):
return self.get(ival) != None
def get(self, ival):
return self._get(self.root, ival)
def _get(self, node, ival):
if node == None: return None
if ival < node.ival:
return self._get(node.left, ival)
elif ival > node.ival:
return self._get(node.right, ival)
else:
return node
def isect(self, ival, node = None):
""" Look for intersecting interval in subtree rooted at specified node (root by default).
Returns node of intersecting interval. """
if self.root == None: return None
if node == None: return self.isect(ival, self.root)
while node != None:
if isect(ival, node.ival): return node
elif node.left == None: node = node.right
elif node.left.max < ival.min: node = node.right
else: node = node.left
return None
def isectall(self, ival):
""" Look for all intersecting intervals in the subtree rooted at specified node (root by default).
Returns nodes of intersecting intervals. """
return _isectall(ival, self.root)
def closest(self, query):
""" Retrieve the interval Y stored in the tree that is closest to the given interval X.
If the given interval overlaps with one or more stored intervals, one is returned:
the interval Y with the greatest Jaccard index to X. If multiple intervals are equally close,
only one is returned (the one before I think).
:param query: the interval for which the closest is sought
:return: the interval closest to the given query interval
"""
ovlap = self.isectall(query)
if len(ovlap) == 0: # overlapping intervals are not in the tree
return _closest(query, self.root)
else:
best_iv = None
best_ji = 0
for node in ovlap:
ji = jaccard(node.ival, query)
if best_iv == None or ji > best_ji:
best_iv = node
best_ji = ji
return best_iv
def put(self, ival, value = None):
nodex = self.get(ival)
if nodex:
nodex.values.add(value)
else:
self.root = self._randomizedInsert(self.root, ival, value)
def _randomizedInsert(self, node, ival, value):
if node == None: return IntervalNode(ival, value)
if random.uniform(0,1) * node.N < 1.0: return self._rootInsert(node, ival, value)
if ival < node.ival:
node.left = self._randomizedInsert(node.left, ival, value)
else:
node.right = self._randomizedInsert(node.right, ival, value)
_fix(node)
return node
def _rootInsert(self, node, ival, value):
if node == None: return IntervalNode(ival, value)
if ival < node.ival:
node.left = self._rootInsert(node.left, ival, value)
node = _rotR(node)
else:
node.right = self._rootInsert(node.right, ival, value)
node = _rotL(node)
return node
def _isectall(ival, node):
""" Look for all intersecting intervals in the subtree rooted at specified node (root by default).
Returns nodes of intersecting intervals. """
if node == None: return []
found = []
if isect(ival, node.ival):
found = [node]
if node.left and node.left.max >= ival.min:
found.extend(_isectall(ival, node.left))
if len(found) > 0 or node.left == None or node.left.max < ival.min:
found.extend(_isectall(ival, node.right))
return found
def _closest(query, cand):
""" Recursively find the interval with the minimum distance to that given.
This internal function does not guarantee that distances are sensible when overlapping
intervals exist; essentially it assumes that overlaps have been eliminated prior.
:param query: interval
:param cand: node from which search starts
:return: closest interval """
fav = None
favdist = -1
while cand != None:
if query == cand.ival: return cand
distx = query.dist(cand.ival)
if fav == None or distx <= favdist:
fav = cand
favdist = distx
if cand.left == None: cand = cand.right
elif cand.right == None: cand = cand.left
elif cand.ival.min > query.max: cand = cand.left # the smallest, indexed value (on left) is AFTER the query min
else: # no way to choose without looking in the intervals below
favleft = None
distleft = query.dist(Interval(cand.left.min, cand.left.max))
if distleft < favdist:
favleft = _closest(query, cand.left)
distleft = query.dist(favleft.ival) if favleft != None else MAX_VALUE
distright = query.dist(Interval(cand.right.min, cand.right.max))
if distright < favdist:
favright = _closest(query, cand.right)
distright = query.dist(favright.ival) if favright != None else MAX_VALUE
if distleft < distright:
return favleft if distleft < favdist else fav
else:
return favright if distright < favdist else fav
return fav
class IntervalNode:
"""
Defines the node of the interval search tree.
Manages values associated with intervals.
"""
ival = None # the actual interval
values = None # values associated with the interval
left = None # subtree on the left (lesser)
right = None # subtree on the right (greater)
N = 1 # number of nodes under (and including) this one
min = 0 # min point of subtree rooted at this node
max = 0 # max point of subtree rooted at this node
def __init__(self, interval, value = None):
self.ival = interval
self.min = interval.min
self.max = interval.max
self.values = set()
if value != None:
self.values.add(value)
def add(self, value):
if value:
self.values.add(value)
def __str__(self):
leftstr = 'o' if self.left else 'x'
rightstr = 'o' if self.right else 'x'
return leftstr + self.ival.__str__() + rightstr
def __unicode__(self):
leftstr = 'o' if self.left else 'x'
rightstr = 'o' if self.right else 'x'
return leftstr + self.ival.__unicode__() + rightstr
def size(node):
if node == None: return 0
else: return node.N
def _fix(node):
if node == None: return
node.N = 1 + size(node.left) + size(node.right)
node.min = _min3(node.ival.min, _min(node.left), _min(node.right))
node.max = _max3(node.ival.max, _max(node.left), _max(node.right))
MAX_VALUE = 9E30
MIN_VALUE = -9E30
def _min(node):
return MAX_VALUE if node == None else node.min
def _max(node):
return MIN_VALUE if node == None else node.max
def _min3(a, b, c):
return min(a, min(b, c))
def _max3(a, b, c):
return max(a, max(b, c))
def _rotR(node):
y = node.left
node.left = y.right
y.right = node
_fix(node)
_fix(y)
return y
def _rotL(node):
y = node.right
node.right = y.left
y.left = node
_fix(node)
_fix(y)
return y
class Stack:
""" A stack to support an iterator over IntervalNodes in the IntervalTree. """
def __init__(self):
self.items = []
def isEmpty(self):
return self.items == []
def push(self, item):
self.items.append(item)
def pop(self):
return self.items.pop()
def peek(self):
return self.items[len(self.items) - 1]
def size(self):
return len(self.items)
class Interval:
"""
Define a one-dimensional interval.
"""
def __init__(self, min, max):
if (min <= max):
self.min = min
self.max = max
else:
raise RuntimeError
def isect(self, that):
if (that.max < self.min): return False
if (self.max < that.min): return False
return True
def contains(self, x):
return (min <= x) and (x <= max)
def __eq__(self, other):
if not isinstance(other, Interval): return False
return True if (self.min == other.min and self.max == other.max) else False
def __ne__(self, other):
return not self.__eq__(other)
def __lt__(self, other):
if not isinstance(other, Interval): return False
return True if (self.min < other.min or (self.min == other.min and self.max < other.max)) else False
def __gt__(self, other):
if not isinstance(other, Interval): return False
return True if (self.min > other.min or (self.min == other.min and self.max > other.max)) else False
def __str__(self):
return '[' + str(self.min) + ', ' + str(self.max) +']'
def __unicode__(self):
return '[' + str(self.min) + ', ' + str(self.max) +']'
def __sizeof__(self):
return self.max - self.min
def dist(self, that):
if (self.min > that.max): return self.min - that.max # that interval is BEFORE this
if (self.max < that.min): return that.min - self.max # that interval is AFTER this
return 0
def dist(first, second):
if isinstance(first, Interval) and isinstance(second, Interval):
return first.dist(second)
return RuntimeError
def union(first, second):
if (first.isect(second)):
min = first.min if (first.min < second.min) else second.min
max = second.max if (first.max < second.max) else first.max
return Interval(min, max)
else:
raise RuntimeError
def isect(first, second):
if (first.isect(second)):
min = first.min if (first.min > second.min) else second.min
max = second.max if (first.max > second.max) else first.max
return Interval(min, max)
else:
return None
def jaccard(first, second):
if (isect(first, second)):
isect_min = first.min if (first.min > second.min) else second.min
isect_max = second.max if (first.max > second.max) else first.max
union_min = first.min if (first.min < second.min) else second.min
union_max = second.max if (first.max < second.max) else first.max
denom = union_max - union_min
if (denom > 0):
return (isect_max - isect_min) / denom
return 0
else:
return 0
if __name__ == '__main__':
a = Interval(13, 20)
b = Interval(25, 30)
c = Interval(27, 33)
d = Interval(40, 50)
e = Interval(21, 22)
f = Interval(36, 38)
g = Interval(16, 19)
h = Interval(28, 31)
i = Interval(55, 66)
j = Interval(-3, 0)
k = Interval(24, 24)
l = Interval(52, 55)
t = IntervalTree()
t.put(a, 'A')
t.put(b, 'B')
t.put(c, 'C')
t.put(d, 'D')
t.put(e, 'E')
t.put(b, 123)
t.put(b, 'blah')
t.get(d).add('x999')
t.put(i)
t.put(j)
t.put(g)
t.put(k)
print(c in t)
print(e in t)
print(t.get(a).values)
print(t.get(d).values)
print(t.get(b).values)
print(t.isect(f))
print(t.isect(g))
tryme = f
all = t.isectall(tryme)
print("Intersect with " + str(tryme) + ": ")
for n in all:
print('\t' + str(n))
print("Closest to " + str(tryme) + ": ")
print(t.closest(tryme))
print('Iterate through tree: ')
for n in t:
print('\t' + str(n))
\ No newline at end of file
"""
This following code is a modified version of twobitreader (which is under Perl Artistic License 2.0).
As per license restrictions, the code below indicates what has been modified in relation to the
standard version (retrieved from https://bitbucket.org/thesylex/twobitreader on the 16 May 2012).
No warranty is provided, express or implied
Modifications to package:
- removed download.py and __main__ because they were not used and __main__ had errors.
- removed command-line interface because the BED file functionality is implemented more extensively elsewhere
"""
"""
twobitreader
Licensed under Perl Artistic License 2.0
No warranty is provided, express or implied
"""
from array import array
from bisect import bisect_right
from errno import ENOENT, EACCES
from os import R_OK, access
try:
from os import strerror
except ImportError:
strerror = lambda x: 'strerror not supported'
from os.path import exists, getsize
import logging
import textwrap
import sys
if sys.version_info > (3,):
izip = zip
xrange = range
_CHAR_CODE = 'u'
iteritems = dict.items
else:
from itertools import izip
_CHAR_CODE = 'c'
iteritems = dict.iteritems
def safe_tostring(ary):
"""
convert arrays to strings in a Python 2.x / 3.x safe way
"""
if sys.version_info > (3,):
return ary.tounicode().encode("ascii").decode()
else:
return ary.tostring()
def true_long_type():
"""
OS X uses an 8-byte long, so make sure L (long) is the right size
and switch to I (int) if needed
"""
for type_ in ['L', 'I']:
test_array = array(type_, [0])
long_size = test_array.itemsize
if long_size == 4:
return type_
raise ImportError("Couldn't determine a valid 4-byte long type to use \
as equivalent to LONG")
LONG = true_long_type()
def byte_to_bases(x):
"""convert one byte to the four bases it encodes"""
c = (x >> 4) & 0xf
f = x & 0xf
cc = (c >> 2) & 0x3
cf = c & 0x3
fc = (f >> 2) & 0x3
ff = f & 0x3
return [bits_to_base(X) for X in (cc, cf, fc, ff)]
def bits_to_base(x):
"""convert integer representation of two bits to correct base"""
if x is 0:
return 'T'
elif x is 1:
return 'C'
elif x is 2:
return 'A'
elif x is 3:
return 'G'
else:
raise ValueError('Only integers 0-3 are valid inputs')
def base_to_bin(x):
"""
provided for user convenience
convert a nucleotide to its bit representation
"""
if x == 'T':
return '00'
elif x == 'C':
return '01'
elif x == 'A':
return '10'
elif x == 'G':
return '11'
else:
raise ValueError('Only characters \'ATGC\' are valid inputs')
def create_byte_table():
"""create BYTE_TABLE"""
d = {}
for x in xrange(2 ** 8):
d[x] = byte_to_bases(x)
return d
def split16(x):
"""
split a 16-bit number into integer representation
of its course and fine parts in binary representation
"""
c = (x >> 8) & 0xff
f = x & 0xff
return c, f
def create_twobyte_table():
"""create TWOBYTE_TABLE"""
d = {}
for x in xrange(2 ** 16):
c, f = split16(x)
d[x] = list(byte_to_bases(c)) + list(byte_to_bases(f))
return d
BYTE_TABLE = create_byte_table()
TWOBYTE_TABLE = create_twobyte_table()
def longs_to_char_array(longs, first_base_offset, last_base_offset, array_size,
more_bytes=None):
"""
takes in an array of longs (4 bytes) and converts them to bases in
a char array
you must also provide the offset in the first and last block
(note these offsets are pythonic. last_offset is not included)
and the desired array_size
If you have less than a long worth of bases at the end, you can provide
them as a string with more_bytes=
NOTE: last_base_offset is inside more_bytes not the last long, if more_bytes
is not None
returns the correct subset of the array based on provided offsets
"""
if array_size == 0:
return array(_CHAR_CODE)
elif array_size < 0:
raise ValueError('array_size must be at least 0')
if not first_base_offset in range(16):
raise ValueError('first_base_offset must be in range(16)')
if not last_base_offset in range(1, 17):
raise ValueError('last_base_offset must be in range(1, 17)')
longs_len = len(longs)
if more_bytes is None:
shorts_length = 0
else:
shorts_length = len(more_bytes)
if array_size > longs_len * 16 + 4 * shorts_length:
raise ValueError('array_size exceeds maximum possible for input')
dna = array(_CHAR_CODE, 'N' * (longs_len * 16 + 4 * shorts_length))
# translate from 32-bit blocks to bytes
# this method ensures correct endianess (byteswap as neeed)
i = 0
if longs_len > 0:
bytes_ = array('B')
bytes_.fromstring(longs.tostring())
# first block
first_block = ''.join([''.join(BYTE_TABLE[bytes_[x]]) for x in range(4)])
i = 16 - first_base_offset
if array_size < i:
i = array_size
dna[0:i] = array(_CHAR_CODE, first_block[first_base_offset:first_base_offset + i])
if longs_len > 1:
# middle blocks (implicitly skipped if they don't exist)
for byte in bytes_[4:-4]:
dna[i:i + 4] = array(_CHAR_CODE, BYTE_TABLE[byte])
i += 4
# last block
last_block = array(_CHAR_CODE, ''.join([''.join(BYTE_TABLE[bytes_[x]])
for x in range(-4, 0)]))
if more_bytes is None:
dna[i:i + last_base_offset] = last_block[0:last_base_offset]
else: # if there are more bytes, we need the whole last block
dna[i:i + 16] = last_block[0:16]
i += 16
if more_bytes is not None:
bytes_ = array('B')
bytes_.fromstring(more_bytes)
j = i
for byte in bytes_:
j = i + 4
if j > array_size:
dnabytes = array(_CHAR_CODE, BYTE_TABLE[byte])[0:(array_size - i)]
dna[i:array_size] = dnabytes
break
dna[i:i + last_base_offset] = array(_CHAR_CODE, BYTE_TABLE[byte])
i += 4
return dna[0:array_size]
class TwoBitFile(dict):
"""
python-level reader for .2bit files (i.e., from UCSC genome browser)
(note: no writing support)
TwoBitFile inherits from dict
You may access sequences by name, e.g.
>>> genome = TwoBitFile('hg18.2bit')
>>> chr20 = genome['chr20']
Sequences are returned as TwoBitSequence objects
You may access intervals by slicing or using str() to dump the entire entry
e.g.
>>> chr20[100100:100120]
'ttttcctctaagataatttttgccttaaatactattttgttcaatactaagaagtaagataacttccttttgttggta
tttgcatgttaagtttttttcc'
>>> whole_chr20 = str(chr20)
Fair warning: dumping the entire chromosome requires a lot of memory
See TwoBitSequence for more info
"""
def __init__(self, foo):
super(TwoBitFile, self).__init__()
if not exists(foo):
raise IOError(ENOENT, strerror(ENOENT), foo)
if not access(foo, R_OK):
raise IOError(EACCES, strerror(EACCES), foo)
self._filename = foo
self._file_size = getsize(foo)
self._file_handle = open(foo, 'rb')
self._load_header()
self._load_index()
for name, offset in iteritems(self._offset_dict):
self[name] = TwoBitSequence(self._file_handle, offset,
self._file_size,
self._byteswapped)
return
def __reduce__(self): # enables pickling
return (TwoBitFile, (self._filename,))
def _load_header(self):
file_handle = self._file_handle
header = array(LONG)
header.fromfile(file_handle, 4)
# check signature -- must be 0x1A412743
# if not, swap bytes
byteswapped = False
(signature, version, sequence_count, reserved) = header
if not signature == 0x1A412743:
byteswapped = True
header.byteswap()
(signature2, version, sequence_count, reserved) = header
if not signature2 == 0x1A412743:
raise TwoBitFileError('Signature in header should be ' +
'0x1A412743, instead found 0x%X' %
signature)
if not version == 0:
raise TwoBitFileError('File version in header should be 0.')
if not reserved == 0:
raise TwoBitFileError('Reserved field in header should be 0.')
self._byteswapped = byteswapped
self._sequence_count = sequence_count
def _load_index(self):
file_handle = self._file_handle
byteswapped = self._byteswapped
remaining = self._sequence_count
sequence_offsets = []
file_handle.seek(16)
while remaining > 0:
name_size = array('B')
name_size.fromfile(file_handle, 1)
if byteswapped:
name_size.byteswap()
# name = array(_CHAR_CODE)
name = array('B')
name.fromfile(file_handle, name_size[0])
name = "".join([chr(X) for X in name])
if byteswapped:
name.byteswap()
offset = array(LONG)
offset.fromfile(file_handle, 1)
if byteswapped:
offset.byteswap()
sequence_offsets.append((name, offset[0]))
remaining -= 1
self._sequence_offsets = sequence_offsets
self._offset_dict = dict(sequence_offsets)
def sequence_sizes(self):
"""returns a dictionary with the sizes of each sequence"""
d = {}
file_handle = self._file_handle
byteswapped = self._byteswapped
for name, offset in iteritems(self._offset_dict):
file_handle.seek(offset)
dna_size = array(LONG)
dna_size.fromfile(file_handle, 1)
if byteswapped:
dna_size.byteswap()
d[name] = dna_size[0]
return d
class TwoBitSequence(object):
"""
A TwoBitSequence object refers to an entry in a TwoBitFile
You may access intervals by slicing or using str() to dump the entire entry
e.g.
>>> genome = TwoBitFile('hg18.2bit')
>>> chr20 = genome['chr20']
>>> chr20[100100:100200] # slicing returns a string
'ttttcctctaagataatttttgccttaaatactattttgttcaatactaagaagtaagataacttccttttgttggta
tttgcatgttaagtttttttcc'
>>> whole_chr20 = str(chr20) # get whole chr as string
Fair warning: dumping the entire chromosome requires a lot of memory
Note that we follow python/UCSC conventions:
Coordinates are 0-based, end-open
(Note: The UCSC web-based genome browser uses 1-based closed coordinates)
If you attempt to access a slice past the end of the sequence,
it will be truncated at the end.
Your computer probably doesn't have enough memory to load a whole genome
but if you want to string-ize your TwoBitFile, here's a recipe:
x = TwoBitFile('my.2bit')
d = x.dict()
for k,v in d.items(): d[k] = str(v)
"""
def __init__(self, file_handle, offset, file_size, byteswapped=False):
self._file_size = file_size
self._file_handle = file_handle
self._original_offset = offset
self._byteswapped = byteswapped
file_handle.seek(offset)
header = array(LONG)
header.fromfile(file_handle, 2)
if byteswapped:
header.byteswap()
dna_size, n_block_count = header
self._dna_size = dna_size # number of characters, 2 bits each
self._n_bytes = (dna_size + 3) / 4 # number of bytes
# number of 32-bit fragments
self._packed_dna_size = (dna_size + 15) / 16
n_block_starts = array(LONG)
n_block_sizes = array(LONG)
n_block_starts.fromfile(file_handle, n_block_count)
if byteswapped:
n_block_starts.byteswap()
n_block_sizes.fromfile(file_handle, n_block_count)
if byteswapped:
n_block_sizes.byteswap()
self._n_block_starts = n_block_starts
self._n_block_sizes = n_block_sizes
mask_rawc = array(LONG)
mask_rawc.fromfile(file_handle, 1)
if byteswapped:
mask_rawc.byteswap()
mask_block_count = mask_rawc[0]
mask_block_starts = array(LONG)
mask_block_starts.fromfile(file_handle, mask_block_count)
if byteswapped:
mask_block_starts.byteswap()
mask_block_sizes = array(LONG)
mask_block_sizes.fromfile(file_handle, mask_block_count)
if byteswapped:
mask_block_sizes.byteswap()
self._mask_block_starts = mask_block_starts
self._mask_block_sizes = mask_block_sizes
file_handle.read(4)
self._offset = file_handle.tell()
def __len__(self):
return self._dna_size
def __getitem__(self, slice_or_key):
"""
return a sub-sequence, given a slice object
"""
step = None
if isinstance(slice_or_key, slice):
step = slice_or_key.step
if step is not None:
raise ValueError("Slicing by step not currently supported")
return self.get_slice(min_=slice_or_key.start, max_=slice_or_key.stop)
elif isinstance(slice_or_key, int):
max_ = slice_or_key + 1
if max_ == 0:
max_ = None
return self.get_slice(min_=slice_or_key, max_=max_)
def get_slice(self, min_, max_=None):
"""
get_slice returns only a sub-sequence
"""
# handle negative coordinates
dna_size = self._dna_size
if min_ is None: # for slicing e.g. [:]
min_ = 0
if max_ is not None and max_ < 0:
if max_ < -dna_size:
raise IndexError('index out of range')
max_ = dna_size + max_
if min_ is not None and min_ < 0:
if min_ < -dna_size:
raise IndexError('index out of range')
min_ = dna_size + min_
# make sure there's a proper range
if max_ is not None and min_ > max_:
return ''
if max_ == 0 or max_ == min_:
return ''
# load all the data
if max_ is None or max_ > dna_size:
max_ = dna_size
file_handle = self._file_handle
byteswapped = self._byteswapped
n_block_starts = self._n_block_starts
n_block_sizes = self._n_block_sizes
mask_block_starts = self._mask_block_starts
mask_block_sizes = self._mask_block_sizes
offset = self._offset
packed_dna_size = self._packed_dna_size
# n_bytes = self._n_bytes
# region_size is how many bases the region is
if max_ is None:
region_size = dna_size - min_
else:
region_size = max_ - min_
# start_block, end_block are the first/last 32-bit blocks we need
# blocks start at 0
start_block = min_ // 16
# jump directly to desired file location
local_offset = offset + (start_block * 4)
end_block = (max_ - 1 + 16) // 16
# don't read past seq end
file_handle.seek(local_offset)
# note we won't actually read the last base
# this is a python slice first_base_offset:16*blocks+last_base_offset
first_base_offset = min_ % 16
last_base_offset = max_ % 16
if last_base_offset == 0:
last_base_offset = 16
# +1 we still need to read end_block maybe
blocks_to_read = end_block - start_block
if (blocks_to_read + start_block) > packed_dna_size:
blocks_to_read = packed_dna_size - start_block
fourbyte_dna = array(LONG)
# remainder_seq = None
if (blocks_to_read * 4 + local_offset) > self._file_size:
fourbyte_dna.fromfile(file_handle, blocks_to_read - 1)
morebytes = file_handle.read() # read the remaining characters
# if byteswapped:
# morebytes = ''.join(reversed(morebytes))
else:
fourbyte_dna.fromfile(file_handle, blocks_to_read)
morebytes = None
if byteswapped:
fourbyte_dna.byteswap()
str_as_array = longs_to_char_array(fourbyte_dna, first_base_offset,
last_base_offset, region_size,
more_bytes=morebytes)
for start, size in izip(n_block_starts, n_block_sizes):
end = start + size
if end <= min_:
continue
if start > max_:
break
if start < min_:
start = min_
if end > max_:
end = max_
start -= min_
end -= min_
# this should actually be decoded, 00=N, 01=n
str_as_array[start:end] = array(_CHAR_CODE, 'N' * (end - start))
lower = str.lower
first_masked_region = max(0,
bisect_right(mask_block_starts, min_) - 1)
last_masked_region = min(len(mask_block_starts),
1 + bisect_right(mask_block_starts, max_,
lo=first_masked_region))
for start, size in izip(mask_block_starts[first_masked_region:
last_masked_region],
mask_block_sizes[first_masked_region:
last_masked_region]):
end = start + size
if end <= min_:
continue
if start > max_:
break
if start < min_:
start = min_
if end > max_:
end = max_
start -= min_
end -= min_
str_as_array[start:end] = array(_CHAR_CODE,
lower(safe_tostring(str_as_array[start:end])))
if not len(str_as_array) == max_ - min_:
raise RuntimeError("Sequence was the wrong size")
return safe_tostring(str_as_array)
def __str__(self):
"""
returns the entire chromosome
"""
return self.get_slice(0, None)
class TwoBitFileError(Exception):
"""
Base exception for TwoBit module
"""
def __init__(self, msg):
errtext = 'Invalid 2-bit file. ' + msg
return super(TwoBitFileError, self).__init__(errtext)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment