# Source code for monk.manipulation
# -*- coding: utf-8 -*-
#
# Monk is a lightweight schema/query framework for document databases.
# Copyright © 2011 Andrey Mikhaylenko
#
# This file is part of Monk.
#
# Monk is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Monk is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Monk. If not, see <http://gnu.org/licenses/>.
"""
Data manipulation
=================

.. attribute:: VALUE_MERGERS

    Default series of mergers:

    * :class:`TypeMerger`
    * :class:`DictMerger`
    * :class:`ListMerger`
    * :class:`FuncMerger`
    * :class:`AnyMerger`

"""
import types
class ValueMerger(object):
    """ Common interface of all value mergers.

    A merger wraps a single spec/value pair. Concrete subclasses announce
    via :meth:`check` whether they can handle the pair and build the merged
    result in :meth:`process`.
    """
    def __init__(self, spec, value):
        self.spec, self.value = spec, value

    def check(self):
        """ Tells whether this merger can handle its spec/value pair:
        ``True`` if it can, ``False`` otherwise.

        Must be overloaded by subclasses; the base implementation raises
        :class:`NotImplementedError`.
        """
        raise NotImplementedError()

    def process(self):
        """ Returns a merged version of `self.spec` and `self.value`.

        Must be overloaded by subclasses; the base implementation raises
        :class:`NotImplementedError`.
        """
        raise NotImplementedError()
class TypeMerger(ValueMerger):
    """ Handles specs that are plain types (e.g. ``int``).

    Such a spec only restricts the type and carries no default value, so
    the given value is passed through untouched, empty values included.

    Example::

        >>> TypeMerger(int, None).process() is None
        True
        >>> TypeMerger(int, 123).process()
        123

    """
    def check(self):
        spec_is_type = isinstance(self.spec, type)
        return spec_is_type

    def process(self):
        # a bare type supplies no default; whatever was given is kept as is
        return self.value
class DictMerger(ValueMerger):
    """ Nested dictionary.

    The spec is either a dictionary of defaults or the bare ``dict`` type;
    the latter is treated as a spec without any keys.

    Example::

        >>> DictMerger({'a': 123}, {}).process()
        {'a': 123}
        >>> DictMerger({'a': 123}, {'a': 456}).process()
        {'a': 456}
        >>> DictMerger(dict, {'a': 456}).process()
        {'a': 456}

    """
    def check(self):
        return self.spec == dict or isinstance(self.spec, dict)

    def process(self):
        """ Returns the value merged with the spec via :func:`merged`.

        A missing value (``None``) is treated as an empty dictionary.
        A value that is neither ``None`` nor a dictionary is returned
        unchanged.
        """
        if self.value is not None and not isinstance(self.value, dict):
            # bogus value; will not pass validation but should be preserved
            return self.value

        # `check()` also accepts the bare `dict` type.  The type object is
        # truthy and has no keys to iterate, so it must be replaced with an
        # empty spec instead of being handed over to `merged()`.
        spec = self.spec if isinstance(self.spec, dict) else {}
        return merged(spec, self.value or {})
class ListMerger(ValueMerger):
    """ Nested list.

    The spec is either the bare ``list`` type or a list whose first element
    (if any) describes the items.

    Example::

        >>> ListMerger([int], None).process()
        []
        >>> ListMerger([int], [1, 2]).process()
        [1, 2]
        >>> ListMerger([{'a': 1}], [{}]).process()
        [{'a': 1}]

    """
    def check(self):
        return self.spec == list or isinstance(self.spec, list)

    def process(self):
        """ Returns the merged list.

        * Item spec is a type: the given value is kept; a missing value
          (``None``) becomes an empty list.
        * Item spec is a dictionary: every item of the value is merged with
          it via :func:`merged`; an empty or missing value becomes an empty
          list.
        * Anything else (no item spec, or a default item like ``[1]``):
          the value is returned unchanged.
        """
        # `check()` also accepts the bare `list` type, which must not be
        # subscripted; only a non-empty list instance yields an item spec.
        if isinstance(self.spec, list) and self.spec:
            item_spec = self.spec[0]
        else:
            item_spec = None

        if isinstance(item_spec, type):
            # only the item type is restricted; there are no default items,
            # but data supplied by the caller must not be thrown away
            return [] if self.value is None else self.value

        if isinstance(item_spec, dict):
            # list of dictionaries
            if self.value:
                return [merged(item_spec, item) for item in self.value]
            return []

        # either any value is accepted as list item (no item spec)
        # or the spec is probably a default list item like [1]
        return self.value
