# -*- coding: utf-8 -*-
"""Main module."""
from __future__ import absolute_import
import json
from datetime import date, datetime
import dateutil
from toolspy import is_int, is_number
from decimal import Decimal
import six
[docs]def instance_of(item, type_):
"""This method is an enhanced version of isinstance
It can handle cases where a number like string is meant
to be treated as a number or where a datetime like string
is meant to be treated as a datetime
"""
if type_ == datetime:
if isinstance(item, datetime):
return True
elif isinstance(item, str) or isinstance(item, six.text_type):
return isinstance(dateutil.parser.parse(item), datetime)
else:
return False
elif type_ == date:
if isinstance(item, date):
return True
elif isinstance(item, str) or isinstance(item, six.text_type):
return isinstance(dateutil.parser.parse(item), datetime)
else:
return False
elif type_ == int:
if isinstance(item, int):
return True
else:
return is_int(item)
elif type_ == float:
return isinstance(item, float) or is_number(item)
elif type_ == Decimal:
return isinstance(item, Decimal) or is_number(item)
else:
return isinstance(item, type_)
[docs]def validate_schema(schema):
conditions = [
isinstance(schema, dict),
]
fields_dict_schema = {
"validators": [],
"allow_unknown_fields": True,
}
schema_of_schema = {
"fields": {
"fields": {
"required": False,
"type": dict,
"dict_schema": fields_dict_schema
},
"validators": {
"type": list,
"list_item_type": callable,
"required": False
},
"polymorphic_on": {
"required": False
}
},
"validators": [],
"wildcard_field_validators": []
}
[docs]def validate_dict(
dictionary, schema, allow_unknown_fields=None,
allow_required_fields_to_be_skipped=None,
polymorphic_identity=None,
context=None, parent_contexts=None,
siblings_list=None, curr_obj_idx_in_siblings_list=None,
schemas_registry=None):
"""The main method to be used to validate a dictionary against a schema
This method accepts the dictionary to be validated as the first argument and validates
it against the schema which is passed as the second argument
Parameters
-----------
dictionary: dict
The dict object which is to be validated
schema: dict
The schema against which the validation has to be done.
allow_unknown_fields: bool
If this is set to True, the validator will allow the dictionary to have fields which are not specified in the schema.
The validation will be done only on those fields whose behavior is specified by the schema. The rest would be ignored.
allow_required_fields_to_be_skipped: bool
Sometimes even if a field is marked as required, it might become necessary to skip that check when
the validator is called in certain contexts (Consider POST vs PUT for example. A field might be
required for the POST request, but might be optional for the PUT request. If you are re-using the
same validation logic for both, it will be convenient if you can allow the PUT request to allow
the required fields to be skipped)
Returns
--------
validation_status: bool
A boolean flag which says if the validation succeeded or failed
validation_errors: dict or str or None
The errors if any, outputted as a string or a dict based on the schema
"""
is_valid = True
errors = None
if not isinstance(dictionary, dict):
return (False, {'TYPE_ERROR': "Object is not a dict"})
if allow_unknown_fields is None:
allow_unknown_fields = schema.get('allow_unknown_fields', False)
if allow_required_fields_to_be_skipped is None:
allow_required_fields_to_be_skipped = schema.get(
'allow_required_fields_to_be_skipped', False)
fields = schema.get("fields")
schema_validators = schema.get("validators", [])
if fields:
polymorphic_field = schema.get('polymorphic_on')
if polymorphic_field:
if polymorphic_identity is None:
polymorphic_identity = dictionary.get(polymorphic_field)
if polymorphic_identity:
additional_schema_for_polymorph = schema.get(
"additional_schema_for_polymorphs", {}).get(polymorphic_identity, {})
for k, v in additional_schema_for_polymorph.get("fields", {}).items():
fields[k] = v
schema_validators += additional_schema_for_polymorph.get(
"validators", [])
if not allow_unknown_fields:
for k in dictionary.keys():
if k not in list(fields.keys()):
if errors is None:
errors = {}
is_valid = False
if 'UNKNOWN_FIELDS' not in errors:
errors['UNKNOWN_FIELDS'] = []
errors['UNKNOWN_FIELDS'].append(k)
for field_name, field_props in fields.items():
field_errors = {}
field_is_valid = True
if field_name not in dictionary:
if allow_required_fields_to_be_skipped:
continue
else:
required = field_props.get('required', False)
if callable(required):
error_message = required.desc or required.__name__
required = required(
dictionary, schema=schema, context=context,
parent_contexts=parent_contexts,
siblings_list=siblings_list,
curr_obj_idx_in_siblings_list=curr_obj_idx_in_siblings_list)
else:
error_message = '%s is a required field' % field_name
if required:
is_valid = False
field_is_valid = False
if errors is None:
errors = {}
if 'MISSING_FIELDS' not in errors:
errors['MISSING_FIELDS'] = []
errors['MISSING_FIELDS'].append(field_name)
field_errors['MISSING_FIELD_ERROR'] = error_message
else:
allowed = field_props.get("allowed", True)
if callable(allowed):
error_message = allowed.desc or allowed.__name__
allowed = allowed(
dictionary, schema=schema, context=context,
parent_contexts=parent_contexts,
siblings_list=siblings_list,
curr_obj_idx_in_siblings_list=curr_obj_idx_in_siblings_list)
else:
error_message = '%s is not an allowed field' % field_name
if allowed == False:
is_valid = False
field_is_valid = False
field_errors['FIELD_NOT_ALLOWED_ERROR'] = error_message
else:
field_type = field_props.get('type')
if field_type is not None:
if type(field_type) == type:
if field_type == dict:
is_mapped_collection = field_props.get(
'is_mapped_collection')
dict_schema = field_props.get('dict_schema')
if is_mapped_collection:
pass
else:
if dict_schema is None:
rel_schema_cls_name = field_props.get(
'is_a_relation_to')
if rel_schema_cls_name and schemas_registry:
dict_schema = schemas_registry.get(
rel_schema_cls_name)
if dict_schema:
if isinstance(parent_contexts, list):
_parent_contexts = parent_contexts[:]
_parent_contexts.append(context)
else:
_parent_contexts = [context]
validation_result, validation_errors = validate_dict(
dictionary[field_name], schema=dict_schema, allow_unknown_fields=allow_unknown_fields,
allow_required_fields_to_be_skipped=allow_required_fields_to_be_skipped,
parent_contexts=_parent_contexts)
if not validation_result:
field_errors['VALIDATION_ERRORS_FOR_OBJECT'] = validation_errors
field_is_valid = field_is_valid and validation_result
is_valid = is_valid and validation_result
elif field_type == list:
list_item_type = field_props.get(
'list_item_type')
field_errors['VALIDATION_ERRORS_FOR_OBJECTS_IN_LIST'] = [
]
if dictionary[field_name] is not None:
if type(list_item_type) == type:
if list_item_type == dict:
list_item_schema = field_props.get(
'list_item_schema')
if list_item_schema is None:
rel_schema_cls_name = field_props.get(
'is_a_relation_to')
if rel_schema_cls_name is not None and schemas_registry is not None:
list_item_schema = schemas_registry.get(
rel_schema_cls_name.__name__)
if list_item_schema:
if isinstance(parent_contexts, list):
_parent_contexts = parent_contexts[:]
_parent_contexts.append(
context)
else:
_parent_contexts = [
context]
validation_result, validation_errors = validate_list_of_dicts(
dictionary[field_name], list_item_schema,
allow_unknown_fields=allow_unknown_fields,
allow_required_fields_to_be_skipped=allow_required_fields_to_be_skipped,
parent_contexts=_parent_contexts)
if not validation_result:
field_errors['VALIDATION_ERRORS_FOR_OBJECTS_IN_LIST'] = validation_errors
field_is_valid = field_is_valid and validation_result
is_valid = is_valid and validation_result
else:
for item in dictionary[field_name]:
if not instance_of(item, list_item_type):
field_is_valid = False
is_valid = False
field_errors['VALIDATION_ERRORS_FOR_OBJECTS_IN_LIST'].append(
{"TYPE_ERROR": "Item should be of type {0}".format(list_item_type.__name__)})
else:
field_errors['VALIDATION_ERRORS_FOR_OBJECTS_IN_LIST'].append(
None)
elif type(list_item_type) == tuple:
for item in dictionary[field_name]:
if not any(instance_of(item, t) for t in list_item_type):
field_is_valid = False
is_valid = False
field_errors['VALIDATION_ERRORS_FOR_OBJECTS_IN_LIST'].append(
{"TYPE_ERROR": "Item should be of type {0}".format(
"/".join([t.__name__ for t in list_item_type]))})
else:
field_errors['VALIDATION_ERRORS_FOR_OBJECTS_IN_LIST'].append(
None)
if 'permitted_values_for_list_items' in field_props:
for idx, item in enumerate(dictionary[field_name]):
if item not in field_props['permitted_values_for_list_items']:
if field_errors['VALIDATION_ERRORS_FOR_OBJECTS_IN_LIST'][idx] is None:
field_errors['VALIDATION_ERRORS_FOR_OBJECTS_IN_LIST'][idx] = {
}
field_errors['VALIDATION_ERRORS_FOR_OBJECTS_IN_LIST'][idx]['PERMITTED_VALUES_ERROR'] = "Field data can be one of the following only: {0}".format(
"/".join([str(v) for v in field_props['permitted_values_for_list_items']]))
field_is_valid = False
is_valid = False
elif not instance_of(dictionary[field_name], field_type):
field_errors['TYPE_ERROR'] = "Field data should be of type {0}".format(
field_type.__name__)
field_is_valid = False
is_valid = False
elif type(field_type) == tuple:
if not any(instance_of(dictionary[field_name], t) for t in field_type):
field_errors['TYPE_ERROR'] = "Field data should be of type {0}".format(
"/".join([t.__name__ for t in field_type]))
field_is_valid = False
is_valid = False
if 'permitted_values' in field_props:
if dictionary[field_name] not in field_props['permitted_values']:
field_errors['PERMITTED_VALUES_ERROR'] = "Field data can be one of the following only: {0}".format(
"/".join([v for v in field_props['permitted_values']]))
field_is_valid = False
is_valid = False
for _validator in field_props.get('validators', []):
if _validator is None:
continue
validation_result, validation_errors = _validator(
dictionary[field_name], dictionary, schema=schema, context=context,
parent_contexts=parent_contexts,
siblings_list=siblings_list,
curr_obj_idx_in_siblings_list=curr_obj_idx_in_siblings_list)
if not validation_result:
validator_name = _validator.desc.upper() or _validator.__name__.upper()
validator_name = validator_name.replace(" ", "_")
field_errors[validator_name] = validation_errors
field_is_valid = field_is_valid and validation_result
is_valid = is_valid and validation_result
if not field_is_valid:
if errors is None:
errors = {}
if 'FIELD_LEVEL_ERRORS' not in errors:
errors['FIELD_LEVEL_ERRORS'] = {}
errors['FIELD_LEVEL_ERRORS'][field_name] = field_errors
for schema_validator in schema_validators:
validation_result, validation_errors = schema_validator(
dictionary, schema=schema, context=context, siblings_list=siblings_list,
parent_contexts=parent_contexts,
curr_obj_idx_in_siblings_list=curr_obj_idx_in_siblings_list)
if validation_result is False:
if errors is None:
errors = {}
if 'SCHEMA_LEVEL_ERRORS' not in errors:
errors['SCHEMA_LEVEL_ERRORS'] = []
errors['SCHEMA_LEVEL_ERRORS'].append(validation_errors)
is_valid = False
return (is_valid, errors)
[docs]def validate_list_of_dicts(
list_of_dicts, dict_schema,
allow_unknown_fields=None, allow_required_fields_to_be_skipped=None,
context=None, parent_contexts=None, schemas_registry=None):
is_valid = True
errors = []
if not isinstance(list_of_dicts, list):
return (False, "Expected a list")
for idx, dictionary in enumerate(list_of_dicts):
dictionary_validity, dictionary_errors = validate_dict(
dictionary, dict_schema, allow_unknown_fields=allow_unknown_fields,
allow_required_fields_to_be_skipped=allow_required_fields_to_be_skipped,
context=context, siblings_list=list_of_dicts, parent_contexts=parent_contexts,
schemas_registry=schemas_registry, curr_obj_idx_in_siblings_list=idx)
if dictionary_validity is False:
errors.append(dictionary_errors)
else:
errors.append(None)
is_valid = is_valid and dictionary_validity
return (is_valid, errors)
[docs]def func_and_desc(func, desc):
func.desc = desc
return func
[docs]def json_encoder(obj):
if callable(obj):
if hasattr(obj, 'desc'):
return obj.desc
if obj.__name__ == '<lambda>':
return "Nameless function"
return obj.__name__
else:
try:
return json.JSONEncoder().default(obj)
except:
return six.text_type(obj)
[docs]def schema_to_json(schema):
return json.dumps(
schema,
default=json_encoder)