196 lines
7.6 KiB
Python
196 lines
7.6 KiB
Python
"""
|
|
Analytics API endpoints for TCO (Total Cost of Ownership) dashboard.
|
|
"""
|
|
|
|
import logging
|
|
import uuid
|
|
from typing import List, Optional
|
|
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.api import deps
|
|
from app.schemas.analytics import TCOSummaryResponse, TCOErrorResponse
|
|
from app.services.analytics_service import TCOAnalytics
|
|
from app.models import Vehicle
|
|
from app.models.organization import OrganizationMember
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
# Router on which this module's analytics endpoints are registered (see the @router.get route below).
router = APIRouter()
|
|
|
|
|
|
async def verify_vehicle_access(
    vehicle_id: uuid.UUID,
    db: AsyncSession,
    current_user
) -> Vehicle:
    """
    Load a vehicle and confirm the current user is allowed to see it.

    Access is granted when the user's role is "superadmin", or when an
    OrganizationMember row links the user to the vehicle's organization.

    Args:
        vehicle_id: Primary key of the vehicle to look up.
        db: Async database session used for the lookup and membership query.
        current_user: Authenticated user; its ``role`` and ``id`` attributes are read.

    Returns:
        The Vehicle instance when access is granted.

    Raises:
        HTTPException: 404 if no vehicle has this ID, 403 if the user is
            neither a superadmin nor a member of the vehicle's organization.
    """
    target = await db.get(Vehicle, vehicle_id)
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Vehicle with ID {vehicle_id} not found."
        )

    # Superadmins bypass the organization membership check entirely.
    if current_user.role == "superadmin":
        return target

    # Look for a membership row tying this user to the vehicle's organization.
    from sqlalchemy import select

    membership_query = select(OrganizationMember).where(
        OrganizationMember.user_id == current_user.id,
        OrganizationMember.organization_id == target.organization_id,
    )
    query_result = await db.execute(membership_query)
    membership_row = query_result.scalar_one_or_none()

    # No cross-organization roles are honoured yet: without a membership row
    # the request is rejected (this could later be extended with RBAC rules).
    if not membership_row:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You do not have permission to access this vehicle's analytics."
        )

    return target
|
|
|
|
|
|
# Currency requested from the analytics service for every figure in this endpoint.
_TCO_TARGET_CURRENCY = "HUF"


def _build_category_breakdown(result: dict, total_key: str, amount_key: str) -> List[dict]:
    """
    Turn a service result's ``by_category`` mapping into response rows.

    Args:
        result: Mapping returned by a TCOAnalytics call. Only the keys
            ``by_category``, ``currency`` and ``total_key`` are read.
        total_key: Key in ``result`` holding the overall total used as the
            denominator for each category's percentage.
        amount_key: Key in each category entry holding that category's amount.

    Returns:
        One dict per category; an empty list when ``by_category`` is missing
        or empty.
    """
    categories = result.get("by_category") or {}
    # A missing or None total is treated as 0, which yields 0% for every row
    # instead of raising on the comparison below.
    total = result.get(total_key) or 0
    currency = result.get("currency", _TCO_TARGET_CURRENCY)

    rows = []
    for cat_code, cat_data in categories.items():
        amount = cat_data[amount_key]
        percentage = (amount / total * 100) if total > 0 else 0
        rows.append({
            "category_id": 0,  # TODO: map from category code to ID
            "category_code": cat_code,
            "category_name": cat_data.get("name", cat_code),
            "amount": amount,
            "currency": currency,
            # Presumably already in HUF because the service was asked for
            # HUF as target currency -- confirm against the service.
            "amount_huf": amount,
            "percentage": round(percentage, 2),
        })
    return rows


