"""
Classes for streaming tokens/info from files/sparse files etc...
"""
from collections import Counter
from random import shuffle
import re
from functools import partial
import abc
import sys
import os
from scipy import sparse
try:
import MySQLdb
import MySQLdb.cursors
HAS_MYSQLDB = True
except ImportError:
HAS_MYSQLDB = False
import pymongo
from rosetta.parallel.parallel_easy import imap_easy, parallel_apply
from .. import common
from ..common import lazyprop, smart_open, DocIDError
from . import filefilter, text_processors
class BaseStreamer(object):
"""
Base class...don't use this directly.
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
    def info_stream(self, **kwargs):
"""
        Abstract method. All derived classes must implement this to
        return an iterator over the text documents, with processing
        as appropriate.
"""
return
@abc.abstractmethod
def record_stream(self, **kwargs):
return
    def single_stream(self, item, cache_list=None, **kwargs):
"""
Stream a single item from source.
Parameters
----------
item : String
The single item to pull from info and stream.
cache_list : List of strings
Cache these items on every iteration
kwargs : Keyword args
Passed on to self.info_stream
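
        Examples
        --------
        A sketch only; the base path and tokenizer below are assumptions.

        >>> streamer = TextFileStreamer(
        ...     text_base_path='/my/docs', file_type='*.txt',
        ...     tokenizer_func=lambda text: text.split())
        >>> for text in streamer.single_stream('text', cache_list=['doc_id']):
        ...     pass
        >>> ids = streamer.doc_id_cache  # filled during iteration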
"""
cache_list = [] if cache_list is None else cache_list
# Initialize the cached items as attributes
for cache_item in cache_list:
self.__dict__[cache_item + '_cache'] = []
# Iterate through self.info_stream and pull off required information.
if kwargs:
stream = self.info_stream(**kwargs)
else:
stream = self.info_stream()
for i, info in enumerate(stream):
for cache_item in cache_list:
self.__dict__[cache_item + '_cache'].append(info[cache_item])
yield info[item]
    def token_stream(self, cache_list=None, **kwargs):
"""
Returns an iterator over tokens with possible caching of other info.
Parameters
----------
cache_list : List of strings.
Cache these items as they appear
            E.g. self.token_stream(cache_list=['doc_id', 'tokens']) caches
            info['doc_id'] and info['tokens'] (assuming both are available).
kwargs : Keyword args
Passed on to self.info_stream
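
        Examples
        --------
        A sketch only; the streamer construction is an assumption.

        >>> streamer = TextFileStreamer(
        ...     text_base_path='/my/docs',
        ...     tokenizer_func=lambda text: text.lower().split())
        >>> for tokens in streamer.token_stream(cache_list=['doc_id']):
        ...     pass
        >>> ids = streamer.doc_id_cache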
"""
return self.single_stream('tokens', cache_list=cache_list, **kwargs)
    def to_vw(self, out_stream=sys.stdout, n_jobs=1, raise_on_bad_id=True,
cache_list=None):
"""
Write our filestream to a VW (Vowpal Wabbit) formatted file.
Parameters
----------
out_stream : stream, buffer or open file object
n_jobs : Integer
number of CPU jobs
        raise_on_bad_id : Boolean
            If True, raise DocIDError when the doc_id (formed by self) is
            not a valid VW "Tag"; if False, print a warning.
        cache_list : List of strings.
            Cache these items as they appear
            E.g. cache_list=['doc_id', 'tokens'] caches
            info['doc_id'] and info['tokens'] (assuming both are available).

        Notes
        -----
self.info_stream must have a 'doc_id' field, as this is used to index
the lines in the vw file.
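
        Examples
        --------
        A sketch only; assumes `streamer` is a streamer (e.g. a
        TextIterStreamer or a DBStreamer subclass) whose record_stream()
        yields dicts with 'doc_id' and 'text' and whose tokenizer is set,
        and 'docs.vw' is a hypothetical output path.

        >>> with open('docs.vw', 'w') as f:
        ...     streamer.to_vw(f, n_jobs=1, cache_list=['doc_id'])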
"""
assert self.tokenizer, 'tokenizer must be defined to use .to_vw()'
cache_list = [] if cache_list is None else cache_list
# Initialize the cached items as attributes
for cache_item in cache_list:
self.__dict__[cache_item + '_cache'] = []
formatter = text_processors.VWFormatter()
func = partial(_to_sstr, tokenizer=self.tokenizer, formatter=formatter,
raise_on_bad_id=raise_on_bad_id, streamer=self,
cache_list=cache_list)
parallel_apply(func, self.record_stream(), n_jobs,
sep='\n', out_stream=out_stream)
    def to_scipysparse(self, cache_list=None, **kwargs):
"""
Returns a scipy sparse matrix representing the collection of documents
as a bag of words, one (sparse) row per document.
        The token-to-column mapping is stored in self.token_col_map.
Parameters
----------
cache_list : List of strings.
Cache these items as they appear
            E.g. cache_list=['doc_id', 'tokens'] caches
            info['doc_id'] and info['tokens'] (assuming both are available).
kwargs : Keyword args
Passed on to self.info_stream
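
        Examples
        --------
        A sketch only; assumes `streamer` has a tokenizer attached.

        >>> X = streamer.to_scipysparse(cache_list=['doc_id'])
        >>> token_to_col = streamer.token_col_map  # token -> column index
        >>> ids = streamer.doc_id_cache            # doc_id in row order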
"""
token_col = dict()
n_col = -1
row_ind = list()
col_ind = list()
data = list()
# iterate through each document's tokens list and build up the
# sparse matrix row by row.
for row, tokens in enumerate(self.token_stream(cache_list, **kwargs)):
doc_counts = Counter(tokens)
for token, count in doc_counts.items():
if token not in token_col:
n_col += 1
token_col[token] = n_col
row_ind.append(row)
col_ind.append(token_col[token])
data.append(count)
self.__dict__['token_col_map'] = token_col
return sparse.csr_matrix((data, (row_ind, col_ind)),
shape=(row + 1, n_col + 1))
class VWStreamer(BaseStreamer):
"""
    For streaming from a single VW file. Since the VW format does not
    preserve token order, tokens are yielded in no particular order.
"""
def __init__(self, sfile=None, cache_sfile=False,
limit=None, shuffle=False):
"""
Parameters
----------
sfile : File path or buffer
Points to a sparse (VW) formatted file.
cache_sfile : Boolean
If True, cache the sfile in memory. CAREFUL!!!
limit : Integer
Only return this many results
shuffle : Boolean
If True, shuffle paths once (and only once) before streaming
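
        Examples
        --------
        A sketch only; 'doc_tokens.vw' is a hypothetical VW file.

        >>> stream = VWStreamer(sfile='doc_tokens.vw', limit=10)
        >>> for info in stream.info_stream():
        ...     tokens = info['tokens']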
"""
self.sfile = sfile
self.cache_sfile = cache_sfile
self.limit = limit
self.shuffle = shuffle
self.formatter = text_processors.VWFormatter()
if cache_sfile:
self.source = self._cached_stream
self._init_cached_stream()
else:
assert not shuffle, "Can only shuffle a cached stream"
self.source = self._sfile_stream
def _init_cached_stream(self):
records = {}
for record_dict in self._sfile_stream():
doc_id = record_dict['doc_id']
records[doc_id] = record_dict
# Set self.records and self.doc_id
self.records = records
        doc_id = list(records.keys())
if self.shuffle:
shuffle(doc_id)
self.doc_id = doc_id
def _cached_stream(self, doc_id=None):
records = self.records
if doc_id is None:
for i, doc in enumerate(self.doc_id):
record_dict = self.records[doc]
if i == self.limit:
                    return
yield record_dict
else:
if (self.limit is not None) and self.cache_sfile:
raise ValueError(
"Cannot use both self.limit and doc_id with cached stream")
for doc in doc_id:
yield records[doc]
def _sfile_stream(self, doc_id=None):
"""
Stream record_dict from an sfile that sits on disk.
"""
# Open file if path. If buffer or StringIO, passthrough.
with smart_open(self.sfile, 'rb') as infile:
if doc_id is not None:
doc_id = set(doc_id)
for i, line in enumerate(infile):
if i == self.limit:
                    return
record_dict = self.formatter.sstr_to_dict(line)
if doc_id is not None:
if record_dict['doc_id'] not in doc_id:
continue
yield record_dict
    def record_stream(self, doc_id=None):
"""
Returns an iterator over record dicts.
Parameters
----------
        doc_id : Iterable over Strings
            If given, yield only record dicts whose 'doc_id' is in doc_id.
"""
source = self.source(doc_id=doc_id)
for record_dict in source:
yield record_dict
    def info_stream(self, doc_id=None):
"""
Returns an iterator over info dicts.
Parameters
----------
        doc_id : Iterable over Strings
            If given, yield only info dicts whose 'doc_id' is in doc_id.
"""
# Read record_dict and convert to info by adding tokens
for record_dict in self.record_stream(doc_id):
record_dict['tokens'] = self.formatter._dict_to_tokens(record_dict)
yield record_dict
class TextFileStreamer(BaseStreamer):
"""
For streaming from text files.
"""
def __init__(
self, text_base_path=None, path_list=None, file_type='*',
name_strip=r'\..*', tokenizer=None, tokenizer_func=None, limit=None,
shuffle=True):
"""
Parameters
----------
text_base_path : string or None
Base path to dir containing files.
path_list : list or None
explicit list of paths to be used
file_type : String
String to filter files with. E.g. '*.txt'.
Note that the filenames will be converted to lowercase before
this comparison.
name_strip : raw string
Regex to strip doc_id.
tokenizer : Subclass of BaseTokenizer
Should have a text_to_token_list method. Try using MakeTokenizer
to convert a function to a valid tokenizer.
tokenizer_func : Function
Transforms a string (representing one file) to a list of strings
(the 'tokens').
limit : int or None
Limit for number of docs processed.
shuffle : Boolean
If True, shuffle paths once (and only once) before streaming
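
        Examples
        --------
        A sketch only; the path and tokenizer are assumptions.

        >>> streamer = TextFileStreamer(
        ...     text_base_path='/my/docs', file_type='*.txt',
        ...     tokenizer_func=lambda text: text.lower().split())
        >>> ids = streamer.doc_id          # doc_id derived from file names
        >>> X = streamer.to_scipysparse()  # bag-of-words sparse matrix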
"""
self.text_base_path = text_base_path
self.path_list = path_list
self.file_type = file_type
self.name_strip = name_strip
self.limit = limit
self.tokenizer = tokenizer
self.tokenizer_func = tokenizer_func
self.shuffle = shuffle
assert (text_base_path is None) or (path_list is None)
assert (tokenizer is None) or (tokenizer_func is None)
if tokenizer_func:
self.tokenizer = text_processors.MakeTokenizer(tokenizer_func)
@lazyprop
def paths(self):
"""
Get all paths that we will use.
"""
if self.text_base_path:
paths = filefilter.get_paths(
self.text_base_path, file_type=self.file_type)
if self.shuffle:
shuffle(paths)
if self.limit:
paths = paths[: self.limit]
else:
paths = self.path_list
return paths
@lazyprop
def doc_id(self):
"""
Get doc_id corresponding to all paths.
"""
regex = re.compile(self.name_strip)
doc_id = [
regex.sub('', filefilter.path_to_name(p, strip_ext=False))
for p in self.paths]
return doc_id
@lazyprop
def _doc_id_to_path(self):
"""
Build the dictionary mapping doc_id to path. doc_id is based on
the filename.
"""
return dict(zip(self.doc_id, self.paths))
@lazyprop
def file_stat(self):
"""
Builds a dictionary of os.stats file info. Currently included
last modification time & last access time (seconds since epoch),
size (in bytes).
"""
return [self._file_stat(p) for p in self.paths]
def _file_stat(self, path):
"""
Retrieves os.stats info for file.
"""
return {'mtime': os.path.getmtime(path), 'atime': os.path.getatime(path),
'size': os.path.getsize(path)}
def record_stream(self, paths=None, doc_id=None, limit=None):
return self.info_stream(paths, doc_id, limit)
    def info_stream(self, paths=None, doc_id=None, limit=None):
"""
Returns an iterator over paths yielding dictionaries with information
about the file contained within.
Parameters
----------
paths : list of strings
doc_id : list of strings or ints
limit : Integer
Use limit in place of self.limit.
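
        Examples
        --------
        A sketch only; the doc_id values are assumptions.

        >>> for info in streamer.info_stream(doc_id=['doc1', 'doc2']):
        ...     text = info['text']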
"""
if limit is None:
limit = self.limit
if doc_id is not None:
paths = [self._doc_id_to_path[str(doc)] for doc in doc_id]
elif paths is None:
paths = self.paths
for index, onepath in enumerate(paths):
if index == limit:
                return
with open(onepath, 'r') as f:
text = f.read()
            doc_id = re.sub(
                self.name_strip, '',
                filefilter.path_to_name(onepath, strip_ext=False))
stat_dict = self._file_stat(onepath)
info_dict = {'text': text, 'cached_path': onepath,
'doc_id': doc_id}
info_dict.update(stat_dict)
if self.tokenizer:
info_dict['tokens'] = (
self.tokenizer.text_to_token_list(text))
yield info_dict
    def to_vw(self, outfile, n_jobs=1, chunksize=1000, raise_on_bad_id=True):
"""
Write our filestream to a VW (Vowpal Wabbit) formatted file.
Parameters
----------
outfile : filepath or buffer
n_jobs : Integer
Use n_jobs different jobs to do the processing. Set = 4 for 4
jobs. Set = -1 to use all available, -2 for all except 1,...
chunksize : Integer
Workers process this many jobs at once before pickling and sending
results to master. If this is too low, communication overhead
will dominate. If this is too high, jobs will not be distributed
evenly.
raise_on_bad_id : Boolean
If True, raise DocIDError when the doc_id (formed by self) is not
a valid VW "Tag". I.e. contains :, |, ', or whitespace.
If False, print warning.
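
        Examples
        --------
        A sketch only; 'docs.vw' is a hypothetical output path.

        >>> streamer.to_vw('docs.vw', n_jobs=2, chunksize=500)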
"""
# Note: This is similar to rosetta/cmd/files_to_vw.py
# This implementation is more complicated, due to the fact that a
# streamer specifies the method to extract doc_id from a stream.
# To be faithful to the streamer, we must therefore use the streamer
# to stream the files. This requires a combination of imap_easy and
# a chunker.
#
# Create an iterator over chunks of paths
path_group_iter = common.grouper(self.paths, chunksize)
formatter = text_processors.VWFormatter()
func = partial(_group_to_sstr, self, formatter, raise_on_bad_id)
# Process one group at a time...set imap_easy chunksize arg to 1
# since each group contains many paths.
results_iterator = imap_easy(func, path_group_iter, n_jobs, 1)
with smart_open(outfile, 'w') as open_outfile:
for group_results in results_iterator:
for sstr in group_results:
open_outfile.write(sstr + '\n')
class TextIterStreamer(BaseStreamer):
"""
For streaming text.
"""
def __init__(self, text_iter, tokenizer=None, tokenizer_func=None):
"""
Parameters
----------
        text_iter : iterator or iterable of dictionaries
            Each dict must contain the key:value pair "text": text_string,
            but can contain other metadata key:values for caching
            (e.g. see self.token_stream).
tokenizer : Subclass of BaseTokenizer
Should have a text_to_token_list method. Try using MakeTokenizer
to convert a function to a valid tokenizer.
tokenizer_func : Function
Transforms a string (representing one file) to a list of strings
(the 'tokens').
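
        Examples
        --------
        A sketch only, using an in-memory list of records.

        >>> records = [{'text': 'hello world', 'doc_id': '1'},
        ...            {'text': 'goodbye world', 'doc_id': '2'}]
        >>> streamer = TextIterStreamer(
        ...     records, tokenizer_func=lambda text: text.split())
        >>> all_tokens = list(streamer.token_stream(cache_list=['doc_id']))
        >>> ids = streamer.doc_id_cache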
"""
self.text_iter = text_iter
self.tokenizer = tokenizer
self.tokenizer_func = tokenizer_func
assert (tokenizer is None) or (tokenizer_func is None)
if tokenizer_func:
self.tokenizer = text_processors.MakeTokenizer(tokenizer_func)
    def record_stream(self):
"""
        Yields dicts from self.text_iter.
"""
for info in self.text_iter:
yield info
    def info_stream(self):
"""
        Yields dicts from self.text_iter, with 'tokens' added.
"""
for info in self.record_stream():
info['tokens'] = self.tokenizer.text_to_token_list(info['text'])
yield info
class DBStreamer(BaseStreamer):
"""
Database streamer base class
"""
__metaclass__ = abc.ABCMeta
def __init__(
self, db_setup, tokenizer=None, tokenizer_func=None):
"""
Parameters
----------
        db_setup : dict
            A dictionary containing parameters needed to connect to, and
            query, the database. The required parameters are documented in
            each subclass, but at a minimum you will need information about
            the host, the username/password, and the query that will be
            executed. The query must return a 'text' field in its result
            dictionaries.
tokenizer : Subclass of BaseTokenizer
Should have a text_to_token_list method. Try using MakeTokenizer
to convert a function to a valid tokenizer.
tokenizer_func : Function
Transforms a string (representing one file) to a list of strings
(the 'tokens').
"""
self.db_setup = db_setup
self.tokenizer = tokenizer
self.tokenizer_func = tokenizer_func
self.cursor = None
assert (tokenizer is None) or (tokenizer_func is None)
if tokenizer_func:
self.tokenizer = text_processors.MakeTokenizer(tokenizer_func)
@abc.abstractmethod
    def connect(self):
"""
        Open a connection to the database.
        Sets the instance's cursor object.
"""
return
@abc.abstractmethod
    def disconnect(self):
"""
Close connection to database
"""
return
@abc.abstractmethod
    def iterate_over_query(self):
"""
        Return an iterator over the query result.
        We suggest that the entire query result not be loaded at once and
        that iteration be controlled server side, but this method does not
        guarantee that. Each item yielded must be a dictionary containing
        at least the key 'text', holding the text to be tokenized.
return
def record_stream(self):
for info in self.iterate_over_query():
yield info
    def info_stream(self):
"""
        Yields dicts from executing the query, with 'tokens' added.
"""
for info in self.record_stream():
info['tokens'] = self.tokenizer.text_to_token_list(info['text'])
yield info
class MySQLStreamer(DBStreamer):
"""
Subclass of DBStreamer to connect to a MySQL database and iterate over
query results. db_setup is expected to be a dictionary containing
host, user, password, database, and query. The query itself must return
a column named text.
Example:
db_setup = {}
db_setup['host'] = 'hostname'
db_setup['user'] = 'username'
db_setup['password'] = 'password'
db_setup['database'] = 'database'
        db_setup['query'] = '''select
                                   id as doc_id,
                                   body as text
                               from tablename
                               where length(body) > 100'''
        my_tokenizer = TokenizerBasic()
        stream = MySQLStreamer(db_setup=db_setup, tokenizer=my_tokenizer)
        for info in stream.info_stream():
            print info['doc_id'], info['tokens']
"""
def __init__(self, *args, **kwargs):
if not HAS_MYSQLDB:
raise ImportError("MySQLdb was not importable, therefore\
MySQLStreamer cannot be used.")
super(MySQLStreamer, self).__init__(*args, **kwargs)
def connect(self):
try:
_host = self.db_setup['host']
_user = self.db_setup['user']
_password = self.db_setup['password']
_db = self.db_setup['database']
        except KeyError:
            raise common.BadDataError(
                "MySQLStreamer expects db_setup to have host, user, "
                "password, and database fields")
connection = MySQLdb.connect(
host=_host, user=_user,
passwd=_password, db=_db,
cursorclass=MySQLdb.cursors.SSDictCursor)
self.cursor = connection.cursor()
def disconnect(self):
if self.cursor:
self.cursor.close()
self.cursor = None
def iterate_over_query(self):
if not self.cursor:
self.connect()
try:
_query = self.db_setup['query']
        except KeyError:
            raise common.BadDataError(
                "MySQLStreamer expects db_setup to have a query field")
self.cursor.execute(_query)
for result in self.cursor:
if 'text' not in result:
raise common.BadDataError("The query must return a text field")
yield result
class MongoStreamer(DBStreamer):
"""
Subclass of DBStreamer to connect to a Mongo database and iterate over
query results. db_setup is expected to be a dictionary containing
host, database, collection, query, and text_key. Additionally an optional
limit parameter is allowed.
The query itself must return a column named text_key which is passed on
as 'text' to the iterator.
In addition, because it is difficult to rename mongo fields (similar
to the SQL 'AS' syntax), we allow a translation dictionary to be
passed in, which translates keys in the mongo dictionary result names
k to be passed into the result as v for key value pairs {k : v}.
Currently we don't deal with nested documents.
Example:
db_setup = {}
db_setup['host'] = 'localhost'
db_setup['database'] = 'places'
db_setup['collection'] = 'opentable'
db_setup['query'] = {}
db_setup['limit'] = 5
db_setup['text_key'] = 'desc'
db_setup['translations'] = {'_id' : 'doc_id'}
# In this example, we assume that the collection has a field named
# desc, holding the text to be analyzed, and a field named _id which
# will be translated to doc_id and stored in the cache.
my_tokenizer = TokenizerBasic()
stream = MongoStreamer(db_setup=db_setup, tokenizer=my_tokenizer)
        for info in stream.info_stream():
            print info['doc_id'], info['tokens']
"""
def connect(self):
try:
_host = self.db_setup['host']
_db = self.db_setup['database']
_col = self.db_setup['collection']
if 'port' in self.db_setup:
_port = self.db_setup['port']
else:
_port = None
        except KeyError:
            raise common.BadDataError(
                "MongoStreamer expects db_setup to have host, database, "
                "and collection fields")
client = pymongo.MongoClient(_host, _port)
db = client[_db]
col = db[_col]
self.cursor = col
def disconnect(self):
self.cursor = None
def iterate_over_query(self):
if not self.cursor:
self.connect()
try:
_query = self.db_setup['query']
_text_key = self.db_setup['text_key']
if 'limit' in self.db_setup:
_limit = self.db_setup['limit']
else:
_limit = None
if 'translations' in self.db_setup:
_translate = self.db_setup['translations']
else:
_translate = None
        except KeyError:
            raise common.BadDataError(
                "MongoStreamer expects db_setup to have query and "
                "text_key fields")
results = self.cursor.find(_query)
if _limit:
results = results.limit(_limit)
for result in results:
if _text_key not in result:
raise common.BadDataError("The query must return the \
specified text field")
result['text'] = result[_text_key]
if _translate:
for k, v in _translate.items():
result[v] = result[k]
yield result
def _group_to_sstr(streamer, formatter, raise_on_bad_id, path_group):
"""
Return a list of sstr's (sparse string representations). One for every
path in path_group.
"""
# grouper might append None to the last group if this one is shorter
path_group = (p for p in path_group if p is not None)
group_results = []
info_stream = streamer.info_stream(paths=path_group)
for info_dict in info_stream:
doc_id = info_dict['doc_id']
tokens = info_dict['tokens']
feature_values = Counter(tokens)
try:
tok_sstr = formatter.get_sstr(
feature_values, importance=1, doc_id=doc_id)
except DocIDError as e:
msg = e.message + "\npath = %s\n" % info_dict['cached_path']
if raise_on_bad_id:
raise DocIDError(msg)
else:
msg = "WARNING: " + msg
sys.stderr.write(msg)
group_results.append(tok_sstr)
return group_results
def _to_sstr(record_dict, tokenizer, formatter, raise_on_bad_id,
streamer, cache_list=None):
"""
    Return an sstr (sparse string representation) for a single record_dict
    yielded by streamer.record_stream(): tokenizes the text, gets 'doc_id',
    and formats the result.
"""
cache_list = [] if cache_list is None else cache_list
doc_id = str(record_dict['doc_id'])
tokens = tokenizer.text_to_token_list(record_dict['text'])
feature_values = Counter(tokens)
try:
tok_sstr = formatter.get_sstr(
feature_values, importance=1, doc_id=doc_id)
for cache_item in cache_list:
streamer.__dict__[cache_item + '_cache'].append(
record_dict[cache_item])
except DocIDError as e:
msg = e.message + "\doc_id = %s\n" % info_dict['doc_id']
if raise_on_bad_id:
raise DocIDError(msg)
else:
msg = "WARNING: " + msg
sys.stderr.write(msg)
return tok_sstr