115 lines
4.8 KiB
Python
115 lines
4.8 KiB
Python
# /opt/docker/dev/service_finder/backend/app/scripts/generate_db_map.py
|
|
#!/usr/bin/env python3
|
|
import asyncio
|
|
import importlib
|
|
import sys
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from sqlalchemy.ext.asyncio import create_async_engine
|
|
from sqlalchemy import inspect
|
|
|
|
# ==============================================================================
|
|
# THOUGHT PROCESS (Gondolatmenet):
|
|
# 1. Biztonság: Nem módosítjuk a működő sync_engine.py-t, hanem új eszközt hozunk létre.
|
|
# 2. Útvonal (Path) dinamikus feloldása: Ahelyett, hogy fixen bedrótoznánk a
|
|
# '/app/docs/v02' útvonalat (ami Docker környezetben eltérhet), a Path(__file__)
|
|
# segítségével "visszamászunk" a könyvtárfában.
|
|
# Útvonal: scripts -> app -> backend -> service_finder -> docs/v02
|
|
# 3. Modellek betöltése: Újrahasznosítjuk a bevált dynamic_import_models() logikát,
|
|
# hogy a Base.metadata biztosan tartalmazzon minden táblát.
|
|
# ==============================================================================
|
|
|
|
# Alap elérési út beállítása
|
|
current_file = Path(__file__).resolve()
|
|
backend_dir = current_file.parent.parent.parent
|
|
project_root = backend_dir.parent
|
|
docs_dir = project_root / "docs" / "v02"
|
|
|
|
sys.path.insert(0, str(backend_dir))
|
|
|
|
from app.database import Base
|
|
from app.core.config import settings
|
|
|
|
def dynamic_import_models():
|
|
"""Modellek betöltése a Metadata feltöltéséhez (a sync_engine.py alapján)."""
|
|
models_dir = current_file.parent.parent / "models"
|
|
for py_file in models_dir.rglob("*.py"):
|
|
if py_file.name == "__init__.py": continue
|
|
relative_path = py_file.relative_to(models_dir)
|
|
module_stem = str(relative_path).replace('/', '.').replace('\\', '.')[:-3]
|
|
module_name = f"app.models.{module_stem}"
|
|
try:
|
|
importlib.import_module(module_name)
|
|
except Exception:
|
|
pass
|
|
|
|
async def generate_markdown():
|
|
engine = create_async_engine(str(settings.SQLALCHEMY_DATABASE_URI))
|
|
|
|
# Biztosítjuk, hogy a célkönyvtár létezik
|
|
docs_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
filename = f"database_schema_{timestamp}.md"
|
|
filepath = docs_dir / filename
|
|
|
|
markdown_content = "# 🗺️ Service Finder Adatbázis Térkép\n\n"
|
|
markdown_content += f"> Generálva: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
|
|
|
|
def inspect_db(connection):
|
|
nonlocal markdown_content
|
|
inspector = inspect(connection)
|
|
metadata = Base.metadata
|
|
|
|
# Csak azokat a sémákat nézzük, amikben vannak modelljeink
|
|
model_schemas = sorted({t.schema for t in metadata.sorted_tables if t.schema})
|
|
|
|
for schema in model_schemas:
|
|
markdown_content += f"## Séma: `{schema}`\n\n"
|
|
db_tables = inspector.get_table_names(schema=schema)
|
|
|
|
if not db_tables:
|
|
markdown_content += "*Ebben a sémában még nincsenek táblák az adatbázisban.*\n\n"
|
|
continue
|
|
|
|
for table_name in sorted(db_tables):
|
|
# Oszlopok kinyerése a valós adatbázisból
|
|
columns = inspector.get_columns(table_name, schema=schema)
|
|
pk_constraint = inspector.get_pk_constraint(table_name, schema=schema)
|
|
pks = pk_constraint.get('constrained_columns', [])
|
|
fk_constraints = inspector.get_foreign_keys(table_name, schema=schema)
|
|
fk_cols = [col for fk in fk_constraints for col in fk.get('constrained_columns', [])]
|
|
|
|
markdown_content += f"### Tábla: `{table_name}`\n"
|
|
markdown_content += "| Oszlop | Típus | Nullable | Alapértelmezett | Extrák |\n"
|
|
markdown_content += "| :--- | :--- | :--- | :--- | :--- |\n"
|
|
|
|
for col in columns:
|
|
extras_list = []
|
|
if col['name'] in pks:
|
|
extras_list.append("🔑 PK")
|
|
if col['name'] in fk_cols:
|
|
extras_list.append("🔗 FK")
|
|
|
|
extras = " ".join(extras_list)
|
|
default_val = f"`{col['default']}`" if col.get('default') else ""
|
|
|
|
markdown_content += f"| **{col['name']}** | `{str(col['type'])}` | {col['nullable']} | {default_val} | {extras} |\n"
|
|
|
|
markdown_content += "\n"
|
|
|
|
async with engine.connect() as conn:
|
|
await conn.run_sync(inspect_db)
|
|
await engine.dispose()
|
|
|
|
with open(filepath, "w", encoding="utf-8") as f:
|
|
f.write(markdown_content)
|
|
|
|
print(f"✅ Adatbázis térkép sikeresen legenerálva ide: {filepath}")
|
|
|
|
async def main():
|
|
dynamic_import_models()
|
|
await generate_markdown()
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main()) |