-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcalc_pi.py
52 lines (43 loc) · 1.75 KB
/
calc_pi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import sys
import subprocess
from subprocess import Popen, PIPE
def read_intervals(intervals):
interval_L = []
with open(intervals) as fp:
for line in fp:
line = line.strip("\r\n").split("\t")
chrom, start, stop = line[0], int(line[1]), int(line[2])
interval_L.append([chrom, start, stop])
return interval_L
def parse_site_pi(site_pi):
pi = 0
for i in site_pi[1:]:
site = i.strip("\r\n").split("\t")
pi += float(site[-1])
#print site
return pi
def get_uncallable_sites(chrom, start, stop):
cmd1 = ["bcftools","view","-r","%s:%i-%i"%(chrom, start, stop),"Millepora_all_GT_filt.vcf.gz"]
cmd2 = ["bcftools","query","-f",""'%POS\n'",-"]
p1 = subprocess.Popen(cmd1, stdout=subprocess.PIPE)
p2 = subprocess.Popen(cmd2, stdin=p1.stdout, stdout=subprocess.PIPE)
callable_sites = len(p2.stdout.readlines())
uncallable_sites = (stop-start) - callable_sites
return uncallable_sites, callable_sites
def main():
input_vcf = sys.argv[1]
intervals = sys.argv[2]
interval_L = read_intervals(intervals)
for interval in interval_L:
chrom, start, stop = interval
cmd1 = ["bcftools","view","-r","%s:%i-%i"%(chrom, start, stop),input_vcf]
cmd2 = ["vcftools","--vcf","-","--site-pi","--stdout"]
p1 = subprocess.Popen(cmd1, stdout=subprocess.PIPE)
p2 = subprocess.Popen(cmd2, stdin=p1.stdout, stdout=subprocess.PIPE)
site_pi = p2.stdout.readlines()
uncallable_sites, callable_sites = get_uncallable_sites(chrom, start, stop)
if callable_sites > 0:
print chrom, start, stop, parse_site_pi(site_pi)/((stop-start)-uncallable_sites), callable_sites
#break
if __name__ == '__main__':
main()