Files
service-finder/backend/app/api/deps.py

139 lines
4.8 KiB
Python
Executable File

# /opt/docker/dev/service_finder/backend/app/api/deps.py
from typing import Optional, Dict, Any, Union
import logging
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.db.session import get_db
from app.core.security import decode_token, DEFAULT_RANK_MAP
from app.models.identity import User, UserRole # JAVÍTVA: Új Identity modell használata
from app.core.config import settings
logger = logging.getLogger(__name__)
# --- GONDOLATMENET / THOUGHT PROCESS ---
# 1. Az OAuth2 folyamat a központosított bejelentkezési végponton keresztül fut.
# 2. A token visszafejtésekor ellenőrizni kell a 'type' mezőt, hogy ne lehessen refresh tokennel belépni.
# 3. A felhasználó lekérésekor a SQLAlchemy 2.0 aszinkron 'execute' és 'scalar_one_or_none' metódusait használjuk.
# 4. A Scoped RBAC (Role-Based Access Control) biztosítja, hogy a felhasználók ne férjenek hozzá egymás flottáihoz.
# ---------------------------------------
# Az OAuth2 folyamat a bejelentkezési végponton keresztül
reusable_oauth2 = OAuth2PasswordBearer(
tokenUrl=f"{settings.API_V1_STR}/auth/login"
)
async def get_current_token_payload(
token: str = Depends(reusable_oauth2)
) -> Dict[str, Any]:
"""
JWT token visszafejtése és a típus (access) ellenőrzése.
"""
# Fejlesztői bypass (opcionális, csak DEBUG módban)
if settings.DEBUG and token == "dev_bypass_active":
return {
"sub": "1",
"role": "superadmin",
"rank": 100,
"scope_level": "global",
"scope_id": "all",
"type": "access"
}
payload = decode_token(token)
if not payload or payload.get("type") != "access":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Érvénytelen vagy lejárt munkamenet."
)
return payload
async def get_current_user(
db: AsyncSession = Depends(get_db),
payload: Dict = Depends(get_current_token_payload)
) -> User:
"""
Lekéri a felhasználót a token 'sub' mezője alapján (SQLAlchemy 2.0 aszinkron módon).
"""
user_id = payload.get("sub")
if not user_id:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token azonosítási hiba."
)
# JAVÍTVA: Modern SQLAlchemy 2.0 aszinkron lekérdezés
result = await db.execute(select(User).where(User.id == int(user_id)))
user = result.scalar_one_or_none()
if not user or user.is_deleted:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="A felhasználó nem található."
)
return user
async def get_current_active_user(
current_user: User = Depends(get_current_user),
) -> User:
"""
Ellenőrzi, hogy a felhasználó aktív-e (KYC Step 2 kész).
"""
if not current_user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="A művelethez aktív profil és KYC azonosítás szükséges."
)
return current_user
async def check_resource_access(
resource_scope_id: Union[str, int],
current_user: User = Depends(get_current_user)
):
"""
Scoped RBAC: Megakadályozza a jogosulatlan hozzáférést mások adataihoz.
"""
if current_user.role == UserRole.superadmin:
return True
user_scope = str(current_user.scope_id) if current_user.scope_id else None
requested_scope = str(resource_scope_id)
# 1. Saját ID ellenőrzése
if str(current_user.id) == requested_scope:
return True
# 2. Szervezeti/Flotta scope ellenőrzése
if user_scope and user_scope == requested_scope:
return True
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Nincs jogosultsága ehhez az erőforráshoz."
)
def check_min_rank(role_key: str):
"""
Dinamikus Rank ellenőrzés a system_parameters tábla alapján.
"""
async def rank_checker(
db: AsyncSession = Depends(get_db),
payload: Dict = Depends(get_current_token_payload)
):
# A settings.get_db_setting-et használjuk a dinamikus lekéréshez
ranks = await settings.get_db_setting(
db, "rbac_rank_matrix", default=DEFAULT_RANK_MAP
)
required_rank = ranks.get(role_key, 0)
user_rank = payload.get("rank", 0)
if user_rank < required_rank:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Alacsony jogosultsági szint. (Elvárt: {required_rank})"
)
return True
return rank_checker