1 import sys, os, random
2 import unittest
3 import tempfile
4 from operator import itemgetter
5 from functools import partial
6
7 try:
8 sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
9 except:
10 sys.path.insert(0, os.path.dirname(os.path.abspath(".")))
11
12 from bedops import bedmap, overlap_as_array
13 from . import parseBED
14
17 if not ( hasattr(self, "anchor_fn") and os.path.exists(self.anchor_fn) ):
18 with tempfile.NamedTemporaryFile(delete=False) as rfd:
19 rfd.write("\n".join( ("chr1\t10\t100\ta1\t1.3\t+",
20 "chr1\t200\t290\ta2\t22.2\t+",
21 "chr4\t5\t95\ta3\t43.5\t-\n") ))
22 self.anchors = rfd.name
23
24 if not ( hasattr(self, "t1") and os.path.exists(self.t1) ):
25 with tempfile.NamedTemporaryFile(delete=False) as mfd:
26 mfd.write("\n".join( ("chr1\t15\t30\tt11\t1",
27 "chr1\t25\t30\tmp2\t0.5",
28 "chr3\t55\t80\tmp2\t1",
29 "chr4\t20\t56\tmp3\t23.3\n") ))
30 self.t1 = mfd.name
31 self.anchor_len = 90
32 self.B = random.choice((1, 2, 3, 5, 6, 9))
33 self.B = 5
34
36 original_anchors = map(parseBED, open(self.anchors))
37 bdemp_res = list( bedmap(self.anchors, self.t1, onlyOverlapping=False) )
38
39 self.assertTrue( all( map(lambda a: a[2]-a[1] == self.anchor_len, original_anchors) ) )
40 self.assertEqual( original_anchors, map(parseBED, map(itemgetter(0), bdemp_res)) )
41
42 for anchor_str, ft_str in bdemp_res:
43 anchor = parseBED(anchor_str)
44 ft_lst = map(parseBED, filter(bool, ft_str.split(";")))
45
46
47 self.assertTrue( all( map(lambda ft: max(ft[1], anchor[1]) < min(ft[2], anchor[2]), ft_lst) ) )
48
49 x_s = overlap_as_array(anchor, ft_str.split(";"), bin=self.B, parseFeature=partial(parseBED, use_score=True))
50 x_c = overlap_as_array(anchor, ft_str.split(";"), bin=self.B, parseFeature=partial(parseBED, use_score=False))
51
52
53
54 print x_c
55
56 if anchor[5] == "-":
57 print ft_lst
58 ft_lst = map(lambda (_ch, _s, _e, _n, _sc): (_ch, anchor[1]+anchor[2]-_e, anchor[2]+anchor[1]-_s, _n, _sc), ft_lst)
59 print ft_lst
60 print
61
62 self.assertEqual( x_s.shape[0], x_c.shape[0] )
63 for i in range(x_c.shape[0]):
64 s = anchor[1] + self.B*i
65 e = anchor[1] + self.B*(i+1)
66
67 sel_feats = filter(lambda f: max(s, f[1]) < min(e, f[2]), ft_lst)
68 overlap = map(lambda f: min(e, f[2]) - max(s, f[1]), sel_feats)
69 if anchor[5] == "-":
70 print i, s, e, sel_feats, overlap
71 score = map(itemgetter(4), sel_feats)
72
73
74 self.assertEqual( sum(overlap), x_c[i] )
75 self.assertAlmostEqual( x_s[i], sum( map(lambda (s,o): s*o, zip(score, overlap)) ), places=4 )
76
77
78
79
80 if __name__ == '__main__':
81 unittest.main()
82