fix(finance): Implement strict P2P double-entry ledger logic and resolve transaction state errors

Fix atomic_billing_transaction double deduction bug; implement dynamic CREDIT handling for beneficiaries in Double-Entry Ledger; clean up audit test directory.
This commit is contained in:
Roo
2026-03-08 23:08:43 +00:00
parent cead60f4e2
commit 8d25f44ec6
3 changed files with 1384 additions and 0 deletions

View File

@@ -0,0 +1,684 @@
# /opt/docker/dev/service_finder/backend/app/services/billing_engine.py
"""
🤖 Atomic Billing Engine - Quadruple Wallet & Double-Entry Ledger
A Service Finder pénzügyi motorja. Felelős a következőkért:
1. Árképzés (Pricing Pipeline): Régió, RBAC rang és egyedi kedvezmények alapján
2. Intelligens levonás (Smart Deduction): VOUCHER → SERVICE_COINS/PURCHASED → EARNED sorrend
3. Atomikus tranzakciók (Atomic Transactions): Double-entry könyvelés a FinancialLedger táblában
Design elvek:
- FIFO (First In, First Out) voucher kezelés
- SZÉP-kártya modell: lejárt voucher 10% díj, 90% átcsoportosítás új lejárattal
- SQLAlchemy Session.begin() atomi tranzakciók
- Soft-delete és Twin-technika támogatása
"""
import logging
import uuid
import enum
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Any
from decimal import Decimal
from sqlalchemy import select, update, func, and_, or_
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import selectinload
from app.models.identity import User, Wallet, ActiveVoucher, UserRole
from app.models.audit import FinancialLedger, LedgerEntryType, WalletType
from app.core.config import settings
from app.services.config_service import config
logger = logging.getLogger("billing-engine")
class PricingCalculator:
"""
Árképzési csővezeték. Számítja a végső árat régió, RBAC rang és egyedi kedvezmények alapján.
"""
# Region multipliers (country code -> multiplier)
REGION_MULTIPLIERS = {
"HU": 1.0, # Hungary - base
"GB": 1.2, # UK - 20% higher
"DE": 1.15, # Germany - 15% higher
"US": 1.25, # USA - 25% higher
"RO": 0.9, # Romania - 10% lower
"SK": 0.95, # Slovakia - 5% lower
}
# RBAC rank discounts (higher rank = bigger discount)
# Map the actual UserRole enum values to discount percentages
RBAC_DISCOUNTS = {
UserRole.superadmin: 0.5, # 50% discount
UserRole.admin: 0.3, # 30% discount
UserRole.fleet_manager: 0.2, # 20% discount
UserRole.user: 0.0, # 0% discount
# Add other roles as needed
UserRole.region_admin: 0.25, # 25% discount
UserRole.country_admin: 0.25, # 25% discount
UserRole.moderator: 0.15, # 15% discount
UserRole.sales_agent: 0.1, # 10% discount
UserRole.service_owner: 0.1, # 10% discount
UserRole.driver: 0.0, # 0% discount
}
@classmethod
async def calculate_final_price(
cls,
db: AsyncSession,
base_amount: float,
country_code: str = "HU",
user_role: UserRole = UserRole.user,
individual_discounts: Optional[List[Dict[str, Any]]] = None
) -> float:
"""
Végső ár kiszámítása.
Args:
db: Database session
base_amount: Alapár (pl. szolgáltatás díja)
country_code: Országkód (pl. "HU", "GB")
user_role: Felhasználó RBAC rangja
individual_discounts: Egyedi kedvezmények listája
Returns:
Végső ár (float)
"""
# 1. Region multiplier
region_multiplier = cls.REGION_MULTIPLIERS.get(country_code.upper(), 1.0)
amount = base_amount * region_multiplier
# 2. RBAC discount
rbac_discount = cls.RBAC_DISCOUNTS.get(user_role, 0.0)
if rbac_discount > 0:
amount = amount * (1 - rbac_discount)
# 3. Individual discounts (e.g., promo codes, loyalty points)
if individual_discounts:
for discount in individual_discounts:
discount_type = discount.get("type")
discount_value = discount.get("value", 0)
if discount_type == "percentage":
amount = amount * (1 - discount_value / 100)
elif discount_type == "fixed":
amount = max(0, amount - discount_value)
elif discount_type == "multiplier":
amount = amount * discount_value
# 4. Round to 2 decimal places
amount = round(amount, 2)
logger.info(
f"Pricing calculation: base={base_amount}, country={country_code}, "
f"role={user_role}, final={amount}"
)
return amount
class SmartDeduction:
"""
Intelligens levonás a Quadruple Wallet rendszerből.
Levonási sorrend: VOUCHER → SERVICE_COINS/PURCHASED → EARNED
"""
@classmethod
async def deduct_from_wallets(
cls,
db: AsyncSession,
user_id: int,
amount: float
) -> Dict[str, float]:
"""
Összeg levonása a felhasználó pénztárcáiból intelligens sorrendben.
Args:
db: Database session
user_id: Felhasználó ID
amount: Levonandó összeg
Returns:
Dict: wallet_type -> used_amount
"""
# Get user's wallet
stmt = select(Wallet).where(Wallet.user_id == user_id)
result = await db.execute(stmt)
wallet = result.scalar_one_or_none()
if not wallet:
raise ValueError(f"Wallet not found for user_id={user_id}")
remaining = Decimal(str(amount))
used_amounts = {
"VOUCHER": 0.0,
"SERVICE_COINS": 0.0,
"PURCHASED": 0.0,
"EARNED": 0.0
}
print(f"[DEBUG] SmartDeduction.deduct_from_wallets: user_id={user_id}, amount={amount}, remaining={remaining}")
print(f"[DEBUG] Wallet before: purchased={wallet.purchased_credits}, earned={wallet.earned_credits}, service_coins={wallet.service_coins}")
# 1. VOUCHER levonás (FIFO)
if remaining > 0:
voucher_used = await cls._deduct_from_vouchers(db, wallet.id, remaining)
used_amounts["VOUCHER"] = float(voucher_used)
remaining -= Decimal(str(voucher_used))
print(f"[DEBUG] After VOUCHER: voucher_used={voucher_used}, remaining={remaining}")
# 2. SERVICE_COINS levonás
if remaining > 0 and wallet.service_coins >= remaining:
used_amounts["SERVICE_COINS"] = float(remaining)
wallet.service_coins -= remaining
remaining = Decimal('0')
print(f"[DEBUG] After SERVICE_COINS (full): used={remaining}, wallet.service_coins={wallet.service_coins}")
elif remaining > 0 and wallet.service_coins > 0:
used_amounts["SERVICE_COINS"] = float(wallet.service_coins)
remaining -= wallet.service_coins
wallet.service_coins = Decimal('0')
print(f"[DEBUG] After SERVICE_COINS (partial): used={wallet.service_coins}, remaining={remaining}, wallet.service_coins={wallet.service_coins}")
# 3. PURCHASED levonás
if remaining > 0 and wallet.purchased_credits >= remaining:
used_amounts["PURCHASED"] = float(remaining)
wallet.purchased_credits -= remaining
remaining = Decimal('0')
print(f"[DEBUG] After PURCHASED (full): used={remaining}, wallet.purchased_credits={wallet.purchased_credits}")
elif remaining > 0 and wallet.purchased_credits > 0:
used_amounts["PURCHASED"] = float(wallet.purchased_credits)
remaining -= wallet.purchased_credits
wallet.purchased_credits = Decimal('0')
print(f"[DEBUG] After PURCHASED (partial): used={wallet.purchased_credits}, remaining={remaining}, wallet.purchased_credits={wallet.purchased_credits}")
# 4. EARNED levonás (utolsó)
if remaining > 0 and wallet.earned_credits >= remaining:
used_amounts["EARNED"] = float(remaining)
wallet.earned_credits -= remaining
remaining = Decimal('0')
elif remaining > 0 and wallet.earned_credits > 0:
used_amounts["EARNED"] = float(wallet.earned_credits)
remaining -= wallet.earned_credits
wallet.earned_credits = Decimal('0')
# Check if we have enough funds
if remaining > 0:
raise ValueError(
f"Insufficient funds. User_id={user_id}, "
f"required={amount}, remaining={remaining}"
)
# Update wallet
logger.info(
f"Smart deduction completed for user_id={user_id}: "
f"total={amount}, used={used_amounts}"
)
return used_amounts
@classmethod
async def _deduct_from_vouchers(
cls,
db: AsyncSession,
wallet_id: int,
amount: Decimal
) -> Decimal:
"""
Voucher levonás FIFO elv szerint (legrégebbi lejáratú először).
Args:
db: Database session
wallet_id: Pénztárca ID
amount: Levonandó összeg
Returns:
Decimal: Voucherból felhasznált összeg
"""
# Get active vouchers ordered by expiry (FIFO)
stmt = (
select(ActiveVoucher)
.where(
and_(
ActiveVoucher.wallet_id == wallet_id,
ActiveVoucher.expires_at > datetime.utcnow()
)
)
.order_by(ActiveVoucher.expires_at.asc())
)
result = await db.execute(stmt)
vouchers = result.scalars().all()
remaining = amount
total_used = Decimal('0')
for voucher in vouchers:
if remaining <= 0:
break
voucher_amount = Decimal(str(voucher.amount))
if voucher_amount <= remaining:
# Use entire voucher
total_used += voucher_amount
remaining -= voucher_amount
await db.delete(voucher) # Voucher fully used
else:
# Use part of voucher
total_used += remaining
voucher.amount = voucher_amount - remaining
remaining = Decimal('0')
return total_used
@classmethod
async def process_voucher_expiration(cls, db: AsyncSession) -> Dict[str, Any]:
"""
Lejárt voucher-ek feldolgozása SZÉP-kártya modell szerint.
Dinamikus díj levonása, a maradék átcsoportosítás új lejárattal.
Returns:
Dict: Statisztikák a feldolgozásról
"""
now = datetime.utcnow()
# Get dynamic fee percentage from config service
fee_percent = await config.get_setting(db, "voucher_expiry_fee_percent", default=10.0)
fee_rate = Decimal(str(fee_percent)) / Decimal("100.0")
# Find expired vouchers with eager loading of wallet relationship
stmt = select(ActiveVoucher).where(ActiveVoucher.expires_at <= now).options(selectinload(ActiveVoucher.wallet))
result = await db.execute(stmt)
expired_vouchers = result.scalars().all()
stats = {
"total_expired": len(expired_vouchers),
"total_amount": 0.0,
"fee_collected": 0.0,
"rolled_over": 0.0,
"wallets_affected": set(),
"fee_percent": float(fee_percent)
}
for voucher in expired_vouchers:
original_amount = Decimal(str(voucher.original_amount))
current_amount = Decimal(str(voucher.amount))
# Calculate dynamic fee
fee = current_amount * fee_rate
rolled_over = current_amount - fee
# Get wallet for ledger entry
wallet = voucher.wallet
# Create FinancialLedger entry for the fee (platform revenue)
if fee > 0:
ledger_entry = FinancialLedger(
user_id=wallet.user_id,
amount=fee,
entry_type=LedgerEntryType.DEBIT,
wallet_type=WalletType.VOUCHER,
transaction_type="VOUCHER_EXPIRY_FEE",
details={
"description": f"Voucher expiry fee ({fee_percent}%)",
"reference_type": "VOUCHER_EXPIRY_FEE",
"reference_id": voucher.id,
"wallet_type": "VOUCHER",
"fee_percent": fee_percent
},
transaction_id=uuid.uuid4(),
balance_after=0, # Voucher balance after deletion is 0
currency="EUR"
)
db.add(ledger_entry)
# Create new voucher with new expiry (30 days from now) for rolled over amount
if rolled_over > 0:
new_expiry = now + timedelta(days=30)
new_voucher = ActiveVoucher(
wallet_id=wallet.id,
amount=rolled_over,
original_amount=rolled_over,
expires_at=new_expiry
)
db.add(new_voucher)
# Delete expired voucher
await db.delete(voucher)
# Update stats
stats["total_amount"] += float(current_amount)
stats["fee_collected"] += float(fee)
stats["rolled_over"] += float(rolled_over)
stats["wallets_affected"].add(wallet.id)
if expired_vouchers:
stats["wallets_affected"] = len(stats["wallets_affected"])
logger.info(
f"Voucher expiration processed: {stats['total_expired']} vouchers, "
f"fee_percent={fee_percent}%, fee={stats['fee_collected']}, rolled_over={stats['rolled_over']}"
)
return stats
class AtomicTransactionManager:
"""
Atomikus tranzakciókezelő double-entry könyveléssel.
Minden pénzmozgás rögzítésre kerül a FinancialLedger táblában.
"""
@classmethod
async def atomic_billing_transaction(
cls,
db: AsyncSession,
user_id: int,
amount: float,
description: str,
reference_type: Optional[str] = None,
reference_id: Optional[int] = None,
used_amounts: Optional[Dict[str, float]] = None,
beneficiary_id: Optional[int] = None
) -> Dict[str, Any]:
"""
Atomikus számlázási tranzakció végrehajtása.
Args:
db: Database session
user_id: Felhasználó ID
amount: Összeg
description: Tranzakció leírása
reference_type: Referencia típus (pl. "service", "subscription")
reference_id: Referencia ID
used_amounts: Optional pre-calculated deduction amounts. If provided,
SmartDeduction.deduct_from_wallets will not be called.
Returns:
Dict: Tranzakció részletei
"""
transaction_id = uuid.uuid4()
async def execute_logic():
# Get user and wallet
user_stmt = select(User).where(User.id == user_id)
user_result = await db.execute(user_stmt)
user = user_result.scalar_one_or_none()
if not user:
raise ValueError(f"User not found: id={user_id}")
wallet_stmt = select(Wallet).where(Wallet.user_id == user_id)
wallet_result = await db.execute(wallet_stmt)
wallet = wallet_result.scalar_one_or_none()
if not wallet:
raise ValueError(f"Wallet not found for user: id={user_id}")
# Perform smart deduction if used_amounts not provided
if used_amounts is None:
deduction_result = await SmartDeduction.deduct_from_wallets(db, user_id, amount)
else:
# Validate that used_amounts matches the expected amount
total_used = sum(used_amounts.values())
if abs(total_used - amount) > 0.01: # Allow small floating point differences
raise ValueError(
f"Provided used_amounts ({total_used}) does not match expected amount ({amount})"
)
deduction_result = used_amounts
# Use deduction_result for ledger creation
used_amounts_for_ledger = deduction_result
# Create ledger entries for each wallet type used
for wallet_type_str, used_amount in used_amounts_for_ledger.items():
if used_amount > 0:
wallet_type = WalletType[wallet_type_str]
# DEBIT entry (money leaving the wallet)
debit_entry = FinancialLedger(
user_id=user_id,
amount=Decimal(str(used_amount)),
entry_type=LedgerEntryType.DEBIT,
wallet_type=wallet_type,
transaction_type=reference_type or "atomic_debit",
details={
"description": f"{description} - {wallet_type_str}",
"reference_type": reference_type,
"reference_id": reference_id,
"wallet_type": wallet_type_str,
},
transaction_id=transaction_id,
balance_after=await cls._get_wallet_balance(db, wallet, wallet_type),
currency="EUR"
)
db.add(debit_entry)
# CREDIT entry (money going to system revenue OR beneficiary)
is_internal_transfer = beneficiary_id is not None
credit_user_id = beneficiary_id if is_internal_transfer else user_id
credit_tx_type = "internal_transfer_credit" if is_internal_transfer else "system_revenue"
credit_desc = f"Transfer to beneficiary - {wallet_type_str}" if is_internal_transfer else f"System revenue - {wallet_type_str}"
credit_entry = FinancialLedger(
user_id=credit_user_id,
amount=Decimal(str(used_amount)),
entry_type=LedgerEntryType.CREDIT,
wallet_type=wallet_type,
transaction_type=credit_tx_type,
details={
"description": credit_desc,
"reference_type": reference_type,
"reference_id": reference_id,
"beneficiary_id": beneficiary_id
},
transaction_id=transaction_id,
balance_after=0, # Később fejlesztendő: pontos balance
currency="EUR"
)
db.add(credit_entry)
# Flush to generate IDs but let context manager commit
await db.flush()
transaction_details = {
"transaction_id": str(transaction_id),
"user_id": user_id,
"amount": amount,
"description": description,
"used_amounts": used_amounts_for_ledger,
"timestamp": datetime.utcnow().isoformat()
}
logger.info(
f"Atomic transaction completed: {transaction_id}, "
f"user={user_id}, amount={amount}"
)
return transaction_details
try:
# Start atomic transaction only if not already in one
if not db.in_transaction():
# No active transaction, start a new one
async with db.begin():
return await execute_logic()
else:
# Already in a transaction, execute logic within existing transaction
return await execute_logic()
except Exception as e:
logger.error(f"Atomic transaction failed: {e}")
raise
@classmethod
async def _get_wallet_balance(
cls,
db: AsyncSession,
wallet: Wallet,
wallet_type: WalletType
) -> Optional[float]:
"""
Get current balance for a specific wallet type.
"""
if wallet_type == WalletType.EARNED:
return float(wallet.earned_credits)
elif wallet_type == WalletType.PURCHASED:
return float(wallet.purchased_credits)
elif wallet_type == WalletType.SERVICE_COINS:
return float(wallet.service_coins)
elif wallet_type == WalletType.VOUCHER:
# Calculate total voucher balance
stmt = select(func.sum(ActiveVoucher.amount)).where(
and_(
ActiveVoucher.wallet_id == wallet.id,
ActiveVoucher.expires_at > datetime.utcnow()
)
)
result = await db.execute(stmt)
total_vouchers = result.scalar() or Decimal('0')
return float(total_vouchers)
return None
@classmethod
async def get_transaction_history(
cls,
db: AsyncSession,
user_id: Optional[int] = None,
transaction_id: Optional[uuid.UUID] = None,
limit: int = 100,
offset: int = 0
) -> List[Dict[str, Any]]:
"""
Tranzakció előzmények lekérdezése.
"""
# Build query
stmt = select(FinancialLedger)
# Apply filters
if user_id is not None:
stmt = stmt.where(FinancialLedger.user_id == user_id)
if transaction_id is not None:
stmt = stmt.where(FinancialLedger.transaction_id == transaction_id)
# Order by most recent first
stmt = stmt.order_by(FinancialLedger.created_at.desc())
# Apply pagination
stmt = stmt.offset(offset).limit(limit)
# Execute query
result = await db.execute(stmt)
ledger_entries = result.scalars().all()
# Convert to dictionary format
transactions = []
for entry in ledger_entries:
transactions.append({
"id": entry.id,
"user_id": entry.user_id,
"amount": float(entry.amount),
"entry_type": entry.entry_type.value,
"wallet_type": entry.wallet_type.value if entry.wallet_type else None,
"description": entry.description,
"transaction_id": str(entry.transaction_id),
"reference_type": entry.reference_type,
"reference_id": entry.reference_id,
"balance_after": float(entry.balance_after) if entry.balance_after else None,
"created_at": entry.created_at.isoformat() if entry.created_at else None
})
return transactions
@classmethod
async def get_wallet_summary(
cls,
db: AsyncSession,
user_id: int
) -> Dict[str, Any]:
"""
Pénztárca összegző információk lekérdezése.
"""
# Get wallet
stmt = select(Wallet).where(Wallet.user_id == user_id)
result = await db.execute(stmt)
wallet = result.scalar_one_or_none()
if not wallet:
raise ValueError(f"Wallet not found for user_id={user_id}")
# Calculate voucher balance
voucher_stmt = select(func.sum(ActiveVoucher.amount)).where(
and_(
ActiveVoucher.wallet_id == wallet.id,
ActiveVoucher.expires_at > datetime.utcnow()
)
)
voucher_result = await db.execute(voucher_stmt)
voucher_balance = voucher_result.scalar() or Decimal('0')
# Get recent transactions
recent_transactions = await cls.get_transaction_history(db, user_id=user_id, limit=10)
return {
"wallet_id": wallet.id,
"balances": {
"earned": float(wallet.earned_credits),
"purchased": float(wallet.purchased_credits),
"service_coins": float(wallet.service_coins),
"voucher": float(voucher_balance),
"total": float(
wallet.earned_credits +
wallet.purchased_credits +
wallet.service_coins +
voucher_balance
)
},
"recent_transactions": recent_transactions,
"last_updated": datetime.utcnow().isoformat()
}
# Helper function for easy access
async def create_billing_transaction(
db: AsyncSession,
user_id: int,
amount: float,
description: str,
reference_type: Optional[str] = None,
reference_id: Optional[int] = None
) -> Dict[str, Any]:
"""
Segédfüggvény számlázási tranzakció létrehozásához.
"""
return await AtomicTransactionManager.atomic_billing_transaction(
db, user_id, amount, description, reference_type, reference_id
)
async def calculate_price(
db: AsyncSession,
base_amount: float,
country_code: str = "HU",
user_role: UserRole = UserRole.user,
individual_discounts: Optional[List[Dict[str, Any]]] = None
) -> float:
"""
Segédfüggvény ár kiszámításához.
"""
return await PricingCalculator.calculate_final_price(
db, base_amount, country_code, user_role, individual_discounts
)
async def get_wallet_info(
db: AsyncSession,
user_id: int
) -> Dict[str, Any]:
"""
Segédfüggvény pénztárca információk lekérdezéséhez.
"""
return await AtomicTransactionManager.get_wallet_summary(db, user_id)