Source code for transit.reader
## Copyright 2014 Cognitect. All Rights Reserved.
##
## Licensed under the Apache License, Version 2.0 (the "License");
## you may not use this file except in compliance with the License.
## You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS-IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
import json
import sosjson
import msgpack
from decoder import Decoder
from collections import OrderedDict
class Reader(object):
    """The top-level object for reading in Transit data and converting it
    to Python objects.

    The protocol used for unmarshalling is chosen at construction time:
    "json" and "json_verbose" are both served by a JsonUnmarshaler, and
    "msgpack" is served by a MsgPackUnmarshaler.

    Attributes set by __init__:
      reader   -- the unmarshaler instance that performs the reads.
      unpacker -- only set for the "msgpack" protocol; it is the
                  unmarshaler's own `unpacker` object, exposed here so
                  callers can feed it data before calling `readeach`.
    """

    def __init__(self, protocol="json"):
        """Select the unmarshaler matching `protocol`.

        Raises ValueError when `protocol` is not one of "json",
        "json_verbose" or "msgpack".
        """
        if protocol in ("json", "json_verbose"):
            self.reader = JsonUnmarshaler()
        elif protocol == "msgpack":
            self.reader = MsgPackUnmarshaler()
            self.unpacker = self.reader.unpacker
        else:
            # Format rather than concatenate so that a non-string protocol
            # (e.g. None) still produces the intended ValueError instead of
            # a TypeError raised while building the message.
            raise ValueError(
                "'{0}' is not a supported protocol. "
                "Protocol must be: "
                "'json', 'json_verbose', or 'msgpack'.".format(protocol))

    def read(self, stream):
        """Given a readable file-like object (something `load`able by
        msgpack or json), read the data and return the Python
        representation of the contents. One-shot reader.
        """
        return self.reader.load(stream)

    def register(self, key_or_tag, f_val):
        """Register a custom transit tag and decoder/parser function for
        use during reads. The registration is forwarded to the underlying
        unmarshaler's decoder.
        """
        self.reader.decoder.register(key_or_tag, f_val)

    def readeach(self, stream, **kwargs):
        """Temporary hook for the API while streaming reads are in an
        experimental phase. Yields each object produced by the underlying
        unmarshaler's `loadeach(stream)`.

        JSON blocks indefinitely waiting on JSON entities to arrive.
        MsgPack requires the `unpacker` attribute to be fed data using its
        `feed()` method.

        `**kwargs` is accepted for call compatibility but is not used.
        """
        for obj in self.reader.loadeach(stream):
            yield obj
class JsonUnmarshaler(object):
    """Unmarshaler the Reader delegates to for JSON payloads.

    It can be used directly, but doing so is strongly discouraged; go
    through Reader instead.
    """

    def __init__(self):
        # One Decoder per unmarshaler; Reader.register() reaches it through
        # this attribute.
        self.decoder = Decoder()

    def load(self, stream):
        """Parse a single JSON document from `stream` and return the
        decoder's result for it. JSON objects are parsed into OrderedDict
        instances before being handed to the decoder.
        """
        decode = self.decoder.decode
        parsed = json.load(stream, object_pairs_hook=OrderedDict)
        return decode(parsed)

    def loadeach(self, stream):
        """Generator: decode and yield every item that `sosjson.items`
        produces from `stream`, one at a time.
        """
        for item in sosjson.items(stream, object_pairs_hook=OrderedDict):
            yield self.decoder.decode(item)
class MsgPackUnmarshaler(object):
    """Unmarshaler the Reader delegates to for MsgPack payloads.

    It can be used directly, but doing so is strongly discouraged; go
    through Reader instead.
    """

    def __init__(self):
        # One Decoder per unmarshaler; Reader.register() reaches it through
        # this attribute.
        self.decoder = Decoder()
        # Unpacker consumed by loadeach(); maps are unpacked as OrderedDict.
        self.unpacker = msgpack.Unpacker(object_pairs_hook=OrderedDict)

    def load(self, stream):
        """Unpack a single MsgPack payload from `stream` and return the
        decoder's result for it. Maps are unpacked into OrderedDict
        instances before being handed to the decoder.
        """
        decode = self.decoder.decode
        unpacked = msgpack.load(stream, object_pairs_hook=OrderedDict)
        return decode(unpacked)

    def loadeach(self, stream):
        """Generator: decode and yield every object obtained by iterating
        `self.unpacker`.

        Note that `stream` is not read here; objects come only from
        `self.unpacker`, so the caller has to supply it with data.
        """
        for item in self.unpacker:
            yield self.decoder.decode(item)