name: migrations-api-reference
description: Use when working with database migrations in pgdbm - provides complete AsyncMigrationManager API with all methods and migration file format
pgdbm Migrations API Reference
Overview
Complete API reference for AsyncMigrationManager and the migration file format.
It covers all method signatures, parameters, and return types, so no separate documentation lookup is needed.
AsyncMigrationManager
Initialization
from pgdbm import AsyncMigrationManager
migrations = AsyncMigrationManager(
db_manager: AsyncDatabaseManager,
migrations_path: str = "./migrations",
migrations_table: str = "schema_migrations",
module_name: Optional[str] = None,
)
Parameters:
- db_manager: AsyncDatabaseManager instance (schema already set)
- migrations_path: Directory containing .sql migration files
- migrations_table: Table name for tracking migrations (default: "schema_migrations")
- module_name: CRITICAL - Unique identifier for this module's migrations
IMPORTANT:
- DO NOT pass a schema parameter (it doesn't exist; the schema comes from db_manager)
- ALWAYS specify module_name to prevent conflicts
- For dual-mode libraries: module_name=f"mylib_{schema}" (include the schema)
Core Methods
# Apply all pending migrations
result = await migrations.apply_pending_migrations(
dry_run: bool = False
) -> dict[str, Any]
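# Returns: {
#     "status": "success" | "up_to_date" | "dry_run" | "error",
#     "applied": [{"filename": "001_...", "execution_time_ms": 123.4}, ...],
#     "skipped": ["001_already_applied.sql", ...],
#     "total": 5,
#     "total_time_ms": 456.7  # Only if status="success"
# }
# The examples in this document also read result["pending"] when status="dry_run",
# and result["failed_migration"] / result["error"] when status="error".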
# Get applied migrations
applied = await migrations.get_applied_migrations() -> dict[str, Migration]
# Returns: {"001_users.sql": Migration(...), ...}
# Get pending migrations
pending = await migrations.get_pending_migrations() -> list[Migration]
# Returns: [Migration(...), Migration(...)]
# Find migration files on disk
files = await migrations.find_migration_files() -> list[Migration]
# Returns: All .sql files in migrations_path
# Apply single migration
execution_time = await migrations.apply_migration(
migration: Migration
) -> float
# Returns: Execution time in milliseconds
# Get migration history (recent migrations)
history = await migrations.get_migration_history(
limit: int = 10
) -> list[dict[str, Any]]
# Returns: Recent migrations with timestamps, checksums
# Ensure migrations table exists
await migrations.ensure_migrations_table() -> None
# Creates the tracking table (schema_migrations by default) if it doesn't exist
Development Methods
# Create new migration file
filepath = await migrations.create_migration(
name: str,
content: str,
auto_transaction: bool = True
) -> str
# Returns: Path to created file
# Example
path = await migrations.create_migration(
name="add_users_table",
content="CREATE TABLE {{tables.users}} (id SERIAL PRIMARY KEY)",
auto_transaction=True # Wraps in BEGIN/COMMIT
)
# Creates: migrations/20251025_120000_add_users_table.sql
# Rollback migration (removes from tracking, doesn't undo changes)
await migrations.rollback_migration(filename: str) -> None
# Marks migration as not applied
# WARNING: Doesn't undo the migration, just removes tracking
Migration File Format
File Naming
Must match one of these patterns:
- 001_description.sql (numeric prefix)
- V1__description.sql (Flyway pattern)
- 20251025120000_description.sql (timestamp)
Examples:
migrations/
├── 001_create_users.sql
├── 002_add_profiles.sql
├── 003_create_sessions.sql
└── 004_add_indexes.sql
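To check filenames before they reach the migration runner, the three naming styles above can be approximated with regular expressions. This is a minimal sketch: `MIGRATION_NAME_PATTERNS` and `is_valid_migration_name` are hypothetical names, and the patterns are an assumption based on the list above, not pgdbm's own matching rules.

```python
import re

# Hypothetical patterns mirroring the documented naming styles.
# pgdbm's real matching rules may be stricter or looser than these.
MIGRATION_NAME_PATTERNS = (
    # 001_description.sql and 20251025120000_description.sql
    re.compile(r"^\d+_.+\.sql$"),
    # V1__description.sql (Flyway style)
    re.compile(r"^V\d+__.+\.sql$"),
)


def is_valid_migration_name(filename: str) -> bool:
    """Return True if the filename matches one of the assumed patterns."""
    return any(p.match(filename) for p in MIGRATION_NAME_PATTERNS)


if __name__ == "__main__":
    for name in ("001_create_users.sql", "V1__init.sql", "create_users.sql"):
        print(name, is_valid_migration_name(name))
```

A check like this could run in CI so that a misnamed file is caught before anyone wonders why it was never picked up.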
Template Syntax
Always use templates for schema portability:
-- All template placeholders
{{tables.tablename}} -- Table with schema qualification
{{schema}} -- Schema name only
Example migration:
-- migrations/001_create_users.sql
CREATE TABLE IF NOT EXISTS {{tables.users}} (
id SERIAL PRIMARY KEY,
email VARCHAR(255) UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
full_name VARCHAR(255),
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS users_email
ON {{tables.users}} (email);
CREATE INDEX IF NOT EXISTS users_created
ON {{tables.users}} (created_at DESC);
-- Trigger using schema placeholder
CREATE OR REPLACE FUNCTION {{schema}}.update_updated_at()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = CURRENT_TIMESTAMP;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
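-- Drop first so the migration stays idempotent (CREATE TRIGGER has no IF NOT EXISTS)
DROP TRIGGER IF EXISTS users_updated_at ON {{tables.users}};
CREATE TRIGGER users_updated_at
    BEFORE UPDATE ON {{tables.users}}
    FOR EACH ROW
    EXECUTE FUNCTION {{schema}}.update_updated_at();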
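To make the placeholder expansion concrete, the sketch below renders `{{tables.NAME}}` and `{{schema}}` for a given schema. It is illustrative only: `render_template` is a hypothetical helper, the output is unquoted, and the fallback used when no schema is set is an assumption rather than pgdbm's documented behavior.

```python
import re
from typing import Optional

_TABLE_PLACEHOLDER = re.compile(r"\{\{tables\.(\w+)\}\}")


def render_template(sql: str, schema: Optional[str]) -> str:
    """Expand {{tables.NAME}} and {{schema}} (simplified illustration)."""
    if schema:
        # Qualify every table placeholder with the schema name.
        sql = _TABLE_PLACEHOLDER.sub(lambda m: f"{schema}.{m.group(1)}", sql)
        return sql.replace("{{schema}}", schema)
    # Assumption: without a schema, tables stay unqualified and
    # {{schema}} falls back to "public".
    sql = _TABLE_PLACEHOLDER.sub(lambda m: m.group(1), sql)
    return sql.replace("{{schema}}", "public")


if __name__ == "__main__":
    print(render_template("CREATE TABLE {{tables.users}} (id INT)", "tenant1"))
```

The same migration file can therefore target different schemas without edits, which is the portability the templates are meant to provide.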
Transaction Handling
Automatic: Migrations run in transactions by default
Manual control:
-- migrations/002_manual_transaction.sql
BEGIN;
CREATE TABLE {{tables.posts}} (
id SERIAL PRIMARY KEY,
title TEXT NOT NULL
);
-- If this fails, whole migration rolls back
CREATE INDEX posts_title ON {{tables.posts}} (title);
COMMIT;
Non-transactional operations:
-- migrations/003_create_index_concurrently.sql
-- CREATE INDEX CONCURRENTLY cannot run in transaction
CREATE INDEX CONCURRENTLY users_email_concurrent
ON {{tables.users}} (email);
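Because PostgreSQL rejects CREATE INDEX CONCURRENTLY inside a transaction block, it can help to flag such files before running them. The sketch below is a heuristic pre-flight check; `needs_no_transaction` and `has_conflicting_transaction` are hypothetical helpers, and they only detect the situation without changing how the migration is executed.

```python
import re

_LINE_COMMENT = re.compile(r"--[^\n]*")
_CONCURRENTLY = re.compile(
    r"\bCREATE\s+(?:UNIQUE\s+)?INDEX\s+CONCURRENTLY\b", re.IGNORECASE
)
_BEGIN = re.compile(r"^\s*BEGIN\s*;", re.IGNORECASE | re.MULTILINE)


def _strip_comments(sql: str) -> str:
    """Remove -- line comments so commented-out SQL is ignored."""
    return _LINE_COMMENT.sub("", sql)


def needs_no_transaction(sql: str) -> bool:
    """True if the SQL contains CREATE INDEX CONCURRENTLY."""
    return bool(_CONCURRENTLY.search(_strip_comments(sql)))


def has_conflicting_transaction(sql: str) -> bool:
    """True if a concurrent index build sits in a file with an explicit BEGIN."""
    code = _strip_comments(sql)
    return bool(_CONCURRENTLY.search(code)) and bool(_BEGIN.search(code))
```

Keeping concurrent index builds in their own migration file, as in the example above, makes this kind of check straightforward.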
Complete Usage Examples
Basic Setup
from pgdbm import AsyncDatabaseManager, AsyncMigrationManager, DatabaseConfig
# Create database manager
config = DatabaseConfig(connection_string="postgresql://localhost/myapp")
db = AsyncDatabaseManager(config)
await db.connect()
# Create migration manager
migrations = AsyncMigrationManager(
db,
migrations_path="./migrations",
module_name="myapp" # REQUIRED
)
# Apply all pending
result = await migrations.apply_pending_migrations()
if result["status"] == "success":
    print(f"Applied {len(result['applied'])} migrations")
    for mig in result["applied"]:
        print(f"  - {mig['filename']} ({mig['execution_time_ms']:.1f}ms)")
Dry Run
# Check what would be applied without applying
result = await migrations.apply_pending_migrations(dry_run=True)
if result["status"] == "dry_run":
    print(f"Would apply {len(result['pending'])} migrations:")
    for filename in result["pending"]:
        print(f"  - {filename}")
Check Migration Status
# Get applied migrations
applied = await migrations.get_applied_migrations()
print(f"Applied: {list(applied.keys())}")
# Get pending migrations
pending = await migrations.get_pending_migrations()
print(f"Pending: {[m.filename for m in pending]}")
# Get history
history = await migrations.get_migration_history(limit=5)
for entry in history:
    print(f"{entry['applied_at']}: {entry['filename']} ({entry['execution_time_ms']:.1f}ms)")
Creating Migrations Programmatically
# Create new migration
path = await migrations.create_migration(
name="add_users_avatar",
content="""
ALTER TABLE {{tables.users}}
ADD COLUMN avatar_url VARCHAR(500);
""",
auto_transaction=True # Wraps in BEGIN/COMMIT
)
print(f"Created migration: {path}")
# Output: migrations/20251025_143022_add_users_avatar.sql
Development: Rollback Migration Record
# Remove migration from tracking (doesn't undo changes!)
await migrations.rollback_migration("003_add_column.sql")
# Migration can now be re-applied
result = await migrations.apply_pending_migrations()
# Will apply 003_add_column.sql again
WARNING: rollback_migration only removes the tracking record. It does NOT undo the migration's database changes. For a true rollback, write a down migration.
Migration Class
Used internally, but can be accessed:
from datetime import datetime
from typing import Optional

class Migration:
    filename: str
    checksum: str
    content: str
    applied_at: Optional[datetime]
    module_name: Optional[str]

    @property
    def is_applied(self) -> bool: ...

    @property
    def version(self) -> str: ...
Complete Method Summary
| Method | Parameters | Returns | Use Case |
|---|---|---|---|
| apply_pending_migrations | dry_run=False | dict | Apply all pending |
| get_applied_migrations | - | dict[str, Migration] | Check what's applied |
| get_pending_migrations | - | list[Migration] | Check what's pending |
| find_migration_files | - | list[Migration] | List files on disk |
| apply_migration | migration | float | Apply single migration |
| ensure_migrations_table | - | None | Create tracking table |
| create_migration | name, content, auto_transaction | str | Create new file |
| rollback_migration | filename | None | Remove tracking (dev only) |
| get_migration_history | limit=10 | list[dict] | Recent migrations |
Migration Best Practices
1. Always Use Templates
-- ✅ CORRECT
CREATE TABLE {{tables.users}} (...);
CREATE INDEX users_email ON {{tables.users}} (email);
-- ❌ WRONG
CREATE TABLE users (...);
CREATE TABLE myschema.users (...);
2. Make Migrations Idempotent
-- ✅ CORRECT - Can run multiple times safely
CREATE TABLE IF NOT EXISTS {{tables.users}} (...);
CREATE INDEX IF NOT EXISTS users_email ON {{tables.users}} (email);
-- ❌ WRONG - Fails if already exists
CREATE TABLE {{tables.users}} (...);
3. Use Unique module_name
# ✅ CORRECT - Unique per module
AsyncMigrationManager(db, "migrations", module_name="users")
AsyncMigrationManager(db, "migrations", module_name="orders")
# ❌ WRONG - Default module name causes conflicts
AsyncMigrationManager(db, "migrations") # module_name="default"
AsyncMigrationManager(db, "migrations") # Same default - conflict!
4. For Dual-Mode Libraries: Include Schema in module_name
# ✅ CORRECT - Can use same library multiple times
module_name = f"mylib_{schema}" # "mylib_tenant1", "mylib_tenant2"
# ❌ WRONG - Conflicts if library used twice
module_name = "mylib" # Always the same
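One way to keep the naming rule in a single place is a small helper that derives the module name from the library name and the schema. This is a sketch: `make_module_name` is a hypothetical function, and the "public" fallback for a missing schema is an assumption.

```python
from typing import Optional


def make_module_name(library: str, schema: Optional[str]) -> str:
    """Build a module_name that is unique per (library, schema) pair."""
    # Assumption: treat a missing schema as PostgreSQL's default "public".
    return f"{library}_{schema or 'public'}"


if __name__ == "__main__":
    print(make_module_name("mylib", "tenant1"))
    print(make_module_name("mylib", "tenant2"))
```

The result would be passed as `module_name=` when constructing AsyncMigrationManager, so two installations of the same library in different schemas track their migrations separately.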
Migration File Examples
Basic Table Creation
-- migrations/001_initial_schema.sql
CREATE TABLE IF NOT EXISTS {{tables.users}} (
id SERIAL PRIMARY KEY,
email VARCHAR(255) UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS {{tables.posts}} (
id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL REFERENCES {{tables.users}}(id) ON DELETE CASCADE,
title VARCHAR(500) NOT NULL,
content TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
Adding Indexes
-- migrations/002_add_indexes.sql
CREATE INDEX IF NOT EXISTS users_email
ON {{tables.users}} (email);
CREATE INDEX IF NOT EXISTS posts_user_id
ON {{tables.posts}} (user_id);
CREATE INDEX IF NOT EXISTS posts_created
ON {{tables.posts}} (created_at DESC);
Schema-Qualified Functions
-- migrations/003_add_triggers.sql
CREATE OR REPLACE FUNCTION {{schema}}.update_timestamp()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = CURRENT_TIMESTAMP;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
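-- Assumes {{tables.users}} has an updated_at column.
-- Drop first so the migration stays idempotent (CREATE TRIGGER has no IF NOT EXISTS).
DROP TRIGGER IF EXISTS update_users_timestamp ON {{tables.users}};
CREATE TRIGGER update_users_timestamp
    BEFORE UPDATE ON {{tables.users}}
    FOR EACH ROW
    EXECUTE FUNCTION {{schema}}.update_timestamp();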
Data Migrations
-- migrations/004_seed_data.sql
-- Columns match the users table from 001_initial_schema.sql
INSERT INTO {{tables.users}} (email)
VALUES
    ('admin@example.com'),
    ('support@example.com')
ON CONFLICT (email) DO NOTHING;
Checksum Validation
Migrations are checksummed to detect modifications:
# If migration file changes after being applied
result = await migrations.apply_pending_migrations()
# Raises: MigrationError(
# "Migration '001_users.sql' has been modified after being applied!"
# "Expected checksum: abc123..."
# "Current checksum: def456..."
# )
Why: Prevents silent schema divergence. Applied migrations must not change.
If you need to modify: Create a new migration instead.
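The comparison behind this check can be sketched with the standard library. The tracking table stores a SHA256 of the file contents; the helper names below (`compute_checksum`, `is_modified`) and the UTF-8 encoding step are assumptions for illustration, not pgdbm's internal code.

```python
import hashlib


def compute_checksum(content: str) -> str:
    """Return the SHA256 hex digest of a migration's text."""
    # Assumption: the file content is hashed as UTF-8 bytes.
    return hashlib.sha256(content.encode("utf-8")).hexdigest()


def is_modified(recorded_checksum: str, current_content: str) -> bool:
    """True if the file no longer matches the checksum recorded at apply time."""
    return compute_checksum(current_content) != recorded_checksum


if __name__ == "__main__":
    original = "CREATE TABLE t (id INT);"
    recorded = compute_checksum(original)
    print(is_modified(recorded, original))
    print(is_modified(recorded, original + "\n-- edited later"))
```

Even a comment-only edit changes the digest, which is why any change to an applied file belongs in a new migration.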
Migration Table Schema
Migrations tracked in schema_migrations table:
CREATE TABLE schema_migrations (
id SERIAL PRIMARY KEY,
filename VARCHAR(255) NOT NULL,
checksum VARCHAR(64) NOT NULL,
applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
execution_time_ms REAL,
module_name VARCHAR(255),
UNIQUE(filename, module_name)
);
Key points:
- filename: Migration file name
- checksum: SHA256 of file contents
- module_name: Isolates migrations per module
- UNIQUE constraint on (filename, module_name)
This allows:
- Same filename in different modules (e.g., both "001_initial.sql")
- Each module tracks its own migrations
- No conflicts when modules share database
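The effect of the composite UNIQUE constraint can be seen in a small stand-alone experiment. The sketch below uses an in-memory SQLite database as a stand-in for PostgreSQL and a simplified copy of the tracking table, so it shows the constraint's behavior rather than pgdbm itself; `record` is a hypothetical helper.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified version of the tracking table (SQLite types, fewer columns).
conn.execute(
    """
    CREATE TABLE schema_migrations (
        id INTEGER PRIMARY KEY,
        filename TEXT NOT NULL,
        checksum TEXT NOT NULL,
        module_name TEXT,
        UNIQUE (filename, module_name)
    )
    """
)


def record(filename: str, module_name: str) -> None:
    """Insert a tracking row with a placeholder checksum."""
    conn.execute(
        "INSERT INTO schema_migrations (filename, checksum, module_name) "
        "VALUES (?, ?, ?)",
        (filename, "abc123", module_name),
    )


record("001_initial.sql", "users")
record("001_initial.sql", "orders")  # same filename, different module: accepted

try:
    record("001_initial.sql", "users")  # same filename and module: rejected
    duplicate_rejected = False
except sqlite3.IntegrityError:
    duplicate_rejected = True

row_count = conn.execute("SELECT COUNT(*) FROM schema_migrations").fetchone()[0]
```

Two modules sharing one database can each ship a 001_initial.sql, while a repeated (filename, module_name) pair is refused.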
Error Handling
result = await migrations.apply_pending_migrations()
if result["status"] == "error":
    print(f"Failed on: {result['failed_migration']}")
    print(f"Error: {result['error']}")
    print(f"Successfully applied: {result['applied']}")
elif result["status"] == "success":
    print(f"Applied {len(result['applied'])} migrations")
elif result["status"] == "up_to_date":
    print("No pending migrations")
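The branching above can be collected into one function that turns a result dictionary into a log line. This is a sketch: `describe_result` is a hypothetical helper, and it relies only on the keys used in this document's examples ("applied", "pending", "failed_migration", "error"), reading them defensively in case one is absent.

```python
from typing import Any


def describe_result(result: dict[str, Any]) -> str:
    """Summarize an apply_pending_migrations() result as one line of text."""
    status = result.get("status")
    if status == "success":
        return f"Applied {len(result.get('applied', []))} migrations"
    if status == "up_to_date":
        return "No pending migrations"
    if status == "dry_run":
        return f"Would apply {len(result.get('pending', []))} migrations"
    if status == "error":
        return f"Failed on {result.get('failed_migration')}: {result.get('error')}"
    # Anything else is unexpected; surface it instead of hiding it.
    return f"Unexpected status: {status!r}"


if __name__ == "__main__":
    print(describe_result({"status": "up_to_date", "applied": []}))
```

Handling the unexpected-status case explicitly means a future status value shows up in logs rather than being silently ignored.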
Common Patterns
Standard Application Setup
# In application startup (FastAPI lifespan, etc.)
from pgdbm import AsyncDatabaseManager, AsyncMigrationManager, DatabaseConfig
config = DatabaseConfig(connection_string="postgresql://localhost/myapp")
db = AsyncDatabaseManager(config)
await db.connect()
migrations = AsyncMigrationManager(
db,
migrations_path="./migrations",
module_name="myapp"
)
result = await migrations.apply_pending_migrations()
if result["status"] not in ("success", "up_to_date"):
    raise RuntimeError(f"Migration failed: {result}")
Multi-Service Setup
# Each service runs its own migrations
services = [
(users_db, "migrations/users", "users"),
(orders_db, "migrations/orders", "orders"),
(payments_db, "migrations/payments", "payments"),
]
for db, path, name in services:
    migrations = AsyncMigrationManager(db, path, module_name=name)
    result = await migrations.apply_pending_migrations()
    if result["status"] == "success":
        print(f"{name}: Applied {len(result['applied'])} migrations")
Dual-Mode Library
from pathlib import Path

from pgdbm import AsyncMigrationManager

class MyLibrary:
    async def initialize(self):
        # Library ALWAYS runs its own migrations
        migrations = AsyncMigrationManager(
            self.db,
            migrations_path=str(Path(__file__).parent / "migrations"),
            module_name=f"mylib_{self.db.schema}"  # Include schema!
        )
        result = await migrations.apply_pending_migrations()
Advanced Usage
Custom Migrations Table
# Use different table name (e.g., for multi-tenant)
migrations = AsyncMigrationManager(
db,
migrations_path="tenant_migrations",
migrations_table="tenant_migrations", # Custom table name
module_name=f"tenant_{tenant_id}"
)
Pre-Flight Checks
# Check pending before applying
pending = await migrations.get_pending_migrations()
if pending:
    print(f"Found {len(pending)} pending migrations:")
    for mig in pending:
        print(f"  - {mig.filename}")
    # Ask for user confirmation
    if input("Apply? (y/n): ") == "y":
        result = await migrations.apply_pending_migrations()
else:
    print("Schema is up to date")
Development Workflow
# 1. Create migration
path = await migrations.create_migration(
name="add_user_roles",
content="""
CREATE TABLE {{tables.roles}} (
id SERIAL PRIMARY KEY,
name VARCHAR(100) UNIQUE NOT NULL
);
ALTER TABLE {{tables.users}}
ADD COLUMN role_id INTEGER REFERENCES {{tables.roles}}(id);
"""
)
# 2. Apply it
result = await migrations.apply_pending_migrations()
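# 3. If something is wrong, roll back the tracking record
#    (pass the filename of the file created in step 1; the timestamp here is illustrative)
await migrations.rollback_migration("20251025_143022_add_user_roles.sql")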
# 4. Fix the file, re-apply
result = await migrations.apply_pending_migrations()
Common Mistakes
❌ Passing schema Parameter
# WRONG - schema parameter doesn't exist
migrations = AsyncMigrationManager(
db,
"migrations",
schema="myschema" # TypeError!
)
Fix: Schema comes from db_manager:
db = AsyncDatabaseManager(pool=pool, schema="myschema")
migrations = AsyncMigrationManager(db, "migrations", module_name="myapp")
❌ Not Specifying module_name
# WRONG - Uses "default" module name
migrations = AsyncMigrationManager(db, "migrations")
Fix: Always specify unique module_name:
migrations = AsyncMigrationManager(db, "migrations", module_name="myservice")
❌ Same module_name for Different Schemas
# WRONG - Conflict if library used twice
migrations = AsyncMigrationManager(db1, "migrations", module_name="mylib")
migrations = AsyncMigrationManager(db2, "migrations", module_name="mylib")
Fix: Include schema in module_name:
migrations = AsyncMigrationManager(db1, "migrations", module_name=f"mylib_{schema1}")
migrations = AsyncMigrationManager(db2, "migrations", module_name=f"mylib_{schema2}")
❌ Hardcoding Table Names
-- WRONG
CREATE TABLE users (...);
CREATE TABLE myschema.users (...);
Fix: Use templates:
-- CORRECT
CREATE TABLE {{tables.users}} (...);
❌ Modifying Applied Migrations
-- You applied 001_users.sql yesterday
-- Today you edit it and add a column
-- Next deploy: ERROR - checksum mismatch!
Fix: Never modify applied migrations. Create new migration:
-- migrations/002_add_user_column.sql
ALTER TABLE {{tables.users}} ADD COLUMN new_field VARCHAR(255);
❌ Not Making Migrations Idempotent
-- WRONG - Fails if run twice
CREATE TABLE {{tables.users}} (...);
Fix: Use IF NOT EXISTS:
-- CORRECT
CREATE TABLE IF NOT EXISTS {{tables.users}} (...);
CREATE INDEX IF NOT EXISTS users_email ON {{tables.users}} (email);
Migration History
# Get recent migration history
history = await migrations.get_migration_history(limit=10)
for entry in history:
    print(f"""
    Migration: {entry['filename']}
    Module: {entry['module_name']}
    Applied: {entry['applied_at']}
    Time: {entry['execution_time_ms']}ms
    Checksum: {entry['checksum']}
    """)
Quick Checklist
Before running migrations:
- All migrations use {{tables.}} syntax
- All migrations are idempotent (IF NOT EXISTS)
- Unique module_name specified
- Migration files follow naming pattern (001_name.sql)
- No modifications to already-applied migrations
- For dual-mode libraries: module_name includes schema
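Two of the checklist items (template usage and IF NOT EXISTS) lend themselves to an automated check. The sketch below is a rough regex-based lint; `lint_migration` is a hypothetical helper, and the heuristics are deliberately simple, so they will miss some cases and may flag valid SQL such as CREATE INDEX CONCURRENTLY IF NOT EXISTS.

```python
import re

_LINE_COMMENT = re.compile(r"--[^\n]*")
# CREATE TABLE / CREATE INDEX not followed by IF NOT EXISTS.
_NOT_IDEMPOTENT = re.compile(
    r"\bCREATE\s+(?:UNIQUE\s+)?(TABLE|INDEX)\s+(?!\s*IF\s+NOT\s+EXISTS\b)",
    re.IGNORECASE,
)
# The name that follows CREATE TABLE [IF NOT EXISTS].
_TABLE_TARGET = re.compile(
    r"\bCREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?(\S+)", re.IGNORECASE
)


def lint_migration(sql: str) -> list[str]:
    """Return a list of checklist violations found in a migration's SQL."""
    code = _LINE_COMMENT.sub("", sql)  # ignore -- comments
    problems = []
    for match in _NOT_IDEMPOTENT.finditer(code):
        problems.append(f"CREATE {match.group(1).upper()} without IF NOT EXISTS")
    for match in _TABLE_TARGET.finditer(code):
        if not match.group(1).startswith("{{tables."):
            problems.append(f"hardcoded table name: {match.group(1)}")
    return problems


if __name__ == "__main__":
    print(lint_migration("CREATE TABLE users (id SERIAL PRIMARY KEY);"))
```

A lint like this complements, but does not replace, reviewing each migration against the full checklist.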
Related Skills
- For patterns: pgdbm:using-pgdbm, pgdbm:choosing-pattern
- For implementation: pgdbm:shared-pool-pattern, pgdbm:dual-mode-library
- For complete API: pgdbm:core-api-reference