dataio package
Subpackages
- dataio.schemas package
- Subpackages
- dataio.schemas.bonsai_api package
- Submodules
- dataio.schemas.bonsai_api.PPF_fact_schemas_samples module
- dataio.schemas.bonsai_api.PPF_fact_schemas_uncertainty module
- dataio.schemas.bonsai_api.admin module
- dataio.schemas.bonsai_api.base_models module
- dataio.schemas.bonsai_api.correspondences module
- dataio.schemas.bonsai_api.dims module
- dataio.schemas.bonsai_api.external_schemas module
- dataio.schemas.bonsai_api.facts module
- dataio.schemas.bonsai_api.ipcc module
- dataio.schemas.bonsai_api.matrix module
- dataio.schemas.bonsai_api.metadata module
- dataio.schemas.bonsai_api.uncertainty module
- Module contents
- dataio.schemas.bonsai_api package
- Submodules
- dataio.schemas.base_models module
- Module contents
- Subpackages
- dataio.utils package
- Subpackages
- Submodules
- dataio.utils.accounts module
- dataio.utils.connectors module
- dataio.utils.path_manager module
- PathBuilder: compose(), list_path_attributes(), and the path attributes b2_version, balance_raw, cement, cement_data, cleaned_exio_3, cleaned_fao, cleaned_forestry, corresp_fao, dm_coeff, emissions_coeff, emissions_intermediate, emissions_raw, fao_collection, fao_processed, fao_raw, fao_store, fert_interm, fertilisers, ferts_collection, fish_markets, gams_inputs, heat_markets, hiot_interm, hiot_raw, hiot_with_capital, hiot_with_iluc, hiot_with_marg_elect, iLUC_interm, iLUC_param, iLUC_raw, iea_clean, iea_interm, iea_raw_exio, ipcc_param, land_use, lci_act_generic, lci_cleaned, lci_concito, lci_country, lci_exio4, lci_fish, lci_prod_generic, lci_products, lci_raw, lci_vehicles, matrix_of_invest, merged_collected_data, monetary_tables, natural_resource, outlier, prices, prod_markets, property_param, property_values, simapro, supply_intermediate, supply_raw, trade_intermediate, trade_raw, trade_route, un_data, un_data_elab, use_intermediate, use_raw, value_added, waste_accounts, waste_markets
- dataio.utils.schema_enums module
- APIEndpoints: ACTIVITIES, ACTIVITY_CORR, BASE_URL, FOOTPRINT, LOCATIONS, LOCATION_CORR, METADATA, PRODUCT, PRODUCTS, PRODUCT_CORR, PROPERTIES, RECIPES, SUPPLY, TOKEN, USE, get_url()
- EmissCompartment
- Exio_fert_nutrients
- IndexNames: ACT_CODE, AGGREGATION, AGRI_SYSTEM, ANIMAL_CATEGORY, ASSOCIATED_TREAT, CLIMATE_METRIC, COEFFICIENT, COUNTRY_CODE, DESCRIP, DESTIN_COUNTRY, DESTIN_COUNTRY_EDGE, EDGE, EMIS_COMPARTMENT, EMIS_SUBSTANCE, EXIO3, EXIO3_ACT, EXIO_ACCOUNT, EXIO_ACT, EXIO_CNT, EXIO_CNT_acron, EXIO_CODE, EXIO_PRD, FACTOR, FLAG, GLOBAL_AREA, INPUT_PROD, LCI_FLAG, LCI_UNIT, LCI_VALUE, MARKET, NUTRIENT_CONT, OJBECT_CODE, ORIGIN_COUNTRY, ORIGIN_COUNTRY_EDGE, PACKAGED, PACK_CODE, PACK_MARKET, PACK_PROD, PARENT_CLASS, PARENT_CODE, PARENT_LINK, PARENT_NAME, PERIOD, PERIOD_DELAY, POSITION, PRODUCT, PRODUCTION, PROD_CODE, REF_PROD_CODE, REPLACED_MRK, REPLACED_PRODUCT, REPL_FACTOR, RESOURCE, SCENARIO, SHARE, SOURCE, SOURCE_CLASS, SOURCE_CODE, SOURCE_LINK, SOURCE_NAME, SUBSTITUTION_FACTOR, TARGET_CLASS, TARGET_CODE, TARGET_LINK, TARGET_NAME, TRADE_ROUTE_ID, TRANSPORT_MODE, TRANSPORT_MODE_EDGE, UNIT, UNIT_DESTIN, UNIT_FOOTPRINT, UNIT_SOURCE, VALUE, VALUE_FOOTPRINT, VALUE_IN, WASTE_FRACTION, WASTE_MARKET
- Property
- PropertyEnum
- animal_system
- data_index_categ: balance_categ, balance_columns, column_categ, emiss_categ, fao_animal_system_index, fao_clean_index, general_categ, pack_index, trade_categ, waste_sup_col, waste_sup_index
- fao_categ
- global_land_categ
- ipcc_categ
- land_use_categ
- dataio.utils.versions module
- Module contents
Submodules
dataio.arg_parser module
dataio.config module
Created on Fri Jun 10 16:05:31 2022
@author: ReMarkt
- class dataio.config.Config(current_task_name=None, custom_resource_path=None, log_level=30, log_file=None, run_by_user=None, **kwargs)[source]
Bases: object
- property bonsai_home
environment variable to define the home directory for dataio
- property classif_repository
- property connector_repository
- property corr_repository
- property dataio_root
environment variable to define the home directory for hybrid_sut
- static get_airflow_defaults(filename: str)[source]
Load config from Airflow or from src/config/airflow_attributes.config.json.
- Parameters:
filename (str) – JSON config file located in the folder ‘src/config’
- Returns:
None
- property path_repository: PathBuilder
- property resource_repository: ResourceRepository
- property schema_enums
- property schemas
- property sut_resource_repository
- property version_source
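A minimal usage sketch (illustrative only; the task name and home directory are placeholders, and resolving bonsai_home from a BONSAI_HOME environment variable is an assumption):

    import os
    from dataio.config import Config

    # Assumption: the bonsai_home property reads the BONSAI_HOME environment variable.
    os.environ.setdefault("BONSAI_HOME", "/tmp/bonsai_home")

    cfg = Config(current_task_name="example_task", log_level=30)
    print(cfg.bonsai_home)            # home directory used by dataio
    repo = cfg.resource_repository    # ResourceRepository configured from this Config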
dataio.default module
dataio.load module
Datapackage load module of dataio utility.
- dataio.load.build_cache_key(resource: DataResource) str[source]
- Builds a unique string key for caching based on:
resource.api_endpoint
relevant fields (version, task_name, etc.)
- dataio.load.clean_up_cache(CACHE_DIR, MAX_CACHE_FILES)[source]
Enforce that only up to MAX_CACHE_FILES CSV files remain in CACHE_DIR. Remove the oldest files (by modification time) if there are more than that.
- dataio.load.get_cache_path(key: str, CACHE_DIR='./data_cache/') str[source]
Returns a local file path where the DataFrame is cached, based on the unique cache key.
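A sketch of how these caching helpers compose (hypothetical: the repository path and the filter keyword are placeholders; ResourceRepository and get_resource_info are documented under dataio.resources below):

    from dataio.load import build_cache_key, get_cache_path, clean_up_cache
    from dataio.resources import ResourceRepository

    repo = ResourceRepository(storage_method="local", db_path="./resources")
    resource = repo.get_resource_info(task_name="collect_prices")   # filter field is a placeholder
    key = build_cache_key(resource)                                 # endpoint + version/task fields
    cache_file = get_cache_path(key, CACHE_DIR="./data_cache/")
    # ... fetch the data and write it to cache_file, then enforce the cache size limit:
    clean_up_cache("./data_cache/", MAX_CACHE_FILES=3)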
- dataio.load.load(path: Path, schemas: Dict[str, BaseModel] = None)[source]
This will return an empty dict if the file can’t be found
- dataio.load.load_api(self, resource: DataResource, CACHE_DIR, MAX_CACHE_FILES) DataFrame[source]
Fetches data from the resource’s API endpoint and returns it as a DataFrame. Assumes the endpoint returns CSV text (adjust as needed for JSON, etc.).
- Raises:
ValueError – If api_endpoint is not set or an HTTP error occurs.
- dataio.load.load_matrix_file(path_to_file: Path, schema: MatrixModel, **kwargs)[source]
- dataio.load.load_metadata(path_to_metadata, datapackage_names=None)[source]
Load metadata from a YAML file and convert it into a dictionary with UUIDs as keys and MetaData objects as values. The YAML file is expected to start directly with a list of metadata entries.
- Parameters:
file_path (str) – The path to the YAML file that contains metadata entries.
- Returns:
A dictionary where each key is a UUID (as a string) of a MetaData object and each value is the corresponding MetaData object.
- Return type:
dict
- Raises:
FileNotFoundError – If the specified file does not exist.
yaml.YAMLError – If there is an error in parsing the YAML file.
pydantic.ValidationError – If an item in the YAML file does not conform to the MetaData model.
Examples
Assuming a YAML file located at ‘example.yaml’:
>>> metadata_dict = load_metadata_from_yaml('example.yaml')
>>> print(metadata_dict['123e4567-e89b-12d3-a456-426614174000'])
MetaData(id=UUID('123e4567-e89b-12d3-a456-426614174000'), created_by=User(...), ...)
- dataio.load.load_table_file(path_to_file: Path, schema: BonsaiBaseModel, **kwargs)[source]
dataio.resources module
- class dataio.resources.CSVResourceRepository(db_path: str, table_name: str = 'resources', **kwargs)[source]
Bases:
object
- class dataio.resources.ResourceRepository(storage_method='local', db_path: str | None = None, table_name: str | None = 'resources', API_token: str | None = None, username: str | None = None, password: str | None = None, cache_dir: str = './data_cache/', MAX_CACHE_FILES: int = 3, ureg=<pint.registry.UnitRegistry object>)[source]
Bases: object
Repository for managing data resources within a CSV file storage system.
- db_path
Path to the directory containing the resource CSV file.
- Type:
Path
- resources_list_path
Full path to the CSV file that stores resource information.
- Type:
Path
- schema
Schema used for resource data validation and storage.
- Type:
- cache_dir
cache_dir determines the location of the cached data resources. Default: ./data_cache/
- Type:
- add_or_update_resource_list(resource: DataResource, **kwargs)[source]
Adds a new resource or updates an existing one in the repository.
- add_to_resource_list(resource: DataResource)[source]
Adds a new resource to the repository.
- update_resource_list(resource: DataResource)[source]
Updates an existing resource in the repository.
- add_from_dataframe(data, loc, task_name, task_relation, last_update, **kwargs)[source]
Adds resource information from a DataFrame.
- write_dataframe_for_task(data, name, **kwargs)[source]
Writes a DataFrame to the storage based on resource information.
- write_dataframe_for_resource(data, resource, overwrite)[source]
Validates and writes a DataFrame to the resource location.
- list_available_resources()
Lists all available resources in the repository.
- comment_resource(resource, comment)[source]
Adds a comment to a resource and updates the repository.
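A short illustrative session (paths, task names and column names are placeholders):

    import pandas as pd
    from dataio.resources import ResourceRepository

    repo = ResourceRepository(storage_method="local", db_path="./resources")

    df = pd.DataFrame({"unit": ["kg"], "value": [1.0]})
    # Register the DataFrame as the output of a (hypothetical) task:
    res = repo.add_from_dataframe(df, loc="./data/example.csv",
                                  task_name="example_task", task_relation="output")
    print(repo.resource_exists(res))
    for r in repo.list_available_resources():
        print(r)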
- add_from_dataframe(data: DataFrame, loc: Path | str, task_name: str | None = None, task_relation: str = 'output', last_update: date = datetime.date(2025, 10, 20), **kwargs) DataResource[source]
- add_or_update_resource_list(resource: DataResource, storage_method: str | None = None, **kwargs) str[source]
Adds a new resource to the repository or updates it if it already exists.
- Parameters:
resource (DataResource) – The resource data to add or update.
kwargs (dict) – Additional keyword arguments used for extended functionality.
- Return type:
str of the version's uuid
- add_to_resource_list(resource: DataResource, storage_method: str | None = None) None[source]
Appends a new resource to the repository.
- Parameters:
resource (DataResource) – The resource data to add.
- Return type:
str of the generated uuid
- comment_resource(resource: DataResource, comment: str) DataResource[source]
- convert_dataframe(data: DataFrame, original_schema: any, classifications: dict, units: list[str] | None = None) Tuple[DataFrame, dict[str, set[str]]][source]
Convert a DataFrame’s classification columns to a target schema using concordance mappings.
- This method aligns input data with a target classification system by:
Validating and extracting classification metadata from original_schema
Applying concordances between source and target classifications
Handling pairwise mappings for activity/flow relationships
Tracking and reporting unmapped values
Optionally converting measurement units
- Parameters:
data (pd.DataFrame) – The input dataset whose classification columns are to be mapped.
original_schema (type | BaseModel | dict) –
- Schema definition of the source data. Must either:
Provide a get_classification() method (dataio.schema), or
Be a dict mapping column names to (classification_name, classification_type).
Example
>>> original_schema = {
...     "location": ("iso_3166_1_numeric", "location"),
...     "product": ("ipcc", "flowobject"),
... }
classifications (dict) –
Mapping of classification type names to their target classification systems.
Example
>>> bonsai_classifications = {
...     "location": "bonsai",
...     "flowobject": "bonsai",
... }
units (list[str] | None, optional) – List of units to convert into, if applicable.
- Returns:
- A tuple containing:
The transformed DataFrame with mapped classifications
A dictionary of unmapped values per column
- Return type:
Tuple[pd.DataFrame, dict[str, set[str]]]
- Raises:
AttributeError – If original_schema lacks a required get_classification() method.
TypeError – If original_schema is neither a valid schema nor a dict in the correct format.
Notes
Logs warnings for missing or unavailable concordances.
Many-to-many correspondences are skipped during mapping.
If concordances contain account type information, it is extracted and added as a new column.
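A sketch that combines the two examples above (data values are placeholders; the repository path is hypothetical):

    import pandas as pd
    from dataio.resources import ResourceRepository

    repo = ResourceRepository(storage_method="local", db_path="./resources")

    # Source data classified by ISO 3166-1 numeric locations and IPCC flow objects.
    data = pd.DataFrame({"location": ["208"], "product": ["1.A.1"],
                         "unit": ["kg"], "value": [1.0]})
    original_schema = {
        "location": ("iso_3166_1_numeric", "location"),
        "product": ("ipcc", "flowobject"),
    }
    classifications = {"location": "bonsai", "flowobject": "bonsai"}

    converted, unmapped = repo.convert_dataframe(data, original_schema, classifications,
                                                 units=["kg"])
    print(unmapped)   # values that no concordance could map, per column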
- convert_dataframe_to_bonsai_classification(data: DataFrame, original_schema, units=None) Tuple[DataFrame, dict[str, set[str]]][source]
- convert_units(data: DataFrame, target_units: list[str]) DataFrame[source]
Converts values in the ‘value’ column of a DataFrame to the specified target units in the list. Units not listed in the target_units remain unchanged.
- Parameters:
data (pd.DataFrame) – A DataFrame with ‘unit’ and ‘value’ columns.
target_units (list) – A list of target units to convert compatible units to. Example: [“kg”, “J”, “m”]
- Returns:
A DataFrame with the converted values and target units.
- Return type:
pd.DataFrame
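A minimal sketch (assumes pint can relate the source and target units; values are placeholders):

    import pandas as pd
    from dataio.resources import ResourceRepository

    repo = ResourceRepository(storage_method="local", db_path="./resources")
    data = pd.DataFrame({"unit": ["g", "EUR"], "value": [500.0, 3.0]})

    converted = repo.convert_units(data, target_units=["kg"])
    # The mass row is converted to kg; the EUR row matches no target unit and is left unchanged.
    print(converted)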
- get_dataframe_for_resource(res: DataResource, storage_method: str | None = None)[source]
- get_resource_info(storage_method: str | None = None, **filters: dict) DataResource | List[DataResource][source]
Retrieves resource information based on specified filters, optionally overriding the default storage method for this call.
- Parameters:
- Returns:
Matches found: either a single resource or a list of resources.
- Return type:
DataResource or List[DataResource]
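Filters are passed as keyword arguments; a sketch (the filter field names used here are assumptions, and the call assumes exactly one match):

    from dataio.resources import ResourceRepository

    repo = ResourceRepository(storage_method="local", db_path="./resources")
    match = repo.get_resource_info(name="prices", task_name="collect_prices")
    df = repo.get_dataframe_for_resource(match)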
- group_and_sum(df, code_column: str, group_columns: list, values_to_sum: list | set)[source]
Grouping function to handle unit compatibility
- harmonize_with_resource(dataframe, storage_method: str | None = None, overwrite=True, **kwargs)[source]
- list_available_resources(storage_method: str | None = None) list[DataResource][source]
Lists all available resources in the repository, either from local CSV or from the API, depending on the storage method.
- Parameters:
storage_method (str | None) – Optional override for single-call usage (‘local’ or ‘api’). If None, uses self.load_storage_method.
- Returns:
A list of all DataResource items found.
- Return type:
list[DataResource]
- load_with_bonsai_classification(storage_method: str | None = None, **kwargs) Tuple[DataFrame, dict[str, set[str]]][source]
This method loads the selected data based on kwargs with the default BONSAI classifications. The default classifications for BONSAI are the following:
location: ISO3
flowobject: BONSAI
- load_with_classification(classifications: dict, units: list[str] | None = None, storage_method: str | None = None, **kwargs) Tuple[DataFrame, dict[str, set[str]]][source]
Loads data with a certain classification for the selected fields. Rows that cannot be automatically transformed are left unchanged and returned as is.
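An illustrative call (the selection filter and units are placeholders):

    from dataio.resources import ResourceRepository

    repo = ResourceRepository(storage_method="local", db_path="./resources")
    df, unmapped = repo.load_with_classification(
        classifications={"location": "bonsai", "flowobject": "bonsai"},
        units=["kg"],
        name="prices",   # resource selection filter; the field name is an assumption
    )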
- resource_exists(resource: DataResource, storage_method: str | None = None) bool[source]
Checks if the given resource already exists in the repository (locally or via the API).
- Returns:
True if exactly one matching resource is found.
False if none are found.
- Return type:
bool
- Raises:
ValueError – If multiple matches are found or if an invalid storage method is set.
- update_resource_list(resource: DataResource, storage_method: str | None = None) None[source]
Updates an existing resource in the repository.
- Parameters:
resource (DataResource) – The resource data to update.
- dataio.resources.compare_version_strings(resource1: DataResource, resource2: DataResource)[source]
dataio.save module
Datapackage save module of dataio utility.
- dataio.save.old_save(datapackage, root_path: str = '.', increment: str = None, overwrite: bool = False, create_path: bool = False, log_name: str = None)[source]
Save datapackage from dataio.yaml file.
- Parameters:
datapackage (DataPackage) – dataio datapackage
root_path (str) – path to root of database
increment (str) – semantic level to increment, in [None, ‘patch’, ‘minor’, ‘major’]
overwrite (bool) – whether to overwrite
create_path (bool) – whether to create path
log_name (str) – name of log file, if None no log is set
- dataio.save.save_to_api(data: DataFrame, resource: DataResource, schema=None, overwrite=True, append=False)[source]
Saves the given DataFrame to resource.api_endpoint via a single JSON POST. The JSON body has the form:
{
    "data": [
        {…}, {…}
    ]
}
so that multiple rows can be created in one request.
- Parameters:
data (pd.DataFrame) – The data to be sent. Each row becomes one dict.
resource (DataResource) – Must have a non-empty api_endpoint. Optionally, resource.id can be included to reference the version if the endpoint requires it.
schema (optional) – Schema used to validate data before sending, if desired.
overwrite (bool) – If the API supports overwriting, this flag is passed as a query parameter or in the request body, depending on the API.
- Raises:
ValueError – If ‘resource.api_endpoint’ is missing or if the POST fails.
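A sketch of a typical call (the API token and resource filter are placeholders; the payload is built internally as the JSON body shown above):

    import pandas as pd
    from dataio.resources import ResourceRepository
    from dataio.save import save_to_api

    repo = ResourceRepository(storage_method="api", API_token="<token>")
    resource = repo.get_resource_info(name="prices")   # must have a non-empty api_endpoint

    data = pd.DataFrame([{"unit": "kg", "value": 1.0}])  # each row becomes one dict in "data"
    save_to_api(data, resource, overwrite=True)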
dataio.set_logger module
Module with set_logger function.
Part of package ‘templates’.
Part of the Getting the Data Right project.
Created on Jul 25, 2023.
@author: Joao Rodrigues
- dataio.set_logger.set_logger(filename: str | Path = None, path: str = '/builds/bonsamurais/bonsai/util/dataio/docs', log_level: int = 20, log_format: str = '%(asctime)s | [%(levelname)s]: %(message)s', overwrite=False, create_path=False) None[source]
Initialize the logger.
This function creates and initializes a log file. Logging output is sent both to the file and to standard output. If filename is None, no file output is written. To write to this logger, add in the script:
    import logging
    logger = logging.getLogger('root')
    logger.info(<info_string>)
    logger.warning(<warning_string>)
    logger.error(<error_string>)
- Parameters:
filename (str) – name of output file
path (str) – path to folder of output file
log_level (int) –
- lowest log level to be reported. Options are:
10 = debug, 20 = info, 30 = warning, 40 = error, 50 = critical
log_format (str) – format of the log
overwrite (bool) – whether to overwrite existing log file
create_path (bool) – whether to create path to log file if it does not exist
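Example initialization and subsequent use (file name and path are placeholders):

    import logging
    from dataio.set_logger import set_logger

    set_logger(filename="dataio.log", path=".", log_level=20, create_path=True)
    logger = logging.getLogger("root")
    logger.info("logger initialized")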
dataio.tools module
- class dataio.tools.BonsaiBaseModel[source]
Bases: BaseModel
- classmethod get_api_endpoint() Dict[str, str][source]
Retrieves the api endpoint dictionary, hidden from serialization.
- classmethod get_classification() Dict[str, str][source]
Retrieves the classification dictionary, hidden from serialization.
- classmethod get_csv_field_dtypes() Dict[str, Any][source]
Return a dictionary with field names and their corresponding types. Since CSV files can only contain str, float and int, all types that are not int or float will be changed to str.
- classmethod get_empty_dataframe()[source]
Returns an empty pandas DataFrame with columns corresponding to the fields of the data class.
- Returns:
An empty DataFrame with columns corresponding to the fields of the data class.
- Return type:
pd.DataFrame
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- classmethod to_dataclass(input_data) BonsaiTableModel[source]
- to_pandas() DataFrame[source]
Converts instances of BaseToolModel within BaseTableClass to a pandas DataFrame.
- Returns:
DataFrame containing data from instances of BaseToolModel.
- Return type:
pd.DataFrame
- classmethod transform_dataframe(df: DataFrame, column_mapping: Dict[str, str] | None = None) DataFrame[source]
Transform a DataFrame into the BonsaiTable format by renaming columns and dropping others. Ensures all non-optional columns are present, and keeps optional columns if they are in the DataFrame. The column_mapping is optional. If not provided, only columns matching the schema fields are kept.
- Parameters:
- Returns:
A DataFrame with columns renamed (if mapping is provided) and unnecessary columns dropped.
- Return type:
pd.DataFrame
- Raises:
ValueError – If any required columns are missing from the DataFrame.
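A sketch with a hypothetical schema subclass (ExampleRow and the column names are illustrative only):

    import pandas as pd
    from dataio.tools import BonsaiBaseModel

    class ExampleRow(BonsaiBaseModel):   # hypothetical schema for illustration
        code: str
        value: float

    print(ExampleRow.get_empty_dataframe().columns.tolist())   # columns follow the schema fields

    raw = pd.DataFrame({"item_code": ["a"], "amount": [1.0], "extra": [0]})
    df = ExampleRow.transform_dataframe(raw, column_mapping={"item_code": "code", "amount": "value"})
    # 'extra' is dropped; the remaining columns are renamed to match the schema fields.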
- class dataio.tools.BonsaiTableModel(*, data: list[BonsaiBaseModel])[source]
Bases: BaseModel
- data: list[BonsaiBaseModel]
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- to_json()[source]
Convert the object to a JSON string representation.
- Returns:
A JSON string representing the object with data information.
- Return type:
str
- dataio.tools.get_dataclasses(directory: str = 'src/dataio/schemas/bonsai_api') List[str][source]
Retrieve a list of Pydantic dataclass names that inherit from BaseToolModel from Python files in the specified directory.
- Parameters:
directory (str) – The directory path where Python files containing Pydantic dataclasses are located. Defaults to “src/dataio/schemas/bonsai_api”.
- Returns:
A list of fully qualified names (including module names) of Pydantic dataclasses that inherit from BaseToolModel.
- Return type:
List[str]
dataio.validate module
Datapackage validate module of the dataio utility.
- dataio.validate.validate(full_path: str, overwrite: bool = False, log_name: str = None) dict[source]
Validate datapackage.
Validates datapackage with metadata at: <full_path>=<root_path>/<path>/<name>.dataio.yaml
Creates <name>.validate.yaml for frictionless validation and outputs dataio-specific validation to the log.
Specific fields expected in <name>.dataio.yaml:
name : should match <name>
path : from which <path> is inferred
version
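An illustrative call (the path and log name are placeholders):

    from dataio.validate import validate

    report = validate("/data/example/example.dataio.yaml",
                      overwrite=True, log_name="validate.log")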
- dataio.validate.validate_matrix(df: DataFrame, schema: MatrixModel)[source]
- dataio.validate.validate_schema(resource, n_errors)[source]
Check if schema, fields, primary and foreign keys exist.
- dataio.validate.validate_table(df: DataFrame, schema: BonsaiBaseModel)[source]
Module contents
Initializes dataio Python package.
- dataio.plot(full_path: str = None, overwrite: bool = False, log_name: str = None, export_png: bool = True, export_svg: bool = False, export_gv: bool = False)[source]
Create entity-relation diagram from dataio.yaml file.
Exports a .gv config file and figures in .svg and .png format.
GraphViz must be installed on the computer, not only as a Python package.
Structure of the output erd configuration dictionary:
First-level: key = datapackage name; value type : dictionary
Second level: keys = table name; value type : pandas.DataFrame
pandas.DataFrame index: table field names
pandas.DataFrame columns:
type: str
primary: bool
foreign: bool
field: str (field of foreign key)
table: str (table of foreign key)
datapackage: str (datapackage of foreign key)
direction: str in [‘forward’, ‘back’] (direction of arrow)
style: str in [‘invis’, ‘solid’] (style of arrow)
- Parameters:
full_path (str) – path to dataio.yaml file
overwrite (bool) – whether to overwrite output files
log_name (str) – name of log file, if None no log is set
export_png (bool) – whether to export .png graphic file
export_svg (bool) – whether to export .svg graphic file
export_gv (bool) – whether to export .gv configuration file
- Returns:
dict – erd configuration dictionary
gv – graphviz configuration object
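An illustrative call (the path is a placeholder; assumes the two documented return values are returned as a tuple):

    import dataio

    erd, gv = dataio.plot("/data/example/example.dataio.yaml",
                          export_png=True, export_svg=False, export_gv=False)
    # erd: nested configuration dictionary described above; gv: graphviz configuration object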