from __future__ import absolute_import
import collections
import datetime
import inspect
import logging
import jsonschema
import six
from . import formats
from . import lib
class SchemaDict(dict):
    """A ``dict`` that also remembers the object it was generated from.

    :param origin: stored unchanged on the instance as ``origin``.
    :param args: positional arguments forwarded to ``dict``.
    :param kwargs: keyword arguments forwarded to ``dict``.
    """

    def __init__(self, origin, *args, **kwargs):
        # The back-reference and the dict payload are independent of each
        # other, so the order of these two statements does not matter.
        self.origin = origin
        super(SchemaDict, self).__init__(*args, **kwargs)
class Set(set):
    """A ``set`` with a forgiving constructor.

    * a falsy argument (``None``, ``0``, ``''``, an empty container, ...)
      produces an empty set;
    * a ``list``, ``tuple`` or ``set`` contributes its elements;
    * any other value becomes the single member of the set.
    """

    def __init__(self, items=None):
        if not items:
            members = ()
        elif isinstance(items, (list, tuple, set)):
            members = items
        else:
            # A lone value (e.g. a string) is wrapped instead of iterated.
            members = (items,)
        super(Set, self).__init__(members)
def constraint(code, hint=None):
    """Build a decorator that tags a callable as a constraint validator.

    The decorated callable is returned unchanged apart from two new
    attributes: ``_constraint`` (set to ``code``) and ``_constraint_hint``
    (set to ``hint``).

    :param code: value stored as ``_constraint``.
    :param hint: value stored as ``_constraint_hint``; defaults to ``None``.
    :returns: the decorator.
    """
    def tag(target):
        target._constraint, target._constraint_hint = code, hint
        return target

    return tag
