Files
service-finder/backend/app/services/config_service.py
2026-03-22 11:02:05 +00:00

271 lines
11 KiB
Python
Executable File

# /opt/docker/dev/service_finder/backend/app/services/config_service.py
from typing import Any, Optional, Dict
import logging
import os
import json
from decimal import Decimal
from datetime import datetime, timezone
from sqlalchemy import select, text
from sqlalchemy.ext.asyncio import AsyncSession
# Modellek importálása a központi helyről
from app.models import ExchangeRate, AssetCost, AssetTelemetry
from app.models.system.system import SystemParameter, ParameterScope
from app.db.session import AsyncSessionLocal
logger = logging.getLogger(__name__)
class CostService:
# A cost_in típusát 'Any'-re állítottam ideiglenesen, hogy ne dobjon újabb ImportError-t a hiányzó Pydantic séma miatt
async def record_cost(self, db: AsyncSession, cost_in: Any, user_id: int):
try:
# 1. Árfolyam lekérése (EUR Pivot)
rate_stmt = select(ExchangeRate).where(
ExchangeRate.target_currency == cost_in.currency_local
).order_by(ExchangeRate.id.desc()).limit(1)
rate_res = await db.execute(rate_stmt)
rate_obj = rate_res.scalar_one_or_none()
exchange_rate = rate_obj.rate if rate_obj else Decimal("1.0")
# 2. Kalkuláció
amt_eur = Decimal(str(cost_in.amount_local)) / exchange_rate
# 3. Mentés az új AssetCost modellbe
new_cost = AssetCost(
asset_id=cost_in.asset_id,
organization_id=cost_in.organization_id,
driver_id=user_id,
cost_type=cost_in.cost_type,
amount_local=cost_in.amount_local,
currency_local=cost_in.currency_local,
amount_eur=amt_eur,
exchange_rate_used=exchange_rate,
mileage_at_cost=cost_in.mileage_at_cost,
date=cost_in.date or datetime.now(timezone.utc)
)
db.add(new_cost)
# 4. Telemetria szinkron
if cost_in.mileage_at_cost:
tel_stmt = select(AssetTelemetry).where(AssetTelemetry.asset_id == cost_in.asset_id)
telemetry = (await db.execute(tel_stmt)).scalar_one_or_none()
if telemetry and cost_in.mileage_at_cost > (telemetry.current_mileage or 0):
telemetry.current_mileage = cost_in.mileage_at_cost
await db.commit()
return new_cost
except Exception as e:
await db.rollback()
raise e
class ConfigService:
"""
Egyszerű konfigurációs szolgáltatás a SystemParameter tábla lekérdezéséhez.
Támogatja a különböző típusú értékek lekérését alapértelmezett értékkel.
"""
@staticmethod
async def get(db: AsyncSession, key: str, default: Any = None, scope_level: ParameterScope = ParameterScope.GLOBAL, scope_id: Optional[str] = None) -> Any:
"""
Általános lekérdezés a SystemParameter táblából.
Args:
db: AsyncSession
key: A konfigurációs kulcs
default: Alapértelmezett érték, ha a kulcs nem található
scope_level: A paraméter scope-ja (global, country, region, user)
scope_id: A scope azonosítója (pl. országkód, user_id)
Returns:
A talált érték (a megfelelő típusban) vagy a default.
"""
from sqlalchemy import select, and_, cast, String
try:
# Convert scope_level to lowercase string for comparison
# PostgreSQL enum expects lowercase values, but Python Enum may be uppercase
scope_str = scope_level.value.lower() if hasattr(scope_level, 'value') else str(scope_level).lower()
# Build query with cast to avoid strict enum type mismatch
query = select(SystemParameter).where(
and_(
SystemParameter.key == key,
cast(SystemParameter.scope_level, String) == scope_str,
SystemParameter.is_active == True
)
)
if scope_id is None:
query = query.where(SystemParameter.scope_id.is_(None))
else:
query = query.where(SystemParameter.scope_id == scope_id)
result = await db.execute(query)
param = result.scalar_one_or_none()
if param is None:
# Opcionálisan beilleszthetjük a default értéket a táblába
# await ConfigService._insert_default(db, key, default, scope_level, scope_id)
return default
# A value oszlop JSONB, lehet dict, list, string, number, bool
db_value = param.value
# Típuskonverzió a default típusa alapján
if default is None:
return db_value
if isinstance(default, int):
if isinstance(db_value, (int, float, str)):
try:
return int(db_value)
except (ValueError, TypeError):
return default
return default
elif isinstance(default, float):
if isinstance(db_value, (int, float, str)):
try:
return float(db_value)
except (ValueError, TypeError):
return default
return default
elif isinstance(default, bool):
if isinstance(db_value, bool):
return db_value
elif isinstance(db_value, str):
return db_value.lower() in ('true', '1', 'yes', 'on')
elif isinstance(db_value, int):
return db_value != 0
return default
elif isinstance(default, str):
if isinstance(db_value, str):
return db_value
elif isinstance(db_value, (dict, list)):
return json.dumps(db_value)
else:
return str(db_value)
elif isinstance(default, dict) and isinstance(db_value, dict):
return db_value
elif isinstance(default, list) and isinstance(db_value, list):
return db_value
else:
# Egyébként visszaadjuk a db_value-t
return db_value
except Exception as e:
logger.warning(f"ConfigService.get error for key '{key}': {e}")
return default
async def get_setting(self, db: AsyncSession, key: str, default: Any = None, region_code: Optional[str] = None, org_id: Optional[int] = None, **kwargs) -> Any:
"""
Általános beállítás lekérése a régi kód kompatibilitásához.
Args:
db: AsyncSession
key: A konfigurációs kulcs
default: Alapértelmezett érték
region_code: Országkód (pl. "HU") - COUNTRY scope
org_id: Szervezet azonosító - ORGANIZATION scope
**kwargs: További paraméterek (pl. user_id)
Returns:
A talált érték vagy default.
"""
from app.models.system.system import ParameterScope
# Scope meghatározása
if org_id is not None:
scope_level = ParameterScope.ORGANIZATION
scope_id = str(org_id)
elif region_code is not None:
scope_level = ParameterScope.COUNTRY
scope_id = region_code
else:
scope_level = ParameterScope.GLOBAL
scope_id = None
# További scope-ok (pl. user) a kwargs-ból
if 'user_id' in kwargs:
scope_level = ParameterScope.USER
scope_id = str(kwargs['user_id'])
return await ConfigService.get(db, key, default, scope_level, scope_id)
@staticmethod
async def get_int(db: AsyncSession, key: str, default: int, scope_level: ParameterScope = ParameterScope.GLOBAL, scope_id: Optional[str] = None) -> int:
"""Egész szám lekérése."""
value = await ConfigService.get(db, key, default, scope_level, scope_id)
if isinstance(value, int):
return value
try:
return int(value)
except (ValueError, TypeError):
return default
@staticmethod
async def get_str(db: AsyncSession, key: str, default: str, scope_level: ParameterScope = ParameterScope.GLOBAL, scope_id: Optional[str] = None) -> str:
"""Szöveg lekérése."""
value = await ConfigService.get(db, key, default, scope_level, scope_id)
if isinstance(value, str):
return value
return str(value)
@staticmethod
async def get_bool(db: AsyncSession, key: str, default: bool, scope_level: ParameterScope = ParameterScope.GLOBAL, scope_id: Optional[str] = None) -> bool:
"""Logikai érték lekérése."""
value = await ConfigService.get(db, key, default, scope_level, scope_id)
if isinstance(value, bool):
return value
if isinstance(value, str):
return value.lower() in ('true', '1', 'yes', 'on')
if isinstance(value, int):
return value != 0
return default
@staticmethod
async def get_float(db: AsyncSession, key: str, default: float, scope_level: ParameterScope = ParameterScope.GLOBAL, scope_id: Optional[str] = None) -> float:
"""Lebegőpontos szám lekérése."""
value = await ConfigService.get(db, key, default, scope_level, scope_id)
if isinstance(value, float):
return value
try:
return float(value)
except (ValueError, TypeError):
return default
@staticmethod
async def get_json(db: AsyncSession, key: str, default: dict, scope_level: ParameterScope = ParameterScope.GLOBAL, scope_id: Optional[str] = None) -> dict:
"""JSON objektum lekérése."""
value = await ConfigService.get(db, key, default, scope_level, scope_id)
if isinstance(value, dict):
return value
if isinstance(value, str):
try:
return json.loads(value)
except json.JSONDecodeError:
return default
return default
@staticmethod
async def _insert_default(db: AsyncSession, key: str, default: Any, scope_level: ParameterScope, scope_id: Optional[str] = None) -> None:
"""Opcionális: beszúrja a default értéket a táblába, hogy látható legyen az Admin UI-ban."""
try:
from app.models.system.system import SystemParameter
param = SystemParameter(
key=key,
category="auto_inserted",
value=default if isinstance(default, (dict, list)) else {"value": default},
scope_level=scope_level,
scope_id=scope_id,
is_active=True,
description=f"Auto-inserted default value for {key}"
)
db.add(param)
await db.commit()
except Exception as e:
logger.debug(f"Could not insert default for {key}: {e}")
await db.rollback()
# A példány, amit a többi modul (pl. az auth_service, ai_service) importálni próbál
config = ConfigService()