Source code for rosetta.text.gensim_helpers

"""
Helper objects/functions specifically for use with Gensim.
"""
import pandas as pd
from gensim import corpora

from .. import common


class StreamerCorpus(object):
    """
    A "corpus type" object built with token streams and dictionaries.

    Depending on your method for streaming tokens, this could be slow...

    Before modeling, it's usually better to serialize this corpus using

        self.serialize(fname)
    or
        gensim.corpora.SvmLightCorpus.serialize(path, self)
    """
    def __init__(self, streamer, dictionary, doc_id=None, limit=None):
        """
        Stream token lists from pre-defined path lists.

        Parameters
        ----------
        streamer : Streamer compatible object
            Method streamer.token_stream() returns a stream of lists of words.
        dictionary : gensim.corpora.Dictionary object
        doc_id : Iterable over strings
            Limit all streaming results to docs with these doc_ids
        limit : Integer
            Limit all streaming results to this many documents
        """
        self.streamer = streamer
        self.dictionary = dictionary
        self.doc_id = doc_id
        self.limit = limit

    def __iter__(self):
        """
        Returns an iterator of "corpus type" (bag-of-words vectors) over the
        streamed documents.
        """
        token_stream = self.streamer.token_stream(
            doc_id=self.doc_id, limit=self.limit, cache_list=['doc_id'])

        for token_list in token_stream:
            yield self.dictionary.doc2bow(token_list)
    def serialize(self, fname):
        """
        Save to svmlight (plus) format, generating files: fname, fname.index,
        fname.doc_id
        """
        # Make the corpus and .index file
        corpora.SvmLightCorpus.serialize(fname, self)

        # Make the .doc_id file
        # Streamer cached the doc_id while streaming
        with open(fname + '.doc_id', 'w') as f:
            f.write('\n'.join(self.streamer.doc_id_cache))
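
# Usage sketch (illustrative, not part of the original module): stream
# bag-of-words vectors from a Streamer and serialize them to svmlight-plus
# format.  Here `my_streamer` is a hypothetical Streamer-compatible object
# and `my_dictionary` a gensim Dictionary built from its token stream.
#
#     my_dictionary = corpora.Dictionary(my_streamer.token_stream())
#     corpus = StreamerCorpus(my_streamer, my_dictionary, limit=1000)
#     corpus.serialize('/tmp/bow.svmlight')  # writes fname, .index, .doc_id
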
class SvmLightPlusCorpus(corpora.SvmLightCorpus):
    """
    Extends gensim.corpora.SvmLightCorpus, providing methods to work with
    (e.g. filter by) doc_ids.
    """
    def __init__(self, fname, doc_id=None, doc_id_filter=None, limit=None):
        """
        Parameters
        ----------
        fname : Path
            Contains the .svmlight bag-of-words text file
        doc_id : Iterable
            Stream these doc_ids exactly, in the order given.
        doc_id_filter : Iterable
            Stream doc_ids in the intersection of fname.doc_id and
            doc_id_filter
        limit : Integer
            Equivalent to initializing with the first limit rows of fname
            and fname.doc_id.
        """
        corpora.SvmLightCorpus.__init__(self, fname)
        self.limit = limit

        # All possible doc_id in the corpus
        self.doc_id_all = common.get_list_from_filerows(fname + '.doc_id')
        self.doc_id_all = self.doc_id_all[: limit]
        self.doc_id_all_set = set(self.doc_id_all)

        # Set self.doc_id
        if doc_id_filter is not None:
            assert doc_id is None, "Can't pass both doc_id and doc_id_filter"
            self.doc_id = [
                id for id in doc_id_filter if str(id) in self.doc_id_all_set]
        elif doc_id is not None:
            self.doc_id = doc_id
        else:
            self.doc_id = self.doc_id_all

    @property
    def doc_id(self):
        return self._doc_id

    @doc_id.setter
    def doc_id(self, iterable):
        # Called whenever you set self.doc_id = something
        self._doc_id = [str(id) for id in iterable]
        self.doc_id_set = set(self._doc_id)
        if not self.doc_id_set.issubset(self.doc_id_all_set):
            raise ValueError(
                "Attempt to set self.doc_id to values not contained in the"
                " corpus .doc_id file")

    def __iter__(self):
        """
        Yield bag-of-words rows only for documents whose doc_id is in
        self.doc_id.
        """
        base_iterable = corpora.SvmLightCorpus.__iter__(self)
        for i, row in enumerate(base_iterable):
            if i == self.limit:
                # Stop once the row limit is reached
                return
            if self.doc_id_all[i] in self.doc_id_set:
                yield row
    def serialize(self, fname, **kwargs):
        """
        Save to svmlight (plus) format, generating files: fname, fname.index,
        fname.doc_id

        Parameters
        ----------
        fname : String
            Path to save the bag-of-words file at
        kwargs : Additional keyword arguments
            Passed to SvmLightCorpus.serialize
        """
        # Make the corpus and .index file
        corpora.SvmLightCorpus.serialize(fname, self, **kwargs)

        # Make the .doc_id file from this corpus's own doc_id list
        with open(fname + '.doc_id', 'w') as f:
            f.write('\n'.join(self.doc_id))
    @classmethod
    def from_streamer_dict(
            cls, streamer, dictionary, fname, doc_id=None, limit=None):
        """
        Initialize from a Streamer and gensim.corpora.Dictionary, serializing
        the corpus (to disk) in SvmLightPlus format, then returning a
        SvmLightPlusCorpus.

        Parameters
        ----------
        streamer : Streamer compatible object
            Method streamer.token_stream() returns a stream of lists of words.
        dictionary : gensim.corpora.Dictionary object
        fname : String
            Path to save the bag-of-words file at
        doc_id : Iterable over strings
            Limit all streaming results to docs with these doc_ids
        limit : Integer
            Limit all streaming results to this many documents

        Returns
        -------
        corpus : SvmLightPlusCorpus
        """
        streamer_corpus = StreamerCorpus(
            streamer, dictionary, doc_id=doc_id, limit=limit)
        streamer_corpus.serialize(fname)

        return cls(fname, doc_id=doc_id, limit=limit)
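
# Usage sketch (illustrative, not part of the original module): load a
# serialized svmlight-plus corpus and restrict streaming to particular
# documents.  The path and doc_ids are hypothetical.
#
#     corpus = SvmLightPlusCorpus(
#         '/tmp/bow.svmlight', doc_id_filter=['doc1', 'doc7'])
#     bow_vectors = list(corpus)  # only docs present in both lists stream
#
# Or build, serialize, and load in one step from a streamer and dictionary:
#
#     corpus = SvmLightPlusCorpus.from_streamer_dict(
#         my_streamer, my_dictionary, '/tmp/bow.svmlight', limit=1000)
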
def get_words_docfreq(dictionary):
    """
    Returns a DataFrame with token id and doc freq as columns and words as
    index.
    """
    id2token = dict(dictionary.items())
    words_df = pd.DataFrame(
        {id2token[tokenid]: [tokenid, docfreq]
         for tokenid, docfreq in dictionary.dfs.items()},
        index=['tokenid', 'docfreq']).T
    words_df = words_df.sort_values('docfreq', ascending=False)

    return words_df
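
# Usage sketch (illustrative, not part of the original module): inspect
# document frequencies before pruning the vocabulary.  `my_dictionary` is a
# hypothetical gensim Dictionary.
#
#     words_df = get_words_docfreq(my_dictionary)
#     print(words_df.head(10))  # most common words first (highest docfreq)
#     my_dictionary.filter_extremes(no_below=5, no_above=0.5)
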
def get_topics_df(corpus, lda):
    """
    Returns a DataFrame of topic scores for each document in corpus, with one
    'topic_i' column per topic; the index is doc_id when corpus provides it.
    """
    topics_df = pd.concat(
        (pd.Series(dict(doc)) for doc in lda[corpus]), axis=1).fillna(0).T
    topics_df = topics_df.rename(
        columns={i: 'topic_' + str(i) for i in topics_df.columns})
    if hasattr(corpus, 'doc_id'):
        topics_df.index = corpus.doc_id
        topics_df.index.name = 'doc_id'

    return topics_df
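
# Usage sketch (illustrative, not part of the original module): fit an LDA
# model on a corpus built as above and save per-document topic scores.
# The paths and parameter values are hypothetical.
#
#     from gensim.models import LdaModel
#
#     lda = LdaModel(corpus, id2word=my_dictionary, num_topics=10)
#     topics_df = get_topics_df(corpus, lda)
#     topics_df.to_csv('/tmp/doc_topics.csv', sep='|')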