import json
import logging
import os
import uuid
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import HTTPException, status
from fastapi.encoders import jsonable_encoder
from sqlalchemy import and_, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload

from app.core.config import settings
from app.core.security import get_password_hash, verify_password, generate_secure_slug
from app.models.gamification import UserStats
from app.models.identity import User, Person, UserRole, VerificationToken, Wallet
from app.models.organization import Organization, OrganizationMember, OrgType
from app.schemas.auth import UserLiteRegister, UserKYCComplete
from app.services.config_service import config
from app.services.email_manager import email_manager
from app.services.geo_service import GeoService
from app.services.security_service import security_service  # Sentinel integration

logger = logging.getLogger(__name__)


class AuthService:
    """Registration, KYC completion, authentication and credential recovery.

    All methods are static and operate on the ``AsyncSession`` passed in by
    the caller; each method that writes commits its own transaction.
    """

    @staticmethod
    async def register_lite(db: AsyncSession, user_in: UserLiteRegister) -> User:
        """Step 1: lite (manual) registration.

        Creates the ``Person`` and ``User`` records with ``is_active=False``,
        stores a registration ``VerificationToken``, sends the verification
        email and writes an audit event. No ``folder_slug`` is assigned to
        the user here.

        Args:
            db: Session used for all reads and writes; committed on success,
                rolled back on any failure.
            user_in: Registration payload.

        Returns:
            The newly created, refreshed ``User``.

        Raises:
            HTTPException: 400 when the password is shorter than the
                configured minimum length.
            Exception: Any other failure is logged and re-raised after
                rollback.
        """
        try:
            # Dynamic password-length check: the admin setting is read from
            # config, but 8 characters is the hard lower limit.
            min_pass = await config.get_setting(db, "auth_min_password_length", default=8)
            min_len = max(int(min_pass), 8)

            if len(user_in.password) < min_len:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=f"A jelszónak legalább {min_len} karakter hosszúnak kell lennie.",
                )

            new_person = Person(
                first_name=user_in.first_name,
                last_name=user_in.last_name,
                is_active=False,
            )
            db.add(new_person)
            # Flush so new_person.id is available for the User row.
            await db.flush()

            new_user = User(
                email=user_in.email,
                hashed_password=get_password_hash(user_in.password),
                person_id=new_person.id,
                role=UserRole.user,
                is_active=False,
                is_deleted=False,
                region_code=user_in.region_code,
                preferred_language=user_in.lang,
                timezone=user_in.timezone,
                # folder_slug is left unset until Step 2 (complete_kyc).
            )
            db.add(new_user)
            # Flush so new_user.id is available for the token and audit log.
            await db.flush()

            # Generate the verification token.
            reg_hours = await config.get_setting(
                db,
                "auth_registration_hours",
                region_code=user_in.region_code,
                default=48,
            )
            token_val = uuid.uuid4()
            db.add(VerificationToken(
                token=token_val,
                user_id=new_user.id,
                token_type="registration",
                expires_at=datetime.now(timezone.utc) + timedelta(hours=int(reg_hours)),
            ))

            # Send the verification email.
            verification_link = f"{settings.FRONTEND_BASE_URL}/verify?token={token_val}"
            await email_manager.send_email(
                recipient=user_in.email,
                template_key="reg",
                variables={"first_name": user_in.first_name, "link": verification_link},
                lang=user_in.lang,
            )

            # Audit log entry for the registration.
            await security_service.log_event(
                db,
                user_id=new_user.id,
                action="USER_REGISTER_LITE",
                severity="info",
                target_type="User",
                target_id=str(new_user.id),
                new_data={"email": user_in.email, "method": "manual"},
            )

            await db.commit()
            await db.refresh(new_user)
            return new_user

        except HTTPException:
            # Validation failures are expected; roll back without logging.
            await db.rollback()
            raise
        except Exception as e:
            await db.rollback()
            logger.exception("Registration Error: %s", e)
            # Bare raise keeps the original traceback intact.
            raise

    @staticmethod
    async def complete_kyc(
        db: AsyncSession, user_id: int, kyc_in: UserKYCComplete
    ) -> Optional[User]:
        """Step 2: complete KYC in a single transaction.

        Records the personal data, looks for an already existing ``Person``
        with the same identity attributes ("shadow identity"), creates the
        individual organization (fleet), its owner membership, the wallet
        and the stats row, then activates the account.

        Args:
            db: Session used for all reads and writes; committed on success,
                rolled back on any failure.
            user_id: Primary key of the user completing KYC.
            kyc_in: KYC payload.

        Returns:
            The refreshed ``User``, or ``None`` when no user has ``user_id``.

        Raises:
            Exception: Any failure is logged and re-raised after rollback.
        """
        # NOTE(review): nothing here checks whether KYC already ran for this
        # user, so a second call appears to create another organization,
        # wallet and stats row - confirm callers guard against that.
        try:
            stmt = select(User).options(joinedload(User.person)).where(User.id == user_id)
            res = await db.execute(stmt)
            user = res.scalar_one_or_none()
            if not user:
                return None

            # --- 1. SECURITY: generate the user's folder_slug ---
            # Users that do not have a slug yet receive one now.
            if not user.folder_slug:
                user.folder_slug = generate_secure_slug(length=12)

            # Set the currency when the payload carries a non-empty value.
            preferred_currency = getattr(kyc_in, "preferred_currency", None)
            if preferred_currency:
                user.preferred_currency = preferred_currency

            # --- 2. Shadow identity lookup (does this physical person exist?) ---
            identity_stmt = select(Person).where(and_(
                Person.mothers_last_name == kyc_in.mothers_last_name,
                Person.mothers_first_name == kyc_in.mothers_first_name,
                Person.birth_place == kyc_in.birth_place,
                Person.birth_date == kyc_in.birth_date,
            ))
            existing_person = (await db.execute(identity_stmt)).scalar_one_or_none()

            if existing_person:
                # A match was found: link the user to the existing Person.
                user.person_id = existing_person.id
                active_person = existing_person
                logger.info(f"Shadow Identity linked: User {user_id} -> Person {existing_person.id}")
            else:
                # No match: fill in the Person created at registration.
                active_person = user.person

            # --- 3. Record the address through GeoService ---
            addr_id = await GeoService.get_or_create_full_address(
                db,
                zip_code=kyc_in.address_zip,
                city=kyc_in.address_city,
                street_name=kyc_in.address_street_name,
                street_type=kyc_in.address_street_type,
                house_number=kyc_in.address_house_number,
                parcel_id=kyc_in.address_hrsz,
            )

            # --- 4. Update personal data ---
            active_person.mothers_last_name = kyc_in.mothers_last_name
            active_person.mothers_first_name = kyc_in.mothers_first_name
            active_person.birth_place = kyc_in.birth_place
            active_person.birth_date = kyc_in.birth_date
            active_person.phone = kyc_in.phone_number
            active_person.address_id = addr_id

            # Documents and ICE contact are stored in JSON-encodable form.
            active_person.identity_docs = jsonable_encoder(kyc_in.identity_docs)
            active_person.ice_contact = jsonable_encoder(kyc_in.ice_contact)

            # The Person becomes active at this point.
            active_person.is_active = True

            # --- 5. CREATE THE INDIVIDUAL FLEET (integral part of KYC) ---
            # The fleet's own folder_slug is generated here as well.
            new_org = Organization(
                full_name=f"{active_person.last_name} {active_person.first_name} Egyéni Flotta",
                name=f"{active_person.last_name} Flotta",
                folder_slug=generate_secure_slug(length=12),  # fleet slug
                org_type=OrgType.individual,
                owner_id=user.id,
                is_transferable=False,
                is_active=True,
                status="verified",
                language=user.preferred_language,
                default_currency=user.preferred_currency or "HUF",
                country_code=user.region_code,
            )
            db.add(new_org)
            # Flush so new_org.id is available for the membership row.
            await db.flush()

            # Fleet membership (owner).
            db.add(OrganizationMember(
                organization_id=new_org.id,
                user_id=user.id,
                role="owner",
                permissions={"can_add_asset": True, "can_view_costs": True, "is_admin": True},
            ))

            # --- 6. CREATE WALLET AND GAMIFICATION STATS ---
            db.add(Wallet(
                user_id=user.id,
                coin_balance=0,
                credit_balance=0,
                currency=user.preferred_currency or "HUF",
            ))
            db.add(UserStats(user_id=user.id, total_xp=0, current_level=1))

            # --- 7. ACTIVATION AND AUDIT ---
            user.is_active = True

            await security_service.log_event(
                db,
                user_id=user.id,
                action="USER_KYC_COMPLETED",
                severity="info",
                target_type="User",
                target_id=str(user.id),
                new_data={
                    "status": "active",
                    "user_folder": user.folder_slug,
                    "organization_id": new_org.id,
                    "organization_folder": new_org.folder_slug,
                    "wallet_created": True,
                },
            )

            await db.commit()
            await db.refresh(user)
            return user

        except Exception as e:
            await db.rollback()
            logger.exception("KYC Atomi Tranzakció Hiba: %s", e)
            # Bare raise keeps the original traceback intact.
            raise

    @staticmethod
    async def soft_delete_user(
        db: AsyncSession, user_id: int, reason: str, actor_id: int
    ) -> bool:
        """Soft-delete a user and free up their email address.

        The email is rewritten to a ``deleted_<id>_<date>_<old email>`` form
        so the original address can be registered again, and the account is
        flagged deleted and inactive. An audit event is written on behalf of
        ``actor_id``.

        Args:
            db: Session used for the update; committed on success, rolled
                back on failure.
            user_id: Primary key of the user to delete.
            reason: Free-text reason stored in the audit event.
            actor_id: User id recorded as the author of the audit event.

        Returns:
            ``True`` when the user was soft-deleted, ``False`` when the user
            does not exist or is already deleted.

        Raises:
            Exception: Any failure while writing is logged and re-raised
                after rollback.
        """
        stmt = select(User).where(User.id == user_id)
        user = (await db.execute(stmt)).scalar_one_or_none()
        if not user or user.is_deleted:
            return False

        try:
            old_email = user.email
            # Rename the email to keep uniqueness while allowing the address
            # to be re-registered. The date stamp uses UTC, like every other
            # timestamp produced by this service.
            date_stamp = datetime.now(timezone.utc).strftime('%Y%m%d')
            user.email = f"deleted_{user.id}_{date_stamp}_{old_email}"
            user.is_deleted = True
            user.is_active = False

            await security_service.log_event(
                db,
                user_id=actor_id,
                action="USER_SOFT_DELETE",
                severity="warning",
                target_type="User",
                target_id=str(user_id),
                old_data={"email": old_email},
                new_data={"is_deleted": True, "reason": reason},
            )

            await db.commit()
        except Exception:
            await db.rollback()
            logger.exception("Soft delete failed for user %s", user_id)
            raise
        return True

    @staticmethod
    async def verify_email(db: AsyncSession, token_str: str) -> bool:
        """Consume a verification token.

        Looks up an unused, unexpired ``VerificationToken`` whose value is
        ``token_str`` and marks it as used.

        Args:
            db: Session used for the lookup and update.
            token_str: Token in textual UUID form.

        Returns:
            ``True`` when a matching token was found and marked used;
            ``False`` when the token is malformed, unknown, used or expired,
            or when the database operation fails.
        """
        try:
            token_uuid = uuid.UUID(token_str)
        except (ValueError, TypeError, AttributeError):
            # Malformed or non-string token: treat as "not valid".
            return False

        try:
            # NOTE(review): the lookup does not filter on token_type, so a
            # "password_reset" token would also be consumed here - confirm
            # whether restricting to "registration" is safe for all issuers.
            stmt = select(VerificationToken).where(
                and_(
                    VerificationToken.token == token_uuid,
                    # SQL expression, not a Python truth test.
                    VerificationToken.is_used == False,  # noqa: E712
                    VerificationToken.expires_at > datetime.now(timezone.utc),
                )
            )
            res = await db.execute(stmt)
            token = res.scalar_one_or_none()
            if not token:
                return False

            token.is_used = True
            await db.commit()
            return True
        except Exception:
            # Keep the best-effort contract (False on failure), but do not
            # swallow cancellation/interrupts and do not hide the error.
            await db.rollback()
            logger.exception("Email verification failed")
            return False

    @staticmethod
    async def authenticate(db: AsyncSession, email: str, password: str) -> Optional[User]:
        """Return the non-deleted user matching ``email`` and ``password``.

        Args:
            db: Session used for the lookup.
            email: Email address to look up.
            password: Plain-text password checked with ``verify_password``.

        Returns:
            The ``User`` when the credentials match, otherwise ``None``.
        """
        stmt = select(User).where(
            and_(
                User.email == email,
                # SQL expression, not a Python truth test.
                User.is_deleted == False,  # noqa: E712
            )
        )
        res = await db.execute(stmt)
        user = res.scalar_one_or_none()
        if user and verify_password(password, user.hashed_password):
            return user
        return None

    @staticmethod
    async def initiate_password_reset(db: AsyncSession, email: str) -> str:
        """Create a password-reset token and email the reset link.

        Args:
            db: Session used for the lookup and insert; committed on
                success, rolled back on failure.
            email: Address of the account to reset.

        Returns:
            ``"success"`` when a token was stored and the email was sent,
            ``"not_found"`` when no non-deleted user has this email.

        Raises:
            Exception: Any failure while creating the token, sending the
                email or committing is logged and re-raised after rollback.
        """
        stmt = select(User).where(
            and_(
                User.email == email,
                # SQL expression, not a Python truth test.
                User.is_deleted == False,  # noqa: E712
            )
        )
        user = (await db.execute(stmt)).scalar_one_or_none()
        if not user:
            return "not_found"

        try:
            reset_hours = await config.get_setting(
                db,
                "auth_password_reset_hours",
                region_code=user.region_code,
                default=2,
            )
            token_val = uuid.uuid4()
            db.add(VerificationToken(
                token=token_val,
                user_id=user.id,
                token_type="password_reset",
                expires_at=datetime.now(timezone.utc) + timedelta(hours=int(reset_hours)),
            ))

            reset_link = f"{settings.FRONTEND_BASE_URL}/reset-password?token={token_val}"
            await email_manager.send_email(
                recipient=email,
                template_key="pwd_reset",
                variables={"link": reset_link},
                lang=user.preferred_language,
            )
            await db.commit()
        except Exception:
            # Do not leave the pending token in the caller's session.
            await db.rollback()
            logger.exception("Password reset initiation failed")
            raise
        return "success"

    @staticmethod
    async def reset_password(
        db: AsyncSession, email: str, token_str: str, new_password: str
    ) -> bool:
        """Set a new password using a password-reset token.

        The token must be unused, unexpired, of type ``"password_reset"``
        and belong to the user with the given email.

        Args:
            db: Session used for the lookup and update.
            email: Email address of the account.
            token_str: Reset token in textual UUID form.
            new_password: Plain-text password to hash and store.

        Returns:
            ``True`` when the password was changed; ``False`` when the token
            is malformed or does not match, or when the database operation
            fails.
        """
        # NOTE(review): new_password is not checked against the
        # "auth_min_password_length" policy enforced at registration -
        # confirm whether the caller validates it.
        try:
            token_uuid = uuid.UUID(token_str)
        except (ValueError, TypeError, AttributeError):
            # Malformed or non-string token: treat as "not valid".
            return False

        try:
            stmt = select(VerificationToken).join(User).where(
                and_(
                    User.email == email,
                    VerificationToken.token == token_uuid,
                    VerificationToken.token_type == "password_reset",
                    # SQL expression, not a Python truth test.
                    VerificationToken.is_used == False,  # noqa: E712
                    VerificationToken.expires_at > datetime.now(timezone.utc),
                )
            )
            token_rec = (await db.execute(stmt)).scalar_one_or_none()
            if not token_rec:
                return False

            user_stmt = select(User).where(User.id == token_rec.user_id)
            user = (await db.execute(user_stmt)).scalar_one()
            user.hashed_password = get_password_hash(new_password)
            token_rec.is_used = True
            await db.commit()
            return True
        except Exception:
            # Keep the best-effort contract (False on failure), but do not
            # swallow cancellation/interrupts and do not hide the error.
            await db.rollback()
            logger.exception("Password reset failed")
            return False