# -*- coding:utf-8 -*-
"""The JSONPatch module provides for the alteration of JSON data compliant to RFC6902.
The emphasis of the design combines low resource requirement with features
designed for the application of large filters onto large JSON based data
structures.
The patch list itself is defined by RFC6902 as a JSON array. The entries
could be either constructed in-memory, or imported from a persistent storage.
The export feature provides for the persistent storage of a modified patch
list for later reuse.
The module contains the following classes:
* **JSONPatch**:
The controller for the application of patches on in-memory
data structures provided by the package 'json'.
* **JSONPatchItem**:
Representation of one patch entry in accordance to RFC6902.
* **JSONPatchItemRaw**:
Representation of one patch entry read as a raw entry in accordance to RFC6902.
* **JSONPatchFilter**:
Selection filter for the application on the current patch list
entries JSONPatchItem.
* **JSONPatchException**:
Specific exception for this module.
The address of the the provided 'path' components for the entries are managed
by the class JSONPointer in accordance to RFC6901.
"""
__author__ = 'Arno-Can Uestuensoez'
__maintainer__ = 'Arno-Can Uestuensoez'
__license__ = "Artistic-License-2.0 + Forced-Fairplay-Constraints"
__copyright__ = "Copyright (C) 2015-2016 Arno-Can Uestuensoez @Ingenieurbuero Arno-Can Uestuensoez"
__version__ = '0.2.18'
__uuid__='63b597d6-4ada-4880-9f99-f5e0961351fb'
import sys
version = '{0}.{1}'.format(*sys.version_info[:2])
if not version in ('2.6','2.7',): # pragma: no cover
raise Exception("Requires Python-2.6.* or higher")
# if version < '2.7': # pragma: no cover
# raise Exception("Requires Python-2.7.* or higher")
if sys.modules.get('json'):
import json as myjson
elif sys.modules.get('ujson'):
import ujson as myjson
else:
import json as myjson
# for now the only one supported
from types import NoneType
from jsondata.JSONPointer import JSONPointer
from jsondata.JSONDataSerializer import JSONDataSerializer,MODE_SCHEMA_OFF
# default
_appname = "jsonpatch"
# Sets display for inetractive JSON/JSONschema design.
_interactive = False
#
# Operations in accordance to RFC6902
RFC6902_ADD = 1
RFC6902_COPY = 2
RFC6902_MOVE = 3
RFC6902_REMOVE = 4
RFC6902_REPLACE = 5
RFC6902_TEST = 6
#
# Mapping for reverse transformation
op2str = {
RFC6902_ADD: "add",
RFC6902_COPY: "copy",
RFC6902_MOVE: "move",
RFC6902_REMOVE: "remove",
RFC6902_REPLACE: "replace",
RFC6902_TEST: "test"
}
#
# Mapping for reverse transformation
str2op = {
"add": RFC6902_ADD,
"copy": RFC6902_COPY,
"move": RFC6902_MOVE,
"remove": RFC6902_REMOVE,
"replace": RFC6902_REPLACE,
"test": RFC6902_TEST
}
[docs]def getOp(x):
"""Converts input into corresponding enumeration.
"""
if type(x) in (int,float,):
return int(x)
elif type(x) is (str,unicode,) and x.isdigit():
return int(x)
return str2op.get(x,None)
class JSONPatchException(Exception):
pass
class JSONPatchItemException(JSONPatchException):
pass
[docs]class JSONPatchItem(object):
"""Record entry for list of patch tasks.
Attributes:
op: operations:
add, copy, move, remove, replace, test
target: JSONPointer for the modification target, see RFC6902.
value: Value, either a branch, or a leaf of the JSON data structure.
src: JSONPointer for the modification source, see RFC6902.
"""
[docs] def __init__(self,op,target,param=None):
"""Create an entry for the patch list.
Args:
op: Operation: add, copy, move, remove, replace, test
target: Target node.
param: Parameter specific for the operation:
value: add,replace, test
src: copy, move
param:=None for 'remove'
Returns:
When successful returns 'True', else returns either 'False', or
raises an exception.
Success is the complete addition only, thus one failure returns
False.
Raises:
JSONDataSerializerError:
"""
self.value = None
self.src = None
self.op = getOp(op)
self.target = JSONPointer(target)
if self.op in (RFC6902_ADD,RFC6902_REPLACE,RFC6902_TEST):
self.value = param
elif self.op is RFC6902_REMOVE:
pass
elif self.op in (RFC6902_COPY,RFC6902_MOVE):
self.op = op
self.src = param
else:
raise JSONPatchItemException("Unknown operation.")
[docs] def __call__(self, j):
"""Evaluates the related task for the provided data.
Args:
j: JSON data the task has to be
applied on.
Returns:
Returns a tuple of:
0: len of the job list
1: list of the execution status for
the tasks
Raises:
JSONPatchException:
"""
return self.apply(j)
[docs] def __eq__(self,x):
"""Compares this pointer with x.
Args:
x: A valid Pointer.
Returns:
True or False
Raises:
JSONPointerException
"""
ret = True
if type(x) == dict:
ret &= self.target == x['path']
else:
ret &= self.target == x['target']
if self.op == RFC6902_ADD:
ret &= x['op'] in ('add',RFC6902_ADD)
ret &= self.value == x['value']
elif self.op == RFC6902_REMOVE:
ret &= x['op'] in ('remove',RFC6902_REMOVE)
elif self.op == RFC6902_REPLACE:
ret &= x['op'] in ('replace',RFC6902_REPLACE)
ret &= self.value == x['value']
elif self.op == RFC6902_MOVE:
ret &= x['op'] in ('move',RFC6902_MOVE)
ret &= self.src == x['from']
elif self.op == RFC6902_COPY:
ret &= x['op'] in ('copy',RFC6902_COPY)
ret &= self.src == x['from']
elif self.op == RFC6902_TEST:
ret &= x['op'] in ('test',RFC6902_TEST)
ret &= self.value == x['value']
return ret
[docs] def __getitem__(self,key):
"""Support of various mappings.
#. self[key]
#. self[i:j:k]
#. x in self
#. for x in self
"""
if key in ('path', 'target',):
return self.target
elif key in ('op',):
return self.op
elif key in ('value','param',):
return self.value
elif key in ('from','src',):
return self.src
[docs] def __ne__(self, x):
"""Compares this pointer with x.
Args:
x: A valid Pointer.
Returns:
True or False
Raises:
JSONPointerException
"""
return not self.__eq__(x)
[docs] def __repr__(self):
"""Prints the patch string in accordance to RFC6901.
"""
ret = "{u'op': u'"+unicode(op2str[self.op])+"', u'path': u'"+unicode(self.target)+"'"
if self.op in (RFC6902_ADD,RFC6902_REPLACE,RFC6902_TEST):
if type(self.value) in (int,float):
ret += ", u'value': "+unicode(self.value)
elif type(self.value) in (dict,list):
ret += ", u'value': "+repr(self.value)
else:
ret += ", u'value': u'"+unicode(self.value)+"'"
elif self.op is RFC6902_REMOVE:
pass
elif self.op in (RFC6902_COPY,RFC6902_MOVE):
ret += ", u'from': u'"+unicode(self.src)+"'"
ret += "}"
return ret
[docs] def __str__(self):
"""Prints the patch string in accordance to RFC6901.
"""
ret = '{"op": "'+op2str[self.op]+'", "target": "'+str(self.target)
if self.op in (RFC6902_ADD,RFC6902_REPLACE,RFC6902_TEST):
if type(self.value) in (int,float):
ret += '", "value": '+str(self.value)+' }'
else:
ret += '", "value": "'+str(self.value)+'" }'
elif self.op is RFC6902_REMOVE:
ret += '" }'
elif self.op in (RFC6902_COPY,RFC6902_MOVE):
ret += '", "src": "'+str(self.src)+'" }'
return ret
[docs] def apply(self,jsondata):
"""Applies the present patch list on the provided JSON document.
Args:
jsondata: Document to be patched.
Returns:
When successful returns 'True', else raises an exception.
Or returns a tuple:
(n,lerr): n: number of present active entries
lerr: list of failed entries
Raises:
JSONPatchException:
"""
if self.op is RFC6902_ADD:
#n,b = self.target.get_node_and_child(jsondata)
nbranch = jsondata.branch_add(
self.target, # target pointer
None,
self.value) # value
return True
if isinstance(jsondata,JSONDataSerializer):
jsondata = jsondata.data
if self.op is RFC6902_REPLACE:
n,b = self.target.get_node_and_child(jsondata)
n[unicode(b)] = unicode(self.value)
elif self.op is RFC6902_TEST:
n,b = JSONPointer(self.target,False).get_node_and_child(jsondata)
if type(self.value) is str:
self.value = unicode(self.value)
if type(n) is list:
return n[b] == self.value
return n[unicode(b)] == self.value
elif self.op is RFC6902_COPY:
val = JSONPointer(self.src).get_node_or_value(jsondata)
tn,tc = self.target.get_node_and_child(jsondata)
tn[tc] = val
elif self.op is RFC6902_MOVE:
val = JSONPointer(self.src).get_node_or_value(jsondata)
sn,sc = JSONPointer(self.src).get_node_and_child(jsondata)
sn.pop(sc)
tn,tc = self.target.get_node_and_child(jsondata)
if type(tn) is list:
if len(tn)<=tc:
tn.append(val)
else:
tn[tc] = val
else:
tn[tc] = val
elif self.op is RFC6902_REMOVE:
n,b = self.target.get_node_and_child(jsondata)
n.pop(b)
return True
[docs] def repr_export(self):
"""Prints the patch string for export in accordance to RFC6901.
"""
ret = '{"op": "'+str(op2str[self.op])+'", "path": "'+str(self.target)+'"'
if self.op in (RFC6902_ADD,RFC6902_REPLACE,RFC6902_TEST):
if type(self.value) in (int,float):
ret += ', "value": '+str(self.value)
elif type(self.value) in (dict,list):
ret += ', "value": '+str(self.value)
else:
ret += ', "value": "'+str(self.value)+'"'
elif self.op is RFC6902_REMOVE:
pass
elif self.op in (RFC6902_COPY,RFC6902_MOVE):
ret += ', "from": "'+str(self.src)+'"'
ret += '}'
return ret
[docs]class JSONPatchItemRaw(JSONPatchItem):
"""Adds native patch strings or an unsorted dict for RFC6902.
"""
[docs] def __init__(self,patchstring):
"""Parse a raw patch string in accordance to RFC6902.
"""
if type(patchstring) in (str,unicode,):
ps = myjson.loads(patchstring)
sx = myjson.dumps(ps)
#print "<"+str(sx)+">"
#print "<"+str(patchstring)+">"
#l0 = len(sx.replace(" ",""))
#l1 = len(patchstring.replace(" ",""))
if len(sx.replace(" ","")) != len(patchstring.replace(" ","")):
raise JSONPatchItemException("Repetition is not compliant to RFC6902:"+str(patchstring))
elif type(patchstring) is dict:
ps = patchstring
else:
raise JSONPatchItemException("Type not supported:"+str(patchstring))
try:
target = ps['path']
op = getOp(ps['op'])
if op in (RFC6902_ADD,RFC6902_REPLACE,RFC6902_TEST):
param = ps['value']
elif op is RFC6902_REMOVE:
param = None
elif op in (RFC6902_COPY,RFC6902_MOVE):
param = ps['from']
except Exception as e:
raise JSONPatchItemException(e)
super(JSONPatchItemRaw,self).__init__(op,target,param)
[docs]class JSONPatchFilter(object):
"""Filtering capabilities on the entries of patch lists.
"""
[docs] def __init__(self,**kargs):
"""
Args:
**kargs: Filter parameters:
Common:
contain=(True|False): Contain, else equal.
type=<node-type>: Node is of type.
Paths:
branch=<branch>:
deep=(): Determines the depth of comparison.
prefix=<prefix>: Any node of prefix. If prefix is
absolute: the only and one, else None.
relative: any node prefixed by the path fragment.
Values:
val=<node-value>: Node ha the value.
Returns:
True or False
Raises:
JSONPointerException:
"""
for k,v in kargs:
if k == 'prefix':
self.prefix = v
elif k == 'branch':
self.branch = v
pass
[docs] def __eq__(self,x):
pass
[docs] def __ne__(self,x):
pass
[docs]class JSONPatch(object):
""" Representation of a JSONPatch task list for RFC6902.
Contains the defined methods from standards:
* add
* remove
* replace
* move
* copy
* test
Attributes:
patch: List of patch items.
"""
[docs] def __init__(self):
self.patch = []
"""List of patch tasks. """
self.deep = False
"""Defines copy operations, True:=deep, False:=swallow"""
#
#--- RFC6902 JSON patch files
#
[docs] def __add__(self,x=None):
"""Creates a copy of 'self' and adds a patch jobs to the task queue.
"""
if not x:
raise JSONPatchException("Missing patch entry/patch")
if isinstance(x, JSONPatchItem):
return JSONPatch(self.patch).patch.append(x)
elif isinstance(x, JSONPatch):
return JSONPatch(self.patch).patch.extend(x.patch)
else:
raise JSONPatchException("Unknown input"+type(x))
[docs] def __call__(self, j, x=None):
"""Evaluates the related task for the provided index.
Args:
x: Task index.
j: JSON data the task has to be
applied on.
Returns:
Returns a tuple of:
0: len of the job list
1: list of the execution status for
the tasks
Raises:
JSONPatchException:
"""
if type(x) is NoneType:
return self.apply(j)
if self.patch[x](j):
return 1,[]
return 1,[0]
[docs] def __eq__(self,x):
"""Compares this pointer with x.
Args:
x: A valid Pointer.
Returns:
True or False
Raises:
JSONPointerException
"""
match = len(self.patch)
if match != len(x):
return False
for p in sorted(self.patch):
for xi in sorted(x):
if p==xi:
match -= 1
continue
return match == 0
[docs] def __getitem__(self,key):
"""Support of slices, for 'iterator' refer to self.__iter__.
#. self[key]
#. self[i:j:k]
#. x in self
#. for x in self
"""
return self.patch[key]
[docs] def __iadd__(self,x=None):
"""Adds patch jobs to the task queue in place.
"""
if not x:
raise JSONPatchException("Missing patch entry/patch")
if isinstance(x, JSONPatchItem):
self.patch.append(x)
elif isinstance(x, JSONPatch):
self.patch.extend(x.patch)
else:
raise JSONPatchException("Unknown input"+type(x))
return self
[docs] def __isub__(self,x):
"""Removes the patch job from the task queue in place.
Removes one of the following type(x) variants:
int: The patch job with given index.
JSONPatchItem: The first matching entry from
the task queue.
Args:
x: Item to be removed.
Returns:
Returns resulting list without x.
Raises:
JSONPatchException:
"""
if type(x) is int:
self.patch.pop(x)
else:
self.patch.remove(x)
return self
[docs] def __iter__(self):
"""Provides an iterator foreseen for large amounts of in-memory patches.
"""
return iter(self.patch)
[docs] def __len__(self):
"""The number of outstanding patches.
"""
return len(self.patch)
[docs] def __ne__(self, x):
"""Compares this pointer with x.
Args:
x: A valid Pointer.
Returns:
True or False
Raises:
JSONPointerException
"""
return not self.__eq__(x)
[docs] def __repr__(self):
"""Prints the representation format of a JSON patch list.
"""
ret = "["
if self.patch:
if len(self.patch)>1:
for p in self.patch[:-1]:
ret += repr(p)+", "
ret += repr(self.patch[-1])
ret += "]"
return unicode(ret)
[docs] def __str__(self):
"""Prints the display format.
"""
ret = "[\n"
if self.patch:
if len(self.patch)>1:
for p in self.patch[:-1]:
ret += " "+repr(p)+",\n"
ret += " "+repr(self.patch[-1])+"\n"
ret += "]"
return str(ret)
[docs] def __sub__(self,x):
"""Removes the patch job from the task queue.
Removes one of the following type(x) variants:
int: The patch job with given index.
JSONPatchItem: The first matching entry from
the task queue.
Args:
x: Item to be removed.
Returns:
Returns resulting list without x.
Raises:
JSONPatchException:
"""
ret = JSONPatch()
if self.deep:
ret.patch = self.patch[:]
else:
ret.patch = self.patch
if type(x) is int:
ret.patch.pop(x)
else:
ret.patch.remove(x)
return ret
[docs] def apply(self,jsondata):
"""Applies the JSONPatch task.
Args:
jsondata: JSON data the joblist has to be applied on.
Returns:
Returns a tuple of:
0: len of the job list
1: list of the execution status for the tasks
Raises:
JSONPatchException:
"""
status = []
for p in self.patch:
if not p.apply(jsondata):
status.append(self.patch.index(p)) # should not be called frequently
return len(self.patch),status
[docs] def get(self,x=None):
"""
"""
ret = self.patch
#FIXME:
return ret
[docs] def patch_export(self, patchfile, schema=None, **kargs):
"""Exports the current task list.
Provided formats are:
RFC6902
Supports the formats:
RFC6902
Args:
patchfile:
JSON patch for export.
schema:
JSON-Schema for validation of the patch list.
**kargs:
validator: [default, draft3, off, ]
Sets schema validator for the data file.
The values are: default=validate, draft3=Draft3Validator,
off=None.
default:= validate
Returns:
When successful returns 'True', else raises an exception.
Raises:
JSONPatchException:
"""
try:
with open(patchfile, 'w') as fp:
fp.writelines(self.repr_export())
except Exception as e:
raise JSONPatchException("open-"+str(e),"data.dump",str(patchfile))
return True
[docs] def patch_import(self, patchfile, schemafile=None, **kargs):
"""Imports a task list.
Supports the formats:
RFC6902
Args:
patchfile:
JSON patch filename containing the list of patch operations.
schemafile:
JSON-Schema filename for validation of the patch list.
**kargs:
validator: [default, draft3, off, ]
Sets schema validator for the data file.
The values are: default=validate, draft3=Draft3Validator,
off=None.
default:= validate
Returns:
When successful returns 'True', else raises an exception.
Raises:
JSONPatchException:
"""
appname = _appname
kargs = {}
kargs['datafile'] = patchfile
kargs['schemafile'] = schemafile
kargs['validator'] = MODE_SCHEMA_OFF
for k,v in kargs.items():
if k == 'nodefaultpath':
kargs['nodefaultpath'] = True
elif k == 'pathlist':
kargs['pathlist'] = v
elif k == 'validator':
kargs['validator'] = v
elif k == 'appname':
appname = v
patchdata = JSONDataSerializer(appname,**kargs)
for pi in patchdata.data:
self += JSONPatchItemRaw(pi)
return True
[docs] def repr_export(self):
"""Prints the export representation format of a JSON patch list.
"""
ret = "["
if self.patch:
if len(self.patch)>1:
for p in self.patch[:-1]:
ret += p.repr_export()+", "
ret += self.patch[-1].repr_export()
ret += "]"
return ret