Source code for dataIO.textfile

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
text file IO utility module.

By default, it uses "utf-8" encoding.
"""

from __future__ import print_function

import os

try:
    import chardet
except ImportError as e:
    import sys    
    err_msg = ("Warning: '%s', ``smartread`` method is not available. " 
               "install 'chardet' to activate this feature.") % e
    sys.stderr.write(err_msg)

try:
    from .py23 import int_type
except:
    from dataIO.py23 import int_type


def write(s, path, encoding="utf-8"):
    """Encode a string and save it to a file.

    The file is opened in binary mode, so the bytes produced by
    ``s.encode(encoding)`` are written as-is; an existing file at ``path``
    is overwritten.

    :param s: the text to write
    :param path: destination file path
    :param encoding: default "utf-8", codec used to encode ``s``
    """
    with open(path, "wb") as stream:
        encoded = s.encode(encoding)
        stream.write(encoded)
def writebytes(b, path):
    """Save raw bytes to a file.

    An existing file at ``path`` is overwritten.

    :param b: the binary content to write
    :param path: destination file path
    """
    with open(path, "wb") as stream:
        stream.write(b)
def read(path, encoding="utf-8"):
    """Load a file and decode its whole content to text.

    :param path: path of the file to read
    :param encoding: default "utf-8", codec used to decode the file bytes
    :returns: the decoded text
    """
    with open(path, "rb") as stream:
        raw = stream.read()
        return raw.decode(encoding)
def readbytes(path):
    """Load the whole content of a file as raw bytes.

    :param path: path of the file to read
    :returns: the file content, undecoded
    """
    with open(path, "rb") as stream:
        raw = stream.read()
    return raw
def smartread(path):
    """Read text from file, automatically detecting its encoding.

    ``chardet`` is required for any non-empty file.

    :param path: path of the file to read
    :returns: the decoded text; an empty file yields an empty string
    :raises UnicodeDecodeError: if the content cannot be decoded with the
        detected (or fallback) encoding
    """
    with open(path, "rb") as f:
        content = f.read()

    # There is nothing to detect in an empty file: chardet reports
    # ``encoding: None`` for it, and ``decode(None)`` raises TypeError.
    if not content:
        return content.decode("utf-8")

    result = chardet.detect(content)
    # chardet also reports ``None`` when it cannot recognise the data; fall
    # back to this module's default encoding instead of failing on
    # ``decode(None)``.
    encoding = result["encoding"] or "utf-8"
    return content.decode(encoding)
def to_utf8(path, output_path=None):
    """Re-encode a text file as utf-8.

    The source encoding is detected by ``smartread``; the result is saved
    with ``write`` using its default encoding.

    :param path: path of the text file to convert
    :param output_path: default None, destination path; when omitted,
        "-UTF8Encode" is inserted before the extension of ``path``
    """
    if output_path is None:
        stem, suffix = os.path.splitext(path)
        output_path = stem + "-UTF8Encode" + suffix
    write(smartread(path), output_path)


#--- Text file line reader ---
def no_strip(s):
    """Return ``s`` untouched."""
    return s


def left_strip(s):
    """Return ``s`` without its leading whitespace."""
    trimmed = s.lstrip()
    return trimmed


def right_strip(s):
    """Return ``s`` without its trailing whitespace."""
    trimmed = s.rstrip()
    return trimmed


def both_strip(s):
    """Return ``s`` without leading and trailing whitespace."""
    trimmed = s.strip()
    return trimmed


# Lookup table from the lower-cased ``strip`` option to the function that
# implements it.
_strip_method_mapping = dict(
    none=no_strip,
    left=left_strip,
    right=right_strip,
    both=both_strip,
)
def readlines(path, encoding="utf-8", skiplines=None, nlines=None,
              strip='right'):
    """Skip n lines and lazily yield the next n lines.

    If the file ends before ``skiplines`` lines were skipped or before
    ``nlines`` lines were produced, the generator simply stops.

    :param path: path of the text file to read
    :param encoding: default "utf-8", codec used to decode each line
    :param skiplines: default None, skip first n lines
    :param nlines: default None, yield next n lines; None (or 0) yields
        every remaining line
    :param strip: default 'right', available option None, 'left', 'right',
        'both' (case-insensitive); None keeps lines unstripped
    :raises ValueError: if ``strip`` is not one of the supported options
    """
    strip_method = str(strip).lower()
    if strip_method not in _strip_method_mapping:
        raise ValueError("'strip' keyword has to be one of "
                         "None, 'left', 'right', 'both'.")
    strip_func = _strip_method_mapping[strip_method]

    with open(path, "rb") as file:
        # ``next(file, None)`` is used instead of a bare ``next(file)``: a
        # StopIteration escaping a generator body is turned into a
        # RuntimeError on Python 3.7+ (PEP 479).
        if skiplines:
            for _ in range(skiplines):
                if next(file, None) is None:
                    return  # fewer lines than requested to skip

        if nlines:
            for _ in range(nlines):
                line = next(file, None)
                if line is None:
                    return  # file exhausted before ``nlines`` were read
                yield strip_func(line.decode(encoding))
        else:
            for line in file:
                yield strip_func(line.decode(encoding))
def readchunks(path, encoding="utf-8", skiplines=None, chunksize=None,
               strip='right'):
    """Skip n lines, then lazily yield the remaining lines in chunks.

    Each yielded chunk is a list of ``chunksize`` decoded lines. When the
    number of remaining lines is not a multiple of ``chunksize``, the last
    chunk is shorter; an empty chunk is never yielded.

    :param path: path of the text file to read
    :param encoding: default "utf-8", codec used to decode each line
    :param skiplines: default None, skip first n lines
    :param chunksize: default None (size-1 chunk), lines chunk size
    :param strip: default 'right', available option None, 'left', 'right',
        'both' (case-insensitive); None keeps lines unstripped
    :raises ValueError: if ``strip`` is not one of the supported options, or
        if ``chunksize`` is neither None nor a positive integer
    """
    strip_method = str(strip).lower()
    if strip_method not in _strip_method_mapping:
        raise ValueError("'strip' keyword has to be one of "
                         "None, 'left', 'right', 'both'.")
    strip_func = _strip_method_mapping[strip_method]

    if chunksize is None:
        chunksize = 1
    elif not isinstance(chunksize, int_type):
        raise ValueError("'chunksize' has to be None or an integer.")
    elif chunksize < 1:
        # A zero or negative size can never be filled and would make the
        # generator produce empty chunks forever.
        raise ValueError("'chunksize' has to be a positive integer.")

    with open(path, "rb") as file:
        # ``next(file, None)`` is used instead of a bare ``next(file)``: a
        # StopIteration escaping a generator body is turned into a
        # RuntimeError on Python 3.7+ (PEP 479).
        if skiplines:
            for _ in range(skiplines):
                if next(file, None) is None:
                    return  # fewer lines than requested to skip

        chunk = list()
        for line in file:
            chunk.append(strip_func(line.decode(encoding)))
            if len(chunk) == chunksize:
                yield chunk
                chunk = list()

        # Lines left over after the last full chunk.
        if chunk:
            yield chunk