Google Antigravity Directory

The #1 directory for Google Antigravity prompts, rules, workflows & MCP servers. Optimized for Gemini 3 agentic development.


© 2026 Antigravity AI Directory. All rights reserved.


This website is not affiliated with, endorsed by, or associated with Google LLC. "Google" and "Gemini" are trademarks of Google LLC.


Flask with Celery Background Tasks

Implement asynchronous task processing in Flask with Celery and Redis for scalable applications.

Flask, Celery, Python, Async
by Community
⭐ 0 stars · 👁️ 11 views · 📋 1 copy
.antigravity
# Flask with Celery Background Tasks for Google Antigravity

Implement robust background task processing with Flask and Celery in your Google Antigravity projects. This guide covers task configuration, scheduling, monitoring, and production patterns.

## Celery Configuration

Set up Celery with Flask:

```python
# app/celery_config.py
from celery import Celery
from celery.schedules import crontab  # needed for the beat_schedule below
from kombu import Queue, Exchange

def make_celery(app_name: str = __name__) -> Celery:
    """Create and configure Celery instance."""
    
    celery = Celery(
        app_name,
        broker="redis://localhost:6379/0",
        backend="redis://localhost:6379/1",
        include=["app.tasks"],
    )
    
    celery.conf.update(
        # Task settings
        task_serializer="json",
        accept_content=["json"],
        result_serializer="json",
        timezone="UTC",
        enable_utc=True,
        
        # Task execution
        task_acks_late=True,
        task_reject_on_worker_lost=True,
        task_time_limit=3600,  # 1 hour hard limit
        task_soft_time_limit=3300,  # 55 minutes soft limit
        
        # Worker settings
        worker_prefetch_multiplier=1,
        worker_concurrency=4,
        worker_max_tasks_per_child=1000,
        
        # Result backend settings
        result_expires=86400,  # 24 hours
        result_extended=True,
        
        # Queue configuration
        task_queues=(
            Queue("default", Exchange("default"), routing_key="default"),
            Queue("high_priority", Exchange("high_priority"), routing_key="high_priority"),
            Queue("low_priority", Exchange("low_priority"), routing_key="low_priority"),
            Queue("emails", Exchange("emails"), routing_key="emails"),
        ),
        
        task_default_queue="default",
        task_default_exchange="default",
        task_default_routing_key="default",
        
        # Task routing
        task_routes={
            "app.tasks.send_email": {"queue": "emails"},
            "app.tasks.process_payment": {"queue": "high_priority"},
            "app.tasks.generate_report": {"queue": "low_priority"},
        },
        
        # Beat schedule for periodic tasks
        beat_schedule={
            "cleanup-old-sessions": {
                "task": "app.tasks.cleanup_sessions",
                "schedule": 3600.0,  # Every hour
            },
            "send-daily-digest": {
                "task": "app.tasks.send_daily_digest",
                "schedule": crontab(hour=9, minute=0),
            },
            "sync-external-data": {
                "task": "app.tasks.sync_external_data",
                "schedule": 300.0,  # Every 5 minutes
            },
        },
    )
    
    return celery


celery = make_celery("app")
```
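The exact-name entries in `task_routes` resolve like a dictionary lookup with a fallback to `task_default_queue`. A minimal stand-alone model of that resolution (a sketch of the mapping defined above, not Celery's internals):

```python
# Mirror of the task_routes table from the config above
TASK_ROUTES = {
    "app.tasks.send_email": {"queue": "emails"},
    "app.tasks.process_payment": {"queue": "high_priority"},
    "app.tasks.generate_report": {"queue": "low_priority"},
}

def route_for(task_name: str, default: str = "default") -> str:
    """Resolve which queue a task name lands on, falling back to the default queue."""
    return TASK_ROUTES.get(task_name, {}).get("queue", default)

print(route_for("app.tasks.send_email"))        # emails
print(route_for("app.tasks.cleanup_sessions"))  # default
```

Keying routes on fully qualified task names makes it easy to audit which worker pools receive which workloads.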

## Flask Application Integration

Integrate Celery with Flask:

```python
# app/__init__.py
from flask import Flask
from app.celery_config import celery

def create_app(config_name: str = "development") -> Flask:
    """Application factory."""
    
    app = Flask(__name__)
    app.config.from_object(f"app.config.{config_name.capitalize()}Config")
    
    # Initialize extensions
    from app.extensions import db, migrate, mail
    db.init_app(app)
    migrate.init_app(app, db)
    mail.init_app(app)
    
    # Configure Celery with Flask context
    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)
    
    celery.Task = ContextTask
    celery.conf.update(app.config)
    
    # Register blueprints
    from app.api import api_bp
    app.register_blueprint(api_bp, url_prefix="/api")
    
    return app
```
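The `ContextTask` override is the standard trick for running every task body inside a Flask application context. Stripped of Flask and Celery, the pattern is just a subclass whose `__call__` pushes a context around `run()`; a toy model using stand-in classes (all names here are illustrative, not Flask or Celery APIs):

```python
from contextlib import contextmanager

class ToyApp:
    """Stand-in for a Flask app: tracks whether a context is active."""
    def __init__(self):
        self.context_active = False

    @contextmanager
    def app_context(self):
        self.context_active = True
        try:
            yield
        finally:
            self.context_active = False

class ToyTask:
    """Stand-in for celery.Task: __call__ dispatches to run()."""
    def __call__(self, *args, **kwargs):
        return self.run(*args, **kwargs)

def make_context_task(app):
    """Build a Task subclass whose body always runs inside app's context."""
    class ContextTask(ToyTask):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return super().__call__(*args, **kwargs)
    return ContextTask

app = ToyApp()

class ReportTask(make_context_task(app)):
    def run(self):
        # Anything here can rely on the app context being active
        return app.context_active

print(ReportTask()())       # True
print(app.context_active)   # False (context popped after the task body)
```

This is why tasks in `app/tasks.py` can use `db.session` freely: the context is pushed before `run()` and popped afterwards, exactly as in the toy `app_context` above.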

## Task Definitions

Create robust task implementations:

```python
# app/tasks.py
from celery import shared_task
from celery.exceptions import MaxRetriesExceededError, SoftTimeLimitExceeded
from celery.utils.log import get_task_logger
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
import time

from app.extensions import db
from app.models import User, Email, Report
from app.services import EmailService, PaymentService, ReportGenerator

logger = get_task_logger(__name__)


@shared_task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(Exception,),
    retry_backoff=True,
    retry_backoff_max=600,
    retry_jitter=True,
)
def send_email(
    self,
    recipient: str,
    subject: str,
    template: str,
    context: Dict[str, Any],
) -> Dict[str, Any]:
    """Send an email asynchronously with retry logic."""
    
    logger.info(f"Sending email to {recipient}: {subject}")
    
    try:
        email_service = EmailService()
        result = email_service.send(
            to=recipient,
            subject=subject,
            template=template,
            context=context,
        )
        
        # Log to database
        email_record = Email(
            recipient=recipient,
            subject=subject,
            status="sent",
            sent_at=datetime.utcnow(),
            task_id=self.request.id,
        )
        db.session.add(email_record)
        db.session.commit()
        
        logger.info(f"Email sent successfully to {recipient}")
        return {"status": "success", "message_id": result.message_id}
        
    except Exception as exc:
        logger.error(f"Failed to send email to {recipient}: {exc}")
        
        try:
            raise self.retry(exc=exc)
        except MaxRetriesExceededError:
            # Log permanent failure
            email_record = Email(
                recipient=recipient,
                subject=subject,
                status="failed",
                error_message=str(exc),
                task_id=self.request.id,
            )
            db.session.add(email_record)
            db.session.commit()
            
            raise


@shared_task(
    bind=True,
    max_retries=5,
    default_retry_delay=30,
    rate_limit="10/m",  # 10 tasks per minute
)
def process_payment(
    self,
    user_id: int,
    amount: float,
    currency: str,
    payment_method_id: str,
) -> Dict[str, Any]:
    """Process a payment with rate limiting and retries."""
    
    logger.info(f"Processing payment for user {user_id}: {amount} {currency}")
    
    try:
        user = User.query.get(user_id)
        if not user:
            raise ValueError(f"User {user_id} not found")
        
        payment_service = PaymentService()
        result = payment_service.charge(
            user=user,
            amount=amount,
            currency=currency,
            payment_method_id=payment_method_id,
        )
        
        logger.info(f"Payment processed: {result.transaction_id}")
        return {
            "status": "success",
            "transaction_id": result.transaction_id,
            "amount": amount,
            "currency": currency,
        }
        
    except PaymentService.RetryableError as exc:
        # Catch via the class, not the local instance: `payment_service`
        # is unbound here if the failure happened before it was created.
        logger.warning(f"Retryable payment error: {exc}")
        raise self.retry(exc=exc)
        
    except PaymentService.PermanentError as exc:
        logger.error(f"Permanent payment error: {exc}")
        raise


@shared_task(bind=True, soft_time_limit=1800, time_limit=2000)
def generate_report(
    self,
    report_type: str,
    user_id: int,
    params: Dict[str, Any],
) -> Dict[str, Any]:
    """Generate a report with progress tracking."""
    
    logger.info(f"Generating {report_type} report for user {user_id}")
    
    try:
        report_generator = ReportGenerator()
        
        # Update task state with progress
        self.update_state(
            state="PROGRESS",
            meta={"current": 0, "total": 100, "status": "Starting..."},
        )
        
        def progress_callback(current: int, total: int, status: str):
            self.update_state(
                state="PROGRESS",
                meta={"current": current, "total": total, "status": status},
            )
        
        report = report_generator.generate(
            report_type=report_type,
            user_id=user_id,
            params=params,
            progress_callback=progress_callback,
        )
        
        # Save report record
        report_record = Report(
            type=report_type,
            user_id=user_id,
            file_path=report.file_path,
            status="completed",
            task_id=self.request.id,
        )
        db.session.add(report_record)
        db.session.commit()
        
        return {
            "status": "success",
            "report_id": report_record.id,
            "file_path": report.file_path,
        }
        
    except SoftTimeLimitExceeded:
        logger.warning(f"Report generation soft time limit exceeded for {report_type}")
        # Save partial progress
        raise


@shared_task
def cleanup_sessions() -> Dict[str, int]:
    """Periodic task to clean up expired sessions."""
    
    logger.info("Starting session cleanup")
    
    from app.models import Session
    
    expired_before = datetime.utcnow() - timedelta(days=30)
    
    deleted_count = Session.query.filter(
        Session.last_activity < expired_before
    ).delete()
    
    db.session.commit()
    
    logger.info(f"Cleaned up {deleted_count} expired sessions")
    return {"deleted_count": deleted_count}


@shared_task
def chain_tasks_example(data: Dict[str, Any]) -> str:
    """Example of composing workflows; assumes validate_data, process_data,
    save_results, send_notification, and aggregate_results are tasks
    defined elsewhere in this module."""
    
    from celery import chain, group, chord
    
    # Chain: execute tasks sequentially, piping each result to the next
    workflow = chain(
        validate_data.s(data),
        process_data.s(),
        save_results.s(),
    )
    workflow.apply_async()
    
    # Group: execute tasks in parallel
    parallel_tasks = group(
        send_notification.s(data, "email"),
        send_notification.s(data, "sms"),
        send_notification.s(data, "push"),
    )
    
    # Chord: run the group in parallel, then feed all results to a callback
    full_workflow = chord(parallel_tasks)(aggregate_results.s())
    
    return full_workflow.id
```
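With `retry_backoff=True` and `retry_backoff_max=600`, the `send_email` task's retry delays grow exponentially up to a cap, and `retry_jitter=True` randomizes below that value. A rough stand-alone model of the schedule (an approximation for intuition, not Celery's exact implementation):

```python
import random

def backoff_delay(retries: int, factor: int = 1, maximum: int = 600,
                  jitter: bool = False) -> float:
    """Rough model of exponential backoff: factor * 2**retries,
    capped at `maximum`; jitter samples uniformly below that value."""
    delay = min(factor * (2 ** retries), maximum)
    if jitter:
        delay = random.uniform(0, delay)
    return delay

# Mirroring send_email's retry_backoff / retry_backoff_max settings:
print([backoff_delay(n) for n in range(5)])  # [1, 2, 4, 8, 16]
print(backoff_delay(20))                     # capped at 600
```

Jitter matters in production: without it, a burst of failures retries in lockstep and hammers the downstream service at the same instant.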

## API Endpoints for Tasks

Create Flask endpoints for task management:

```python
# app/api/tasks.py
from flask import Blueprint, jsonify, request
from app.tasks import send_email, generate_report, process_payment
from celery.result import AsyncResult
from app.celery_config import celery

tasks_bp = Blueprint("tasks", __name__)


@tasks_bp.route("/tasks/<task_id>", methods=["GET"])
def get_task_status(task_id: str):
    """Get the status of a task."""
    
    result = AsyncResult(task_id, app=celery)
    
    response = {
        "task_id": task_id,
        "status": result.status,
        "ready": result.ready(),
    }
    
    if result.ready():
        if result.successful():
            response["result"] = result.result
        else:
            response["error"] = str(result.result)
    elif result.status == "PROGRESS":
        response["progress"] = result.info
    
    return jsonify(response)


@tasks_bp.route("/reports", methods=["POST"])
def create_report():
    """Queue a report generation task."""
    
    data = request.get_json()
    
    task = generate_report.delay(
        report_type=data["report_type"],
        user_id=data["user_id"],
        params=data.get("params", {}),
    )
    
    return jsonify({
        "task_id": task.id,
        "status": "queued",
    }), 202
```
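Clients typically poll the status endpoint above until `ready` is true. A small transport-agnostic sketch of that loop (the response shape mirrors `get_task_status`; the stubbed `fetch_status` callable stands in for a real HTTP request):

```python
import time

def poll_task(fetch_status, interval: float = 0.01, timeout: float = 1.0) -> dict:
    """Poll a task-status endpoint until the task is ready or we time out.

    `fetch_status` is any zero-argument callable returning a dict shaped
    like the /tasks/<task_id> response above.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        payload = fetch_status()
        if payload.get("ready"):
            return payload
        time.sleep(interval)
    raise TimeoutError("task did not finish in time")

# Usage with a stubbed endpoint that finishes on the second poll
responses = iter([
    {"task_id": "abc", "status": "PROGRESS", "ready": False},
    {"task_id": "abc", "status": "SUCCESS", "ready": True,
     "result": {"report_id": 7}},
])
final = poll_task(lambda: next(responses))
print(final["status"])  # SUCCESS
```

In a real client, `fetch_status` would wrap an HTTP GET, and the interval is usually increased between polls (or replaced with webhooks/SSE) to avoid hammering the API.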

Google Antigravity generates production-ready Celery configurations with proper task routing, error handling, and monitoring for reliable background processing.

When to Use This Prompt

This Flask prompt is ideal for developers working on:

  • Flask applications requiring modern best practices and optimal performance
  • Projects that need production-ready Flask code with proper error handling
  • Teams looking to standardize their Flask development workflow
  • Developers wanting to learn industry-standard Flask patterns and techniques

By using this prompt, you can save hours of manual coding and ensure best practices are followed from the start. It's particularly valuable for teams looking to maintain consistency across their Flask implementations.

How to Use

  1. Copy the prompt - Click the copy button above to copy the entire prompt to your clipboard
  2. Paste into your AI assistant - Use with Claude, ChatGPT, Cursor, or any AI coding tool
  3. Customize as needed - Adjust the prompt based on your specific requirements
  4. Review the output - Always review generated code for security and correctness

💡 Pro Tip: For best results, provide context about your project structure and any specific constraints or preferences you have.

Best Practices

  • ✓ Always review generated code for security vulnerabilities before deploying
  • ✓ Test the Flask code in a development environment first
  • ✓ Customize the prompt output to match your project's coding standards
  • ✓ Keep your AI assistant's context window in mind for complex requirements
  • ✓ Version control your prompts alongside your code for reproducibility

Frequently Asked Questions

Can I use this Flask prompt commercially?

Yes! All prompts on Antigravity AI Directory are free to use for both personal and commercial projects. No attribution required, though it's always appreciated.

Which AI assistants work best with this prompt?

This prompt works excellently with Claude, ChatGPT, Cursor, GitHub Copilot, and other modern AI coding assistants. For best results, use models with large context windows.

How do I customize this prompt for my specific needs?

You can modify the prompt by adding specific requirements, constraints, or preferences. For Flask projects, consider mentioning your framework version, coding style, and any specific libraries you're using.
