modules.analyse
1import io, gzip 2from dataclasses import dataclass, astuple 3 4''' 5analyse.py, handles the analysis of 'extract' output 6''' 7 8''' 9What this needs to do: 10 * calculate tail length DONE 11 * count nucleotides DONE 12 * assign strand DONE 13 * assign tail type: DONE 14 all G or C -> G/C DONE 15 Empty (len = 0) -> no_tail DONE 16 all T -> polyU DONE 17 all A -> polyA DONE 18 A or T -> mixed_AU DONE 19 otherwise -> other DONE 20 * Count terminal Us (Ts) DONE 21''' 22 23class ExtractFile: 24 def __init__(self, path: str): 25 self.path = path 26 27 # 28 # File handling methods 29 # 30 31 def _get_header(self): 32 ''' 33 Internal method, used for reading the header of input text file. 34 35 Args: 36 handle: file handle generated by one of the open(file, 'r') variants, eg. gzip.open(file, 'rt') 37 38 Returns: 39 list[str, ...]: header as a \t separated list of strings 40 ''' 41 return self.handle.readline().strip().split('\t') 42 43 def open_gzip(self): 44 ''' 45 Opens a gzipped file for reading as text and stores its 1st line (header) separately 46 ''' 47 self.handle = gzip.open(self.path, "rt") 48 self.header = self._get_header() 49 50 def open_plain(self): 51 ''' 52 Opens a plain text file for reading and stores its 1st line (header) separately 53 ''' 54 self.handle = open(self.path, 'r') 55 self.header = self._get_header() 56 57 def close(self): 58 ''' 59 For closing the file after reading 60 ''' 61 self.handle.close() 62 63 def _line_process(self, line): 64 ''' 65 Used for processing the input lines into manageable data structures (only lists for now) 66 ''' 67 return line.strip().split('\t') 68 69 def __iter__(self): 70 ''' 71 Return object iterator 72 ''' 73 return self 74 75 def __next__(self): 76 ''' 77 Define how to get the next element, i.e. line 78 ''' 79 line = self.handle.readline() 80 self.line = self._line_process(self.handle.readline()) 81 if line: 82 self.line = self._line_process(line) 83 else: 84 raise StopIteration 85 86 # 87 # File processing/analysis methods 88 # 89 90 def tail_len(self): 91 ''' 92 Calculate tail length (CLIP3 field) 93 ''' 94 if self.line[5] != "NA": 95 return len(self.line[5]) 96 else: 97 return 0 98 99 def count_bases(self): 100 ''' 101 Count bases for 3' tail sequence 102 ''' 103 tail_seq = self.line[5] 104 105 if tail_seq != "NA": 106 return tuple([tail_seq.count('A'), 107 tail_seq.count('C'), 108 tail_seq.count('G'), 109 tail_seq.count('T')]) 110 else: 111 return tuple([0, 0, 0, 0]) 112 113 def get_strand(self): 114 ''' 115 Get mapping strand from SAM flags. Flags are stored as `int:12-bit` 116 format, eg. 123:000001111011. 117 118 Outputs the Entrez strand convention, i.e. + = 1, - = -1 119 ''' 120 flags = bin(int(self.line[1].split(':')[0])) 121 122 # 5th bit is "REVERSE - SEQ is reverse complemented" 123 return -1 if flags[-5] else 1 124 125 def tail_type(self) -> str: 126 ''' 127 Assign a tail type based on it's base composition 128 129 Args: 130 base_counts (tuple): (A, C, G, T) 131 132 Returns: 133 str: tail type, one of: no_tail, polyU, polyA, mixed_AU, mixed_GC, 134 other 135 ''' 136 base_counts = self.count_bases() 137 total = sum(base_counts) 138 if total == 0: 139 return "no_tail" 140 141 elif base_counts[3] == total: 142 return "polyU" 143 144 elif base_counts[0] == total: 145 return "polyA" 146 147 elif base_counts[0] + base_counts[3] == total: 148 return "mixed_AU" 149 150 elif base_counts[2] + base_counts[3] == total: 151 return "mixed_GC" 152 153 else: 154 return "other" 155 156 def count_Us(self): 157 tail = self.line[5] 158 u_count = 0 159 160 for i in range(1, len(tail) + 1): 161 if tail[-i] == 'T': 162 u_count += 1 163 else: 164 break 165 166 return u_count 167 168 def header_format(self): 169 ''' 170 Prepare header for printing to file 171 ''' 172 outhdr = '\t'.join(self.header + ["TAIL_LEN", 173 "A", "C", "G", "U", 174 "STRAND", "TYPE", 175 "U_COUNT"]) + '\n' 176 177 return outhdr 178 179 def line_format(self): 180 ''' 181 Prepare line for writing to file 182 ''' 183 return '\t'.join(self.line + [str(self.tail_len()), 184 '\t'.join([str(i) for i in self.count_bases()]), 185 str(self.get_strand()), 186 self.tail_type(), 187 str(self.count_Us())]) + '\n' 188 189def analyser(infile: str, out_name: str, briefmode: bool) -> None: 190 191 f = ExtractFile(infile) 192 f.open_gzip() 193 194 if briefmode: 195 with gzip.open(f"{out_name}.tsv.gz", "wt") as fo: 196 197 fo.write(f.header_format()) 198 199 for line in f: 200 if f.tail_type() != "no_tail": 201 fo.write(f.line_format()) 202 else: 203 with gzip.open(f"{out_name}.tsv.gz", "wt") as fo: 204 205 fo.write(f.header_format()) 206 207 for line in f: 208 fo.write(f.line_format()) 209 210 f.close() 211 212 return None
class
ExtractFile:
24class ExtractFile: 25 def __init__(self, path: str): 26 self.path = path 27 28 # 29 # File handling methods 30 # 31 32 def _get_header(self): 33 ''' 34 Internal method, used for reading the header of input text file. 35 36 Args: 37 handle: file handle generated by one of the open(file, 'r') variants, eg. gzip.open(file, 'rt') 38 39 Returns: 40 list[str, ...]: header as a \t separated list of strings 41 ''' 42 return self.handle.readline().strip().split('\t') 43 44 def open_gzip(self): 45 ''' 46 Opens a gzipped file for reading as text and stores its 1st line (header) separately 47 ''' 48 self.handle = gzip.open(self.path, "rt") 49 self.header = self._get_header() 50 51 def open_plain(self): 52 ''' 53 Opens a plain text file for reading and stores its 1st line (header) separately 54 ''' 55 self.handle = open(self.path, 'r') 56 self.header = self._get_header() 57 58 def close(self): 59 ''' 60 For closing the file after reading 61 ''' 62 self.handle.close() 63 64 def _line_process(self, line): 65 ''' 66 Used for processing the input lines into manageable data structures (only lists for now) 67 ''' 68 return line.strip().split('\t') 69 70 def __iter__(self): 71 ''' 72 Return object iterator 73 ''' 74 return self 75 76 def __next__(self): 77 ''' 78 Define how to get the next element, i.e. line 79 ''' 80 line = self.handle.readline() 81 self.line = self._line_process(self.handle.readline()) 82 if line: 83 self.line = self._line_process(line) 84 else: 85 raise StopIteration 86 87 # 88 # File processing/analysis methods 89 # 90 91 def tail_len(self): 92 ''' 93 Calculate tail length (CLIP3 field) 94 ''' 95 if self.line[5] != "NA": 96 return len(self.line[5]) 97 else: 98 return 0 99 100 def count_bases(self): 101 ''' 102 Count bases for 3' tail sequence 103 ''' 104 tail_seq = self.line[5] 105 106 if tail_seq != "NA": 107 return tuple([tail_seq.count('A'), 108 tail_seq.count('C'), 109 tail_seq.count('G'), 110 tail_seq.count('T')]) 111 else: 112 return tuple([0, 0, 0, 0]) 113 114 def get_strand(self): 115 ''' 116 Get mapping strand from SAM flags. Flags are stored as `int:12-bit` 117 format, eg. 123:000001111011. 118 119 Outputs the Entrez strand convention, i.e. + = 1, - = -1 120 ''' 121 flags = bin(int(self.line[1].split(':')[0])) 122 123 # 5th bit is "REVERSE - SEQ is reverse complemented" 124 return -1 if flags[-5] else 1 125 126 def tail_type(self) -> str: 127 ''' 128 Assign a tail type based on it's base composition 129 130 Args: 131 base_counts (tuple): (A, C, G, T) 132 133 Returns: 134 str: tail type, one of: no_tail, polyU, polyA, mixed_AU, mixed_GC, 135 other 136 ''' 137 base_counts = self.count_bases() 138 total = sum(base_counts) 139 if total == 0: 140 return "no_tail" 141 142 elif base_counts[3] == total: 143 return "polyU" 144 145 elif base_counts[0] == total: 146 return "polyA" 147 148 elif base_counts[0] + base_counts[3] == total: 149 return "mixed_AU" 150 151 elif base_counts[2] + base_counts[3] == total: 152 return "mixed_GC" 153 154 else: 155 return "other" 156 157 def count_Us(self): 158 tail = self.line[5] 159 u_count = 0 160 161 for i in range(1, len(tail) + 1): 162 if tail[-i] == 'T': 163 u_count += 1 164 else: 165 break 166 167 return u_count 168 169 def header_format(self): 170 ''' 171 Prepare header for printing to file 172 ''' 173 outhdr = '\t'.join(self.header + ["TAIL_LEN", 174 "A", "C", "G", "U", 175 "STRAND", "TYPE", 176 "U_COUNT"]) + '\n' 177 178 return outhdr 179 180 def line_format(self): 181 ''' 182 Prepare line for writing to file 183 ''' 184 return '\t'.join(self.line + [str(self.tail_len()), 185 '\t'.join([str(i) for i in self.count_bases()]), 186 str(self.get_strand()), 187 self.tail_type(), 188 str(self.count_Us())]) + '\n'
def
open_gzip(self):
44 def open_gzip(self): 45 ''' 46 Opens a gzipped file for reading as text and stores its 1st line (header) separately 47 ''' 48 self.handle = gzip.open(self.path, "rt") 49 self.header = self._get_header()
Opens a gzipped file for reading as text and stores its 1st line (header) separately
def
open_plain(self):
51 def open_plain(self): 52 ''' 53 Opens a plain text file for reading and stores its 1st line (header) separately 54 ''' 55 self.handle = open(self.path, 'r') 56 self.header = self._get_header()
Opens a plain text file for reading and stores its 1st line (header) separately
def
tail_len(self):
91 def tail_len(self): 92 ''' 93 Calculate tail length (CLIP3 field) 94 ''' 95 if self.line[5] != "NA": 96 return len(self.line[5]) 97 else: 98 return 0
Calculate tail length (CLIP3 field)
def
count_bases(self):
100 def count_bases(self): 101 ''' 102 Count bases for 3' tail sequence 103 ''' 104 tail_seq = self.line[5] 105 106 if tail_seq != "NA": 107 return tuple([tail_seq.count('A'), 108 tail_seq.count('C'), 109 tail_seq.count('G'), 110 tail_seq.count('T')]) 111 else: 112 return tuple([0, 0, 0, 0])
Count bases for 3' tail sequence
def
get_strand(self):
114 def get_strand(self): 115 ''' 116 Get mapping strand from SAM flags. Flags are stored as `int:12-bit` 117 format, eg. 123:000001111011. 118 119 Outputs the Entrez strand convention, i.e. + = 1, - = -1 120 ''' 121 flags = bin(int(self.line[1].split(':')[0])) 122 123 # 5th bit is "REVERSE - SEQ is reverse complemented" 124 return -1 if flags[-5] else 1
Get mapping strand from SAM flags. Flags are stored as int:12-bit
format, eg. 123:000001111011.
Outputs the Entrez strand convention, i.e. + = 1, - = -1
def
tail_type(self) -> str:
126 def tail_type(self) -> str: 127 ''' 128 Assign a tail type based on it's base composition 129 130 Args: 131 base_counts (tuple): (A, C, G, T) 132 133 Returns: 134 str: tail type, one of: no_tail, polyU, polyA, mixed_AU, mixed_GC, 135 other 136 ''' 137 base_counts = self.count_bases() 138 total = sum(base_counts) 139 if total == 0: 140 return "no_tail" 141 142 elif base_counts[3] == total: 143 return "polyU" 144 145 elif base_counts[0] == total: 146 return "polyA" 147 148 elif base_counts[0] + base_counts[3] == total: 149 return "mixed_AU" 150 151 elif base_counts[2] + base_counts[3] == total: 152 return "mixed_GC" 153 154 else: 155 return "other"
Assign a tail type based on it's base composition
Args: base_counts (tuple): (A, C, G, T)
Returns: str: tail type, one of: no_tail, polyU, polyA, mixed_AU, mixed_GC, other
def
header_format(self):
169 def header_format(self): 170 ''' 171 Prepare header for printing to file 172 ''' 173 outhdr = '\t'.join(self.header + ["TAIL_LEN", 174 "A", "C", "G", "U", 175 "STRAND", "TYPE", 176 "U_COUNT"]) + '\n' 177 178 return outhdr
Prepare header for printing to file
def
line_format(self):
180 def line_format(self): 181 ''' 182 Prepare line for writing to file 183 ''' 184 return '\t'.join(self.line + [str(self.tail_len()), 185 '\t'.join([str(i) for i in self.count_bases()]), 186 str(self.get_strand()), 187 self.tail_type(), 188 str(self.count_Us())]) + '\n'
Prepare line for writing to file
def
analyser(infile: str, out_name: str, briefmode: bool) -> None:
190def analyser(infile: str, out_name: str, briefmode: bool) -> None: 191 192 f = ExtractFile(infile) 193 f.open_gzip() 194 195 if briefmode: 196 with gzip.open(f"{out_name}.tsv.gz", "wt") as fo: 197 198 fo.write(f.header_format()) 199 200 for line in f: 201 if f.tail_type() != "no_tail": 202 fo.write(f.line_format()) 203 else: 204 with gzip.open(f"{out_name}.tsv.gz", "wt") as fo: 205 206 fo.write(f.header_format()) 207 208 for line in f: 209 fo.write(f.line_format()) 210 211 f.close() 212 213 return None