Source code for tacl.results

"""Module containing the Results class."""

import csv
import logging
import os
import re
import tempfile

import pandas as pd

from . import constants
from .text import Text, FilteredWitnessText


DELETE_FIELDNAME = 'delete'


class Results:

    """Class representing a set of n-gram results.

    Provides methods for manipulating those results in ways that
    maintain the same structure (CSV, same field names). Those methods
    that modify the fields do so in relatively minor ways that often
    allow for the other methods to still operate on the results.

    A method's modifications to the field names, if any, are specified
    in that method's docstring.

    """

    def __init__(self, matches, tokenizer):
        self._logger = logging.getLogger(__name__)
        self._matches = pd.read_csv(matches, encoding='utf-8',
                                    na_filter=False)
        # Work around a problem with CSV files produced on Windows
        # being read by pandas and creating an empty row for each
        # actual row.
        self._matches = self._matches.dropna(how='all')
        self._tokenizer = tokenizer

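    # A minimal usage sketch (hedged: 'matches.csv' is a hypothetical
    # path to the CSV output of a tacl query, and `tokenizer` stands
    # for any object exposing `tokenize` and a `joiner.join` callable,
    # which is all the methods below assume):
    #
    #     results = Results('matches.csv', tokenizer)
    #     results.prune_by_ngram_size(minimum=3)
    #     with open('pruned.csv', 'w', encoding='utf-8',
    #               newline='') as fh:
    #         results.csv(fh)
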
    def add_label_count(self):
        """Adds to each result row a count of the number of
        occurrences of that n-gram across all works within the label.

        This count uses the highest witness count for each work.

        """
        self._logger.info('Adding label count')
        self._matches.loc[:, constants.LABEL_COUNT_FIELDNAME] = 0

        def add_label_count(df):
            # For each n-gram and label pair, we need the maximum
            # count among all witnesses to each work, and then the sum
            # of those across all works.
            work_maxima = df.groupby(constants.WORK_FIELDNAME,
                                     sort=False).max()
            df.loc[:, constants.LABEL_COUNT_FIELDNAME] = work_maxima[
                constants.COUNT_FIELDNAME].sum()
            return df

        self._matches = self._matches.groupby(
            [constants.LABEL_FIELDNAME, constants.NGRAM_FIELDNAME],
            sort=False).apply(add_label_count)
        self._logger.info('Finished adding label count')

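    # Worked example of the label count semantics (illustrative data
    # only): for n-gram "AB" under label "X", counted 3 and 5 in the
    # two witnesses to work W1 and 2 in work W2, the label count is
    # max(3, 5) + 2 == 7, and every "AB"/"X" row receives 7.
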
    def _annotate_bifurcated_extend_data(self, row, smaller, larger,
                                         tokenize, join):
        """Returns `row` annotated with whether it should be deleted
        or not.

        An n-gram is marked for deletion if:

        * its label count is 1 and its constituent (n-1)-grams also
          have a label count of 1; or

        * there is a containing (n+1)-gram that has the same label
          count.

        :param row: row of witness n-grams to annotate
        :type row: `pandas.Series`
        :param smaller: rows of (n-1)-grams for this witness
        :type smaller: `pandas.DataFrame`
        :param larger: rows of (n+1)-grams for this witness
        :type larger: `pandas.DataFrame`
        :param tokenize: function to tokenize an n-gram
        :param join: function to join tokens
        :rtype: `pandas.Series`

        """
        lcf = constants.LABEL_COUNT_FIELDNAME
        nf = constants.NGRAM_FIELDNAME
        # Escape the n-gram, because str.contains interprets the
        # supplied string as a regular expression.
        ngram = row[constants.NGRAM_FIELDNAME]
        escaped_ngram = re.escape(ngram)
        label_count = row[constants.LABEL_COUNT_FIELDNAME]
        if label_count == 1 and not smaller.empty:
            # Keep a result with a label count of 1 if its
            # constituents do not also have a count of 1.
            ngram_tokens = tokenize(ngram)
            sub_ngram1 = join(ngram_tokens[:-1])
            sub_ngram2 = join(ngram_tokens[1:])
            pattern = FilteredWitnessText.get_filter_ngrams_pattern(
                [sub_ngram1, sub_ngram2])
            if smaller[smaller[constants.NGRAM_FIELDNAME].str.match(
                    pattern)][constants.LABEL_COUNT_FIELDNAME].max() == 1:
                row[DELETE_FIELDNAME] = True
        elif not larger.empty and larger[larger[nf].str.contains(
                escaped_ngram)][lcf].max() == label_count:
            # Remove a result if the label count of a containing
            # n-gram is equal to its label count.
            row[DELETE_FIELDNAME] = True
        return row

    def bifurcated_extend(self, corpus, max_size):
        """Replaces the results with those n-grams that contain any of
        the original n-grams, and that represent points at which an
        n-gram is a constituent of multiple larger n-grams with a
        lower label count.

        :param corpus: corpus of works to which results belong
        :type corpus: `Corpus`
        :param max_size: maximum size of n-gram results to include
        :type max_size: `int`

        """
        temp_fd, temp_path = tempfile.mkstemp(text=True)
        try:
            self._prepare_bifurcated_extend_data(corpus, max_size,
                                                 temp_path, temp_fd)
        finally:
            try:
                os.remove(temp_path)
            except OSError as e:
                msg = ('Failed to remove temporary file containing '
                       'unreduced results: {}')
                self._logger.error(msg.format(e))
        self._bifurcated_extend()

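    # Usage sketch (hedged: `corpus` is assumed to be a tacl Corpus
    # instance, and `max_size` an upper bound on the size of generated
    # n-grams; the value 10 is purely illustrative):
    #
    #     results.bifurcated_extend(corpus, 10)
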
    def _bifurcated_extend(self):
        self._matches.loc[:, DELETE_FIELDNAME] = False
        tokenize = self._tokenizer.tokenize
        join = self._tokenizer.joiner.join
        new_results = []
        group_cols = [constants.WORK_FIELDNAME, constants.SIGLUM_FIELDNAME,
                      constants.SIZE_FIELDNAME]
        grouped = self._matches.groupby(group_cols, sort=False)
        for (work, siglum, size), group in grouped:
            try:
                smaller_grams = grouped.get_group((work, siglum, size - 1))
            except KeyError:
                smaller_grams = pd.DataFrame()
            try:
                larger_grams = grouped.get_group((work, siglum, size + 1))
            except KeyError:
                larger_grams = pd.DataFrame()
            group = group.apply(self._annotate_bifurcated_extend_data,
                                axis=1, args=(smaller_grams, larger_grams,
                                              tokenize, join))
            new_results.append(group[~group[DELETE_FIELDNAME]])
        all_cols = constants.QUERY_FIELDNAMES + [
            constants.LABEL_COUNT_FIELDNAME, DELETE_FIELDNAME]
        self._matches = pd.concat(new_results, ignore_index=True).reindex(
            columns=all_cols)
        del self._matches[DELETE_FIELDNAME]

    def collapse_witnesses(self):
        """Groups together witnesses for the same n-gram and work that
        have the same count, and outputs a single row for each group.

        This output replaces the siglum field with a sigla field that
        provides a space-separated list of the witness sigla. Due to
        this, it is not necessarily possible to run other Results
        methods on results that have had their witnesses collapsed.

        """
        # In order to allow for additional columns to be present in
        # the input data (such as label count), copy the siglum
        # information into a new final column, then put the sigla
        # information into the siglum field and finally rename it.
        #
        # This means that in merge_sigla below, the column names are
        # reversed from what would be expected.
        self._matches.loc[:, constants.SIGLA_FIELDNAME] = \
            self._matches[constants.SIGLUM_FIELDNAME]
        # This code makes the not unwarranted assumption that the same
        # n-gram means the same size and that the same work means the
        # same label.
        grouped = self._matches.groupby(
            [constants.WORK_FIELDNAME, constants.NGRAM_FIELDNAME,
             constants.COUNT_FIELDNAME], sort=False)

        def merge_sigla(df):
            # Take the first result row; only the siglum should differ
            # between them, and there may only be one row.
            merged = df[0:1]
            sigla = []
            for siglum in list(df[constants.SIGLA_FIELDNAME]):
                if ' ' in siglum:
                    siglum = '"{}"'.format(siglum)
                sigla.append(siglum)
            sigla.sort(key=lambda x: x.strip('"'))
            merged[constants.SIGLUM_FIELDNAME] = ' '.join(sigla)
            return merged

        self._matches = grouped.apply(merge_sigla)
        del self._matches[constants.SIGLA_FIELDNAME]
        self._matches.rename(columns={constants.SIGLUM_FIELDNAME:
                                      constants.SIGLA_FIELDNAME},
                             inplace=True)

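    # Illustrative effect (hypothetical rows): two witness rows
    #
    #     ngram, size, work, siglum, count, label
    #     AB,    2,    W1,   base,   4,     X
    #     AB,    2,    W1,   v2,     4,     X
    #
    # collapse into one row whose final sigla field is "base v2"; any
    # siglum containing a space is wrapped in double quotes.
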
    def csv(self, fh):
        """Writes the results data to `fh` in CSV format and returns
        `fh`.

        :param fh: file to write data to
        :type fh: file object
        :rtype: file object

        """
        self._matches.to_csv(fh, encoding='utf-8', float_format='%d',
                             index=False)
        return fh

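    # Usage sketch:
    #
    #     with open('results.csv', 'w', encoding='utf-8',
    #               newline='') as fh:
    #         results.csv(fh)
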
    def extend(self, corpus):
        """Adds rows for all longer forms of n-grams in the results
        that are present in the witnesses.

        This works with both diff and intersect results.

        :param corpus: corpus of works to which results belong
        :type corpus: `Corpus`

        """
        self._logger.info('Extending results')
        if self._matches.empty:
            return
        highest_n = self._matches[constants.SIZE_FIELDNAME].max()
        if highest_n == 1:
            self._logger.warning(
                'Extending results that contain only 1-grams is '
                'unsupported; the original results will be used')
            return
        # Determine if we are dealing with diff or intersect
        # results. In the latter case, we need to perform a reciprocal
        # remove as the final stage (since extended n-grams may exist
        # in works from more than one label). This test will think
        # that intersect results that have had all but one label
        # removed are difference results, which will cause the results
        # to be potentially incorrect.
        is_intersect = self._is_intersect_results(self._matches)
        # Supply the extender with only matches on the largest
        # n-grams.
        matches = self._matches[
            self._matches[constants.SIZE_FIELDNAME] == highest_n]
        extended_matches = pd.DataFrame()
        cols = [constants.WORK_FIELDNAME, constants.SIGLUM_FIELDNAME,
                constants.LABEL_FIELDNAME]
        for index, (work, siglum, label) in \
                matches[cols].drop_duplicates().iterrows():
            extended_ngrams = self._generate_extended_ngrams(
                matches, work, siglum, label, corpus, highest_n)
            extended_matches = pd.concat(
                [extended_matches, self._generate_extended_matches(
                    extended_ngrams, highest_n, work, siglum, label)])
            extended_ngrams = None
        extended_matches = extended_matches.reindex(
            columns=constants.QUERY_FIELDNAMES)
        if is_intersect:
            extended_matches = self._reciprocal_remove(extended_matches)
        self._matches = pd.concat([self._matches, extended_matches])

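    # Usage sketch (hedged: assumes the results were generated from
    # `corpus`; a reciprocal remove is applied automatically when the
    # results look like intersect results):
    #
    #     results.extend(corpus)
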
    def _generate_extended_matches(self, extended_ngrams, highest_n,
                                   work, siglum, label):
        """Returns extended match data derived from `extended_ngrams`.

        This extended match data are the counts for all intermediate
        n-grams within each extended n-gram.

        :param extended_ngrams: extended n-grams
        :type extended_ngrams: `list` of `str`
        :param highest_n: the highest degree of n-grams in the
                          original results
        :type highest_n: `int`
        :param work: name of the work bearing `extended_ngrams`
        :type work: `str`
        :param siglum: siglum of the text bearing `extended_ngrams`
        :type siglum: `str`
        :param label: label associated with the text
        :type label: `str`
        :rtype: `pandas.DataFrame`

        """
        # Add data for each n-gram within each extended n-gram. Since
        # this treats each extended piece of text separately, the same
        # n-gram may be generated more than once, so the complete set
        # of new possible matches for this witness needs to combine
        # the counts for such.
        rows_list = []
        for extended_ngram in extended_ngrams:
            text = Text(extended_ngram, self._tokenizer)
            for size, ngrams in text.get_ngrams(highest_n + 1,
                                                len(text.get_tokens())):
                data = [{constants.WORK_FIELDNAME: work,
                         constants.SIGLUM_FIELDNAME: siglum,
                         constants.LABEL_FIELDNAME: label,
                         constants.SIZE_FIELDNAME: size,
                         constants.NGRAM_FIELDNAME: ngram,
                         constants.COUNT_FIELDNAME: count}
                        for ngram, count in ngrams.items()]
                rows_list.extend(data)
        self._logger.debug('Number of extended results: {}'.format(
            len(rows_list)))
        extended_matches = pd.DataFrame(rows_list)
        rows_list = None
        self._logger.debug('Finished generating intermediate extended '
                           'matches')
        # extended_matches may be an empty DataFrame, in which case
        # manipulating it on the basis of non-existing columns is not
        # going to go well.
        groupby_fields = [constants.NGRAM_FIELDNAME,
                          constants.WORK_FIELDNAME,
                          constants.SIGLUM_FIELDNAME,
                          constants.SIZE_FIELDNAME,
                          constants.LABEL_FIELDNAME]
        if constants.NGRAM_FIELDNAME in extended_matches:
            extended_matches = extended_matches.groupby(
                groupby_fields, sort=False).sum().reset_index()
        return extended_matches

    def _generate_extended_ngrams(self, matches, work, siglum, label,
                                  corpus, highest_n):
        """Returns the n-grams of the largest size that exist in the
        `siglum` witness to `work` under `label`, generated from
        adding together overlapping n-grams in `matches`.

        :param matches: n-gram matches
        :type matches: `pandas.DataFrame`
        :param work: name of work whose results are being processed
        :type work: `str`
        :param siglum: siglum of witness whose results are being
                       processed
        :type siglum: `str`
        :param label: label of witness whose results are being
                      processed
        :type label: `str`
        :param corpus: corpus to which the witness belongs
        :type corpus: `Corpus`
        :param highest_n: highest degree of n-gram in `matches`
        :type highest_n: `int`
        :rtype: `list` of `str`

        """
        # For large result sets, this method may involve a lot of
        # processing within the for loop, so optimise even small
        # things, such as aliasing dotted calls here and below.
        t_join = self._tokenizer.joiner.join
        witness_matches = matches[
            (matches[constants.WORK_FIELDNAME] == work) &
            (matches[constants.SIGLUM_FIELDNAME] == siglum) &
            (matches[constants.LABEL_FIELDNAME] == label)]
        text = corpus.get_witness(work, siglum).get_token_content()
        ngrams = [tuple(self._tokenizer.tokenize(ngram)) for ngram in
                  list(witness_matches[constants.NGRAM_FIELDNAME])]
        # Go through the list of n-grams, and create a list of
        # extended n-grams by joining two n-grams together that
        # overlap (a[-overlap:] == b[:-1]) and checking that the
        # result occurs in text.
        working_ngrams = ngrams[:]
        extended_ngrams = set(ngrams)
        new_working_ngrams = []
        overlap = highest_n - 1
        # Create an index of n-grams by their overlapping portion,
        # pointing to the non-overlapping token.
        ngram_index = {}
        for ngram in ngrams:
            values = ngram_index.setdefault(ngram[:-1], [])
            values.append(ngram[-1:])
        extended_add = extended_ngrams.add
        new_working_append = new_working_ngrams.append
        ngram_size = highest_n
        while working_ngrams:
            removals = set()
            ngram_size += 1
            self._logger.debug(
                'Iterating over {} n-grams to produce {}-grams'.format(
                    len(working_ngrams), ngram_size))
            for base in working_ngrams:
                remove_base = False
                base_overlap = base[-overlap:]
                for next_token in ngram_index.get(base_overlap, []):
                    extension = base + next_token
                    if t_join(extension) in text:
                        extended_add(extension)
                        new_working_append(extension)
                        remove_base = True
                if remove_base:
                    # Remove base from extended_ngrams, because it is
                    # now encompassed by extension.
                    removals.add(base)
            extended_ngrams -= removals
            working_ngrams = new_working_ngrams[:]
            new_working_ngrams = []
            new_working_append = new_working_ngrams.append
        extended_ngrams = sorted(extended_ngrams, key=len, reverse=True)
        extended_ngrams = [t_join(ngram) for ngram in extended_ngrams]
        self._logger.debug('Generated {} extended n-grams'.format(
            len(extended_ngrams)))
        self._logger.debug('Longest generated n-gram: {}'.format(
            extended_ngrams[0]))
        # In order to get the counts correct in the next step of the
        # process, these n-grams must be overlaid over the text and
        # repeated as many times as there are matches. N-grams that do
        # not match (and they may not match on previously matched
        # parts of the text) are discarded.
        ngrams = []
        for ngram in extended_ngrams:
            # Remove from the text those parts that match. Replace
            # them with a double space, which should prevent any
            # incorrect match on the text from each side of the match
            # that is now contiguous.
            text, count = re.subn(re.escape(ngram), '  ', text)
            ngrams.extend([ngram] * count)
        self._logger.debug('Aligned extended n-grams with the text; '
                           '{} distinct n-grams exist'.format(len(ngrams)))
        return ngrams

    def _generate_filter_ngrams(self, data, min_size):
        """Returns the n-grams in `data` that do not contain any other
        n-gram in `data`.

        :param data: n-gram results data
        :type data: `pandas.DataFrame`
        :param min_size: minimum n-gram size in `data`
        :type min_size: `int`
        :rtype: `list` of `str`

        """
        max_size = data[constants.SIZE_FIELDNAME].max()
        kept_ngrams = list(data[data[constants.SIZE_FIELDNAME] ==
                                min_size][constants.NGRAM_FIELDNAME])
        for size in range(min_size + 1, max_size + 1):
            pattern = FilteredWitnessText.get_filter_ngrams_pattern(
                kept_ngrams)
            potential_ngrams = list(data[data[constants.SIZE_FIELDNAME] ==
                                         size][constants.NGRAM_FIELDNAME])
            kept_ngrams.extend([ngram for ngram in potential_ngrams
                                if pattern.search(ngram) is None])
        return kept_ngrams

    def _generate_substrings(self, ngram, size):
        """Returns a list of all substrings of `ngram`.

        :param ngram: n-gram to generate substrings of
        :type ngram: `str`
        :param size: size of `ngram`
        :type size: `int`
        :rtype: `list`

        """
        text = Text(ngram, self._tokenizer)
        substrings = []
        for sub_size, ngrams in text.get_ngrams(1, size - 1):
            for sub_ngram, count in ngrams.items():
                substrings.extend([sub_ngram] * count)
        return substrings

    @staticmethod
    def _is_intersect_results(results):
        """Returns False if `results` has an n-gram that exists in
        only one label, True otherwise.
        :param results: results to analyze
        :type results: `pandas.DataFrame`
        :rtype: `bool`

        """
        sample = results.iloc[0]
        ngram = sample[constants.NGRAM_FIELDNAME]
        label = sample[constants.LABEL_FIELDNAME]
        return not results[
            (results[constants.NGRAM_FIELDNAME] == ngram) &
            (results[constants.LABEL_FIELDNAME] != label)].empty

    def _prepare_bifurcated_extend_data(self, corpus, max_size,
                                        temp_path, temp_fd):
        # It might be wondered why this whole derivation of n-grams
        # anew from the source text is required, when an extended set
        # of results could just be passed through to the final
        # filtering process.
        #
        # The answer is that, for diff results, the filtering done by
        # diff-reduce can remove many n-grams that are picked up in
        # this process, and which may well prove important in the
        # context of a bifurcated extend.
        self._matches.sort_values(by=[constants.SIZE_FIELDNAME],
                                  ascending=True, inplace=True)
        group_cols = [constants.WORK_FIELDNAME, constants.SIGLUM_FIELDNAME,
                      constants.LABEL_FIELDNAME]
        # Output a CSV file containing the possible n-grams to include
        # in the final output.
        self._logger.debug('Writing filtered n-grams to temporary CSV '
                           'file at {}'.format(temp_path))
        with open(temp_fd, 'w', encoding='utf-8', newline='') as fh:
            writer = csv.writer(fh)
            writer.writerow(constants.QUERY_FIELDNAMES)
            for (work, siglum, label), group in self._matches.groupby(
                    group_cols, sort=False):
                min_size = group[constants.SIZE_FIELDNAME].min()
                filter_ngrams = self._generate_filter_ngrams(group,
                                                             min_size)
                witness = corpus.get_witness(work, siglum,
                                             FilteredWitnessText)
                for size, ngrams in witness.get_ngrams(min_size, max_size,
                                                       filter_ngrams):
                    rows = [[ngram, size, work, siglum, count, label]
                            for ngram, count in ngrams.items()]
                    writer.writerows(rows)
        self._matches = pd.read_csv(temp_path, encoding='utf-8',
                                    na_filter=False)
        self.add_label_count()

    def prune_by_ngram(self, ngrams):
        """Removes results rows whose n-gram is in `ngrams`.

        :param ngrams: n-grams to remove
        :type ngrams: `list` of `str`

        """
        self._logger.info('Pruning results by n-gram')
        self._matches = self._matches[
            ~self._matches[constants.NGRAM_FIELDNAME].isin(ngrams)]

    def prune_by_ngram_count(self, minimum=None, maximum=None):
        """Removes results rows whose total n-gram count (across all
        works bearing this n-gram) is outside the range specified by
        `minimum` and `maximum`.

        For each work, the count used as part of the sum across all
        works is the maximum count across the witnesses to that work.

        :param minimum: minimum n-gram count
        :type minimum: `int`
        :param maximum: maximum n-gram count
        :type maximum: `int`

        """
        self._logger.info('Pruning results by n-gram count')

        def calculate_total(group):
            work_grouped = group.groupby(constants.WORK_FIELDNAME,
                                         sort=False)
            total_count = work_grouped[constants.COUNT_FIELDNAME].max().sum()
            group['total_count'] = pd.Series([total_count] * len(group.index),
                                             index=group.index)
            return group

        self._matches = self._matches.groupby(
            constants.NGRAM_FIELDNAME, sort=False).apply(calculate_total)
        if minimum:
            self._matches = self._matches[
                self._matches['total_count'] >= minimum]
        if maximum:
            self._matches = self._matches[
                self._matches['total_count'] <= maximum]
        del self._matches['total_count']

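    # Worked example (illustrative): if n-gram "AB" is counted 5 and 3
    # in the two witnesses to work W1 and 2 in work W2, its total is
    # max(5, 3) + 2 == 7; prune_by_ngram_count(minimum=8) therefore
    # removes all "AB" rows, while prune_by_ngram_count(maximum=7)
    # keeps them.
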
    def prune_by_ngram_count_per_work(self, minimum=None, maximum=None):
        """Removes results rows if the n-gram count for all works
        bearing that n-gram is outside the range specified by
        `minimum` and `maximum`.

        That is, if a single witness of a single work has an n-gram
        count that falls within the specified range, all result rows
        for that n-gram are kept.

        :param minimum: minimum n-gram count
        :type minimum: `int`
        :param maximum: maximum n-gram count
        :type maximum: `int`

        """
        self._logger.info('Pruning results by n-gram count per work')
        if minimum and maximum:
            keep_ngrams = set(self._matches[
                (self._matches[constants.COUNT_FIELDNAME] >= minimum) &
                (self._matches[constants.COUNT_FIELDNAME] <= maximum)][
                    constants.NGRAM_FIELDNAME])
            self._matches = self._matches[self._matches[
                constants.NGRAM_FIELDNAME].isin(keep_ngrams)]
        elif minimum:
            keep_ngrams = set(self._matches[
                self._matches[constants.COUNT_FIELDNAME] >= minimum][
                    constants.NGRAM_FIELDNAME])
            self._matches = self._matches[self._matches[
                constants.NGRAM_FIELDNAME].isin(keep_ngrams)]
        elif maximum:
            keep_ngrams = set(self._matches[
                self._matches[constants.COUNT_FIELDNAME] <= maximum][
                    constants.NGRAM_FIELDNAME])
            self._matches = self._matches[self._matches[
                constants.NGRAM_FIELDNAME].isin(keep_ngrams)]

    def prune_by_ngram_size(self, minimum=None, maximum=None):
        """Removes results rows whose n-gram size is outside the range
        specified by `minimum` and `maximum`.

        :param minimum: minimum n-gram size
        :type minimum: `int`
        :param maximum: maximum n-gram size
        :type maximum: `int`

        """
        self._logger.info('Pruning results by n-gram size')
        if minimum:
            self._matches = self._matches[
                self._matches[constants.SIZE_FIELDNAME] >= minimum]
        if maximum:
            self._matches = self._matches[
                self._matches[constants.SIZE_FIELDNAME] <= maximum]

    def prune_by_work_count(self, minimum=None, maximum=None):
        """Removes results rows for n-grams that are not attested in a
        number of works in the range specified by `minimum` and
        `maximum`.

        Work here encompasses all witnesses, so that the same n-gram
        appearing in multiple witnesses of the same work is counted as
        a single work.

        :param minimum: minimum number of works
        :type minimum: `int`
        :param maximum: maximum number of works
        :type maximum: `int`

        """
        self._logger.info('Pruning results by work count')
        count_fieldname = 'tmp_count'
        filtered = self._matches[
            self._matches[constants.COUNT_FIELDNAME] > 0]
        grouped = filtered.groupby(constants.NGRAM_FIELDNAME, sort=False)
        counts = pd.DataFrame(grouped[constants.WORK_FIELDNAME].nunique())
        counts.rename(columns={constants.WORK_FIELDNAME: count_fieldname},
                      inplace=True)
        if minimum:
            counts = counts[counts[count_fieldname] >= minimum]
        if maximum:
            counts = counts[counts[count_fieldname] <= maximum]
        self._matches = pd.merge(self._matches, counts,
                                 left_on=constants.NGRAM_FIELDNAME,
                                 right_index=True)
        del self._matches[count_fieldname]

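    # Worked example (illustrative): an n-gram attested (count > 0) in
    # three witnesses spread over two works has a work count of 2, so
    # prune_by_work_count(minimum=3) removes its rows, while
    # prune_by_work_count(maximum=2) keeps them.
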
    def reciprocal_remove(self):
        """Removes results rows for which the n-gram is not present in
        at least one text in each labelled set of texts."""
        self._logger.info(
            'Removing n-grams that are not attested in all labels')
        self._matches = self._reciprocal_remove(self._matches)

    def _reciprocal_remove(self, matches):
        number_labels = matches[constants.LABEL_FIELDNAME].nunique()
        filtered = matches[matches[constants.COUNT_FIELDNAME] > 0]
        grouped = filtered.groupby(constants.NGRAM_FIELDNAME, sort=False)
        return grouped.filter(
            lambda x: x[constants.LABEL_FIELDNAME].nunique() ==
            number_labels)

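    # Illustrative effect: with labels "X" and "Y" present in the
    # results, an n-gram whose non-zero rows all carry label "X" is
    # removed; an n-gram with non-zero rows under both labels
    # survives.
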
    def reduce(self):
        """Removes results rows whose n-grams are contained in larger
        n-grams."""
        self._logger.info('Reducing the n-grams')
        # This does not make use of any pandas functionality; it
        # probably could, and if so ought to.
        data = {}
        labels = {}
        # Derive a convenient data structure from the rows.
        for row_index, row in self._matches.iterrows():
            work = row[constants.WORK_FIELDNAME]
            siglum = row[constants.SIGLUM_FIELDNAME]
            labels[work] = row[constants.LABEL_FIELDNAME]
            witness_data = data.setdefault((work, siglum), {})
            witness_data[row[constants.NGRAM_FIELDNAME]] = {
                'count': int(row[constants.COUNT_FIELDNAME]),
                'size': int(row[constants.SIZE_FIELDNAME])}
        for witness_data in data.values():
            ngrams = list(witness_data.keys())
            ngrams.sort(key=lambda ngram: witness_data[ngram]['size'],
                        reverse=True)
            for ngram in ngrams:
                if witness_data[ngram]['count'] > 0:
                    self._reduce_by_ngram(witness_data, ngram)
        # Recreate rows from the modified data structure.
        rows = []
        for (work, siglum), witness_data in data.items():
            for ngram, ngram_data in witness_data.items():
                count = ngram_data['count']
                if count > 0:
                    rows.append(
                        {constants.NGRAM_FIELDNAME: ngram,
                         constants.SIZE_FIELDNAME: ngram_data['size'],
                         constants.WORK_FIELDNAME: work,
                         constants.SIGLUM_FIELDNAME: siglum,
                         constants.COUNT_FIELDNAME: count,
                         constants.LABEL_FIELDNAME: labels[work]})
        if rows:
            self._matches = pd.DataFrame(
                rows, columns=constants.QUERY_FIELDNAMES)
        else:
            self._matches = pd.DataFrame()

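    # Worked example (illustrative): if a witness has "A B C" twice,
    # "A B" three times and "B C" twice, reduce() lowers the count of
    # "A B" to 3 - 2 == 1 (its one occurrence outside "A B C") and
    # drops "B C" entirely (2 - 2 == 0).
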
    def _reduce_by_ngram(self, data, ngram):
        """Lowers the counts of all n-grams in `data` that are
        substrings of `ngram` by `ngram`'s count.

        Modifies `data` in place.

        :param data: row data dictionary for the current text
        :type data: `dict`
        :param ngram: n-gram being reduced
        :type ngram: `str`

        """
        # Find all substrings of `ngram` and reduce their count by the
        # count of `ngram`. Substrings may not exist in `data`.
        count = data[ngram]['count']
        for substring in self._generate_substrings(ngram,
                                                   data[ngram]['size']):
            try:
                substring_data = data[substring]
            except KeyError:
                continue
            else:
                substring_data['count'] -= count

    def remove_label(self, label):
        """Removes all results rows associated with `label`.

        :param label: label to filter results on
        :type label: `str`

        """
        self._logger.info('Removing label "{}"'.format(label))
        count = self._matches[constants.LABEL_FIELDNAME].value_counts()[
            label]
        self._matches = self._matches[
            self._matches[constants.LABEL_FIELDNAME] != label]
        self._logger.info('Removed {} labelled results'.format(count))

    def sort(self):
        """Sorts all results rows.

        Sorts by: size (descending), n-gram, count (descending),
        label, work, siglum.

        """
        self._matches.sort_values(
            by=[constants.SIZE_FIELDNAME, constants.NGRAM_FIELDNAME,
                constants.COUNT_FIELDNAME, constants.LABEL_FIELDNAME,
                constants.WORK_FIELDNAME, constants.SIGLUM_FIELDNAME],
            ascending=[False, True, False, True, True, True],
            inplace=True)

    def zero_fill(self, corpus, catalogue):
        """Adds rows to the results to ensure that, for every n-gram
        that is attested in at least one witness to a work, every
        witness to that work has a row, with added rows having a count
        of zero.

        :param corpus: corpus containing the works appearing in the
                       results
        :type corpus: `Corpus`
        :param catalogue: catalogue used in the generation of the
                          results
        :type catalogue: `Catalogue`

        """
        zero_rows = []
        # Get all of the works, and their witnesses, for each label.
        data = {}
        for work, label in catalogue.items():
            data.setdefault(label, {})[work] = []
            for siglum in corpus.get_sigla(work):
                data[label][work].append(siglum)
        grouping_cols = [constants.LABEL_FIELDNAME,
                         constants.NGRAM_FIELDNAME,
                         constants.SIZE_FIELDNAME, constants.WORK_FIELDNAME]
        grouped = self._matches.groupby(grouping_cols, sort=False)
        for (label, ngram, size, work), group in grouped:
            row_data = {
                constants.NGRAM_FIELDNAME: ngram,
                constants.LABEL_FIELDNAME: label,
                constants.SIZE_FIELDNAME: size,
                constants.COUNT_FIELDNAME: 0,
                constants.WORK_FIELDNAME: work,
            }
            for siglum in data[label][work]:
                if group[group[constants.SIGLUM_FIELDNAME] ==
                         siglum].empty:
                    row_data[constants.SIGLUM_FIELDNAME] = siglum
                    # Append a copy, since row_data is mutated on each
                    # iteration of this loop.
                    zero_rows.append(dict(row_data))
        zero_df = pd.DataFrame(zero_rows,
                               columns=constants.QUERY_FIELDNAMES)
        self._matches = pd.concat([self._matches, zero_df])

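    # Usage sketch (hedged: `corpus` and `catalogue` are assumed to be
    # the tacl Corpus and Catalogue used to generate the results):
    #
    #     results.zero_fill(corpus, catalogue)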