Implement asynchronous task processing in Flask with Celery and Redis for scalable applications.
# Flask with Celery Background Tasks for Google Antigravity
Implement robust background task processing with Flask and Celery in your Google Antigravity projects. This guide covers task configuration, scheduling, monitoring, and production patterns.
## Celery Configuration
Set up Celery with Flask:
```python
# app/celery_config.py
from celery import Celery
from kombu import Queue, Exchange
def make_celery(app_name: str = __name__) -> Celery:
    """Create and configure the Celery instance.

    Args:
        app_name: Main name passed as the first argument to ``Celery``.

    Returns:
        A ``Celery`` app using the Redis broker/backend URLs below, with
        JSON serialization, four named queues, per-task routing and a
        beat schedule for the periodic tasks.
    """
    # ``crontab`` is needed by the beat schedule below. It is imported here
    # because the surrounding module only imports ``Celery``, ``Queue`` and
    # ``Exchange``; without it, calling this function raises NameError.
    from celery.schedules import crontab

    celery = Celery(
        app_name,
        broker="redis://localhost:6379/0",
        backend="redis://localhost:6379/1",
        include=["app.tasks"],
    )
    celery.conf.update(
        # Task settings
        task_serializer="json",
        accept_content=["json"],
        result_serializer="json",
        timezone="UTC",
        enable_utc=True,
        # Task execution
        task_acks_late=True,
        task_reject_on_worker_lost=True,
        task_time_limit=3600,  # 1 hour hard limit
        task_soft_time_limit=3300,  # 55 minutes soft limit
        # Worker settings
        worker_prefetch_multiplier=1,
        worker_concurrency=4,
        worker_max_tasks_per_child=1000,
        # Result backend settings
        result_expires=86400,  # 24 hours
        result_extended=True,
        # Queue configuration
        task_queues=(
            Queue("default", Exchange("default"), routing_key="default"),
            Queue("high_priority", Exchange("high_priority"), routing_key="high_priority"),
            Queue("low_priority", Exchange("low_priority"), routing_key="low_priority"),
            Queue("emails", Exchange("emails"), routing_key="emails"),
        ),
        task_default_queue="default",
        task_default_exchange="default",
        task_default_routing_key="default",
        # Task routing
        task_routes={
            "app.tasks.send_email": {"queue": "emails"},
            "app.tasks.process_payment": {"queue": "high_priority"},
            "app.tasks.generate_report": {"queue": "low_priority"},
        },
        # Beat schedule for periodic tasks
        beat_schedule={
            "cleanup-old-sessions": {
                "task": "app.tasks.cleanup_sessions",
                "schedule": 3600.0,  # Every hour
            },
            "send-daily-digest": {
                "task": "app.tasks.send_daily_digest",
                "schedule": crontab(hour=9, minute=0),  # Daily at 09:00 (timezone is UTC)
            },
            "sync-external-data": {
                "task": "app.tasks.sync_external_data",
                "schedule": 300.0,  # Every 5 minutes
            },
        },
    )
    return celery
# Module-level Celery instance, created with the main name "app".
celery = make_celery("app")
```
## Flask Application Integration
Integrate Celery with Flask:
```python
# app/__init__.py
from flask import Flask
from app.celery_config import celery
def create_app(config_name: str = "development") -> Flask:
    """Build the Flask application and bind Celery to it.

    Args:
        config_name: Configuration name; it is capitalized and looked up as
            ``app.config.<Name>Config``.

    Returns:
        The configured ``Flask`` application with the ``/api`` blueprint
        registered.
    """
    flask_app = Flask(__name__)
    config_path = f"app.config.{config_name.capitalize()}Config"
    flask_app.config.from_object(config_path)

    # Extensions are imported inside the factory and initialised in this order.
    from app.extensions import db, migrate, mail

    db.init_app(flask_app)
    migrate.init_app(flask_app, db)
    mail.init_app(flask_app)

    # Every task invocation runs inside this application's context.
    base_task = celery.Task

    class ContextTask(base_task):
        def __call__(self, *task_args, **task_kwargs):
            with flask_app.app_context():
                return self.run(*task_args, **task_kwargs)

    celery.Task = ContextTask
    celery.conf.update(flask_app.config)

    # Blueprint registration
    from app.api import api_bp

    flask_app.register_blueprint(api_bp, url_prefix="/api")
    return flask_app
```
## Task Definitions
Create robust task implementations:
```python
# app/tasks.py
from celery import shared_task
from celery.exceptions import MaxRetriesExceededError, SoftTimeLimitExceeded
from celery.utils.log import get_task_logger
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
import time
from app.extensions import db
from app.models import User, Email, Report
from app.services import EmailService, PaymentService, ReportGenerator
# Module-level logger obtained from Celery's get_task_logger, named after this module.
logger = get_task_logger(__name__)
@shared_task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(Exception,),
    retry_backoff=True,
    retry_backoff_max=600,
    retry_jitter=True,
)
def send_email(
    self,
    recipient: str,
    subject: str,
    template: str,
    context: Dict[str, Any],
) -> Dict[str, Any]:
    """Send an email asynchronously with retry logic.

    Args:
        recipient: Address passed to ``EmailService.send`` as ``to``.
        subject: Subject line of the message.
        template: Template identifier handed to the email service.
        context: Values passed to the email service with the template.

    Returns:
        ``{"status": "success", "message_id": ...}`` once the message is
        sent and an ``Email`` row with status ``"sent"`` is committed.

    Raises:
        celery.exceptions.Retry: While retries remain.
        Exception: The original error once retries are exhausted, after an
            ``Email`` row with status ``"failed"`` has been written.
    """
    logger.info(f"Sending email to {recipient}: {subject}")
    try:
        email_service = EmailService()
        result = email_service.send(
            to=recipient,
            subject=subject,
            template=template,
            context=context,
        )
        # Log to database
        email_record = Email(
            recipient=recipient,
            subject=subject,
            status="sent",
            sent_at=datetime.utcnow(),
            task_id=self.request.id,
        )
        db.session.add(email_record)
        db.session.commit()
        logger.info(f"Email sent successfully to {recipient}")
        return {"status": "success", "message_id": result.message_id}
    except Exception as exc:
        logger.error(f"Failed to send email to {recipient}: {exc}")
        # The failure may have come from the commit above; reset the session
        # so it can be used again below.
        db.session.rollback()

        # ``self.retry(exc=exc)`` re-raises ``exc`` itself once the retry
        # budget is spent, never MaxRetriesExceededError. Exhaustion therefore
        # has to be detected from the retry counter, not by catching that
        # exception.
        retries_exhausted = (
            self.max_retries is not None
            and self.request.retries >= self.max_retries
        )
        if retries_exhausted:
            # Log permanent failure
            email_record = Email(
                recipient=recipient,
                subject=subject,
                status="failed",
                error_message=str(exc),
                task_id=self.request.id,
            )
            db.session.add(email_record)
            db.session.commit()
            raise

        # NOTE(review): every exception is retried manually here, so the
        # decorator's autoretry_for/backoff options only ever see the final
        # re-raised error. Confirm which retry strategy is intended.
        # NOTE(review): a database failure after a successful send also lands
        # here and will re-send the email on retry. Confirm that is acceptable.
        raise self.retry(exc=exc)
@shared_task(
    bind=True,
    max_retries=5,
    default_retry_delay=30,
    rate_limit="10/m",  # 10 tasks per minute
)
def process_payment(
    self,
    user_id: int,
    amount: float,
    currency: str,
    payment_method_id: str,
) -> Dict[str, Any]:
    """Process a payment with rate limiting and retries.

    Args:
        user_id: Primary key used to load the ``User``.
        amount: Amount to charge, passed through to the payment service.
        currency: Currency code passed through to the payment service.
        payment_method_id: Identifier of the payment method to charge.

    Returns:
        A dict with ``status``, ``transaction_id``, ``amount`` and
        ``currency`` on success.

    Raises:
        ValueError: If no user exists with ``user_id``.
        celery.exceptions.Retry: When the service raises ``RetryableError``
            and retries remain.
        Exception: ``PermanentError`` from the service is logged and re-raised.
    """
    logger.info(f"Processing payment for user {user_id}: {amount} {currency}")
    # Created before the ``try`` because the ``except`` clauses below read
    # attributes of ``payment_service``. If it were still unbound when the
    # user lookup failed, evaluating those clauses would raise
    # UnboundLocalError and hide the real error.
    payment_service = PaymentService()
    try:
        user = User.query.get(user_id)
        if not user:
            raise ValueError(f"User {user_id} not found")
        result = payment_service.charge(
            user=user,
            amount=amount,
            currency=currency,
            payment_method_id=payment_method_id,
        )
        logger.info(f"Payment processed: {result.transaction_id}")
        return {
            "status": "success",
            "transaction_id": result.transaction_id,
            "amount": amount,
            "currency": currency,
        }
    except payment_service.RetryableError as exc:
        logger.warning(f"Retryable payment error: {exc}")
        raise self.retry(exc=exc)
    except payment_service.PermanentError as exc:
        logger.error(f"Permanent payment error: {exc}")
        raise
@shared_task(bind=True, soft_time_limit=1800, time_limit=2000)
def generate_report(
    self,
    report_type: str,
    user_id: int,
    params: Dict[str, Any],
) -> Dict[str, Any]:
    """Build a report and publish progress through the task state.

    Args:
        report_type: Kind of report, forwarded to ``ReportGenerator.generate``.
        user_id: User the report is generated for.
        params: Extra options forwarded to the generator.

    Returns:
        A dict with ``status``, ``report_id`` and ``file_path`` after the
        ``Report`` row has been committed.

    Raises:
        SoftTimeLimitExceeded: Logged and re-raised when the soft limit hits.
    """
    logger.info(f"Generating {report_type} report for user {user_id}")

    def publish_progress(current: int, total: int, status: str):
        # Store progress in the task state under the custom "PROGRESS" state.
        self.update_state(
            state="PROGRESS",
            meta={"current": current, "total": total, "status": status},
        )

    try:
        generator = ReportGenerator()
        publish_progress(0, 100, "Starting...")

        generated = generator.generate(
            report_type=report_type,
            user_id=user_id,
            params=params,
            progress_callback=publish_progress,
        )

        # Persist a record of the finished report.
        record = Report(
            type=report_type,
            user_id=user_id,
            file_path=generated.file_path,
            status="completed",
            task_id=self.request.id,
        )
        db.session.add(record)
        db.session.commit()

        outcome = {
            "status": "success",
            "report_id": record.id,
            "file_path": generated.file_path,
        }
        return outcome
    except SoftTimeLimitExceeded:
        logger.warning("Report generation soft time limit exceeded")
        # No partial progress is saved here; the exception propagates as-is.
        raise
@shared_task
def cleanup_sessions() -> Dict[str, int]:
    """Delete sessions whose last activity is more than 30 days old.

    Returns:
        ``{"deleted_count": n}`` where ``n`` is the value returned by the
        bulk delete.
    """
    logger.info("Starting session cleanup")
    from app.models import Session

    cutoff = datetime.utcnow() - timedelta(days=30)
    stale_sessions = Session.query.filter(Session.last_activity < cutoff)
    removed = stale_sessions.delete()
    db.session.commit()

    logger.info(f"Cleaned up {removed} expired sessions")
    return {"deleted_count": removed}
@shared_task
def chain_tasks_example(data: Dict[str, Any]) -> str:
    """Example of task chaining, grouping and chords.

    Args:
        data: Payload handed to the first task of the chain and to each
            notification task.

    Returns:
        The id of the chord result (parallel notifications plus callback).
    """
    from celery import chain, group, chord

    # Chain: execute tasks sequentially
    workflow = chain(
        validate_data.s(data),
        process_data.s(),
        save_results.s(),
    )
    # The chain was previously built but never sent to the broker, so the
    # sequential part of the example never ran. Dispatch it explicitly.
    workflow.apply_async()

    # Group: execute tasks in parallel
    parallel_tasks = group(
        send_notification.s(data, "email"),
        send_notification.s(data, "sms"),
        send_notification.s(data, "push"),
    )

    # Chord: parallel tasks with callback. Calling the chord with the
    # callback signature dispatches it.
    full_workflow = chord(parallel_tasks)(aggregate_results.s())
    return full_workflow.id
```
## API Endpoints for Tasks
Create Flask endpoints for task management:
```python
# app/api/tasks.py
from flask import Blueprint, jsonify, request
from app.tasks import send_email, generate_report, process_payment
from celery.result import AsyncResult
from app.celery_config import celery
# Blueprint named "tasks"; the task-management routes are registered on it.
tasks_bp = Blueprint("tasks", __name__)
@tasks_bp.route("/tasks/<task_id>", methods=["GET"])
def get_task_status(task_id: str):
    """Report the current state of a Celery task as JSON.

    The payload always carries ``task_id``, ``status`` and ``ready``. It
    adds ``result`` or ``error`` once the task is ready, or ``progress``
    while the task is in the "PROGRESS" state.
    """
    async_result = AsyncResult(task_id, app=celery)
    payload = {
        "task_id": task_id,
        "status": async_result.status,
        "ready": async_result.ready(),
    }

    if not async_result.ready():
        # Still running: expose progress metadata when the task reports it.
        if async_result.status == "PROGRESS":
            payload["progress"] = async_result.info
        return jsonify(payload)

    if async_result.successful():
        payload["result"] = async_result.result
    else:
        payload["error"] = str(async_result.result)
    return jsonify(payload)
@tasks_bp.route("/reports", methods=["POST"])
def create_report():
    """Queue a report generation task.

    Expects a JSON object with ``report_type`` and ``user_id``, plus an
    optional ``params`` object.

    Returns:
        ``202`` with the queued task id, or ``400`` with an ``error``
        message when the body is not a JSON object or a required field is
        missing.
    """
    data = request.get_json()
    # The body is client-supplied: reject anything that is not an object
    # with the required keys, so a bad request does not become a 500.
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400
    missing = [field for field in ("report_type", "user_id") if field not in data]
    if missing:
        return jsonify({
            "error": f"Missing required field(s): {', '.join(missing)}",
        }), 400

    task = generate_report.delay(
        report_type=data["report_type"],
        user_id=data["user_id"],
        params=data.get("params", {}),
    )
    return jsonify({
        "task_id": task.id,
        "status": "queued",
    }), 202
```
Google Antigravity generates production-ready Celery configurations with proper task routing, error handling, and monitoring for reliable background processing. This Flask prompt is ideal for developers working on asynchronous task processing with Celery and Redis.
By using this prompt, you can save hours of manual coding and ensure best practices are followed from the start. It's particularly valuable for teams looking to maintain consistency across their Flask implementations.
All prompts on Antigravity AI Directory are free to use for both personal and commercial projects. No attribution is required, though it's always appreciated.
This prompt works excellently with Claude, ChatGPT, Cursor, GitHub Copilot, and other modern AI coding assistants. For best results, use models with large context windows.
You can modify the prompt by adding specific requirements, constraints, or preferences. For Flask projects, consider mentioning your framework version, coding style, and any specific libraries you're using.