Files
service-finder/backend/app/services/analytics_service.py
2026-03-22 11:02:05 +00:00

441 lines
17 KiB
Python

# /opt/docker/dev/service_finder/backend/app/services/analytics_service.py
"""
TCO (Total Cost of Ownership) Analytics Service.
Számítások a vehicle.costs tábla alapján, árfolyam-átváltással a system_service segítségével.
"""
import logging
from typing import Optional, Dict, Any, List
from sqlalchemy import select, func, and_
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.models.vehicle import VehicleCost, CostCategory
from app.models import VehicleModelDefinition
from app.models.marketplace.organization import Organization
from app.services.system_service import SystemService
logger = logging.getLogger(__name__)
class TCOAnalytics:
"""
TCO Analytics osztály 3 fő metódussal:
1. get_user_tco: Egy adott organization_id költségeinek összesítése
2. get_vehicle_lifetime_tco: Egy jármű összes tulajdonos költségének összesítése (anonimizálva)
3. get_global_benchmark: Egy modell (vehicle_model_id) átlagos költségeinek számítása
"""
def __init__(self):
self.system_service = SystemService()
async def get_user_tco(
self,
db: AsyncSession,
organization_id: int,
currency_target: str = "HUF",
include_categories: Optional[List[str]] = None,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
) -> Dict[str, Any]:
"""
Egy adott szervezet (organization_id) összes költségének összesítése.
Átváltja a különböző valutákban lévő költségeket a célvalutára (currency_target).
:param db: Adatbázis munkamenet
:param organization_id: A szervezet azonosítója
:param currency_target: Célvaluta (pl. "HUF", "EUR")
:param include_categories: Szűrés költségkategóriákra (opcionális)
:param start_date: Kezdő dátum (ISO formátum, opcionális)
:param end_date: Végdátum (ISO formátum, opcionális)
:return: Szótár a következőkkel:
- total_amount: Összesített összeg a célvalutában
- total_transactions: Tranzakciók száma
- by_category: Kategóriánkénti bontás
- currency: A célvaluta
"""
# Alap lekérdezés: organization_id szűrés
stmt = select(
VehicleCost.amount,
VehicleCost.currency,
VehicleCost.category_id,
CostCategory.code,
CostCategory.name
).join(
CostCategory, VehicleCost.category_id == CostCategory.id
).where(
VehicleCost.organization_id == organization_id
)
# Dátum szűrés
if start_date:
stmt = stmt.where(VehicleCost.date >= start_date)
if end_date:
stmt = stmt.where(VehicleCost.date <= end_date)
# Kategória szűrés
if include_categories:
stmt = stmt.where(CostCategory.code.in_(include_categories))
result = await db.execute(stmt)
rows = result.all()
# Árfolyamok lekérése a system_service-ből
exchange_rates = await self._get_exchange_rates(db, currency_target)
total_amount = 0.0
category_totals = {}
for row in rows:
amount = float(row.amount)
source_currency = row.currency
# Átváltás célvalutára
converted_amount = await self._convert_currency(
db, amount, source_currency, currency_target, exchange_rates
)
total_amount += converted_amount
# Kategória összesítés
category_code = row.code
if category_code not in category_totals:
category_totals[category_code] = {
"name": row.name,
"total": 0.0,
"count": 0
}
category_totals[category_code]["total"] += converted_amount
category_totals[category_code]["count"] += 1
return {
"organization_id": organization_id,
"total_amount": round(total_amount, 2),
"total_transactions": len(rows),
"currency": currency_target,
"by_category": category_totals,
"date_range": {
"start": start_date,
"end": end_date
}
}
async def get_vehicle_lifetime_tco(
self,
db: AsyncSession,
vehicle_model_id: int,
currency_target: str = "HUF",
anonymize: bool = True,
) -> Dict[str, Any]:
"""
Egy jármű (vehicle_model_id) összes tulajdonos általi költségének összesítése.
Alapértelmezetten anonimizálva (organization_id-k elrejtve).
:param db: Adatbázis munkamenet
:param vehicle_model_id: A járműmodell azonosítója
:param currency_target: Célvaluta (pl. "HUF", "EUR")
:param anonymize: Ha True, nem tartalmazza az organization_id-kat
:return: Szótár a következőkkel:
- vehicle_model_id: A járműmodell azonosítója
- total_lifetime_cost: Teljes élettartam költség a célvalutában
- total_owners: Különböző tulajdonosok száma
- average_cost_per_owner: Tulajdonosonkénti átlag
- by_owner: Tulajdonosonkénti bontás (ha anonymize=False)
- currency: A célvaluta
"""
# Összes költség lekérdezése a járműhöz
stmt = select(
VehicleCost.amount,
VehicleCost.currency,
VehicleCost.organization_id,
Organization.name.label("org_name")
).outerjoin(
Organization, VehicleCost.organization_id == Organization.id
).where(
VehicleCost.vehicle_id == vehicle_model_id
)
result = await db.execute(stmt)
rows = result.all()
# Árfolyamok lekérése
exchange_rates = await self._get_exchange_rates(db, currency_target)
total_lifetime_cost = 0.0
owners = set()
owner_totals = {}
for row in rows:
amount = float(row.amount)
source_currency = row.currency
# Átváltás célvalutára
converted_amount = await self._convert_currency(
db, amount, source_currency, currency_target, exchange_rates
)
total_lifetime_cost += converted_amount
# Tulajdonos adatok
org_id = row.organization_id
if org_id:
owners.add(org_id)
if not anonymize:
if org_id not in owner_totals:
owner_totals[org_id] = {
"name": row.org_name,
"total": 0.0,
"count": 0
}
owner_totals[org_id]["total"] += converted_amount
owner_totals[org_id]["count"] += 1
total_owners = len(owners)
average_cost_per_owner = round(total_lifetime_cost / max(total_owners, 1), 2)
result_data = {
"vehicle_model_id": vehicle_model_id,
"total_lifetime_cost": round(total_lifetime_cost, 2),
"total_owners": total_owners,
"average_cost_per_owner": average_cost_per_owner,
"currency": currency_target,
"anonymized": anonymize,
}
if not anonymize:
result_data["by_owner"] = owner_totals
return result_data
async def get_global_benchmark(
self,
db: AsyncSession,
vehicle_model_id: Optional[int] = None,
make: Optional[str] = None,
model: Optional[str] = None,
fuel_type: Optional[str] = None,
currency_target: str = "HUF",
) -> Dict[str, Any]:
"""
Egy modell (vehicle_model_id) vagy modellcsoport átlagos költségeinek számítása.
Ha vehicle_model_id nincs megadva, akkor make/model/fuel_type alapján csoportosít.
:param db: Adatbázis munkamenet
:param vehicle_model_id: Konkrét járműmodell azonosítója (opcionális)
:param make: Gyártó (opcionális)
:param model: Modell (opcionális)
:param fuel_type: Üzemanyag típus (opcionális)
:param currency_target: Célvaluta (pl. "HUF", "EUR")
:return: Szótár a következőkkel:
- benchmark_type: "specific_model" vagy "grouped"
- vehicle_count: Járművek száma a mintában
- total_cost_sum: Összes költség a célvalutában
- average_cost_per_vehicle: Járművenkénti átlag
- average_cost_per_km: Kilométerenkénti átlag (ha elérhető odometer adat)
- by_category: Kategóriánkénti átlagok
- currency: A célvaluta
"""
# Alap lekérdezés: vehicle és cost összekapcsolása
stmt = select(
VehicleCost.amount,
VehicleCost.currency,
VehicleCost.vehicle_id,
VehicleCost.odometer,
CostCategory.code,
VehicleModelDefinition.make,
VehicleModelDefinition.model,
VehicleModelDefinition.fuel_type
).join(
VehicleModelDefinition, VehicleCost.vehicle_id == VehicleModelDefinition.id
).join(
CostCategory, VehicleCost.category_id == CostCategory.id
)
# Szűrés
if vehicle_model_id:
stmt = stmt.where(VehicleCost.vehicle_id == vehicle_model_id)
benchmark_type = "specific_model"
else:
conditions = []
if make:
conditions.append(VehicleModelDefinition.make == make)
if model:
conditions.append(VehicleModelDefinition.model == model)
if fuel_type:
conditions.append(VehicleModelDefinition.fuel_type == fuel_type)
if conditions:
stmt = stmt.where(and_(*conditions))
benchmark_type = "grouped"
result = await db.execute(stmt)
rows = result.all()
if not rows:
return {
"benchmark_type": benchmark_type,
"vehicle_count": 0,
"total_cost_sum": 0.0,
"average_cost_per_vehicle": 0.0,
"average_cost_per_km": None,
"by_category": {},
"currency": currency_target,
"message": "No data found for the specified criteria"
}
# Árfolyamok
exchange_rates = await self._get_exchange_rates(db, currency_target)
total_cost_sum = 0.0
total_odometer_sum = 0
vehicle_ids = set()
category_totals = {}
category_counts = {}
for row in rows:
amount = float(row.amount)
source_currency = row.currency
# Átváltás
converted_amount = await self._convert_currency(
db, amount, source_currency, currency_target, exchange_rates
)
total_cost_sum += converted_amount
vehicle_ids.add(row.vehicle_id)
# Odometer összegzés (ha van)
if row.odometer:
total_odometer_sum += row.odometer
# Kategória összesítés
category_code = row.code
if category_code not in category_totals:
category_totals[category_code] = 0.0
category_counts[category_code] = 0
category_totals[category_code] += converted_amount
category_counts[category_code] += 1
vehicle_count = len(vehicle_ids)
average_cost_per_vehicle = round(total_cost_sum / vehicle_count, 2)
# Kilométerenkénti átlag számítása
average_cost_per_km = None
if total_odometer_sum > 0:
average_cost_per_km = round(total_cost_sum / total_odometer_sum, 4)
# Kategóriánkénti átlagok
category_averages = {}
for code, total in category_totals.items():
count = category_counts[code]
category_averages[code] = {
"total": round(total, 2),
"count": count,
"average": round(total / count, 2)
}
return {
"benchmark_type": benchmark_type,
"vehicle_count": vehicle_count,
"total_cost_sum": round(total_cost_sum, 2),
"average_cost_per_vehicle": average_cost_per_vehicle,
"average_cost_per_km": average_cost_per_km,
"by_category": category_averages,
"currency": currency_target,
"criteria": {
"vehicle_model_id": vehicle_model_id,
"make": make,
"model": model,
"fuel_type": fuel_type
}
}
async def _get_exchange_rates(
self,
db: AsyncSession,
target_currency: str
) -> Dict[str, float]:
"""
Árfolyamok lekérése a system_service-ből.
A rendszerparaméterekben az "exchange_rates" kulcs alatt tároljuk.
:param db: Adatbázis munkamenet
:param target_currency: Célvaluta
:return: Szótár forrásvaluta -> célvaluta árfolyammal
"""
exchange_rates = await self.system_service.get_scoped_parameter(
db,
key="exchange_rates",
default={}
)
# Ha nincs adat, alapértelmezett árfolyamok
if not exchange_rates:
logger.warning("No exchange rates found in system parameters, using defaults")
# Alapértelmezett árfolyamok (1 EUR = 400 HUF, 1 USD = 350 HUF stb.)
exchange_rates = {
"EUR": {"HUF": 400.0, "EUR": 1.0, "USD": 1.1},
"USD": {"HUF": 350.0, "EUR": 0.9, "USD": 1.0},
"HUF": {"HUF": 1.0, "EUR": 0.0025, "USD": 0.0029},
"GBP": {"HUF": 460.0, "EUR": 1.15, "USD": 1.26},
}
# Ellenőrizzük, hogy a célvaluta szerepel-e az árfolyamokban
if target_currency not in exchange_rates.get("EUR", {}):
logger.warning(f"Target currency {target_currency} not found in exchange rates, using 1:1 conversion")
return exchange_rates
async def _convert_currency(
self,
db: AsyncSession,
amount: float,
source_currency: str,
target_currency: str,
exchange_rates: Dict[str, Any]
) -> float:
"""
Pénznem átváltása a megadott árfolyamok alapján.
:param amount: Összeg a forrásvalutában
:param source_currency: Forrásvaluta (pl. "EUR")
:param target_currency: Célvaluta (pl. "HUF")
:param exchange_rates: Árfolyam szótár
:return: Átváltott összeg a célvalutában
"""
if source_currency == target_currency:
return amount
# Keresés az árfolyamokban
try:
# Próbáljuk meg a forrásvaluta -> célvaluta árfolyamot
if source_currency in exchange_rates:
rates = exchange_rates[source_currency]
if target_currency in rates:
rate = rates[target_currency]
return amount * rate
# Ha nem találjuk, próbáljuk meg fordítva (inverz)
if target_currency in exchange_rates:
rates = exchange_rates[target_currency]
if source_currency in rates:
rate = 1.0 / rates[source_currency]
return amount * rate
# Ha még mindig nem találjuk, használjunk EUR-t közvetítőként
if "EUR" in exchange_rates:
eur_rates = exchange_rates["EUR"]
if source_currency in eur_rates and target_currency in eur_rates:
# Forrás -> EUR -> Cél
to_eur = amount / eur_rates[source_currency]
return to_eur * eur_rates[target_currency]
except (KeyError, ZeroDivisionError, TypeError) as e:
logger.error(f"Currency conversion error: {e}, using 1:1 conversion")
# Visszaesés: 1:1 árfolyam
logger.warning(f"Could not convert {source_currency} to {target_currency}, using 1:1 conversion")
return amount