第 20 章

可观测性:日志、追踪、成本控制与告警体系

第20章:可观测性——日志、追踪、成本控制与告警体系

你无法管理你看不见的东西——本章教你为 Dify 构建完整的可观测性体系,让每一次 LLM 调用、每一分 Token 消耗都清晰可查。

本章导读

生产环境的 Dify 就像一辆高速行驶的赛车,没有仪表盘你根本不知道何时会爆胎。可观测性(Observability)的三个支柱是:

对于 Dify 这类 LLM 应用平台,还有第四个关键维度:

本章将带你从零构建完整的可观测性栈:Prometheus + Grafana 指标监控、ELK/Loki 日志聚合、OpenTelemetry 分布式追踪、以及基于 Token 消耗的成本控制与告警。

读完本章,你将能够:


Level 1:基础认知(1-3 年经验)

为什么 LLM 应用需要特殊的可观测性

传统 Web 应用的可观测性重点是延迟和错误率,而 LLM 应用有额外的挑战:

  1. 响应时间高度不确定:同一问题,GPT-4 可能返回 500ms,也可能返回 30s,这取决于 Token 数量和模型负载
  2. 成本与使用量强相关:一个写错的 Prompt 可能让 Token 消耗暴涨 10 倍
  3. 质量难以用传统指标衡量:HTTP 200 不代表答案正确,需要额外的业务指标
  4. 调用链复杂:一次用户请求可能触发 RAG 检索 → 多次 LLM 调用 → Tool 执行 → 再次 LLM 调用

类比:传统可观测性像看汽车的油表和时速表,而 LLM 可观测性还要看"这段路花了多少油费"和"乘客是否满意这段旅途"。

Dify 内置监控功能

Dify 在管理控制台提供了基础的内置监控,无需额外配置:

应用监控(Console → 应用 → 监控)

日志查看(Console → 日志):

局限性:内置监控只能看单个应用,无法跨应用聚合,无法设置告警,无法保留超过 30 天的数据。

Prometheus 快速接入

Dify API 暴露了 /metrics 端点(Prometheus 格式),可以直接抓取。

# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  # Dify API 指标
  - job_name: 'dify-api'
    static_configs:
      - targets: ['dify-api:5001']
    metrics_path: '/metrics'
    scrape_interval: 30s

  # PostgreSQL 指标(通过 postgres_exporter)
  - job_name: 'postgres'
    static_configs:
      - targets: ['postgres-exporter:9187']

  # Redis 指标(通过 redis_exporter)
  - job_name: 'redis'
    static_configs:
      - targets: ['redis-exporter:9121']

  # Weaviate 指标
  - job_name: 'weaviate'
    static_configs:
      - targets: ['weaviate:2112']
    metrics_path: '/metrics'

  # Node 主机指标
  - job_name: 'node'
    static_configs:
      - targets: ['node-exporter:9100']
# docker-compose 添加监控组件
services:
  prometheus:
    image: prom/prometheus:v2.48.0
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.retention.time=30d'
      - '--web.enable-lifecycle'
    ports:
      - "9090:9090"

  grafana:
    image: grafana/grafana:10.2.0
    environment:
      GF_SECURITY_ADMIN_PASSWORD: grafana强密码
      GF_USERS_ALLOW_SIGN_UP: 'false'
    volumes:
      - grafana_data:/var/lib/grafana
      - ./grafana/dashboards:/etc/grafana/provisioning/dashboards
      - ./grafana/datasources:/etc/grafana/provisioning/datasources
    ports:
      - "3001:3000"

  postgres-exporter:
    image: prometheuscommunity/postgres-exporter:v0.15.0
    environment:
      DATA_SOURCE_NAME: postgresql://dify:密码@db:5432/dify?sslmode=disable
    
  redis-exporter:
    image: oliver006/redis_exporter:v1.55.0
    environment:
      REDIS_ADDR: redis://redis:6379
      REDIS_PASSWORD: Redis密码

  node-exporter:
    image: prom/node-exporter:v1.7.0
    volumes:
      - /proc:/host/proc:ro
      - /sys:/host/sys:ro
      - /:/rootfs:ro
    command:
      - '--path.procfs=/host/proc'
      - '--path.sysfs=/host/sys'