class Schema(object):
    """Container of schema attributes plus the validators declared on it.

    Keyword arguments passed to the constructor are merged over the
    class-level ``_attrs`` and stored per instance. Entries of ``_attrs``
    are readable as ordinary attributes through :meth:`__getattr__`.
    """

    # Class-wide counter; every instance records the value seen at creation.
    _creation_index = 0
    _attrs = None
    _fields = None
    _parent = None
    _logger = None

    def __init__(self, _jsonschema=None, **attrs):
        """Store the attributes and collect the constraint validators.

        :param _jsonschema: optional raw schema, kept as ``_jsonschema`` only
            when truthy.
        :param attrs: schema attributes, merged over the class ``_attrs``.
        """
        self._creation_index = Schema._creation_index
        Schema._creation_index += 1
        if _jsonschema:
            self._jsonschema = _jsonschema
        self._attrs = dict(self._attrs or {}, **attrs)
        self._attrs['hidden'] = self._attrs.get('hidden', False)
        # These four attributes are always normalised to ``Set`` instances.
        self._attrs['dialect'] = Set(self._attrs.get('dialect'))
        self._attrs['tags'] = Set(self._attrs.get('tags'))
        self._attrs['exclude'] = Set(self._attrs.get('exclude'))
        self._attrs['exclude_tags'] = Set(self._attrs.get('exclude_tags'))
        self._validators = self._get_validators()
        # Publish the validators as a ``constraints`` attribute unless the
        # caller supplied one explicitly.
        if self._validators and 'constraints' not in self._attrs:
            self._attrs['constraints'] = dict(
                (v._constraint, v._constraint_hint)
                for v in self._validators.values()
            )

    def _get_validators(self):
        """Return ``{constraint code: member}`` for every tagged member.

        A member counts as a validator when it carries a ``_constraint``
        attribute.
        """
        validators = inspect.getmembers(
            self, lambda x: hasattr(x, '_constraint')
        )
        return dict((v._constraint, v) for _, v in validators)

    def get_attr(self, name, default=None, expected=None):
        """Return attribute ``name`` or ``default`` when it is missing.

        :param expected: optional type (or tuple of types) the value must be
            an instance of.
        :raises TypeError: when ``expected`` is given and does not match.
        """
        if self.has_attr(name, expected):
            return getattr(self, name)
        return default

    def has_attr(self, name, expected=None):
        """Tell whether attribute ``name`` exists on this schema.

        :param expected: optional type (or tuple of types) the value must be
            an instance of.
        :returns: ``False`` when the attribute is missing, ``True`` otherwise.
        :raises TypeError: when the attribute exists but is not an instance
            of ``expected``.
        """
        if not hasattr(self, name):
            return False
        if expected:
            if isinstance(getattr(self, name), expected):
                return True
            # Both placeholders need a value; passing ``expected`` alone made
            # the formatting itself fail (or misreport) instead of producing
            # this message.
            raise TypeError(
                'Invalid type of \'%s\', expected: %s' % (name, expected)
            )
        return True

    @property
    def parent(self):
        """The parent schema, or this schema itself when it has none."""
        return self._parent or self

    @property
    def root(self):
        """The top-most schema reached by following ``_parent`` links."""
        root = self
        while root._parent:
            root = root._parent
        return root

    @property
    def exclude_tags(self):
        """Own ``exclude_tags`` merged with those of every ancestor."""
        tags = self._attrs['exclude_tags'].copy()
        if self._parent:
            tags.update(self._parent.exclude_tags)
        return tags

    @property
    def dialect(self):
        """The ``dialect`` attribute of the root schema."""
        return self.root._attrs.get('dialect')

    @property
    def is_excluded(self):
        """Truthy when this schema is excluded.

        That is the case when its fieldname is listed in the parent's
        ``exclude``, when it shares a tag with the effective
        ``exclude_tags``, when it is not exclusive, or when it is hidden.
        The result is the first truthy operand, not necessarily a ``bool``.
        """
        return self.fieldname in self.parent.exclude or \
            self.tags & self.exclude_tags or \
            not self.is_exclusive or \
            self.hidden

    @property
    def exclusive(self):
        """The ``exclusive`` attribute, or ``None`` when unset."""
        return self._attrs.get('exclusive')

    @property
    def is_exclusive(self):
        """``True`` when the parent has no ``exclusive`` or lists this field."""
        exclusive = self.parent.exclusive
        return exclusive is None or self.fieldname in exclusive

    @property
    def fieldname(self):
        """The ``fieldname`` attribute, or ``None`` when unset."""
        return self._attrs.get('fieldname')

    @property
    def logger(self):
        """Logger for this schema, created on first access and then cached.

        The logger name is the ``logger`` attribute when present, otherwise
        ``'pyrs.schema.'`` followed by the class name.
        """
        if getattr(self, '_logger', None):
            return self._logger
        logclass = self._attrs.get(
            'logger', 'pyrs.schema.'+self.__class__.__name__
        )
        self._logger = logging.getLogger(logclass)
        return self._logger

    def getter(self, func):
        """Register ``func`` as the ``getvalue`` attribute and return it."""
        self._attrs['getvalue'] = func
        return func

    def setter(self, func):
        """Register ``func`` as the ``setvalue`` attribute and return it."""
        self._attrs['setvalue'] = func
        return func

    def get_jsonschema(self):
        """Return the stored ``_jsonschema`` wrapped in a ``SchemaDict``.

        :raises AttributeError: when no ``_jsonschema`` is available on the
            instance.
        """
        return SchemaDict(self, self._jsonschema)

    def to_raw(self, value):
        """Convert the value to a dict of primitives"""
        return value

    def to_python(self, value):
        """Convert the value to a real python object"""
        return value

    @classmethod
    def mixin(cls, src):
        """Copy the members of ``src`` onto this class and return ``src``.

        Members whose name starts with ``'__'`` are skipped, and methods are
        unwrapped to their underlying function before being assigned.
        """
        for name, member in inspect.getmembers(src):
            if name.startswith('__'):
                continue
            if inspect.ismethod(member):
                member = member.__func__
            setattr(cls, name, member)
        return src

    def __getattr__(self, name):
        """Fall back to ``_attrs`` for names not found the normal way."""
        # ``_attrs`` is still ``None`` on a class or half-built instance.
        if name not in (self._attrs or ()):
            raise AttributeError(
                "'%s' object has no attribute '%s'" % (
                    self.__class__.__name__, name
                )
            )
        return self._attrs[name]

    def __contains__(self, name):
        """``name in schema`` checks the stored attributes."""
        return name in self._attrs

    def __eq__(self, other):
        """Compare by generated JSON schema against dicts and schemas.

        Any other operand is compared by identity.
        """
        if isinstance(other, dict):
            return self.get_jsonschema() == other
        if isinstance(other, Schema):
            return self.get_jsonschema() == other.get_jsonschema()
        return id(self) == id(other)

    def __ne__(self, other):
        """Inverse of :meth:`__eq__`.

        Python 2 does not derive ``!=`` from ``__eq__``; without this method
        ``==`` and ``!=`` could both be true for the same pair.
        """
        return not self.__eq__(other)
# NOTE(review): DeclarativeMetaclass is not defined in the visible part of
# this file; it is assumed to be provided elsewhere in the module.
@six.add_metaclass(DeclarativeMetaclass)
class Base(Schema):
    """Schema that renders its attributes as a JSON schema dictionary.

    ``_fields`` and ``_definitions`` are presumably populated by the
    metaclass (not shown here); whatever they contain is re-parented to the
    new instance on construction.
    """

    _type = None
    _attrs = None
    _definitions = None
    _version = None

    def __init__(self, **attrs):
        """Initialise the schema and adopt its fields and definitions."""
        super(Base, self).__init__(**attrs)
        if self._fields:
            for field in self._fields.values():
                field._parent = self
        if self._definitions:
            for field in self._definitions.values():
                field._parent = self

    def get_jsonschema(self):
        """Build the JSON schema for this object.

        :returns: a ``SchemaDict`` whose ``origin`` is this schema. It holds
            ``type``, ``id``, ``enum``, ``format``, ``title``,
            ``description``, ``default``, ``constraints``, ``version`` and
            ``definitions`` whenever the corresponding value is set.
        """
        _type = self.get_attr('type', self._type)
        schema = SchemaDict(self)
        if _type:
            schema['type'] = _type
        if self.get_attr("null"):
            schema["type"] = lib.ensure_list(_type) + ["null"]
        # Plain pass-through keywords: emitted only when truthy.
        for key in ("id", "enum", "format", "title", "description"):
            value = self.get_attr(key)
            if value:
                schema[key] = value
        # A default of 0, False, '' or [] is a legitimate value, so only an
        # absent (None) default is left out.
        default = self.get_attr("default")
        if default is not None:
            schema["default"] = default
        constraints = self.get_attr("constraints")
        if constraints:
            schema["constraints"] = constraints
        if self._version:
            schema["version"] = self._version
        if self._definitions:
            definitions = collections.OrderedDict()
            for name, prop in self._definitions.items():
                # An explicit ``name`` attribute overrides the declared key.
                definitions[prop.get_attr("name", name)] = \
                    prop.get_jsonschema()
            schema["definitions"] = definitions
        return schema
