可观测性:日志、追踪、成本控制与告警体系
第20章:可观测性——日志、追踪、成本控制与告警体系
你无法管理你看不见的东西——本章教你为 Dify 构建完整的可观测性体系,让每一次 LLM 调用、每一分 Token 消耗都清晰可查。
本章导读
生产环境的 Dify 就像一辆高速行驶的赛车,没有仪表盘你根本不知道何时会爆胎。可观测性(Observability)的三个支柱是:
- 日志(Logs):发生了什么?谁在何时做了什么操作?
- 指标(Metrics):系统健康状况如何?QPS、延迟、错误率?
- 追踪(Traces):一次请求经历了哪些服务?哪里慢?
对于 Dify 这类 LLM 应用平台,还有第四个关键维度:
- 成本(Cost):调用了哪些模型?消耗了多少 Token?每天花了多少钱?
本章将带你从零构建完整的可观测性栈:Prometheus + Grafana 指标监控、ELK/Loki 日志聚合、OpenTelemetry 分布式追踪、以及基于 Token 消耗的成本控制与告警。
读完本章,你将能够:
- 部署 Prometheus + Grafana 监控 Dify 所有核心指标
- 配置 ELK/Loki 集中收集和搜索 Dify 日志
- 使用 OpenTelemetry 追踪每次 LLM 调用链路
- 设计成本预算告警,防止意外超支
- 构建生产级告警规则,异常 5 分钟内通知到位
Level 1:基础认知(1-3 年经验)
为什么 LLM 应用需要特殊的可观测性
传统 Web 应用的可观测性重点是延迟和错误率,而 LLM 应用有额外的挑战:
- 响应时间高度不确定:同一问题,GPT-4 可能返回 500ms,也可能返回 30s,这取决于 Token 数量和模型负载
- 成本与使用量强相关:一个写错的 Prompt 可能让 Token 消耗暴涨 10 倍
- 质量难以用传统指标衡量:HTTP 200 不代表答案正确,需要额外的业务指标
- 调用链复杂:一次用户请求可能触发 RAG 检索 → 多次 LLM 调用 → Tool 执行 → 再次 LLM 调用
类比:传统可观测性像看汽车的油表和时速表,而 LLM 可观测性还要看"这段路花了多少油费"和"乘客是否满意这段旅途"。
Dify 内置监控功能
Dify 在管理控制台提供了基础的内置监控,无需额外配置:
应用监控(Console → 应用 → 监控):
- 对话总量、活跃用户数
- 平均响应时间趋势
- Token 消耗量(按天/周/月)
- 模型费用估算
- 用户反馈(点赞/点踩)统计
日志查看(Console → 日志):
- 每次对话完整记录(输入/输出/使用的 Prompt)
- Token 消耗详情
- 执行耗时
局限性:内置监控只能看单个应用,无法跨应用聚合,无法设置告警,无法保留超过 30 天的数据。
Prometheus 快速接入
Dify API 暴露了 /metrics 端点(Prometheus 格式),可以直接抓取。
# prometheus.yml
global:
scrape_interval: 15s
evaluation_interval: 15s
scrape_configs:
# Dify API 指标
- job_name: 'dify-api'
static_configs:
- targets: ['dify-api:5001']
metrics_path: '/metrics'
scrape_interval: 30s
# PostgreSQL 指标(通过 postgres_exporter)
- job_name: 'postgres'
static_configs:
- targets: ['postgres-exporter:9187']
# Redis 指标(通过 redis_exporter)
- job_name: 'redis'
static_configs:
- targets: ['redis-exporter:9121']
# Weaviate 指标
- job_name: 'weaviate'
static_configs:
- targets: ['weaviate:2112']
metrics_path: '/metrics'
# Node 主机指标
- job_name: 'node'
static_configs:
- targets: ['node-exporter:9100']
# docker-compose 添加监控组件
services:
prometheus:
image: prom/prometheus:v2.48.0
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
- prometheus_data:/prometheus
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.retention.time=30d'
- '--web.enable-lifecycle'
ports:
- "9090:9090"
grafana:
image: grafana/grafana:10.2.0
environment:
GF_SECURITY_ADMIN_PASSWORD: grafana强密码
GF_USERS_ALLOW_SIGN_UP: 'false'
volumes:
- grafana_data:/var/lib/grafana
- ./grafana/dashboards:/etc/grafana/provisioning/dashboards
- ./grafana/datasources:/etc/grafana/provisioning/datasources
ports:
- "3001:3000"
postgres-exporter:
image: prometheuscommunity/postgres-exporter:v0.15.0
environment:
DATA_SOURCE_NAME: postgresql://dify:密码@db:5432/dify?sslmode=disable
redis-exporter:
image: oliver006/redis_exporter:v1.55.0
environment:
REDIS_ADDR: redis://redis:6379
REDIS_PASSWORD: Redis密码
node-exporter:
image: prom/node-exporter:v1.7.0
volumes:
- /proc:/host/proc:ro
- /sys:/host/sys:ro
- /:/rootfs:ro
command:
- '--path.procfs=/host/proc'
- '--path.sysfs=/host/sys'
Level 2:机制深解(3-5 年经验)
Grafana Dashboard 核心面板配置
下面是一套覆盖 Dify 核心场景的 Grafana Dashboard 配置(JSON 片段):
Panel 1:API 请求速率
{
"title": "API Request Rate",
"type": "graph",
"targets": [
{
"expr": "rate(http_requests_total{job='dify-api'}[5m])",
"legendFormat": "{{method}} {{path}}"
}
]
}
Panel 2:LLM 调用延迟分布
# P50 延迟
histogram_quantile(0.50, rate(dify_llm_request_duration_seconds_bucket[5m]))
# P95 延迟
histogram_quantile(0.95, rate(dify_llm_request_duration_seconds_bucket[5m]))
# P99 延迟
histogram_quantile(0.99, rate(dify_llm_request_duration_seconds_bucket[5m]))
Panel 3:Token 消耗趋势
# 每分钟 Token 消耗速率
rate(dify_llm_tokens_total[5m])
# 按模型分组
rate(dify_llm_tokens_total{model=~"gpt-4.*"}[5m])
rate(dify_llm_tokens_total{model=~"gpt-3.5.*"}[5m])
Panel 4:向量检索耗时
histogram_quantile(0.95,
rate(dify_vector_search_duration_seconds_bucket[5m])
)
Panel 5:Celery Worker 队列积压
# 待处理任务数量
celery_tasks_received_total - celery_tasks_succeeded_total - celery_tasks_failed_total
日志聚合:Loki + Promtail
对于中小规模部署,Loki 比 ELK 更轻量:
# loki 配置(docker-compose 添加)
services:
loki:
image: grafana/loki:2.9.0
volumes:
- ./loki-config.yaml:/etc/loki/local-config.yaml
- loki_data:/loki
command: -config.file=/etc/loki/local-config.yaml
ports:
- "3100:3100"
promtail:
image: grafana/promtail:2.9.0
volumes:
- /var/log:/var/log
- /var/lib/docker/containers:/var/lib/docker/containers:ro
- ./promtail-config.yaml:/etc/promtail/config.yml
command: -config.file=/etc/promtail/config.yml
# loki-config.yaml
auth_enabled: false
server:
http_listen_port: 3100
ingester:
lifecycler:
ring:
kvstore:
store: inmemory
replication_factor: 1
schema_config:
configs:
- from: 2024-01-01
store: boltdb-shipper
object_store: filesystem
schema: v11
index:
prefix: index_
period: 24h
storage_config:
boltdb_shipper:
active_index_directory: /loki/boltdb-shipper-active
cache_location: /loki/boltdb-shipper-cache
filesystem:
directory: /loki/chunks
limits_config:
retention_period: 30d
ingestion_rate_mb: 16
ingestion_burst_size_mb: 32
# promtail-config.yaml
server:
http_listen_port: 9080
positions:
filename: /tmp/positions.yaml
clients:
- url: http://loki:3100/loki/api/v1/push
scrape_configs:
- job_name: dify-containers
docker_sd_configs:
- host: unix:///var/run/docker.sock
refresh_interval: 5s
relabel_configs:
# 只采集 dify 相关容器
- source_labels: [__meta_docker_container_name]
regex: /dify.*
action: keep
# 提取容器名作为标签
- source_labels: [__meta_docker_container_name]
target_label: container
# 提取服务名
- source_labels: [__meta_docker_container_label_com_docker_compose_service]
target_label: service
pipeline_stages:
# 解析 JSON 格式日志
- json:
expressions:
level: level
message: message
timestamp: timestamp
request_id: request_id
# 将 JSON 字段提取为 Loki 标签
- labels:
level:
service:
# 设置时间戳
- timestamp:
source: timestamp
format: RFC3339
在 Grafana 中查询日志:
# 查看 dify-api 的所有错误日志
{service="api"} |= "ERROR"
# 查看特定 request_id 的完整调用链
{service=~"api|worker"} |= "req-abc-12345"
# 统计过去1小时每分钟错误数
rate({service="api"} |= "ERROR" [1m])
# 查看 LLM 调用超时
{service="api"} |~ "timeout|LLM.*error"
OpenTelemetry 分布式追踪
Dify 支持通过 OpenTelemetry 将追踪数据发送到 Jaeger 或 Zipkin:
# .env 中启用 OpenTelemetry
ENABLE_OTEL=true
OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4317
OTEL_SERVICE_NAME=dify-api
OTEL_TRACES_SAMPLER=parentbased_traceidratio
OTEL_TRACES_SAMPLER_ARG=0.1 # 采样 10%,降低开销
# docker-compose 添加 Jaeger
services:
jaeger:
image: jaegertracing/all-in-one:1.52
environment:
COLLECTOR_OTLP_ENABLED: 'true'
SPAN_STORAGE_TYPE: elasticsearch
ES_SERVER_URLS: http://elasticsearch:9200
ports:
- "16686:16686" # Jaeger UI
- "4317:4317" # OTLP gRPC
- "4318:4318" # OTLP HTTP
自定义 Span 追踪(在 Dify 自定义工具或 API 扩展中):
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
tracer = trace.get_tracer(__name__)
def my_custom_tool_logic(query: str, context: dict) -> str:
with tracer.start_as_current_span("custom_tool.execute") as span:
span.set_attribute("tool.name", "my_custom_tool")
span.set_attribute("query.length", len(query))
try:
# 子 span:数据库查询
with tracer.start_as_current_span("db.query") as db_span:
db_span.set_attribute("db.statement", "SELECT ...")
result = db.query(query)
db_span.set_attribute("db.rows_returned", len(result))
# 子 span:LLM 调用
with tracer.start_as_current_span("llm.call") as llm_span:
llm_span.set_attribute("llm.model", "gpt-4")
response = llm.invoke(result)
llm_span.set_attribute("llm.tokens_used", response.usage.total_tokens)
span.set_status(Status(StatusCode.OK))
return response.content
except Exception as e:
span.set_status(Status(StatusCode.ERROR, str(e)))
span.record_exception(e)
raise
成本控制与预算告警
Token 成本计算逻辑(Dify 后端实际计算方式):
# 不同模型的价格(USD/1K tokens,2024年价格)
MODEL_PRICING = {
"gpt-4": {"input": 0.03, "output": 0.06},
"gpt-4-turbo": {"input": 0.01, "output": 0.03},
"gpt-3.5-turbo": {"input": 0.001, "output": 0.002},
"claude-3-sonnet": {"input": 0.003, "output": 0.015},
"claude-3-haiku": {"input": 0.00025, "output": 0.00125},
}
def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
"""计算单次调用费用(美元)"""
pricing = MODEL_PRICING.get(model, {"input": 0, "output": 0})
cost = (input_tokens / 1000) * pricing["input"]
cost += (output_tokens / 1000) * pricing["output"]
return cost
Prometheus 告警规则:成本超预算
# alerts.yaml
groups:
- name: dify_cost_alerts
interval: 5m
rules:
# 每小时 Token 消耗超过 10万
- alert: HighTokenConsumption
expr: >
increase(dify_llm_tokens_total[1h]) > 100000
for: 5m
labels:
severity: warning
annotations:
summary: "Token 消耗过高"
description: "过去1小时 Token 消耗 {{ $value }} 超过阈值 100,000"
# 估算日费用超过 100 美元
- alert: DailyBudgetExceeded
expr: >
sum(increase(dify_llm_cost_usd_total[24h])) > 100
for: 5m
labels:
severity: critical
annotations:
summary: "日费用预算超标"
description: "过去24小时估算费用 ${{ $value }} 超过日预算 $100"
# 错误率超过 5%
- alert: HighErrorRate
expr: >
rate(http_requests_total{job="dify-api",status=~"5.."}[5m])
/
rate(http_requests_total{job="dify-api"}[5m]) > 0.05
for: 2m
labels:
severity: critical
annotations:
summary: "API 错误率过高"
description: "API 错误率 {{ $value | humanizePercentage }} 超过 5%"
# P95 延迟超过 10 秒
- alert: HighLatency
expr: >
histogram_quantile(0.95,
rate(dify_llm_request_duration_seconds_bucket[5m])
) > 10
for: 5m
labels:
severity: warning
annotations:
summary: "LLM 调用延迟过高"
description: "P95 延迟 {{ $value }}s 超过 10s 阈值"
# Worker 队列积压超过 100
- alert: WorkerQueueBacklog
expr: >
celery_tasks_received_total - on()
(celery_tasks_succeeded_total + celery_tasks_failed_total) > 100
for: 10m
labels:
severity: warning
annotations:
summary: "Worker 队列积压"
description: "待处理任务 {{ $value }} 超过阈值"
AlertManager 配置(飞书/钉钉通知)
# alertmanager.yml
global:
resolve_timeout: 5m
route:
group_by: ['alertname', 'severity']
group_wait: 30s
group_interval: 5m
repeat_interval: 4h
receiver: 'feishu-webhook'
routes:
- match:
severity: critical
receiver: 'feishu-webhook'
repeat_interval: 1h
- match:
severity: warning
receiver: 'feishu-webhook'
repeat_interval: 4h
receivers:
- name: 'feishu-webhook'
webhook_configs:
- url: 'https://open.feishu.cn/open-apis/bot/v2/hook/your-webhook-token'
send_resolved: true
http_config:
# 飞书 webhook 需要自定义 body
# 使用 alertmanager-webhook-adapter 转换格式
# alertmanager-webhook-adapter.py
# 将 Prometheus AlertManager 告警转换为飞书消息格式
from flask import Flask, request, jsonify
import requests
app = Flask(__name__)
FEISHU_WEBHOOK = "https://open.feishu.cn/open-apis/bot/v2/hook/your-token"
@app.route('/alert', methods=['POST'])
def handle_alert():
data = request.json
alerts = data.get('alerts', [])
for alert in alerts:
status = alert['status']
name = alert['labels'].get('alertname', 'Unknown')
severity = alert['labels'].get('severity', 'info')
summary = alert['annotations'].get('summary', '')
description = alert['annotations'].get('description', '')
# 根据严重程度选择颜色
color = {
'critical': 'red',
'warning': 'orange',
'info': 'blue'
}.get(severity, 'grey')
# 飞书卡片消息格式
card = {
"msg_type": "interactive",
"card": {
"header": {
"title": {
"content": f"{'🔴' if severity == 'critical' else '🟡'} [{status.upper()}] {name}",
"tag": "plain_text"
},
"template": color
},
"elements": [
{
"tag": "div",
"text": {
"content": f"**摘要**: {summary}\n**详情**: {description}",
"tag": "lark_md"
}
}
]
}
}
requests.post(FEISHU_WEBHOOK, json=card)
return jsonify({"status": "ok"})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
Level 3:源码与原理(5 年以上)
Dify 可观测性内部实现
Dify 的 API 服务基于 Flask,其指标暴露使用 prometheus_flask_exporter 库:
# api/app.py(简化版)
from prometheus_flask_exporter import PrometheusMetrics
app = Flask(__name__)
metrics = PrometheusMetrics(app)
# 自定义 LLM 调用计数器
llm_request_counter = metrics.counter(
'dify_llm_requests_total',
'Total LLM API requests',
labels={'model': lambda: g.current_model, 'status': lambda: g.request_status}
)
# LLM 调用延迟直方图
llm_latency_histogram = metrics.histogram(
'dify_llm_request_duration_seconds',
'LLM request duration in seconds',
labels={'model': lambda: g.current_model},
buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, float('inf')]
)
# Token 消耗计数器
token_counter = metrics.counter(
'dify_llm_tokens_total',
'Total LLM tokens consumed',
labels={'model': lambda: g.current_model, 'type': 'total'}
)
LLM 调用中间件追踪:
# api/core/model_runtime/model_providers/_base.py(简化)
from opentelemetry import trace
tracer = trace.get_tracer('dify.model_runtime')
class BaseModelProvider:
def invoke(self, model_parameters: dict, **kwargs):
with tracer.start_as_current_span(f"llm.invoke.{self.model}") as span:
span.set_attribute("llm.model", self.model)
span.set_attribute("llm.provider", self.provider)
span.set_attribute("llm.temperature", model_parameters.get('temperature', 1.0))
start_time = time.time()
try:
response = self._invoke_model(model_parameters, **kwargs)
# 记录 Token 使用量
if hasattr(response, 'usage'):
span.set_attribute("llm.input_tokens", response.usage.prompt_tokens)
span.set_attribute("llm.output_tokens", response.usage.completion_tokens)
span.set_attribute("llm.total_tokens", response.usage.total_tokens)
duration = time.time() - start_time
span.set_attribute("llm.duration_seconds", duration)
return response
except Exception as e:
span.record_exception(e)
span.set_status(trace.Status(trace.StatusCode.ERROR))
raise
向量检索的可观测性
向量检索是 RAG 系统中最难调优的环节,需要特别关注:
# 自定义向量检索追踪中间件
class TracedVectorStore:
def __init__(self, vector_store, tracer):
self.vector_store = vector_store
self.tracer = tracer
def similarity_search(self, query: str, k: int = 4, **kwargs):
with self.tracer.start_as_current_span("vector.similarity_search") as span:
span.set_attribute("vector.query_length", len(query))
span.set_attribute("vector.top_k", k)
span.set_attribute("vector.store_type", type(self.vector_store).__name__)
start = time.time()
results = self.vector_store.similarity_search(query, k=k, **kwargs)
elapsed = time.time() - start
span.set_attribute("vector.results_count", len(results))
span.set_attribute("vector.duration_ms", elapsed * 1000)
if results:
# 记录最高相似度分数
if hasattr(results[0], 'score'):
span.set_attribute("vector.top_score", results[0].score)
# 推送到 Prometheus
vector_search_histogram.observe(elapsed, labels={
'store': type(self.vector_store).__name__
})
return results
生产级日志结构化方案
# api/core/logging.py
import structlog
import logging
def configure_structlog():
"""配置结构化日志,便于 Loki/ELK 解析"""
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.UnicodeDecoder(),
# 添加追踪上下文(自动关联 Trace ID)
add_open_telemetry_spans,
structlog.processors.JSONRenderer() # 输出 JSON 格式
],
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
cache_logger_on_first_use=True,
)
def add_open_telemetry_spans(logger, method, event_dict):
"""将当前 Span ID 注入到日志中,方便 Trace 和 Log 联动"""
from opentelemetry import trace
span = trace.get_current_span()
if span.is_recording():
ctx = span.get_span_context()
event_dict['trace_id'] = format(ctx.trace_id, '032x')
event_dict['span_id'] = format(ctx.span_id, '016x')
return event_dict
# 使用示例
logger = structlog.get_logger()
def process_conversation(conversation_id: str, user_input: str):
log = logger.bind(
conversation_id=conversation_id,
user_id=g.user_id,
app_id=g.app_id
)
log.info("conversation.started", input_length=len(user_input))
try:
result = run_chain(user_input)
log.info("conversation.completed",
output_length=len(result.output),
tokens_used=result.tokens,
duration_ms=result.duration * 1000)
return result
except Exception as e:
log.error("conversation.failed",
error=str(e),
error_type=type(e).__name__)
raise
Level 4:生产陷阱与决策(专家视角)
某 500 人企业的真实监控问题复盘
背景:某制造业企业使用 Dify 搭建了 4 个内部 AI 应用,2024 年初上线。上线 3 个月后遭遇以下问题:
问题1:OpenAI API 费用失控
某天早上突然发现 OpenAI 账单暴增,单日消耗 $850,是平时的 10 倍。
调查过程:
- 查 Grafana Token 消耗面板 → 发现凌晨 2:00 开始异常
- 查 Loki 日志 → 找到大量
app_id=prod-sales-assistant的请求 - 深入查日志 → 发现某销售助理 Prompt 被修改,Prompt 长度从 1000 token 暴增到 12000 token
- 查操作日志 → 确认是某员工在 Console 中误操作,将示例文档整个复制进了 System Prompt
根因:缺少 Prompt 长度告警,缺少关键操作的二次确认。
解决方案:
# 增加 Prompt 长度告警
- alert: LargePromptDetected
expr: >
avg(dify_llm_prompt_tokens) by (app_id) > 5000
for: 5m
annotations:
summary: "应用 {{ $labels.app_id }} 的 Prompt Token 数异常"
问题2:RAG 检索质量无法衡量
用户反馈"AI 回答不准确",但监控面板只能看到请求量和延迟,无法判断检索质量。
解决方案:增加自定义业务指标:
# 在 RAG 链路中记录检索质量分数
from prometheus_client import Histogram, Counter
retrieval_score_histogram = Histogram(
'dify_retrieval_score',
'Vector retrieval similarity scores',
buckets=[0.5, 0.6, 0.7, 0.75, 0.8, 0.85, 0.9, 0.95, 1.0],
labelnames=['knowledge_base_id']
)
low_confidence_counter = Counter(
'dify_low_confidence_retrievals_total',
'Retrievals with score below threshold',
labelnames=['knowledge_base_id']
)
def retrieval_with_metrics(query, kb_id, threshold=0.75):
results = vector_store.similarity_search_with_score(query)
for doc, score in results:
retrieval_score_histogram.labels(kb_id).observe(score)
if score < threshold:
low_confidence_counter.labels(kb_id).inc()
return results
问题3:Celery Worker 任务丢失
文档上传后知识库更新延迟,有时长达数小时,有时彻底丢失。
调查发现:Redis 内存满了,采用 allkeys-lru 策略驱逐了 Celery 任务队列消息。
解决方案:
# Redis 分库,Celery 队列使用独立 Redis(db 1)
CELERY_BROKER_URL: redis://:密码@redis:6379/1
# 并设置 Redis db 1 不驱逐(只给 Celery 用)
redis-server --maxmemory-policy noeviction --databases 16
同时增加告警:
- alert: RedisMemoryHigh
expr: redis_memory_used_bytes / redis_memory_max_bytes > 0.85
for: 5m
annotations:
summary: "Redis 内存使用率超过 85%"
成本优化的实战经验
策略1:模型降级路由
不是所有请求都需要 GPT-4,通过分类器自动降级:
def smart_model_selector(query: str, context_complexity: float) -> str:
"""根据问题复杂度自动选择合适的模型"""
# 简单问候、FAQ -> 用便宜模型
if context_complexity < 0.3 or is_simple_query(query):
return "gpt-3.5-turbo" # $0.002/1K token
# 中等复杂度 -> 用 GPT-4 Turbo
elif context_complexity < 0.7:
return "gpt-4-turbo" # $0.03/1K token
# 高复杂度、需要推理 -> GPT-4
else:
return "gpt-4" # $0.06/1K token
策略2:结果缓存
对于重复性高的查询(如 FAQ),缓存 LLM 响应:
import hashlib
import redis
cache = redis.Redis()
def cached_llm_call(prompt: str, model: str, ttl: int = 3600) -> str:
cache_key = f"llm:{hashlib.md5(f'{model}:{prompt}'.encode()).hexdigest()}"
cached = cache.get(cache_key)
if cached:
return cached.decode()
response = llm.invoke(prompt, model=model)
cache.setex(cache_key, ttl, response.content)
return response.content
策略3:Token 预算限制
# 在 Dify 应用配置中设置每用户每日 Token 上限
class TokenBudgetMiddleware:
def __init__(self, daily_limit: int = 50000):
self.daily_limit = daily_limit
def check_budget(self, user_id: str) -> bool:
today = datetime.now().strftime('%Y-%m-%d')
key = f"token_budget:{user_id}:{today}"
used = int(redis.get(key) or 0)
return used < self.daily_limit
def consume(self, user_id: str, tokens: int):
today = datetime.now().strftime('%Y-%m-%d')
key = f"token_budget:{user_id}:{today}"
redis.incrby(key, tokens)
redis.expire(key, 86400) # 24 小时过期
可观测性成熟度模型
| 成熟度等级 | 描述 | 典型配置 |
|---|---|---|
| Level 0 | 无监控,靠用户投诉发现问题 | 仅 Dify 内置日志 |
| Level 1 | 基础指标监控 | Prometheus + Grafana |
| Level 2 | 日志聚合 + 告警 | + Loki + AlertManager |
| Level 3 | 分布式追踪 | + OpenTelemetry + Jaeger |
| Level 4 | 业务指标 + 成本可视化 | + 自定义指标 + 成本 Dashboard |
| Level 5 | AIOps:异常检测 + 自动根因分析 | + ML 异常检测 |
建议大多数企业以 Level 3 为目标,Level 4 为理想状态。
本章小结
核心要点:
-
LLM 可观测性 = 传统指标 + Token 成本 + 检索质量,三者缺一不可。
-
Prometheus + Grafana + Loki 是轻量级但功能完整的监控栈,适合 50-1000 人规模。
-
结构化日志(JSON 格式 + Trace ID 关联)是快速定位问题的关键,绝对不能用 print 调试。
-
成本告警必须在上线前配置,而不是等到账单超支后才设置。
-
业务指标(检索相似度、用户反馈) 比技术指标更能反映真实服务质量。
-
Celery 任务队列 Redis 要独立配置,切忌与缓存混用,否则内存满时任务会丢失。
告警优先级配置建议:
| 告警名称 | 阈值 | 严重级别 | 通知方式 |
|---|---|---|---|
| API 错误率 | > 5% | Critical | 即时电话/短信 |
| 日费用超预算 | > 100 USD | Critical | 即时消息 |
| P95 延迟过高 | > 10s | Warning | 群消息 |
| Redis 内存 | > 85% | Warning | 群消息 |
| Worker 积压 | > 100 任务 | Warning | 邮件 |
| Prompt Token | > 5000 avg | Info | 日报 |