import functools
import json
import warnings
from collections.abc import Mapping, MutableMapping, Sequence
from urllib import parse as urlparse
from urllib.parse import unquote
from urllib.request import urlopen
from . import proxytypes # noqa: F401
from .proxytypes import LazyProxy
try:
# If requests >=1.0 is available, we will use it
import requests
if not callable(requests.Response.json):
requests = None
except ImportError:
requests = None
__version__ = "1.1.0"
[docs]
class JsonRefError(Exception):
    """
    Error raised when a JSON reference cannot be resolved.

    :param message: Description of what went wrong
    :param reference: The JSON reference object being resolved
    :param uri: Full URI the reference points to
    :param base_uri: URI the reference was resolved against
    :param path: Sequence of keys/indices locating the reference
    :param cause: Underlying exception, if any (also set as ``__cause__``)
    """

    def __init__(self, message, reference, uri="", base_uri="", path=(), cause=None):
        self.message = message
        self.reference = reference
        self.uri = uri
        self.base_uri = base_uri
        self.path = path
        # Keep the cause both as a plain attribute and as the exception
        # chaining slot used by tracebacks.
        self.cause = cause
        self.__cause__ = cause

    def __repr__(self):
        return "<{}: {!r}>".format(type(self).__name__, self.message)

    def __str__(self):
        return str(self.message)
