Source code for mw.lib.reverts.database

import random
from itertools import chain

from . import defaults
from ...types import Timestamp
from ...util import none_or
from .dummy_checksum import DummyChecksum
from .functions import detect

HEX = "1234567890abcdef"

def random_sha1():
    return ''.join(random.choice(HEX) for i in range(40))

"""
Simple constant used in order to not do weird things with a dummy revision.
"""


[docs]def check_row(db, rev_row, **kwargs): """ Checks whether a revision (database row) was reverted (identity) and returns a named tuple of Revert(reverting, reverteds, reverted_to). :Parameters: db : :class:`mw.database.DB` A database connection to make use of. rev_row : dict a revision row containing 'rev_id' and 'rev_page' or 'page_id' radius : int a positive integer indicating the the maximum number of revisions that can be reverted check_archive : bool should the archive table be checked for reverting revisions? before : `Timestamp` if set, limits the search for *reverting* revisions to those which were saved before this timestamp """ # extract rev_id, sha1, page_id if 'rev_id' in rev_row: rev_id = rev_row['rev_id'] else: raise TypeError("rev_row must have 'rev_id'") if 'page_id' in rev_row: page_id = rev_row['page_id'] elif 'rev_page' in rev_row: page_id = rev_row['rev_page'] else: raise TypeError("rev_row must have 'page_id' or 'rev_page'") # run the regular check return check(db, rev_id, page_id=page_id, **kwargs)
[docs]def check(db, rev_id, page_id=None, radius=defaults.RADIUS, check_archive=False, before=None, window=None): """ Checks whether a revision was reverted (identity) and returns a named tuple of Revert(reverting, reverteds, reverted_to). :Parameters: db : `mw.database.DB` A database connection to make use of. rev_id : int the ID of the revision to check page_id : int the ID of the page the revision occupies (slower if not provided) radius : int a positive integer indicating the maximum number of revisions that can be reverted check_archive : bool should the archive table be checked for reverting revisions? before : `Timestamp` if set, limits the search for *reverting* revisions to those which were saved before this timestamp window : int if set, limits the search for *reverting* revisions to those which were saved within `window` seconds after the reverted edit """ if not hasattr(db, "revisions") and hasattr(db, "all_revisions"): raise TypeError("db wrong type. Expected a mw.database.DB.") rev_id = int(rev_id) radius = int(radius) if radius < 1: raise TypeError("invalid radius. Expected a positive integer.") page_id = none_or(page_id, int) check_archive = bool(check_archive) before = none_or(before, Timestamp) # If we are searching the archive, we'll need to use `all_revisions`. if check_archive: dbrevs = db.all_revisions else: dbrevs = db.revisions # If we don't have the sha1 or page_id, we're going to need to look them up if page_id is None: row = dbrevs.get(rev_id=rev_id) page_id = row['rev_page'] # Load history and current rev current_and_past_revs = list(dbrevs.query( page_id=page_id, limit=radius + 1, before_id=rev_id + 1, # Ensures that we capture the current revision direction="older" )) try: # Extract current rev and reorder history current_rev, past_revs = ( current_and_past_revs[0], # Current rev is the first one returned reversed(current_and_past_revs[1:]) # The rest are past revs, but they are in the wrong order ) except IndexError: # Only way to get here is if there isn't enough history. Couldn't be # reverted. Just return None. return None if window is not None and before is None: before = Timestamp(current_rev['rev_timestamp']) + window # Load future revisions future_revs = dbrevs.query( page_id=page_id, limit=radius, after_id=rev_id, before=before, direction="newer" ) # Convert to an iterable of (checksum, rev) pairs for detect() to consume checksum_revisions = chain( ((rev['rev_sha1'] if rev['rev_sha1'] is not None \ else DummyChecksum(), rev) for rev in past_revs), [(current_rev['rev_sha1'] or DummyChecksum(), current_rev)], ((rev['rev_sha1'] if rev['rev_sha1'] is not None \ else DummyChecksum(), rev) for rev in future_revs) ) for revert in detect(checksum_revisions, radius=radius): # Check that this is a relevant revert if rev_id in [rev['rev_id'] for rev in revert.reverteds]: return revert return None