"""Datapackage validate module of the dataio utility."""
import os
from logging import getLogger
from pathlib import Path
from typing import Optional

import frictionless
import pandas as pd
import yaml

from dataio.load import load
from dataio.schemas.bonsai_api.base_models import MatrixModel
from dataio.tools import BonsaiBaseModel

from .set_logger import set_logger
logger = getLogger("root")
def validate_matrix(df: pd.DataFrame, schema: MatrixModel):
    """Validate the row and column indices of a matrix against a schema.

    Validation happens as a side effect: to_dataclass raises if either
    index does not conform to the schema.
    """
    rows = df.index
    columns = df.columns
    schema.row_schema.to_dataclass(rows)
    schema.column_schema.to_dataclass(columns)
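
# Usage sketch for validate_matrix (SupplyUseMatrixModel is a hypothetical
# MatrixModel subclass whose row_schema/column_schema describe the two axes):
#
#     df = pd.DataFrame([[1.0, 2.0]], index=["steel"], columns=["DE", "FR"])
#     validate_matrix(df, SupplyUseMatrixModel)
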
def validate_table(df: pd.DataFrame, schema: BonsaiBaseModel):
    """Validate a table against a schema.

    If a 'code' column exists, its values must be unique; the rows are then
    validated via to_dataclass, which raises if they do not conform.
    """
    if "code" in df.columns:
        # check for duplicates in the 'code' column
        duplicates = df["code"].duplicated(keep=False)
        if duplicates.any():
            duplicate_values = df["code"][duplicates].unique()
            raise ValueError(
                f"Duplicate values found in column 'code': {duplicate_values}"
            )
    schema.to_dataclass(df)
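
# Usage sketch for validate_table (ActivityCode is a hypothetical
# BonsaiBaseModel schema): the 'code' column, if present, must be unique
# before the rows are validated against the schema.
#
#     df = pd.DataFrame({"code": ["a", "b"], "name": ["A", "B"]})
#     validate_table(df, ActivityCode)
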
def validate(
    full_path: str, overwrite: bool = False, log_name: Optional[str] = None
) -> dict:
"""Validate datapackage.
Validates datapackage with metadata at:
<full_path>=<root_path>/<path>/<name>.dataio.yaml
Creates <name>.validate.yaml for frictionless validation
and outputs dataio-specific validation to the log.
Specific fields expected in <name>.dataio.yaml:
- name : should match <name>
- path : from which <path> is inferred
- version
Parameters
----------
full_path : str
path to dataio.yaml file
overwrite : bool
whether to overwrite output files
    log_name : str, optional
        name of log file; if None, no log file is created
Returns
-------
dict
frictionless validate report dictionary
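
    Examples
    --------
    A minimal sketch with a hypothetical package layout::

        report = validate(
            full_path="data/mypackage/mypackage.dataio.yaml",
            overwrite=True,
            log_name="validate.log",
        )
        print(report["stats"]["errors"])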
"""
    n_errors = 0
    logger.info("Started dataio validate")
    logger.info("Validate arguments")
# validate input arguments
for arg, typ in zip(
["full_path", "overwrite", "log_name"], [(str, Path), bool, (str, type(None))]
):
        if not isinstance(locals()[arg], typ):
            msg = f"argument {arg} is of type {type(locals()[arg])} != {typ}"
            logger.error(msg)
            raise TypeError(msg)
# validate path
logger.info("Validate path")
full_path = Path(full_path)
    if not full_path.name.endswith(".dataio.yaml"):
        logger.error(f"Full path suffix is not '.dataio.yaml': {full_path}")
        raise FileNotFoundError(f"Full path suffix is not '.dataio.yaml': {full_path}")
    datapackage_name = full_path.name[: -len(".dataio.yaml")]
    if not os.path.exists(str(full_path)):
        logger.error(
            f"Metadata file not accessible at {full_path}\n"
            f"Current working directory is {os.getcwd()}"
        )
        raise FileNotFoundError(f"Metadata file not accessible at {full_path}")
# open log file
if log_name is not None:
set_logger(filename=log_name, path=full_path.parent, overwrite=overwrite)
logger.info("Started dataio validate log file")
else:
logger.info("Not initialized new log file")
# read dataio datapackage metadata
try:
with open(full_path, "r") as f:
metadata = yaml.safe_load(f)
except FileNotFoundError:
logger.error("Could not open dataio datapackage metadata file " f"{full_path}")
raise
logger.info(f"Dataio datapackage metadata file opened at {full_path}")
logger.info("Validating general features")
# check dataio datapackage specific fields
for key in ["name", "path", "version"]:
if key not in metadata.keys():
logger.error(f"Key {key} missing from metadata")
n_errors += 1
if not isinstance(metadata[key], str):
logger.error(
f"metadata['{key}'] is of type " f"{type(metadata[key])} != {str}"
)
n_errors += 1
for whitespace in [" ", "\n", "\t"]:
if whitespace in metadata["name"]:
logger.error(f"Metadata name '{metadata['name']}' has whitespace")
n_errors += 1
if metadata["name"] != datapackage_name:
logger.error(
"Name of metadata file and datapackage name do not "
f"match: {metadata['name']} != {datapackage_name}"
)
n_errors += 1
# check datapackages
parent_datapackages = {}
if metadata["path"] == ".":
root_path = str(Path(full_path).parent)
else:
root_path = str(Path(full_path).parent)[: -len(metadata["path"])]
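    # e.g. full_path='/repo/data/pkg/pkg.dataio.yaml' with path='data/pkg'
    # yields root_path='/repo/' (the trailing len(path) characters are cut)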
logger.info(f"root_path is '{root_path}'")
if "datapackages" not in metadata.keys():
logger.info("Key 'datapackages' not found in metadata")
else:
logger.info("Found key 'datapackages' in metadata")
if not isinstance(metadata["datapackages"], list):
logger.error(
"Type of metadata 'datapackages' is "
f"{type(metadata['datapackages'])} and not 'list'"
)
n_errors += 1
else:
for datapackage in metadata["datapackages"]:
for key in ["name", "path", "version"]:
                if key not in datapackage.keys():
                    logger.error(f"Key {key} missing from datapackage")
                    n_errors += 1
                elif not isinstance(datapackage[key], str):
                    logger.error(
                        f"datapackage['{key}'] is of type "
                        f"{type(datapackage[key])} != {str}"
                    )
                    n_errors += 1
for whitespace in [" ", "\n", "\t"]:
if whitespace in datapackage["name"]:
logger.error(
f"Datapackage '{datapackage['name']}' has whitespace"
)
n_errors += 1
if datapackage["name"] == datapackage_name:
logger.error(
"Name of metadata file and external datapackage "
f"match: {datapackage_name}"
)
n_errors += 1
# load datapackage
parent_path = (
Path(root_path)
.joinpath(datapackage["path"])
.joinpath(f"{datapackage['name']}.dataio.yaml")
)
logger.info(
"Loading metadata of datapackage "
f"'{datapackage['name']}' at "
f"path '{parent_path}'"
)
parent_datapackages[datapackage["name"]] = load(
full_path=parent_path, include_tables=False
)
# check version
parent_version = parent_datapackages[datapackage["name"]].__metadata__[
"version"
]
if datapackage["version"] != parent_version:
logger.error(
"Version of expected and external datapackage "
"versions do not match:\n"
f"name: {datapackage['name']}\n"
f"expected version: {datapackage['version']}\n"
f"found version: {parent_version}"
)
n_errors += 1
if "tables" not in metadata.keys():
logger.error("Key 'tables' missing from metadata")
n_errors += 1
if not isinstance(metadata["tables"], list):
logger.error(
f"metadata['tables'] is of type " f"{type(metadata['tables'])} != {list}"
)
n_errors += 1
# check tables
logger.info("General features validated")
for position, resource in enumerate(metadata["tables"]):
logger.info(f"Validating specific features of table {position}")
for key in ["name", "path", "tabletype"]:
if key not in resource.keys():
logger.error(f"Key '{key}' missing from table {position}")
n_errors += 1
if not isinstance(resource[key], str):
logger.error(
f"Key {key} is of type {type(resource[key])} != "
f"{str} in table {position}"
)
n_errors += 1
logger.info(
f"Name = '{resource['name']}' , path = "
f"'{resource['path']}', tabletype = "
f"'{resource['tabletype']}'"
)
# name
for whitespace in [" ", "\n", "\t"]:
if whitespace in resource["name"]:
logger.error(f"Metadata name '{resource['name']}' has whitespace")
n_errors += 1
# tabletype
tabletype_list = ["fact", "dimension", "tree", "concordance", "other"]
if resource["tabletype"] not in tabletype_list:
logger.error(
f"Field 'tabletype' of resource {position} with "
f"name {resource['name']} is "
f"{resource['tabletype']}, which is not is "
f"{tabletype_list}"
)
n_errors += 1
logger.info(f"Specific features of resource {position} validated")
# foreign keys
resource_names = [resource["name"] for resource in metadata["tables"]]
resource_types = [resource["tabletype"] for resource in metadata["tables"]]
for resource in metadata["tables"]:
logger.info("Validating foreign keys of resource " f"{resource['name']}")
schema, n_errors = validate_schema(resource, n_errors)
field_names = [field["name"] for field in schema["fields"]]
field_types = [field["type"] for field in schema["fields"]]
if "primaryKeys" not in schema.keys():
logger.warning("Primary key not found, no checks performed")
else:
primary_key = schema["primaryKeys"][0]
if len(field_names) != len(set(field_names)):
logger.error(f"Repeated field names: {field_names}")
n_errors += 1
if primary_key not in field_names:
logger.error(
f"Unrecognized primary key {primary_key}, not in " f"{field_names}"
)
n_errors += 1
if field_types[field_names.index(primary_key)] not in ["string"]:
logger.error(f"Primary key {primary_key} type is not in " "['string']")
n_errors += 1
if "foreignKeys" not in schema.keys():
logger.warning("Foreign keys not found, no checks performed")
else:
foreign_key_children = [
field["fields"][0] for field in schema["foreignKeys"]
]
foreign_key_parents = [
field["reference"]["fields"][0] for field in schema["foreignKeys"]
]
foreign_key_tables = [
(
field["reference"]["table"]
if "table" in field["reference"].keys()
else resource["name"]
)
for field in schema["foreignKeys"]
]
foreign_key_datapackages = [
(
field["reference"]["datapackage"]
if "datapackage" in field["reference"].keys()
else datapackage_name
)
for field in schema["foreignKeys"]
]
if len(foreign_key_children) != len(set(foreign_key_children)):
logger.error("Repeated foreign key fields: " f"{foreign_key_children}")
n_errors += 1
resource["schema"] = schema
# generic constraints
if resource["tabletype"] != "other":
if "id" not in field_names:
logger.error("Field 'id' expected if tabletype != 'other'")
n_errors += 1
for field_name in field_names:
if field_name not in [
"id",
"value",
"name",
"description",
"comment",
"position",
]:
if field_name not in foreign_key_children:
logger.warning(
f"Field '{field_name}' should have a "
"foreign key relation"
)
if "foreignKeys" in schema.keys():
for position, table in enumerate(foreign_key_tables):
if (table not in resource_names) and (
foreign_key_datapackages[position] == datapackage_name
):
logger.warning(
f"Table {table} reference in foreign key "
f"{position} does not exist in datapackage"
)
if foreign_key_parents[position] != "id":
logger.error(
f"Parent field referenced in foreign key "
f"{position} is "
f"{foreign_key_parents[position]} != 'id'"
)
n_errors += 1
# tabletype specific constraints
if resource["tabletype"] == "fact":
if "value" not in field_names:
logger.error("Field 'value' expected if tabletype = 'fact'")
n_errors += 1
else:
if field_types[field_names.index("value")] != "number":
logger.error(
"Field 'value' expected to have type 'number' instead of "
f"{field_types[field_names.index('value')]}"
)
n_errors += 1
elif resource["tabletype"] == "dimension":
if "position" not in field_names:
logger.error("Field 'position' expected if tabletype = " "'dimension'")
n_errors += 1
else:
if field_types[field_names.index("position")] != "integer":
logger.error(
"Field 'position' expected to have type 'integer' instead "
f"of {field_types[field_names.index('position')]}"
)
n_errors += 1
elif resource["tabletype"] == "tree":
if "parent_id" not in field_names:
logger.error("Field 'parent_id' expected if tabletype = " "'tree'")
n_errors += 1
else:
if (
foreign_key_tables[foreign_key_children.index("parent_id")]
!= resource["name"]
):
logger.error("Field 'parent_id' expected if tabletype = " "'tree'")
n_errors += 1
elif resource["tabletype"] == "concordance":
if len(foreign_key_tables) != 2:
logger.error(
"Two foreign keys expected if tabletype = " "'concordance'"
)
n_errors += 1
logger.info("DataIO table constraints validation finished")
    if n_errors > 0:
        logger.error(f"{n_errors} errors found, validation aborted")
        raise TypeError(f"{n_errors} errors found, validation aborted")
logger.info("DataIO metadata validated, no errors found")
# check dialect
for table in metadata["tables"]:
if "dialect" in table.keys():
logger.info(f"Table {table['name']} has 'dialect' field")
if "csv" in table["dialect"].keys():
logger.info(f"Table {table['name']} 'dialect' has 'csv' field")
dialect = table["dialect"]["csv"]
if "delimiter" in dialect.keys():
logger.warning(
f"Table {table['name']} field ['dialect']['csv']"
f"['delimiter'] = {dialect['delimiter']}"
)
if "quoteChar" in dialect.keys():
logger.warning(
f"Table {table['name']} field ['dialect']['csv']"
f"['quoteChar'] = {dialect['quoteChar']}"
)
if "skipInitialSpace" in dialect.keys():
logger.warning(
f"Table {table['name']} field ['dialect']['csv']"
f"['skipInitialSpace'] = {dialect['skipInitialSpace']}"
)
if dialect["skipInitialSpace"]:
logger.warning(
"This might raise problems loading if there are "
"delimiters inside quoted strings"
)
# shift path to tables
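    # e.g. with path='data/pkg', a table path 'region.csv' becomes
    # 'data/pkg/region.csv', so paths resolve from root_path where the
    # frictionless descriptor is written below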
for tab_pos, table in enumerate(metadata["tables"]):
if metadata["path"] != ".":
table["path"] = str(Path(metadata["path"]).joinpath(table["path"]))
metadata["tables"][tab_pos] = table
# fix refs to tables from other datapackages
to_add = {}
for tab_pos, table in enumerate(metadata["tables"]):
schema = table["schema"]
if "foreignKeys" in schema.keys():
for pos, foreign_key in enumerate(schema["foreignKeys"]):
if "datapackage" in foreign_key["reference"].keys():
parent_datapackage = foreign_key["reference"]["datapackage"]
parent_table = foreign_key["reference"]["table"]
logger.info(
"Found foreign key to datapackage "
f"{parent_datapackage} and table {parent_table}"
)
if parent_datapackage not in to_add.keys():
to_add[parent_datapackage] = []
if parent_table not in to_add[parent_datapackage]:
to_add[parent_datapackage].append(parent_table)
del schema["foreignKeys"][pos]["reference"]["datapackage"]
schema["foreignKeys"][pos]["reference"][
"table"
] = f"{parent_datapackage}_{parent_table}"
table["schema"] = schema
metadata["tables"][tab_pos] = table
# add tables from other datapackages
for parent_datapackage, parent_tables in to_add.items():
table_positions = {}
tables = parent_datapackages[parent_datapackage].__metadata__["tables"]
for pos, table in enumerate(tables):
table_positions[table["name"]] = pos
for parent_tablename in parent_tables:
tmp = tables[table_positions[parent_tablename]]
tmp["name"] = f"{parent_datapackage}_{parent_tablename}"
tmp["path"] = str(
Path(
parent_datapackages[parent_datapackage].__metadata__["path"]
).joinpath(tmp["path"])
)
del tmp["schema"]["foreignKeys"]
metadata["tables"].append(tmp)
# rename table to resource
metadata["resources"] = metadata["tables"]
del metadata["tables"]
for res_pos, resource in enumerate(metadata["resources"]):
if "foreignKeys" in resource["schema"].keys():
for fk_pos, foreignkey in enumerate(resource["schema"]["foreignKeys"]):
if "table" in foreignkey["reference"].keys():
foreignkey["reference"]["resource"] = foreignkey["reference"][
"table"
]
del foreignkey["reference"]["table"]
resource["schema"]["foreignKeys"][fk_pos] = foreignkey
metadata["resources"][res_pos] = resource
# apply frictionless
logger.info("Starting frictionless validate")
frictionless_yaml = str(
Path(root_path).joinpath(f"{datapackage_name}.datapackage.yaml")
)
    if not overwrite:
        if os.path.isfile(frictionless_yaml):
            logger.error(
                f"File {frictionless_yaml} already exists and "
                f"'overwrite' option is {overwrite}"
            )
            raise FileExistsError(frictionless_yaml)
with open(frictionless_yaml, "w") as f:
yaml.safe_dump(metadata, f)
report = frictionless.validate(frictionless_yaml)
report_yaml = Path(full_path).with_stem(f"{datapackage_name}.validate")
    if not overwrite:
        if os.path.isfile(report_yaml):
            logger.error(
                f"File {report_yaml} already exists and "
                f"'overwrite' option is {overwrite}"
            )
            raise FileExistsError(str(report_yaml))
with open(report_yaml, "w") as f:
yaml.safe_dump(report.to_dict(), f)
os.remove(frictionless_yaml)
if report.stats["errors"] > 0:
frictionless_errors = report.stats["errors"]
logger.error(
f"{frictionless_errors} errors related to frictionless found, validation aborted"
)
raise TypeError
logger.info("Finished frictionless validate")
return report.to_dict()
def validate_schema(resource, n_errors):
"""Check if schema, fields, primary and foreign keys exist."""
# schema
if "schema" not in resource.keys():
logger.error("Key 'schema' missing from table keys")
n_errors += 1
schema = resource["schema"]
if not isinstance(schema, dict):
logger.error(f"Schema is of type {type(schema)} != " f"{dict}")
n_errors += 1
for key in ["fields", "foreignKeys", "primaryKeys"]:
if key not in schema.keys():
if key == "foreignKeys":
logger.warning(f"Key '{key}' missing from table['schema'] keys")
elif key == "primaryKeys":
logger.warning(f"Key '{key}' missing from table['schema'] keys")
else:
logger.error(f"Key '{key}' missing from table['schema'] keys")
n_errors += 1
elif not isinstance(schema[key], list):
logger.error(f"Key {key} is of type {type(schema[key])} != " f"{list}")
n_errors += 1
# fields
for pos, field in enumerate(schema["fields"]):
for key in ["name", "type"]:
if key not in field.keys():
logger.error(f"Key '{key}' missing from {pos} field")
n_errors += 1
elif not isinstance(field[key], str):
logger.error(
f"Key {key} in {pos} field is of type "
"{type(field[key])} != {str}"
)
n_errors += 1
if "name" in field.keys():
for whitespace in [" ", "\n", "\t"]:
if whitespace in field["name"]:
logger.error(f"Field {pos} 'name' has whitespace")
n_errors += 1
if "type" in field.keys():
# needs to be aligned with create!!!!
fric_list = ["number", "integer", "boolean", "string", "datetime"]
if field["type"] == "any":
                logger.warning(
                    f"Found 'any' type in field {pos}. Changed to 'string'"
                )
field["type"] = "string"
schema["fields"][pos] = field
if field["type"] not in fric_list:
logger.error(
f"Invalid type in field {pos}. Accepted types are " f"{fric_list}."
)
n_errors += 1
# primary keys
if "primaryKeys" in schema.keys():
if len(schema["primaryKeys"]) > 1:
logger.error(f"Multiple primary keys: {schema['primaryKeys']}")
n_errors += 1
# foreign keys
if "foreignKeys" in schema.keys():
for position, foreign_key in enumerate(schema["foreignKeys"]):
for key, typ in zip(["fields", "reference"], [list, dict]):
if key not in foreign_key.keys():
logger.error(
f"Field '{key}' missing from foreign key " "{position}"
)
n_errors += 1
if not isinstance(foreign_key[key], typ):
logger.error(
f"Field '{key}' is of type "
f"{type(foreign_key[key])} != {typ}"
)
n_errors += 1
if len(foreign_key["fields"]) != 1:
logger.error(
f"Multiple local fields in foreign key {position}:"
f" {foreign_key['fields']}"
)
n_errors += 1
if "fields" not in foreign_key["reference"].keys():
logger.error(
"Field 'fields' missing from foreign key " f"{position} reference"
)
n_errors += 1
if not isinstance(foreign_key["reference"]["fields"], list):
logger.error(
f"Field 'fields' om foreign key reference is of "
f"type {type(foreign_key['reference']['fields'])} "
"!= {list}"
)
n_errors += 1
if len(foreign_key["reference"]["fields"]) != 1:
logger.error(
f"Multiple local fields in foreign key {position}:"
f" {foreign_key['reference']['fields']}"
)
n_errors += 1
# check and remove entity-relation diagram keys
directions = ["forward", "back"]
styles = ["solid", "invis"]
if "foreignKeys" in schema.keys():
for position, foreign_key in enumerate(schema["foreignKeys"]):
reference = foreign_key["reference"]
# direction
if "direction" in reference.keys():
if not isinstance(reference["direction"], str):
logger.error(
f"Type of 'direction' is {type(reference['direction'])} is "
f"not {str} in foreign key {position}"
)
n_errors += 1
if reference["direction"] not in directions:
logger.error(
f"Value of direction is {reference['direction']} and should "
f"be in {directions}"
)
n_errors += 1
logger.info(
"Found entity relation diagram attribute 'direction' "
f"at position {position}. Removed for the purpose of "
"frictionless validate"
)
del schema["foreignKeys"][position]["reference"]["direction"]
# style
if "style" in reference.keys():
if not isinstance(reference["style"], str):
logger.error(
f"Type of 'style' is {type(reference['style'])} is not"
f" {str} in foreign key {position}"
)
n_errors += 1
if reference["style"] not in styles:
logger.error(
f"Value of 'style' is {reference['style']} and should "
f"be in {styles}"
)
n_errors += 1
logger.info(
"Found entity relation diagram attribute 'style' "
f"at position {position}. Removed for the purpose of "
"frictionless validate"
)
del schema["foreignKeys"][position]["reference"]["style"]
return schema, n_errors
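
# Usage sketch for validate_schema (hypothetical resource dict): returns the
# possibly-normalized schema together with the running error count.
#
#     resource = {
#         "name": "region",
#         "path": "region.csv",
#         "tabletype": "dimension",
#         "schema": {
#             "fields": [
#                 {"name": "id", "type": "string"},
#                 {"name": "position", "type": "integer"},
#             ],
#             "primaryKeys": ["id"],
#         },
#     }
#     schema, n_errors = validate_schema(resource, n_errors=0)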