Source code for dataio.config

# -*- coding: utf-8 -*-
"""
Created on Fri Jun 10 16:05:31 2022

@author: ReMarkt
"""
import copy
from dataclasses import fields
from dotenv import load_dotenv
import json
import logging
import os
from pathlib import Path

import yaml

from dataio.resources import ResourceRepository
from dataio.utils.path_manager import PathBuilder

logger = logging.getLogger("config")


[docs] class Config:
[docs] @staticmethod def get_airflow_defaults(filename:str): """ Load config from airflow or from src/config/airflow_attributes.config.json Args: filename (str): JSON config file located in the folder 'src/config' Returns: None """ config_path = Path(__file__).parent.parent / "config" / filename logger.debug("path= ",str(config_path)) try: with open(config_path, 'r') as file: return json.load(file) except FileNotFoundError: raise Exception(f"Config file '{filename}' not found, please create it in the folder src/config")
def __init__( self, current_task_name=None, custom_resource_path=None, log_level=logging.WARNING, log_file=None, **kwargs, ): """ Config class for managing configurations, including logging. :param current_task_name: Name of the current task (used for logging). :param custom_resource_path: Optional path for custom resources. :param log_level: Logging level (default: WARNING). :param log_file: Optional file path to log output. :param kwargs: Additional configuration parameters. """ self.current_task_name = current_task_name self.custom_resource_path = custom_resource_path self._load_config("data_attributes.config.yaml") self._load_airflow_config("airflow_attributes.config.json") # Set up logging self.log_handler = logging.StreamHandler() self.log_level = log_level self.logger = self._setup_logger(log_file) # Assign additional configuration attributes for key, value in kwargs.items(): setattr(self, key, value) if key == 'config_path': self._load_config(value, True) def _setup_logger(self, log_file): """ Sets up the logger with a console handler and optional file handler. """ logger = logging.getLogger(self.current_task_name) logger.setLevel(self.log_level) # Prevent duplicate log handlers if not logger.handlers: # Console handler console_handler = logging.StreamHandler() console_handler.setLevel(self.log_level) # Formatter formatter = logging.Formatter( '[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) console_handler.setFormatter(formatter) logger.addHandler(console_handler) # File handler (if provided) if log_file: file_handler = logging.FileHandler(log_file) file_handler.setLevel(self.log_level) file_handler.setFormatter(formatter) logger.addHandler(file_handler) return logger
[docs] def log(self, level, message): """ Logs a message with the specified logging level. """ if self.logger: self.logger.log(level, message)
[docs] def load_env(self) -> dict: load_dotenv() env_dict = {} for key in os.environ: env_dict[key] = os.environ[key] return env_dict
@property def bonsai_home(self): """ environment variable to define the home directory for dataio """ env_dict = self.load_env() assert env_dict[ "BONSAI_HOME" ], "Please set up environmental variable for 'BONSAI_HOME'" return Path(env_dict.get("BONSAI_HOME", str(Path.home()))) @property def dataio_root(self): """ environment variable to define the home directory for hybrid_sut """ env_dict = self.load_env() assert env_dict[ "DATAIO_ROOT" ], "Please set up environmental variable for 'DATAIO_ROOT'" return Path(env_dict.get("DATAIO_ROOT", str(Path.home()))) @property def path_repository(self) -> PathBuilder: from dataio.utils.accounts import AccountRepository print(f"Get version from {self.version_source}") return PathBuilder( Path(self.bonsai_home), version_source=self.version_source, account_repository=AccountRepository( self.bonsai_home / "_bonsai" / "accounts.json" ), ) @property def version_source(self): vdate = self.date.replace("-", "") path_version = ( self.bonsai_home / "_bonsai" / "versions" / f"versions_{vdate}.txt" ) if not path_version.exists(): path_version = self.bonsai_home / "versions" / f"versions_{vdate}.txt" # TODO: replace this path with Version class return path_version @version_source.setter def version_source(self, path: Path) -> None: self.version_source = path @property def schemas(self): from dataio.schemas import bonsai_api return bonsai_api @property def schema_enums(self): from dataio.utils import schema_enums return schema_enums @property def connector_repository(self): from dataio.utils.connectors import connector_repository return connector_repository
[docs] def list_parameters(self): """List all dataclass field names.""" return [field.name for field in fields(self)]
@property def resource_repository(self) -> ResourceRepository: from dataio.resources import ResourceRepository db_path = ( self.custom_resource_path if self.custom_resource_path else self.dataio_root ) return ResourceRepository(db_path=db_path) @property def sut_resource_repository(self): from dataio.utils.hsut.resources_hsut import CSVResourceRepository as SutCSVResourceRepository db_path = self.path_repository.exiobase4 if self.path_repository.exiobase4 else self.bonsai_home return SutCSVResourceRepository(db_path) def _load_config(self, filename:str, isLocalfile: bool = False) -> None: '''Load config file in the class Config's attributes Args: filename (str): YAML config file located in the folder 'src/config' Returns: None ''' config_path = Path(filename) if isLocalfile else Path(__file__).parent.parent / "config" / filename logger.debug("path= ",{config_path}) try: with open(config_path, 'r') as file: config = yaml.safe_load(file) for key, value in config.items(): self.__setattr__(key, value) return None except FileNotFoundError: raise Exception(f"Config file '{filename}' not found, please create it, or one in the folder src/config") def _load_airflow_config(self, filename:str): """ Load config from airflow or from src/config/airflow_attributes.config.json Args: filename (str): JSON config file located in the folder 'src/config' Returns: None """ airflow_defaults = Config.get_airflow_defaults(filename) for key, value in airflow_defaults.items(): self.__setattr__(key, value)
[docs] def copy(self): """ Creates a deep copy of the current Config instance. """ return copy.deepcopy(self)