name: "DB Schema Surgeon" description: "Gestión del esquema PostgreSQL, Auto-Healing y arquitectura RAG híbrida." trigger: "base de datos, modelos, migraciones, tablas, RAG, schema, SQL" scope: "DATABASE" auto-invoke: true
DB Schema Surgeon - Platform AI Solutions
1. Filosofía Auto-Healing (NO Alembic)
El Protocolo
Platform AI Solutions usa Self-Healing: el esquema se auto-repara en cada startup.
# orchestrator_service/main.py
from app.models import Base
from app.db import engine
@app.on_event("startup")
async def startup_event():
async with engine.begin() as conn:
# Crea SOLO las tablas que no existen
await conn.run_sync(Base.metadata.create_all)
# Ejecutar migraciones de esquema
await run_migration_steps()
Ventajas
- Sin historial de migraciones complejo
- Desarrollo rápido (cambios inmediatos)
- Funciona igual en local, staging y producción
- Idempotente (
create_allsolo crea lo que falta)
2. Crear Nuevas Tablas
Paso 1: Definir Modelo SQLAlchemy
# app/models/business_asset.py
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy import String, Integer, JSONB, ForeignKey
from datetime import datetime
from .base import Base
class BusinessAsset(Base):
__tablename__ = "business_assets"
id: Mapped[str] = mapped_column(String, primary_key=True) # UUID
tenant_id: Mapped[str] = mapped_column(String(50), nullable=False)
asset_type: Mapped[str] = mapped_column(String(50)) # branding, scripts, roi
content: Mapped[dict] = mapped_column(JSONB, nullable=False)
created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
is_active: Mapped[bool] = mapped_column(default=True)
Paso 2: Importar en main.py
# main.py
from app.models import (
Base,
Tenant,
Agent,
BusinessAsset, # ← Agregar aquí
Credential
)
# Esto asegura que SQLAlchemy conozca todos los modelos
Paso 3: Reiniciar
Al reiniciar el servicio, create_all ejecutará y la tabla se creará automáticamente.
3. Modificar Tablas Existentes
El Problema
create_all NO altera tablas existentes.
Para agregar columnas, cambiar tipos, o renombrar: Migration Script
Solución: Migration Steps
# orchestrator_service/scripts/migration_steps.py
from sqlalchemy import text
async def run_migration_steps(engine):
"""Migraciones idempotentes"""
async with engine.begin() as conn:
# Migración 1: Agregar columna
await conn.execute(text("""
ALTER TABLE agents
ADD COLUMN IF NOT EXISTS template_type VARCHAR(50) DEFAULT 'custom'
"""))
# Migración 2: Crear índice
await conn.execute(text("""
CREATE INDEX IF NOT EXISTS idx_agents_tenant_active
ON agents(tenant_id, is_active)
"""))
# Migración 3: Agregar columna JSONB
await conn.execute(text("""
ALTER TABLE tenants
ADD COLUMN IF NOT EXISTS tool_config JSONB DEFAULT '{}'::jsonb
"""))
Invocar en Startup
# main.py
from scripts.migration_steps import run_migration_steps
@app.on_event("startup")
async def startup_event():
# 1. Crear tablas nuevas
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
# 2. Ejecutar migraciones
await run_migration_steps(engine)
4. Arquitectura RAG Híbrida
Separación de Responsabilidades
| Almacenamiento | Responsabilidad | Tecnología |
|---|---|---|
| PostgreSQL Local | Metadata (filename, collection, file_path) | rag_documents table |
| Supabase Remote | Vectores (embeddings) | pgvector extension |
Modelo PostgreSQL
# app/models/rag_document.py
class RAGDocument(Base):
__tablename__ = "rag_documents"
id: Mapped[str] = mapped_column(String, primary_key=True) # UUID
tenant_id: Mapped[int] = mapped_column(
Integer,
ForeignKey("tenants.id"),
index=True
)
filename: Mapped[str] = mapped_column(String(255))
collection: Mapped[str] = mapped_column(String(100)) # General, ADN Personal, Shadow RAG
file_type: Mapped[str] = mapped_column(String(50)) # pdf, txt, docx
file_path: Mapped[str] = mapped_column(String(500), nullable=True)
created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
Flujo de Creación
# 1. Guardar metadata en PostgreSQL
doc = RAGDocument(
id=str(uuid.uuid4()),
tenant_id=tenant_id,
filename=filename,
collection="General",
file_type=file_extension,
file_path=storage_path
)
session.add(doc)
await session.flush() # Obtener ID sin commit final
# 2. Procesar documento
chunks = process_document(file_content)
# 3. Vectorizar y guardar en Supabase
from app.services.rag.vector_store import SupabaseVectorStore
vector_store = SupabaseVectorStore(tenant_id)
await vector_store.add_documents(
chunks,
metadata={
"tenant_id": tenant_id,
"source_id": doc.id,
"collection": collection
}
)
# 4. Commit
await session.commit()
Flujo de Eliminación (Dual Delete Protocol)
CRÍTICO: Mantener coherencia entre PostgreSQL y Supabase
# Paso 1: Surgical Strike (Remoto) - Eliminar vectores de Supabase
await supabase.from_("documents").delete().eq(
"metadata->>source_id", doc_id
).execute()
# Paso 2: Metadata Cleanup (Local) - Eliminar de PostgreSQL
stmt = delete(RAGDocument).where(
RAGDocument.id == doc_id,
RAGDocument.tenant_id == tenant_id
)
await session.execute(stmt)
# Paso 3: Physical Sweep (Disco) - Best effort
try:
os.remove(file_path)
except FileNotFoundError:
logger.warning(f"File {file_path} not found, skipping physical delete")
await session.commit()
5. Tablas Core (Schema Reference)
credentials (The Vault)
CREATE TABLE IF NOT EXISTS credentials (
id_uuid UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id INTEGER REFERENCES tenants(id),
name TEXT NOT NULL,
value TEXT NOT NULL, -- Encrypted AES-256
category TEXT DEFAULT 'general', -- openai, google, smtp
scope TEXT DEFAULT 'global', -- global o tenant
created_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE(name, tenant_id) -- Unicidad multi-tenant
);
agents (AI Configuration)
CREATE TABLE IF NOT EXISTS agents (
id SERIAL PRIMARY KEY,
tenant_id INTEGER REFERENCES tenants(id),
name TEXT NOT NULL,
role TEXT DEFAULT 'sales',
model_provider TEXT DEFAULT 'openai',
model_version TEXT DEFAULT 'gpt-5-mini',
temperature FLOAT DEFAULT 0.7,
system_prompt_template TEXT NOT NULL,
enabled_tools JSONB DEFAULT '[]',
channels JSONB DEFAULT '["whatsapp", "instagram", "facebook", "web"]',
config JSONB DEFAULT '{}',
template_type VARCHAR(50) DEFAULT 'custom',
is_active BOOLEAN DEFAULT TRUE,
created_at TIMESTAMPTZ DEFAULT NOW()
);
chat_conversations (Omnichannel)
CREATE TABLE IF NOT EXISTS chat_conversations (
id UUID PRIMARY KEY,
tenant_id INTEGER REFERENCES tenants(id),
channel VARCHAR(32) NOT NULL, -- whatsapp, instagram, facebook, web
channel_source VARCHAR(32) DEFAULT 'whatsapp',
display_name VARCHAR(255),
meta JSONB DEFAULT '{}',
last_message_preview TEXT,
last_message_at TIMESTAMPTZ,
created_at TIMESTAMPTZ DEFAULT NOW()
);
tools (Brain Extensions)
CREATE TABLE IF NOT EXISTS tools (
id SERIAL PRIMARY KEY,
tenant_id INTEGER REFERENCES tenants(id), -- NULL para Global
name VARCHAR(255) NOT NULL,
type VARCHAR(32) NOT NULL, -- http, internal
description TEXT,
prompt_injection TEXT,
response_guide TEXT,
config JSONB DEFAULT '{}',
service_url TEXT,
created_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE(tenant_id, name)
);
6. Tipos de Datos Comunes
from sqlalchemy import String, Integer, Boolean, DateTime, Text, JSONB, ARRAY
class ExampleModel(Base):
__tablename__ = "example"
# IDs
id_uuid: Mapped[str] = mapped_column(String, primary_key=True)
id_int: Mapped[int] = mapped_column(Integer, primary_key=True)
# Texto
short_text: Mapped[str] = mapped_column(String(255))
long_text: Mapped[str] = mapped_column(Text)
# JSON
metadata: Mapped[dict] = mapped_column(JSONB, default={})
# Arrays
tags: Mapped[list] = mapped_column(JSONB, default=[])
# Booleanos
is_active: Mapped[bool] = mapped_column(Boolean, default=True)
# Timestamps
created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
7. Identity Resolution (UUID vs INTEGER)
El Problema Crítico
- Tenants:
ides INTEGER - Users:
ides UUID,tenant_ides INTEGER - Agents, Tools, Credentials:
tenant_ides INTEGER (foreign key)
La Solución
# NUNCA asumir que current_user.tenant_id es UUID
# SIEMPRE resolver desde tabla users
user_row = await db.pool.fetchrow(
"SELECT tenant_id FROM users WHERE id = $1",
current_user.id # UUID
)
real_tenant_int = user_row['tenant_id'] # INTEGER
# Usar en queries
stmt = delete(Agent).where(Agent.tenant_id == real_tenant_int)
8. Índices y Performance
Crear Índices
from sqlalchemy import Index
class Message(Base):
__tablename__ = "messages"
# ... columnas ...
__table_args__ = (
Index('idx_messages_conversation', 'conversation_id', 'created_at'),
Index('idx_messages_tenant', 'tenant_id'),
)
Eager Loading (Evitar N+1)
from sqlalchemy.orm import selectinload
# ❌ MAL - N+1 queries
conversations = await session.execute(select(Conversation))
for conv in conversations:
messages = conv.messages # Query adicional
# ✅ BIEN - Single query
stmt = select(Conversation).options(
selectinload(Conversation.messages)
)
conversations = await session.execute(stmt)
9. Colecciones RAG
Tipos de Colecciones
- General: Manuales técnicos, políticas (PDF/DOCX)
- ADN Personal: Historiales de conversación (.txt) para clonación de estilo
- Shadow RAG: Chats vectorizados automáticamente (memoria largo plazo)
WhatsApp Parser (Identity Engine)
# Al subir .txt a "ADN Personal", se activa parser
if collection == "ADN Personal" and file_ext == ".txt":
from app.services.parsers import WhatsAppParser
parser = WhatsAppParser(hero_name=hero_name)
parsed = parser.parse(file_content)
# Genera pares context/response para stored patterns
10. Reset Industrial (Development Only)
-- Limpiar plataforma completa
TRUNCATE TABLE
users, tenants, credentials, agents, tools,
business_assets, chat_conversations, chat_messages,
rag_documents
RESTART IDENTITY CASCADE;
11. Checklist Pre-Deploy
- ¿El modelo está importado en
main.py? - ¿Las columnas tienen tipos explícitos (
Mapped[type])? - ¿Las foreign keys tienen índices?
- ¿
tenant_idestá indexado? - ¿Las migraciones usan
IF NOT EXISTS? - ¿En RAG, se sincroniza PostgreSQL + Supabase?
- ¿Los scripts son idempotentes?
- ¿Se resuelve UUID→INT para tenant_id?