Source code for json_merger.merger

# -*- coding: utf-8 -*-
#
# This file is part of Inspirehep.
# Copyright (C) 2016 CERN.
#
# Inspirehep is free software; you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 2 of the
# License, or (at your option) any later version.
#
# Inspirehep is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Inspirehep; if not, write to the
# Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307, USA.
#
# In applying this license, CERN does not
# waive the privileges and immunities granted to it by virtue of its status
# as an Intergovernmental Organization or submit itself to any jurisdiction.

"""Definition for JSON merger class."""

from __future__ import absolute_import, print_function

import copy

from .comparator import DefaultComparator
from .dict_merger import SkipListsMerger
from .errors import MergeError
from .list_unify import ListUnifier
from .utils import (
    get_conf_set_for_key_path, get_dotted_key_path,
    get_obj_at_key_path, set_obj_at_key_path
)

PLACEHOLDER_STR = '#$PLACEHOLDER$#'


class Merger(object):
    """Class that merges two JSON objects that share a common ancestor.

    This class treats by default all lists as being lists of entities and
    offers support for matching their elements by their content, by
    specifying per-field comparator classes.
    """

    def __init__(self, root, head, update,
                 default_dict_merge_op, default_list_merge_op,
                 list_dict_ops=None, list_merge_ops=None,
                 comparators=None, data_lists=None):
        """
        Args:
            root: A common ancestor of the two objects being merged.
            head: One of the objects that is being merged. Refers to the
                version that is currently in use. (e.g. a displayed database
                record)
            update: The second object that is being merged. Refers to an
                update that needs to be integrated with the in-use version.
            default_dict_merge_op
                (:class:`json_merger.config.DictMergerOps` class attribute):
                Default strategy for merging regular non list JSON values
                (strings, numbers, other objects).
            default_list_merge_op
                (:class:`json_merger.config.UnifierOps` class attribute):
                Default strategy for merging two lists of entities.
            list_dict_ops: Defines custom strategies for merging dict of
                entities.
                Dict formatted as:

                    * keys -- a config string
                    * values -- a class attribute of
                      :class:`json_merger.config.DictMergerOps`
            list_merge_ops: Defines custom strategies for merging lists of
                entities.
                Dict formatted as:

                    * keys -- a config string
                    * values -- a class attribute of
                      :class:`json_merger.config.UnifierOps`
            comparators: Defines classes used for rendering entities in list
                fields as equal.
                Dict formatted as:

                    * keys -- a config string
                    * values -- a class that extends
                      :class:`json_merger.comparator.BaseComparator`
            data_lists: List of config strings defining the lists that are
                not treated as lists of entities.

        Note:
            A configuration string represents the path towards a list field
            in the object separated with dots.

        Example:
            Configuration strings can be:

            For ``{'lst': [{'id': 0, 'tags': ['foo', 'bar']}]}``:

                * the config string for the top level list is ``'lst'``
                * the config string for the tags lists is ``'lst.tags'``
        """
        # Per-path configuration; every optional mapping falls back to an
        # empty container so lookups never need a None check.
        self.comparators = comparators or {}
        self.data_lists = set(data_lists or [])
        self.list_dict_ops = list_dict_ops or {}
        self.list_merge_ops = list_merge_ops or {}

        self.default_dict_merge_op = default_dict_merge_op
        self.default_list_merge_op = default_list_merge_op

        # Work on private copies so the caller's objects are never touched.
        self.root = copy.deepcopy(root)
        self.head = copy.deepcopy(head)
        self.update = copy.deepcopy(update)

        # Result members, filled in by merge().
        self.head_stats = {}
        self.update_stats = {}
        self.conflicts = []

        self.merged_root = None
        # The aligned views start as independent copies of the inputs and get
        # their list fields overwritten during the merge.
        self.aligned_root = copy.deepcopy(root)
        self.aligned_head = copy.deepcopy(head)
        self.aligned_update = copy.deepcopy(update)

    def merge(self):
        """Populates result members.

        Performs the merge algorithm using the specified config and fills in
        the members that provide stats about the merging procedure.

        Attributes:
            merged_root: The result of the merge.
            aligned_root, aligned_head, aligned_update: Copies of root, head
                and update in which all matched list entities have the same
                list index for easier diff viewing.
            head_stats, update_stats: Stats for each list field present in the
                head or update objects. Instance of
                :class:`json_merger.stats.ListMatchStats`
            conflicts: List of :class:`json_merger.conflict.Conflict`
                instances that occurred during the merge.

        Raises:
            :class:`json_merger.errors.MergeError` : If conflicts occur during
                the call.

        Example:
            >>> from json_merger import Merger
            >>> # We compare people by their name
            >>> from json_merger.comparator import PrimaryKeyComparator
            >>> from json_merger.config import DictMergerOps, UnifierOps
            >>> from json_merger.errors import MergeError
            >>> # Use this only for doctest :)
            >>> from pprint import pprint as pp
            >>>
            >>> root = {'people': [{'name': 'Jimmy', 'age': 30}]}
            >>> head = {'people': [{'name': 'Jimmy', 'age': 31},
            ...                    {'name': 'George'}]}
            >>> update = {'people': [{'name': 'John'},
            ...                      {'name': 'Jimmy', 'age': 32}]}
            >>>
            >>> class NameComparator(PrimaryKeyComparator):
            ...     # Two objects are the same entity if they have the
            ...     # same name.
            ...     primary_key_fields = ['name']
            >>> m = Merger(root, head, update,
            ...            DictMergerOps.FALLBACK_KEEP_HEAD,
            ...            UnifierOps.KEEP_UPDATE_AND_HEAD_ENTITIES_HEAD_FIRST,
            ...            comparators = {'people': NameComparator})
            >>> # We do a merge
            >>> try:
            ...     m.merge()
            ... except MergeError as e:
            ...     # Conflicts are the same thing as the exception content.
            ...     assert e.content == m.conflicts
            >>> # This is how the lists are aligned:
            >>> pp(m.aligned_root['people'], width=60)
            ['#$PLACEHOLDER$#',
             {'age': 30, 'name': 'Jimmy'},
             '#$PLACEHOLDER$#']
            >>> pp(m.aligned_head['people'], width=60)
            ['#$PLACEHOLDER$#',
             {'age': 31, 'name': 'Jimmy'},
             {'name': 'George'}]
            >>> pp(m.aligned_update['people'], width=60)
            [{'name': 'John'},
             {'age': 32, 'name': 'Jimmy'},
             '#$PLACEHOLDER$#']
            >>> # This is the end result of the merge:
            >>> pp(m.merged_root, width=60)
            {'people': [{'name': 'John'},
                        {'age': 31, 'name': 'Jimmy'},
                        {'name': 'George'}]}
            >>> # With some conflicts:
            >>> pp(m.conflicts, width=60)
            [('SET_FIELD', ('people', 1, 'age'), 32)]
            >>> # And some stats:
            >>> pp(m.head_stats[('people',)].in_result)
            [{'age': 31, 'name': 'Jimmy'}, {'name': 'George'}]
            >>> pp(m.update_stats[('people',)].not_in_result)
            []

        Note:
            Even if conflicts occur, merged_root, aligned_root, aligned_head
            and aligned_update are always populated by following the
            strategies set for the merger instance.
        """
        # Start from a clean conflict list: the helpers below only ever
        # extend it, so a second call on the same instance would otherwise
        # re-report the conflicts collected by the previous run.
        self.conflicts = []

        self.merged_root = self._recursive_merge(self.root, self.head,
                                                 self.update)
        if self.conflicts:
            raise MergeError('Conflicts Occurred in Merge Process',
                             self.conflicts)

    def _recursive_merge(self, root, head, update, key_path=()):
        """Merge one (root, head, update) triple located at ``key_path``.

        Non-list content is merged through :meth:`_merge_objects`; every list
        that is not configured in ``data_lists`` is unified through
        :meth:`_unify_lists` and its matched elements are merged by recursing
        into this method.

        Args:
            root, head, update: The three versions of the value to merge.
            key_path: Tuple with the keys and list indices leading to this
                value from the top-level objects.

        Returns:
            The merged value for this key path.
        """
        dotted_key_path = get_dotted_key_path(key_path, filter_int_keys=True)

        if (isinstance(head, list) and isinstance(update, list) and
                dotted_key_path not in self.data_lists):
            # In this case we are merging two lists of objects.
            # The empty tuple addresses the value itself as the list to unify.
            lists_to_unify = [()]
            if not isinstance(root, list):
                root = []
        else:
            # Otherwise we merge everything but the lists using DictMergerOps.
            m = self._merge_objects(root, head, update, key_path)
            root = m.merged_root
            lists_to_unify = m.skipped_lists

        for list_field in lists_to_unify:
            absolute_key_path = key_path + list_field

            root_l = get_obj_at_key_path(root, list_field, [])
            head_l = get_obj_at_key_path(head, list_field, [])
            update_l = get_obj_at_key_path(update, list_field, [])

            unifier = self._unify_lists(root_l, head_l, update_l,
                                        absolute_key_path)

            # Merge each aligned triple, extending the key path with the
            # element's index in the unified list.
            new_list = []
            for idx, objects in enumerate(unifier.unified):
                root_obj, head_obj, update_obj = objects
                new_obj = self._recursive_merge(root_obj, head_obj,
                                                update_obj,
                                                absolute_key_path + (idx, ))
                new_list.append(new_obj)

            root = set_obj_at_key_path(root, list_field, new_list)
            self._build_aligned_lists_and_stats(unifier, absolute_key_path)

        return root

    def _merge_objects(self, root, head, update, key_path):
        """Run a ``SkipListsMerger`` over the three versions of an object.

        Any ``MergeError`` raised by the merger is caught; each conflict in
        its content is passed through ``with_prefix(key_path)`` and appended
        to ``self.conflicts``.

        Returns:
            The ``SkipListsMerger`` instance. The caller reads its
            ``merged_root`` and ``skipped_lists`` attributes.
        """
        data_lists = get_conf_set_for_key_path(self.data_lists, key_path)
        object_merger = SkipListsMerger(root, head, update,
                                        self.default_dict_merge_op,
                                        data_lists, self.list_dict_ops,
                                        key_path)
        try:
            object_merger.merge()
        except MergeError as e:
            self.conflicts.extend(c.with_prefix(key_path) for c in e.content)
        return object_merger

    def _unify_lists(self, root, head, update, key_path):
        """Run a ``ListUnifier`` over the three versions of a list.

        The merge operation and the comparator class are looked up by the
        dotted form of ``key_path`` in ``list_merge_ops`` and ``comparators``,
        falling back to ``default_list_merge_op`` and ``DefaultComparator``.

        Any ``MergeError`` raised by the unifier is caught; each conflict in
        its content is passed through ``with_prefix(key_path)`` and appended
        to ``self.conflicts``.

        Returns:
            The ``ListUnifier`` instance. Callers read its ``unified``,
            ``head_stats`` and ``update_stats`` attributes.
        """
        dotted_key_path = get_dotted_key_path(key_path, True)

        operation = self.list_merge_ops.get(dotted_key_path,
                                            self.default_list_merge_op)
        comparator_cls = self.comparators.get(dotted_key_path,
                                              DefaultComparator)
        list_unifier = ListUnifier(root, head, update,
                                   operation, comparator_cls)
        try:
            list_unifier.unify()
        except MergeError as e:
            self.conflicts.extend(c.with_prefix(key_path) for c in e.content)
        return list_unifier

    def _build_aligned_lists_and_stats(self, list_unifier, key_path):
        """Record the aligned lists and match stats for one unified list.

        Writes the root, head and update columns of ``list_unifier.unified``
        into ``aligned_root``, ``aligned_head`` and ``aligned_update`` at
        ``key_path``, and stores the unifier's stats under ``key_path`` in
        ``head_stats`` and ``update_stats``.
        """
        root_list = []
        head_list = []
        update_list = []
        for root_obj, head_obj, update_obj in list_unifier.unified:
            # Cast NOTHING objects to a placeholder so we reserialize back to
            # JSON if needed.
            # NOTE(review): `or` substitutes the placeholder for *any* falsy
            # item (e.g. 0, '' or {}), not only the NOTHING marker -- confirm
            # that this is intended.
            root_list.append(root_obj or PLACEHOLDER_STR)
            head_list.append(head_obj or PLACEHOLDER_STR)
            update_list.append(update_obj or PLACEHOLDER_STR)

        # Try to put back the list if the key path existed in the first place.
        self.aligned_root = set_obj_at_key_path(self.aligned_root,
                                                key_path, root_list, False)
        self.aligned_head = set_obj_at_key_path(self.aligned_head,
                                                key_path, head_list, False)
        self.aligned_update = set_obj_at_key_path(self.aligned_update,
                                                  key_path, update_list,
                                                  False)

        # Also copy over the stats.
        self.head_stats[key_path] = list_unifier.head_stats
        self.update_stats[key_path] = list_unifier.update_stats