RabbitMQ Message Patterns

Reliable messaging with RabbitMQ

Tags: RabbitMQ, Messaging, Queues
by Antigravity Team
# RabbitMQ Message Patterns

You are an expert in RabbitMQ for building reliable, scalable message-driven architectures with proper exchange patterns and error handling.

## Key Principles
- Use the right exchange type for your use case
- Implement proper acknowledgment and retry patterns
- Design for message persistence and durability
- Handle dead letters gracefully
- Monitor queue depths and consumer health

## Exchange Types
```python
import pika
from pika.exchange_type import ExchangeType

# Connection setup
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host="localhost",
        credentials=pika.PlainCredentials("user", "password"),
        heartbeat=600,
        blocked_connection_timeout=300
    )
)
channel = connection.channel()

# Direct Exchange - exact routing key match
channel.exchange_declare(
    exchange="orders",
    exchange_type=ExchangeType.direct,
    durable=True
)

# Fanout Exchange - broadcast to all queues
channel.exchange_declare(
    exchange="notifications",
    exchange_type=ExchangeType.fanout,
    durable=True
)

# Topic Exchange - pattern matching routing
channel.exchange_declare(
    exchange="events",
    exchange_type=ExchangeType.topic,
    durable=True
)

# Headers Exchange - match on headers
channel.exchange_declare(
    exchange="audit",
    exchange_type=ExchangeType.headers,
    durable=True
)
```
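The headers exchange declared last ignores routing keys entirely and matches on message headers supplied as binding arguments; the special `x-match` argument selects whether all or any of them must match. A minimal sketch of such a binding (the queue and header names here are hypothetical, not part of the original prompt):

```python
def headers_binding_args(match_all: bool, **headers) -> dict:
    """Binding arguments for a headers exchange: x-match='all' requires every
    header pair to match; 'any' requires at least one."""
    return {"x-match": "all" if match_all else "any", **headers}

def bind_audit_queue(channel) -> None:
    """Bind a hypothetical audit queue to the 'audit' headers exchange.
    Expects an open pika channel (see the connection setup above)."""
    channel.queue_declare(queue="audit.critical", durable=True)
    channel.queue_bind(
        queue="audit.critical",
        exchange="audit",
        routing_key="",  # ignored by headers exchanges
        arguments=headers_binding_args(True, severity="critical", service="billing"),
    )
```

With `x-match: any` the same binding would deliver messages carrying either matching header, which is useful for coarse audit fan-in.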

## Queue Configuration with DLX
```python
# Dead Letter Exchange setup
channel.exchange_declare(
    exchange="dlx",
    exchange_type=ExchangeType.direct,
    durable=True
)

channel.queue_declare(
    queue="dlq.orders",
    durable=True
)

channel.queue_bind(
    queue="dlq.orders",
    exchange="dlx",
    routing_key="orders"
)

# Main queue with DLX
channel.queue_declare(
    queue="orders.processing",
    durable=True,
    arguments={
        "x-dead-letter-exchange": "dlx",
        "x-dead-letter-routing-key": "orders",
        "x-message-ttl": 86400000,  # 24 hours
        "x-max-length": 100000,
        "x-overflow": "reject-publish"
    }
)
```

## Publisher with Confirms
```python
import json
import logging
import time
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone

import pika

logger = logging.getLogger(__name__)

@dataclass
class Message:
    id: str
    type: str
    payload: dict
    timestamp: str

class RabbitPublisher:
    def __init__(self, connection):
        self.channel = connection.channel()
        self.channel.confirm_delivery()
    
    def publish(
        self,
        exchange: str,
        routing_key: str,
        message: Message,
        priority: int = 0
    ) -> bool:
        try:
            self.channel.basic_publish(
                exchange=exchange,
                routing_key=routing_key,
                body=json.dumps(message.__dict__),
                properties=pika.BasicProperties(
                    delivery_mode=2,  # Persistent
                    content_type="application/json",
                    message_id=message.id,
                    timestamp=int(time.time()),
                    priority=priority,
                    headers={
                        "x-retry-count": 0,
                        "x-source": "order-service"
                    }
                ),
                mandatory=True
            )
            return True
        except (pika.exceptions.UnroutableError, pika.exceptions.NackError):
            logger.error(f"Message {message.id} was not confirmed by the broker")
            return False

# Usage
publisher = RabbitPublisher(connection)
publisher.publish(
    exchange="orders",
    routing_key="orders.created",
    message=Message(
        id=str(uuid.uuid4()),
        type="order.created",
        payload={"order_id": 123, "total": 99.99},
        timestamp=datetime.now(timezone.utc).isoformat()
    )
)
```

## Consumer with Retry Logic
```python
import json
import logging

import pika

logger = logging.getLogger(__name__)

class RecoverableError(Exception):
    """Transient failure; the message should be retried."""

class RabbitConsumer:
    MAX_RETRIES = 3
    RETRY_DELAYS = [1, 5, 30]  # seconds
    
    def __init__(self, connection, queue: str, handler):
        self.channel = connection.channel()
        self.channel.basic_qos(prefetch_count=10)
        self.queue = queue
        self.handler = handler
    
    def start(self):
        self.channel.basic_consume(
            queue=self.queue,
            on_message_callback=self._process_message,
            auto_ack=False
        )
        self.channel.start_consuming()
    
    def _process_message(self, channel, method, properties, body):
        message = json.loads(body)
        retry_count = properties.headers.get("x-retry-count", 0) if properties.headers else 0
        
        try:
            self.handler(message)
            channel.basic_ack(delivery_tag=method.delivery_tag)
            
        except RecoverableError as e:
            if retry_count < self.MAX_RETRIES:
                self._retry_message(
                    channel, properties, body,
                    retry_count + 1
                )
                channel.basic_ack(delivery_tag=method.delivery_tag)
            else:
                # Send to DLQ
                channel.basic_nack(
                    delivery_tag=method.delivery_tag,
                    requeue=False
                )
                
        except Exception as e:
            logger.exception(f"Unrecoverable error: {e}")
            channel.basic_nack(
                delivery_tag=method.delivery_tag,
                requeue=False
            )
    
    def _retry_message(self, channel, properties, body, retry_count):
        delay = self.RETRY_DELAYS[min(retry_count - 1, len(self.RETRY_DELAYS) - 1)]
        
        # Publish to delay queue
        channel.basic_publish(
            exchange="",
            routing_key=f"delay.{delay}s",
            body=body,
            properties=pika.BasicProperties(
                delivery_mode=2,
                headers={
                    **(properties.headers or {}),
                    "x-retry-count": retry_count,
                    "x-original-queue": self.queue
                }
            )
        )
```
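`_retry_message` publishes to queues named `delay.1s`, `delay.5s`, and `delay.30s`, which are never declared in this prompt. One common TTL-based sketch (an assumption, not part of the original): each delay queue holds messages until its per-queue TTL expires, then dead-letters them to a fixed destination via the default exchange. Because plain dead-lettering reuses the message's routing key, an explicit `x-dead-letter-routing-key` is set here; with several destination queues you would need one delay queue per (delay, destination) pair, or a small relay consumer that reads the `x-original-queue` header.

```python
def delay_queue_args(dest_queue: str, delay_ms: int) -> dict:
    """Arguments for a TTL-based delay queue: messages wait here until the
    per-queue TTL expires, then dead-letter to dest_queue via the default
    exchange (which routes by queue name)."""
    return {
        "x-dead-letter-exchange": "",
        "x-dead-letter-routing-key": dest_queue,
        "x-message-ttl": delay_ms,
    }

def declare_delay_queues(channel, dest_queue: str, delays_s=(1, 5, 30)) -> None:
    """Declare one delay queue per retry delay; expects an open pika channel."""
    for delay in delays_s:
        channel.queue_declare(
            queue=f"delay.{delay}s",
            durable=True,
            arguments=delay_queue_args(dest_queue, delay * 1000),
        )
```

The delays mirror `RETRY_DELAYS` above; keep the two lists in sync if you change either.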

## Topic Routing Patterns
```python
# Publisher sends with routing keys
publisher.publish(exchange="events", routing_key="user.created.us", message=msg)
publisher.publish(exchange="events", routing_key="order.shipped.eu", message=msg)

# Consumers bind with patterns
# All user events
channel.queue_bind(queue="user-service", exchange="events", routing_key="user.*.*")

# All events from EU
channel.queue_bind(queue="eu-analytics", exchange="events", routing_key="*.*.eu")

# All shipped orders
channel.queue_bind(queue="shipping", exchange="events", routing_key="order.shipped.#")
```
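The wildcards above have precise semantics: `*` matches exactly one dot-separated word, while `#` matches zero or more. A small illustrative matcher makes the difference concrete (my own sketch of the semantics, not how the broker implements them):

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """AMQP-style topic match: '*' = exactly one word, '#' = zero or more."""
    p, k = pattern.split("."), routing_key.split(".")

    def match(i: int, j: int) -> bool:
        if i == len(p):                      # pattern exhausted: key must be too
            return j == len(k)
        if p[i] == "#":                      # '#' absorbs zero words, or one more
            return match(i + 1, j) or (j < len(k) and match(i, j + 1))
        if j == len(k):                      # key exhausted but pattern is not
            return False
        return p[i] in ("*", k[j]) and match(i + 1, j + 1)

    return match(0, 0)
```

So `order.shipped.#` matches both `order.shipped` and `order.shipped.eu`, while `user.*.*` requires exactly three words.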

## Monitoring Metrics
```python
# Get queue stats
queue_info = channel.queue_declare(queue="orders.processing", passive=True)
print(f"Messages: {queue_info.method.message_count}")
print(f"Consumers: {queue_info.method.consumer_count}")

# Prometheus metrics
from prometheus_client import Gauge, Counter

messages_published = Counter(
    "rabbitmq_messages_published_total",
    "Total messages published",
    ["exchange", "routing_key"]
)

messages_consumed = Counter(
    "rabbitmq_messages_consumed_total", 
    "Total messages consumed",
    ["queue", "status"]
)

queue_depth = Gauge(
    "rabbitmq_queue_depth",
    "Current queue depth",
    ["queue"]
)
```
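The passive declare above can feed the `queue_depth` gauge directly. One way to wire them together (a sketch; the queue list and polling cadence are assumptions):

```python
def update_queue_depths(channel, gauge, queues) -> None:
    """Refresh a prometheus_client Gauge labeled by 'queue' from passive
    declares; expects an open pika channel. Passive declares never create
    or modify the queue."""
    for name in queues:
        info = channel.queue_declare(queue=name, passive=True)
        gauge.labels(queue=name).set(info.method.message_count)
```

Run this on a timer in the process that owns the connection; a passive declare fails if the queue does not exist, which is itself worth alerting on.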

## Best Practices
- Always use publisher confirms in production
- Set prefetch count based on processing time
- Use lazy queues for large backlogs
- Implement circuit breakers for consumers
- Monitor consumer lag and processing latency
- Use quorum queues for high availability
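The last bullet can be made concrete: the queue type is chosen per queue at declare time via the `x-queue-type` argument. A sketch (the queue name is hypothetical; note that quorum queues must be durable and cannot be exclusive or auto-delete):

```python
def quorum_queue_args() -> dict:
    """Declare-time arguments selecting the replicated quorum queue type
    (RabbitMQ 3.8+); the classic type is used when this is omitted."""
    return {"x-queue-type": "quorum"}

def declare_ha_queue(channel, name: str) -> None:
    """Declare a durable quorum queue; expects an open pika channel."""
    channel.queue_declare(queue=name, durable=True, arguments=quorum_queue_args())
```

Queue type cannot be changed after declaration; migrating an existing classic queue means declaring a new quorum queue and moving traffic over.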

When to Use This Prompt

This RabbitMQ prompt is ideal for developers working on:

  • RabbitMQ applications requiring modern best practices and optimal performance
  • Projects that need production-ready RabbitMQ code with proper error handling
  • Teams looking to standardize their RabbitMQ development workflow
  • Developers wanting to learn industry-standard RabbitMQ patterns and techniques

By using this prompt, you can save hours of manual coding and ensure best practices are followed from the start. It's particularly valuable for teams looking to maintain consistency across their RabbitMQ implementations.

How to Use

  1. Copy the prompt - Click the copy button above to copy the entire prompt to your clipboard
  2. Paste into your AI assistant - Use with Claude, ChatGPT, Cursor, or any AI coding tool
  3. Customize as needed - Adjust the prompt based on your specific requirements
  4. Review the output - Always review generated code for security and correctness

💡 Pro Tip: For best results, provide context about your project structure and any specific constraints or preferences you have.

Best Practices

  • ✓ Always review generated code for security vulnerabilities before deploying
  • ✓ Test the RabbitMQ code in a development environment first
  • ✓ Customize the prompt output to match your project's coding standards
  • ✓ Keep your AI assistant's context window in mind for complex requirements
  • ✓ Version control your prompts alongside your code for reproducibility

Frequently Asked Questions

Can I use this RabbitMQ prompt commercially?

Yes! All prompts on Antigravity AI Directory are free to use for both personal and commercial projects. No attribution required, though it's always appreciated.

Which AI assistants work best with this prompt?

This prompt works excellently with Claude, ChatGPT, Cursor, GitHub Copilot, and other modern AI coding assistants. For best results, use models with large context windows.

How do I customize this prompt for my specific needs?

You can modify the prompt by adding specific requirements, constraints, or preferences. For RabbitMQ projects, consider mentioning your framework version, coding style, and any specific libraries you're using.