Level 2:机制深解(3-5 年经验)

Grafana Dashboard 核心面板配置

下面是一套覆盖 Dify 核心场景的 Grafana Dashboard 配置(JSON 片段):

Panel 1:API 请求速率

{
  "title": "API Request Rate",
  "type": "graph",
  "targets": [
    {
      "expr": "rate(http_requests_total{job='dify-api'}[5m])",
      "legendFormat": "{{method}} {{path}}"
    }
  ]
}

Panel 2:LLM 调用延迟分布

# P50 延迟
histogram_quantile(0.50, rate(dify_llm_request_duration_seconds_bucket[5m]))

# P95 延迟  
histogram_quantile(0.95, rate(dify_llm_request_duration_seconds_bucket[5m]))

# P99 延迟
histogram_quantile(0.99, rate(dify_llm_request_duration_seconds_bucket[5m]))

Panel 3:Token 消耗趋势

# 每分钟 Token 消耗速率
rate(dify_llm_tokens_total[5m])

# 按模型分组
rate(dify_llm_tokens_total{model=~"gpt-4.*"}[5m])
rate(dify_llm_tokens_total{model=~"gpt-3.5.*"}[5m])

Panel 4:向量检索耗时

histogram_quantile(0.95, 
  rate(dify_vector_search_duration_seconds_bucket[5m])
)

Panel 5:Celery Worker 队列积压

# 待处理任务数量
celery_tasks_received_total - celery_tasks_succeeded_total - celery_tasks_failed_total

日志聚合:Loki + Promtail

对于中小规模部署,Loki 比 ELK 更轻量:

# loki 配置(docker-compose 添加)
services:
  loki:
    image: grafana/loki:2.9.0
    volumes:
      - ./loki-config.yaml:/etc/loki/local-config.yaml
      - loki_data:/loki
    command: -config.file=/etc/loki/local-config.yaml
    ports:
      - "3100:3100"

  promtail:
    image: grafana/promtail:2.9.0
    volumes:
      - /var/log:/var/log
      - /var/lib/docker/containers:/var/lib/docker/containers:ro
      - ./promtail-config.yaml:/etc/promtail/config.yml
    command: -config.file=/etc/promtail/config.yml
# loki-config.yaml
auth_enabled: false

server:
  http_listen_port: 3100

ingester:
  lifecycler:
    ring:
      kvstore:
        store: inmemory
      replication_factor: 1

schema_config:
  configs:
  - from: 2024-01-01
    store: boltdb-shipper
    object_store: filesystem
    schema: v11
    index:
      prefix: index_
      period: 24h

storage_config:
  boltdb_shipper:
    active_index_directory: /loki/boltdb-shipper-active
    cache_location: /loki/boltdb-shipper-cache
  filesystem:
    directory: /loki/chunks

limits_config:
  retention_period: 30d
  ingestion_rate_mb: 16
  ingestion_burst_size_mb: 32
# promtail-config.yaml
server:
  http_listen_port: 9080

positions:
  filename: /tmp/positions.yaml

clients:
  - url: http://loki:3100/loki/api/v1/push

scrape_configs:
  - job_name: dify-containers
    docker_sd_configs:
      - host: unix:///var/run/docker.sock
        refresh_interval: 5s
    relabel_configs:
      # 只采集 dify 相关容器
      - source_labels: [__meta_docker_container_name]
        regex: /dify.*
        action: keep
      # 提取容器名作为标签
      - source_labels: [__meta_docker_container_name]
        target_label: container
      # 提取服务名
      - source_labels: [__meta_docker_container_label_com_docker_compose_service]
        target_label: service
    pipeline_stages:
      # 解析 JSON 格式日志
      - json:
          expressions:
            level: level
            message: message
            timestamp: timestamp
            request_id: request_id
      # 将 JSON 字段提取为 Loki 标签
      - labels:
          level:
          service:
      # 设置时间戳
      - timestamp:
          source: timestamp
          format: RFC3339

