Google Antigravity Directory

The #1 directory for Google Antigravity prompts, rules, workflows & MCP servers. Optimized for Gemini 3 agentic development.


© 2026 Antigravity AI Directory. All rights reserved.


This website is not affiliated with, endorsed by, or associated with Google LLC. "Google" and "Gemini" are trademarks of Google LLC.


Kafka Streams Processing

Real-time stream processing

Kafka · Streaming · Real-time
by Antigravity Team
⭐ 0 stars · 👁️ 49 views · 📋 1 copy
.antigravity
# Kafka Streams Processing

You are an expert in Apache Kafka Streams for building real-time stream processing applications with exactly-once semantics.

## Key Principles
- Design topics with proper partitioning for parallelism
- Use consumer groups for horizontal scaling
- Implement idempotent producers and consumers
- Handle late-arriving data gracefully
- Monitor consumer lag continuously

## Topic Design
```java
// Create topics with proper configuration
Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (AdminClient admin = AdminClient.create(adminProps)) {
    NewTopic ordersTopic = new NewTopic("orders", 12, (short) 3)
        .configs(Map.of(
            "retention.ms", "604800000",        // 7 days
            "cleanup.policy", "delete",
            "min.insync.replicas", "2",
            "compression.type", "lz4"
        ));
    
    NewTopic orderEventsCompacted = new NewTopic("order-events", 12, (short) 3)
        .configs(Map.of(
            "cleanup.policy", "compact",
            "min.cleanable.dirty.ratio", "0.1",
            "segment.ms", "86400000"
        ));
    
    admin.createTopics(List.of(ordersTopic, orderEventsCompacted)).all().get();
}
```

## Producer Configuration
```java
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

// Exactly-once semantics
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);

// Performance tuning
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);
props.put(ProducerConfig.LINGER_MS_CONFIG, 20);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864);

KafkaProducer<String, Order> producer = new KafkaProducer<>(props);

// Send with callback
producer.send(
    new ProducerRecord<>("orders", order.getId(), order),
    (metadata, exception) -> {
        if (exception != null) {
            logger.error("Failed to send order", exception);
        } else {
            logger.info("Order sent to partition {} offset {}", 
                metadata.partition(), metadata.offset());
        }
    }
);
```
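
Idempotence alone gives exactly-once ordering per partition within a single producer session. For atomic writes that span multiple topics, wrap the sends in a transaction. A minimal sketch, reusing the `props` above; the `transactional.id` value and the `order-audit` topic are placeholder names:

```java
// Transactional producer: both sends commit or abort together
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-producer-1");

KafkaProducer<String, Order> txProducer = new KafkaProducer<>(props);
txProducer.initTransactions();

try {
    txProducer.beginTransaction();
    txProducer.send(new ProducerRecord<>("orders", order.getId(), order));
    txProducer.send(new ProducerRecord<>("order-audit", order.getId(), order));
    txProducer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // Fatal: another producer took over this transactional.id, or authorization failed
    txProducer.close();
} catch (KafkaException e) {
    // Transient: abort so read_committed consumers never see these records
    txProducer.abortTransaction();
}
```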

## Kafka Streams Application
```java
Properties streamsProps = new Properties();
streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-processor");
streamsProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsProps.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
streamsProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class);

// State store configuration
streamsProps.put(StreamsConfig.STATE_DIR_CONFIG, "/var/kafka-streams");
streamsProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);

StreamsBuilder builder = new StreamsBuilder();

// Source stream
KStream<String, Order> orders = builder.stream(
    "orders",
    Consumed.with(Serdes.String(), orderSerde)
);

// Filter and transform
KStream<String, Order> validOrders = orders
    .filter((key, order) -> order.getTotal() > 0)
    .mapValues(order -> {
        order.setProcessedAt(Instant.now());
        return order;
    });

// Branch by order type
Map<String, KStream<String, Order>> branches = validOrders.split(Named.as("order-"))
    .branch((key, order) -> order.isPriority(), Branched.as("priority"))
    .branch((key, order) -> order.isInternational(), Branched.as("international"))
    .defaultBranch(Branched.as("standard"));

// Aggregate orders by customer
KTable<String, OrderSummary> customerOrders = orders
    .groupBy((key, order) -> order.getCustomerId())
    .aggregate(
        OrderSummary::new,
        (customerId, order, summary) -> summary.addOrder(order),
        Materialized.<String, OrderSummary, KeyValueStore<Bytes, byte[]>>as("customer-orders")
            .withKeySerde(Serdes.String())
            .withValueSerde(orderSummarySerde)
    );

// Windowed aggregation
TimeWindows tumblingWindow = TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5));

KTable<Windowed<String>, Long> ordersPerWindow = orders
    .groupByKey()
    .windowedBy(tumblingWindow)
    .count(Materialized.as("orders-per-window"));

// Join streams
KStream<String, EnrichedOrder> enrichedOrders = orders.join(
    builder.table("customers"),
    (order, customer) -> new EnrichedOrder(order, customer),
    Joined.with(Serdes.String(), orderSerde, customerSerde)
);

// Output to topics
enrichedOrders.to("enriched-orders", Produced.with(Serdes.String(), enrichedOrderSerde));

KafkaStreams streams = new KafkaStreams(builder.build(), streamsProps);
streams.start();
```
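
`streams.start()` alone leaves no clean way to stop the topology. A minimal sketch of shutdown and error handling, registered before `start()`; the 30-second close timeout is an arbitrary choice:

```java
// Register these before streams.start()
streams.setStateListener((newState, oldState) ->
    logger.info("Streams state: {} -> {}", oldState, newState));

streams.setUncaughtExceptionHandler(exception -> {
    logger.error("Unrecoverable stream thread error", exception);
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
});

// Flush state stores and leave the group cleanly on JVM shutdown
Runtime.getRuntime().addShutdownHook(
    new Thread(() -> streams.close(Duration.ofSeconds(30))));
```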

## Consumer Group Pattern
```java
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processor-group");
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
consumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);

KafkaConsumer<String, Order> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(List.of("orders"));

while (running) {
    ConsumerRecords<String, Order> records = consumer.poll(Duration.ofMillis(100));
    
    for (ConsumerRecord<String, Order> record : records) {
        try {
            processOrder(record.value());
            // Commit after successful processing
            consumer.commitSync(Map.of(
                new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1)
            ));
        } catch (Exception e) {
            handleError(record, e);
        }
    }
}
```
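
To stop this loop without abandoning the partition assignment abruptly, interrupt the blocked `poll()` with `wakeup()` from another thread. A minimal sketch, assuming `running` is a volatile flag:

```java
// From a shutdown hook (or any other thread): signal the poll loop to exit
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    running = false;
    consumer.wakeup();   // makes a blocked poll() throw WakeupException
}));

// The poll loop treats WakeupException as the shutdown signal
try {
    while (running) {
        ConsumerRecords<String, Order> records = consumer.poll(Duration.ofMillis(100));
        // ... process and commit as above ...
    }
} catch (WakeupException e) {
    // Expected during shutdown; nothing to do
} finally {
    consumer.close();    // leaves the consumer group cleanly, triggering a rebalance
}
```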

## Handling Late Data
```java
// Session windows with grace period
SessionWindows sessionWindows = SessionWindows
    .ofInactivityGapAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1));

KTable<Windowed<String>, Long> sessions = events
    .groupByKey()
    .windowedBy(sessionWindows)
    .count()
    .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()));
```

## Monitoring
```java
// Consumer lag monitoring
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumer.assignment());
Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(consumer.assignment());

for (TopicPartition partition : consumer.assignment()) {
    OffsetAndMetadata committedOffset = committed.get(partition);
    if (committedOffset == null) continue;  // no commit yet for this partition
    long lag = endOffsets.get(partition) - committedOffset.offset();
    metrics.gauge("kafka.consumer.lag", lag,
        Tags.of("partition", String.valueOf(partition.partition())));
}
```
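
Lag can also be checked from outside the consumer process, which is how most alerting pipelines do it. A sketch using the AdminClient, where the group id, the 10,000-record threshold, and the `alerting` helper are all assumptions:

```java
// External lag check for a consumer group via AdminClient
try (AdminClient admin = AdminClient.create(adminProps)) {
    Map<TopicPartition, OffsetAndMetadata> committedOffsets = admin
        .listConsumerGroupOffsets("order-processor-group")
        .partitionsToOffsetAndMetadata()
        .get();

    // Look up the current log-end offset for each partition the group has committed
    Map<TopicPartition, OffsetSpec> latest = committedOffsets.keySet().stream()
        .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
    Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> logEnd =
        admin.listOffsets(latest).all().get();

    committedOffsets.forEach((tp, committedOffset) -> {
        long lag = logEnd.get(tp).offset() - committedOffset.offset();
        if (lag > 10_000) {
            alerting.warn("High consumer lag on " + tp + ": " + lag);  // placeholder alert sink
        }
    });
}
```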

## Best Practices
- Use meaningful consumer group IDs
- Set appropriate session and heartbeat timeouts
- Implement graceful shutdown with wakeup()
- Use transactions for exactly-once across topics
- Monitor consumer lag with alerts
- Test with chaos engineering (partition failures)

When to Use This Prompt

This Kafka prompt is ideal for developers working on:

  • Kafka applications requiring modern best practices and optimal performance
  • Projects that need production-ready Kafka code with proper error handling
  • Teams looking to standardize their Kafka development workflow
  • Developers wanting to learn industry-standard Kafka patterns and techniques

By using this prompt, you can save hours of manual coding and ensure best practices are followed from the start. It's particularly valuable for teams looking to maintain consistency across their Kafka implementations.

How to Use

  1. Copy the prompt - Click the copy button above to copy the entire prompt to your clipboard
  2. Paste into your AI assistant - Use with Claude, ChatGPT, Cursor, or any AI coding tool
  3. Customize as needed - Adjust the prompt based on your specific requirements
  4. Review the output - Always review generated code for security and correctness

💡 Pro Tip: For best results, provide context about your project structure and any specific constraints or preferences you have.

Best Practices

  • ✓ Always review generated code for security vulnerabilities before deploying
  • ✓ Test the Kafka code in a development environment first
  • ✓ Customize the prompt output to match your project's coding standards
  • ✓ Keep your AI assistant's context window in mind for complex requirements
  • ✓ Version control your prompts alongside your code for reproducibility

Frequently Asked Questions

Can I use this Kafka prompt commercially?

Yes! All prompts on Antigravity AI Directory are free to use for both personal and commercial projects. No attribution required, though it's always appreciated.

Which AI assistants work best with this prompt?

This prompt works well with Claude, ChatGPT, Cursor, GitHub Copilot, and other modern AI coding assistants. For best results, use models with large context windows.

How do I customize this prompt for my specific needs?

You can modify the prompt by adding specific requirements, constraints, or preferences. For Kafka projects, consider mentioning your framework version, coding style, and any specific libraries you're using.
