Files
service-finder/backend/app/api/v1/endpoints/gamification.py
2026-03-26 07:09:44 +00:00

926 lines
31 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# /opt/docker/dev/service_finder/backend/app/api/v1/endpoints/gamification.py
from fastapi import APIRouter, Depends, HTTPException, Body, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, desc, func, and_
from typing import List, Optional
from datetime import datetime, timedelta
from app.db.session import get_db
from app.api.deps import get_current_user
from app.models.identity import User
from app.models import UserStats, PointsLedger, LevelConfig, UserContribution, Badge, UserBadge, Season
from app.models.system import SystemParameter, ParameterScope
from app.models.marketplace.service import ServiceStaging
from app.schemas.gamification import SeasonResponse, UserStatResponse, LeaderboardEntry
router = APIRouter()
# -- HELPER FUNCTION FOR SYSTEM SETTINGS --
async def get_system_param(db: AsyncSession, key: str, default_value):
    """Return the stored value of a system parameter, or a fallback.

    Selects the ``SystemParameter`` row whose ``key`` equals *key* and returns
    its ``value`` attribute. When no such row exists, *default_value* is
    returned unchanged.
    """
    query = select(SystemParameter).where(SystemParameter.key == key)
    result = await db.execute(query)
    param = result.scalar_one_or_none()
    if param:
        return param.value
    return default_value