在 Grafana 中查询日志

# 查看 dify-api 的所有错误日志
{service="api"} |= "ERROR"

# 查看特定 request_id 的完整调用链
{service=~"api|worker"} |= "req-abc-12345"

# 统计过去1小时每分钟错误数
rate({service="api"} |= "ERROR" [1m])

# 查看 LLM 调用超时
{service="api"} |~ "timeout|LLM.*error"

OpenTelemetry 分布式追踪

Dify 支持通过 OpenTelemetry 将追踪数据发送到 Jaeger 或 Zipkin:

# .env 中启用 OpenTelemetry
ENABLE_OTEL=true
OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4317
OTEL_SERVICE_NAME=dify-api
OTEL_TRACES_SAMPLER=parentbased_traceidratio
OTEL_TRACES_SAMPLER_ARG=0.1  # 采样 10%,降低开销
# docker-compose 添加 Jaeger
services:
  jaeger:
    image: jaegertracing/all-in-one:1.52
    environment:
      COLLECTOR_OTLP_ENABLED: 'true'
      SPAN_STORAGE_TYPE: elasticsearch
      ES_SERVER_URLS: http://elasticsearch:9200
    ports:
      - "16686:16686"  # Jaeger UI
      - "4317:4317"    # OTLP gRPC
      - "4318:4318"    # OTLP HTTP

自定义 Span 追踪(在 Dify 自定义工具或 API 扩展中):

from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode

tracer = trace.get_tracer(__name__)

def my_custom_tool_logic(query: str, context: dict) -> str:
    with tracer.start_as_current_span("custom_tool.execute") as span:
        span.set_attribute("tool.name", "my_custom_tool")
        span.set_attribute("query.length", len(query))
        
        try:
            # 子 span:数据库查询
            with tracer.start_as_current_span("db.query") as db_span:
                db_span.set_attribute("db.statement", "SELECT ...")
                result = db.query(query)
                db_span.set_attribute("db.rows_returned", len(result))
            
            # 子 span:LLM 调用
            with tracer.start_as_current_span("llm.call") as llm_span:
                llm_span.set_attribute("llm.model", "gpt-4")
                response = llm.invoke(result)
                llm_span.set_attribute("llm.tokens_used", response.usage.total_tokens)
            
            span.set_status(Status(StatusCode.OK))
            return response.content
            
        except Exception as e:
            span.set_status(Status(StatusCode.ERROR, str(e)))
            span.record_exception(e)
            raise

成本控制与预算告警

Token 成本计算逻辑(Dify 后端实际计算方式):

# 不同模型的价格(USD/1K tokens,2024年价格)
MODEL_PRICING = {
    "gpt-4": {"input": 0.03, "output": 0.06},
    "gpt-4-turbo": {"input": 0.01, "output": 0.03},
    "gpt-3.5-turbo": {"input": 0.001, "output": 0.002},
    "claude-3-sonnet": {"input": 0.003, "output": 0.015},
    "claude-3-haiku": {"input": 0.00025, "output": 0.00125},
}

def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """计算单次调用费用(美元)"""
    pricing = MODEL_PRICING.get(model, {"input": 0, "output": 0})
    cost = (input_tokens / 1000) * pricing["input"]
    cost += (output_tokens / 1000) * pricing["output"]
    return cost

Prometheus 告警规则:成本超预算

