Files
service-finder/backend/app/scripts/unified_db_sync.py
2026-03-13 10:22:41 +00:00

135 lines
6.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
import asyncio
import importlib
import os
import sys
import uuid
import enum
from pathlib import Path
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy import inspect, text, UniqueConstraint, Index, Enum as SQLEnum
from sqlalchemy.schema import CreateTable
from sqlalchemy.sql import func
# Add the backend directory to the import path so the `app` package resolves
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from app.database import Base
from app.core.config import settings
def dynamic_import_models():
    """Import every module in the sibling ``models`` directory.

    Each ``*.py`` file (except ``__init__.py``) is imported as
    ``app.models.<stem>``. A module that fails to import is reported on
    stdout and skipped, so one broken model file does not stop the rest
    from loading. Finally the ``app.models`` package itself is imported and
    the number of tables currently present in ``Base.metadata`` is printed.
    """
    models_root = Path(__file__).parents[1] / "models"
    model_files = (
        path for path in models_root.glob("*.py") if path.name != "__init__.py"
    )
    for model_file in model_files:
        dotted_name = f"app.models.{model_file.stem}"
        try:
            importlib.import_module(dotted_name)
            print(f"✅ Imported {dotted_name}")
        except Exception as err:
            # Best effort: report the failure and keep importing the others.
            print(f"⚠️ Could not import {dotted_name}: {err}")
    import app.models
    table_count = len(Base.metadata.tables)
    print(f"📦 Total tables in Base.metadata: {table_count}")
def _sql_string_literal(value):
    """Return *value* rendered as a single-quoted SQL string literal.

    Embedded single quotes are doubled so that a label or default containing
    an apostrophe cannot terminate the literal early.
    """
    return "'" + str(value).replace("'", "''") + "'"


def _qualified_table_name(table):
    """Return the double-quoted, optionally schema-qualified name of *table*.

    Tables without an explicit schema are emitted unqualified, so the
    generated DDL never references a schema literally called "None".
    """
    if table.schema:
        return f'"{table.schema}"."{table.name}"'
    return f'"{table.name}"'


def _column_default_clause(col):
    """Return the `` DEFAULT ...`` fragment for *col*, or ``""`` if none applies.

    Python-side callables cannot be sent to the server, so they are mapped
    by heuristic: a column whose name contains "uuid" gets
    ``gen_random_uuid()``, a callable whose string form contains "now" gets
    ``NOW()``, and any other callable yields no server default.
    """
    if col.default is None:
        return ""
    arg = col.default.arg
    if callable(arg):
        if "uuid" in col.name.lower():
            return " DEFAULT gen_random_uuid()"
        if "now" in str(arg).lower():
            return " DEFAULT NOW()"
        return ""
    if isinstance(arg, enum.Enum):
        # NOTE(review): the member *value* is used as the default label;
        # confirm this matches the labels stored for the column's enum type.
        return f" DEFAULT {_sql_string_literal(arg.value)}"
    if isinstance(arg, str):
        return f" DEFAULT {_sql_string_literal(arg)}"
    return f" DEFAULT {arg}"


def _ensure_extensions_and_schemas(connection, inspector, schemas):
    """Create required extensions, missing schemas, and set the search path."""
    print("🔧 Ensuring extensions and schemas...")
    connection.execute(text('CREATE EXTENSION IF NOT EXISTS "uuid-ossp"'))
    try:
        # postgis is optional. The SAVEPOINT matters: a failed statement would
        # otherwise leave the enclosing transaction aborted and make every
        # later statement fail.
        with connection.begin_nested():
            connection.execute(text('CREATE EXTENSION IF NOT EXISTS "postgis"'))
    except Exception as exc:
        print(f"⚠️ Could not ensure postgis extension: {exc}")

    db_schemas = inspector.get_schema_names()
    for schema in schemas:
        if schema not in db_schemas:
            connection.execute(text(f'CREATE SCHEMA IF NOT EXISTS "{schema}"'))

    # Set the search path so custom types are resolved. Joining a single list
    # keeps the statement valid even when no model declares a schema.
    search_path = ", ".join([*(f'"{s}"' for s in schemas), "public"])
    connection.execute(text(f"SET search_path TO {search_path}"))


