modules.analyse

  1import io, gzip
  2from dataclasses import dataclass, astuple
  3
  4'''
  5analyse.py, handles the analysis of 'extract' output
  6'''
  7
  8'''
  9What this needs to do:
 10    * calculate tail length             DONE
 11    * count nucleotides                 DONE
 12    * assign strand                     DONE
 13    * assign tail type:                 DONE
 14        all G or C -> G/C               DONE
 15        Empty (len = 0) -> no_tail      DONE
 16        all T -> polyU                  DONE
 17        all A -> polyA                  DONE
 18        A or T -> mixed_AU              DONE
 19        otherwise -> other              DONE
 20    * Count terminal Us (Ts)            DONE
 21'''
 22
 23class ExtractFile:
 24    def __init__(self, path: str):
 25        self.path = path
 26
 27    #
 28    # File handling methods
 29    #
 30
 31    def _get_header(self):
 32        '''
 33        Internal method, used for reading the header of input text file.
 34
 35        Args:
 36            handle: file handle generated by one of the open(file, 'r') variants, eg. gzip.open(file, 'rt')
 37
 38        Returns:
 39            list[str, ...]: header as a \t separated list of strings
 40        '''
 41        return self.handle.readline().strip().split('\t')
 42
 43    def open_gzip(self):
 44        '''
 45        Opens a gzipped file for reading as text and stores its 1st line (header) separately
 46        '''
 47        self.handle = gzip.open(self.path, "rt")
 48        self.header = self._get_header()
 49
 50    def open_plain(self):
 51        '''
 52        Opens a plain text file for reading and stores its 1st line (header) separately
 53        '''
 54        self.handle = open(self.path, 'r')
 55        self.header = self._get_header()
 56
 57    def close(self):
 58        '''
 59        For closing the file after reading
 60        '''
 61        self.handle.close()
 62
 63    def _line_process(self, line):
 64        '''
 65        Used for processing the input lines into manageable data structures (only lists for now)
 66        '''
 67        return line.strip().split('\t')
 68
 69    def __iter__(self):
 70        '''
 71        Return object iterator
 72        '''
 73        return self
 74
 75    def __next__(self):
 76        '''
 77        Define how to get the next element, i.e. line
 78        '''
 79        line = self.handle.readline()
 80        self.line = self._line_process(self.handle.readline())
 81        if line:
 82            self.line = self._line_process(line)
 83        else:
 84            raise StopIteration
 85
 86    #
 87    # File processing/analysis methods
 88    #
 89
 90    def tail_len(self):
 91        '''
 92        Calculate tail length (CLIP3 field)
 93        '''
 94        if self.line[5] != "NA":
 95            return len(self.line[5])
 96        else:
 97            return 0
 98
 99    def count_bases(self):