[docs]
class JsonRef(LazyProxy):
    """
    A lazy loading proxy to the dereferenced data pointed to by a JSON
    Reference object.

    The referent is produced by :meth:`callback`; when that method is invoked
    is decided by :class:`LazyProxy` (defined in ``proxytypes``).
    """

    # Read by the proxy machinery in ``proxytypes`` (not visible here);
    # presumably the listed names are served by the proxy itself instead of
    # being forwarded to the referent -- confirm against ``proxytypes``.
    __notproxied__ = ("__reference__",)

    @classmethod
    def replace_refs(
        cls, obj, base_uri="", loader=None, jsonschema=False, load_on_repr=True
    ):
        """
        .. deprecated:: 0.4
            Use :func:`replace_refs` instead.

        Returns a deep copy of `obj` with all contained JSON reference objects
        replaced with :class:`JsonRef` instances.

        :param obj: If this is a JSON reference object, a :class:`JsonRef`
            instance will be created. If `obj` is not a JSON reference object,
            a deep copy of it will be created with all contained JSON
            reference objects replaced by :class:`JsonRef` instances
        :param base_uri: URI to resolve relative references against
        :param loader: Callable that takes a URI and returns the parsed JSON
            (defaults to global ``jsonloader``)
        :param jsonschema: Flag to turn on `JSON Schema mode
            <http://json-schema.org/latest/json-schema-core.html#anchor25>`_.
            'id' keyword changes the `base_uri` for references contained within
            the object
        :param load_on_repr: If set to ``False``, :func:`repr` call on a
            :class:`JsonRef` object will not cause the reference to be loaded
            if it hasn't already. (defaults to ``True``)
        """
        return replace_refs(
            obj,
            base_uri=base_uri,
            loader=loader,
            jsonschema=jsonschema,
            load_on_repr=load_on_repr,
        )

    def __init__(
        self,
        refobj,
        base_uri="",
        loader=None,
        jsonschema=False,
        load_on_repr=True,
        merge_props=False,
        _path=(),
        _store=None,
    ):
        """
        :param refobj: JSON reference object; its ``"$ref"`` value must be a
            string
        :param base_uri: URI the reference is resolved against
        :param loader: Callable that takes a URI and returns the parsed JSON
            (falls back to the module level ``jsonloader``)
        :param jsonschema: Passed on when referenced documents are processed
        :param load_on_repr: Whether :func:`repr` may load the referent
        :param merge_props: Merge keys other than ``"$ref"`` into a mapping
            referent
        :param _path: Location of this reference, reported in errors
        :param _store: Mapping of URI to already processed document, shared
            with the references created for loaded documents
        :raises ValueError: if ``refobj["$ref"]`` is not a string
        """
        if not isinstance(refobj.get("$ref"), str):
            raise ValueError("Not a valid json reference object: %s" % refobj)
        self.__reference__ = refobj
        self.base_uri = base_uri
        self.loader = loader or jsonloader
        self.jsonschema = jsonschema
        self.load_on_repr = load_on_repr
        self.merge_props = merge_props
        self.path = _path
        self.store = _store  # Use the same object to be shared with children
        if self.store is None:
            self.store = URIDict()

    @property
    def _ref_kwargs(self):
        """Keyword arguments that hand this reference's settings (and its
        shared store) to ``_replace_refs`` for documents it loads."""
        return dict(
            base_uri=self.base_uri,
            loader=self.loader,
            jsonschema=self.jsonschema,
            load_on_repr=self.load_on_repr,
            merge_props=self.merge_props,
            path=self.path,
            store=self.store,
        )

    @property
    def full_uri(self):
        """The ``"$ref"`` value joined onto ``base_uri``."""
        return urlparse.urljoin(self.base_uri, self.__reference__["$ref"])

    def callback(self):
        """
        Resolve the reference and return the data it points to.

        The document part of :attr:`full_uri` is taken from ``self.store``
        when present, otherwise fetched with ``self.loader`` and processed
        with ``_replace_refs``. The fragment is then resolved inside that
        document with :meth:`resolve_pointer`.

        :raises JsonRefError: if loading fails, the pointer cannot be
            resolved, or the reference resolves to itself
        """
        uri, fragment = urlparse.urldefrag(self.full_uri)

        # If we already looked this up, return a reference to the same object
        if uri not in self.store:
            # Remote ref
            try:
                base_doc = self.loader(uri)
            except Exception as e:
                raise self._error(
                    "%s: %s" % (e.__class__.__name__, str(e)), cause=e
                ) from e
            base_doc = _replace_refs(
                base_doc, **{**self._ref_kwargs, "base_uri": uri, "recursing": False}
            )
        else:
            base_doc = self.store[uri]

        result = self.resolve_pointer(base_doc, fragment)
        if result is self:
            raise self._error("Reference refers directly to itself.")
        # The pointer may land on another proxy; unwrap it so the data
        # itself is returned.
        if hasattr(result, "__subject__"):
            result = result.__subject__
        if (
            self.merge_props
            and isinstance(result, Mapping)
            and len(self.__reference__) > 1
        ):
            # Sibling keys of "$ref" override keys of the referent; a new
            # dict is built so the referent itself is left unmodified.
            result = {
                **result,
                **{k: v for k, v in self.__reference__.items() if k != "$ref"},
            }
        return result

    def resolve_pointer(self, document, pointer):
        """
        Resolve a json pointer ``pointer`` within the referenced ``document``.

        :argument document: the referent document
        :argument str pointer: a json pointer URI fragment to resolve within it
        :returns: the value found at ``pointer`` (``document`` itself when
            ``pointer`` is empty)
        :raises JsonRefError: if a step of the pointer cannot be followed
        """
        if pointer:
            # Drop exactly one leading "/": it only introduces the first
            # reference token. Further slashes delimit empty-string tokens
            # (keys named ""), so stripping them all would resolve "//a" as
            # "/a". A pointer without a leading slash is still accepted.
            tokens = pointer[1:] if pointer.startswith("/") else pointer
            parts = unquote(tokens).split("/")
        else:
            parts = []

        for part in parts:
            # "~1" must be decoded before "~0" so that "~01" yields "~1"
            part = part.replace("~1", "/").replace("~0", "~")

            # Strings are Sequences too, but a pointer must not index into
            # one; leaving the token as a str makes the lookup below fail
            # and be reported as unresolvable.
            if isinstance(document, Sequence) and not isinstance(document, str):
                # Try to turn an array index to an int
                try:
                    part = int(part)
                except ValueError:
                    pass
            # If a reference points inside itself, it must mean inside reference object, not the referent data
            if document is self:
                document = self.__reference__
            try:
                document = document[part]
            except (TypeError, LookupError) as e:
                raise self._error(
                    "Unresolvable JSON pointer: %r" % pointer, cause=e
                ) from e
        return document

    def _error(self, message, cause=None):
        """Build (without raising) a :class:`JsonRefError` describing a
        failure to resolve this reference."""
        message = "Error while resolving `{}`: {}".format(self.full_uri, message)
        return JsonRefError(
            message,
            self.__reference__,
            uri=self.full_uri,
            base_uri=self.base_uri,
            path=self.path,
            cause=cause,
        )

    def __repr__(self):
        # "cache" is not assigned anywhere in this class; presumably
        # LazyProxy sets it once the referent has been loaded -- confirm in
        # ``proxytypes``.
        if hasattr(self, "cache") or self.load_on_repr:
            return repr(self.__subject__)
        return "JsonRef(%r)" % self.__reference__
