Files
service-finder/backend/app/workers/service_hunter.py

161 lines
7.3 KiB
Python

import asyncio
import httpx
import logging
import os
from datetime import datetime, timezone
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, text
from app.db.session import SessionLocal
# Modellek - Az új v1.3 struktúra
from app.models.service import ServiceStaging, DiscoveryParameter
# Root logging configuration for the worker process.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# httpx logs every outgoing request, including the full URL, at INFO level.
# The Geocoding request carries the Google API key as a query parameter, so
# leaving that logger at INFO would write the key into the log output.
logging.getLogger("httpx").setLevel(logging.WARNING)
# Logger used by this worker for its own messages.
logger = logging.getLogger("Robot-v1.3-ContinentalScout")
class ServiceHunter:
    """
    Robot v1.3.0: Continental Scout.

    EU-wide discovery engine controlled by the rows of the discovery
    parameter table. For every active row it geocodes the city, collects
    nearby places from Google Places (New) and from OpenStreetMap via the
    Overpass API, and stores previously unseen entries in the staging table.
    """

    # HTTPS endpoint so queries and results are not sent in clear text.
    OVERPASS_URL = "https://overpass-api.de/api/interpreter"
    PLACES_NEW_URL = "https://places.googleapis.com/v1/places:searchNearby"
    GEOCODE_URL = "https://maps.googleapis.com/maps/api/geocode/json"
    GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")

    # Search radius around the city centre, in metres (shared by both sources).
    SEARCH_RADIUS_M = 5000
    # Place types requested from Google Nearby Search.
    # NOTE(review): confirm every entry is a supported Places (New) type;
    # an unsupported type is a likely cause of HTTP 400 answers.
    GOOGLE_INCLUDED_TYPES = ("car_repair", "gas_station", "car_wash", "motorcycle_repair")
    # Server-side time limit placed in the Overpass query, in seconds.
    OVERPASS_TIMEOUT_S = 60
    # Extra seconds the HTTP client waits beyond the Overpass server limit.
    OVERPASS_CLIENT_GRACE_S = 15
    # Pause between two full discovery cycles, in seconds.
    CYCLE_SLEEP_S = 3600

    @classmethod
    async def get_coordinates(cls, city, country_code):
        """Return ``(lat, lng)`` of the city centre used to start the search.

        Returns ``(None, None)`` when no API key is configured, when the
        Geocoding API does not answer with HTTP 200, or when it returns no
        results. Transport errors raised by httpx propagate to the caller.
        """
        if not cls.GOOGLE_API_KEY:
            # Same guard as get_google_places: no key, no Google request.
            return None, None
        params = {"address": f"{city}, {country_code}", "key": cls.GOOGLE_API_KEY}
        async with httpx.AsyncClient() as client:
            resp = await client.get(cls.GEOCODE_URL, params=params)
        if resp.status_code != 200:
            return None, None
        results = resp.json().get("results")
        if not results:
            return None, None
        loc = results[0]["geometry"]["location"]
        return loc["lat"], loc["lng"]

    @classmethod
    async def get_google_places(cls, lat, lon, keyword):
        """Query Google Places (New) Nearby Search around ``lat``/``lon``.

        ``keyword`` is accepted for interface compatibility but is not sent:
        a free-text keyword belongs to a text query, while Nearby Search
        filters by place types (``includedTypes``).

        Returns the list under the ``places`` key of the response, or an
        empty list when no API key is configured or the API does not answer
        with HTTP 200 (the error body is logged).
        """
        if not cls.GOOGLE_API_KEY:
            return []
        headers = {
            "Content-Type": "application/json",
            "X-Goog-Api-Key": cls.GOOGLE_API_KEY,
            "X-Goog-FieldMask": "places.displayName,places.id,places.types,places.internationalPhoneNumber,places.websiteUri,places.formattedAddress"
        }
        payload = {
            "includedTypes": list(cls.GOOGLE_INCLUDED_TYPES),
            "maxResultCount": 20,
            "locationRestriction": {
                "circle": {
                    "center": {"latitude": lat, "longitude": lon},
                    "radius": float(cls.SEARCH_RADIUS_M),
                }
            },
        }
        async with httpx.AsyncClient() as client:
            resp = await client.post(cls.PLACES_NEW_URL, json=payload, headers=headers)
        if resp.status_code == 200:
            return resp.json().get("places", [])
        logger.error(f"❌ Google API hiba ({resp.status_code}): {resp.text}")
        return []

    @classmethod
    async def save_to_staging(cls, db: "AsyncSession", data: dict):
        """Add one discovered service to the staging table.

        ``data`` must contain ``name`` and ``source``; the address parts
        (``zip``, ``city``, ``street``, ``street_type``, ``number``,
        ``full_address``), ``phone``, ``website``, ``raw`` and ``trust`` are
        optional. Nothing is stored when ``external_id`` is missing or when
        a row with the same ``external_id`` already exists. The new row is
        only added to the session; committing is left to the caller.
        """
        external_id = data.get("external_id")
        if external_id is None or external_id == "":
            # Without an id the row could never be de-duplicated, and
            # str(None) would store the literal text "None" as the key.
            return
        external_id = str(external_id)

        stmt = select(ServiceStaging).where(ServiceStaging.external_id == external_id)
        if (await db.execute(stmt)).scalar_one_or_none():
            return

        new_entry = ServiceStaging(
            name=data['name'],
            source=data['source'],
            external_id=external_id,
            # Split address fields (filled only when the source provides them).
            postal_code=data.get('zip'),
            city=data.get('city'),
            street_name=data.get('street'),
            street_type=data.get('street_type', 'utca'),
            house_number=data.get('number'),
            full_address=data.get('full_address'),
            contact_phone=data.get('phone'),
            website=data.get('website'),
            raw_data=data.get('raw', {}),
            status="pending",
            trust_score=data.get('trust', 10),
        )
        db.add(new_entry)

    @staticmethod
    def _google_place_to_staging(place):
        """Map one Google Places result to the dict used by save_to_staging."""
        return {
            "external_id": place.get('id'),
            # displayName may be absent or null; both yield a None name.
            "name": (place.get('displayName') or {}).get('text'),
            "full_address": place.get('formattedAddress'),
            "phone": place.get('internationalPhoneNumber'),
            "website": place.get('websiteUri'),
            "source": "google",
            "raw": place,
            "trust": 30,
        }

    @staticmethod
    def _osm_element_to_staging(element, fallback_city):
        """Map one Overpass element to the dict used by save_to_staging."""
        tags = element.get("tags", {})
        return {
            # NOTE(review): OSM ids are only unique per element type
            # (node/way/relation); the id format is kept unchanged so rows
            # already stored keep matching.
            "external_id": f"osm_{element['id']}",
            "name": tags.get('name', 'Ismeretlen szerviz'),
            "city": tags.get('addr:city', fallback_city),
            "zip": tags.get('addr:postcode'),
            "street": tags.get('addr:street'),
            "number": tags.get('addr:housenumber'),
            "source": "osm",
            "raw": element,
            "trust": 15,
        }

    @classmethod
    def _build_osm_query(cls, lat, lon):
        """Build the Overpass QL query for repair shops and fuel stations."""
        return (
            f"[out:json][timeout:{cls.OVERPASS_TIMEOUT_S}];\n"
            f'(nwr["amenity"~"car_repair|fuel"](around:{cls.SEARCH_RADIUS_M}, {lat}, {lon}););\n'
            "out center;"
        )

    @classmethod
    async def _get_osm_elements(cls, lat, lon):
        """Fetch the matching OSM elements, or an empty list on a non-200 answer."""
        # The query lets the server work for OVERPASS_TIMEOUT_S seconds, so
        # the client must wait longer than httpx's 5 second default.
        client_timeout = cls.OVERPASS_TIMEOUT_S + cls.OVERPASS_CLIENT_GRACE_S
        async with httpx.AsyncClient(timeout=client_timeout) as client:
            resp = await client.post(
                cls.OVERPASS_URL, data={"data": cls._build_osm_query(lat, lon)}
            )
        if resp.status_code != 200:
            logger.error(f"❌ Overpass API hiba ({resp.status_code})")
            return []
        return resp.json().get("elements", [])

    @classmethod
    async def _process_task(cls, db, task, city, country_code, keyword):
        """Run the Google and OSM phases for one discovery row and commit."""
        # Re-applied per task: a SET issued in a transaction that was rolled
        # back is undone, and a new transaction may use another connection.
        await db.execute(text("SET search_path TO data, public"))
        logger.info(f"🔎 Felderítés: {city} ({country_code}) -> {keyword}")

        # Coordinates of the city centre for both searches.
        lat, lon = await cls.get_coordinates(city, country_code)
        if lat is None or lon is None:
            # Explicit None test: 0.0 is a valid coordinate.
            logger.warning(f"⚠️ Nincs koordináta: {city} ({country_code})")
            return

        # --- Google phase ---
        for place in await cls.get_google_places(lat, lon, keyword):
            await cls.save_to_staging(db, cls._google_place_to_staging(place))

        # --- OSM phase ---
        for element in await cls._get_osm_elements(lat, lon):
            await cls.save_to_staging(db, cls._osm_element_to_staging(element, city))

        task.last_run_at = datetime.now(timezone.utc)
        await db.commit()
        logger.info(f"{city} felderítve.")

    @classmethod
    async def run(cls):
        """Process all active discovery rows, sleep, and repeat forever.

        Each row is handled in its own transaction: a failure is logged with
        its traceback and rolled back, and the remaining rows are still
        processed. This coroutine never returns on its own.
        """
        logger.info("🤖 Robot v1.3.0: Continental Scout elindult...")
        while True:
            async with SessionLocal() as db:
                try:
                    await db.execute(text("SET search_path TO data, public"))
                    # 1. Load the active discovery parameters.
                    stmt = select(DiscoveryParameter).where(
                        DiscoveryParameter.is_active == True  # noqa: E712
                    )
                    tasks = (await db.execute(stmt)).scalars().all()
                    # Copy the plain values now: commit (when the session
                    # expires on commit) and rollback expire ORM attributes,
                    # and reading an expired attribute would need implicit
                    # I/O that AsyncSession does not allow.
                    jobs = [
                        (task, task.city, task.country_code, task.keyword)
                        for task in tasks
                    ]
                    for task, city, country_code, keyword in jobs:
                        try:
                            await cls._process_task(db, task, city, country_code, keyword)
                        except Exception:
                            logger.exception(
                                f"💥 Hiba a feladat közben: {city} ({country_code})"
                            )
                            # Drop the partial work so the next row starts clean.
                            await db.rollback()
                except Exception as e:
                    logger.exception(f"💥 Kritikus hiba a ciklusban: {e}")
            logger.info("😴 Minden aktív feladat kész. Alvás 1 órán át...")
            await asyncio.sleep(cls.CYCLE_SLEEP_S)
if __name__ == "__main__":
    # Script entry point: start the endless discovery loop on a fresh event loop.
    asyncio.run(ServiceHunter.run())