# /opt/docker/dev/service_finder/backend/app/services/billing_engine.py """ 🤖 Atomic Billing Engine - Quadruple Wallet & Double-Entry Ledger A Service Finder pénzügyi motorja. Felelős a következőkért: 1. Árképzés (Pricing Pipeline): Régió, RBAC rang és egyedi kedvezmények alapján 2. Intelligens levonás (Smart Deduction): VOUCHER → SERVICE_COINS/PURCHASED → EARNED sorrend 3. Atomikus tranzakciók (Atomic Transactions): Double-entry könyvelés a FinancialLedger táblában Design elvek: - FIFO (First In, First Out) voucher kezelés - SZÉP-kártya modell: lejárt voucher 10% díj, 90% átcsoportosítás új lejárattal - SQLAlchemy Session.begin() atomi tranzakciók - Soft-delete és Twin-technika támogatása """ import logging import uuid import enum from datetime import datetime, timedelta from typing import Dict, List, Optional, Tuple, Any from decimal import Decimal from sqlalchemy import select, update, func, and_, or_ from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import selectinload from app.models.identity import User, Wallet, ActiveVoucher, UserRole from app.models.audit import FinancialLedger, LedgerEntryType, WalletType from app.core.config import settings from app.services.config_service import config logger = logging.getLogger("billing-engine") class PricingCalculator: """ Árképzési csővezeték. Számítja a végső árat régió, RBAC rang és egyedi kedvezmények alapján. """ # Region multipliers (country code -> multiplier) REGION_MULTIPLIERS = { "HU": 1.0, # Hungary - base "GB": 1.2, # UK - 20% higher "DE": 1.15, # Germany - 15% higher "US": 1.25, # USA - 25% higher "RO": 0.9, # Romania - 10% lower "SK": 0.95, # Slovakia - 5% lower } # RBAC rank discounts (higher rank = bigger discount) # Map the actual UserRole enum values to discount percentages RBAC_DISCOUNTS = { UserRole.superadmin: 0.5, # 50% discount UserRole.admin: 0.3, # 30% discount UserRole.fleet_manager: 0.2, # 20% discount UserRole.user: 0.0, # 0% discount # Add other roles as needed UserRole.region_admin: 0.25, # 25% discount UserRole.country_admin: 0.25, # 25% discount UserRole.moderator: 0.15, # 15% discount UserRole.sales_agent: 0.1, # 10% discount UserRole.service_owner: 0.1, # 10% discount UserRole.driver: 0.0, # 0% discount } @classmethod async def calculate_final_price( cls, db: AsyncSession, base_amount: float, country_code: str = "HU", user_role: UserRole = UserRole.user, individual_discounts: Optional[List[Dict[str, Any]]] = None ) -> float: """ Végső ár kiszámítása. Args: db: Database session base_amount: Alapár (pl. szolgáltatás díja) country_code: Országkód (pl. "HU", "GB") user_role: Felhasználó RBAC rangja individual_discounts: Egyedi kedvezmények listája Returns: Végső ár (float) """ # 1. Region multiplier region_multiplier = cls.REGION_MULTIPLIERS.get(country_code.upper(), 1.0) amount = base_amount * region_multiplier # 2. RBAC discount rbac_discount = cls.RBAC_DISCOUNTS.get(user_role, 0.0) if rbac_discount > 0: amount = amount * (1 - rbac_discount) # 3. Individual discounts (e.g., promo codes, loyalty points) if individual_discounts: for discount in individual_discounts: discount_type = discount.get("type") discount_value = discount.get("value", 0) if discount_type == "percentage": amount = amount * (1 - discount_value / 100) elif discount_type == "fixed": amount = max(0, amount - discount_value) elif discount_type == "multiplier": amount = amount * discount_value # 4. Round to 2 decimal places amount = round(amount, 2) logger.info( f"Pricing calculation: base={base_amount}, country={country_code}, " f"role={user_role}, final={amount}" ) return amount class SmartDeduction: """ Intelligens levonás a Quadruple Wallet rendszerből. Levonási sorrend: VOUCHER → SERVICE_COINS/PURCHASED → EARNED """ @classmethod async def deduct_from_wallets( cls, db: AsyncSession, user_id: int, amount: float ) -> Dict[str, float]: """ Összeg levonása a felhasználó pénztárcáiból intelligens sorrendben. Args: db: Database session user_id: Felhasználó ID amount: Levonandó összeg Returns: Dict: wallet_type -> used_amount """ # Get user's wallet stmt = select(Wallet).where(Wallet.user_id == user_id) result = await db.execute(stmt) wallet = result.scalar_one_or_none() if not wallet: raise ValueError(f"Wallet not found for user_id={user_id}") remaining = Decimal(str(amount)) used_amounts = { "VOUCHER": 0.0, "SERVICE_COINS": 0.0, "PURCHASED": 0.0, "EARNED": 0.0 } print(f"[DEBUG] SmartDeduction.deduct_from_wallets: user_id={user_id}, amount={amount}, remaining={remaining}") print(f"[DEBUG] Wallet before: purchased={wallet.purchased_credits}, earned={wallet.earned_credits}, service_coins={wallet.service_coins}") # 1. VOUCHER levonás (FIFO) if remaining > 0: voucher_used = await cls._deduct_from_vouchers(db, wallet.id, remaining) used_amounts["VOUCHER"] = float(voucher_used) remaining -= Decimal(str(voucher_used)) print(f"[DEBUG] After VOUCHER: voucher_used={voucher_used}, remaining={remaining}") # 2. SERVICE_COINS levonás if remaining > 0 and wallet.service_coins >= remaining: used_amounts["SERVICE_COINS"] = float(remaining) wallet.service_coins -= remaining remaining = Decimal('0') print(f"[DEBUG] After SERVICE_COINS (full): used={remaining}, wallet.service_coins={wallet.service_coins}") elif remaining > 0 and wallet.service_coins > 0: used_amounts["SERVICE_COINS"] = float(wallet.service_coins) remaining -= wallet.service_coins wallet.service_coins = Decimal('0') print(f"[DEBUG] After SERVICE_COINS (partial): used={wallet.service_coins}, remaining={remaining}, wallet.service_coins={wallet.service_coins}") # 3. PURCHASED levonás if remaining > 0 and wallet.purchased_credits >= remaining: used_amounts["PURCHASED"] = float(remaining) wallet.purchased_credits -= remaining remaining = Decimal('0') print(f"[DEBUG] After PURCHASED (full): used={remaining}, wallet.purchased_credits={wallet.purchased_credits}") elif remaining > 0 and wallet.purchased_credits > 0: used_amounts["PURCHASED"] = float(wallet.purchased_credits) remaining -= wallet.purchased_credits wallet.purchased_credits = Decimal('0') print(f"[DEBUG] After PURCHASED (partial): used={wallet.purchased_credits}, remaining={remaining}, wallet.purchased_credits={wallet.purchased_credits}") # 4. EARNED levonás (utolsó) if remaining > 0 and wallet.earned_credits >= remaining: used_amounts["EARNED"] = float(remaining) wallet.earned_credits -= remaining remaining = Decimal('0') elif remaining > 0 and wallet.earned_credits > 0: used_amounts["EARNED"] = float(wallet.earned_credits) remaining -= wallet.earned_credits wallet.earned_credits = Decimal('0') # Check if we have enough funds if remaining > 0: raise ValueError( f"Insufficient funds. User_id={user_id}, " f"required={amount}, remaining={remaining}" ) # Update wallet logger.info( f"Smart deduction completed for user_id={user_id}: " f"total={amount}, used={used_amounts}" ) return used_amounts @classmethod async def _deduct_from_vouchers( cls, db: AsyncSession, wallet_id: int, amount: Decimal ) -> Decimal: """ Voucher levonás FIFO elv szerint (legrégebbi lejáratú először). Args: db: Database session wallet_id: Pénztárca ID amount: Levonandó összeg Returns: Decimal: Voucherból felhasznált összeg """ # Get active vouchers ordered by expiry (FIFO) stmt = ( select(ActiveVoucher) .where( and_( ActiveVoucher.wallet_id == wallet_id, ActiveVoucher.expires_at > datetime.utcnow() ) ) .order_by(ActiveVoucher.expires_at.asc()) ) result = await db.execute(stmt) vouchers = result.scalars().all() remaining = amount total_used = Decimal('0') for voucher in vouchers: if remaining <= 0: break voucher_amount = Decimal(str(voucher.amount)) if voucher_amount <= remaining: # Use entire voucher total_used += voucher_amount remaining -= voucher_amount await db.delete(voucher) # Voucher fully used else: # Use part of voucher total_used += remaining voucher.amount = voucher_amount - remaining remaining = Decimal('0') return total_used @classmethod async def process_voucher_expiration(cls, db: AsyncSession) -> Dict[str, Any]: """ Lejárt voucher-ek feldolgozása SZÉP-kártya modell szerint. Dinamikus díj levonása, a maradék átcsoportosítás új lejárattal. Returns: Dict: Statisztikák a feldolgozásról """ now = datetime.utcnow() # Get dynamic fee percentage from config service fee_percent = await config.get_setting(db, "voucher_expiry_fee_percent", default=10.0) fee_rate = Decimal(str(fee_percent)) / Decimal("100.0") # Find expired vouchers with eager loading of wallet relationship stmt = select(ActiveVoucher).where(ActiveVoucher.expires_at <= now).options(selectinload(ActiveVoucher.wallet)) result = await db.execute(stmt) expired_vouchers = result.scalars().all() stats = { "total_expired": len(expired_vouchers), "total_amount": 0.0, "fee_collected": 0.0, "rolled_over": 0.0, "wallets_affected": set(), "fee_percent": float(fee_percent) } for voucher in expired_vouchers: original_amount = Decimal(str(voucher.original_amount)) current_amount = Decimal(str(voucher.amount)) # Calculate dynamic fee fee = current_amount * fee_rate rolled_over = current_amount - fee # Get wallet for ledger entry wallet = voucher.wallet # Create FinancialLedger entry for the fee (platform revenue) if fee > 0: ledger_entry = FinancialLedger( user_id=wallet.user_id, amount=fee, entry_type=LedgerEntryType.DEBIT, wallet_type=WalletType.VOUCHER, transaction_type="VOUCHER_EXPIRY_FEE", details={ "description": f"Voucher expiry fee ({fee_percent}%)", "reference_type": "VOUCHER_EXPIRY_FEE", "reference_id": voucher.id, "wallet_type": "VOUCHER", "fee_percent": fee_percent }, transaction_id=uuid.uuid4(), balance_after=0, # Voucher balance after deletion is 0 currency="EUR" ) db.add(ledger_entry) # Create new voucher with new expiry (30 days from now) for rolled over amount if rolled_over > 0: new_expiry = now + timedelta(days=30) new_voucher = ActiveVoucher( wallet_id=wallet.id, amount=rolled_over, original_amount=rolled_over, expires_at=new_expiry ) db.add(new_voucher) # Delete expired voucher await db.delete(voucher) # Update stats stats["total_amount"] += float(current_amount) stats["fee_collected"] += float(fee) stats["rolled_over"] += float(rolled_over) stats["wallets_affected"].add(wallet.id) if expired_vouchers: stats["wallets_affected"] = len(stats["wallets_affected"]) logger.info( f"Voucher expiration processed: {stats['total_expired']} vouchers, " f"fee_percent={fee_percent}%, fee={stats['fee_collected']}, rolled_over={stats['rolled_over']}" ) return stats class AtomicTransactionManager: """ Atomikus tranzakciókezelő double-entry könyveléssel. Minden pénzmozgás rögzítésre kerül a FinancialLedger táblában. """ @classmethod async def atomic_billing_transaction( cls, db: AsyncSession, user_id: int, amount: float, description: str, reference_type: Optional[str] = None, reference_id: Optional[int] = None, used_amounts: Optional[Dict[str, float]] = None, beneficiary_id: Optional[int] = None ) -> Dict[str, Any]: """ Atomikus számlázási tranzakció végrehajtása. Args: db: Database session user_id: Felhasználó ID amount: Összeg description: Tranzakció leírása reference_type: Referencia típus (pl. "service", "subscription") reference_id: Referencia ID used_amounts: Optional pre-calculated deduction amounts. If provided, SmartDeduction.deduct_from_wallets will not be called. Returns: Dict: Tranzakció részletei """ transaction_id = uuid.uuid4() async def execute_logic(): # Get user and wallet user_stmt = select(User).where(User.id == user_id) user_result = await db.execute(user_stmt) user = user_result.scalar_one_or_none() if not user: raise ValueError(f"User not found: id={user_id}") wallet_stmt = select(Wallet).where(Wallet.user_id == user_id) wallet_result = await db.execute(wallet_stmt) wallet = wallet_result.scalar_one_or_none() if not wallet: raise ValueError(f"Wallet not found for user: id={user_id}") # Perform smart deduction if used_amounts not provided if used_amounts is None: deduction_result = await SmartDeduction.deduct_from_wallets(db, user_id, amount) else: # Validate that used_amounts matches the expected amount total_used = sum(used_amounts.values()) if abs(total_used - amount) > 0.01: # Allow small floating point differences raise ValueError( f"Provided used_amounts ({total_used}) does not match expected amount ({amount})" ) deduction_result = used_amounts # Use deduction_result for ledger creation used_amounts_for_ledger = deduction_result # Create ledger entries for each wallet type used for wallet_type_str, used_amount in used_amounts_for_ledger.items(): if used_amount > 0: wallet_type = WalletType[wallet_type_str] # DEBIT entry (money leaving the wallet) debit_entry = FinancialLedger( user_id=user_id, amount=Decimal(str(used_amount)), entry_type=LedgerEntryType.DEBIT, wallet_type=wallet_type, transaction_type=reference_type or "atomic_debit", details={ "description": f"{description} - {wallet_type_str}", "reference_type": reference_type, "reference_id": reference_id, "wallet_type": wallet_type_str, }, transaction_id=transaction_id, balance_after=await cls._get_wallet_balance(db, wallet, wallet_type), currency="EUR" ) db.add(debit_entry) # CREDIT entry (money going to system revenue OR beneficiary) is_internal_transfer = beneficiary_id is not None credit_user_id = beneficiary_id if is_internal_transfer else user_id credit_tx_type = "internal_transfer_credit" if is_internal_transfer else "system_revenue" credit_desc = f"Transfer to beneficiary - {wallet_type_str}" if is_internal_transfer else f"System revenue - {wallet_type_str}" credit_entry = FinancialLedger( user_id=credit_user_id, amount=Decimal(str(used_amount)), entry_type=LedgerEntryType.CREDIT, wallet_type=wallet_type, transaction_type=credit_tx_type, details={ "description": credit_desc, "reference_type": reference_type, "reference_id": reference_id, "beneficiary_id": beneficiary_id }, transaction_id=transaction_id, balance_after=0, # Később fejlesztendő: pontos balance currency="EUR" ) db.add(credit_entry) # Flush to generate IDs but let context manager commit await db.flush() transaction_details = { "transaction_id": str(transaction_id), "user_id": user_id, "amount": amount, "description": description, "used_amounts": used_amounts_for_ledger, "timestamp": datetime.utcnow().isoformat() } logger.info( f"Atomic transaction completed: {transaction_id}, " f"user={user_id}, amount={amount}" ) return transaction_details try: # Start atomic transaction only if not already in one if not db.in_transaction(): # No active transaction, start a new one async with db.begin(): return await execute_logic() else: # Already in a transaction, execute logic within existing transaction return await execute_logic() except Exception as e: logger.error(f"Atomic transaction failed: {e}") raise @classmethod async def _get_wallet_balance( cls, db: AsyncSession, wallet: Wallet, wallet_type: WalletType ) -> Optional[float]: """ Get current balance for a specific wallet type. """ if wallet_type == WalletType.EARNED: return float(wallet.earned_credits) elif wallet_type == WalletType.PURCHASED: return float(wallet.purchased_credits) elif wallet_type == WalletType.SERVICE_COINS: return float(wallet.service_coins) elif wallet_type == WalletType.VOUCHER: # Calculate total voucher balance stmt = select(func.sum(ActiveVoucher.amount)).where( and_( ActiveVoucher.wallet_id == wallet.id, ActiveVoucher.expires_at > datetime.utcnow() ) ) result = await db.execute(stmt) total_vouchers = result.scalar() or Decimal('0') return float(total_vouchers) return None @classmethod async def get_transaction_history( cls, db: AsyncSession, user_id: Optional[int] = None, transaction_id: Optional[uuid.UUID] = None, limit: int = 100, offset: int = 0 ) -> List[Dict[str, Any]]: """ Tranzakció előzmények lekérdezése. """ # Build query stmt = select(FinancialLedger) # Apply filters if user_id is not None: stmt = stmt.where(FinancialLedger.user_id == user_id) if transaction_id is not None: stmt = stmt.where(FinancialLedger.transaction_id == transaction_id) # Order by most recent first stmt = stmt.order_by(FinancialLedger.created_at.desc()) # Apply pagination stmt = stmt.offset(offset).limit(limit) # Execute query result = await db.execute(stmt) ledger_entries = result.scalars().all() # Convert to dictionary format transactions = [] for entry in ledger_entries: transactions.append({ "id": entry.id, "user_id": entry.user_id, "amount": float(entry.amount), "entry_type": entry.entry_type.value, "wallet_type": entry.wallet_type.value if entry.wallet_type else None, "description": entry.description, "transaction_id": str(entry.transaction_id), "reference_type": entry.reference_type, "reference_id": entry.reference_id, "balance_after": float(entry.balance_after) if entry.balance_after else None, "created_at": entry.created_at.isoformat() if entry.created_at else None }) return transactions @classmethod async def get_wallet_summary( cls, db: AsyncSession, user_id: int ) -> Dict[str, Any]: """ Pénztárca összegző információk lekérdezése. """ # Get wallet stmt = select(Wallet).where(Wallet.user_id == user_id) result = await db.execute(stmt) wallet = result.scalar_one_or_none() if not wallet: raise ValueError(f"Wallet not found for user_id={user_id}") # Calculate voucher balance voucher_stmt = select(func.sum(ActiveVoucher.amount)).where( and_( ActiveVoucher.wallet_id == wallet.id, ActiveVoucher.expires_at > datetime.utcnow() ) ) voucher_result = await db.execute(voucher_stmt) voucher_balance = voucher_result.scalar() or Decimal('0') # Get recent transactions recent_transactions = await cls.get_transaction_history(db, user_id=user_id, limit=10) return { "wallet_id": wallet.id, "balances": { "earned": float(wallet.earned_credits), "purchased": float(wallet.purchased_credits), "service_coins": float(wallet.service_coins), "voucher": float(voucher_balance), "total": float( wallet.earned_credits + wallet.purchased_credits + wallet.service_coins + voucher_balance ) }, "recent_transactions": recent_transactions, "last_updated": datetime.utcnow().isoformat() } # Helper function for easy access async def create_billing_transaction( db: AsyncSession, user_id: int, amount: float, description: str, reference_type: Optional[str] = None, reference_id: Optional[int] = None ) -> Dict[str, Any]: """ Segédfüggvény számlázási tranzakció létrehozásához. """ return await AtomicTransactionManager.atomic_billing_transaction( db, user_id, amount, description, reference_type, reference_id ) async def calculate_price( db: AsyncSession, base_amount: float, country_code: str = "HU", user_role: UserRole = UserRole.user, individual_discounts: Optional[List[Dict[str, Any]]] = None ) -> float: """ Segédfüggvény ár kiszámításához. """ return await PricingCalculator.calculate_final_price( db, base_amount, country_code, user_role, individual_discounts ) async def get_wallet_info( db: AsyncSession, user_id: int ) -> Dict[str, Any]: """ Segédfüggvény pénztárca információk lekérdezéséhez. """ return await AtomicTransactionManager.get_wallet_summary(db, user_id) # ==================== Billing Engine Service Functions ==================== async def charge_user( db: AsyncSession, user_id: int, amount: float, currency: str = "EUR", transaction_type: str = "service_payment", description: Optional[str] = None ) -> Dict[str, Any]: """ Kredit levonás a felhasználótól intelligens levonási sorrendben. Args: db: Database session user_id: Felhasználó ID amount: Levonandó összeg currency: Pénznem (jelenleg csak EUR támogatott) transaction_type: Tranzakció típusa (pl. "service_payment", "subscription") description: Opcionális leírás Returns: Dict: Tranzakció részletei (AtomicTransactionManager.atomic_billing_transaction eredménye) """ if currency != "EUR": raise ValueError("Only EUR currency is currently supported") desc = description or f"Charge for {transaction_type}" return await AtomicTransactionManager.atomic_billing_transaction( db=db, user_id=user_id, amount=amount, description=desc, reference_type=transaction_type, reference_id=None ) async def upgrade_subscription( db: AsyncSession, user_id: int, target_package: str ) -> Dict[str, Any]: """ Felhasználó előfizetésének frissítése (csomagváltás). Args: db: Database session user_id: Felhasználó ID target_package: Cél csomag neve (pl. "premium", "vip") Returns: Dict: Tranzakció részletei és az új előfizetés információi """ from app.models.core_logic import SubscriptionTier from app.models.identity import User # 1. Ellenőrizze, hogy a cél csomag létezik-e stmt = select(SubscriptionTier).where(SubscriptionTier.name == target_package) result = await db.execute(stmt) tier = result.scalar_one_or_none() if not tier: raise ValueError(f"Subscription tier '{target_package}' not found") # 2. Számítsa ki az árát a csomagnak (egyszerűsítve: fix ár a tier.rules-ból) price = tier.rules.get("price", 0.0) if tier.rules else 0.0 if price <= 0: # Ingyenes csomag, nincs levonás logger.info(f"Upgrading user {user_id} to free tier {target_package}") # Frissítse a felhasználó subscription_plan mezőjét user_stmt = select(User).where(User.id == user_id) user_result = await db.execute(user_stmt) user = user_result.scalar_one() user.subscription_plan = target_package user.subscription_expires_at = datetime.utcnow() + timedelta(days=30) # 30 nap return { "success": True, "message": f"Upgraded to {target_package} (free)", "new_plan": target_package, "price_paid": 0.0 } # 3. Ár kiszámítása a PricingCalculator segítségével user_stmt = select(User).where(User.id == user_id) user_result = await db.execute(user_stmt) user = user_result.scalar_one() final_price = await PricingCalculator.calculate_final_price( db=db, base_amount=price, country_code=user.region_code, user_role=user.role ) # 4. Levonás a felhasználótól transaction = await charge_user( db=db, user_id=user_id, amount=final_price, currency="EUR", transaction_type="subscription_upgrade", description=f"Upgrade to {target_package} subscription" ) # 5. Frissítse a felhasználó előfizetési adatait user.subscription_plan = target_package user.subscription_expires_at = datetime.utcnow() + timedelta(days=30) # 30 nap logger.info(f"User {user_id} upgraded to {target_package} for {final_price} EUR") return { "success": True, "transaction": transaction, "new_plan": target_package, "price_paid": final_price, "expires_at": user.subscription_expires_at.isoformat() } async def record_ledger_entry( db: AsyncSession, user_id: int, amount: float, entry_type: LedgerEntryType, wallet_type: WalletType, transaction_type: str, description: str, reference_type: Optional[str] = None, reference_id: Optional[int] = None ) -> FinancialLedger: """ Közvetlen főkönyvbejegyzés létrehozása (pl. manuális korrekciók). Megjegyzés: Ez a függvény NEM végez levonást a pénztárcából, csak naplóbejegyzést készít. A pénztárca egyenleg frissítéséhez használd a charge_user vagy atomic_billing_transaction függvényeket. Args: db: Database session user_id: Felhasználó ID amount: Összeg entry_type: DEBIT vagy CREDIT wallet_type: Pénztárca típus transaction_type: Tranzakció típusa description: Leírás reference_type: Referencia típus reference_id: Referencia ID Returns: FinancialLedger: Létrehozott főkönyvbejegyzés """ ledger_entry = FinancialLedger( user_id=user_id, amount=Decimal(str(amount)), entry_type=entry_type, wallet_type=wallet_type, transaction_type=transaction_type, details={ "description": description, "reference_type": reference_type, "reference_id": reference_id, "wallet_type": wallet_type.value }, transaction_id=uuid.uuid4(), balance_after=None, # Később számolható currency="EUR" ) db.add(ledger_entry) await db.flush() logger.info(f"Ledger entry recorded: user={user_id}, amount={amount}, type={entry_type.value}") return ledger_entry async def get_user_balance( db: AsyncSession, user_id: int ) -> Dict[str, float]: """ Felhasználó pénztárca egyenlegének lekérdezése. Args: db: Database session user_id: Felhasználó ID Returns: Dict: Pénztárca típusonkénti egyenlegek """ wallet_summary = await AtomicTransactionManager.get_wallet_summary(db, user_id) return wallet_summary["balances"]