# alerts.yaml
groups:
  - name: dify_cost_alerts
    interval: 5m
    rules:
      # 每小时 Token 消耗超过 10万
      - alert: HighTokenConsumption
        expr: >
          increase(dify_llm_tokens_total[1h]) > 100000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Token 消耗过高"
          description: "过去1小时 Token 消耗 {{ $value }} 超过阈值 100,000"
      
      # 估算日费用超过 100 美元
      - alert: DailyBudgetExceeded
        expr: >
          sum(increase(dify_llm_cost_usd_total[24h])) > 100
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "日费用预算超标"
          description: "过去24小时估算费用 ${{ $value }} 超过日预算 $100"
      
      # 错误率超过 5%
      - alert: HighErrorRate
        expr: >
          rate(http_requests_total{job="dify-api",status=~"5.."}[5m])
          /
          rate(http_requests_total{job="dify-api"}[5m]) > 0.05
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "API 错误率过高"
          description: "API 错误率 {{ $value | humanizePercentage }} 超过 5%"

      # P95 延迟超过 10 秒
      - alert: HighLatency
        expr: >
          histogram_quantile(0.95,
            rate(dify_llm_request_duration_seconds_bucket[5m])
          ) > 10
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "LLM 调用延迟过高"
          description: "P95 延迟 {{ $value }}s 超过 10s 阈值"

      # Worker 队列积压超过 100
      - alert: WorkerQueueBacklog
        expr: >
          celery_tasks_received_total - on() 
          (celery_tasks_succeeded_total + celery_tasks_failed_total) > 100
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Worker 队列积压"
          description: "待处理任务 {{ $value }} 超过阈值"

AlertManager 配置(飞书/钉钉通知)

# alertmanager.yml
global:
  resolve_timeout: 5m

route:
  group_by: ['alertname', 'severity']
  group_wait: 30s
  group_interval: 5m
  repeat_interval: 4h
  receiver: 'feishu-webhook'
  routes:
  - match:
      severity: critical
    receiver: 'feishu-webhook'
    repeat_interval: 1h
  - match:
      severity: warning
    receiver: 'feishu-webhook'
    repeat_interval: 4h

receivers:
  - name: 'feishu-webhook'
    webhook_configs:
      - url: 'https://open.feishu.cn/open-apis/bot/v2/hook/your-webhook-token'
        send_resolved: true
        http_config:
          # 飞书 webhook 需要自定义 body
        # 使用 alertmanager-webhook-adapter 转换格式
# alertmanager-webhook-adapter.py
# 将 Prometheus AlertManager 告警转换为飞书消息格式

from flask import Flask, request, jsonify
import requests

app = Flask(__name__)

FEISHU_WEBHOOK = "https://open.feishu.cn/open-apis/bot/v2/hook/your-token"

@app.route('/alert', methods=['POST'])
def handle_alert():
    data = request.json
    alerts = data.get('alerts', [])
    
    for alert in alerts:
        status = alert['status']
        name = alert['labels'].get('alertname', 'Unknown')
        severity = alert['labels'].get('severity', 'info')
        summary = alert['annotations'].get('summary', '')
        description = alert['annotations'].get('description', '')
        
        # 根据严重程度选择颜色
        color = {
            'critical': 'red',
            'warning': 'orange',
            'info': 'blue'
        }.get(severity, 'grey')
        
        # 飞书卡片消息格式
        card = {
            "msg_type": "interactive",
            "card": {
                "header": {
                    "title": {
                        "content": f"{'🔴' if severity == 'critical' else '🟡'} [{status.upper()}] {name}",
                        "tag": "plain_text"
                    },
                    "template": color
                },
                "elements": [
                    {
                        "tag": "div",
                        "text": {
                            "content": f"**摘要**: {summary}\n**详情**: {description}",
                            "tag": "lark_md"
                        }
                    }
                ]
            }
        }
        
        requests.post(FEISHU_WEBHOOK, json=card)
    
    return jsonify({"status": "ok"})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

Level 3:源码与原理(5 年以上)

Dify 可观测性内部实现

Dify 的 API 服务基于 Flask,其指标暴露使用 prometheus_flask_exporter 库:

