Source code for dataio.schemas.bonsai_api.external_schemas

import datetime
from typing import ClassVar, Dict, Optional, Tuple

from pydantic import Field, PrivateAttr

import dataio.schemas.bonsai_api.PPF_fact_schemas_uncertainty as PPF_fact_schemas_uncertainty
from dataio.schemas.bonsai_api.base_models import FactBaseModel_uncertainty


[docs] class PRODCOMProductionVolume( PPF_fact_schemas_uncertainty.ProductionVolumes_uncertainty ): indicator: str _classification: ClassVar[Dict[str, Tuple[str, str]]] = { "location": ("GEOnumeric", "location"), "product": ("prodcom_total_2_0", "flowobject"), } # Total production (ds-056121) def __str__(self) -> str: return f"{self.location}-{self.product}-{self.activity}-{self.time}-{self.value}-{self.unit}"
[docs] class PRODCOMSoldProductionVolume( PPF_fact_schemas_uncertainty.ProductionVolumes_uncertainty ): indicator: str _classification: ClassVar[Dict[str, Tuple[str, str]]] = { "location": ("GEOnumeric", "location"), "product": ("prodcom_sold_2_0", "flowobject"), } # Sold production (ds-056120) def __str__(self) -> str: return f"{self.location}-{self.product}-{self.activity}-{self.time}-{self.value}-{self.unit}"
[docs] class IndustrialCommodityStatistic( PPF_fact_schemas_uncertainty.ProductionVolumes_uncertainty ): _classification: ClassVar[Dict[str, Tuple[str, str]]] = { "location": ("name_short", "location"), "product": ("undata_ics", "flowobject"), } def __str__(self) -> str: return f"{self.location}-{self.product}-{self.activity}-{self.time}-{self.value}-{self.unit}"
[docs] class baseExternalSchemas(FactBaseModel_uncertainty): # Created with Sanders. Might Not be the base location: str time: int unit: str value: float comment: Optional[str] = None flag: Optional[str] = None
[docs] class ExternalMonetarySUT(baseExternalSchemas): table_type: str # Supply or use table product_code: str product_name: str activity_code: str activity_name: str price_type: str = Field( # Current price or previous year price. default="current prices" ) consumer_price: bool = Field(default=False) # Consumer price vs production price money_unit: Optional[str] = None # Millions, billions etc. diagonal: Optional[bool] = None
# For annual data tables that start on a day other than January 1st. E.g. the fiscal year of India.
[docs] class BrokenYearMonetarySUT(ExternalMonetarySUT): time: datetime.date # Start date of fiscal year
[docs] class EuropeanMonetarySUT(ExternalMonetarySUT): _classification: ClassVar[Dict[str, Tuple[str, str]]] = { "product_code": ("cpa_2_1_lvl2", "flowobject"), "activity_code": ("nace_rev2", "activitytype"), }
[docs] class OldEuropeanMonetarySUT(ExternalMonetarySUT): _classification: ClassVar[Dict[str, Tuple[str, str]]] = { "product_code": ("cpa_2008", "flowobject"), "activity_code": ("nace_rev2", "activitytype"), }
[docs] class OlderEuropeanMonetarySUT(ExternalMonetarySUT): _classification: ClassVar[Dict[str, Tuple[str, str]]] = { "product_code": ("cpa_2008", "flowobject"), "activity_code": ("nace_rev1_1", "activitytype"), }
[docs] class InternationalMonetarySUT(ExternalMonetarySUT): _classification: ClassVar[Dict[str, Tuple[str, str]]] = { "product_code": ("cpc_2_1", "flowobject"), "activity_code": ("isic_rev4", "activitytype"), }
[docs] class NACEMonetarySUT(ExternalMonetarySUT): _classification: ClassVar[Dict[str, Tuple[str, str]]] = { "product_code": ("nace_rev2", "flowobject"), "activity_code": ("nace_rev2", "activitytype"), }
[docs] class OECDMonetarySUT(ExternalMonetarySUT): _classification: ClassVar[Dict[str, Tuple[str, str]]] = { "product_code": ("cpa_2008", "flowobject"), "activity_code": ("isic_rev4", "activitytype"), }
[docs] class AfricanMonetarySUT(ExternalMonetarySUT): _classification: ClassVar[Dict[str, Tuple[str, str]]] = { "product_code": ("africa_flow", "flowobject"), "activity_code": ("isic_africa", "activitytype"), }
[docs] class AustralianMonetarySUT(ExternalMonetarySUT): _classification: ClassVar[Dict[str, Tuple[str, str]]] = { "product_code": ("suic", "flowobject"), "activity_code": ("anzsic_2006", "activitytype"), }
[docs] class EgyptianMonetarySUT(BrokenYearMonetarySUT): _classification: ClassVar[Dict[str, Tuple[str, str]]] = { "product_code": ("cpc_1_1", "flowobject"), "activity_code": ("isic_rev4", "activitytype"), }
[docs] class IndianMonetarySUT(BrokenYearMonetarySUT): _classification: ClassVar[Dict[str, Tuple[str, str]]] = { "product_code": ("india_sut", "flowobject"), "activity_code": ("india_sut", "activitytype"), }
[docs] class JapanMonetarySUT(BrokenYearMonetarySUT): _classification: ClassVar[Dict[str, Tuple[str, str]]] = { "product_code": ("japan_sut", "flowobject"), "activity": ("japan_sut", "activitytype"), }
[docs] class FAOstat(FactBaseModel_uncertainty): product_name: str product: str fao_element: str location: str time: int value: float unit: str flag: Optional[str] = None _classification: ClassVar[Dict[str, Tuple[str, str]]] = { "location": ("FAOcode", "location"), "product": ("faostat", "flowobject"), }
[docs] class FAOtrade(FactBaseModel_uncertainty): product_name: str product: str export_location: str # Exporter import_location: str # Importer time: int value: float unit: str flag: Optional[str] = None _classification: ClassVar[Dict[str, Tuple[str, str]]] = { "location": ("fao_area", "location"), "partner": ("fao_area", "location"), "product": ("fao_items", "flowobject"), }
[docs] class UNdataEnergyBalance(FactBaseModel_uncertainty): activity: str product: str location: str time: int value: float unit: str comment: Optional[str] = None flag: Optional[str] = None _classification: ClassVar[Dict[str, Tuple[str, str]]] = { "location": ("ISOnumeric", "location"), "product": ("undata_energy_stats", "flowobject"), # or prodcom_sold_2_0 } def __str__(self) -> str: return f"{self.location}-{self.product}-{self.activity}-{self.time}"
[docs] class UNdataEnergyStatistic(FactBaseModel_uncertainty): activity: str product: str location: str time: int value: float unit: str comment: Optional[str] = None flag: Optional[str] = None conversion_factor: Optional[float] = None _classification: ClassVar[Dict[str, Tuple[str, str]]] = { "location": ("ISOnumeric", "location"), "product": ("undata_energy_stats", "flowobject"), "activity": ("undata_energy_stats", "activitytype"), } def __str__(self) -> str: return f"{self.location}-{self.product}-{self.activity}-{self.time}"
[docs] class BACITrade(FactBaseModel_uncertainty): time: int product: str export_location: str import_location: str value: float unit: str flag: Optional[str] = None _classification: ClassVar[Dict[str, Tuple[str, str]]] = { "product": ("hs1992", "flowobject"), "export_location": ("ISO2", "location"), "import_location": ("ISO2", "location"), }
[docs] class USGSProductionVolume(PPF_fact_schemas_uncertainty.ProductionVolumes_uncertainty): _classification: ClassVar[Dict[str, Tuple[str, str]]] = { "location": ("name_short", "location"), "product": ("usgs", "flowobject"), } def __str__(self) -> str: return f"{self.location}-{self.product}-{self.activity}-{self.time}-{self.value}-{self.unit}"
[docs] class StatCanChemProductionVolume( PPF_fact_schemas_uncertainty.ProductionVolumes_uncertainty ): _classification: ClassVar[Dict[str, Tuple[str, str]]] = { "product": ("statcan_chemicals", "flowobject"), } def __str__(self) -> str: return f"{self.location}-{self.product}-{self.activity}-{self.time}-{self.value}-{self.unit}"
[docs] class ComexTrade(FactBaseModel_uncertainty): time: int product: str activity: str export_location: str import_location: str value: float unit: str transport_mode: Optional[str] = None country_active_transport_mode: Optional[str] = None container: Optional[str] = None flag: Optional[str] = None def __str__(self) -> str: return f"{self.product}-{self.activity}-{self.time}-{self.value}-{self.unit}" _classification: ClassVar[Dict[str, Tuple[str, str]]] = { "product": ("comext", "flowobject"), "activity": ("comext", "activitytype"), "export_location": ("ISO2", "location"), "import_location": ("ISO2", "location"), }
[docs] class UNdataWDI(FactBaseModel_uncertainty): location: str time: int value: float unit: str flag: Optional[str] = None _classification: ClassVar[Dict[str, Tuple[str, str]]] = { "location": ("ISO3", "location"), }
[docs] class ProdcomTrade(FactBaseModel_uncertainty): time: int product: int export_location: str import_location: str value: float unit: str flag: Optional[str] = None _classification: ClassVar[Dict[str, Tuple[str, str]]] = { "product": ("prodcom_sold_2_0", "flowobject"), "export_location": ("GEOnumeric", "location"), "import_location": ("GEOnumeric", "location"), }
[docs] class UnfcccProductionVolume( PPF_fact_schemas_uncertainty.ProductionVolumes_uncertainty ): _classification: ClassVar[Dict[str, Tuple[str, str]]] = { "location": ("ISO3", "location"), "product": ("unfccc_prodvol","flowobject"), # I'd add in the correspondence next sprint } def __str__(self) -> str: return f"{self.location}-{self.product}-{self.activity}-{self.time}-{self.value}-{self.unit}"
[docs] class ADBMonetaryIOT(baseExternalSchemas): supplier_name: str # Supplying activity supplier_code: str user_name: str # Using activity user_code: str price_type: str = Field( # Current price or previous year price. default="current prices" ) money_unit: Optional[str] = None # Millions, billions etc. diagonal: Optional[bool] = None _classification: ClassVar[Dict[str, Tuple[str, str]]] ={ "supplier_code": ("adb_supplier", "flow"), "user_code": ("adb_user", "flow"), "location": ("regex", "location"), }