-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcontig_stats.py
249 lines (202 loc) · 8.08 KB
/
contig_stats.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
### Boas Pucker ###
### v1.2 ###
### bpucker@cebitec.uni-bielefeld.de ###
import re, sys, os
# --- end of imports --- #
__usage__ = """ python contig_stats.py\n
--input <FILENAME>\n
optional:
--min_contig_len <INTEGER> [default=500]\n
--out <FULL_PATH_TO_OUTPUT_DIRECTORY>
bug reports and feature requests: bpucker@cebitec.uni-bielefeld.de
Please cite: Pucker et al., 2016. doi:10.1371/journal.pone.0164321
"""
def calculate_formal_contig_stats( filename ):
"""! @brief calculates some formal stats of the given multiple fasta file (assembly)
@param filename (string) full path to a assembly output file (multiple fasta file)
@return (dictionary) contains all formal stats of the analyzed assembly
@author Boas Pucker
"""
print "calculation of formal assembly stats ... please wait!"
number_of_bases_without_N = 0 #counts all bases without N
number_of_gc = 0 #counts occurences of G or C in sequence
contig_lengths = [] #lengths of all contigs in the assembly; used for calculation of min, max and mean
with open( filename, 'r' ) as f:
first_line = f.readline()
line = f.readline()
sequence = ""
counter = 1
while line:
if line[0] == '>': #new header => evaluate current sequence and set back to empty string
for base in sequence.upper():
if base == 'G' or base == 'C':
number_of_gc += 1
number_of_bases_without_N += 1
elif base == 'A' or base == 'T':
number_of_bases_without_N += 1
contig_lengths.append( len( sequence ) )
sequence = ""
else:
sequence += line.strip()
line = f.readline()
counter += 1
if counter % 1000 == 0:
print str( counter/1000 ) + ' x1000 lines processed'
#place block from new header here again (for last sequence in file)
for base in sequence.upper():
if base == 'G' or base == 'C':
number_of_gc += 1
number_of_bases_without_N += 1
elif base == 'A' or base == 'T':
number_of_bases_without_N += 1
contig_lengths.append( len( sequence ) )
# --- calculate remaining stats --- #
number_of_contigs = len( contig_lengths ) #counts number of contigs / scaffolds in this assembly
total_number_of_bases = sum( contig_lengths ) #counts all bases in the assembyl
mean_contig_length = total_number_of_bases / number_of_contigs #average contig lengths
minimal_contig_length = min( contig_lengths )
maximal_contig_length = max( contig_lengths )
# --- sort list of contig length decreasing --- #
sorted_contig_lengths = sorted( contig_lengths )[::-1] #invert to get it decreasing
N25 = False
N50 = False
N75 = False
N90 = False
cum_length = total_number_of_bases
for contig_length in sorted_contig_lengths:
cum_length -= contig_length
if cum_length <= 0.1 * total_number_of_bases:
if not N90:
N90 = contig_length
elif cum_length <= 0.25 * total_number_of_bases:
if not N75:
N75 = contig_length
elif cum_length <= 0.5 * total_number_of_bases:
if not N50:
N50 = contig_length
elif cum_length <= 0.75 * total_number_of_bases:
if not N25:
N25 = contig_length
stats = { 'number_of_contigs': number_of_contigs,
'mean_contig_length': mean_contig_length,
'minimal_contig_length': minimal_contig_length,
'maximal_contig_length': maximal_contig_length,
'total_number_of_bases': total_number_of_bases,
'number_of_bases_without_N': number_of_bases_without_N,
'gc_content': float( number_of_gc ) /number_of_bases_without_N,
'N25': N25,
'N50': N50,
'N75': N75,
'N90': N90
}
print "calculation of formal assembly stats done."
return stats
def write_evaluation_to_file( outputfile, formal_stats, assembly_name ):
"""! @brief writes all calculated evaluation results to file
@param prefix (string) path to the ouput loction of all files
@param outputfile (string)) only name of file for result output
@param formal_stats (dictionary) contains some statistics about the assembly
@param assembly_name (string) the name of the currently processed assembly
"""
print "writing results to file ... please wait!"
with open( outputfile, 'w' ) as out:
out.write( 'assembly name: ' + assembly_name + '\n\n' )
out.write( 'number of contigs:\t' + str( formal_stats['number_of_contigs'] ) + '\n' )
out.write( 'average contig length:\t' + str( formal_stats['mean_contig_length'] ) + '\n' )
out.write( 'minimal contig length:\t' + str( formal_stats['minimal_contig_length'] ) + '\n' )
out.write( 'maximal contig length:\t' + str( formal_stats['maximal_contig_length'] ) + '\n\n' )
out.write( 'total number of bases:\t' + str( formal_stats['total_number_of_bases'] ) + '\n' )
out.write( 'total number of bases without Ns:\t' + str( formal_stats['number_of_bases_without_N'] ) + '\n' )
out.write( 'GC content:\t' + str( formal_stats['gc_content'] ) + '\n\n' )
out.write( 'N25:\t' + str( formal_stats['N25'] ) + '\n' )
out.write( 'N50:\t' + str( formal_stats['N50'] ) + '\n' )
out.write( 'N75:\t' + str( formal_stats['N75'] ) + '\n' )
out.write( 'N90:\t' + str( formal_stats['N90'] ) + '\n\n' )
print "all results written to file."
def clean_assembly_file( input_file, output_file, cutoff ):
"""! @brief removes small contigs and cleans contig name to 'contig_<INTEGER>' """
print "cleaning contig names and removing small contigs ... please wait!"
with open( output_file, "w" ) as out:
with open( input_file, "r" ) as f:
line = f.readline()
try:
try:
try:
try:
try:
try:
header = re.findall( "contig_\d+", line )[0]
except:
header = re.findall( "contig\d+", line )[0]
except:
header = re.findall( "scaffold\d+", line )[0]
except:
header = re.findall( "C\d+", line )[0]
except:
header = re.findall( "NODE_\d+", line )[0]
except:
header = re.findall( "seq\d+", line )[0]
except:
header = line.strip()[1:]
seq = ""
while line:
if line[0] == '>':
if len( seq ) >= cutoff:
out.write( '>' + header + '\n' + seq + '\n' )
seq = ""
try:
try:
try:
try:
try:
try:
header = re.findall( "contig_\d+", line )[0]
except:
header = re.findall( "contig\d+", line )[0]
except:
header = re.findall( "scaffold\d+", line )[0]
except:
header = re.findall( "C\d+", line )[0]
except:
header = re.findall( "NODE_\d+", line )[0]
except:
header = re.findall( "seq\d+", line )[0]
except:
header = line.strip()[1:]
else:
seq += line.strip()
line = f.readline()
if len( seq ) >= cutoff:
out.write( '>' + header + '\n' + seq + '\n' )
def main( arguments ):
"""! @brief runs all parts of this script """
raw_assembly_file = arguments[ arguments.index( '--input' ) + 1 ]
if '--min_contig_len' in arguments:
cutoff = int( arguments[ arguments.index( '--min_contig_len' ) + 1 ] )
else:
cutoff = 500
if '--out' in arguments:
output_dir = arguments[ arguments.index( '--out' ) + 1 ]
if output_dir[-1] != '/':
output_dir += "/"
if not os.path.exists( output_dir ):
os.makedirs( output_dir )
clean_assembly_filename = output_dir + raw_assembly_file.split('/')[-1] + '_trimmed.fasta'
stats_outputfile = output_dir + raw_assembly_file.split('/')[-1] + '_stats.txt'
else:
clean_assembly_filename = raw_assembly_file + '_trimmed.fasta'
stats_outputfile = clean_assembly_filename.replace( "_trimmed.fasta", "_stats.txt" )
# --- cleaning assembly --- #
clean_assembly_file( raw_assembly_file, clean_assembly_filename, cutoff )
# --- calculating assembly stats --- #
formal_assembly_stats = calculate_formal_contig_stats( clean_assembly_filename )
assembly_name = '.'.join( clean_assembly_filename.split('/')[-1].split('.')[:-1] )
# ---- write all results of the evaluation to file --- #
write_evaluation_to_file( stats_outputfile, formal_assembly_stats, assembly_name )
# --- this part is used for the setting of all given parameters and thereby keeps track of previous usage of this script ---- #
if __name__ == '__main__':
if '--input' in sys.argv:
main( sys.argv )
else:
sys.exit( __usage__ )
print "all done!"