def _types_msg(instance, types, hint=''):
reprs = []
for type in types:
try:
reprs.append(repr(type["name"]))
except Exception:
reprs.append(repr(type))
return "%r is not of type %s%s" % (instance, ", ".join(reprs), hint)
def _validate_type_draft4(validator, types, instance, schema):
    """Draft 4 ``type`` check that also accepts native date/time objects.

    For a string schema whose ``format`` is one of ``date``, ``datetime``,
    ``time``, ``duration`` or ``timestamp`` the instance may be a string or
    the matching Python object (``datetime.date``, ``datetime.datetime``,
    ``datetime.time``; ``datetime.timedelta``, ``int`` or ``float`` for
    ``duration`` and ``timestamp``).

    At most one error is yielded per instance, and only when the instance
    matches none of the declared types. For the formats above the message
    carries a hint naming the Python type to use.

    :param validator: the running validator; its ``is_type`` is consulted.
    :param types: the schema's ``type`` value, a name or a list of names.
    :param instance: the value under validation.
    :param schema: the schema containing the ``type`` keyword.
    """
    if isinstance(types, six.string_types):
        types = [types]
    if 'null' in types and instance is None:
        return

    # format name -> (accepted Python types, type name used in the hint)
    temporal = {
        'date': (datetime.date, 'date'),
        'datetime': (datetime.datetime, 'datetime'),
        'time': (datetime.time, 'time'),
        'timestamp': ((datetime.timedelta, int, float), 'timedelta'),
        'duration': ((datetime.timedelta, int, float), 'timedelta'),
    }

    hint = ''
    json_format_name = schema.get('format')
    if (
        'string' in types and
        'string' in schema.get('type') and
        isinstance(json_format_name, six.string_types) and
        json_format_name in temporal
    ):
        accepted, datetime_type_name = temporal[json_format_name]
        if isinstance(instance, six.string_types) or \
                isinstance(instance, accepted):
            return
        hint = ' (for format %r strings, use a datetime.%s)' % (
            json_format_name, datetime_type_name
        )

    # One check for both paths: the hint is only attached when the temporal
    # branch applied, and an instance matching another declared type of a
    # union is no longer reported.
    if not any(validator.is_type(instance, name) for name in types):
        yield jsonschema.ValidationError(_types_msg(instance, types, hint))
def _validate_constraints(validator, constraints, instance, schema):
    """Run the custom constraint validators of the schema's origin.

    For every code in ``constraints`` the validator registered under that
    code in ``schema.origin._validators`` is called with ``instance``. Each
    item it returns becomes a ``jsonschema.ValidationError``.

    :param validator: the running validator (unused).
    :param constraints: mapping of constraint code to default message.
    :param instance: the value under validation.
    :param schema: schema dict exposing an ``origin`` attribute.
    """
    for code, fallback_message in constraints.items():
        check = schema.origin._validators[code]
        # A validator returning None (or anything falsy) reports no failure.
        for failure in check(instance) or ():
            yield jsonschema.ValidationError(
                failure.message or fallback_message,
                validator_value=failure.against or code
            )
def _make_validator(schema):
    """Create a Draft 4 validator extended with the project's keywords.

    The stock ``type`` check is replaced by ``_validate_type_draft4`` and a
    ``constraints`` keyword handled by ``_validate_constraints`` is added.

    :param schema: the schema to validate against; it is itself checked
        against the Draft 4 meta-schema first.
    :returns: a validator instance bound to ``schema``.
    """
    format_checker = jsonschema.FormatChecker(formats.draft4_format_checkers)
    # Work on a copy: assigning into Draft4Validator.VALIDATORS directly
    # would install the custom keywords for every user of the stock
    # validator in the process.
    validator_funcs = dict(jsonschema.Draft4Validator.VALIDATORS)
    validator_funcs['type'] = _validate_type_draft4
    validator_funcs['constraints'] = _validate_constraints
    meta_schema = jsonschema.Draft4Validator.META_SCHEMA
    validator_cls = jsonschema.validators.create(
        meta_schema=meta_schema,
        validators=validator_funcs,
        version="draft4",
    )
    validator_cls.check_schema(schema)
    return validator_cls(schema, format_checker=format_checker)