class URIDict(MutableMapping):
    """
    Dictionary which uses normalized URIs as keys.

    Every key is passed through :meth:`normalize` on insertion, lookup and
    deletion, including the keys supplied to the constructor.
    """

    def normalize(self, uri):
        """Return ``uri`` split and re-assembled by :mod:`urllib.parse`."""
        return urlparse.urlsplit(uri).geturl()

    def __init__(self, *args, **kwargs):
        """Accepts the same arguments as :meth:`dict.update`."""
        self.store = dict()
        # Go through MutableMapping.update (and therefore __setitem__) so the
        # initial keys are normalized too. Filling the backing dict directly
        # would leave them unreachable through the normalizing lookups.
        self.update(*args, **kwargs)

    def __getitem__(self, uri):
        return self.store[self.normalize(uri)]

    def __setitem__(self, uri, value):
        self.store[self.normalize(uri)] = value

    def __delitem__(self, uri):
        del self.store[self.normalize(uri)]

    def __iter__(self):
        return iter(self.store)

    def __len__(self):
        return len(self.store)

    def __repr__(self):
        return repr(self.store)
[docs]
def jsonloader(uri, **kwargs):
    """
    Fetch the document at ``uri`` and return it as parsed JSON.

    HTTP(S) URIs are fetched with :mod:`requests` when it is available;
    everything else (or everything, without requests) goes through
    :mod:`urllib`.

    :param uri: URI of the JSON document
    :param kwargs: Keyword arguments handed to the JSON decoder
    :returns: The parsed JSON data
    """
    is_http = urlparse.urlsplit(uri).scheme in ("http", "https")

    if not (is_http and requests):
        # Otherwise, pass off to urllib and assume utf-8
        with urlopen(uri) as content:
            return json.loads(content.read().decode("utf-8"), **kwargs)

    # Prefer requests, it has better encoding detection
    resp = requests.get(uri)
    # Turn abnormal responses (e.g. 404, 500) into an exception
    resp.raise_for_status()
    try:
        return resp.json(**kwargs)
    except TypeError:
        warnings.warn("requests >=1.2 required for custom kwargs to json.loads")
        return resp.json()