100        '''
101        Count bases for 3' tail sequence
102        '''
103        tail_seq = self.line[5]
104
105        if tail_seq != "NA":
106            return tuple([tail_seq.count('A'),
107                          tail_seq.count('C'),
108                          tail_seq.count('G'),
109                          tail_seq.count('T')])
110        else:
111            return tuple([0, 0, 0, 0])
112
113    def get_strand(self):
114        '''
115        Get mapping strand from SAM flags. Flags are stored as `int:12-bit`
116        format, eg. 123:000001111011.
117
118        Outputs the Entrez strand convention, i.e. + = 1, - = -1
119        '''
120        flags = bin(int(self.line[1].split(':')[0]))
121
122        # 5th bit is "REVERSE - SEQ is reverse complemented"
123        return -1 if flags[-5] else 1
124
125    def tail_type(self) -> str:
126        '''
127        Assign a tail type based on it's base composition
128
129        Args:
130            base_counts (tuple): (A, C, G, T)
131
132        Returns:
133            str: tail type, one of: no_tail, polyU, polyA, mixed_AU, mixed_GC,
134                other
135        '''
136        base_counts = self.count_bases()
137        total = sum(base_counts)
138        if total == 0:
139            return "no_tail"
140
141        elif base_counts[3] == total:
142            return "polyU"
143
144        elif base_counts[0] == total:
145            return "polyA"
146
147        elif base_counts[0] + base_counts[3] == total:
148            return "mixed_AU"
149
150        elif base_counts[2] + base_counts[3] == total:
151            return "mixed_GC"
152
153        else:
154            return "other"
155
156    def count_Us(self):
157        tail = self.line[5]
158        u_count = 0
159
160        for i in range(1, len(tail) + 1):
161            if tail[-i] == 'T':
162                u_count += 1
163            else:
164                break
165
166        return u_count
167
168    def header_format(self):
169        '''
170        Prepare header for printing to file
171        '''
172        outhdr = '\t'.join(self.header + ["TAIL_LEN", 
173                                          "A", "C", "G", "U",
174                                          "STRAND", "TYPE",
175                                          "U_COUNT"]) + '\n'
176
177        return outhdr
178
179    def line_format(self):
180        '''
181        Prepare line for writing to file
182        '''
183        return '\t'.join(self.line + [str(self.tail_len()),
184                                      '\t'.join([str(i) for i in self.count_bases()]),
185                                      str(self.get_strand()),
186                                      self.tail_type(),
187                                      str(self.count_Us())]) + '\n'
188
189def analyser(infile: str, out_name: str, briefmode: bool) -> None:
190
191    f = ExtractFile(infile)
192    f.open_gzip()
193
194    if briefmode:
195        with gzip.open(f"{out_name}.tsv.gz", "wt") as fo:
196
197            fo.write(f.header_format())
198
199            for line in f:
200                if f.tail_type() != "no_tail":
201                    fo.write(f.line_format())
202    else:
203        with gzip.open(f"{out_name}.tsv.gz", "wt") as fo:
204
205            fo.write(f.header_format())
206
207            for line in f:
208                fo.write(f.line_format())
209
210    f.close()
211
212    return None
class ExtractFile:
 24class ExtractFile:
 25    def __init__(self, path: str):
 26        self.path = path
 27
 28    #
 29    # File handling methods
 30    #
 31
 32    def _get_header(self):
 33        '''
 34        Internal method, used for reading the header of input text file.
 35
 36        Args:
 37            handle: file handle generated by one of the open(file, 'r') variants, eg. gzip.open(file, 'rt')
 38
 39        Returns:
 40            list[str, ...]: header as a \t separated list of strings
 41        '''
 42        return self.handle.readline().strip().split('\t')
 43
 44    def open_gzip(self):
 45        '''
 46        Opens a gzipped file for reading as text and stores its 1st line (header) separately
 47        '''
 48        self.handle = gzip.open(self.path, "rt")
 49        self.header = self._get_header()
 50
 51    def open_plain(self):
 52        '''
 53        Opens a plain text file for reading and stores its 1st line (header) separately
 54        '''
 55        self.handle = open(self.path, 'r')
 56        self.header = self._get_header()
 57
 58    def close(self):
 59        '''
 60        For closing the file after reading
 61        '''
 62        self.handle.close()
 63
 64    def _line_process(self, line):
 65        '''
 66        Used for processing the input lines into manageable data structures (only lists for now)
 67        '''
 68        return line.strip().split('\t')
 69
 70    def __iter__(self):
 71        '''
 72        Return object iterator
 73        '''
 74        return self
 75
 76    def __next__(self):
 77        '''
 78        Define how to get the next element, i.e. line
 79        '''
 80        line = self.handle.readline()
 81        self.line = self._line_process(self.handle.readline())
 82        if line:
 83            self.line = self._line_process(line)
 84        else:
 85            raise StopIteration
 86
 87    #
 88    # File processing/analysis methods
 89    #
 90
 91    def tail_len(self):
 92        '''
 93        Calculate tail length (CLIP3 field)
 94        '''
 95        if self.line[5] != "NA":
 96            return len(self.line[5])
 97        else:
 98            return 0
 99
100    def count_bases(self):
101        '''
102        Count bases for 3' tail sequence
103        '''
104        tail_seq = self.line[5]
105
106        if tail_seq != "NA":
107            return tuple([tail_seq.count('A'),
108                          tail_seq.count('C'),
109                          tail_seq.count('G'),
110                          tail_seq.count('T')])
111        else:
112            return tuple([0, 0, 0, 0])
113
114    def get_strand(self):
115        '''
116        Get mapping strand from SAM flags. Flags are stored as `int:12-bit`
117        format, eg. 123:000001111011.
118
119        Outputs the Entrez strand convention, i.e. + = 1, - = -1
120        '''
121        flags = bin(int(self.line[1].split(':')[0]))
122
123        # 5th bit is "REVERSE - SEQ is reverse complemented"
124        return -1 if flags[-5] else 1
125
126    def tail_type(self) -> str:
127        '''
128        Assign a tail type based on it's base composition
129
130        Args:
131            base_counts (tuple): (A, C, G, T)
132
133        Returns:
134            str: tail type, one of: no_tail, polyU, polyA, mixed_AU, mixed_GC,
135                other
136        '''
137        base_counts = self.count_bases()
138        total = sum(base_counts)
139        if total == 0:
140            return "no_tail"
141
142        elif base_counts[3] == total:
143            return "polyU"
144
145        elif base_counts[0] == total:
146            return "polyA"
147
148        elif base_counts[0] + base_counts[3] == total:
149            return "mixed_AU"
150
151        elif base_counts[2] + base_counts[3] == total:
152            return "mixed_GC"
153
154        else:
155            return "other"
156
157    def count_Us(self):
158        tail = self.line[5]
159        u_count = 0
160
161        for i in range(1, len(tail) + 1):
162            if tail[-i] == 'T':
163                u_count += 1
164            else:
165                break
166
167        return u_count
168
169    def header_format(self):
170        '''
171        Prepare header for printing to file
172        '''
173        outhdr = '\t'.join(self.header + ["TAIL_LEN", 
174                                          "A", "C", "G", "U",
175                                          "STRAND", "TYPE",
176                                          "U_COUNT"]) + '\n'
177
178        return outhdr
179
180    def line_format(self):
181        '''
182        Prepare line for writing to file
183        '''
184        return '\t'.join(self.line + [str(self.tail_len()),
185                                      '\t'.join([str(i) for i in self.count_bases()]),
186                                      str(self.get_strand()),
187                                      self.tail_type(),
188                                      str(self.count_Us())]) + '\n'
ExtractFile(path: str)
25    def __init__(self, path: str):
26        self.path = path
path
def open_gzip(self):
44    def open_gzip(self):
45        '''
46        Opens a gzipped file for reading as text and stores its 1st line (header) separately
47        '''
48        self.handle = gzip.open(self.path, "rt")
49        self.header = self._get_header()

