Source code for dataio.validate

"""Datapackage validate module of the dataio utility."""

import os
from logging import getLogger
from pathlib import Path

import frictionless
import pandas as pd
import yaml
from pydantic import BaseModel

from dataio.load import load
from dataio.schemas.bonsai_api.base_models import MatrixModel
from dataio.tools import BonsaiBaseModel

from .set_logger import set_logger

logger = getLogger("root")


def validate_matrix(df: pd.DataFrame, schema: MatrixModel):
    """Validate a matrix's row and column labels against the schema."""
    rows = df.index
    columns = df.columns
    schema.row_schema.to_dataclass(rows)
    schema.column_schema.to_dataclass(columns)
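
# A minimal usage sketch (not part of the module): DemoMatrixModel is a
# hypothetical MatrixModel subclass; any schema whose row_schema and
# column_schema expose to_dataclass would be handled the same way.
#
#   import pandas as pd
#
#   df = pd.DataFrame(
#       [[1.0, 2.0], [3.0, 4.0]],
#       index=["prod_a", "prod_b"],
#       columns=["region_1", "region_2"],
#   )
#   validate_matrix(df, schema=DemoMatrixModel)  # raises if labels are invalid
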
def validate_table(df: pd.DataFrame, schema: BonsaiBaseModel):
    """Validate a table against the schema, rejecting duplicate codes."""
    if "code" in df.columns:
        # Check for duplicates in the 'code' column
        duplicates = df["code"].duplicated(keep=False)
        if duplicates.any():
            duplicate_values = df["code"][duplicates].unique()
            raise ValueError(
                f"Duplicate values found in column 'code': {duplicate_values}"
            )
    schema.to_dataclass(df)
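
# A usage sketch: the duplicate check above is the only dataio-specific step,
# so a frame with a repeated 'code' raises before the schema is applied.
# DemoModel stands in for any BonsaiBaseModel subclass.
#
#   df = pd.DataFrame({"code": ["a", "a"], "name": ["x", "y"]})
#   validate_table(df, schema=DemoModel)
#   # ValueError: Duplicate values found in column 'code': ['a']
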
def validate(full_path: str, overwrite: bool = False, log_name: str = None) -> dict:
    """Validate datapackage.

    Validates datapackage with metadata at:

    <full_path>=<root_path>/<path>/<name>.dataio.yaml

    Creates <name>.validate.yaml for frictionless validation and outputs
    dataio-specific validation to the log.

    Specific fields expected in <name>.dataio.yaml:

    - name : should match <name>
    - path : from which <path> is inferred
    - version

    Parameters
    ----------
    full_path : str
        path to dataio.yaml file
    overwrite : bool
        whether to overwrite output files
    log_name : str
        name of log file, if None no log is set

    Returns
    -------
    dict
        frictionless validate report dictionary
    """
    n_errors = 0

    logger.info("Started dataio validate")
    logger.info("Validate arguments")

    # validate input arguments
    for arg, typ in zip(
        ["full_path", "overwrite", "log_name"], [(str, Path), bool, (str, type(None))]
    ):
        if not isinstance(locals()[arg], typ):
            logger.error(f"argument {arg} is of type {type(locals()[arg])} != {typ}")
            raise TypeError

    # validate path
    logger.info("Validate path")
    full_path = Path(full_path)
    if full_path.name[-12:] != ".dataio.yaml":
        logger.error(f"Full path suffix is not '.dataio.yaml': {full_path}")
        raise FileNotFoundError
    datapackage_name = full_path.name[:-12]
    if not os.path.exists(str(full_path)):
        logger.error(
            f"Metadata file not accessible at {full_path}\n"
            f"Current working directory is {os.getcwd()}"
        )
        raise FileNotFoundError

    # open log file
    if log_name is not None:
        set_logger(filename=log_name, path=full_path.parent, overwrite=overwrite)
        logger.info("Started dataio validate log file")
    else:
        logger.info("Not initialized new log file")

    # read dataio datapackage metadata
    try:
        with open(full_path, "r") as f:
            metadata = yaml.safe_load(f)
    except FileNotFoundError:
        logger.error(f"Could not open dataio datapackage metadata file {full_path}")
        raise
    logger.info(f"Dataio datapackage metadata file opened at {full_path}")

    logger.info("Validating general features")

    # check dataio datapackage specific fields
    for key in ["name", "path", "version"]:
        if key not in metadata.keys():
            logger.error(f"Key {key} missing from metadata")
            n_errors += 1
        elif not isinstance(metadata[key], str):
            logger.error(
                f"metadata['{key}'] is of type {type(metadata[key])} != {str}"
            )
            n_errors += 1
    for whitespace in [" ", "\n", "\t"]:
        if whitespace in metadata["name"]:
            logger.error(f"Metadata name '{metadata['name']}' has whitespace")
            n_errors += 1
    if metadata["name"] != datapackage_name:
        logger.error(
            "Name of metadata file and datapackage name do not "
            f"match: {metadata['name']} != {datapackage_name}"
        )
        n_errors += 1

    # check datapackages
    parent_datapackages = {}
    if metadata["path"] == ".":
        root_path = str(Path(full_path).parent)
    else:
        root_path = str(Path(full_path).parent)[: -len(metadata["path"])]
    logger.info(f"root_path is '{root_path}'")
    if "datapackages" not in metadata.keys():
        logger.info("Key 'datapackages' not found in metadata")
    else:
        logger.info("Found key 'datapackages' in metadata")
        if not isinstance(metadata["datapackages"], list):
            logger.error(
                "Type of metadata 'datapackages' is "
                f"{type(metadata['datapackages'])} and not 'list'"
            )
            n_errors += 1
        else:
            for datapackage in metadata["datapackages"]:
                for key in ["name", "path", "version"]:
                    if key not in datapackage.keys():
                        logger.error(f"Key {key} missing from datapackage")
                        n_errors += 1
                    elif not isinstance(datapackage[key], str):
                        logger.error(
                            f"datapackage['{key}'] is of type "
                            f"{type(datapackage[key])} != {str}"
                        )
                        n_errors += 1
                for whitespace in [" ", "\n", "\t"]:
                    if whitespace in datapackage["name"]:
                        logger.error(
                            f"Datapackage '{datapackage['name']}' has whitespace"
                        )
                        n_errors += 1
                if datapackage["name"] == datapackage_name:
                    logger.error(
                        "Name of metadata file and external datapackage "
                        f"match: {datapackage_name}"
                    )
                    n_errors += 1
                # load datapackage
                parent_path = (
                    Path(root_path)
                    .joinpath(datapackage["path"])
                    .joinpath(f"{datapackage['name']}.dataio.yaml")
                )
                logger.info(
                    "Loading metadata of datapackage "
                    f"'{datapackage['name']}' at path '{parent_path}'"
                )
                parent_datapackages[datapackage["name"]] = load(
                    full_path=parent_path, include_tables=False
                )
                # check version
                parent_version = parent_datapackages[datapackage["name"]].__metadata__[
                    "version"
                ]
                if datapackage["version"] != parent_version:
                    logger.error(
                        "Version of expected and external datapackage "
                        "versions do not match:\n"
                        f"name: {datapackage['name']}\n"
                        f"expected version: {datapackage['version']}\n"
                        f"found version: {parent_version}"
                    )
                    n_errors += 1

    if "tables" not in metadata.keys():
        logger.error("Key 'tables' missing from metadata")
        n_errors += 1
    elif not isinstance(metadata["tables"], list):
        logger.error(
            f"metadata['tables'] is of type {type(metadata['tables'])} != {list}"
        )
        n_errors += 1

    # check tables
    logger.info("General features validated")
    for position, resource in enumerate(metadata["tables"]):
        logger.info(f"Validating specific features of table {position}")
        for key in ["name", "path", "tabletype"]:
            if key not in resource.keys():
                logger.error(f"Key '{key}' missing from table {position}")
                n_errors += 1
            elif not isinstance(resource[key], str):
                logger.error(
                    f"Key {key} is of type {type(resource[key])} != "
                    f"{str} in table {position}"
                )
                n_errors += 1
        logger.info(
            f"Name = '{resource['name']}', path = "
            f"'{resource['path']}', tabletype = "
            f"'{resource['tabletype']}'"
        )
        # name
        for whitespace in [" ", "\n", "\t"]:
            if whitespace in resource["name"]:
                logger.error(f"Metadata name '{resource['name']}' has whitespace")
                n_errors += 1
        # tabletype
        tabletype_list = ["fact", "dimension", "tree", "concordance", "other"]
        if resource["tabletype"] not in tabletype_list:
            logger.error(
                f"Field 'tabletype' of resource {position} with "
                f"name {resource['name']} is "
                f"{resource['tabletype']}, which is not in "
                f"{tabletype_list}"
            )
            n_errors += 1
        logger.info(f"Specific features of resource {position} validated")

    # foreign keys
    resource_names = [resource["name"] for resource in metadata["tables"]]
    resource_types = [resource["tabletype"] for resource in metadata["tables"]]
    for resource in metadata["tables"]:
        logger.info(f"Validating foreign keys of resource {resource['name']}")
        schema, n_errors = validate_schema(resource, n_errors)
        field_names = [field["name"] for field in schema["fields"]]
        field_types = [field["type"] for field in schema["fields"]]
        if "primaryKeys" not in schema.keys():
            logger.warning("Primary key not found, no checks performed")
        else:
            primary_key = schema["primaryKeys"][0]
            if len(field_names) != len(set(field_names)):
                logger.error(f"Repeated field names: {field_names}")
                n_errors += 1
            if primary_key not in field_names:
                logger.error(
                    f"Unrecognized primary key {primary_key}, not in {field_names}"
                )
                n_errors += 1
            elif field_types[field_names.index(primary_key)] not in ["string"]:
                logger.error(f"Primary key {primary_key} type is not in ['string']")
                n_errors += 1
        foreign_key_children = []
        foreign_key_parents = []
        foreign_key_tables = []
        foreign_key_datapackages = []
        if "foreignKeys" not in schema.keys():
            logger.warning("Foreign keys not found, no checks performed")
        else:
            foreign_key_children = [
                field["fields"][0] for field in schema["foreignKeys"]
            ]
            foreign_key_parents = [
                field["reference"]["fields"][0] for field in schema["foreignKeys"]
            ]
            foreign_key_tables = [
                (
                    field["reference"]["table"]
                    if "table" in field["reference"].keys()
                    else resource["name"]
                )
                for field in schema["foreignKeys"]
            ]
            foreign_key_datapackages = [
                (
                    field["reference"]["datapackage"]
                    if "datapackage" in field["reference"].keys()
                    else datapackage_name
                )
                for field in schema["foreignKeys"]
            ]
            if len(foreign_key_children) != len(set(foreign_key_children)):
                logger.error(f"Repeated foreign key fields: {foreign_key_children}")
                n_errors += 1
        resource["schema"] = schema
        # generic constraints
        if resource["tabletype"] != "other":
            if "id" not in field_names:
                logger.error("Field 'id' expected if tabletype != 'other'")
                n_errors += 1
            for field_name in field_names:
                if field_name not in [
                    "id",
                    "value",
                    "name",
                    "description",
                    "comment",
                    "position",
                ]:
                    if field_name not in foreign_key_children:
                        logger.warning(
                            f"Field '{field_name}' should have a "
                            "foreign key relation"
                        )
            if "foreignKeys" in schema.keys():
                for position, table in enumerate(foreign_key_tables):
                    if (table not in resource_names) and (
                        foreign_key_datapackages[position] == datapackage_name
                    ):
                        logger.warning(
                            f"Table {table} referenced in foreign key "
                            f"{position} does not exist in datapackage"
                        )
                    if foreign_key_parents[position] != "id":
                        logger.error(
                            f"Parent field referenced in foreign key "
                            f"{position} is "
                            f"{foreign_key_parents[position]} != 'id'"
                        )
                        n_errors += 1
        # tabletype specific constraints
        if resource["tabletype"] == "fact":
            if "value" not in field_names:
                logger.error("Field 'value' expected if tabletype = 'fact'")
                n_errors += 1
            elif field_types[field_names.index("value")] != "number":
                logger.error(
                    "Field 'value' expected to have type 'number' instead of "
                    f"{field_types[field_names.index('value')]}"
                )
                n_errors += 1
        elif resource["tabletype"] == "dimension":
            if "position" not in field_names:
                logger.error("Field 'position' expected if tabletype = 'dimension'")
                n_errors += 1
            elif field_types[field_names.index("position")] != "integer":
                logger.error(
                    "Field 'position' expected to have type 'integer' instead "
                    f"of {field_types[field_names.index('position')]}"
                )
                n_errors += 1
        elif resource["tabletype"] == "tree":
            if "parent_id" not in field_names:
                logger.error("Field 'parent_id' expected if tabletype = 'tree'")
                n_errors += 1
            elif (
                foreign_key_tables[foreign_key_children.index("parent_id")]
                != resource["name"]
            ):
                logger.error(
                    "Field 'parent_id' must reference its own table if "
                    "tabletype = 'tree'"
                )
                n_errors += 1
        elif resource["tabletype"] == "concordance":
            if len(foreign_key_tables) != 2:
                logger.error("Two foreign keys expected if tabletype = 'concordance'")
                n_errors += 1

    logger.info("DataIO table constraints validation finished")

    if n_errors > 0:
        logger.error(f"{n_errors} errors found, validation aborted")
        raise TypeError
    logger.info("DataIO metadata validated, no errors found")

    # check dialect
    for table in metadata["tables"]:
        if "dialect" in table.keys():
            logger.info(f"Table {table['name']} has 'dialect' field")
            if "csv" in table["dialect"].keys():
                logger.info(f"Table {table['name']} 'dialect' has 'csv' field")
                dialect = table["dialect"]["csv"]
                if "delimiter" in dialect.keys():
                    logger.warning(
                        f"Table {table['name']} field ['dialect']['csv']"
                        f"['delimiter'] = {dialect['delimiter']}"
                    )
                if "quoteChar" in dialect.keys():
                    logger.warning(
                        f"Table {table['name']} field ['dialect']['csv']"
                        f"['quoteChar'] = {dialect['quoteChar']}"
                    )
                if "skipInitialSpace" in dialect.keys():
                    logger.warning(
                        f"Table {table['name']} field ['dialect']['csv']"
                        f"['skipInitialSpace'] = {dialect['skipInitialSpace']}"
                    )
                    if dialect["skipInitialSpace"]:
                        logger.warning(
                            "This might raise problems loading if there are "
                            "delimiters inside quoted strings"
                        )

    # shift path to tables
    for tab_pos, table in enumerate(metadata["tables"]):
        if metadata["path"] != ".":
            table["path"] = str(Path(metadata["path"]).joinpath(table["path"]))
            metadata["tables"][tab_pos] = table

    # fix refs to tables from other datapackages
    to_add = {}
    for tab_pos, table in enumerate(metadata["tables"]):
        schema = table["schema"]
        if "foreignKeys" in schema.keys():
            for pos, foreign_key in enumerate(schema["foreignKeys"]):
                if "datapackage" in foreign_key["reference"].keys():
                    parent_datapackage = foreign_key["reference"]["datapackage"]
                    parent_table = foreign_key["reference"]["table"]
                    logger.info(
                        "Found foreign key to datapackage "
                        f"{parent_datapackage} and table {parent_table}"
                    )
                    if parent_datapackage not in to_add.keys():
                        to_add[parent_datapackage] = []
                    if parent_table not in to_add[parent_datapackage]:
                        to_add[parent_datapackage].append(parent_table)
                    del schema["foreignKeys"][pos]["reference"]["datapackage"]
                    schema["foreignKeys"][pos]["reference"][
                        "table"
                    ] = f"{parent_datapackage}_{parent_table}"
        table["schema"] = schema
        metadata["tables"][tab_pos] = table

    # add tables from other datapackages
    for parent_datapackage, parent_tables in to_add.items():
        table_positions = {}
        tables = parent_datapackages[parent_datapackage].__metadata__["tables"]
        for pos, table in enumerate(tables):
            table_positions[table["name"]] = pos
        for parent_tablename in parent_tables:
            tmp = tables[table_positions[parent_tablename]]
            tmp["name"] = f"{parent_datapackage}_{parent_tablename}"
            tmp["path"] = str(
                Path(
                    parent_datapackages[parent_datapackage].__metadata__["path"]
                ).joinpath(tmp["path"])
            )
            tmp["schema"].pop("foreignKeys", None)
            metadata["tables"].append(tmp)

    # rename table to resource
    metadata["resources"] = metadata["tables"]
    del metadata["tables"]
    for res_pos, resource in enumerate(metadata["resources"]):
        if "foreignKeys" in resource["schema"].keys():
            for fk_pos, foreignkey in enumerate(resource["schema"]["foreignKeys"]):
                if "table" in foreignkey["reference"].keys():
                    foreignkey["reference"]["resource"] = foreignkey["reference"][
                        "table"
                    ]
                    del foreignkey["reference"]["table"]
                resource["schema"]["foreignKeys"][fk_pos] = foreignkey
        metadata["resources"][res_pos] = resource

    # apply frictionless
    logger.info("Starting frictionless validate")
    frictionless_yaml = str(
        Path(root_path).joinpath(f"{datapackage_name}.datapackage.yaml")
    )
    if not overwrite:
        if os.path.isfile(frictionless_yaml):
            logger.error(
                f"File {frictionless_yaml} already exists and "
                f"'overwrite' option is {overwrite}"
            )
            raise FileExistsError
    with open(frictionless_yaml, "w") as f:
        yaml.safe_dump(metadata, f)
    report = frictionless.validate(frictionless_yaml)
    report_yaml = Path(full_path).with_stem(f"{datapackage_name}.validate")
    if not overwrite:
        if os.path.isfile(report_yaml):
            logger.error(
                f"File {report_yaml} already exists and "
                f"'overwrite' option is {overwrite}"
            )
            raise FileExistsError
    with open(report_yaml, "w") as f:
        yaml.safe_dump(report.to_dict(), f)
    os.remove(frictionless_yaml)
    if report.stats["errors"] > 0:
        frictionless_errors = report.stats["errors"]
        logger.error(
            f"{frictionless_errors} errors related to frictionless found, "
            "validation aborted"
        )
        raise TypeError
    logger.info("Finished frictionless validate")

    return report.to_dict()
def validate_schema(resource, n_errors):
    """Check if schema, fields, primary and foreign keys exist."""
    # schema
    if "schema" not in resource.keys():
        logger.error("Key 'schema' missing from table keys")
        n_errors += 1
    schema = resource["schema"]
    if not isinstance(schema, dict):
        logger.error(f"Schema is of type {type(schema)} != {dict}")
        n_errors += 1
    for key in ["fields", "foreignKeys", "primaryKeys"]:
        if key not in schema.keys():
            if key in ["foreignKeys", "primaryKeys"]:
                logger.warning(f"Key '{key}' missing from table['schema'] keys")
            else:
                logger.error(f"Key '{key}' missing from table['schema'] keys")
                n_errors += 1
        elif not isinstance(schema[key], list):
            logger.error(f"Key {key} is of type {type(schema[key])} != {list}")
            n_errors += 1
    # fields
    for pos, field in enumerate(schema["fields"]):
        for key in ["name", "type"]:
            if key not in field.keys():
                logger.error(f"Key '{key}' missing from field {pos}")
                n_errors += 1
            elif not isinstance(field[key], str):
                logger.error(
                    f"Key {key} in field {pos} is of type "
                    f"{type(field[key])} != {str}"
                )
                n_errors += 1
        if "name" in field.keys():
            for whitespace in [" ", "\n", "\t"]:
                if whitespace in field["name"]:
                    logger.error(f"Field {pos} 'name' has whitespace")
                    n_errors += 1
        if "type" in field.keys():
            # needs to be aligned with create!!!!
            fric_list = ["number", "integer", "boolean", "string", "datetime"]
            if field["type"] == "any":
                logger.warning(
                    f"Found 'any' type in field {pos}. Changed to 'string'"
                )
                field["type"] = "string"
                schema["fields"][pos] = field
            if field["type"] not in fric_list:
                logger.error(
                    f"Invalid type in field {pos}. Accepted types are "
                    f"{fric_list}."
                )
                n_errors += 1
    # primary keys
    if "primaryKeys" in schema.keys():
        if len(schema["primaryKeys"]) > 1:
            logger.error(f"Multiple primary keys: {schema['primaryKeys']}")
            n_errors += 1
    # foreign keys
    if "foreignKeys" in schema.keys():
        for position, foreign_key in enumerate(schema["foreignKeys"]):
            for key, typ in zip(["fields", "reference"], [list, dict]):
                if key not in foreign_key.keys():
                    logger.error(
                        f"Field '{key}' missing from foreign key {position}"
                    )
                    n_errors += 1
                elif not isinstance(foreign_key[key], typ):
                    logger.error(
                        f"Field '{key}' is of type "
                        f"{type(foreign_key[key])} != {typ}"
                    )
                    n_errors += 1
            if len(foreign_key["fields"]) != 1:
                logger.error(
                    f"Multiple local fields in foreign key {position}:"
                    f" {foreign_key['fields']}"
                )
                n_errors += 1
            if "fields" not in foreign_key["reference"].keys():
                logger.error(
                    f"Field 'fields' missing from foreign key {position} reference"
                )
                n_errors += 1
            elif not isinstance(foreign_key["reference"]["fields"], list):
                logger.error(
                    "Field 'fields' in foreign key reference is of "
                    f"type {type(foreign_key['reference']['fields'])} "
                    f"!= {list}"
                )
                n_errors += 1
            elif len(foreign_key["reference"]["fields"]) != 1:
                logger.error(
                    f"Multiple parent fields in foreign key {position}:"
                    f" {foreign_key['reference']['fields']}"
                )
                n_errors += 1
    # check and remove entity-relation diagram keys
    directions = ["forward", "back"]
    styles = ["solid", "invis"]
    if "foreignKeys" in schema.keys():
        for position, foreign_key in enumerate(schema["foreignKeys"]):
            reference = foreign_key["reference"]
            # direction
            if "direction" in reference.keys():
                if not isinstance(reference["direction"], str):
                    logger.error(
                        f"Type of 'direction' is {type(reference['direction'])}, "
                        f"not {str}, in foreign key {position}"
                    )
                    n_errors += 1
                if reference["direction"] not in directions:
                    logger.error(
                        f"Value of 'direction' is {reference['direction']} and "
                        f"should be in {directions}"
                    )
                    n_errors += 1
                logger.info(
                    "Found entity relation diagram attribute 'direction' "
                    f"at position {position}. Removed for the purpose of "
                    "frictionless validate"
                )
                del schema["foreignKeys"][position]["reference"]["direction"]
            # style
            if "style" in reference.keys():
                if not isinstance(reference["style"], str):
                    logger.error(
                        f"Type of 'style' is {type(reference['style'])}, not "
                        f"{str}, in foreign key {position}"
                    )
                    n_errors += 1
                if reference["style"] not in styles:
                    logger.error(
                        f"Value of 'style' is {reference['style']} and should "
                        f"be in {styles}"
                    )
                    n_errors += 1
                logger.info(
                    "Found entity relation diagram attribute 'style' "
                    f"at position {position}. Removed for the purpose of "
                    "frictionless validate"
                )
                del schema["foreignKeys"][position]["reference"]["style"]
    return schema, n_errors
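
# A sketch of the minimal resource structure validate_schema accepts; the
# table and field names are illustrative. A well-formed schema like this one
# passes through unchanged with n_errors left at zero.
#
#   resource = {
#       "schema": {
#           "fields": [
#               {"name": "id", "type": "string"},
#               {"name": "value", "type": "number"},
#           ],
#           "primaryKeys": ["id"],
#           "foreignKeys": [
#               {"fields": ["id"], "reference": {"table": "dim", "fields": ["id"]}}
#           ],
#       }
#   }
#   schema, n_errors = validate_schema(resource, n_errors=0)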