def _walk_refs(obj, func, replace=False, _processed=None):
    """
    Call ``func`` on every :class:`JsonRef` reachable from ``obj``.

    Mappings and non-string sequences are descended into. Only
    :class:`JsonRef` objects are memoized; plain containers are not.

    :param obj: Object to walk
    :param func: Callable invoked with each :class:`JsonRef` found
    :param replace: When true, each reference is replaced by the return
        value of ``func``, in place inside its parent container
    :param _processed: Memo of references already handled, shared by the
        recursive calls; callers should leave it unset
    :returns: ``obj``, or the replacement for it when ``obj`` itself is a
        reference and ``replace`` is true
    """
    # Keep track of already processed items to prevent recursion. This must
    # be a ``None`` test: an empty memo is falsy, so ``_processed or {}``
    # would hand every branch a fresh dict until a reference was recorded,
    # and sibling branches would never share results.
    if _processed is None:
        _processed = {}

    # Exact type check (not isinstance), as in the original; presumably
    # because a proxy reports the class of its referent -- see proxytypes.
    if type(obj) is JsonRef:
        oid = id(obj)
        if oid in _processed:
            return _processed[oid][1]
        ref = obj
        r = func(ref)
        obj = r if replace else ref
        # The reference is stored next to its result to keep it alive for the
        # whole walk: id() values are only unique among live objects, and a
        # replaced reference could otherwise be freed and its id reused.
        _processed[oid] = (ref, obj)

    if isinstance(obj, Mapping):
        for k, v in obj.items():
            r = _walk_refs(v, func, replace=replace, _processed=_processed)
            if replace:
                obj[k] = r
    elif isinstance(obj, Sequence) and not isinstance(obj, str):
        for i, v in enumerate(obj):
            r = _walk_refs(v, func, replace=replace, _processed=_processed)
            if replace:
                obj[i] = r
    return obj
[docs]
def replace_refs(
    obj,
    base_uri="",
    loader=jsonloader,
    jsonschema=False,
    load_on_repr=True,
    merge_props=False,
    proxies=True,
    lazy_load=True,
):
    """
    Returns a deep copy of `obj` with all contained JSON reference objects
    replaced with :class:`JsonRef` instances.

    :param obj: If this is a JSON reference object, a :class:`JsonRef`
        instance will be created. If `obj` is not a JSON reference object,
        a deep copy of it will be created with all contained JSON
        reference objects replaced by :class:`JsonRef` instances
    :param base_uri: URI to resolve relative references against
    :param loader: Callable that takes a URI and returns the parsed JSON
        (defaults to :func:`jsonloader`)
    :param jsonschema: Flag to turn on `JSON Schema mode
        <http://json-schema.org/latest/json-schema-core.html#anchor25>`_.
        'id' or '$id' keyword changes the `base_uri` for references contained
        within the object
    :param load_on_repr: If set to ``False``, :func:`repr` call on a
        :class:`JsonRef` object will not cause the reference to be loaded
        if it hasn't already. (defaults to ``True``)
    :param merge_props: When ``True``, JSON reference objects that
        have extra keys other than '$ref' in them will be merged into the
        document resolved by the reference (if it is a dictionary.) NOTE: This
        is not part of the JSON Reference spec, and may not behave the same as
        other libraries.
    :param proxies: If `True`, references will be replaced with transparent
        proxy objects. Otherwise, they will be replaced directly with the
        referred data. (defaults to ``True``)
    :param lazy_load: When proxy objects are used, and this is `True`, the
        references will not be resolved until that section of the JSON
        document is accessed. (defaults to ``True``)
    :returns: The copy of `obj` with its references replaced
    """
    result = _replace_refs(
        obj,
        base_uri=base_uri,
        loader=loader,
        jsonschema=jsonschema,
        load_on_repr=load_on_repr,
        merge_props=merge_props,
        store=URIDict(),
        path=(),
        recursing=False,
    )
    if not proxies:
        # Nested references are swapped out in place, but when `obj` itself
        # is a reference the replacement is only available as the return
        # value; ignoring it would hand a proxy back to the caller.
        result = _walk_refs(result, lambda r: r.__subject__, replace=True)
    elif not lazy_load:
        # Touch every reference now so they are all resolved up front
        _walk_refs(result, lambda r: r.__subject__)
    return result
