import os from enum import Enum from typing import Optional from datetime import datetime, timedelta from fastapi import FastAPI, Depends, HTTPException, status, APIRouter, Header from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from pydantic import BaseModel, EmailStr from sqlalchemy import Column, Integer, String, Boolean, DateTime, select from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker from sqlalchemy.orm import DeclarativeBase from passlib.context import CryptContext from jose import JWTError, jwt import redis.asyncio as redis # --- KONFIGURÁCIÓ --- DATABASE_URL = "postgresql+asyncpg://user:password@localhost/service_finder_db" REDIS_URL = "redis://localhost:6379" SECRET_KEY = "szuper_titkos_jwt_kulcs_amit_env_bol_kellene_olvasni" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30 REFRESH_TOKEN_EXPIRE_DAYS = 7 # --- ADATBÁZIS SETUP (SQLAlchemy 2.0) --- engine = create_async_engine(DATABASE_URL, echo=True) AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False) class Base(DeclarativeBase): pass class User(Base): __tablename__ = "users" __table_args__ = {"schema": "public"} id = Column(Integer, primary_key=True, index=True) email = Column(String, unique=True, index=True, nullable=False) password_hash = Column(String, nullable=False) is_active = Column(Boolean, default=False) created_at = Column(DateTime, default=datetime.utcnow) async def get_db(): async with AsyncSessionLocal() as session: yield session # --- REDIS SETUP --- redis_client = redis.from_url(REDIS_URL, decode_responses=True) # --- SECURITY UTILS --- pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v2/auth/login") class ClientType(str, Enum): WEB = "web" MOBILE = "mobile" def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password): return pwd_context.hash(password) def create_token(data: dict, expires_delta: timedelta): to_encode = data.copy() expire = datetime.utcnow() + expires_delta to_encode.update({"exp": expire}) return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) # --- PYDANTIC SCHEMAS --- class UserCreate(BaseModel): email: EmailStr password: str class UserResponse(BaseModel): id: int email: EmailStr is_active: bool class Config: from_attributes = True class Token(BaseModel): access_token: str refresh_token: str token_type: str class LoginRequest(BaseModel): username: str # OAuth2 form compatibility miatt username, de emailt várunk password: str client_type: ClientType # 'web' vagy 'mobile' # --- ÜZLETI LOGIKA & ROUTER --- router = APIRouter(prefix="/auth", tags=["Authentication"]) @router.post("/register", response_model=UserResponse) async def register(user: UserCreate, db: AsyncSession = Depends(get_db)): # 1. Email ellenőrzése stmt = select(User).where(User.email == user.email) result = await db.execute(stmt) if result.scalars().first(): raise HTTPException(status_code=400, detail="Ez az email cím már regisztrálva van.") # 2. User létrehozása (inaktív) hashed_pwd = get_password_hash(user.password) new_user = User(email=user.email, password_hash=hashed_pwd, is_active=False) db.add(new_user) await db.commit() await db.refresh(new_user) # Itt kellene elküldeni az emailt a verify linkkel (most szimuláljuk) return new_user @router.get("/verify/{token}") async def verify_email(token: str, db: AsyncSession = Depends(get_db)): # Megjegyzés: A valóságban a token-t dekódolni kellene, hogy kinyerjük a user ID-t. # Most szimuláljuk, hogy a token valójában a user email-címe base64-ben vagy hasonló. # Egyszerűsítés a példa kedvéért: feltételezzük, hogy a token = user_id try: user_id = int(token) # DEMO ONLY stmt = select(User).where(User.id == user_id) result = await db.execute(stmt) user = result.scalars().first() if not user: raise HTTPException(status_code=404, detail="Felhasználó nem található") user.is_active = True await db.commit() return {"message": "Fiók sikeresen aktiválva!"} except ValueError: raise HTTPException(status_code=400, detail="Érvénytelen token") @router.post("/login", response_model=Token) async def login( form_data: OAuth2PasswordRequestForm = Depends(), client_type: ClientType = ClientType.WEB, # Query param vagy form field db: AsyncSession = Depends(get_db) ): """ Kritikus Redis Session Limitáció implementációja. """ # 1. User keresése stmt = select(User).where(User.email == form_data.username) result = await db.execute(stmt) user = result.scalars().first() if not user or not verify_password(form_data.password, user.password_hash): raise HTTPException(status_code=401, detail="Hibás email vagy jelszó") if not user.is_active: raise HTTPException(status_code=403, detail="A fiók még nincs aktiválva.") # 2. Token generálás access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) refresh_token_expires = timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS) # A tokenbe beleégetjük a client_type-ot is, hogy validálásnál ellenőrizhessük token_data = {"sub": str(user.id), "client_type": client_type.value} access_token = create_token(token_data, access_token_expires) refresh_token = create_token({"sub": str(user.id), "type": "refresh"}, refresh_token_expires) # 3. REDIS SESSION KEZELÉS (A feladat kritikus része) # Kulcs formátum: session:{user_id}:{client_type} -> access_token session_key = f"session:{user.id}:{client_type.value}" # A Redis 'SET' parancsa felülírja a kulcsot, ha az már létezik. # Ez megvalósítja a "Logout other devices" logikát az AZONOS típusú eszközökre. # Ezzel egy időben, mivel a kulcs tartalmazza a típust (web/mobile), # garantáljuk, hogy max 1 web és 1 mobile lehet (külön kulcsok). await redis_client.set( name=session_key, value=access_token, ex=ACCESS_TOKEN_EXPIRE_MINUTES * 60 ) return { "access_token": access_token, "refresh_token": refresh_token, "token_type": "bearer" } # --- MIDDLEWARE / DEPENDENCY TOKEN ELLENŐRZÉSHEZ --- async def get_current_user( token: str = Depends(oauth2_scheme), db: AsyncSession = Depends(get_db) ): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Nem sikerült hitelesíteni a felhasználót", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) user_id: str = payload.get("sub") client_type: str = payload.get("client_type") if user_id is None or client_type is None: raise credentials_exception except JWTError: raise credentials_exception # KRITIKUS: Token validálása Redis ellenében (Stateful JWT) # Ha a Redisben lévő token nem egyezik a küldött tokennel, # akkor a felhasználót kijelentkeztették egy másik eszközről. session_key = f"session:{user_id}:{client_type}" stored_token = await redis_client.get(session_key) if stored_token != token: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="A munkamenet lejárt vagy egy másik eszközről beléptek." ) stmt = select(User).where(User.id == int(user_id)) result = await db.execute(stmt) user = result.scalars().first() if user is None: raise credentials_exception return user # --- MAIN APP --- app = FastAPI(title="Service Finder API") app.include_router(router) @app.get("/") async def root(): return {"message": "Service Finder API fut"} @app.get("/protected-route") async def protected(user: User = Depends(get_current_user)): return {"message": f"Szia {user.email}, érvényes a munkameneted!"}