# api/app.py(简化版)
from prometheus_flask_exporter import PrometheusMetrics

app = Flask(__name__)
metrics = PrometheusMetrics(app)

# 自定义 LLM 调用计数器
llm_request_counter = metrics.counter(
    'dify_llm_requests_total',
    'Total LLM API requests',
    labels={'model': lambda: g.current_model, 'status': lambda: g.request_status}
)

# LLM 调用延迟直方图
llm_latency_histogram = metrics.histogram(
    'dify_llm_request_duration_seconds',
    'LLM request duration in seconds',
    labels={'model': lambda: g.current_model},
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, float('inf')]
)

# Token 消耗计数器
token_counter = metrics.counter(
    'dify_llm_tokens_total',
    'Total LLM tokens consumed',
    labels={'model': lambda: g.current_model, 'type': 'total'}
)

LLM 调用中间件追踪

# api/core/model_runtime/model_providers/_base.py(简化)
from opentelemetry import trace

tracer = trace.get_tracer('dify.model_runtime')

class BaseModelProvider:
    def invoke(self, model_parameters: dict, **kwargs):
        with tracer.start_as_current_span(f"llm.invoke.{self.model}") as span:
            span.set_attribute("llm.model", self.model)
            span.set_attribute("llm.provider", self.provider)
            span.set_attribute("llm.temperature", model_parameters.get('temperature', 1.0))
            
            start_time = time.time()
            try:
                response = self._invoke_model(model_parameters, **kwargs)
                
                # 记录 Token 使用量
                if hasattr(response, 'usage'):
                    span.set_attribute("llm.input_tokens", response.usage.prompt_tokens)
                    span.set_attribute("llm.output_tokens", response.usage.completion_tokens)
                    span.set_attribute("llm.total_tokens", response.usage.total_tokens)
                
                duration = time.time() - start_time
                span.set_attribute("llm.duration_seconds", duration)
                
                return response
                
            except Exception as e:
                span.record_exception(e)
                span.set_status(trace.Status(trace.StatusCode.ERROR))
                raise

向量检索的可观测性

向量检索是 RAG 系统中最难调优的环节,需要特别关注:

# 自定义向量检索追踪中间件
class TracedVectorStore:
    def __init__(self, vector_store, tracer):
        self.vector_store = vector_store
        self.tracer = tracer
    
    def similarity_search(self, query: str, k: int = 4, **kwargs):
        with self.tracer.start_as_current_span("vector.similarity_search") as span:
            span.set_attribute("vector.query_length", len(query))
            span.set_attribute("vector.top_k", k)
            span.set_attribute("vector.store_type", type(self.vector_store).__name__)
            
            start = time.time()
            results = self.vector_store.similarity_search(query, k=k, **kwargs)
            elapsed = time.time() - start
            
            span.set_attribute("vector.results_count", len(results))
            span.set_attribute("vector.duration_ms", elapsed * 1000)
            
            if results:
                # 记录最高相似度分数
                if hasattr(results[0], 'score'):
                    span.set_attribute("vector.top_score", results[0].score)
            
            # 推送到 Prometheus
            vector_search_histogram.observe(elapsed, labels={
                'store': type(self.vector_store).__name__
            })
            
            return results

生产级日志结构化方案

# api/core/logging.py
import structlog
import logging

def configure_structlog():
    """配置结构化日志,便于 Loki/ELK 解析"""
    structlog.configure(
        processors=[
            structlog.stdlib.filter_by_level,
            structlog.stdlib.add_logger_name,
            structlog.stdlib.add_log_level,
            structlog.stdlib.PositionalArgumentsFormatter(),
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.UnicodeDecoder(),
            # 添加追踪上下文(自动关联 Trace ID)
            add_open_telemetry_spans,
            structlog.processors.JSONRenderer()  # 输出 JSON 格式
        ],
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        cache_logger_on_first_use=True,
    )