Opens a gzipped file for reading as text and stores its 1st line (header) separately

def open_plain(self):
51    def open_plain(self):
52        '''
53        Opens a plain text file for reading and stores its 1st line (header) separately
54        '''
55        self.handle = open(self.path, 'r')
56        self.header = self._get_header()

Opens a plain text file for reading and stores its 1st line (header) separately

def close(self):
58    def close(self):
59        '''
60        For closing the file after reading
61        '''
62        self.handle.close()

For closing the file after reading

def tail_len(self):
91    def tail_len(self):
92        '''
93        Calculate tail length (CLIP3 field)
94        '''
95        if self.line[5] != "NA":
96            return len(self.line[5])
97        else:
98            return 0

Calculate tail length (CLIP3 field)

def count_bases(self):
100    def count_bases(self):
101        '''
102        Count bases for 3' tail sequence
103        '''
104        tail_seq = self.line[5]
105
106        if tail_seq != "NA":
107            return tuple([tail_seq.count('A'),
108                          tail_seq.count('C'),
109                          tail_seq.count('G'),
110                          tail_seq.count('T')])
111        else:
112            return tuple([0, 0, 0, 0])

Count bases for 3' tail sequence

def get_strand(self):
114    def get_strand(self):
115        '''
116        Get mapping strand from SAM flags. Flags are stored as `int:12-bit`
117        format, eg. 123:000001111011.
118
119        Outputs the Entrez strand convention, i.e. + = 1, - = -1
120        '''
121        flags = bin(int(self.line[1].split(':')[0]))
122
123        # 5th bit is "REVERSE - SEQ is reverse complemented"
124        return -1 if flags[-5] else 1

Get mapping strand from SAM flags. Flags are stored as int:12-bit format, eg. 123:000001111011.

Outputs the Entrez strand convention, i.e. + = 1, - = -1

def tail_type(self) -> str:
126    def tail_type(self) -> str:
127        '''
128        Assign a tail type based on it's base composition
129
130        Args:
131            base_counts (tuple): (A, C, G, T)
132
133        Returns:
134            str: tail type, one of: no_tail, polyU, polyA, mixed_AU, mixed_GC,
135                other
136        '''
137        base_counts = self.count_bases()
138        total = sum(base_counts)
139        if total == 0:
140            return "no_tail"
141
142        elif base_counts[3] == total:
143            return "polyU"
144
145        elif base_counts[0] == total:
146            return "polyA"
147
148        elif base_counts[0] + base_counts[3] == total:
149            return "mixed_AU"
150
151        elif base_counts[2] + base_counts[3] == total:
152            return "mixed_GC"
153
154        else:
155            return "other"

Assign a tail type based on it's base composition

Args: base_counts (tuple): (A, C, G, T)

Returns: str: tail type, one of: no_tail, polyU, polyA, mixed_AU, mixed_GC, other

def count_Us(self):
157    def count_Us(self):
158        tail = self.line[5]
159        u_count = 0
160
161        for i in range(1, len(tail) + 1):
162            if tail[-i] == 'T':
163                u_count += 1
164            else:
165                break
166
167        return u_count
def header_format(self):
169    def header_format(self):
170        '''
171        Prepare header for printing to file
172        '''
173        outhdr = '\t'.join(self.header + ["TAIL_LEN", 
174                                          "A", "C", "G", "U",
175                                          "STRAND", "TYPE",
176                                          "U_COUNT"]) + '\n'
177
178        return outhdr

Prepare header for printing to file

def line_format(self):
180    def line_format(self):
181        '''
182        Prepare line for writing to file
183        '''
184        return '\t'.join(self.line + [str(self.tail_len()),
185                                      '\t'.join([str(i) for i in self.count_bases()]),
186                                      str(self.get_strand()),
187                                      self.tail_type(),
188                                      str(self.count_Us())]) + '\n'

Prepare line for writing to file

def analyser(infile: str, out_name: str, briefmode: bool) -> None:
190def analyser(infile: str, out_name: str, briefmode: bool) -> None:
191
192    f = ExtractFile(infile)
193    f.open_gzip()
194
195    if briefmode:
196        with gzip.open(f"{out_name}.tsv.gz", "wt") as fo:
197
198            fo.write(f.header_format())
199
200            for line in f:
201                if f.tail_type() != "no_tail":
202                    fo.write(f.line_format())
203    else:
204        with gzip.open(f"{out_name}.tsv.gz", "wt") as fo:
205
206            fo.write(f.header_format())
207
208            for line in f:
209                fo.write(f.line_format())
210
211    f.close()
212
213    return None