name: core-api-reference description: Use when implementing pgdbm database operations - provides complete AsyncDatabaseManager and DatabaseConfig API with all methods and parameters
pgdbm Core API Reference
Overview
Complete API reference for AsyncDatabaseManager, DatabaseConfig, and TransactionManager.
All signatures, parameters, return types, and usage examples. No documentation lookup needed.
AsyncDatabaseManager
Initialization
# Pattern 1: Create own pool
AsyncDatabaseManager(config: DatabaseConfig)
# Pattern 2: Use external pool
AsyncDatabaseManager(
pool: asyncpg.Pool,
schema: Optional[str] = None
)
Rules:
- Cannot provide both
configandpool schemaonly valid with external pool- Must call
connect()if using config - Never call
connect()if using external pool
Connection Lifecycle
# Create shared pool (class method)
pool = await AsyncDatabaseManager.create_shared_pool(config: DatabaseConfig) -> asyncpg.Pool
# Connect (only for config-based init)
await db.connect() -> None
# Raises PoolError if using external pool
# Disconnect (only for config-based init)
await db.disconnect() -> None
# Does nothing if using external pool
Query Methods
All methods automatically apply {{tables.}} template substitution.
# Execute without return
await db.execute(
query: str,
*args: Any,
timeout: Optional[float] = None
) -> str
# Returns: asyncpg status string like "INSERT 0 1"
# Execute and return generated ID
await db.execute_and_return_id(
query: str,
*args: Any
) -> Any
# Automatically appends RETURNING id if not present
# Returns: The id value
# Fetch single value
await db.fetch_value(
query: str,
*args: Any,
column: int = 0,
timeout: Optional[float] = None
) -> Any
# Returns: Single value from result (or None)
# Fetch single row
await db.fetch_one(
query: str,
*args: Any,
timeout: Optional[float] = None
) -> Optional[dict[str, Any]]
# Returns: Dictionary of column->value (or None if no results)
# Fetch all rows
await db.fetch_all(
query: str,
*args: Any,
timeout: Optional[float] = None
) -> list[dict[str, Any]]
# Returns: List of dictionaries
# Batch execute (multiple parameter sets)
await db.executemany(
query: str,
args_list: list[tuple]
) -> None
# Executes same query with different parameter sets
# More efficient than looping execute()
Examples:
# execute_and_return_id - Common for inserts
user_id = await db.execute_and_return_id(
"INSERT INTO {{tables.users}} (email, name) VALUES ($1, $2)",
"alice@example.com",
"Alice"
)
# Automatically becomes: ... RETURNING id
# fetch_value with column parameter
email = await db.fetch_value(
"SELECT email, name FROM {{tables.users}} WHERE id = $1",
user_id,
column=0 # Get first column (email)
)
# executemany for batch inserts
users = [
("alice@example.com", "Alice"),
("bob@example.com", "Bob"),
("charlie@example.com", "Charlie"),
]
await db.executemany(
"INSERT INTO {{tables.users}} (email, name) VALUES ($1, $2)",
users
)
Bulk Operations
# Copy records (MUCH faster than INSERT for bulk data)
await db.copy_records_to_table(
table_name: str,
records: list[tuple],
columns: Optional[list[str]] = None
) -> int
# Uses PostgreSQL COPY command
# Returns: Number of records copied
# Example
records = [
("alice@example.com", "Alice"),
("bob@example.com", "Bob"),
]
count = await db.copy_records_to_table(
"users", # Don't use {{tables.}} here - just table name
records=records,
columns=["email", "name"]
)
# Returns: 2
Pydantic Integration
from pydantic import BaseModel
class User(BaseModel):
id: int
email: str
name: str
# Fetch single row as model
user = await db.fetch_as_model(
User,
query: str,
*args: Any,
timeout: Optional[float] = None
) -> Optional[User]
# Fetch all rows as models
users = await db.fetch_all_as_model(
User,
query: str,
*args: Any,
timeout: Optional[float] = None
) -> list[User]
# Example
user = await db.fetch_as_model(
User,
"SELECT * FROM {{tables.users}} WHERE id = $1",
user_id
)
# Returns: User(id=1, email="alice@example.com", name="Alice")
Schema Operations
# Check if table exists
exists = await db.table_exists(table_name: str) -> bool
# Examples
exists = await db.table_exists("users") # Check in current schema
exists = await db.table_exists("other_schema.users") # Check in specific schema
Transaction Management
# Create transaction context
async with db.transaction() as tx:
# tx has same API as db (execute, fetch_one, fetch_all, etc.)
user_id = await tx.fetch_value(
"INSERT INTO {{tables.users}} (email) VALUES ($1) RETURNING id",
email
)
await tx.execute(
"INSERT INTO {{tables.profiles}} (user_id) VALUES ($1)",
user_id
)
# Auto-commits on success, rolls back on exception
# Nested transactions (savepoints)
async with db.transaction() as tx:
await tx.execute("INSERT INTO {{tables.users}} ...")
async with tx.transaction() as nested:
await nested.execute("UPDATE {{tables.users}} ...")
# Nested transaction uses SAVEPOINT
Monitoring and Performance
# Get pool statistics
stats = await db.get_pool_stats() -> dict[str, Any]
# Returns: {
# "status": "connected",
# "min_size": 10,
# "max_size": 50,
# "size": 15, # Current total connections
# "free_size": 10, # Idle connections
# "used_size": 5, # Active connections
# "database": "myapp",
# "schema": "myschema",
# "pid": 12345,
# "version": "PostgreSQL 15.3"
# }
# Add prepared statement (performance optimization)
db.add_prepared_statement(
name: str,
query: str
) -> None
# Prepared statements created on all connections in pool
# Improves performance for frequently-used queries
Advanced Operations
# Acquire connection directly (advanced)
async with db.acquire() as conn:
# conn is raw asyncpg connection
# Use for operations not covered by AsyncDatabaseManager
await conn.execute("...")
DatabaseConfig
Complete Parameter Reference
from pgdbm import DatabaseConfig
config = DatabaseConfig(
# Connection (either connection_string OR individual params)
connection_string: Optional[str] = None, # e.g., "postgresql://user:pass@host/db"
host: str = "localhost",
port: int = 5432,
database: str = "postgres",
user: str = "postgres",
password: Optional[str] = None,
schema: Optional[str] = None, # Alias: schema_name
# Connection Pool
min_connections: int = 10,
max_connections: int = 20,
max_queries: int = 50000, # Queries per connection before recycling
max_inactive_connection_lifetime: float = 300.0, # Seconds
command_timeout: float = 60.0, # Default query timeout (seconds)
# Connection Initialization
server_settings: Optional[dict[str, str]] = None, # PostgreSQL settings
init_commands: Optional[list[str]] = None, # Run on each connection
# TLS/SSL Configuration
ssl_enabled: bool = False,
ssl_mode: Optional[str] = None, # 'require', 'verify-ca', 'verify-full'
ssl_ca_file: Optional[str] = None, # Path to CA certificate
ssl_cert_file: Optional[str] = None, # Path to client certificate
ssl_key_file: Optional[str] = None, # Path to client key
ssl_key_password: Optional[str] = None, # Key password if encrypted
# Server-Side Timeouts (milliseconds, None to disable)
statement_timeout_ms: Optional[int] = 60000, # Abort long queries
idle_in_transaction_session_timeout_ms: Optional[int] = 60000, # Abort idle transactions
lock_timeout_ms: Optional[int] = 5000, # Abort lock waits
# Retry Configuration
retry_attempts: int = 3,
retry_delay: float = 1.0, # Initial delay (seconds)
retry_backoff: float = 2.0, # Exponential backoff multiplier
retry_max_delay: float = 30.0, # Maximum delay (seconds)
)
Common Configurations
Development:
config = DatabaseConfig(
connection_string="postgresql://localhost/myapp_dev",
min_connections=2,
max_connections=10,
)
Production with TLS:
config = DatabaseConfig(
connection_string="postgresql://db.example.com/myapp",
min_connections=20,
max_connections=100,
ssl_enabled=True,
ssl_mode="verify-full",
ssl_ca_file="/etc/ssl/certs/ca.pem",
statement_timeout_ms=30000, # 30 second timeout
lock_timeout_ms=5000, # 5 second lock timeout
)
Custom initialization:
config = DatabaseConfig(
connection_string="postgresql://localhost/myapp",
init_commands=[
"SET timezone TO 'UTC'",
"SET statement_timeout TO '30s'",
],
server_settings={
"jit": "off", # Disable JIT compilation
"application_name": "myapp",
},
)
TransactionManager
Same API as AsyncDatabaseManager but within transaction context:
async with db.transaction() as tx:
# All methods available
await tx.execute(query, *args, timeout=None) -> str
await tx.executemany(query, args_list) -> None
await tx.fetch_one(query, *args, timeout=None) -> Optional[dict]
await tx.fetch_all(query, *args, timeout=None) -> list[dict]
await tx.fetch_value(query, *args, column=0, timeout=None) -> Any
# Nested transactions (savepoints)
async with tx.transaction() as nested_tx:
...
# Access underlying connection
conn = tx.connection # Property, not method
Complete Method Summary
AsyncDatabaseManager - All Methods
| Method | Parameters | Returns | Use Case |
|---|---|---|---|
execute | query, *args, timeout | str | No results needed |
execute_and_return_id | query, *args | Any | INSERT with auto RETURNING id |
executemany | query, args_list | None | Batch execute same query |
fetch_value | query, *args, column, timeout | Any | Single value |
fetch_one | query, *args, timeout | dict|None | Single row |
fetch_all | query, *args, timeout | list[dict] | Multiple rows |
fetch_as_model | model, query, *args, timeout | Model|None | Single row as Pydantic |
fetch_all_as_model | model, query, *args, timeout | list[Model] | Rows as Pydantic |
copy_records_to_table | table, records, columns | int | Bulk COPY (fast) |
table_exists | table_name | bool | Schema checking |
transaction | - | TransactionManager | Transaction context |
get_pool_stats | - | dict | Pool monitoring |
add_prepared_statement | name, query | None | Performance optimization |
acquire | - | Connection | Advanced: raw connection |
connect | - | None | Initialize pool (config-based only) |
disconnect | - | None | Close pool (config-based only) |
create_shared_pool | config | asyncpg.Pool | Class method: create shared pool |
Compatibility aliases
fetch_val(...)→fetch_value(...)execute_many(...)→executemany(...)
TransactionManager - All Methods
| Method | Parameters | Returns |
|---|---|---|
execute | query, *args, timeout | str |
executemany | query, args_list | None |
fetch_value | query, *args, column, timeout | Any |
fetch_one | query, *args, timeout | dict|None |
fetch_all | query, *args, timeout | list[dict] |
transaction | - | TransactionManager (nested) |
connection | - | Connection (property) |
Note: TransactionManager does NOT have:
- execute_and_return_id
- copy_records_to_table
- fetch_as_model
- table_exists
- Pool management methods
Use regular fetch_value for IDs within transactions.
Template Syntax
All query methods support template substitution:
# Available templates
{{tables.tablename}} # → "schema".tablename (or tablename if no schema)
{{schema}} # → "schema" (or empty)
# Example
query = "SELECT * FROM {{tables.users}} WHERE created_at > $1"
# With schema="myapp"
# Becomes: SELECT * FROM "myapp".users WHERE created_at > $1
# Without schema
# Becomes: SELECT * FROM users WHERE created_at > $1
Usage Examples
Basic Queries
# Insert and get ID
user_id = await db.execute_and_return_id(
"INSERT INTO {{tables.users}} (email, name) VALUES ($1, $2)",
"alice@example.com",
"Alice"
)
# Fetch single value
count = await db.fetch_value(
"SELECT COUNT(*) FROM {{tables.users}}"
)
# Fetch with specific column
email = await db.fetch_value(
"SELECT email, name FROM {{tables.users}} WHERE id = $1",
user_id,
column=0 # Get email (first column)
)
# Fetch one row
user = await db.fetch_one(
"SELECT * FROM {{tables.users}} WHERE id = $1",
user_id
)
# user = {"id": 1, "email": "...", "name": "..."}
# Fetch all rows
users = await db.fetch_all(
"SELECT * FROM {{tables.users}} WHERE is_active = $1",
True
)
# users = [{"id": 1, ...}, {"id": 2, ...}]
# Execute without results
await db.execute(
"DELETE FROM {{tables.users}} WHERE id = $1",
user_id
)
# Check table exists
if await db.table_exists("users"):
print("Users table exists")
Batch Operations
# executemany - same query, different params
users = [
("alice@example.com", "Alice"),
("bob@example.com", "Bob"),
("charlie@example.com", "Charlie"),
]
await db.executemany(
"INSERT INTO {{tables.users}} (email, name) VALUES ($1, $2)",
users
)
# copy_records_to_table - fastest for bulk data
records = [
("alice@example.com", "Alice"),
("bob@example.com", "Bob"),
# ... thousands more
]
count = await db.copy_records_to_table(
"users", # Just table name (template applied internally)
records=records,
columns=["email", "name"]
)
# Much faster than executemany for >1000 rows
Pydantic Models
from pydantic import BaseModel
class User(BaseModel):
id: int
email: str
name: str
is_active: bool = True
# Fetch as model
user = await db.fetch_as_model(
User,
"SELECT * FROM {{tables.users}} WHERE id = $1",
user_id
)
# user is User instance (typed)
# Fetch all as models
users = await db.fetch_all_as_model(
User,
"SELECT * FROM {{tables.users}} WHERE is_active = $1",
True
)
# users is list[User] (typed)
Transactions
# Basic transaction
async with db.transaction() as tx:
user_id = await tx.fetch_value(
"INSERT INTO {{tables.users}} (email) VALUES ($1) RETURNING id",
email
)
await tx.execute(
"INSERT INTO {{tables.profiles}} (user_id, bio) VALUES ($1, $2)",
user_id,
"Bio text"
)
# Commits on success, rolls back on exception
# Nested transaction (savepoint)
async with db.transaction() as tx:
await tx.execute("INSERT INTO {{tables.users}} ...")
try:
async with tx.transaction() as nested:
await nested.execute("UPDATE {{tables.users}} SET risky_field = $1", value)
# This can rollback without affecting outer transaction
except Exception:
# Nested rolled back, outer transaction continues
pass
Monitoring
# Get pool statistics
stats = await db.get_pool_stats()
print(f"Total connections: {stats['size']}")
print(f"Active: {stats['used_size']}")
print(f"Idle: {stats['free_size']}")
print(f"Usage: {stats['used_size'] / stats['size']:.1%}")
# Monitor pool health
usage = stats['used_size'] / stats['size']
if usage > 0.8:
logger.warning(f"High pool usage: {usage:.1%}")
Prepared Statements
# Add frequently-used query as prepared statement
db.add_prepared_statement(
"get_user_by_email",
"SELECT * FROM {{tables.users}} WHERE email = $1"
)
# Prepared statements are created on all pool connections
# Improves performance for queries executed repeatedly
DatabaseConfig Complete Reference
Connection Parameters
# Use connection_string (recommended)
config = DatabaseConfig(
connection_string="postgresql://user:pass@host:port/database"
)
# OR use individual parameters
config = DatabaseConfig(
host="localhost",
port=5432,
database="myapp",
user="postgres",
password="secret",
schema="myschema", # Optional schema
)
Pool Configuration
config = DatabaseConfig(
connection_string="...",
# Pool sizing
min_connections=10, # Minimum idle connections
max_connections=50, # Maximum total connections
# Connection lifecycle
max_queries=50000, # Queries before recycling connection
max_inactive_connection_lifetime=300.0, # Seconds before closing idle
command_timeout=60.0, # Default query timeout (seconds)
)
SSL/TLS Configuration
config = DatabaseConfig(
connection_string="postgresql://db.example.com/myapp",
# Enable SSL
ssl_enabled=True,
ssl_mode="verify-full", # 'require', 'verify-ca', 'verify-full'
# Certificate files
ssl_ca_file="/etc/ssl/certs/ca.pem",
ssl_cert_file="/etc/ssl/certs/client.crt", # For mutual TLS
ssl_key_file="/etc/ssl/private/client.key",
ssl_key_password="keypass", # If key is encrypted
)
SSL Modes:
require: Encrypt connection (don't verify certificate)verify-ca: Verify certificate is signed by trusted CAverify-full: Verify certificate AND hostname match
Server-Side Timeouts
Prevent runaway queries and stuck transactions:
config = DatabaseConfig(
connection_string="...",
# Timeouts in milliseconds (None to disable)
statement_timeout_ms=30000, # Abort queries >30 seconds
idle_in_transaction_session_timeout_ms=60000, # Abort idle transactions >1 minute
lock_timeout_ms=5000, # Abort lock waits >5 seconds
)
Default values:
statement_timeout_ms: 60000 (60 seconds)idle_in_transaction_session_timeout_ms: 60000lock_timeout_ms: 5000
Set to None to disable.
Connection Initialization
config = DatabaseConfig(
connection_string="...",
# Custom server settings
server_settings={
"jit": "off", # Disable JIT (prevents latency spikes)
"application_name": "myapp",
"timezone": "UTC",
},
# Commands run on each new connection
init_commands=[
"SET timezone TO 'UTC'",
"SET work_mem TO '256MB'",
],
)
Retry Configuration
config = DatabaseConfig(
connection_string="...",
# Connection retry settings
retry_attempts=3, # Number of retries
retry_delay=1.0, # Initial delay (seconds)
retry_backoff=2.0, # Exponential backoff multiplier
retry_max_delay=30.0, # Maximum delay between retries
)
Related Skills
- For patterns:
pgdbm:using-pgdbm,pgdbm:choosing-pattern - For migrations:
pgdbm:migrations-api-reference - For testing:
pgdbm:testing-database-code