def add_open_telemetry_spans(logger, method, event_dict):
    """将当前 Span ID 注入到日志中,方便 Trace 和 Log 联动"""
    from opentelemetry import trace
    span = trace.get_current_span()
    if span.is_recording():
        ctx = span.get_span_context()
        event_dict['trace_id'] = format(ctx.trace_id, '032x')
        event_dict['span_id'] = format(ctx.span_id, '016x')
    return event_dict

# 使用示例
logger = structlog.get_logger()

def process_conversation(conversation_id: str, user_input: str):
    log = logger.bind(
        conversation_id=conversation_id,
        user_id=g.user_id,
        app_id=g.app_id
    )
    
    log.info("conversation.started", input_length=len(user_input))
    
    try:
        result = run_chain(user_input)
        log.info("conversation.completed",
                 output_length=len(result.output),
                 tokens_used=result.tokens,
                 duration_ms=result.duration * 1000)
        return result
    except Exception as e:
        log.error("conversation.failed",
                  error=str(e),
                  error_type=type(e).__name__)
        raise

Level 4:生产陷阱与决策(专家视角)

某 500 人企业的真实监控问题复盘

背景:某制造业企业使用 Dify 搭建了 4 个内部 AI 应用,2024 年初上线。上线 3 个月后遭遇以下问题:

问题1:OpenAI API 费用失控

某天早上突然发现 OpenAI 账单暴增,单日消耗 $850,是平时的 10 倍。

调查过程:

  1. 查 Grafana Token 消耗面板 → 发现凌晨 2:00 开始异常
  2. 查 Loki 日志 → 找到大量 app_id=prod-sales-assistant 的请求
  3. 深入查日志 → 发现某销售助理 Prompt 被修改,Prompt 长度从 1000 token 暴增到 12000 token
  4. 查操作日志 → 确认是某员工在 Console 中误操作,将示例文档整个复制进了 System Prompt

根因:缺少 Prompt 长度告警,缺少关键操作的二次确认。

解决方案

# 增加 Prompt 长度告警
- alert: LargePromptDetected
  expr: >
    avg(dify_llm_prompt_tokens) by (app_id) > 5000
  for: 5m
  annotations:
    summary: "应用 {{ $labels.app_id }} 的 Prompt Token 数异常"

问题2:RAG 检索质量无法衡量

用户反馈"AI 回答不准确",但监控面板只能看到请求量和延迟,无法判断检索质量。

解决方案:增加自定义业务指标:

# 在 RAG 链路中记录检索质量分数
from prometheus_client import Histogram, Counter

retrieval_score_histogram = Histogram(
    'dify_retrieval_score',
    'Vector retrieval similarity scores',
    buckets=[0.5, 0.6, 0.7, 0.75, 0.8, 0.85, 0.9, 0.95, 1.0],
    labelnames=['knowledge_base_id']
)

low_confidence_counter = Counter(
    'dify_low_confidence_retrievals_total',
    'Retrievals with score below threshold',
    labelnames=['knowledge_base_id']
)

def retrieval_with_metrics(query, kb_id, threshold=0.75):
    results = vector_store.similarity_search_with_score(query)
    
    for doc, score in results:
        retrieval_score_histogram.labels(kb_id).observe(score)
        if score < threshold:
            low_confidence_counter.labels(kb_id).inc()
    
    return results

问题3:Celery Worker 任务丢失

文档上传后知识库更新延迟,有时长达数小时,有时彻底丢失。

调查发现:Redis 内存满了,采用 allkeys-lru 策略驱逐了 Celery 任务队列消息。

解决方案:

# Redis 分库,Celery 队列使用独立 Redis(db 1)
CELERY_BROKER_URL: redis://:密码@redis:6379/1

# 并设置 Redis db 1 不驱逐(只给 Celery 用)
redis-server --maxmemory-policy noeviction --databases 16

同时增加告警:

