Celery Development Best Practices - Complete Rules Reference
This document compiles all rules from the Celery Development Best Practices framework, organized by impact priority for comprehensive distributed task processing guidance.
1. Configuration & Setup (CRITICAL)
Configuration Broker Setup
Impact: CRITICAL - Ensures reliable message delivery and system stability
Problem: Improper broker configuration can lead to message loss, connection failures, and unreliable task execution. Celery applications depend on robust message broker setup for distributed task processing.
Solution: Configure Redis or RabbitMQ brokers with proper connection settings, persistence, and reliability features. Use connection pooling and implement retry mechanisms for broker connections.
❌ Wrong: Basic broker URL without resilience
# Insufficient for production
app.conf.broker_url = 'redis://localhost:6379/0'
# Missing connection pooling, retries, and error handling
✅ Correct: Redis broker configuration
import socket

from celery import Celery

app = Celery('myapp')
app.conf.update(
    broker_url='redis://localhost:6379/0',
    broker_transport_options={
        'master_name': 'mymaster',        # used when connecting through Redis Sentinel
        'socket_timeout': 5,
        'socket_connect_timeout': 5,
        'socket_keepalive': True,
        'socket_keepalive_options': {     # keys are socket-level constants (Linux)
            socket.TCP_KEEPIDLE: 60,
            socket.TCP_KEEPINTVL: 10,
            socket.TCP_KEEPCNT: 3,
        },
        'health_check_interval': 30,
    },
    broker_connection_retry=True,
    broker_connection_retry_on_startup=True,
    broker_connection_max_retries=10,
)
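RabbitMQ works equally well as a broker; a minimal sketch under assumed host, vhost, and credential placeholders:
app.conf.update(
    broker_url='amqp://myuser:change-me@rabbitmq-host:5672/myvhost',
    broker_heartbeat=30,                      # detect dead TCP connections
    broker_connection_retry_on_startup=True,
    broker_connection_max_retries=10,
    task_queue_max_priority=10,               # enable priority queues on RabbitMQ
)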
Result Backend Selection
Impact: MEDIUM-HIGH - Enables task result tracking and retrieval
Problem: Without proper result backend configuration, tasks cannot store or retrieve results, making it impossible to track task completion, handle asynchronous workflows, or implement result-dependent operations.
Solution: Configure appropriate result backends (Redis, database, etc.) with proper expiration policies, serialization settings, and storage optimizations.
✅ Correct: Redis result backend with proper configuration
app.conf.update(
    result_backend='redis://localhost:6379/1',
    result_backend_transport_options={
        'master_name': 'mymaster',
        'socket_timeout': 5,
        'socket_connect_timeout': 5,
        'socket_keepalive': True,
    },
    result_expires=3600,        # 1 hour
    result_cache_max=10000,
    result_serializer='json',
    accept_content=['json'],
    result_extended=True,       # Store additional metadata
)
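Where results must survive a cache flush or be queried with SQL, a database backend is an option; a sketch with a placeholder connection string (Django projects often use the django-celery-results package instead):
app.conf.update(
    result_backend='db+postgresql://celery:change-me@db-host/celery_results',
    database_engine_options={'pool_recycle': 3600},   # passed to the SQLAlchemy engine
    result_expires=86400,
    result_extended=True,
)
# Django projects: install django-celery-results and set result_backend='django-db'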
2. Task Definition & Execution (CRITICAL)
Task Atomic Operations
Impact: CRITICAL - Prevents data corruption and ensures task reliability
Problem: Non-atomic tasks can leave systems in inconsistent states when failures occur midway through execution. Distributed task processing increases the risk of partial completion and data corruption.
Solution: Design tasks to be atomic - either complete fully or not at all. Use transactions, idempotency keys, and proper error handling to ensure task reliability.
✅ Correct: Atomic task with transaction
from django.db import transaction
from celery import shared_task
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@shared_task(bind=True, max_retries=3)
def process_order_payment(self, order_id, payment_data):
    """Atomic payment processing task"""
    try:
        with transaction.atomic():
            order = Order.objects.select_for_update().get(id=order_id)
            if order.status != 'pending':
                return f"Order {order_id} already processed"
            payment_result = payment_gateway.charge(payment_data)
            order.status = 'paid' if payment_result.success else 'failed'
            order.payment_id = payment_result.transaction_id
            order.save()
            # Enqueue the follow-up task only after the transaction commits
            transaction.on_commit(
                lambda: send_payment_confirmation.delay(order.customer_email, order.id)
            )
        return f"Payment processed for order {order_id}"
    except Exception as exc:
        logger.error(f"Payment processing failed for order {order_id}: {exc}")
        raise self.retry(countdown=60 * (2 ** self.request.retries), exc=exc)
❌ Wrong: Non-atomic task with partial updates
@shared_task
def process_user_registration(user_data):
    """Dangerous: partial updates without atomicity"""
    user = User.objects.create(email=user_data['email'], name=user_data['name'])
    send_welcome_email(user.email)  # What if this fails?
    UserProfile.objects.create(user=user, preferences=user_data.get('preferences', {}))
    # Inconsistent data if any step fails!
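The solution above also mentions idempotency keys; one minimal sketch of that pattern, assuming a Redis or memcached Django cache backend and a hypothetical Account model:
from celery import shared_task
from django.core.cache import cache
from django.db.models import F

@shared_task(bind=True)
def apply_account_credit(self, account_id, amount, idempotency_key):
    """Apply a credit exactly once per idempotency key."""
    # cache.add is atomic on Redis/memcached: it returns False if the key already exists
    if not cache.add(f"credit:{idempotency_key}", "done", timeout=86400):
        return f"Credit {idempotency_key} already applied"
    Account.objects.filter(id=account_id).update(balance=F('balance') + amount)
    return f"Credited {amount} to account {account_id}"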
Task Timeout Management
Impact: CRITICAL - Prevents runaway tasks and resource exhaustion
Problem: Tasks without proper timeouts can run indefinitely, consuming resources and blocking worker processes. This can lead to system slowdowns and require manual intervention.
Solution: Implement appropriate timeouts for different types of tasks based on their expected execution time and resource requirements.
✅ Correct: Task timeout configuration
from celery.exceptions import SoftTimeLimitExceeded

@shared_task(bind=True, soft_time_limit=300, time_limit=360)
def long_running_task(self, data):
    """Task with soft and hard timeouts"""
    try:
        for item in data:
            handle_item(item)  # placeholder for the real per-item work
    except SoftTimeLimitExceeded:
        # Soft limit hit: clean up before the hard limit kills the process
        cleanup_partial_work()
        return "Task aborted after soft time limit"
    except Exception as exc:
        logger.error(f"Task failed: {exc}")
        raise
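Time limits can also be set globally and overridden per task via task_annotations; a brief sketch (task names are illustrative):
app.conf.update(
    task_soft_time_limit=300,    # default soft limit for every task, in seconds
    task_time_limit=360,         # default hard limit
    task_annotations={
        'myapp.tasks.generate_reports': {'time_limit': 1800, 'soft_time_limit': 1700},
    },
)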
Task Calling Methods
Impact: HIGH - Ensures proper task execution and result handling
Problem: Different calling methods have different behaviors and performance implications. Using the wrong method can lead to unexpected blocking, lost results, or performance issues.
Solution: Choose the appropriate calling method based on your needs: delay() for fire-and-forget, apply_async() for advanced options, and calling the task directly (or using apply()) for synchronous, in-process execution.
✅ Correct: Fire-and-forget with delay()
# Simple async execution - no result needed
send_email.delay(user_id=123, template='welcome')
✅ Correct: Advanced options with apply_async()
# Advanced execution options
result = send_email.apply_async(
    args=[user_id, template],
    countdown=60,       # Delay execution by 60 seconds
    expires=3600,       # Expire if not executed within 1 hour
    queue='email_queue',
    priority=9,         # Higher priority
)
✅ Correct: Synchronous execution by calling the task directly
# Runs in the current process without a broker - useful in tests and shells
total = calculate_total(items)
# apply() also runs locally, but returns an EagerResult like a real dispatch
total = calculate_total.apply(args=[items]).get()
Task Options and Configuration
Impact: HIGH - Ensures proper task behavior and reliability
Problem: Celery tasks have many configuration options that control behavior, but using defaults can lead to unexpected behavior, performance issues, or reliability problems in production.
Solution: Configure task options appropriately based on use case requirements, including retry behavior, timeouts, naming, and execution parameters.
✅ Correct: Comprehensive task configuration
@shared_task(
    name='myapp.tasks.process_user_data',
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(ConnectionError, TimeoutError),
    time_limit=3600,
    soft_time_limit=3300,
    acks_late=True,
    ignore_result=False,
    queue='default',
    priority=5,
    expires=86400,
)
def process_user_data(self, user_id):
    """Comprehensive task configuration example"""
    # Implementation...
3. Error Handling & Reliability (HIGH)
Error Retry Strategy
Impact: HIGH - Ensures task reliability and system resilience
Problem: Tasks can fail due to temporary issues like network timeouts, database locks, or external service unavailability. Without proper retry mechanisms, failed tasks are lost and systems become unreliable.
Solution: Implement intelligent retry strategies with exponential backoff, maximum retry limits, and proper exception handling.
✅ Correct: Intelligent retry with exponential backoff
import requests
from requests.exceptions import RequestException, Timeout

from celery import shared_task
from celery.exceptions import Retry
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@shared_task(bind=True, max_retries=5, default_retry_delay=60)
def api_data_sync(self, api_endpoint, data):
    """Task with intelligent retry strategy"""
    try:
        response = requests.post(api_endpoint, json=data, timeout=30)
        if response.status_code == 429:  # Rate limited
            raise self.retry(countdown=300, exc=Exception("Rate limited"))
        response.raise_for_status()
        return response.json()
    except Timeout:
        logger.warning(f"Timeout calling {api_endpoint}, retrying...")
        raise self.retry(countdown=min(300, 30 * (2 ** self.request.retries)))
    except RequestException as exc:
        if self.request.retries < 3:
            logger.warning(f"Network error: {exc}, retrying...")
            raise self.retry(countdown=10 * (2 ** self.request.retries), exc=exc)
        logger.error(f"Persistent network error: {exc}")
        raise
    except Retry:
        raise  # let the retry raised in the 429 branch propagate untouched
    except Exception as exc:
        logger.error(f"Unexpected error in api_data_sync: {exc}")
        if self.request.retries == 0:
            raise self.retry(countdown=60, exc=exc)
        raise
❌ Wrong: Simple retry without intelligence
@shared_task(max_retries=3)
def unreliable_task():
    """Poor retry strategy"""
    # max_retries alone does nothing here: without autoretry_for or an explicit
    # self.retry() call, any failure is simply lost
    external_api_call()
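For plain transient-network failures, the declarative retry options often remove the need for hand-written retry logic; a sketch assuming a requests-based task:
import requests
from requests.exceptions import RequestException
from celery import shared_task

@shared_task(
    autoretry_for=(RequestException,),   # retry automatically on these exceptions
    retry_backoff=True,                  # exponential backoff: 1s, 2s, 4s, ...
    retry_backoff_max=600,               # never wait more than 10 minutes
    retry_jitter=True,                   # randomize delays to avoid thundering herds
    retry_kwargs={'max_retries': 5},
)
def fetch_remote_resource(url):
    return requests.get(url, timeout=30).json()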
4. Canvas: Designing Work-flows (HIGH)
Canvas Chain Workflows
Impact: HIGH - Enables sequential task execution and error propagation
Problem: Complex business processes require tasks to execute in sequence, with each task depending on the result of the previous one. Without proper chaining, workflows become error-prone and hard to manage.
Solution: Use Celery's chain primitive to create sequential workflows where each task receives the result of the previous task as input.
✅ Correct: Simple sequential chain
from celery import chain

# Chain: validate -> process -> notify
workflow = chain(
    validate_data.s(),
    process_user_data.s(),
    send_notification.s(),
)
result = workflow.delay({'user_id': 123, 'data': '...'})
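When any link fails, the rest of the chain is skipped; an error callback can be attached to record that, as in this sketch (the log_chain_error task is an assumption):
from celery import chain, shared_task
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@shared_task
def log_chain_error(request, exc, traceback):
    """Error callback: receives the failing task's request, exception and traceback."""
    logger.error(f"Chain step {request.id} failed: {exc!r}")

workflow = chain(validate_data.s(), process_user_data.s(), send_notification.s())
result = workflow.on_error(log_chain_error.s()).delay({'user_id': 123})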
Canvas Group Parallel Execution
Impact: HIGH - Enables parallel task execution for performance optimization
Problem: Tasks that can run independently should execute in parallel to improve performance, but improper parallelization can lead to resource exhaustion, race conditions, or coordination issues.
Solution: Use Celery's group primitive to execute multiple tasks in parallel, with proper error handling and result aggregation.
✅ Correct: Parallel task execution with group
from celery import group

parallel_tasks = group(
    process_image.s(image_id, op)
    for image_id, op in zip(image_ids, operations)
)
results = parallel_tasks.apply_async().get(timeout=300)
Canvas Chord Synchronization
Impact: HIGH - Enables complex workflows with parallel execution and synchronization
Problem: Some workflows require parallel processing followed by a synchronization step that depends on all parallel tasks completing. Without chords, implementing these patterns is complex and error-prone.
Solution: Use Celery's chord primitive to execute a group of tasks in parallel, then run a callback task that receives all results once all parallel tasks complete.
✅ Correct: Basic chord pattern
from celery import chord, group

workflow = chord(
    group(process_data_chunk.s(chunk_id, data) for chunk_id, data in data_chunks),
    aggregate_results.s(),
)
result = workflow.apply_async()
final_aggregate = result.get(timeout=300)
Canvas Map and Starmap Operations
Impact: MEDIUM-HIGH - Enables efficient batch processing and data transformations
Problem: Processing multiple items with similar operations requires efficient patterns. Manual iteration or individual task calls can be inefficient and hard to manage.
Solution: Use Celery's map and starmap primitives to apply tasks to sequences of data efficiently, with proper error handling and result aggregation.
✅ Correct: Using map for simple transformations
from celery import shared_task

@shared_task
def square_number(x):
    return x * x

# Map: apply square_number to each element in a single worker-side task
result = square_number.map([1, 2, 3, 4, 5]).apply_async().get()
# Results: [1, 4, 9, 16, 25]
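The rule title also covers starmap, which unpacks each element into the task's arguments; a sketch, together with the related chunks helper for batching:
@shared_task
def add(x, y):
    return x + y

# Starmap: each element of the sequence is unpacked into the task's arguments
result = add.starmap([(1, 2), (3, 4), (5, 6)]).apply_async().get()
# Results: [3, 7, 11]

# Chunks: split 100 calls into tasks of 10 calls each (10 messages instead of 100)
add.chunks(zip(range(100), range(100)), 10).apply_async()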
5. Monitoring & Logging (HIGH)
Monitoring Task Tracking
Impact: HIGH - Enables debugging and performance optimization
Problem: Without proper monitoring, failed tasks go undetected, performance issues remain hidden, and debugging distributed systems becomes nearly impossible.
Solution: Implement comprehensive task monitoring with logging, metrics collection, and alerting.
✅ Correct: Comprehensive task monitoring setup
from celery.signals import task_prerun, task_postrun, task_failure, task_success
from django.core.cache import cache
import logging
import time

logger = logging.getLogger(__name__)

@task_prerun.connect
def task_prerun_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, **kw):
    """Track task start"""
    start_time = time.time()
    cache.set(f"task_{task_id}_start", start_time, 3600)
    logger.info(f"Task {task.name}[{task_id}] started", extra={
        'task_id': task_id,
        'task_name': task.name,
        'args': str(args)[:100],
        'timestamp': start_time,
    })

@task_success.connect
def task_success_handler(sender=None, result=None, **kwargs):
    """Track successful task completion"""
    # task_success does not pass task_id; the sender is the task instance
    task_id = sender.request.id
    start_time = cache.get(f"task_{task_id}_start")
    if start_time:
        duration = time.time() - start_time
        logger.info(f"Task {sender.name}[{task_id}] completed successfully", extra={
            'task_id': task_id,
            'task_name': sender.name,
            'duration': duration,
        })
        cache.delete(f"task_{task_id}_start")

@task_failure.connect
def task_failure_handler(sender=None, task_id=None, exception=None, **kw):
    """Track task failures"""
    logger.error(f"Task {sender.name}[{task_id}] failed", extra={
        'task_id': task_id,
        'task_name': sender.name,
        'exception': str(exception),
        'retry_count': getattr(sender.request, 'retries', 0),
    }, exc_info=True)
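Signal handlers cover in-process logging; for external monitors such as celery events or Flower, the worker must also emit task events. A small sketch (the Flower command assumes the separate flower package is installed):
app.conf.update(
    worker_send_task_events=True,   # emit task-received/started/succeeded/failed events
    task_send_sent_event=True,      # also emit an event when a task message is published
)
# CLI equivalents / consumers:
#   celery -A myapp worker -E
#   celery -A myapp events          # curses event viewer
#   celery -A myapp flower          # web dashboard (separate package)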
6. Performance & Scaling (MEDIUM-HIGH)
Performance Concurrency Tuning
Impact: MEDIUM-HIGH - Optimizes task processing throughput and resource utilization
Problem: Default Celery worker settings often result in suboptimal performance, either wasting resources or creating bottlenecks.
Solution: Tune worker concurrency based on workload characteristics, available resources, and performance requirements.
✅ Correct: Optimized worker concurrency configuration
app.conf.update(
    worker_pool='prefork',            # or 'eventlet' / 'gevent' for I/O bound tasks
    worker_concurrency=4,             # Number of worker processes/threads
    worker_prefetch_multiplier=1,     # Conservative prefetching
    broker_pool_limit=10,             # Max broker connections per worker
    redis_max_connections=20,         # Redis connection pool size
)
❌ Wrong: Default settings without tuning
app = Celery('myapp')
# worker_prefetch_multiplier defaults to 4
# May cause memory issues or slow processing
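Concurrency interacts with the pool implementation, which is usually chosen per worker process on the command line; illustrative commands, not prescriptions:
# CPU-bound queues: processes, low concurrency
#   celery -A myapp worker -Q cpu --pool=prefork --concurrency=4
# I/O-bound queues: green threads, high concurrency
#   celery -A myapp worker -Q io --pool=gevent --concurrency=200
# Let the worker scale between 3 and 10 processes with load
#   celery -A myapp worker --autoscale=10,3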
7. Security (MEDIUM-HIGH)
Security Task Authentication
Impact: MEDIUM-HIGH - Prevents unauthorized task execution
Problem: Celery tasks can be invoked from various sources, and without proper authentication, malicious actors could execute tasks or access sensitive operations.
Solution: Implement task-level authentication and authorization checks to ensure only authorized sources can execute sensitive tasks.
✅ Correct: Task authentication with signatures
import hmac
import hashlib

from celery import shared_task
from celery.utils.log import get_task_logger
from django.conf import settings
from django.core.exceptions import PermissionDenied

logger = get_task_logger(__name__)

def generate_task_signature(task_name, args, secret_key):
    """Generate HMAC signature for task authentication"""
    message = f"{task_name}:{str(args)}"
    return hmac.new(secret_key.encode(), message.encode(), hashlib.sha256).hexdigest()

def verify_task_signature(task_name, args, signature, secret_key):
    """Verify task signature"""
    expected = generate_task_signature(task_name, args, secret_key)
    return hmac.compare_digest(expected, signature)

@shared_task(bind=True)
def secure_task(self, data, signature):
    """Task with signature-based authentication"""
    # Verify signature before processing
    if not verify_task_signature(self.name, data, signature, settings.CELERY_SECRET_KEY):
        logger.error(f"Invalid signature for task {self.name}")
        raise PermissionDenied("Invalid task signature")
    # Process authenticated task
    return process_secure_data(data)
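Celery also ships message signing (the 'auth' serializer) for broker-level authentication; a sketch assuming placeholder certificate paths and the cryptography package installed:
app.conf.update(
    security_key='/etc/ssl/private/worker.key',
    security_certificate='/etc/ssl/certs/worker.pem',
    security_cert_store='/etc/ssl/certs/*.pem',
)
app.setup_security()   # enables the 'auth' serializer so only signed messages are accepted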
8. Result Backends (MEDIUM-HIGH)
Result Expiry Management
Impact: MEDIUM-HIGH - Prevents storage bloat and improves performance
Problem: Task results accumulate over time without proper expiration, leading to storage bloat and degraded performance. Old results become irrelevant but continue consuming resources.
Solution: Implement appropriate result expiration policies based on use case requirements and storage constraints.
✅ Correct: Configured result expiration
app.conf.update(
    result_expires=3600,   # 1 hour; a single global value (int, float or timedelta)
)
# Redis applies result_expires as a key TTL; for database backends the built-in
# celery.backend_cleanup task (scheduled by beat, daily by default) enforces it.

# Per-task control happens at the task or call site, not via result_expires:
@shared_task(ignore_result=True)
def fire_and_forget_cleanup():
    """No result is ever stored for this task."""
    ...

# Discard a stored result explicitly once it has been consumed
result = generate_reports.delay('daily')
data = result.get(timeout=300)
result.forget()
# If different task families need different lifetimes (e.g. reports vs. cache warmers),
# store them in separate backend databases or call forget() early on short-lived results.
9. Routing & Queues (MEDIUM)
Routing Task Distribution
Impact: MEDIUM - Optimizes task processing efficiency
Problem: Without proper task routing, all tasks compete for the same worker resources, leading to inefficient processing and potential bottlenecks.
Solution: Implement intelligent task routing to distribute workloads across specialized queues and workers.
✅ Correct: Task routing configuration
app.conf.update(
    task_routes={
        'myapp.tasks.cpu_intensive': {'queue': 'cpu'},
        'myapp.tasks.io_intensive': {'queue': 'io'},
        'myapp.tasks.priority_high': {'queue': 'priority', 'routing_key': 'high'},
        'myapp.tasks.scheduled': {'queue': 'scheduled'},
    },
    # Queue definitions
    task_create_missing_queues=True,
    task_default_queue='celery',
    task_default_exchange='celery',
    task_default_routing_key='celery',
)

# Worker commands for specialized queues:
#   celery -A myapp worker -Q cpu --concurrency=2 --pool=prefork
#   celery -A myapp worker -Q io --concurrency=50 --pool=gevent
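Routes can also be computed at send time with a router function; a sketch in which the task prefix and queue name are assumptions:
def route_task(name, args, kwargs, options, task=None, **kw):
    """Return a routing mapping for matching tasks, or None to fall through."""
    if name.startswith('myapp.tasks.reports.'):
        return {'queue': 'reports'}
    return None

app.conf.task_routes = (route_task,)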
10. Periodic Tasks (MEDIUM)
Periodic Task Scheduling
Impact: MEDIUM - Enables reliable scheduled task execution
Problem: Periodic tasks require reliable scheduling and execution, but misconfiguration can lead to missed executions, duplicate runs, or system overload.
Solution: Configure Celery Beat with proper scheduling, timezone handling, and reliability features.
✅ Correct: Celery Beat configuration
from celery import Celery
from celery.schedules import crontab

app = Celery('myapp')
app.conf.update(
    beat_schedule={
        'cleanup-expired-sessions': {
            'task': 'myapp.tasks.cleanup_sessions',
            'schedule': crontab(hour=2, minute=0),  # Daily at 2 AM
            'args': (),
        },
        'send-daily-reports': {
            'task': 'myapp.tasks.generate_reports',
            'schedule': crontab(hour=6, minute=0, day_of_week='mon-fri'),
            'args': ('daily',),
        },
        'health-check': {
            'task': 'myapp.tasks.health_check',
            'schedule': 300.0,  # Every 5 minutes
        },
    },
    # Beat settings
    beat_max_loop_interval=300,  # 5 minutes
    beat_schedule_filename='/var/run/celery/beat-schedule',
    beat_sync_every=1,
)

# Timezone-aware scheduling
app.conf.timezone = 'UTC'
app.conf.enable_utc = True
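Beat runs as its own process and must be a singleton per cluster so entries are not dispatched twice; illustrative commands:
# Run beat as its own process (exactly one per cluster):
#   celery -A myapp beat --loglevel=info --schedule=/var/run/celery/beat-schedule
# For local development only, a worker can embed the scheduler:
#   celery -A myapp worker -B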
11. Serialization (MEDIUM)
Serialization Data Handling
Impact: MEDIUM - Ensures reliable data transmission and storage
Problem: Improper serialization can lead to data corruption, compatibility issues, or security vulnerabilities when passing complex objects through Celery.
Solution: Choose appropriate serializers and handle complex data types properly.
✅ Correct: Serialization configuration
import json

from kombu.serialization import register
from myapp.models import CustomModel

def custom_dumps(obj):
    """Encode to JSON, replacing model instances with a lightweight reference."""
    def default(o):
        if isinstance(o, CustomModel):
            return {'__type__': 'CustomModel', 'id': o.id}
        raise TypeError(f"Object of type {type(o).__name__} is not serializable")
    return json.dumps(obj, default=default)

def custom_loads(data):
    """Decode JSON, re-hydrating model references into instances."""
    def object_hook(d):
        if d.get('__type__') == 'CustomModel':
            return CustomModel.objects.get(id=d['id'])
        return d
    return json.loads(data, object_hook=object_hook)

# Register the custom serializer with kombu under the name 'myjson'
register('myjson', custom_dumps, custom_loads,
         content_type='application/x-myjson', content_encoding='utf-8')

app.conf.update(
    # Primary serializer for task messages
    task_serializer='json',
    # Result serialization
    result_serializer='json',
    # Accept the default serializer plus the custom one registered above
    accept_content=['json', 'myjson'],
)
12. Worker Management (MEDIUM)
Worker Lifecycle Management
Impact: MEDIUM - Ensures reliable worker operation and resource management
Problem: Workers can crash, hang, or consume excessive resources without proper lifecycle management, leading to unreliable task processing.
Solution: Implement proper worker lifecycle management with health checks, graceful shutdown, and resource monitoring.
✅ Correct: Worker lifecycle signals
from celery.signals import worker_init, worker_ready, worker_shutdown

@worker_init.connect
def worker_init_handler(**kwargs):
    """Initialize worker resources"""
    logger.info("Worker initializing...")
    # Setup database connections, cache, etc.
    setup_worker_resources()

@worker_ready.connect
def worker_ready_handler(**kwargs):
    """Worker is ready to accept tasks"""
    logger.info("Worker ready to accept tasks")
    # Register with monitoring system
    register_worker_health()

@worker_shutdown.connect
def worker_shutdown_handler(**kwargs):
    """Cleanup before shutdown"""
    logger.info("Worker shutting down...")
    # Cleanup resources
    cleanup_worker_resources()
    # Unregister from monitoring
    unregister_worker_health()

# Note: Celery installs its own SIGTERM/SIGINT handlers that perform a warm
# shutdown (stop consuming, finish in-flight tasks, fire worker_shutdown).
# Avoid overriding them with handlers that call os._exit(), which skips that
# cleanup; put shutdown logic in the worker_shutdown signal above instead.
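The resource-management side of worker lifecycle can also be enforced declaratively by recycling child processes; a sketch with illustrative thresholds:
app.conf.update(
    worker_max_tasks_per_child=1000,      # recycle a child process after 1000 tasks
    worker_max_memory_per_child=200_000,  # recycle above ~200 MB resident memory (value in KiB)
)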
13. Signals and Events (MEDIUM)
Signals and Events System
Impact: MEDIUM - Enables monitoring, debugging, and extending Celery behavior
Problem: Without access to Celery's internal events, monitoring task lifecycle, debugging issues, and extending functionality becomes difficult. Important events like task start, success, failure, and worker status changes go unobserved.
Solution: Use Celery's signals system to hook into task and worker events for monitoring, logging, metrics collection, and custom behavior extension.
✅ Correct: Comprehensive signal-based monitoring
from celery.signals import task_prerun, task_success, task_failure

@task_prerun.connect
def task_started(sender=None, task_id=None, task=None, **kw):
    logger.info(f"Task started: {task.name}[{task_id}]")

@task_success.connect
def task_completed(sender=None, result=None, **kwargs):
    # task_success does not receive task_id; read it from the sending task
    task_id = sender.request.id
    logger.info(f"Task succeeded: {sender.name}[{task_id}]")

@task_failure.connect
def task_failed(sender=None, task_id=None, exception=None, **kw):
    logger.error(f"Task failed: {sender.name}[{task_id}] - {exception}")
14. Remote Control and Inspection (MEDIUM)
Remote Control and Worker Inspection
Impact: MEDIUM - Enables runtime management and troubleshooting of Celery workers
Problem: Managing distributed Celery workers requires visibility into their state, ability to inspect running tasks, and control over worker behavior. Without remote control capabilities, debugging issues and managing workers becomes difficult.
Solution: Use Celery's remote control commands and inspection API to monitor worker status, inspect active tasks, manage queues, and control worker behavior at runtime.
✅ Correct: Worker status monitoring and inspection
# Get active tasks on all workers
active_tasks = app.control.inspect().active()
# Check worker connectivity
ping_result = app.control.ping()
# Revoke tasks
app.control.revoke(task_ids, terminate=False)
# Add/remove queue consumers
app.control.add_consumer('new_queue')
app.control.cancel_consumer('old_queue')
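A few further inspection and control calls that are useful in practice (worker, task, and queue names are placeholders):
insp = app.control.inspect()        # or inspect(['worker1@host']) to target specific workers
insp.reserved()                     # tasks prefetched by workers but not yet running
insp.scheduled()                    # ETA/countdown tasks waiting for their run time
insp.stats()                        # per-worker totals, pool and broker details
app.control.rate_limit('myapp.tasks.io_intensive', '100/m')   # change a rate limit live
app.control.pool_grow(2)            # add two pool processes to each worker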
15. Testing Celery Applications (MEDIUM-HIGH)
Testing Celery Applications
Impact: MEDIUM-HIGH - Ensures reliability and prevents production issues
Problem: Celery applications are distributed and asynchronous, making testing complex. Without proper testing strategies, bugs can remain undetected until production, causing data corruption, lost tasks, or system failures.
Solution: Implement comprehensive testing strategies including unit tests, integration tests, and end-to-end tests specifically designed for Celery's asynchronous nature.
✅ Correct: Unit testing tasks
from unittest.mock import MagicMock, patch
from django.test import TestCase

class PaymentTaskTests(TestCase):
    @patch('myapp.tasks.payment_gateway')
    def test_process_payment_success(self, mock_gateway):
        mock_gateway.charge.return_value = MagicMock(success=True)
        # apply() executes the task eagerly in-process - no broker or worker needed
        result = process_payment.apply(args=[123, 99.99])
        self.assertEqual(result.get(), "Payment processed for order 123")
✅ Correct: Testing task chains and workflows
class OrderWorkflowTests(TestCase):
    @patch('myapp.tasks.payment_gateway')
    def test_order_workflow(self, mock_gateway):
        mock_gateway.charge.return_value = MagicMock(success=True)
        # Patch the external dependency rather than the tasks, so the chain still
        # runs; apply() evaluates the whole chain eagerly in the test process
        workflow = chain(validate_order.s({'order_id': 123}), process_payment.s())
        result = workflow.apply()
        self.assertEqual(result.get()['status'], 'success')
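Beyond eager unit tests, Celery bundles pytest fixtures that run tasks against a real in-process worker thread; a sketch assuming the celery.contrib.pytest plugin is enabled in conftest.py:
# conftest.py
pytest_plugins = ('celery.contrib.pytest',)

# test_tasks.py
def test_add_against_real_worker(celery_app, celery_worker):
    @celery_app.task
    def add(x, y):
        return x + y

    celery_worker.reload()   # make the embedded worker pick up the task defined above
    assert add.delay(2, 3).get(timeout=10) == 5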
16. Daemonization and Process Management (MEDIUM)
Daemonization and Process Management
Impact: MEDIUM - Ensures reliable production deployment and process lifecycle management
Problem: Celery workers need to run continuously in production environments, but manual process management leads to reliability issues, difficult monitoring, and improper resource cleanup when processes crash or need to be restarted.
Solution: Use proper daemonization tools and process supervisors to manage Celery workers as reliable system services with automatic restart, logging, and monitoring capabilities.
✅ Correct: Systemd service configuration
# /etc/systemd/system/celery-worker.service
[Unit]
Description=Celery Worker Service
After=network.target redis-server.service postgresql.service
Requires=redis-server.service postgresql.service
[Service]
Type=simple
User=celery
Group=celery
Environment=PYTHONPATH=/opt/myapp
Environment=DJANGO_SETTINGS_MODULE=myapp.settings.production
WorkingDirectory=/opt/myapp
ExecStart=/opt/myapp/venv/bin/celery -A myapp worker \
--pool=prefork \
--concurrency=4 \
--loglevel=info \
--logfile=/var/log/celery/worker.log \
--pidfile=/var/run/celery/worker.pid \
--statedb=/var/run/celery/worker.state
ExecReload=/bin/kill -s HUP $MAINPID
Restart=always
RestartSec=10
LimitNOFILE=65536
LimitNPROC=65536
[Install]
WantedBy=multi-user.target
17. Debugging Celery Applications (HIGH)
Debugging Celery Applications
Impact: HIGH - Enables effective troubleshooting and issue resolution in distributed systems
Problem: Celery's distributed nature makes debugging complex. Without proper debugging techniques, developers struggle to identify root causes of task failures, performance issues, and system-wide problems in production environments.
Solution: Implement comprehensive debugging strategies including logging configuration, task inspection, worker debugging, and systematic troubleshooting approaches for common Celery issues.
✅ Correct: Comprehensive logging configuration
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@app.task(bind=True)
def debug_task(self, data):
    logger.info(f"Task started: {self.request.id}", extra={
        'task_id': self.request.id,
        'task_name': self.name,
        'args': self.request.args,
        'kwargs': self.request.kwargs,
    })
    try:
        result = process_data(data)
        logger.info(f"Task completed: {self.request.id}")
        return result
    except Exception as exc:
        logger.error(f"Task failed: {self.request.id}", exc_info=True)
        raise
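When a task only misbehaves on a worker, Celery's bundled telnet-based remote debugger (celery.contrib.rdb) can pause it in place; a sketch with an illustrative task body:
from celery.contrib import rdb

@app.task
def buggy_task(data):
    rdb.set_trace()   # worker logs a port; attach with `telnet <host> <port>`
    return transform(data)   # placeholder for the real work

# Bind the debugger to a reachable interface/port via environment variables:
#   CELERY_RDB_HOST=0.0.0.0 CELERY_RDB_PORT=6900 celery -A myapp worker --loglevel=info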
18. Advanced Performance Optimization (MEDIUM-HIGH)
Advanced Performance Optimization
Impact: MEDIUM-HIGH - Maximizes throughput and minimizes resource usage in high-performance scenarios
Problem: Basic concurrency tuning isn't sufficient for high-performance Celery deployments. Advanced optimization techniques are needed to handle high-throughput scenarios, minimize latency, and optimize resource utilization across complex distributed systems.
Solution: Implement advanced performance optimization techniques including prefetch tuning, connection pooling optimization, task chunking, result backend optimization, and worker pool specialization.
✅ Correct: Advanced prefetch and acknowledgment optimization
app.conf.update(
    worker_prefetch_multiplier=1,      # Conservative prefetching
    task_acks_late=True,               # Acknowledge after execution
    broker_pool_limit=20,              # More connections for high throughput
    worker_max_tasks_per_child=1000,   # Restart worker after N tasks
)
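Two of the techniques named above, task chunking and payload compression, can be sketched briefly (task and variable names are illustrative):
# Chunking: thousands of tiny messages become a handful of batched tasks
process_record.chunks(((record_id,) for record_id in record_ids), 100).apply_async()

# Compress large payloads on the wire and in the result backend
app.conf.update(
    task_compression='gzip',
    result_compression='gzip',
)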
19. Extensions and Bootsteps Customization (LOW)
Extensions and Bootsteps Customization
Impact: LOW - Enables advanced customization and extension of Celery behavior
Problem: Standard Celery configuration may not be sufficient for complex requirements. Organizations need to extend Celery's functionality with custom behaviors, monitoring, security features, or integration with other systems.
Solution: Use Celery's extension system and bootsteps to customize worker behavior, add middleware, implement custom monitoring, and extend Celery's capabilities.
✅ Correct: Custom bootstep for enhanced monitoring
from celery import bootsteps

class MonitoringBootstep(bootsteps.StartStopStep):
    def __init__(self, parent, **kwargs):
        super().__init__(parent, **kwargs)
        self.metrics = {}

    def start(self, parent):
        print("Starting custom monitoring bootstep")
        # Initialize monitoring

    def stop(self, parent):
        print("Stopping custom monitoring bootstep")
        # Cleanup monitoring

# Attach the step to the worker blueprint so it participates in startup/shutdown
app.steps['worker'].add(MonitoringBootstep)
20. Advanced Patterns (LOW)
Advanced Task Chaining
Impact: LOW - Enables complex workflow orchestration
Problem: Simple task execution may not suffice for complex workflows requiring conditional execution, error handling, or result-dependent processing.
Solution: Use Celery's advanced features like chains, groups, chords, and callbacks for complex workflow orchestration.
✅ Correct: Advanced task workflows
from celery import chain, group, chord

# Chain: sequential execution
workflow = chain(
    validate_data.s(data),
    process_data.s(),
    send_notification.s(),
)
result = workflow()

# Group: parallel execution
parallel_tasks = group(
    process_chunk.s(chunk) for chunk in data_chunks
)
results = parallel_tasks.apply_async()

# Chord: parallel execution with callback
chord_tasks = chord(
    group(process_item.s(item) for item in items),
    process_results.s(),
)
final_result = chord_tasks()
# Canvas with error handling
@shared_task
def complex_workflow(order_id):
    """Complex workflow with error handling"""
    try:
        # Step 1: Validate order
        validation = validate_order.s(order_id)
        # Step 2: Process payment (depends on validation)
        payment = process_payment.s(order_id)
        # Step 3: Update inventory and send confirmation
        finalize = group(
            update_inventory.s(order_id),
            send_confirmation.s(order_id),
        )
        # Execute workflow; return the id rather than the AsyncResult itself,
        # since an AsyncResult is not JSON-serializable as a task result
        return chain(validation, payment, finalize).apply_async().id
    except Exception as exc:
        logger.error(f"Workflow failed for order {order_id}: {exc}")
        # Compensate if needed
        compensate_order.delay(order_id)
        raise
This Celery skill provides comprehensive guidance for building reliable, scalable distributed task processing systems. Regular updates will add more rules and patterns as Celery evolves.</content> <parameter name="filePath">skills/celery-skill/AGENTS.md