def _replace_refs(
    obj,
    *,
    base_uri,
    loader,
    jsonschema,
    load_on_repr,
    merge_props,
    store,
    path,
    recursing,
):
    """
    Build a copy of ``obj`` whose JSON reference objects are wrapped in
    :class:`JsonRef` instances.

    :param obj: Parsed JSON data (or a part of it)
    :param base_uri: URI of ``obj``; any fragment is ignored for resolving
    :param loader: Passed through to the created :class:`JsonRef` objects
    :param jsonschema: When true, a string ``"$id"``/``"id"`` on a mapping
        becomes the base URI for that mapping and its contents
    :param load_on_repr: Passed through to the created :class:`JsonRef`
    :param merge_props: Passed through to the created :class:`JsonRef`
    :param store: Mapping of URI to processed document, filled in here
    :param path: Tuple of keys/indices leading to ``obj``
    :param recursing: False only for the outermost call on a document
    :returns: The processed copy of ``obj``
    """
    base_uri, fragment = urlparse.urldefrag(base_uri)

    # Only a whole document (outermost call, no fragment) is stored under its
    # URI; ``None`` means the result is not stored.
    store_uri = None if (fragment or recursing) else base_uri

    if jsonschema and isinstance(obj, Mapping):
        # id changed to $id in later jsonschema versions
        schema_id = obj.get("$id") or obj.get("id")
        if isinstance(schema_id, str):
            base_uri = urlparse.urljoin(base_uri, schema_id)
            store_uri = base_uri

    # Settings common to the child calls and to a JsonRef created for ``obj``
    shared = dict(
        base_uri=base_uri,
        loader=loader,
        jsonschema=jsonschema,
        load_on_repr=load_on_repr,
        merge_props=merge_props,
    )

    def convert(child, step):
        # Process one child, extending the path by its key or index
        return _replace_refs(
            child, store=store, path=path + (step,), recursing=True, **shared
        )

    # Children first, so a reference object wraps already-processed content
    if isinstance(obj, Mapping):
        obj = {key: convert(value, key) for key, value in obj.items()}
    elif isinstance(obj, Sequence) and not isinstance(obj, str):
        obj = [convert(value, index) for index, value in enumerate(obj)]

    # Then the object itself, if it is a reference
    if isinstance(obj, Mapping) and isinstance(obj.get("$ref"), str):
        obj = JsonRef(obj, _path=path, _store=store, **shared)

    # Remember the fully processed document for later lookups
    if store_uri is not None:
        store[store_uri] = obj
    return obj
[docs]
def load(
    fp,
    base_uri="",
    loader=None,
    jsonschema=False,
    load_on_repr=True,
    merge_props=False,
    proxies=True,
    lazy_load=True,
    **kwargs,
):
    """
    Drop in replacement for :func:`json.load`, where JSON references are
    proxied to their referent data.

    :param fp: File-like object containing JSON document
    :param kwargs: The named keyword arguments are those of
        :func:`replace_refs`. Any other keyword arguments are passed to
        :func:`json.load`, and to :func:`jsonloader` when no `loader` is
        given
    """
    if loader is None:
        # Decode referenced documents with the same JSON options
        ref_loader = functools.partial(jsonloader, **kwargs)
    else:
        ref_loader = loader

    document = json.load(fp, **kwargs)
    return replace_refs(
        document,
        base_uri=base_uri,
        loader=ref_loader,
        jsonschema=jsonschema,
        load_on_repr=load_on_repr,
        merge_props=merge_props,
        proxies=proxies,
        lazy_load=lazy_load,
    )
[docs]
def loads(
    s,
    base_uri="",
    loader=None,
    jsonschema=False,
    load_on_repr=True,
    merge_props=False,
    proxies=True,
    lazy_load=True,
    **kwargs,
):
    """
    Drop in replacement for :func:`json.loads`, where JSON references are
    proxied to their referent data.

    :param s: String containing JSON document
    :param kwargs: The named keyword arguments are those of
        :func:`replace_refs`. Any other keyword arguments are passed to
        :func:`json.loads`, and to :func:`jsonloader` when no `loader` is
        given
    """
    if loader is None:
        # Decode referenced documents with the same JSON options
        ref_loader = functools.partial(jsonloader, **kwargs)
    else:
        ref_loader = loader

    document = json.loads(s, **kwargs)
    return replace_refs(
        document,
        base_uri=base_uri,
        loader=ref_loader,
        jsonschema=jsonschema,
        load_on_repr=load_on_repr,
        merge_props=merge_props,
        proxies=proxies,
        lazy_load=lazy_load,
    )