def _ensure_enum_types(connection, apply):
    """Create missing PostgreSQL enum types for every Enum column (apply mode only).

    The existence check runs in both modes, but nothing is created or
    reported for a missing type unless *apply* is true.
    """
    print("🔧 Checking custom Enum types...")
    for table in Base.metadata.sorted_tables:
        for col in table.columns:
            if not isinstance(col.type, SQLEnum):
                continue
            enum_name = col.type.name
            if not enum_name:
                # An unnamed Enum has no type name to create; without this
                # guard a type literally called "None" would be created.
                continue
            schema = table.schema or 'public'
            found = connection.execute(text(
                "SELECT 1 FROM pg_type t JOIN pg_namespace n ON n.oid = t.typnamespace "
                "WHERE t.typname = :name AND n.nspname = :schema"
            ), {"name": enum_name, "schema": schema}).fetchone()
            if found or not apply:
                continue
            # Trick: register the lower- and upper-case variants of every
            # label as well, so a DEFAULT with different casing does not fail.
            variants = {
                form
                for label in col.type.enums
                for form in (label, label.lower(), label.upper())
            }
            labels = ", ".join(_sql_string_literal(v) for v in sorted(variants))
            print(f" Creating Enum {schema}.{enum_name} with variants...")
            connection.execute(
                text(f'CREATE TYPE "{schema}"."{enum_name}" AS ENUM ({labels})')
            )


def _sync_tables_and_columns(connection, inspector, schemas, apply):
    """Report (and in apply mode create) missing tables and missing columns."""
    existing_tables = {s: inspector.get_table_names(schema=s) for s in schemas}
    existing_tables[None] = inspector.get_table_names()

    for table in Base.metadata.sorted_tables:
        if table.name not in existing_tables.get(table.schema, []):
            print(f"❌ Missing table: {table.schema}.{table.name}")
            if apply:
                try:
                    # SAVEPOINT: a failed CREATE TABLE is rolled back on its
                    # own instead of aborting the whole transaction.
                    with connection.begin_nested():
                        connection.execute(CreateTable(table))
                    print(f"✅ Table {table.schema}.{table.name} created.")
                except Exception as e:
                    print(f"🔥 Error creating {table.name}: {e}")
            # A missing (or just created) table has no columns to diff.
            continue

        # Column synchronization
        db_col_names = {
            c['name'] for c in inspector.get_columns(table.name, schema=table.schema)
        }
        for col in table.columns:
            if col.name in db_col_names:
                continue
            col_type = col.type.compile(dialect=connection.dialect)
            sql = (
                f'ALTER TABLE {_qualified_table_name(table)} '
                f'ADD COLUMN "{col.name}" {col_type}'
            )
            if not col.nullable:
                sql += " NOT NULL"
            sql += _column_default_clause(col)
            print(f"⚠️ Adding column: {table.schema}.{table.name}.{col.name}")
            if apply:
                connection.execute(text(sql))


async def compare_and_repair(apply: bool = False):
    """Compare ``Base.metadata`` with the live database and optionally repair it.

    Args:
        apply: When false (the default) the function only reports missing
            tables and columns. When true it also creates extensions,
            schemas, enum types, missing tables, and missing columns.

    All work runs inside a single transaction opened with ``engine.begin()``.
    Optional steps (the postgis extension, individual table creation) are
    isolated in SAVEPOINTs so their failure does not poison the transaction.
    Errors from any other statement propagate to the caller. The engine is
    always disposed, even when synchronization raises.
    """
    print(f"🔗 Connecting to database... (Apply mode: {apply})")
    engine = create_async_engine(str(settings.SQLALCHEMY_DATABASE_URI))

    def get_diff_and_repair(connection):
        inspector = inspect(connection)
        # 1. Extensions and schemas
        schemas = sorted({t.schema for t in Base.metadata.sorted_tables if t.schema})
        if apply:
            _ensure_extensions_and_schemas(connection, inspector, schemas)
        # 2. Enum types (with case-insensitive label support)
        _ensure_enum_types(connection, apply)
        # 3. Tables and columns
        _sync_tables_and_columns(connection, inspector, schemas, apply)
        print("\n--- ✅ Synchronization cycle complete. ---")

    try:
        async with engine.begin() as conn:
            await conn.run_sync(get_diff_and_repair)
    finally:
        await engine.dispose()
async def main():
    """Parse the command line, load the models, and run one sync cycle.

    The only option is ``--apply``; without it the run is report-only.
    """
    import argparse

    cli = argparse.ArgumentParser()
    cli.add_argument('--apply', action='store_true')
    options = cli.parse_args()

    dynamic_import_models()
    await compare_and_repair(apply=options.apply)
    # Hungarian for "All tables and types are synchronized!"
    print("\n✨ Minden tábla és típus szinkronizálva!")
# Run the synchronization only when executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())