@router.get("/my-stats")
async def get_my_stats(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Return the ``UserStats`` row of the authenticated user.

    When the user has no stats row yet, a dictionary of zeroed defaults
    (level 1) is returned instead.
    """
    result = await db.execute(
        select(UserStats).where(UserStats.user_id == current_user.id)
    )
    row = result.scalar_one_or_none()
    if row:
        return row
    return {"total_xp": 0, "current_level": 1, "penalty_points": 0, "services_submitted": 0}
@router.get("/leaderboard")
async def get_leaderboard(
    limit: int = 10,
    season_id: Optional[int] = None,
    db: AsyncSession = Depends(get_db)
):
    """Return the leaderboard, either global or for a single season.

    With a truthy ``season_id`` the ranking is built from the summed
    ``UserContribution`` points/XP of that season (ordered by points);
    otherwise it is built from ``UserStats`` ordered by total XP.

    Returns a list of dicts with a masked e-mail under ``"user"`` plus
    ``"points"``/``"xp"`` (seasonal) or ``"xp"``/``"level"`` (global).

    NOTE(review): a second handler for ``GET /leaderboard``
    (``get_leaderboard_top10``) is declared later in this file on the same
    router; routes are matched in registration order, so this handler is the
    one that serves the path.
    """

    def _mask_email(email):
        # Split first, then truncate the local part: truncating the raw
        # address turns "a@x.com" into "a@***@x.com", and indexing the split
        # result raises when the address is NULL or has no "@".
        local, sep, domain = (email or "").partition("@")
        masked = f"{local[:2]}***"
        return f"{masked}@{domain}" if sep else masked

    if season_id:
        # Seasonal leaderboard
        stmt = (
            select(
                User.email,
                func.sum(UserContribution.points_awarded).label("total_points"),
                func.sum(UserContribution.xp_awarded).label("total_xp")
            )
            .join(UserContribution, User.id == UserContribution.user_id)
            .where(UserContribution.season_id == season_id)
            .group_by(User.id)
            .order_by(desc("total_points"))
            .limit(limit)
        )
        second_key, third_key = "points", "xp"
    else:
        # Global leaderboard
        stmt = (
            select(User.email, UserStats.total_xp, UserStats.current_level)
            .join(UserStats, User.id == UserStats.user_id)
            .order_by(desc(UserStats.total_xp))
            .limit(limit)
        )
        second_key, third_key = "xp", "level"

    rows = (await db.execute(stmt)).all()
    return [
        {"user": _mask_email(r[0]), second_key: r[1], third_key: r[2]}
        for r in rows
    ]
@router.get("/seasons")
async def get_seasons(
    active_only: bool = True,
    db: AsyncSession = Depends(get_db)
):
    """List seasons, restricted to active ones unless ``active_only`` is false.

    Each season is returned as a dict with ``id``, ``name``, ``start_date``,
    ``end_date`` and ``is_active``.
    """
    query = select(Season)
    if active_only:
        # SQL expression, not a Python truth test - keep the explicit comparison.
        query = query.where(Season.is_active == True)  # noqa: E712
    rows = (await db.execute(query)).scalars().all()

    exported = ("id", "name", "start_date", "end_date", "is_active")
    return [{field: getattr(season, field) for field in exported} for season in rows]
@router.get("/my-contributions")
async def get_my_contributions(
    season_id: Optional[int] = None,
    limit: int = 50,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """List the authenticated user's contributions, newest first.

    A truthy ``season_id`` narrows the result to that season; at most
    ``limit`` rows are returned, each serialised to a plain dict.
    """
    filters = [UserContribution.user_id == current_user.id]
    if season_id:
        filters.append(UserContribution.season_id == season_id)

    query = (
        select(UserContribution)
        .where(*filters)
        .order_by(desc(UserContribution.created_at))
        .limit(limit)
    )
    rows = (await db.execute(query)).scalars().all()

    exported = (
        "id",
        "contribution_type",
        "entity_type",
        "entity_id",
        "points_awarded",
        "xp_awarded",
        "status",
        "created_at",
    )
    return [{field: getattr(item, field) for field in exported} for item in rows]
@router.get("/season-standings/{season_id}")
async def get_season_standings(
    season_id: int,
    limit: int = 20,
    db: AsyncSession = Depends(get_db)
):
    """Return the standings of one season: its top contributors.

    Only contributions with status ``"approved"`` are counted. Each entry
    carries its 1-based rank, a masked e-mail, summed points/XP, the number
    of contributions and the reward resolved by ``get_season_reward``.

    Raises:
        HTTPException(404): when no season with ``season_id`` exists.
    """

    def _mask_email(email):
        # Split first, then truncate the local part: truncating the raw
        # address turns "a@x.com" into "a@***@x.com", and indexing the split
        # result raises when the address is NULL or has no "@".
        local, sep, domain = (email or "").partition("@")
        masked = f"{local[:2]}***"
        return f"{masked}@{domain}" if sep else masked

    # Make sure the requested season exists
    season_stmt = select(Season).where(Season.id == season_id)
    season = (await db.execute(season_stmt)).scalar_one_or_none()
    if not season:
        raise HTTPException(status_code=404, detail="Season not found")

    # Query the top contributors
    stmt = (
        select(
            User.email,
            func.sum(UserContribution.points_awarded).label("total_points"),
            func.sum(UserContribution.xp_awarded).label("total_xp"),
            func.count(UserContribution.id).label("contribution_count")
        )
        .join(UserContribution, User.id == UserContribution.user_id)
        .where(
            and_(
                UserContribution.season_id == season_id,
                UserContribution.status == "approved"
            )
        )
        .group_by(User.id)
        .order_by(desc("total_points"))
        .limit(limit)
    )
    standings = (await db.execute(stmt)).all()

    # Seasonal reward configuration (falls back to built-in defaults)
    season_config = await get_system_param(
        db, "seasonal_competition_config",
        {
            "top_contributors_count": 10,
            "rewards": {
                "first_place": {"credits": 1000, "badge": "season_champion"},
                "second_place": {"credits": 500, "badge": "season_runner_up"},
                "third_place": {"credits": 250, "badge": "season_bronze"},
                "top_10": {"credits": 100, "badge": "season_elite"}
            }
        }
    )

    return {
        "season": {
            "id": season.id,
            "name": season.name,
            "start_date": season.start_date,
            "end_date": season.end_date
        },
        "standings": [
            {
                "rank": rank,
                "user": _mask_email(r[0]),
                "points": r[1],
                "xp": r[2],
                "contributions": r[3],
                "reward": get_season_reward(rank, season_config)
            }
            for rank, r in enumerate(standings, start=1)
        ],
        "config": season_config
    }
def get_season_reward(rank: int, config: dict) -> dict:
    """Resolve the seasonal reward for a 1-based rank.

    Ranks 1-3 map to the ``first_place`` / ``second_place`` / ``third_place``
    entries of ``config["rewards"]``; any other rank up to
    ``config["top_contributors_count"]`` (default 10) maps to ``top_10``.

    Returns an empty dict when the rank is below 1, beyond the rewarded
    range, or when the matching reward entry is missing. A ``None`` or
    non-dict ``config`` / ``rewards`` value is treated as empty.
    """
    settings = config if isinstance(config, dict) else {}
    rewards = settings.get("rewards")
    if not isinstance(rewards, dict):
        rewards = {}

    # Ranks are 1-based; without this guard 0 and negatives would fall
    # through to the "top_10" branch below.
    if rank < 1:
        return {}

    podium = {1: "first_place", 2: "second_place", 3: "third_place"}
    if rank in podium:
        return rewards.get(podium[rank], {})

    top_n = settings.get("top_contributors_count", 10)
    if top_n is None:
        top_n = 10
    if rank <= top_n:
        return rewards.get("top_10", {})
    return {}
@router.get("/self-defense-status")
async def get_self_defense_status(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Return the self-defense (penalty) status of the authenticated user.

    The penalty level is derived from ``UserStats.penalty_points``
    (>=100 -> -1, >=500 -> -2, >=1000 -> -3) and the matching restriction list
    is read from the ``self_defense_penalties`` system parameter.

    Returns a dict with ``penalty_level``, ``penalty_points``,
    ``restrictions``, ``recovery_progress`` and ``can_submit_services``.
    This coroutine is also awaited directly (not only via the router) by
    ``submit_new_service`` with positional ``(db, current_user)``.
    """
    stmt = select(UserStats).where(UserStats.user_id == current_user.id)
    stats = (await db.execute(stmt)).scalar_one_or_none()
    if not stats:
        return {
            "penalty_level": 0,
            # Included so both response shapes expose the same keys.
            "penalty_points": 0,
            "restrictions": [],
            "recovery_progress": 0,
            "can_submit_services": True
        }

    # Self-defense penalty configuration (falls back to built-in defaults)
    penalty_config = await get_system_param(
        db, "self_defense_penalties",
        {
            "level_minus_1": {"restrictions": ["no_service_submissions"], "duration_days": 7},
            "level_minus_2": {"restrictions": ["no_service_submissions", "no_reviews"], "duration_days": 30},
            "level_minus_3": {"restrictions": ["no_service_submissions", "no_reviews", "no_messaging"], "duration_days": 365}
        }
    )

    # A NULL counter is treated as zero instead of failing the comparisons.
    penalty_points = stats.penalty_points or 0
    total_xp = stats.total_xp or 0

    # Determine the penalty level (simplified logic); thresholds are checked
    # from the most severe downwards so the first match wins.
    thresholds = ((1000, -3), (500, -2), (100, -1))
    penalty_level = next(
        (level for minimum, level in thresholds if penalty_points >= minimum),
        0,
    )

    restrictions = []
    if penalty_level < 0:
        level_key = f"level_minus_{abs(penalty_level)}"
        level_config = penalty_config.get(level_key) if isinstance(penalty_config, dict) else None
        restrictions = (level_config or {}).get("restrictions") or []

    return {
        "penalty_level": penalty_level,
        "penalty_points": penalty_points,
        "restrictions": restrictions,
        "recovery_progress": min(total_xp / 10000 * 100, 100) if penalty_level < 0 else 100,
        "can_submit_services": "no_service_submissions" not in restrictions
    }
# --- THE NEW, DYNAMIC SUBMISSION ENDPOINT (Gamification 2.0 compatible) ---
@router.post("/submit-service")
async def submit_new_service(
    name: str = Body(...),
    city: str = Body(...),
    address: str = Body(...),
    contact_phone: Optional[str] = Body(None),
    website: Optional[str] = Body(None),
    description: Optional[str] = Body(None),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Submit a new service for analysis and credit the submitter.

    Steps: check the self-defense status, load the reward settings, find the
    current season, stage the raw service data, then record a
    ``UserContribution``, a ``PointsLedger`` entry and the ``UserStats``
    update in a single transaction.

    Returns a dict with the status, earned XP/points, the staging id and the
    season id (``None`` when no season is active today).

    Raises:
        HTTPException(403): the user is restricted from submitting services.
        HTTPException(400): any database error while storing the submission
            (the transaction is rolled back).
    """
    import hashlib

    # 1. Self-defense status check
    defense_status = await get_self_defense_status(db, current_user)
    if not defense_status["can_submit_services"]:
        raise HTTPException(
            status_code=403,
            detail="Önvédelmi korlátozás miatt nem küldhetsz be új szerviz adatokat."
        )

    # 2. Load the admin-controlled reward settings
    submission_rewards = await get_system_param(
        db, "service_submission_rewards",
        {"points": 50, "xp": 100, "social_credits": 10}
    )
    points_reward = submission_rewards.get("points", 50)
    xp_reward = submission_rewards.get("xp", 100)

    # One timestamp for the whole request, so the season bounds and every
    # created row agree even when the request straddles midnight.
    now = datetime.utcnow()
    today = now.date()

    # 3. Find the current season
    season_stmt = select(Season).where(
        and_(
            Season.is_active == True,  # noqa: E712 - SQL expression
            Season.start_date <= today,
            Season.end_date >= today
        )
    ).limit(1)
    current_season = (await db.execute(season_stmt)).scalar_one_or_none()
    # Read plain values now: ORM attributes may be expired by commit(), and
    # reloading them lazily is not possible under AsyncSession.
    season_id = current_season.id if current_season else None

    # 4. User statistics
    stmt = select(UserStats).where(UserStats.user_id == current_user.id)
    stats = (await db.execute(stmt)).scalar_one_or_none()
    user_lvl = stats.current_level if stats else 1

    # 5. Trust score derived from the user's level (capped at 90)
    trust_weight = min(20 + (user_lvl * 6), 90)

    # 6. Fingerprint of the raw data (MD5 is used as an identifier here, not
    #    for security)
    f_print = hashlib.md5(f"{name.lower()}{city.lower()}{address.lower()}".encode()).hexdigest()

    try:
        # Hand the raw data over to staging
        new_staging = ServiceStaging(
            name=name,
            city=city,
            address_line1=address,
            contact_phone=contact_phone,
            website=website,
            description=description,
            fingerprint=f_print,
            status="pending",
            trust_score=trust_weight,
            submitted_by=current_user.id,
            raw_data={
                "submitted_by_user": current_user.id,
                "user_level": user_lvl,
                "submitted_at": now.isoformat()
            }
        )
        db.add(new_staging)
        # Flush inside the try block so a failing INSERT is rolled back and
        # reported like any other storage error.
        await db.flush()  # Get the ID
        staging_id = new_staging.id

        # 7. Create the UserContribution
        # NOTE(review): `metadata` is a reserved attribute name on SQLAlchemy
        # declarative models - confirm UserContribution accepts this keyword.
        contribution = UserContribution(
            user_id=current_user.id,
            season_id=season_id,
            contribution_type="service_submission",
            entity_type="service_staging",
            entity_id=staging_id,
            points_awarded=points_reward,
            xp_awarded=xp_reward,
            status="pending",  # Waiting for approval by Robot 5
            metadata={
                "service_name": name,
                "city": city,
                "staging_id": staging_id
            },
            created_at=now
        )
        db.add(contribution)

        # 8. PointsLedger entry
        # NOTE(review): other endpoints in this file create PointsLedger rows
        # with a `reason` column only - confirm `xp`, `source_type`,
        # `source_id` and `description` exist on the model.
        ledger = PointsLedger(
            user_id=current_user.id,
            points=points_reward,
            xp=xp_reward,
            source_type="service_submission",
            source_id=staging_id,
            description=f"Szerviz beküldés: {name}",
            created_at=now
        )
        db.add(ledger)

        # 9. Update UserStats (NULL counters are treated as zero)
        if stats:
            stats.total_points = (stats.total_points or 0) + points_reward
            stats.total_xp = (stats.total_xp or 0) + xp_reward
            stats.services_submitted = (stats.services_submitted or 0) + 1
            stats.updated_at = now
        else:
            # No UserStats row yet, so create one
            stats = UserStats(
                user_id=current_user.id,
                total_points=points_reward,
                total_xp=xp_reward,
                services_submitted=1,
                created_at=now,
                updated_at=now
            )
            db.add(stats)

        await db.commit()
    except Exception as e:
        await db.rollback()
        # NOTE(review): the raw exception text is returned to the client;
        # consider logging it and returning a generic message instead.
        raise HTTPException(status_code=400, detail=f"Hiba a beküldés során: {str(e)}") from e

    return {
        "status": "success",
        "message": "Szerviz beküldve a rendszerbe elemzésre!",
        "xp_earned": xp_reward,
        "points_earned": points_reward,
        "staging_id": staging_id,
        "season_id": season_id
    }
# --- Gamification 2.0 API endpoints (frontend/mobile) ---
@router.get("/me", response_model=UserStatResponse)
async def get_my_gamification_stats(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Return the current statistics of the logged-in user.

    When no ``UserStats`` row exists, a response filled with default values
    (zero XP, level 1, no restriction) is returned.
    """
    lookup = await db.execute(
        select(UserStats).where(UserStats.user_id == current_user.id)
    )
    record = lookup.scalar_one_or_none()
    if record:
        return UserStatResponse.from_orm(record)

    # Default statistics
    return UserStatResponse(
        user_id=current_user.id,
        total_xp=0,
        current_level=1,
        restriction_level=0,
        penalty_quota_remaining=0,
        banned_until=None
    )
@router.get("/seasons/active", response_model=SeasonResponse)
async def get_active_season(
    db: AsyncSession = Depends(get_db)
):
    """
    Return the currently active season.

    When several seasons are flagged active, the one with the latest
    ``start_date`` is returned.

    Raises:
        HTTPException(404): no season is flagged active.
    """
    # `scalar_one_or_none()` raises MultipleResultsFound as soon as two
    # seasons are active at once; pick one deterministically instead.
    stmt = (
        select(Season)
        .where(Season.is_active == True)  # noqa: E712 - SQL expression
        .order_by(desc(Season.start_date))
        .limit(1)
    )
    season = (await db.execute(stmt)).scalars().first()
    if not season:
        raise HTTPException(status_code=404, detail="No active season found")
    return SeasonResponse.from_orm(season)
@router.get("/leaderboard", response_model=List[LeaderboardEntry])
async def get_leaderboard_top10(
    limit: int = Query(10, ge=1, le=100),
    db: AsyncSession = Depends(get_db)
):
    """
    Return the top users ordered by ``total_xp`` descending.

    ``username`` carries the user's e-mail in masked form, matching the other
    leaderboard endpoints of this module; this endpoint requires no
    authentication, so full addresses must not be exposed.

    NOTE(review): ``GET /leaderboard`` is already registered earlier in this
    file (``get_leaderboard``). Routes are matched in registration order, so
    this handler is shadowed unless one of the two paths is changed.
    """

    def _mask_email(email):
        # Keep two characters of the local part and the domain only; tolerate
        # a NULL address or one without "@".
        local, sep, domain = (email or "").partition("@")
        masked = f"{local[:2]}***"
        return f"{masked}@{domain}" if sep else masked

    stmt = (
        select(UserStats, User.email)
        .join(User, UserStats.user_id == User.id)
        .order_by(desc(UserStats.total_xp))
        .limit(limit)
    )
    rows = (await db.execute(stmt)).all()
    return [
        LeaderboardEntry(
            user_id=stats.user_id,
            username=_mask_email(email),  # e-mail is used in place of a username
            total_xp=stats.total_xp,
            current_level=stats.current_level
        )
        for stats, email in rows
    ]
# --- QUIZ ENDPOINTS FOR DAILY QUIZ GAMIFICATION ---
@router.get("/quiz/daily")
async def get_daily_quiz(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Return the daily quiz questions for the user.

    The request is rejected when the user already has a quiz-related
    ``PointsLedger`` entry dated today.

    Raises:
        HTTPException(400): the user has already played today.

    NOTE(review): each question is returned together with ``correctAnswer``
    and ``explanation``, so a client can read the solution before answering.
    """
    # Check if user has already played today. A user can own several matching
    # rows per day (a rewarded answer plus the completion marker), so only
    # existence is tested; `scalar_one_or_none()` would raise
    # MultipleResultsFound in that case.
    today = datetime.now().date()
    stmt = select(PointsLedger).where(
        PointsLedger.user_id == current_user.id,
        func.date(PointsLedger.created_at) == today,
        PointsLedger.reason.ilike("%quiz%")
    ).limit(1)
    already_played = (await db.execute(stmt)).scalars().first()
    if already_played:
        raise HTTPException(
            status_code=400,
            detail="You have already played the daily quiz today. Try again tomorrow."
        )

    # Return quiz questions (for now, using mock questions - in production these would come from a database)
    quiz_questions = [
        {
            "id": 1,
            "question": "Melyik alkatrész felelős a motor levegőüzemanyag keverékének szabályozásáért?",
            "options": ["Generátor", "Lambdaszonda", "Féktárcsa", "Olajszűrő"],
            "correctAnswer": 1,
            "explanation": "A lambdaszonda méri a kipufogógáz oxigéntartalmát, és ezen alapul a befecskendezés."
        },
        {
            "id": 2,
            "question": "Mennyi ideig érvényes egy gépjármű műszaki vizsgája Magyarországon?",
            "options": ["1 év", "2 év", "4 év", "6 év"],
            "correctAnswer": 1,
            "explanation": "A személygépkocsik műszaki vizsgája 2 évre érvényes, kivéve az újonnan forgalomba helyezett autókat."
        },
        {
            "id": 3,
            "question": "Melyik anyag NEM része a hibrid autók akkumulátorának?",
            "options": ["Lítium", "Nikkel", "Ólom", "Kobalt"],
            "correctAnswer": 2,
            "explanation": "A hibrid és elektromos autók akkumulátoraiban általában lítium, nikkel és kobalt található, ólom az ólomsavas akkukban van."
        }
    ]
    return {
        "questions": quiz_questions,
        "total_questions": len(quiz_questions),
        "date": today.isoformat()
    }
@router.post("/quiz/answer")
async def submit_quiz_answer(
    question_id: int = Body(...),
    selected_option: int = Body(...),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Submit an answer to a quiz question and award points if it is correct.

    An answer is refused when today's quiz is already marked completed, or
    when this same question has already been rewarded today. Other questions
    of the quiz stay answerable after a correct answer.

    Raises:
        HTTPException(400): quiz completed today / question already rewarded.
        HTTPException(404): unknown ``question_id``.
    """
    today = datetime.now().date()
    reward_reason = f"Daily quiz correct answer - Question {question_id}"

    # Matching on any "%quiz%" row would lock the user out right after the
    # first correct answer, although the quiz has several questions. Only the
    # completion marker or a reward for this very question blocks the answer.
    # Existence is tested with limit(1) because several rows may match.
    stmt = select(PointsLedger).where(
        PointsLedger.user_id == current_user.id,
        func.date(PointsLedger.created_at) == today,
        PointsLedger.reason.in_(["Daily quiz completed", reward_reason])
    ).limit(1)
    already_played = (await db.execute(stmt)).scalars().first()
    if already_played:
        raise HTTPException(
            status_code=400,
            detail="You have already played the daily quiz today. Try again tomorrow."
        )

    # Mock quiz data - in production this would come from a database
    quiz_data = {
        1: {"correct_answer": 1, "points": 10, "explanation": "A lambdaszonda méri a kipufogógáz oxigéntartalmát, és ezen alapul a befecskendezés."},
        2: {"correct_answer": 1, "points": 10, "explanation": "A személygépkocsik műszaki vizsgája 2 évre érvényes, kivéve az újonnan forgalomba helyezett autókat."},
        3: {"correct_answer": 2, "points": 10, "explanation": "A hibrid és elektromos autók akkumulátoraiban általában lítium, nikkel és kobalt található, ólom az ólomsavas akkukban van."}
    }
    if question_id not in quiz_data:
        raise HTTPException(status_code=404, detail="Question not found")

    question_info = quiz_data[question_id]
    awarded = question_info["points"]
    is_correct = selected_option == question_info["correct_answer"]

    # Award points if correct
    if is_correct:
        # Update user stats
        stats_stmt = select(UserStats).where(UserStats.user_id == current_user.id)
        user_stats = (await db.execute(stats_stmt)).scalar_one_or_none()
        if not user_stats:
            # Create user stats if they don't exist
            user_stats = UserStats(
                user_id=current_user.id,
                total_xp=awarded,
                current_level=1
            )
            db.add(user_stats)
        else:
            # A NULL counter is treated as zero.
            user_stats.total_xp = (user_stats.total_xp or 0) + awarded

        # Add points ledger entry
        points_ledger = PointsLedger(
            user_id=current_user.id,
            points=awarded,
            reason=reward_reason,
            created_at=datetime.now()
        )
        db.add(points_ledger)
        await db.commit()

    return {
        "is_correct": is_correct,
        "correct_answer": question_info["correct_answer"],
        "points_awarded": awarded if is_correct else 0,
        "explanation": question_info["explanation"]
    }
@router.post("/quiz/complete")
async def complete_daily_quiz(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Record that the user finished today's quiz.

    A zero-point ``PointsLedger`` row with the reason
    ``"Daily quiz completed"`` is written as the completion marker.

    Raises:
        HTTPException(400): a completion marker dated today already exists.
    """
    marker_reason = "Daily quiz completed"
    current_day = datetime.now().date()

    # Look for an existing completion marker dated today
    lookup = await db.execute(
        select(PointsLedger).where(
            PointsLedger.user_id == current_user.id,
            func.date(PointsLedger.created_at) == current_day,
            PointsLedger.reason == marker_reason
        )
    )
    if lookup.scalar_one_or_none():
        raise HTTPException(
            status_code=400,
            detail="Daily quiz already marked as completed today."
        )

    # Store the completion marker
    db.add(
        PointsLedger(
            user_id=current_user.id,
            points=0,
            reason=marker_reason,
            created_at=datetime.now()
        )
    )
    await db.commit()
    return {"message": "Daily quiz marked as completed for today."}
@router.get("/quiz/stats")
async def get_quiz_stats(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Return the user's quiz statistics: points, XP, level, last play, streak.

    The streak is a placeholder: 1 when any quiz ledger entry exists,
    otherwise 0.
    """
    is_quiz_entry = PointsLedger.reason.ilike("%quiz%")
    owned_by_user = PointsLedger.user_id == current_user.id

    # User stats row (may be absent)
    profile = (
        await db.execute(select(UserStats).where(UserStats.user_id == current_user.id))
    ).scalar_one_or_none()

    # Sum of quiz points taken from the ledger
    total_points = (
        await db.execute(
            select(func.sum(PointsLedger.points)).where(owned_by_user, is_quiz_entry)
        )
    ).scalar() or 0

    # Timestamp of the most recent quiz ledger entry
    latest = (
        await db.execute(
            select(PointsLedger.created_at)
            .where(owned_by_user, is_quiz_entry)
            .order_by(desc(PointsLedger.created_at))
            .limit(1)
        )
    ).scalar()

    # Simplified streak - in production this would be more sophisticated
    streak_days = 1 if latest else 0  # Placeholder

    played_today = await has_played_today(db, current_user.id)

    summary = {
        "total_quiz_points": total_points,
        "total_xp": 0,
        "current_level": 1,
        "last_played": None,
        "current_streak": streak_days,
        "can_play_today": not played_today
    }
    if profile:
        summary["total_xp"] = profile.total_xp
        summary["current_level"] = profile.current_level
    if latest:
        summary["last_played"] = latest.isoformat()
    return summary
async def has_played_today(db: AsyncSession, user_id: int) -> bool:
    """Check if the user already has a quiz-related ledger entry dated today.

    Returns ``True`` when at least one ``PointsLedger`` row of ``user_id``
    created today has a reason containing "quiz" (case-insensitive).
    """
    today = datetime.now().date()
    # Several rows can match on one day (a rewarded answer plus the
    # completion marker), which makes `scalar_one_or_none()` raise
    # MultipleResultsFound; only existence matters here.
    stmt = select(PointsLedger).where(
        PointsLedger.user_id == user_id,
        func.date(PointsLedger.created_at) == today,
        PointsLedger.reason.ilike("%quiz%")
    ).limit(1)
    result = await db.execute(stmt)
    return result.scalars().first() is not None
# --- BADGE/TROPHY ENDPOINTS ---
@router.get("/badges")
async def get_all_badges(
    db: AsyncSession = Depends(get_db)
):
    """
    List every badge defined in the system.

    Each badge is serialised to a dict with ``id``, ``name``,
    ``description`` and ``icon_url``.
    """
    rows = (await db.execute(select(Badge))).scalars().all()
    exported = ("id", "name", "description", "icon_url")
    return [{field: getattr(item, field) for field in exported} for item in rows]
@router.get("/my-badges")
async def get_my_badges(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    List the badges earned by the current user, most recently earned first.

    ``earned_at`` is returned as an ISO-8601 string, or ``None`` when unset.
    """
    query = (
        select(UserBadge, Badge)
        .join(Badge, UserBadge.badge_id == Badge.id)
        .where(UserBadge.user_id == current_user.id)
        .order_by(desc(UserBadge.earned_at))
    )
    pairs = (await db.execute(query)).all()

    earned = []
    for link, definition in pairs:
        earned_on = link.earned_at.isoformat() if link.earned_at else None
        earned.append({
            "badge_id": definition.id,
            "badge_name": definition.name,
            "badge_description": definition.description,
            "badge_icon_url": definition.icon_url,
            "earned_at": earned_on
        })
    return earned
@router.post("/badges/award/{badge_id}")
async def award_badge_to_user(
    badge_id: int,
    user_id: int = Body(None),
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Award a badge to a user (admin only or automated system).

    The badge goes to ``user_id`` when given, otherwise to the caller. The
    award also writes a 50-point ``PointsLedger`` entry and adds 50 to the
    target's ``UserStats.total_xp`` (creating the row when missing).

    Raises:
        HTTPException(404): unknown badge or unknown target user.
        HTTPException(400): the target user already owns the badge.

    NOTE(review): nothing here enforces the "admin only" rule - every
    authenticated user can award any badge (and 50 XP) to any user,
    themselves included. Add a role check once the admin flag on ``User``
    is confirmed.
    """
    badge_points = 50  # Points for earning a badge

    # Check if badge exists
    badge_stmt = select(Badge).where(Badge.id == badge_id)
    badge = (await db.execute(badge_stmt)).scalar_one_or_none()
    if not badge:
        raise HTTPException(status_code=404, detail="Badge not found")

    # Determine target user (default to current user if not specified)
    target_user_id = user_id if user_id else current_user.id

    # Reject unknown targets up front instead of failing later on insert.
    if target_user_id != current_user.id:
        target_stmt = select(User.id).where(User.id == target_user_id).limit(1)
        if (await db.execute(target_stmt)).scalar_one_or_none() is None:
            raise HTTPException(status_code=404, detail="User not found")

    # Check if user already has this badge
    existing_stmt = select(UserBadge).where(
        UserBadge.user_id == target_user_id,
        UserBadge.badge_id == badge_id
    ).limit(1)
    if (await db.execute(existing_stmt)).scalars().first():
        raise HTTPException(status_code=400, detail="User already has this badge")

    # Read plain values before commit(): ORM attributes may be expired
    # afterwards and cannot be lazily reloaded under AsyncSession.
    awarded_badge_id = badge.id
    awarded_badge_name = badge.name
    now = datetime.now()

    # Award the badge
    db.add(UserBadge(
        user_id=target_user_id,
        badge_id=badge_id,
        earned_at=now
    ))

    # Also add points for earning a badge
    db.add(PointsLedger(
        user_id=target_user_id,
        points=badge_points,
        reason=f"Badge earned: {awarded_badge_name}",
        created_at=now
    ))

    # Update user stats
    stats_stmt = select(UserStats).where(UserStats.user_id == target_user_id)
    user_stats = (await db.execute(stats_stmt)).scalar_one_or_none()
    if user_stats:
        # A NULL counter is treated as zero.
        user_stats.total_xp = (user_stats.total_xp or 0) + badge_points
    else:
        db.add(UserStats(
            user_id=target_user_id,
            total_xp=badge_points,
            current_level=1
        ))

    await db.commit()
    return {
        "message": f"Badge '{awarded_badge_name}' awarded to user",
        "badge_id": awarded_badge_id,
        "badge_name": awarded_badge_name,
        "points_awarded": badge_points
    }
@router.get("/achievements")
async def get_achievements_progress(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """
    Return the user's progress on all achievements.

    Three categories are combined: one entry per badge (earned or not),
    fixed XP milestones measured against ``UserStats.total_xp``, and fixed
    quiz milestones measured against the summed quiz points in the ledger.

    Returns a dict with the ``achievements`` list, its length, the number of
    earned entries and the earned percentage rounded to one decimal.
    """
    # Badge ids owned by the user - a set keeps the membership tests in the
    # loop below constant-time.
    badges_stmt = select(UserBadge.badge_id).where(UserBadge.user_id == current_user.id)
    earned_badge_ids = {row[0] for row in (await db.execute(badges_stmt)).all()}

    # All badges
    all_badges = (await db.execute(select(Badge))).scalars().all()

    # User stats
    stats_stmt = select(UserStats).where(UserStats.user_id == current_user.id)
    user_stats = (await db.execute(stats_stmt)).scalar_one_or_none()

    achievements = []

    # Badge-based achievements
    for badge in all_badges:
        owned = badge.id in earned_badge_ids
        achievements.append({
            "id": f"badge_{badge.id}",
            "title": badge.name,
            "description": badge.description,
            "icon_url": badge.icon_url,
            "is_earned": owned,
            "category": "badge",
            "progress": 100 if owned else 0
        })

    # XP-based achievements
    xp_levels = [
        {"title": "Novice", "xp_required": 100, "description": "Earn 100 XP"},
        {"title": "Apprentice", "xp_required": 500, "description": "Earn 500 XP"},
        {"title": "Expert", "xp_required": 2000, "description": "Earn 2000 XP"},
        {"title": "Master", "xp_required": 5000, "description": "Earn 5000 XP"},
    ]
    # A missing row or a NULL counter both count as zero XP.
    current_xp = (user_stats.total_xp or 0) if user_stats else 0
    for level in xp_levels:
        required = level["xp_required"]
        achievements.append({
            "id": f"xp_{required}",
            "title": level["title"],
            "description": level["description"],
            "icon_url": None,
            "is_earned": current_xp >= required,
            "category": "xp",
            "progress": min((current_xp / required) * 100, 100)
        })

    # Quiz-based achievements
    quiz_points_stmt = select(func.sum(PointsLedger.points)).where(
        PointsLedger.user_id == current_user.id,
        PointsLedger.reason.ilike("%quiz%")
    )
    quiz_points = (await db.execute(quiz_points_stmt)).scalar() or 0
    quiz_achievements = [
        {"title": "Quiz Beginner", "points_required": 50, "description": "Earn 50 quiz points"},
        {"title": "Quiz Enthusiast", "points_required": 200, "description": "Earn 200 quiz points"},
        {"title": "Quiz Master", "points_required": 500, "description": "Earn 500 quiz points"},
    ]
    for achievement in quiz_achievements:
        required = achievement["points_required"]
        achievements.append({
            "id": f"quiz_{required}",
            "title": achievement["title"],
            "description": achievement["description"],
            "icon_url": None,
            "is_earned": quiz_points >= required,
            "category": "quiz",
            "progress": min((quiz_points / required) * 100, 100)
        })

    # Count earned entries once and reuse the figure for the percentage.
    earned_count = sum(1 for a in achievements if a["is_earned"])
    return {
        "achievements": achievements,
        "total_achievements": len(achievements),
        "earned_count": earned_count,
        "progress_percentage": round((earned_count / len(achievements)) * 100, 1) if achievements else 0
    }