[docs]
def load_uri(
    uri,
    base_uri=None,
    loader=None,
    jsonschema=False,
    load_on_repr=True,
    merge_props=False,
    proxies=True,
    lazy_load=True,
):
    """
    Load JSON data from ``uri`` with JSON references proxied to their referent
    data.

    :param uri: URI to fetch the JSON from
    :param base_uri: URI to resolve relative references against (defaults to
        ``uri``)
    :param loader: Callable that takes a URI and returns the parsed JSON; it
        fetches ``uri`` itself as well as referenced documents (defaults to
        :func:`jsonloader`)

    The remaining keyword arguments are passed to :func:`replace_refs`
    unchanged.
    """
    fetch = jsonloader if loader is None else loader
    root_uri = uri if base_uri is None else base_uri

    return replace_refs(
        fetch(uri),
        base_uri=root_uri,
        loader=fetch,
        jsonschema=jsonschema,
        load_on_repr=load_on_repr,
        merge_props=merge_props,
        proxies=proxies,
        lazy_load=lazy_load,
    )
[docs]
def dump(obj, fp, **kwargs):
    """
    Write `obj`, which may contain :class:`JsonRef` objects, to the file-like
    `fp` as JSON. Each `JsonRef` is written as the reference object it was
    created from.

    :param obj: Object to serialize
    :param fp: File-like to output JSON string
    :param kwargs: Keyword arguments are the same as to :func:`json.dump`
    """
    # Serialize via dumps() and write the whole string, instead of streaming
    # through the json module: the original author found the custom encoder
    # was not honoured on that path.
    serialized = dumps(obj, **kwargs)
    fp.write(serialized)
[docs]
def dumps(obj, **kwargs):
    """
    Serialize `obj`, which may contain :class:`JsonRef` objects, to a JSON
    formatted string. `JsonRef` objects will be dumped as the original
    reference object they were created from.

    :param obj: Object to serialize
    :param kwargs: Keyword arguments are the same as to :func:`json.dumps`.
        A ``cls`` encoder, if given, is subclassed rather than replaced
    :returns: The JSON string
    """
    # ``cls=None`` is how json.dumps spells "use the default encoder", so
    # treat it like a missing ``cls`` instead of subclassing ``None``.
    base_encoder = kwargs.get("cls") or json.JSONEncoder
    kwargs["cls"] = _ref_encoder_factory(base_encoder)
    return json.dumps(obj, **kwargs)


def _ref_encoder_factory(cls):
    """
    Return a subclass of the encoder class ``cls`` that serializes any object
    having a ``__reference__`` attribute as that attribute's value.
    """

    class JSONRefEncoder(cls):
        def default(self, o):
            if hasattr(o, "__reference__"):
                return o.__reference__
            # Defer to the wrapped encoder through the instance. Passing the
            # base class as super()'s second argument made this call raise a
            # TypeError from super() itself, so a custom ``default`` on
            # ``cls`` was never reached.
            return super().default(o)

        # Python 2.6 doesn't work with the default method
        def _iterencode(self, o, *args, **kwargs):
            if hasattr(o, "__reference__"):
                o = o.__reference__
            return super()._iterencode(o, *args, **kwargs)

        # Pypy doesn't work with either of the other methods
        def _encode(self, o, *args, **kwargs):
            if hasattr(o, "__reference__"):
                o = o.__reference__
            return super()._encode(o, *args, **kwargs)

    return JSONRefEncoder