439 lines
17 KiB
Python
439 lines
17 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Robot Health & Integrity Audit Script - Recursive Deep Integrity Audit
|
||
|
||
Ez a szkript automatikusan diagnosztizálja az összes robotunk (Scout, Enricher, Validator, Auditor)
|
||
üzembiztonságát rekurzív felfedezéssel. A következő ellenőrzéseket végzi el:
|
||
|
||
1. Auto-Discovery: Rekurzívan bejárja a `backend/app/workers/` teljes könyvtárszerkezetét
|
||
2. Identification: Minden `.py` fájlt, ami nem `__init__.py` és nem segédfájl, kezel robotként/worker-ként
|
||
3. Deep Import Test: Megpróbálja importálni mindet, különös figyelemmel a kritikus modulokra
|
||
4. Model Sync 2.0: Ellenőrzi, hogy az összes robot a helyes modelleket használja-e
|
||
5. Interface Standardizálás: Ellenőrzi a `run()` metódus jelenlétét
|
||
6. Kategorizált jelentés: Service, Vehicle General, Vehicle Special, System & OCR kategóriák
|
||
"""
|
||
|
||
import sys
|
||
import importlib
|
||
import inspect
|
||
import asyncio
|
||
from pathlib import Path
|
||
from typing import List, Dict, Any, Tuple
|
||
import logging
|
||
import re
|
||
|
||
# Setup logging
|
||
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(name)s: %(message)s')
|
||
logger = logging.getLogger("Robot-Integrity-Audit")
|
||
|
||
# Root directory for workers (relative to backend/app)
|
||
WORKERS_ROOT = Path(__file__).parent.parent / "workers"
|
||
|
||
# Exclusion patterns for non-robot files
|
||
EXCLUDE_PATTERNS = [
|
||
"__init__.py",
|
||
"__pycache__",
|
||
".pyc",
|
||
"test_",
|
||
"mapping_",
|
||
"config",
|
||
"dictionary",
|
||
"rules",
|
||
"report",
|
||
"monitor_",
|
||
"py_to_database",
|
||
"README",
|
||
# Files with dots in name (not valid Python module names)
|
||
r".*\..*\.py", # Matches files like "something.1.0.py"
|
||
]
|
||
|
||
# Categorization patterns
|
||
CATEGORY_PATTERNS = {
|
||
"Service Robots": [
|
||
r"service_robot_\d+",
|
||
r"service/.*\.py$",
|
||
],
|
||
"Vehicle General": [
|
||
r"vehicle_robot_[0-4]_.*",
|
||
r"R[0-4]_.*\.py$",
|
||
r"vehicle_robot_1_[245]_.*", # NHTSA, Heavy EU, GB
|
||
r"vehicle_robot_2_.*", # RDW, AutoData
|
||
],
|
||
"Vehicle Special": [
|
||
r"bike_.*\.py$",
|
||
r"vehicle_ultimate_.*\.py$",
|
||
r"ultimatespecs/.*\.py$",
|
||
],
|
||
"System & OCR": [
|
||
r"system_.*\.py$",
|
||
r"subscription_.*\.py$",
|
||
r"ocr/.*\.py$",
|
||
],
|
||
}
|
||
|
||
def discover_robot_files() -> List[Tuple[str, Path, str]]:
|
||
"""
|
||
Recursively discover all robot files in the workers directory.
|
||
Returns list of (module_name, file_path, category) tuples.
|
||
"""
|
||
robot_files = []
|
||
|
||
for py_file in WORKERS_ROOT.rglob("*.py"):
|
||
# Skip excluded files
|
||
file_name = py_file.name
|
||
# Check for simple pattern matches
|
||
skip = False
|
||
for pattern in EXCLUDE_PATTERNS:
|
||
if pattern.startswith('r.') and len(pattern) > 2:
|
||
# Regex pattern (simplified)
|
||
if re.match(pattern[2:], file_name):
|
||
skip = True
|
||
break
|
||
elif pattern in file_name:
|
||
skip = True
|
||
break
|
||
|
||
# Also skip files with multiple dots in name (not valid Python modules)
|
||
if file_name.count('.') > 1: # e.g., "something.1.0.py"
|
||
skip = True
|
||
|
||
if skip:
|
||
continue
|
||
|
||
# Skip directories
|
||
if not py_file.is_file():
|
||
continue
|
||
|
||
# Calculate module name (relative to backend/app)
|
||
try:
|
||
rel_path = py_file.relative_to(Path(__file__).parent.parent)
|
||
# Convert path parts to module names, handling dots in filenames
|
||
module_parts = []
|
||
for part in rel_path.parts:
|
||
if part.endswith('.py'):
|
||
part = part[:-3] # Remove .py
|
||
# Replace dots with underscores in filename (e.g., "1.0" -> "1_0")
|
||
part = part.replace('.', '_')
|
||
module_parts.append(part)
|
||
|
||
# Add 'app' prefix since we're in backend/app directory
|
||
module_name = "app." + ".".join(module_parts)
|
||
|
||
# Determine category
|
||
category = "Uncategorized"
|
||
for cat_name, patterns in CATEGORY_PATTERNS.items():
|
||
for pattern in patterns:
|
||
if re.search(pattern, str(rel_path), re.IGNORECASE):
|
||
category = cat_name
|
||
break
|
||
if category != "Uncategorized":
|
||
break
|
||
|
||
robot_files.append((module_name, py_file, category))
|
||
|
||
except ValueError as e:
|
||
logger.warning(f"Could not determine module for {py_file}: {e}")
|
||
|
||
# Sort by category and module name
|
||
robot_files.sort(key=lambda x: (x[2], x[0]))
|
||
return robot_files
|
||
|
||
async def test_import(module_name: str) -> Tuple[bool, str]:
|
||
"""Try to import a robot module and return (success, error_message)."""
|
||
try:
|
||
module = importlib.import_module(module_name)
|
||
logger.info(f"✅ {module_name} import successful")
|
||
return True, ""
|
||
except ImportError as e:
|
||
error_msg = f"ImportError: {e}"
|
||
logger.error(f"❌ {module_name} import failed: {e}")
|
||
return False, error_msg
|
||
except SyntaxError as e:
|
||
error_msg = f"SyntaxError at line {e.lineno}: {e.msg}"
|
||
logger.error(f"❌ {module_name} syntax error: {e}")
|
||
return False, error_msg
|
||
except Exception as e:
|
||
error_msg = f"Exception: {type(e).__name__}: {e}"
|
||
logger.error(f"❌ {module_name} import failed: {e}")
|
||
return False, error_msg
|
||
|
||
async def check_model_sync(module_name: str) -> List[str]:
|
||
"""Check if a robot uses correct model references."""
|
||
errors = []
|
||
try:
|
||
module = importlib.import_module(module_name)
|
||
|
||
# Get all classes in the module
|
||
classes = [cls for name, cls in inspect.getmembers(module, inspect.isclass)
|
||
if not name.startswith('_')]
|
||
|
||
for cls in classes:
|
||
# Check class source code for model references
|
||
try:
|
||
source = inspect.getsource(cls)
|
||
|
||
# Look for common model name issues
|
||
old_patterns = [
|
||
r"VehicleModelDefinitions", # Plural mistake
|
||
r"vehicle_model_definitions", # Old table name
|
||
r"ExternalReferenceQueues", # Plural mistake
|
||
]
|
||
|
||
for pattern in old_patterns:
|
||
if re.search(pattern, source):
|
||
errors.append(f"⚠️ {module_name}.{cls.__name__} uses old pattern: {pattern}")
|
||
|
||
except (OSError, TypeError):
|
||
pass # Can't get source for built-in or C extensions
|
||
|
||
except Exception as e:
|
||
# If we can't import, this will be caught in import test
|
||
pass
|
||
|
||
return errors
|
||
|
||
async def test_robot_interface(module_name: str) -> Tuple[bool, List[str]]:
|
||
"""Test if a robot has a proper interface (run method, etc.)."""
|
||
interface_issues = []
|
||
|
||
try:
|
||
module = importlib.import_module(module_name)
|
||
|
||
# Find the main robot class (usually ends with the module name or contains 'Robot')
|
||
classes = [cls for name, cls in inspect.getmembers(module, inspect.isclass)
|
||
if not name.startswith('_')]
|
||
|
||
if not classes:
|
||
interface_issues.append("No classes found")
|
||
return False, interface_issues
|
||
|
||
main_class = None
|
||
for cls in classes:
|
||
cls_name = cls.__name__
|
||
# Heuristic: class name contains 'Robot' or matches file name pattern
|
||
if 'Robot' in cls_name or cls_name.lower().replace('_', '') in module_name.lower().replace('_', ''):
|
||
main_class = cls
|
||
break
|
||
|
||
if main_class is None:
|
||
main_class = classes[0] # Fallback to first class
|
||
|
||
# Check for run/execute/process method (can be classmethod or instance method)
|
||
has_run_method = hasattr(main_class, 'run')
|
||
has_execute_method = hasattr(main_class, 'execute')
|
||
has_process_method = hasattr(main_class, 'process')
|
||
|
||
if not (has_run_method or has_execute_method or has_process_method):
|
||
interface_issues.append(f"No run/execute/process method in {main_class.__name__}")
|
||
else:
|
||
# Log which method is found
|
||
if has_run_method:
|
||
run_method = getattr(main_class, 'run')
|
||
# Check if it's a classmethod or instance method
|
||
if inspect.ismethod(run_method) and run_method.__self__ is main_class:
|
||
logger.debug(f"✅ {module_name}.{main_class.__name__}.run is classmethod")
|
||
elif inspect.iscoroutinefunction(run_method):
|
||
logger.debug(f"✅ {module_name}.{main_class.__name__}.run is async")
|
||
else:
|
||
logger.debug(f"ℹ️ {module_name}.{main_class.__name__}.run is sync")
|
||
|
||
# Try to instantiate only if the class appears to be instantiable (not abstract)
|
||
# Check if class has __init__ that doesn't require special arguments
|
||
try:
|
||
# First check if class can be instantiated with no arguments
|
||
sig = inspect.signature(main_class.__init__)
|
||
params = list(sig.parameters.keys())
|
||
# If only 'self' parameter, it's instantiable
|
||
if len(params) == 1: # only self
|
||
instance = main_class()
|
||
interface_issues.append(f"Instantiation successful")
|
||
else:
|
||
interface_issues.append(f"Instantiation requires arguments, skipping")
|
||
except (TypeError, AttributeError):
|
||
# __init__ may not be standard, try anyway
|
||
try:
|
||
instance = main_class()
|
||
interface_issues.append(f"Instantiation successful")
|
||
except Exception as e:
|
||
interface_issues.append(f"Instantiation failed (expected): {e}")
|
||
|
||
# If we found at least one of the required methods, consider interface OK
|
||
interface_ok = has_run_method or has_execute_method or has_process_method
|
||
|
||
return interface_ok, interface_issues
|
||
|
||
except Exception as e:
|
||
interface_issues.append(f"Interface test error: {e}")
|
||
return False, interface_issues
|
||
|
||
async def check_syntax_errors(file_path: Path) -> List[str]:
|
||
"""Check for syntax errors by attempting to compile the file."""
|
||
errors = []
|
||
try:
|
||
with open(file_path, 'r', encoding='utf-8') as f:
|
||
source = f.read()
|
||
compile(source, str(file_path), 'exec')
|
||
except SyntaxError as e:
|
||
errors.append(f"Syntax error at line {e.lineno}: {e.msg}")
|
||
except Exception as e:
|
||
errors.append(f"Compilation error: {e}")
|
||
return errors
|
||
|
||
async def generate_categorized_report(results: Dict) -> str:
|
||
"""Generate a categorized audit report."""
|
||
report_lines = []
|
||
report_lines.append("# 🤖 Robot Integrity Audit Report")
|
||
report_lines.append(f"Generated: {importlib.import_module('datetime').datetime.now().isoformat()}")
|
||
report_lines.append(f"Total robots discovered: {results['total_robots']}")
|
||
report_lines.append("")
|
||
|
||
for category in ["Service Robots", "Vehicle General", "Vehicle Special", "System & OCR", "Uncategorized"]:
|
||
cat_robots = [r for r in results['robots'] if r['category'] == category]
|
||
if not cat_robots:
|
||
continue
|
||
|
||
report_lines.append(f"## {category}")
|
||
report_lines.append(f"**Count:** {len(cat_robots)}")
|
||
|
||
# Statistics
|
||
import_success = sum(1 for r in cat_robots if r['import_success'])
|
||
syntax_success = sum(1 for r in cat_robots if not r['syntax_errors'])
|
||
interface_ok = sum(1 for r in cat_robots if r['interface_ok'])
|
||
|
||
report_lines.append(f"- Import successful: {import_success}/{len(cat_robots)}")
|
||
report_lines.append(f"- Syntax clean: {syntax_success}/{len(cat_robots)}")
|
||
report_lines.append(f"- Interface OK: {interface_ok}/{len(cat_robots)}")
|
||
|
||
# List problematic robots
|
||
problematic = [r for r in cat_robots if not r['import_success'] or r['syntax_errors'] or not r['interface_ok']]
|
||
if problematic:
|
||
report_lines.append("\n**Problematic robots:**")
|
||
for robot in problematic:
|
||
issues = []
|
||
if not robot['import_success']:
|
||
issues.append("Import failed")
|
||
if robot['syntax_errors']:
|
||
issues.append(f"Syntax errors ({len(robot['syntax_errors'])})")
|
||
if not robot['interface_ok']:
|
||
issues.append("Interface issues")
|
||
report_lines.append(f"- `{robot['module']}`: {', '.join(issues)}")
|
||
|
||
report_lines.append("")
|
||
|
||
# Summary
|
||
report_lines.append("## 📊 Summary")
|
||
report_lines.append(f"- **Total robots:** {results['total_robots']}")
|
||
report_lines.append(f"- **Import successful:** {results['import_success']}/{results['total_robots']}")
|
||
report_lines.append(f"- **Syntax clean:** {results['syntax_clean']}/{results['total_robots']}")
|
||
report_lines.append(f"- **Interface OK:** {results['interface_ok']}/{results['total_robots']}")
|
||
|
||
# Critical issues
|
||
critical = [r for r in results['robots'] if not r['import_success']]
|
||
if critical:
|
||
report_lines.append("\n## 🚨 Critical Issues (Import Failed)")
|
||
for robot in critical:
|
||
report_lines.append(f"- `{robot['module']}`: {robot['import_error']}")
|
||
|
||
return "\n".join(report_lines)
|
||
|
||
async def main():
|
||
"""Main audit function with recursive discovery."""
|
||
logger.info("🤖 Starting Recursive Deep Integrity Audit")
|
||
logger.info("=" * 60)
|
||
|
||
# Discover all robot files
|
||
logger.info("\n🔍 STEP 1: Discovering robot files...")
|
||
robot_files = discover_robot_files()
|
||
|
||
if not robot_files:
|
||
logger.error("❌ No robot files found!")
|
||
return False
|
||
|
||
logger.info(f"📁 Found {len(robot_files)} robot files")
|
||
|
||
results = {
|
||
'robots': [],
|
||
'total_robots': len(robot_files),
|
||
'import_success': 0,
|
||
'syntax_clean': 0,
|
||
'interface_ok': 0,
|
||
}
|
||
|
||
# Process each robot
|
||
logger.info("\n📦 STEP 2: Import and syntax tests...")
|
||
logger.info("-" * 40)
|
||
|
||
for i, (module_name, file_path, category) in enumerate(robot_files, 1):
|
||
logger.info(f"\n[{i}/{len(robot_files)}] Testing: {module_name} ({category})")
|
||
|
||
# Check syntax first
|
||
syntax_errors = await check_syntax_errors(file_path)
|
||
|
||
# Test import
|
||
import_success, import_error = await test_import(module_name)
|
||
|
||
# Test interface
|
||
interface_ok, interface_issues = await test_robot_interface(module_name)
|
||
|
||
# Check model sync
|
||
model_errors = await check_model_sync(module_name)
|
||
|
||
robot_result = {
|
||
'module': module_name,
|
||
'file': str(file_path),
|
||
'category': category,
|
||
'import_success': import_success,
|
||
'import_error': import_error,
|
||
'syntax_errors': syntax_errors,
|
||
'interface_ok': interface_ok,
|
||
'interface_issues': interface_issues,
|
||
'model_errors': model_errors,
|
||
}
|
||
|
||
results['robots'].append(robot_result)
|
||
|
||
if import_success:
|
||
results['import_success'] += 1
|
||
if not syntax_errors:
|
||
results['syntax_clean'] += 1
|
||
if interface_ok:
|
||
results['interface_ok'] += 1
|
||
|
||
# Log summary for this robot
|
||
status_symbol = "✅" if import_success and not syntax_errors else "❌"
|
||
logger.info(f"{status_symbol} {module_name}: Import={import_success}, Syntax={len(syntax_errors)} errors, Interface={interface_ok}")
|
||
|
||
# Generate report
|
||
logger.info("\n📊 STEP 3: Generating categorized report...")
|
||
report = await generate_categorized_report(results)
|
||
|
||
# Print summary to console
|
||
logger.info("\n" + "=" * 60)
|
||
logger.info("📊 AUDIT SUMMARY")
|
||
logger.info("=" * 60)
|
||
logger.info(f"Total robots discovered: {results['total_robots']}")
|
||
logger.info(f"Import successful: {results['import_success']}/{results['total_robots']}")
|
||
logger.info(f"Syntax clean: {results['syntax_clean']}/{results['total_robots']}")
|
||
logger.info(f"Interface OK: {results['interface_ok']}/{results['total_robots']}")
|
||
|
||
# Save report to file
|
||
report_path = Path(__file__).parent.parent.parent / "audit_report_robots.md"
|
||
with open(report_path, 'w', encoding='utf-8') as f:
|
||
f.write(report)
|
||
logger.info(f"\n📄 Full report saved to: {report_path}")
|
||
|
||
# Determine overall status
|
||
critical_count = sum(1 for r in results['robots'] if not r['import_success'])
|
||
if critical_count > 0:
|
||
logger.error(f"🚨 ROBOT INTEGRITY CHECK FAILED - {critical_count} critical issues found!")
|
||
return False
|
||
elif results['import_success'] < results['total_robots']:
|
||
logger.warning("⚠️ ROBOT INTEGRITY CHECK PASSED with warnings")
|
||
return True
|
||
else:
|
||
logger.info("✅ ROBOT INTEGRITY CHECK PASSED - All systems operational!")
|
||
return True
|
||
|
||
if __name__ == "__main__":
|
||
success = asyncio.run(main())
|
||
sys.exit(0 if success else 1) |