import asyncio
import datetime
import logging
import os
import random
import sys

import httpx
from sqlalchemy import select, and_, update, text, func
from sqlalchemy.ext.asyncio import AsyncSession
from duckduckgo_search import DDGS

from app.db.session import SessionLocal
from app.models.vehicle_definitions import VehicleModelDefinition
from app.models.asset import AssetCatalog
from app.services.ai_service import AIService

# --- STRICT LOGGING CONFIGURATION ---
# Remove every handler already attached to the root logger so that the
# basicConfig() call below is the single source of log formatting/output.
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s.%(msecs)03d [%(levelname)s] Alchemist: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    stream=sys.stdout
)
logger = logging.getLogger("Robot-Enricher-v1.3.0")


class TechEnricher:
    """
    Industrial TechEnricher v1.3.0

    - Fix: avoids deadlocks by using isolated, short-lived DB sessions.
    - Logic: daily AI call budget (500 by default), "smart merge" into the
      asset catalog, and a web-search fallback when the AI answer lacks data.
    """

    def __init__(self):
        # Records with this many failed attempts are no longer picked up.
        self.max_attempts = 5
        # Number of record ids fetched per main-loop iteration.
        self.batch_size = 15
        # Maximum number of AI calls issued per calendar day.
        self.daily_ai_limit = 500
        self.ai_calls_today = 0
        self.last_reset_date = datetime.date.today()

    def check_budget(self) -> bool:
        """Return True while today's AI call budget is not exhausted.

        The counter is reset the first time this is called on a new day.
        """
        # Read the date once so a midnight rollover between two reads cannot
        # leave the counter and the reset date out of sync.
        today = datetime.date.today()
        if today > self.last_reset_date:
            self.ai_calls_today = 0
            self.last_reset_date = today
        return self.ai_calls_today < self.daily_ai_limit

    def is_data_sane(self, data: dict) -> bool:
        """Return True when *data* carries plausible ``ccm`` and ``kw`` values.

        Missing or falsy values count as 0. Empty data, values that cannot be
        converted to ``int``, negative values, ``ccm`` above 15000 or ``kw``
        above 2000 all make the data insane.
        """
        try:
            if not data:
                return False
            ccm = int(data.get("ccm", 0) or 0)
            kw = int(data.get("kw", 0) or 0)
        except (AttributeError, TypeError, ValueError, OverflowError):
            # Not a mapping, or a value that is not number-like.
            return False
        return 0 <= ccm <= 15000 and 0 <= kw <= 2000

    async def get_web_wisdom(self, make: str, model: str) -> str:
        """Search the web in a worker thread (does not block the event loop).

        Returns the bodies of up to three search results joined by newlines,
        or an empty string when the search fails for any reason.
        """
        query = f"{make} {model} technical specs maintenance oil qty tire size"

        def sync_search() -> str:
            with DDGS() as ddgs:
                return "\n".join(r['body'] for r in ddgs.text(query, max_results=3))

        try:
            return await asyncio.to_thread(sync_search)
        except Exception as e:
            # Best effort: a failed web lookup must not abort the enrichment.
            logger.warning(f"🌐 Web hiba ({make}): {e}")
            return ""

    async def _call_ai(self, make: str, m_name: str, v_type: str, sources: dict):
        """Issue one AI request and charge it against the daily budget."""
        # Charge the call before awaiting it: a request that fails or returns
        # nothing useful has still been sent and must count toward the limit.
        self.ai_calls_today += 1
        return await AIService.get_clean_vehicle_data(make, m_name, v_type, sources)

    async def _save_enrichment(self, record_id: int, make: str, m_name: str, ai_data: dict) -> None:
        """Persist *ai_data*: insert a catalog row if missing, update staging."""
        # An explicit null "marketing_name" in the AI answer must fall back to
        # the stored name instead of failing on None[:50].
        model_name = str(ai_data.get("marketing_name") or m_name or "")[:50]
        make_key = make.upper()

        async with SessionLocal() as db:
            # MDM (AssetCatalog) smart merge: insert only when no row with the
            # same make / model / power exists yet.
            cat_stmt = select(AssetCatalog).where(and_(
                AssetCatalog.make == make_key,
                AssetCatalog.model == model_name,
                AssetCatalog.power_kw == ai_data.get("kw")
            )).limit(1)

            if not (await db.execute(cat_stmt)).scalar_one_or_none():
                db.add(AssetCatalog(
                    make=make_key,
                    model=model_name,
                    power_kw=ai_data.get("kw"),
                    engine_capacity=ai_data.get("ccm"),
                    factory_data=ai_data
                ))
                logger.info(f"✅ Mentve az MDM-be: {make} {m_name}")

            # Update the staging record.
            await db.execute(
                update(VehicleModelDefinition)
                .where(VehicleModelDefinition.id == record_id)
                .values(
                    status="ai_enriched",
                    technical_code=ai_data.get("technical_code") or f"GEN-{record_id}",
                    engine_capacity=ai_data.get("ccm"),
                    power_kw=ai_data.get("kw"),
                    updated_at=func.now()
                )
            )
            await db.commit()

    async def _register_failure(self, record_id: int, error: Exception) -> None:
        """Increment the attempt counter and store the error on the record."""
        # In an UPDATE the CASE sees the pre-increment value of "attempts", so
        # the record is suspended once the incremented counter reaches
        # max_attempts.
        next_status = text(
            "CASE WHEN attempts >= :suspend_after THEN 'suspended' ELSE 'unverified' END"
        ).bindparams(suspend_after=self.max_attempts - 1)

        async with SessionLocal() as db:
            await db.execute(
                update(VehicleModelDefinition)
                .where(VehicleModelDefinition.id == record_id)
                .values(
                    attempts=VehicleModelDefinition.attempts + 1,
                    last_error=str(error)[:200],
                    status=next_status,
                    updated_at=func.now()
                )
            )
            await db.commit()

    async def process_single_record(self, record_id: int):
        """
        Enrich one staging record in three strictly separated steps, so that
        no DB session stays open during the slow AI/web work:

        1. Load the record and close the DB session.
        2. AI work (no DB session open).
        3. Save the result in a fresh session.

        Returns silently when the record does not exist. Any error in steps
        2-3 is logged and recorded on the record (attempt counter, last
        error, status) instead of being raised.
        """
        # --- STEP 1: LOAD THE RECORD ---
        async with SessionLocal() as db:
            stmt = select(VehicleModelDefinition).where(VehicleModelDefinition.id == record_id)
            res = await db.execute(stmt)
            rec = res.scalar_one_or_none()
            if not rec:
                return
            make, m_name, v_type = rec.make, rec.marketing_name, (rec.vehicle_type or "car")

        logger.info(f"🧪 >>> Dúsítás indítása: {make} {m_name}")

        # --- STEP 2: AI WORK (no DB session is open here!) ---
        try:
            # The AI service requires the 4th 'sources' argument.
            ai_data = await self._call_ai(make, m_name, v_type, {})

            if not ai_data or not ai_data.get("kw"):
                logger.info(f"🔍 AI bizonytalan, webes dúsítás indul: {make} {m_name}")
                web_info = await self.get_web_wisdom(make, m_name)
                ai_data = await self._call_ai(make, m_name, v_type, {"web_context": web_info})

            if not ai_data:
                raise ValueError("Az AI nem adott értékelhető választ.")

            # Reject implausible values before they reach the master catalog.
            if not self.is_data_sane(ai_data):
                raise ValueError("Az AI által adott adatok irreálisak (ccm/kw).")

            # --- STEP 3: SAVE (opens a new session) ---
            await self._save_enrichment(record_id, make, m_name, ai_data)

        except Exception as e:
            logger.error(f"🚨 Hiba a(z) {record_id} rekordnál: {e}")
            await self._register_failure(record_id, e)

    async def run(self):
        """Run the enrichment loop forever.

        Each iteration fetches a batch of ``unverified`` record ids that are
        below ``max_attempts`` and processes them one by one. Sleeps an hour
        when the daily budget is exhausted, a minute when there is nothing to
        do, and 30 seconds after an unexpected main-loop error.
        """
        logger.info(f"🚀 Robot 2 v1.3.0 ONLINE (Limit: {self.daily_ai_limit})")
        while True:
            if not self.check_budget():
                await asyncio.sleep(3600)
                continue

            try:
                async with SessionLocal() as db:
                    # Fetch only the ids so the session is not held open while
                    # the records are being enriched.
                    stmt = select(VehicleModelDefinition.id).where(and_(
                        VehicleModelDefinition.status == "unverified",
                        VehicleModelDefinition.attempts < self.max_attempts
                    )).limit(self.batch_size)
                    ids = [r[0] for r in (await db.execute(stmt)).fetchall()]

                if not ids:
                    await asyncio.sleep(60)
                    continue

                logger.info(f"📦 Batch indul: {len(ids)} rekord.")
                for rid in ids:
                    # Re-check per record so a batch cannot overrun the daily
                    # limit; the outer loop then handles the long sleep.
                    if not self.check_budget():
                        break
                    await self.process_single_record(rid)
                    # Pause between records to go easy on the GPU
                    # (original author's note).
                    await asyncio.sleep(random.uniform(10.0, 30.0))

            except Exception as e:
                logger.error(f"🚨 Főciklus hiba: {e}")
                await asyncio.sleep(30)


if __name__ == "__main__":
    enricher = TechEnricher()
    asyncio.run(enricher.run())