@router.get(
    "/{vehicle_id}/summary",
    response_model=TCOSummaryResponse,
    responses={
        404: {"model": TCOErrorResponse, "description": "Vehicle not found"},
        403: {"model": TCOErrorResponse, "description": "Access denied"},
        500: {"model": TCOErrorResponse, "description": "Internal server error"},
    },
    summary="Get TCO summary for a vehicle",
    description="Returns Total Cost of Ownership analytics for a specific vehicle, "
                "including user-specific costs, lifetime costs, and benchmark comparisons."
)
async def get_tco_summary(
    vehicle_id: uuid.UUID,
    db: AsyncSession = Depends(deps.get_db),
    current_user = Depends(deps.get_current_active_user),
):
    """
    Retrieve TCO analytics for a vehicle.

    Steps:
    1. Verify user has access to the vehicle.
    2. Use TCOAnalytics service to compute user TCO, lifetime TCO, and benchmark.
    3. Transform results into the response schema.

    Args:
        vehicle_id: ID of the vehicle taken from the URL path.
        db: Async database session (injected).
        current_user: Authenticated active user (injected).

    Returns:
        TCOSummaryResponse with per-category user, lifetime and benchmark
        breakdowns plus aggregate stats.

    Raises:
        HTTPException: 404/403 propagated from the access check; 500 with a
            generic message for any other failure (details are logged only).
    """
    try:
        # Access verification
        vehicle = await verify_vehicle_access(vehicle_id, db, current_user)

        analytics = TCOAnalytics()

        # 1. User TCO; falls back to the vehicle's organization when the user
        #    has no organization_id of their own.
        user_tco_result = await analytics.get_user_tco(
            db=db,
            organization_id=current_user.organization_id or vehicle.organization_id,
            currency_target=_TCO_TARGET_CURRENCY,
            include_categories=None,  # all categories
        )

        # 2. Lifetime TCO for the vehicle's model (anonymize flag set)
        lifetime_tco_result = await analytics.get_vehicle_lifetime_tco(
            db=db,
            vehicle_model_id=vehicle.vehicle_model_id,
            currency_target=_TCO_TARGET_CURRENCY,
            anonymize=True,
        )

        # 3. Benchmark TCO for the same vehicle model
        benchmark_result = await analytics.get_global_benchmark(
            db=db,
            vehicle_model_id=vehicle.vehicle_model_id,
            currency_target=_TCO_TARGET_CURRENCY,
        )

        # Transform results into schema objects.
        # Note: This is a simplified transformation; it may need adapting to
        # the actual service output.
        user_tco_list = _build_category_breakdown(
            user_tco_result, "total_amount", "total"
        )
        lifetime_tco_list = _build_category_breakdown(
            lifetime_tco_result, "total_lifetime_cost", "total"
        )
        # NOTE(review): benchmark percentages divide a per-category *average*
        # by "total_cost_sum"; confirm with the service that these two values
        # are on the same scale.
        benchmark_tco_list = _build_category_breakdown(
            benchmark_result, "total_cost_sum", "average"
        )

        total_cost = user_tco_result.get("total_amount", 0)

        # Cost per km is only defined when a positive odometer reading exists.
        cost_per_km = None
        if vehicle.odometer and vehicle.odometer > 0:
            cost_per_km = total_cost / vehicle.odometer

        stats = {
            "total_cost": total_cost,
            "cost_per_km": cost_per_km,
            "total_transactions": user_tco_result.get("total_transactions", 0),
            "date_range": user_tco_result.get("date_range"),
        }

        return TCOSummaryResponse(
            vehicle_id=vehicle_id,
            user_tco=user_tco_list,
            lifetime_tco=lifetime_tco_list,
            benchmark_tco=benchmark_tco_list,
            stats=stats,
        )

    except HTTPException:
        # Deliberate HTTP errors (404/403 from the access check) pass through unchanged.
        raise
    except Exception as exc:
        # logger.exception records the traceback; the client only receives a
        # generic message so internal error text is never exposed.
        logger.exception(
            "Unexpected error in TCO summary for vehicle %s", vehicle_id
        )
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Internal server error"
        ) from exc