#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Text file I/O utility module.
By default, it uses the "utf-8" encoding.
"""
from __future__ import print_function
import os
try:
import chardet
except ImportError as e:
import sys
err_msg = ("Warning: '%s', ``smartread`` method is not available. "
"install 'chardet' to activate this feature.") % e
sys.stderr.write(err_msg)
try:
from .py23 import int_type
except:
from dataIO.py23 import int_type
def write(s, path, encoding="utf-8"):
    """Encode text and store it in a file.

    The file is opened in binary mode, so any existing content is replaced
    and the encoded bytes are written exactly as produced.

    :param s: text to write
    :param path: path of the file to create or overwrite
    :param encoding: codec used to encode ``s``, default "utf-8"
    """
    with open(path, "wb") as handle:
        payload = s.encode(encoding)
        handle.write(payload)
def writebytes(b, path):
    """Store raw bytes in a file, replacing any existing content.

    :param b: bytes to write
    :param path: path of the file to create or overwrite
    """
    with open(path, "wb") as handle:
        handle.write(b)
def read(path, encoding="utf-8"):
    """Load a whole file and return its content decoded as text.

    :param path: path of the file to read
    :param encoding: codec used to decode the file content, default "utf-8"
    :return: the decoded text
    """
    with open(path, "rb") as handle:
        raw = handle.read()
        return raw.decode(encoding)
def readbytes(path):
    """Load a whole file and return its raw bytes.

    :param path: path of the file to read
    :return: the file content as bytes
    """
    with open(path, "rb") as handle:
        data = handle.read()
    return data
def smartread(path):
    """Read text from file, automatically detect encoding. ``chardet`` required.

    The whole file is read as bytes and handed to ``chardet.detect``; the
    reported encoding is then used to decode the content. When no encoding
    can be determined, "utf-8" (this module's default) is used instead.

    :param path: path of the file to read
    :return: the decoded text; an empty string for an empty file
    :raises UnicodeDecodeError: if the content cannot be decoded with the
        detected (or fallback) encoding
    """
    with open(path, "rb") as f:
        content = f.read()

    # Nothing to detect in an empty file; skipping detection also avoids
    # passing a ``None`` encoding to ``decode`` below.
    if not content:
        return content.decode("utf-8")

    result = chardet.detect(content)
    encoding = result["encoding"]
    if encoding is None:
        # Detection failed: ``decode(None)`` would raise an opaque TypeError,
        # so fall back to the module-wide default encoding.
        encoding = "utf-8"
    return content.decode(encoding)
def to_utf8(path, output_path=None):
    """Re-encode a text file as utf-8.

    The source text is obtained with ``smartread`` and stored with ``write``
    using its default encoding.

    :param path: path of the file to convert
    :param output_path: destination path; when None, "-UTF8Encode" is
        inserted between the base name and the extension of ``path``
    """
    if output_path is not None:
        destination = output_path
    else:
        stem, suffix = os.path.splitext(path)
        destination = stem + "-UTF8Encode" + suffix
    write(smartread(path), destination)
#--- Text file line reader ---
def no_strip(s):
    """Return ``s`` untouched (the "none" strip option)."""
    return s
def left_strip(s):
    """Return ``s`` with leading whitespace removed."""
    trimmed = s.lstrip()
    return trimmed
def right_strip(s):
    """Return ``s`` with trailing whitespace removed."""
    trimmed = s.rstrip()
    return trimmed
def both_strip(s):
    """Return ``s`` with leading and trailing whitespace removed."""
    trimmed = s.strip()
    return trimmed
# Lookup table: lower-cased ``strip`` option name -> function applied to a line.
_strip_method_mapping = dict(
    none=no_strip,
    left=left_strip,
    right=right_strip,
    both=both_strip,
)
def readlines(path, encoding="utf-8", skiplines=None, nlines=None, strip='right'):
    """Skip n lines and lazily yield the following lines of a text file.

    The file is read in binary mode and each line is decoded with
    ``encoding`` before being stripped. This is a generator, so nothing
    (including the validation of ``strip``) runs before the first item is
    requested. Running out of lines while skipping or reading simply ends
    the iteration.

    :param path: path of the file to read
    :param encoding: codec used to decode each line, default "utf-8"
    :param skiplines: default None, skip first n lines
    :param nlines: default None, yield at most the next n lines; None or 0
        yields every remaining line
    :param strip: default 'right', available option None, 'left', 'right',
        'both' (case-insensitive)
    :raises ValueError: if ``strip`` is not one of the supported options
    :return: generator of decoded, stripped lines
    """
    strip_func = _strip_method_mapping.get(str(strip).lower())
    if strip_func is None:
        raise ValueError("'strip' keyword has to be one of "
                         "None, 'left', 'right', 'both'.")

    with open(path, "rb") as f:
        if skiplines:
            for _ in range(skiplines):
                # ``next`` with a default: a bare ``next(f)`` would leak
                # StopIteration out of the generator, which Python 3.7+
                # turns into RuntimeError (PEP 479).
                if next(f, None) is None:
                    return

        if nlines:
            for _ in range(nlines):
                line = next(f, None)
                if line is None:
                    # Fewer than ``nlines`` lines were left in the file.
                    return
                yield strip_func(line.decode(encoding))
        else:
            for line in f:
                yield strip_func(line.decode(encoding))
def readchunks(path, encoding="utf-8", skiplines=None, chunksize=None, strip='right'):
    """Skip n lines, then lazily yield the remaining lines in fixed-size chunks.

    The file is read in binary mode and each line is decoded with
    ``encoding`` before being stripped. Every yielded chunk is a list of
    ``chunksize`` lines, except the last one, which holds whatever lines are
    left (it is never empty). This is a generator, so argument validation
    runs only when the first item is requested.

    :param path: path of the file to read
    :param encoding: codec used to decode each line, default "utf-8"
    :param skiplines: default None, skip first n lines
    :param chunksize: default None (size-1 chunk), lines chunk size
    :param strip: default 'right', available option None, 'left', 'right',
        'both' (case-insensitive)
    :raises ValueError: if ``strip`` is not a supported option, or if
        ``chunksize`` is neither None nor a positive integer
    :return: generator of lists of decoded, stripped lines
    """
    strip_func = _strip_method_mapping.get(str(strip).lower())
    if strip_func is None:
        raise ValueError("'strip' keyword has to be one of "
                         "None, 'left', 'right', 'both'.")

    if chunksize is None:
        chunksize = 1
    elif not isinstance(chunksize, int_type):
        raise ValueError("'chunksize' has to be None or an integer.")
    elif chunksize < 1:
        # A non-positive size can never be filled; without this guard the
        # generator would produce empty chunks forever.
        raise ValueError("'chunksize' has to be a positive integer.")

    with open(path, "rb") as f:
        if skiplines:
            for _ in range(skiplines):
                # ``next`` with a default: a bare ``next(f)`` would leak
                # StopIteration out of the generator, which Python 3.7+
                # turns into RuntimeError (PEP 479).
                if next(f, None) is None:
                    break

        chunk = []
        for line in f:
            chunk.append(strip_func(line.decode(encoding)))
            if len(chunk) == chunksize:
                yield chunk
                chunk = []

        # Emit the trailing partial chunk, but never an empty one.
        if chunk:
            yield chunk