dataio package

Subpackages

Submodules

dataio.arg_parser module

dataio.arg_parser.arg_parser(package: object, main: object, argv: list)[source]

Argument parser of main task executable.

Parameters:
  • package (obj)

  • main (obj)

  • argv (list)

dataio.config module

Created on Fri Jun 10 16:05:31 2022

@author: ReMarkt

class dataio.config.Config(current_task_name=None, custom_resource_path=None, log_level=30, log_file=None, run_by_user=None, **kwargs)[source]

Bases: object

property bonsai_home

environment variable to define the home directory for dataio

property classif_repository
property connector_repository
copy()[source]

Creates a deep copy of the current Config instance.

property corr_repository
property dataio_root

environment variable to define the home directory for hybrid_sut

static get_airflow_defaults(filename: str)[source]

Load config from airflow or from src/config/airflow_attributes.config.json.

Parameters:

filename (str) – JSON config file located in the folder ‘src/config’

Returns: None

list_parameters()[source]

List all dataclass field names.

load_env() dict[source]
log(level, message)[source]

Logs a message with the specified logging level.

property path_repository: PathBuilder
property resource_repository: ResourceRepository
property schema_enums
property schemas
property sut_resource_repository
property version_source

dataio.default module

dataio.load module

Datapackage load module of dataio utility.

dataio.load.build_cache_key(resource: DataResource) str[source]
Builds a unique string key for caching based on:
  • resource.api_endpoint

  • relevant fields (version, task_name, etc.)
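A minimal sketch of how such a key could be derived, assuming the key is a hash over the endpoint and a few identifying fields. ResourceStub and sketch_cache_key are hypothetical names used for illustration only; the real DataResource schema and the actual key format are not shown in this reference.

```python
import hashlib
from dataclasses import dataclass
from typing import Optional


@dataclass
class ResourceStub:
    """Hypothetical stand-in for DataResource with illustrative fields only."""
    api_endpoint: str
    version: Optional[str] = None
    task_name: Optional[str] = None


def sketch_cache_key(resource: ResourceStub) -> str:
    """Join the endpoint and identifying fields, then hash the result."""
    parts = [resource.api_endpoint, resource.version or "", resource.task_name or ""]
    raw = "|".join(parts)
    # Hashing keeps the key short and safe to use as a file name.
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


key_a = sketch_cache_key(ResourceStub("https://example.org/api/footprint/", "v1.0.0", "clean"))
key_b = sketch_cache_key(ResourceStub("https://example.org/api/footprint/", "v1.1.0", "clean"))
```

Two resources that differ only in version produce different keys, so a new version does not reuse a stale cache entry.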

dataio.load.clean_up_cache(CACHE_DIR, MAX_CACHE_FILES)[source]

Enforce that only up to MAX_CACHE_FILES CSV files remain in CACHE_DIR. Remove the oldest files (by modification time) if there are more than that.
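The eviction rule described above can be sketched with the standard library alone. This is an illustration under stated assumptions, not the package's implementation; sketch_clean_up_cache is a hypothetical name.

```python
import os
import tempfile
from pathlib import Path


def sketch_clean_up_cache(cache_dir: str, max_cache_files: int) -> list:
    """Delete the oldest CSV files so that at most max_cache_files remain."""
    # Sort oldest first by modification time.
    csv_files = sorted(Path(cache_dir).glob("*.csv"), key=lambda p: p.stat().st_mtime)
    n_excess = max(0, len(csv_files) - max_cache_files)
    removed = []
    for path in csv_files[:n_excess]:
        path.unlink()
        removed.append(path.name)
    return removed


with tempfile.TemporaryDirectory() as tmp:
    for i, name in enumerate(["a.csv", "b.csv", "c.csv", "d.csv"]):
        path = Path(tmp) / name
        path.write_text("x\n")
        # Set explicit, increasing modification times so the ordering is deterministic.
        os.utime(path, (1_000_000 + i, 1_000_000 + i))
    removed = sketch_clean_up_cache(tmp, max_cache_files=3)
    remaining = sorted(p.name for p in Path(tmp).glob("*.csv"))
```

With four files and a limit of three, only the file with the oldest modification time is deleted.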

dataio.load.get_cache_path(key: str, CACHE_DIR='./data_cache/') str[source]

Returns a local file path where the DataFrame is cached, based on the unique cache key.

dataio.load.load(path: Path, schemas: Dict[str, BaseModel] = None)[source]

Returns an empty dict if the file cannot be found.

dataio.load.load_api(self, resource: DataResource, CACHE_DIR, MAX_CACHE_FILES) DataFrame[source]

Fetches data from the resource’s API endpoint and returns it as a DataFrame. Assumes the endpoint returns CSV text (adjust as needed for JSON, etc.).

Raises:

ValueError – If api_endpoint is not set or an HTTP error occurs.

dataio.load.load_dict_file(path_to_file, schema: BaseModel)[source]
dataio.load.load_matrix_file(path_to_file: Path, schema: MatrixModel, **kwargs)[source]
dataio.load.load_metadata(path_to_metadata, datapackage_names=None)[source]

Load metadata from a YAML file and convert it into a dictionary with UUIDs as keys and MetaData objects as values. The YAML file is expected to start directly with a list of metadata entries.

Parameters:

path_to_metadata (str) – The path to the YAML file that contains metadata entries.

Returns:

A dictionary where each key is a UUID (as a string) of a MetaData object and each value is the corresponding MetaData object.

Return type:

dict

Raises:
  • FileNotFoundError – If the specified file does not exist.

  • yaml.YAMLError – If there is an error in parsing the YAML file.

  • pydantic.ValidationError – If an item in the YAML file does not conform to the MetaData model.

Examples

Assuming a YAML file located at ‘example.yaml’:

>>> metadata_dict = load_metadata('example.yaml')
>>> print(metadata_dict['123e4567-e89b-12d3-a456-426614174000'])
MetaData(id=UUID('123e4567-e89b-12d3-a456-426614174000'), created_by=User(...), ...)
dataio.load.load_table_file(path_to_file: Path, schema: BonsaiBaseModel, **kwargs)[source]

dataio.resources module

class dataio.resources.CSVResourceRepository(db_path: str, table_name: str = 'resources', **kwargs)[source]

Bases: object

class dataio.resources.ResourceRepository(storage_method='local', db_path: str | None = None, table_name: str | None = 'resources', API_token: str | None = None, username: str | None = None, password: str | None = None, cache_dir: str = './data_cache/', MAX_CACHE_FILES: int = 3)[source]

Bases: object

Repository for managing data resources within a CSV file storage system.

db_path

Path to the directory containing the resource CSV file.

Type:

Path

table_name

Name of the table (used for naming the CSV file).

Type:

str

resources_list_path

Full path to the CSV file that stores resource information.

Type:

Path

schema

Schema used for resource data validation and storage.

Type:

DataResource

cache_dir

Location of the cached data resources. Default: ./data_cache/

Type:

str

add_or_update_resource_list(resource: DataResource, **kwargs)[source]

Adds a new resource or updates an existing one in the repository.

add_to_resource_list(resource: DataResource)[source]

Adds a new resource to the repository.

update_resource_list(resource: DataResource)[source]

Updates an existing resource in the repository.

get_resource_info(**filters)[source]

Retrieves resource information based on specified filters.

add_from_dataframe(data, loc, task_name, task_relation, last_update, **kwargs)[source]

Adds resource information from a DataFrame.

get_dataframe_for_task(name, **kwargs)[source]

Retrieves a DataFrame for a specific task.

write_dataframe_for_task(data, name, **kwargs)[source]

Writes a DataFrame to the storage based on resource information.

write_dataframe_for_resource(data, resource, overwrite)[source]

Validates and writes a DataFrame to the resource location.

list_available_resources()

Lists all available resources in the repository.

comment_resource(resource, comment)[source]

Adds a comment to a resource and updates the repository.

add_from_dataframe(data: DataFrame, loc: Path | str, task_name: str | None = None, task_relation: str = 'output', last_update: date = datetime.date(2025, 6, 27), **kwargs) DataResource[source]
add_or_update_resource_list(resource: DataResource, storage_method: str | None = None, **kwargs) str[source]

Adds a new resource to the repository or updates it if it already exists.

Parameters:
  • resource (DataResource) – The resource data to add or update.

  • kwargs (dict) – Additional keyword arguments used for extended functionality.

Return type:

str of the version’s UUID

add_to_resource_list(resource: DataResource, storage_method: str | None = None) None[source]

Appends a new resource to the repository.

Parameters:

resource (DataResource) – The resource data to add.

Return type:

str of the generated uuid

comment_resource(resource: DataResource, comment: str) DataResource[source]
convert_dataframe(data: DataFrame, original_schema, classifications: dict, units: list[str] | None = None) Tuple[DataFrame, dict[str, set[str]]][source]
convert_dataframe_to_bonsai_classification(data: DataFrame, original_schema, units=None) Tuple[DataFrame, dict[str, set[str]]][source]
convert_units(data: DataFrame, target_units: list[str]) DataFrame[source]

Converts values in the ‘value’ column of a DataFrame to the specified target units in the list. Units not listed in the target_units remain unchanged.

Parameters:
  • data (pd.DataFrame) – A DataFrame with ‘unit’ and ‘value’ columns.

  • target_units (list) – A list of target units to convert compatible units to. Example: ["kg", "J", "m"]

Returns:

A DataFrame with the converted values and target units.

Return type:

pd.DataFrame
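The conversion rule can be illustrated without pandas, using plain row dictionaries in place of a DataFrame. This is only a sketch: the FACTORS table and sketch_convert_units are hypothetical, and how the package actually determines unit compatibility is not documented here.

```python
# Hypothetical conversion factors: (source unit, target unit) -> multiplier.
FACTORS = {("g", "kg"): 0.001, ("t", "kg"): 1000.0, ("km", "m"): 1000.0}


def sketch_convert_units(rows, target_units):
    """Convert each row's value to the first compatible target unit, if any."""
    converted = []
    for row in rows:
        new_row = dict(row)
        for target in target_units:
            factor = FACTORS.get((row["unit"], target))
            if factor is not None:
                new_row["value"] = row["value"] * factor
                new_row["unit"] = target
                break
        # Rows without a compatible target unit are passed through unchanged.
        converted.append(new_row)
    return converted


rows = [
    {"unit": "g", "value": 2500.0},
    {"unit": "km", "value": 3.0},
    {"unit": "EUR", "value": 10.0},
]
result = sketch_convert_units(rows, ["kg", "J", "m"])
```

In this sketch the gram and kilometre rows are converted to kg and m, while the EUR row has no compatible target and is returned as is.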

convert_units_pandas(data: DataFrame, target_units: list[str]) DataFrame[source]

Converts values in the ‘value’ column of a DataFrame to the specified target units in the list. Units not listed in the target_units remain unchanged.

Parameters:
  • data (pd.DataFrame) – A DataFrame with ‘unit’ and ‘value’ columns.

  • target_units (list) – A list of target units to convert compatible units to. Example: ["kg", "J", "m"]

Returns:

A DataFrame with the converted values and target units.

Return type:

pd.DataFrame

get_dataframe_for_resource(res: DataResource, storage_method: str | None = None)[source]
get_dataframe_for_task(name: str, storage_method: str | None = None, **kwargs) DataFrame[source]
get_latest_version(storage_method: str | None = None, **filters: dict)[source]
get_resource_info(storage_method: str | None = None, **filters: dict) DataResource | List[DataResource][source]

Retrieves resource information based on specified filters, optionally overriding the default storage method for this call.

Parameters:
  • storage_method (str or None) – Override for this call. If None, use self.load_storage_method. Valid values: ‘local’, ‘api’.

  • filters (dict) – Key-value pairs of attributes to filter the resources by.

Returns:

Matches found, either a single or multiple.

Return type:

DataResource or List[DataResource]

group_and_sum(df, code_column: str, group_columns: list, values_to_sum: list | set)[source]

Grouping function to handle unit compatibility

harmonize_with_resource(dataframe, storage_method: str | None = None, overwrite=True, **kwargs)[source]
list_available_resources(storage_method: str | None = None) list[DataResource][source]

Lists all available resources in the repository, either from local CSV or from the API, depending on the storage method.

Parameters:

storage_method (str | None) – Optional override for single-call usage (‘local’ or ‘api’). If None, uses self.load_storage_method.

Returns:

A list of all DataResource items found.

Return type:

list[DataResource]

load_with_bonsai_classification(storage_method: str | None = None, **kwargs) Tuple[DataFrame, dict[str, set[str]]][source]

This method loads the selected data based on kwargs with the default BONSAI classifications. The default classifications for BONSAI are the following:

  • location: ISO3

  • flowobject: BONSAI

load_with_classification(classifications: dict, units: list[str] | None = None, storage_method: str | None = None, **kwargs) Tuple[DataFrame, dict[str, set[str]]][source]

Loads data with the given classification for the selected fields. Rows that can’t be transformed automatically are not converted and are returned as is.

resource_exists(resource: DataResource, storage_method: str | None = None) bool[source]

Checks if the given resource already exists in the repository (locally or via the API).

Returns:

  • True if exactly one matching resource is found.

  • False if none are found.

Return type:

bool

Raises:

ValueError – If multiple matches are found or if an invalid storage method is set.
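The three outcomes (one match, no match, several matches) can be sketched as follows. The records are plain dictionaries and the match criteria (name and data_version) are assumptions for illustration; the fields the repository actually compares are not listed in this reference.

```python
def sketch_resource_exists(records, name, data_version):
    """Return True for exactly one match, False for none, raise on duplicates."""
    matches = [
        r for r in records
        if r["name"] == name and r["data_version"] == data_version
    ]
    if len(matches) > 1:
        raise ValueError(f"Multiple resources match {name!r} {data_version!r}")
    return len(matches) == 1


records = [
    {"name": "emissions", "data_version": "v1.0.0"},
    {"name": "trade", "data_version": "v2.1.0"},
    {"name": "trade", "data_version": "v2.1.0"},
]
found = sketch_resource_exists(records, "emissions", "v1.0.0")
absent = sketch_resource_exists(records, "emissions", "v9.9.9")
try:
    sketch_resource_exists(records, "trade", "v2.1.0")
    duplicate_raised = False
except ValueError:
    duplicate_raised = True
```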

update_resource_list(resource: DataResource, storage_method: str | None = None) None[source]

Updates an existing resource in the repository.

Parameters:

resource (DataResource) – The resource data to update.

valid_units()[source]
write_dataframe_for_resource(data: DataFrame, resource: DataResource, overwrite=True, append=False, storage_method: str | None = None)[source]
write_dataframe_for_task(data: DataFrame, name: str, data_version: str, overwrite=True, append=False, storage_method: str | None = None, **kwargs)[source]
dataio.resources.compare_version_strings(resource1: DataResource, resource2: DataResource)[source]
dataio.resources.map_to_bonsai(row, column_names, mapping_dict)[source]

Map values from two column_names together

dataio.save module

Datapackage save module of dataio utility.

dataio.save.old_save(datapackage, root_path: str = '.', increment: str = None, overwrite: bool = False, create_path: bool = False, log_name: str = None)[source]

Save datapackage from dataio.yaml file.

Parameters:
  • datapackage (DataPackage) – dataio datapackage

  • root_path (str) – path to root of database

  • increment (str) – semantic level to increment, in [None, ‘patch’, ‘minor’, ‘major’]

  • overwrite (bool) – whether to overwrite

  • create_path (bool) – whether to create path

  • log_name (str) – name of log file, if None no log is set

dataio.save.save(data, name: str, path: Path, schema=None, overwrite=True, append=False)[source]
dataio.save.save_dict(data, path: Path, append=False)[source]
dataio.save.save_matrix(data: DataFrame, name: str, path: Path, append=False)[source]
dataio.save.save_table(data, path: Path, append=False)[source]
dataio.save.save_to_api(data: DataFrame, resource: DataResource, schema=None, overwrite=True, append=False)[source]

Saves the given DataFrame to resource.api_endpoint via a single JSON POST. The JSON body has the form:

{
  "data": [
    {…}, {…}
  ]
}

so that multiple rows can be created in one request (per your test example).

Parameters:
  • data (pd.DataFrame) – The data to be sent. Each row becomes one dict.

  • resource (DataResource) – Must have a non-empty ‘api_endpoint’. (Optionally add resource.id => references the ‘version’ if your endpoint requires it.)

  • schema (optional) – If you want to validate ‘data’ before sending, do so here.

  • overwrite (bool) – If your API supports ‘overwrite’, pass it as a query param or in the body (depending on your API).

Raises:

ValueError – If ‘resource.api_endpoint’ is missing or if the POST fails.
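A sketch of how the request body described above could be assembled, assuming the rows are already available as a list of dictionaries (with pandas, DataFrame.to_dict(orient="records") yields such a list). sketch_build_payload is a hypothetical helper; the actual POST call, headers, and authentication are left out.

```python
import json


def sketch_build_payload(rows):
    """Wrap a list of row dicts in the {"data": [...]} envelope."""
    return json.dumps({"data": list(rows)})


rows = [
    {"code": "A", "value": 1.5},
    {"code": "B", "value": 2.0},
]
body = sketch_build_payload(rows)
```

Sending all rows in one envelope means one request per DataFrame rather than one request per row.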

dataio.save.version_list2str(version_list)[source]

Convert semantic version list to string ‘vMAJOR.MINOR.PATCH’.

dataio.save.version_str2list(version_str)[source]

Convert semantic version string ‘vMAJOR.MINOR.PATCH’ to list.
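The two conversions are inverses of each other. A minimal sketch, assuming the list holds three integers in the order major, minor, patch. The sketch_ names are hypothetical, and the increment helper follows the usual semantic versioning convention of resetting lower levels, which may differ from what old_save does.

```python
def sketch_version_str2list(version_str):
    """'v1.4.2' -> [1, 4, 2]"""
    digits = version_str[1:] if version_str.startswith("v") else version_str
    return [int(part) for part in digits.split(".")]


def sketch_version_list2str(version_list):
    """[1, 4, 2] -> 'v1.4.2'"""
    return "v" + ".".join(str(part) for part in version_list)


def sketch_increment(version_list, increment=None):
    """Bump one semantic level and reset the levels below it."""
    major, minor, patch = version_list
    if increment == "major":
        return [major + 1, 0, 0]
    if increment == "minor":
        return [major, minor + 1, 0]
    if increment == "patch":
        return [major, minor, patch + 1]
    return [major, minor, patch]  # increment is None: version unchanged


parsed = sketch_version_str2list("v1.4.2")
bumped = sketch_version_list2str(sketch_increment(parsed, "minor"))
```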

dataio.set_logger module

Module with set_logger function.

Part of package ‘templates’

Part of the Getting the Data Right project

Created on Jul 25, 2023

@author: Joao Rodrigues

dataio.set_logger.set_logger(filename: str | Path = None, path: str = '/builds/bonsamurais/bonsai/util/dataio/docs', log_level: int = 20, log_format: str = '%(asctime)s | [%(levelname)s]: %(message)s', overwrite=False, create_path=False) None[source]

Initialize the logger.

This function creates and initializes a log file. Logging output is sent both to the file and standard output. If ‘filename’ is None, no file output is written. To write further to this logger, add the following to the script:

import logging
logger = logging.getLogger('root')
logger.info(<info_string>)
logger.warning(<warning_string>)
logger.error(<error_string>)

Parameters:
  • filename (str) – name of output file

  • path (str) – path to folder of output file

  • log_level (int) –

    lowest log level to be reported. Options are:

    10=debug, 20=info, 30=warning, 40=error, 50=critical

  • log_format (str) – format of the log

  • overwrite (bool) – whether to overwrite existing log file

  • create_path (bool) – whether to create path to log file if it does not exist
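The dual output (file plus standard output) can be reproduced with the standard logging module. The following is a sketch under assumptions, not the function's source: sketch_set_logger is hypothetical, it uses a separate logger name ("dataio_sketch") to avoid touching the root logger, and it omits the overwrite and create_path options.

```python
import logging
import sys
import tempfile
from pathlib import Path


def sketch_set_logger(filename=None, path=".", log_level=logging.INFO,
                      log_format="%(asctime)s | [%(levelname)s]: %(message)s"):
    """Attach a stdout handler and, if filename is given, a file handler."""
    logger = logging.getLogger("dataio_sketch")
    logger.setLevel(log_level)
    logger.handlers.clear()  # avoid duplicate output on repeated calls
    formatter = logging.Formatter(log_format)
    stream_handler = logging.StreamHandler(sys.stdout)
    stream_handler.setFormatter(formatter)
    logger.addHandler(stream_handler)
    if filename is not None:
        file_handler = logging.FileHandler(Path(path) / filename, mode="a")
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)
    return logger


with tempfile.TemporaryDirectory() as tmp:
    logger = sketch_set_logger("run.log", path=tmp)
    logger.info("started")
    logger.debug("not reported at log_level=20")
    log_text = (Path(tmp) / "run.log").read_text()
    # Close handlers so the temporary directory can be removed on all platforms.
    for handler in list(logger.handlers):
        handler.close()
        logger.removeHandler(handler)
```

Messages below log_level are dropped by the logger before they reach either handler, which is why the debug line never appears in the file.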

dataio.tools module

class dataio.tools.BonsaiBaseModel[source]

Bases: BaseModel

classmethod get_api_endpoint() Dict[str, str][source]

Retrieves the api endpoint dictionary, hidden from serialization.

classmethod get_classification() Dict[str, str][source]

Retrieves the classification dictionary, hidden from serialization.

classmethod get_csv_field_dtypes() Dict[str, Any][source]

Return a dictionary with field names and their corresponding types. Since csv files can only contain str, float and int, all types that are not int and float will be changed to str
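The type-narrowing rule can be written in one expression. This sketch takes a plain mapping of field names to Python types rather than a pydantic model; sketch_csv_field_dtypes is a hypothetical name, and how the real method treats optional or nested annotations is not covered.

```python
from datetime import date
from typing import Any, Dict


def sketch_csv_field_dtypes(field_types: Dict[str, Any]) -> Dict[str, Any]:
    """Keep int and float as they are; map every other type to str."""
    return {
        name: (tp if tp in (int, float) else str)
        for name, tp in field_types.items()
    }


dtypes = sketch_csv_field_dtypes(
    {"year": int, "value": float, "code": str, "updated": date}
)
```

Here the date field is mapped to str, while year and value keep their numeric types.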

classmethod get_empty_dataframe()[source]

Returns an empty pandas DataFrame with columns corresponding to the fields of the data class.

Returns:

An empty DataFrame with columns corresponding to the fields of the data class.

Return type:

pandas.DataFrame

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

classmethod to_dataclass(input_data) BonsaiTableModel[source]
to_pandas() DataFrame[source]

Converts instances of BaseToolModel within BaseTableClass to a pandas DataFrame.

Returns:

DataFrame containing data from instances of BaseToolModel.

Return type:

pandas.DataFrame

classmethod transform_dataframe(df: DataFrame, column_mapping: Dict[str, str] | None = None) DataFrame[source]

Transform a DataFrame into the BonsaiTable format by renaming columns and dropping others. Ensures all non-optional columns are present, and keeps optional columns if they are in the DataFrame. The column_mapping is optional. If not provided, only columns matching the schema fields are kept.

Parameters:
  • df (pd.DataFrame) – The input DataFrame to transform.

  • column_mapping (Optional[Dict[str, str]]) – A dictionary mapping input DataFrame column names to schema field names. If None, only columns matching the schema fields will be kept.

Returns:

A DataFrame with columns renamed (if mapping is provided) and unnecessary columns dropped.

Return type:

pd.DataFrame

Raises:

ValueError – If any required columns are missing from the DataFrame.
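The rename, check, and drop steps can be sketched on plain row dictionaries instead of a DataFrame. All names below are hypothetical, and the required and optional field lists are passed in explicitly, whereas the real method derives them from the schema class.

```python
def sketch_transform_rows(rows, required, optional=(), column_mapping=None):
    """Rename columns, verify required ones exist, drop everything else."""
    mapping = column_mapping or {}
    renamed = [{mapping.get(k, k): v for k, v in row.items()} for row in rows]
    present = set()
    for row in renamed:
        present.update(row.keys())
    missing = [field for field in required if field not in present]
    if missing:
        raise ValueError(f"Missing required columns: {missing}")
    # Required fields first, then any optional fields that are present.
    keep = [f for f in list(required) + list(optional) if f in present]
    return [{f: row.get(f) for f in keep} for row in renamed]


rows = [{"Country": "DK", "amount": 1.0, "note": "x"}]
transformed = sketch_transform_rows(
    rows,
    required=["location", "value"],
    optional=["comment"],
    column_mapping={"Country": "location", "amount": "value"},
)
try:
    sketch_transform_rows(rows, required=["location", "value"])
    missing_raised = False
except ValueError:
    missing_raised = True
```

Without a mapping, the input columns do not match the required field names, so the second call raises ValueError.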

class dataio.tools.BonsaiTableModel(*, data: list[BonsaiBaseModel])[source]

Bases: BaseModel

data: list[BonsaiBaseModel]
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

to_json()[source]

Convert the object to a JSON string representation.

Returns:

A JSON string representing the object with data information.

Return type:

str

to_pandas() DataFrame[source]

Converts instances of BaseToolModel within BaseTableClass to a pandas DataFrame.

Returns:

DataFrame containing data from instances of BaseToolModel.

Return type:

pandas.DataFrame

dataio.tools.get_dataclasses(directory: str = 'src/dataio/schemas/bonsai_api') List[str][source]

Retrieve a list of Pydantic dataclass names that inherit from BaseToolModel from Python files in the specified directory.

Parameters:

directory (str) – The directory path where Python files containing Pydantic dataclasses are located. Defaults to “src/dataio/schemas/bonsai_api”.

Returns:

A list of fully qualified names (including module names) of Pydantic dataclasses that inherit from BaseToolModel.

Return type:

List[str]

dataio.tools.print_data_classes()[source]

Print out all the available data classes in the directory src/dataio/schemas/bonsai_api

dataio.validate module

Datapackage validate module of the dataio utility.

dataio.validate.validate(full_path: str, overwrite: bool = False, log_name: str = None) dict[source]

Validate datapackage.

Validates datapackage with metadata at: <full_path>=<root_path>/<path>/<name>.dataio.yaml

Creates <name>.validate.yaml for frictionless validation and outputs dataio-specific validation to the log.

Specific fields expected in <name>.dataio.yaml:

  • name : should match <name>

  • path : from which <path> is inferred

  • version

Parameters:
  • full_path (str) – path to dataio.yaml file

  • overwrite (bool) – whether to overwrite output files

  • log_name (str) – name of log file, if None no log is set

Returns:

frictionless validate report dictionary

Return type:

dict

dataio.validate.validate_matrix(df: DataFrame, schema: MatrixModel)[source]
dataio.validate.validate_schema(resource, n_errors)[source]

Check if schema, fields, primary and foreign keys exist.

dataio.validate.validate_table(df: DataFrame, schema: BonsaiBaseModel)[source]

Module contents

Initializes dataio Python package.

dataio.plot(full_path: str = None, overwrite: bool = False, log_name: str = None, export_png: bool = True, export_svg: bool = False, export_gv: bool = False)[source]

Create entity-relation diagram from dataio.yaml file.

Exports a .gv config file and figures in .svg and .png format.

GraphViz must be installed on the computer itself, not only as a Python package.

Structure of the output erd configuration dictionary:

First-level: key = datapackage name; value type : dictionary

Second level: keys = table name; value type : pandas.DataFrame

pandas.DataFrame index: table field names

pandas.DataFrame columns:

  • type: str

  • primary: bool

  • foreign: bool

  • field: str (field of foreign key)

  • table: str (table of foreign key)

  • datapackage: str (datapackage of foreign key)

  • direction: str in [‘forward’, ‘back’] (direction of arrow)

  • style: str in [‘invis’, ‘solid’] (style of arrow)

Parameters:
  • full_path (str) – path to dataio.yaml file

  • overwrite (bool) – whether to overwrite output files

  • log_name (str) – name of log file, if None no log is set

  • export_png (bool) – whether to export .png graphic file

  • export_svg (bool) – whether to export .svg graphic file

  • export_gv (bool) – whether to export .gv configuration file

Returns:

  • dict – erd configuration dictionary

  • gv – graphviz configuration object