- alert: RedisMemoryHigh
  expr: redis_memory_used_bytes / redis_memory_max_bytes > 0.85
  for: 5m
  annotations:
    summary: "Redis 内存使用率超过 85%"

成本优化的实战经验

策略1:模型降级路由

不是所有请求都需要 GPT-4,通过分类器自动降级:

def smart_model_selector(query: str, context_complexity: float) -> str:
    """根据问题复杂度自动选择合适的模型"""
    
    # 简单问候、FAQ -> 用便宜模型
    if context_complexity < 0.3 or is_simple_query(query):
        return "gpt-3.5-turbo"  # $0.002/1K token
    
    # 中等复杂度 -> 用 GPT-4 Turbo
    elif context_complexity < 0.7:
        return "gpt-4-turbo"    # $0.03/1K token
    
    # 高复杂度、需要推理 -> GPT-4
    else:
        return "gpt-4"          # $0.06/1K token

策略2:结果缓存

对于重复性高的查询(如 FAQ),缓存 LLM 响应:

import hashlib
import redis

cache = redis.Redis()

def cached_llm_call(prompt: str, model: str, ttl: int = 3600) -> str:
    cache_key = f"llm:{hashlib.md5(f'{model}:{prompt}'.encode()).hexdigest()}"
    
    cached = cache.get(cache_key)
    if cached:
        return cached.decode()
    
    response = llm.invoke(prompt, model=model)
    cache.setex(cache_key, ttl, response.content)
    
    return response.content

策略3:Token 预算限制

# 在 Dify 应用配置中设置每用户每日 Token 上限
class TokenBudgetMiddleware:
    def __init__(self, daily_limit: int = 50000):
        self.daily_limit = daily_limit
    
    def check_budget(self, user_id: str) -> bool:
        today = datetime.now().strftime('%Y-%m-%d')
        key = f"token_budget:{user_id}:{today}"
        used = int(redis.get(key) or 0)
        return used < self.daily_limit
    
    def consume(self, user_id: str, tokens: int):
        today = datetime.now().strftime('%Y-%m-%d')
        key = f"token_budget:{user_id}:{today}"
        redis.incrby(key, tokens)
        redis.expire(key, 86400)  # 24 小时过期

可观测性成熟度模型

成熟度等级 描述 典型配置
Level 0 无监控,靠用户投诉发现问题 仅 Dify 内置日志
Level 1 基础指标监控 Prometheus + Grafana
Level 2 日志聚合 + 告警 + Loki + AlertManager
Level 3 分布式追踪 + OpenTelemetry + Jaeger
Level 4 业务指标 + 成本可视化 + 自定义指标 + 成本 Dashboard
Level 5 AIOps:异常检测 + 自动根因分析 + ML 异常检测

建议大多数企业以 Level 3 为目标,Level 4 为理想状态。


本章小结

核心要点

  1. LLM 可观测性 = 传统指标 + Token 成本 + 检索质量,三者缺一不可。

  2. Prometheus + Grafana + Loki 是轻量级但功能完整的监控栈,适合 50-1000 人规模。

  3. 结构化日志(JSON 格式 + Trace ID 关联)是快速定位问题的关键,绝对不能用 print 调试。

  4. 成本告警必须在上线前配置,而不是等到账单超支后才设置。

  5. 业务指标(检索相似度、用户反馈) 比技术指标更能反映真实服务质量。

  6. Celery 任务队列 Redis 要独立配置,切忌与缓存混用,否则内存满时任务会丢失。

告警优先级配置建议

告警名称 阈值 严重级别 通知方式
API 错误率 > 5% Critical 即时电话/短信
日费用超预算 > 100 USD Critical 即时消息
P95 延迟过高 > 10s Warning 群消息
Redis 内存 > 85% Warning 群消息
Worker 积压 > 100 任务 Warning 邮件
Prompt Token > 5000 avg Info 日报
本章评分
4.8  / 5  (9 评分)

💬 留言讨论