Backpressure Analyzer
Find where your pipeline is backing up. Measure processing rates at each stage, identify the bottleneck, detect growing queues, and recommend flow control strategies — bounded buffers, rate limiting, load shedding, or autoscaling.
Use when: "pipeline is slow", "queue keeps growing", "messages backing up", "consumer can't keep up", "producer faster than consumer", "backpressure", "flow control", "bottleneck analysis", or when processing delays increase over time.
Commands
1. detect — Find Backpressure Points
Step 1: Measure Queue Depths
# Kafka consumer lag
kafka-consumer-groups --bootstrap-server $KAFKA_BROKER --describe --all-groups 2>/dev/null | \
awk '$6 ~ /^[0-9]+$/ && $6>0 {printf "%-30s %-20s lag=%s\n", $1, $2, $6}' | sort -t= -k2 -rn | head -20
# RabbitMQ queue depths
rabbitmqctl list_queues name messages consumers 2>/dev/null | \
awk '$2>0 {print $2 " " $1 " consumers=" $3}' | sort -rn | head -20
# AWS SQS
for queue_url in $(aws sqs list-queues --query 'QueueUrls[]' --output text); do
  attrs=$(aws sqs get-queue-attributes --queue-url "$queue_url" \
    --attribute-names ApproximateNumberOfMessages ApproximateNumberOfMessagesNotVisible \
    --output json 2>/dev/null)
  visible=$(echo "$attrs" | python3 -c "import json,sys;print(json.load(sys.stdin)['Attributes'].get('ApproximateNumberOfMessages','0'))")
  inflight=$(echo "$attrs" | python3 -c "import json,sys;print(json.load(sys.stdin)['Attributes'].get('ApproximateNumberOfMessagesNotVisible','0'))")
  if [ "$visible" -gt 0 ] 2>/dev/null; then
    echo "Queue: $(basename "$queue_url") - pending=$visible, in-flight=$inflight"
  fi
done
# Redis Streams
redis-cli XINFO STREAM mystream 2>/dev/null | grep -A1 -E '^(length|groups)$'
redis-cli XINFO GROUPS mystream 2>/dev/null
Step 2: Measure Processing Rates
# Measure throughput at each pipeline stage
# Take two snapshots 60s apart and calculate rate
# Kafka: run this twice, 60s apart. Consumed/s = change in current offset / 60;
# produced/s = change in (current offset + lag) / 60
kafka-consumer-groups --bootstrap-server $KAFKA_BROKER --describe --group mygroup 2>/dev/null | \
awk 'NR>1 {lag+=$6; offset+=$4} END {print "Total lag:", lag, "Current offset:", offset}'
# Process-level: messages processed per second
# Check application metrics endpoint
curl -s http://localhost:9090/metrics | grep -E "messages_processed_total|items_processed_total"
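The two-snapshot approach described above can be sketched as a pair of small helpers. This is a minimal illustration, not part of the skill itself: the function names `rate_per_sec` and `lag_trend` and the snapshot values are hypothetical, and the numbers would normally come from the offset or metrics commands above.

```shell
#!/usr/bin/env bash
# rate_per_sec BEFORE AFTER SECONDS
# Prints the average change per second between two counter or offset snapshots.
rate_per_sec() {
  awk -v a="$1" -v b="$2" -v s="$3" 'BEGIN { printf "%.2f\n", (b - a) / s }'
}

# lag_trend CONSUME_RATE PRODUCE_RATE
# Prints "growing", "draining", or "stable" by comparing the two rates.
lag_trend() {
  awk -v c="$1" -v p="$2" 'BEGIN {
    if (p > c) print "growing"
    else if (p < c) print "draining"
    else print "stable"
  }'
}

# Hypothetical snapshot values taken 60 seconds apart.
consumed=$(rate_per_sec 120000 138000 60)   # committed (current) offsets
produced=$(rate_per_sec 500000 560000 60)   # log end offsets
echo "consumed=${consumed}/s produced=${produced}/s trend=$(lag_trend "$consumed" "$produced")"
```

Comparing the produced rate with the consumed rate is more informative than a single lag reading, because a large but shrinking backlog and a small but growing one call for different responses.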
Step 3: Identify Bottleneck
Map the pipeline stages and their rates:
Producer (1000 msg/s)
  → Queue A (depth: 5)
  → Stage 1 (800 msg/s)
  → Queue B (depth: 50000)
  → Stage 2 (200 msg/s)   <-- BOTTLENECK: Stage 2 can't keep up
  → Queue C (depth: 2)
  → Stage 3 (500 msg/s)
The bottleneck is the stage with:
- Growing queue depth (input queue getting deeper over time)
- Lowest throughput relative to its input rate
- Highest resource utilization (CPU, memory, I/O at capacity)
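One way to apply the "lowest throughput relative to its input rate" criterion is to compute the ratio of processing rate to input rate per stage and pick the smallest. The sketch below assumes a hypothetical three-column input format (stage name, input rate, processing rate) and a hypothetical function name `find_bottleneck`; it ignores queue growth and resource utilization, which should still be checked separately.

```shell
#!/usr/bin/env bash
# find_bottleneck reads lines of "STAGE INPUT_RATE PROCESS_RATE" from stdin and
# prints the stage with the lowest processing/input ratio, if that ratio is below 1.
find_bottleneck() {
  awk '
    NF >= 3 && $2 > 0 {
      ratio = $3 / $2
      if (best == "" || ratio < min) { min = ratio; best = $1; deficit = $2 - $3 }
    }
    END {
      if (best != "" && min < 1) printf "%s ratio=%.2f deficit=%d\n", best, min, deficit
      else print "none"
    }'
}

# Rates from the example pipeline above; each stage's input is the upstream stage's output.
find_bottleneck <<'EOF'
stage1 1000 800
stage2 800 200
stage3 200 500
EOF
```

With these example rates the sketch reports stage2, which processes only a quarter of what it receives.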
Step 4: Generate Report
# Backpressure Analysis Report
## Pipeline: Order Processing
## Flow Map
API (1000 req/s)
  → order-events (Kafka, lag: 200, stable ✅)
  → order-validator (3 pods, 350 msg/s each = 1050 total)
  → validated-orders (Kafka, lag: 85,000 🔴, growing +700/s)
  → payment-processor (2 pods, 150 msg/s each = 300 total)
  → payment-results (Kafka, lag: 0, stable ✅)
  → notification-sender (1 pod, 500 msg/s)
## Bottleneck: payment-processor
- **Input rate:** 1000 msg/s (from validator, which is limited by the API rate)
- **Processing rate:** 300 msg/s (2 pods × 150 msg/s)
- **Deficit:** 700 msg/s accumulating in validated-orders
- **Current backlog:** 85,000 messages (growing; it cannot drain until capacity exceeds the input rate)
- **Resource utilization:** CPU 95%, memory 60%, network 20%
- **Root cause:** CPU-bound: payment validation is computationally expensive
## Recommendations (in order)
1. **Scale out:** Increase payment-processor to 7 pods (7 × 150 = 1050 msg/s)
- Cost: +5 pods × $X/month
- Time to drain backlog: ~28 minutes after scaling (85,000 messages ÷ 50 msg/s surplus)
2. **Optimize processing:** Profile payment validation for optimization
- Current: 6.7ms per message
- Target: 1ms per message (would need only 2 pods)
3. **Add backpressure signal:** Have payment-processor signal order-validator to slow down
- Reactive Streams-style demand signaling
- Or: consumer pause when lag > threshold
4. **Load shedding (last resort):** Drop low-priority messages when queue > 100K
- Only for non-critical notifications, never for payments
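The scale-out and drain-time figures in a report like this come from simple arithmetic, sketched below. The helper names `pods_needed` and `drain_seconds` are hypothetical, and the example assumes the input rate equals the API rate of 1000 msg/s and that throughput scales linearly with pod count, which may not hold in practice.

```shell
#!/usr/bin/env bash
# pods_needed INPUT_RATE PER_POD_RATE
# Smallest pod count whose total rate strictly exceeds the input rate,
# so that an existing backlog can actually shrink.
pods_needed() {
  awk -v in_rate="$1" -v pod="$2" 'BEGIN { print int(in_rate / pod) + 1 }'
}

# drain_seconds BACKLOG INPUT_RATE TOTAL_RATE
# Seconds needed to clear the backlog, or "never" if capacity does not exceed input.
drain_seconds() {
  awk -v b="$1" -v i="$2" -v t="$3" 'BEGIN {
    if (t <= i) print "never"
    else printf "%.0f\n", b / (t - i)
  }'
}

# Assumed inputs: 1000 msg/s arriving, 150 msg/s per pod, 85,000 messages queued.
pods=$(pods_needed 1000 150)
total=$((pods * 150))
echo "pods=$pods total=${total} msg/s drain=$(drain_seconds 85000 1000 "$total")s"
```

Note that capacity exactly equal to the input rate only stops the backlog from growing; draining it requires surplus capacity, and the drain time is the backlog divided by that surplus.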
2. strategies — Recommend Flow Control
Based on the pipeline characteristics, recommend:
- Bounded buffers: Set max queue size, block producer when full
- Rate limiting: Limit producer rate to match slowest consumer
- Autoscaling: Scale consumers based on queue depth
- Load shedding: Drop low-priority messages under pressure
- Batch processing: Accumulate and process in batches for efficiency
- Circuit breaker: Stop sending to overwhelmed downstream
- Priority queues: Process critical messages first when backed up
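Several of these strategies, bounded buffers in particular, reduce to a producer-side decision: stop sending when the queue reaches a high watermark and resume once it falls below a low one. The following is a minimal sketch of that hysteresis logic; the function name `next_state` and the watermark values are hypothetical, and a real producer would read the depth from the broker rather than from a fixed list.

```shell
#!/usr/bin/env bash
# next_state CURRENT_STATE DEPTH HIGH_WATERMARK LOW_WATERMARK
# Prints "paused" once depth reaches the high watermark, "running" once it falls
# to the low watermark, and otherwise keeps the current state (hysteresis).
next_state() {
  local state=$1 depth=$2 high=$3 low=$4
  if [ "$depth" -ge "$high" ]; then
    echo paused
  elif [ "$depth" -le "$low" ]; then
    echo running
  else
    echo "$state"
  fi
}

# Simulated queue depths with assumed watermarks of 1000 (high) and 200 (low).
state=running
for depth in 100 900 1200 800 400 150; do
  state=$(next_state "$state" "$depth" 1000 200)
  echo "depth=$depth producer=$state"
done
```

Using two watermarks instead of a single threshold keeps the producer from flapping between paused and running when the depth hovers around the limit.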
3. monitor — Set Up Backpressure Alerts
Generate alerting rules:
# Prometheus alert rules
groups:
  - name: backpressure
    rules:
      - alert: KafkaConsumerLagHigh
        expr: kafka_consumergroup_lag_sum > 10000
        for: 5m
        labels:
          severity: warning
      - alert: KafkaConsumerLagCritical
        expr: kafka_consumergroup_lag_sum > 100000
        for: 5m
        labels:
          severity: critical
      - alert: QueueDepthGrowing
        # Lag is a gauge, so use deriv(); rate() is intended for counters
        expr: deriv(kafka_consumergroup_lag_sum[5m]) > 0
        for: 15m
        labels:
          severity: warning
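- Make sure OpenClaw is installed (local or Docker deployment)
- Enter the install command in the chat box: /install backpressure-analyzer
- Once installation finishes, invoke the Skill by name or trigger it with /backpressure-analyzer
- Provide the required inputs as described in the Skill's parameter notes to get structured output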
What is backpressure-analyzer?
Detect and resolve backpressure issues in data pipelines, message queues, and streaming systems. Identify bottleneck stages, measure queue depths and process... It is an AI Agent Skill plugin for Claude Code / OpenClaw, with 27 downloads to date.
How do I install backpressure-analyzer?
Run the command "/install backpressure-analyzer" in the OpenClaw or Claude Code chat box to install it in one step; no additional configuration is required.
Is backpressure-analyzer free?
Yes. backpressure-analyzer is completely free and released under the MIT-0 license, so you can download, install, and use it freely.
Which platforms does backpressure-analyzer support?
backpressure-analyzer is cross-platform and runs in any environment where OpenClaw / Claude Code is deployed.
Who develops backpressure-analyzer?
It is developed and maintained by charlie-morrison (@charlie-morrison). The current version is v1.0.0.