Source code for pyrs.schema.types

"""
This module introduce the basic schema types.
"""
import collections
import datetime

import isodate
import six

from . import base
from . import exceptions
from . import formats


[docs]class Any(base.Base): pass
[docs]class String(base.Base): """ String specific arguments: pattern: The value of this keyword MUST be a string. This string SHOULD be a valid regular expression, according to the ECMA 262 regular expression dialect. A string instance is considered valid if the regular expression matches the instance successfully. Recall: regular expressions are not implicitly anchored. minlen (int >=0): The value of this keyword MUST be an integer. This integer MUST be greater than, or equal to, 0. A string instance is valid against this keyword if its length is greater than, or equal to, the value of this keyword. maxlen (int >=minlen): The value of this keyword MUST be an integer. This integer MUST be greater than, or equal to, 0. A string instance is valid against this keyword if its length is less than, or equal to, the value of this keyword. blank (bool): The value of `blank` MUST be a boolean. Successful validation depends on presence and value of `min_len`. If `min_len` is present and its value is greather than 0 this keyword has no effect. If `min_len` is not present or its value is 0 the value of `min_len` will be set to 1. """ _type = 'string'
[docs] def get_jsonschema(self): schema = super(String, self).get_jsonschema() if self.has_attr('pattern', six.string_types): schema['pattern'] = self.get_attr('pattern') if self.has_attr('min_len', int): schema['minLength'] = self.get_attr('min_len') if not self.get_attr('blank', default=True, expected=bool): schema['minLength'] = max(self.get_attr('min_len', 0), 1) if self.has_attr('max_len', int): schema['maxLength'] = self.get_attr('max_len') return schema
[docs]class Number(base.Base): """ Number specific agruments: maximum, exclusive_max: The value of `maximum` MUST be a number. The value of `exclusive_max` MUST be a boolean. If `exclusive_max` is present, `maximum` MUST also be present. Successful validation depends on the presence and value of `exclusive_max`. If it iss is not present, or has boolean value false, then the instance is valid if it is lower than, or equal to, the value of `maximum`. If `exclusive_max` has boolean value true, the instance is valid if it is strictly lower than the value of `maximum` minimum, exclusive_min: The value of `minimum` MUST be a number. The value of `exclusive_min` MUST be a boolean. If `exclusive_min` is present, `minimum` MUST also be present. Successful validation depends on the presence and value of `exclusive_min`. If it iss is not present, or has boolean value false, then the instance is valid if it is greater than, or equal to, the value of `minimum`. If `exclusive_min` is present and has boolean value true, the instance is valid if it is strictly greater than the value of `minimum`. multiple: The value MUST be an number. This number MUST be strictly greater than 0. A numeric instance is valid against `multiple` if the result of the division of the instance by this keyword's value is an integer. """ _type = 'number'
[docs] def get_jsonschema(self): schema = super(Number, self).get_jsonschema() if self.has_attr('multiple', int): schema['multipleOf'] = self.get_attr('multiple') if self.has_attr('maximum', int): schema['maximum'] = self.get_attr('maximum') if self.has_attr('minimum', int): schema['minimum'] = self.get_attr('minimum') if self.has_attr('exclusive_max', bool) and 'maximum' in schema: schema['exclusiveMaximum'] = self.get_attr('exclusive_max') if self.has_attr('exclusive_min', bool) and 'minimum' in schema: schema['exclusiveMinimum'] = self.get_attr('exclusive_min') return schema
[docs]class Integer(Number): """ Integer specific agruments: maximum, exclusive_max: The value of `maximum` MUST be a number. The value of `exclusive_max` MUST be a boolean. If "exclusiveMaximum" is present, "maximum" MUST also be present. Successful validation depends on the presence and value of `exclusive_max`. If it iss is not present, or has boolean value false, then the instance is valid if it is lower than, or equal to, the value of `maximum`. If `exclusive_max` has boolean value true, the instance is valid if it is strictly lower than the value of `maximum` minimum, exclusive_min: The value of `minimum` MUST be a number. The value of `exclusive_min` MUST be a boolean. If "exclusiveMinimum" is present, "minimum" MUST also be present. Successful validation depends on the presence and value of `exclusive_min`. If it iss is not present, or has boolean value false, then the instance is valid if it is greater than, or equal to, the value of `minimum`. If `exclusive_min` is present and has boolean value true, the instance is valid if it is strictly greater than the value of `minimum`. multiple: The value MUST be an number. This number MUST be strictly greater than 0. A numeric instance is valid against `multiple` if the result of the division of the instance by this keyword's value is an integer. """ _type = "integer"
[docs]class Boolean(base.Base): _type = 'boolean'
[docs]class Array(base.Base): """ Successful validation of an array instance with regards to these two keywords is determined as follows: if "items" is not present, or its value is an object, validation of the instance always succeeds, regardless of the value of "additional" if the value of "additional" is boolean value true or an object, validation of the instance always succeeds; if the value of "additional" is boolean value false and the value of "items" is an array, the instance is valid if its size is less than, or equal to, the size of "items". Array specific options: min_items: An array instance is valid against "min_items" if its size is greater than, or equal to, the value of this keyword. max_items: An array instance is valid against "max_items" if its size is less than, or equal to, the value of this keyword. unique_items: If this keyword has boolean value false, the instance validates successfully. If it has boolean value true, the instance validates successfully if all of its elements are unique. """ _type = 'array' def __init__(self, **attrs): super(Array, self).__init__(**attrs) if self.get_attr('items'): if not isinstance(self.items, list): self.items._parent = self else: for item in self.items: item._parent = self if isinstance(self.get_attr('additional'), base.Schema): self.additional._parent = self
[docs] def get_jsonschema(self): schema = super(Array, self).get_jsonschema() if self.has_attr('additional'): if isinstance(self.get_attr('additional'), base.Schema): schema['additionalItems'] = \ self.get_attr('additional').get_jsonschema() elif isinstance(self.get_attr('additional'), bool): schema['additionalItems'] = self.get_attr('additional') else: raise TypeError('The additional should be bool or schema') if self.get_attr('max_items') is not None: schema['maxItems'] = self.get_attr('max_items') if self.get_attr('min_items') is not None: schema['minItems'] = self.get_attr('min_items') if self.get_attr('unique_items') is not None: schema['uniqueItems'] = self.get_attr('unique_items') if self.get_attr('items'): if isinstance(self.items, list): schema['items'] = [s.get_jsonschema() for s in self.items] else: schema['items'] = self.items.get_jsonschema() return schema
[docs]class Object(base.Base): """Declarative schema object Object specific attributes: additional: boolean value: enable or disable extra items on the object schema: items which are valid against the schema allowed to extend **false by default** min_properties: An object instance is valid against `min_properties` if its number of properties is greater than, or equal to, the value. max_properties: An object instance is valid against `max_properties` if its number of properties is less than, or equal to, the value. pattern: Should be a dict where the keys are valid regular excpressions and the values are schema instances. The object instance is valid if the extra properties (which are not listed as property) valid against the schema while name is match on the pattern. Be careful, the pattern sould be explicit as possible, if the pattern match on any normal property the validation should be successful against them as well. A normal object should looks like the following: .. code:: python class Translation(types.Object): keyword = types.String() value = types.String() class Attrs: additional = False patterns = { 'value_[a-z]{2}': types.String() } """ _type = "object" _attrs = {'additional': False} def __init__(self, extend=None, **attrs): super(Object, self).__init__(**attrs) if extend: self.extend(extend)
[docs] def get_jsonschema(self): schema = super(Object, self).get_jsonschema() if self.get_attr('additional') is not None: if isinstance(self.get_attr('additional'), bool): schema['additionalProperties'] = self.get_attr('additional') else: schema['additionalProperties'] = \ self.get_attr('additional').get_jsonschema() if self.get_attr('min_properties') is not None: schema['minProperties'] = self.get_attr('min_properties') if self.get_attr('max_properties') is not None: schema['maxProperties'] = self.get_attr('max_properties') if self.get_attr('patterns'): patterns = collections.OrderedDict() for reg, pattern in self.get_attr('patterns').items(): patterns[reg] = pattern.get_jsonschema() schema['patternProperties'] = patterns self._update_jsonschema_properties(schema) return schema
def _update_jsonschema_properties(self, schema): if not self._fields: return required = [] properties = collections.OrderedDict() for key, prop in self._fields.items(): if prop.is_excluded: continue name = prop.get_attr("name", key) properties[name] = prop.get_jsonschema() if prop.get_attr('required'): required.append(name) schema["properties"] = properties if required: schema['required'] = sorted(required) pass @property def fields(self): return self._fields
[docs] def extend(self, properties): """Extending the exist same with new properties. If you want to extending with an other schema, you should use the other schame `properties` """ for fieldname, prop in properties.items(): prop._parent = self prop._attrs['fieldname'] = fieldname self._fields.update(properties)
[docs] def to_python(self, value): """Convert the value to a real python object""" value = value.copy() res = {} errors = [] for fieldname, field in self._fields.items(): name = field.get_attr('name', fieldname) if name in value: try: res[fieldname] = field.to_python(value.pop(name)) except exceptions.ValidationErrors as ex: self._update_errors_by_exception(errors, ex, name) res.update(value) for fieldname in self._fields: field = self._fields.get(fieldname) if fieldname not in res and field.has_attr('fallback'): res[fieldname] = field.get_attr('fallback') if field.has_attr('setvalue'): if hasattr(field.setvalue, '__func__'): res[fieldname] = field.setvalue(res) else: res[fieldname] = field.setvalue(field, res) self._raise_exception_when_errors(errors, value) return res
[docs] def to_raw(self, value): """Convert the value to a JSON compatible value""" if value is None: return None res = {} src = value.copy() errors = [] for fieldname in list(set(src) & set(self._fields)): field = self._fields.get(fieldname) name = field.get_attr('name', fieldname) if field.get_attr('hidden'): del src[fieldname] continue try: res[name] = field.to_raw(src.pop(fieldname)) except exceptions.ValidationErrors as ex: self._update_errors_by_exception(errors, ex, name) res.update(src) for fieldname in self._fields: field = self._fields.get(fieldname) name = field.get_attr('name', fieldname) if name not in res and field.has_attr('fallback'): res[name] = field.get_attr('fallback') if field.has_attr('getvalue'): if hasattr(field.getvalue, '__func__'): res[name] = field.getvalue(value) else: res[name] = field.getvalue(field, value) if field.is_excluded and name in res: del res[name] self._raise_exception_when_errors(errors, value) return res
def _update_errors_by_exception(self, errors, ex, name): for error in ex.errors: if error['path']: error['path'] = name+'.'+error['path'] errors.append(error) def _raise_exception_when_errors(self, errors, value): if errors: raise exceptions.ValidationErrors( '%s validation error(s) raised' % len(errors), value=value, errors=errors )
[docs]class Date(String): _attrs = {'format': 'date'}
[docs] def to_python(self, value): if value is None: return None if isinstance(value, datetime.date): return value try: return isodate.parse_date(value) except (isodate.ISO8601Error, TypeError): raise exceptions.ValidationError( "Invalid date value '%s'" % value, value=value, invalid='format', against='date' )
[docs] def to_raw(self, value): if value is None: return None if isinstance(value, datetime.date): return isodate.date_isoformat(value) if isinstance(value, six.string_types): self.to_python(value) return value raise exceptions.ValidationError( "Invalid date value '%s' and type %s" % (value, type(value)), value=value, invalid='type', against='date' )
[docs]class Time(String): _attrs = {'format': 'time'}
[docs] def to_python(self, value): if value is None: return None if isinstance(value, datetime.time): return value try: return isodate.parse_time(value) except (isodate.ISO8601Error, TypeError): raise exceptions.ValidationError( "Invalid time value '%s'" % value, value=value, invalid='format', against='time' )
[docs] def to_raw(self, value): if value is None: return None if isinstance(value, datetime.time): return isodate.time_isoformat(value) if isinstance(value, six.string_types): self.to_python(value) return value raise exceptions.ValidationError( "Invalid time value '%s' and type %s" % (value, type(value)), value=value, invalid='type', against='time' )
[docs]class DateTime(String): _attrs = {'format': 'datetime'}
[docs] def to_python(self, value): if value is None: return None if isinstance(value, datetime.datetime): return value try: return formats.parse_datetime(value) except (isodate.ISO8601Error, TypeError): raise exceptions.ValidationError( "Invalid datetime value '%s'" % value, value=value, invalid='format', against='datetime' )
[docs] def to_raw(self, value): if value is None: return None if isinstance(value, datetime.datetime): return isodate.datetime_isoformat(value) if isinstance(value, six.string_types): self.to_python(value) return value raise exceptions.ValidationError( "Invalid datetime value '%s' and type %s" % (value, type(value)), value=value, invalid='type', against='datetime' )
[docs]class Duration(String): _attrs = {'format': 'duration'}
[docs] def to_python(self, value): if value is None: return None if isinstance(value, (int, float)): return datetime.timedelta(seconds=value) if isinstance(value, datetime.timedelta): return value try: return isodate.parse_duration(value) except (isodate.ISO8601Error, TypeError): raise exceptions.ValidationError( "Invalid duration value '%s'" % value, value=value, invalid='format', against='duration' )
[docs] def to_raw(self, value): if value is None: return None if isinstance(value, datetime.timedelta): return isodate.duration_isoformat(value) if isinstance(value, (int, float)): return isodate.duration_isoformat( datetime.timedelta(seconds=value) ) if isinstance(value, six.string_types): self.to_python(value) return value raise exceptions.ValidationError( "Invalid duration value '%s' and type %s" % (value, type(value)), value=value, invalid='type', against='timedelta' )
[docs]class TimeDelta(Number):
[docs] def to_python(self, value): if isinstance(value, (int, float)): return datetime.timedelta(seconds=value) if isinstance(value, datetime.timedelta): return value raise exceptions.ValidationError( "Invalid timedelta value '%s'" % value, value=value, invalid='type', against='timedelta' )
[docs] def to_raw(self, value): if isinstance(value, datetime.timedelta): return value.total_seconds() if isinstance(value, (int, float)): return value raise exceptions.ValidationError( "Invalid timedelta value '%s' and type %s" % (value, type(value)), value=value, invalid='type', against='timedelta' )
[docs]class Password(String):
[docs] def get_jsonschema(self): if 'output' in self.dialect: # make sure the output validation does not fail return {'type': 'string'} schema = super(Password, self).get_jsonschema() return schema
[docs] def to_raw(self, value): minlen = self.get_attr('min_len', 0) maxlen = self.get_attr('max_len', 99) expected_len = max(min(8, maxlen), minlen) return '*' * expected_len
[docs]class Email(String): _attrs = {'format': 'email'}
[docs]class Version(String): _attrs = { 'pattern': r'[0-9]+\.[0-9]+(\.[0-9]+)?', 'required': True, 'logger': 'pyrs.schema.Version', 'fallback': '1.0' } @base.constraint('version', 'Should be same as in schema')
[docs] def validate_version(self, version): major, minor, build = self.split_version(version) emajor, eminor, ebuild = self.split_version(self.expected_version) if major != emajor: raise exceptions.ConstraintError( 'Major version should be same %s!=%s' % ( version, self.expected_version ), against=self.get_attr('version') ) if minor != eminor: self.logger.warning( 'Version mismatch %s!=%s', version, self.expected_version ) elif build != ebuild: self.logger.debug( 'Version mismatch %s!=%s', version, self.expected_version )
@property def expected_version(self): return self.parent._version or self.get_attr('fallback')
[docs] def split_version(self, version): v = tuple(map(int, version.split('.'))) if len(v) == 3: return v return v[0], v[1], 0
[docs] def getvalue(self, value): if self.fieldname in value: assert value[self.fieldname] == self.parent._version return self.parent._version
[docs]class Enum(Any): """JSON generic enum class :param enum: list of possible values :type enum: list """
[docs] def get_jsonschema(self): """Ensure the generic schema, remove `types` :return: Gives back the schema :rtype: dict """ schema = super(Enum, self).get_jsonschema() if self.get_attr('enum'): schema['enum'] = self.get_attr('enum') return schema
[docs]class Ref(Any):
[docs] def get_jsonschema(self): return base.SchemaDict( self, {'$ref': '#/definitions/'+self.get_attr('ref')} )
[docs]class Null(Any): """ Generic null type """
[docs] def get_jsonschema(self): return base.SchemaDict( self, {'type': 'null'} )
[docs]class MultiSchema(base.Schema): _type = None def __init__(self, *_types, **attrs): super(MultiSchema, self).__init__(**attrs) self._types = _types
[docs] def get_jsonschema(self): schemas = [ s.get_jsonschema() for s in self._types ] return base.SchemaDict( self, {self._type: schemas} )
[docs]class AnyOf(MultiSchema): _type = "anyOf"
[docs] def to_python(self, value): last_exception = None for t in self._types: try: return t.to_python(value) except Exception as e: last_exception = e if last_exception: raise last_exception
[docs] def to_raw(self, value): last_exception = None for t in self._types: try: return t.to_raw(value) except Exception as e: last_exception = e if last_exception: raise last_exception
[docs]class Not(MultiSchema): _type = "not"
[docs]class OneOf(AnyOf): _type = "oneOf"
[docs]class AllOf(MultiSchema): _type = "allOf"
[docs] def to_python(self, value): return self._types[0].to_python(value)
[docs] def to_raw(self, value): return self._types[0].to_raw(value)