class FuncMerger(ValueMerger):
    """ The spec is a function; calling it without arguments yields the
    default value.

    The callable is expected to be free of side effects. It is only called
    when no value was given.

    Example::

        >>> FuncMerger(lambda: 123, None).process()
        123
        >>> FuncMerger(lambda: 123, 456).process()
        456

    """
    def check(self):
        return isinstance(
            self.spec, (types.FunctionType, types.BuiltinFunctionType))

    def process(self):
        if self.value is not None:
            return self.value
        # no value given: ask the function for the default
        return self.spec()
class AnyMerger(ValueMerger):
    """ Fallback merger that accepts every spec.

    The spec itself serves as the default: it is returned when the value is
    ``None``; otherwise the value wins.
    """
    def check(self):
        # never refuses a spec/value pair
        return True

    def process(self):
        return self.spec if self.value is None else self.value
# Default, ordered series of mergers; AnyMerger accepts everything and
# therefore comes last.
VALUE_MERGERS = (
    TypeMerger,
    DictMerger,
    ListMerger,
    FuncMerger,
    AnyMerger,
)
def merge_value(spec, value, mergers):
    """ Merges `value` with `spec` using the first suitable merger.

    The mergers are expected to be subclasses of :class:`ValueMerger`.
    They are instantiated and polled in the given order; the first one whose
    `check()` agrees produces the result via `process()`. If none agrees,
    `value` is returned as is.

    Example::

        >>> merge_value({'a': 123}, {}, [DictMerger])
        {'a': 123}
        >>> merge_value({'a': 123}, {'a': 456}, [DictMerger])
        {'a': 456}

    """
    # lazy generator: a merger is only instantiated when it is its turn
    candidates = (merger_class(spec, value) for merger_class in mergers)
    for candidate in candidates:
        if candidate.check():
            return candidate.process()
    return value
def merged(spec, data, value_processor=None, mergers=VALUE_MERGERS):
    """ Returns a dictionary based on `spec` + `data`.

    Does not validate values. If `data` overrides a default value, it is
    trusted. The result can be validated later with
    :func:`~monk.validation.validate_structure`.

    Note that a key/value pair is added from `spec` either if `data` does not
    define this key at all, or if the value is ``None``. This behaviour may
    not be suitable for all cases and therefore may change in the future.

    You can fine-tune the process by changing the list of mergers.

    :param spec:
        `dict`. A document structure specification.
    :param data:
        `dict`. Overrides some or all default values from the spec.
    :param value_processor:
        function, must take one argument and return the modified value.
        It is applied to every resulting value.
    :param mergers:
        `tuple`. An ordered series of :class:`ValueMerger` subclasses.
        Default is :attr:`VALUE_MERGERS`. The mergers are passed to
        :func:`merge_value`.
    :raises TypeError:
        if `data` is not a dictionary.
    """
    if not isinstance(data, dict):
        raise TypeError('data must be a dictionary')

    result = {}

    # Union of sets instead of `spec.keys() + data.keys()`: on Python 3
    # `keys()` returns a view object and views cannot be concatenated
    # with `+`.  The union works the same on Python 2 and 3.
    for key in set(spec.keys()) | set(data.keys()):
        if key in spec:
            value = merge_value(spec[key], data.get(key), mergers=mergers)
        else:
            # never mind if there are nested structures: anyway we cannot
            # check them as they aren't in the spec
            value = data[key]
        if value_processor:
            value = value_processor(value)
        result[key] = value

    return result