Chapter 17

Complete API Integration Guide: REST / Streaming / WebSocket Implementations

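Learn the three ways to call the Dify API from scratch: blocking REST suits simple integrations, SSE streaming improves the user experience, and a WebSocket layer enables more complex two-way interaction. Each mode comes with a full code implementation and production notes.

Chapter Overview

Dify provides a complete set of APIs that let you integrate AI capabilities into any system, whether an internal ERP, a public SaaS product, or a mobile app. The three modes each fit different scenarios, and choosing the wrong one leads to a poor user experience, wasted server resources, or a sharp rise in implementation complexity.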

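This chapter moves from beginner to expert in four levels: basic concepts, mechanisms in depth, source code and principles, and production pitfalls and decisions.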


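Level 1: Basic Concepts (1-3 years of experience)

Dify API Overview

Dify's APIs fall into two categories:

1. Application API

2. Service API / management API

Getting an API Key

Get the application API key from the Dify console:

  1. Open your Dify app → API Access
  2. Click "Create API Key"
  3. Copy the generated key (format: app-xxxxxxxxxxxxxxxxxxxxxxxx)

# Check that the API key is valid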
curl -X GET 'https://api.dify.ai/v1/info' \
  -H 'Authorization: Bearer app-your_api_key_here'

REST API: The Simplest Way to Call

Basic call for a chat app (blocking mode):

# Start a conversation
curl -X POST 'https://api.dify.ai/v1/chat-messages' \
  -H 'Authorization: Bearer app-your_api_key_here' \
  -H 'Content-Type: application/json' \
  -d '{
    "inputs": {},
    "query": "Hello, tell me about Dify",
    "response_mode": "blocking",
    "conversation_id": "",
    "user": "user-001"
  }'

Example response:

{
  "event": "message",
  "message_id": "msg-abc123",
  "conversation_id": "conv-xyz456",
  "answer": "Dify is an open-source LLMOps platform that helps you build and deploy AI apps quickly...",
  "created_at": 1710000000,
  "metadata": {
    "usage": {
      "prompt_tokens": 150,
      "completion_tokens": 200,
      "total_tokens": 350,
      "total_price": "0.0000105"
    }
  }
}
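
The usage block in the response above is often worth logging for cost tracking. The following is a minimal sketch, assuming the response has the shape shown in the example; `summarize_usage` is a hypothetical helper name, not part of any Dify SDK. Note that `total_price` arrives as a string, so it is parsed with `Decimal` to avoid float rounding.

```python
from decimal import Decimal


def summarize_usage(response: dict) -> dict:
    """Extract token counts and price from a blocking chat-messages response.

    Assumes the metadata.usage layout shown in the example response;
    missing fields fall back to zero.
    """
    usage = (response.get("metadata") or {}).get("usage") or {}
    return {
        "prompt_tokens": int(usage.get("prompt_tokens", 0)),
        "completion_tokens": int(usage.get("completion_tokens", 0)),
        "total_tokens": int(usage.get("total_tokens", 0)),
        # total_price is a string in the response; Decimal keeps it exact
        "total_price": Decimal(str(usage.get("total_price", "0"))),
    }


sample_response = {
    "event": "message",
    "answer": "...",
    "metadata": {
        "usage": {
            "prompt_tokens": 150,
            "completion_tokens": 200,
            "total_tokens": 350,
            "total_price": "0.0000105",
        }
    },
}

summary = summarize_usage(sample_response)
print(summary["total_tokens"], summary["total_price"])
```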

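Streaming API: The Typewriter Effect

In a real product, waiting for the full response usually takes several seconds, which makes for a poor user experience. The streaming API returns text as it is generated, so users see it appear gradually:

# Streaming call (response_mode set to streaming)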
curl -X POST 'https://api.dify.ai/v1/chat-messages' \
  -H 'Authorization: Bearer app-your_api_key_here' \
  -H 'Content-Type: application/json' \
  -d '{
    "inputs": {},
    "query": "Write a poem about spring",
    "response_mode": "streaming",
    "user": "user-001"
  }'

Streaming response (SSE format):

data: {"event": "message", "answer": "Spring", "message_id": "msg-001", ...}

data: {"event": "message", "answer": " breeze", "message_id": "msg-001", ...}

data: {"event": "message", "answer": " blows", "message_id": "msg-001", ...}

data: {"event": "message_end", "metadata": {...}}

data: [DONE]

Each data: line carries one standalone JSON event, so the front end can render the text incrementally.

Basic Python Client Example

import httpx
import json

DIFY_API_URL = "https://api.dify.ai/v1"
API_KEY      = "app-your_api_key_here"

def chat_blocking(query: str, user_id: str = "default") -> str:
    """Blocking call (waits for the full response)"""
    resp = httpx.post(
        f"{DIFY_API_URL}/chat-messages",
        headers={"Authorization": f"Bearer {API_KEY}"},
        json={
            "query":         query,
            "response_mode": "blocking",
            "user":          user_id,
            "inputs":        {}
        },
        timeout=60.0
    )
    resp.raise_for_status()
    data = resp.json()
    return data["answer"]

def chat_streaming(query: str, user_id: str = "default"):
    """Streaming call (yields content incrementally)"""
    with httpx.stream(
        "POST",
        f"{DIFY_API_URL}/chat-messages",
        headers={"Authorization": f"Bearer {API_KEY}"},
        json={
            "query":         query,
            "response_mode": "streaming",
            "user":          user_id,
            "inputs":        {}
        },
        timeout=120.0
    ) as resp:
        resp.raise_for_status()
        for line in resp.iter_lines():
            if line.startswith("data: "):
                data_str = line[6:]
                if data_str == "[DONE]":
                    break
                event = json.loads(data_str)
                if event.get("event") == "message":
                    yield event.get("answer", "")

# Usage example
print("=== Blocking mode ===")
answer = chat_blocking("Hello, tell me about Dify")
print(answer)

print("\n=== Streaming mode ===")
for chunk in chat_streaming("Write a poem about spring"):
    print(chunk, end="", flush=True)
print()

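Level 2: Mechanisms in Depth (3-5 years of experience)

Dify API Endpoint Reference

Chat apps (Chatbot):

POST   /v1/chat-messages                  # Send a message
GET    /v1/conversations                  # List conversations
GET    /v1/messages?conversation_id={id}  # Get message history
DELETE /v1/conversations/{id}             # Delete a conversation
POST   /v1/messages/{id}/feedbacks        # User feedback (like/dislike)
GET    /v1/messages/{id}/suggested        # Get suggested follow-up questions
POST   /v1/audio-to-text                  # Speech to text

Text-generation apps (Completion):

POST   /v1/completion-messages            # Text generation (blocking or streaming, chosen by response_mode)

Workflow apps (Workflow):

POST   /v1/workflows/run                  # Run a workflow
GET    /v1/workflows/run/{id}             # Query run status
GET    /v1/workflows/logs                 # Get workflow run logs

File upload:

POST   /v1/files/upload                   # Upload a file (image, document, etc.)

Full Multi-Turn Conversation Management

A multi-turn conversation keeps its context through conversation_id: leave it empty on the first turn, then pass back the value Dify returns on every later turn: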

import httpx
import json
from typing import Optional, Generator

class DifyClient:
    """Full Dify API client"""

    def __init__(self, api_url: str, api_key: str):
        self.base_url = api_url.rstrip("/")
        self.headers  = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        }
        self._client  = httpx.Client(timeout=120.0)
        self._async_client = httpx.AsyncClient(timeout=120.0)

    # ===== Synchronous API =====

    def send_message(
        self,
        query:           str,
        user:            str,
        conversation_id: Optional[str] = None,
        inputs:          Optional[dict] = None,
        files:           Optional[list] = None,
    ) -> dict:
        """Send a message (blocking mode)"""
        payload = {
            "query":           query,
            "user":            user,
            "response_mode":   "blocking",
            "inputs":          inputs or {},
            "conversation_id": conversation_id or "",
        }
        if files:
            payload["files"] = files

        resp = self._client.post(
            f"{self.base_url}/chat-messages",
            headers=self.headers,
            json=payload
        )
        resp.raise_for_status()
        return resp.json()

    def send_message_stream(
        self,
        query:           str,
        user:            str,
        conversation_id: Optional[str] = None,
        inputs:          Optional[dict] = None,
    ) -> Generator[dict, None, None]:
        """Send a message (streaming mode)"""
        payload = {
            "query":           query,
            "user":            user,
            "response_mode":   "streaming",
            "inputs":          inputs or {},
            "conversation_id": conversation_id or "",
        }

        with self._client.stream(
            "POST",
            f"{self.base_url}/chat-messages",
            headers=self.headers,
            json=payload
        ) as resp:
            resp.raise_for_status()
            for line in resp.iter_lines():
                if not line or not line.startswith("data: "):
                    continue
                data_str = line[6:].strip()
                if data_str == "[DONE]":
                    return
                try:
                    event = json.loads(data_str)
                    yield event
                except json.JSONDecodeError:
                    continue

    def get_conversation_history(
        self,
        conversation_id: str,
        user:            str,
        limit:           int = 20,
        first_id:        Optional[str] = None
    ) -> dict:
        """Get conversation history (GET /messages filtered by conversation_id)"""
        params = {"conversation_id": conversation_id, "user": user, "limit": limit}
        if first_id:
            params["first_id"] = first_id

        resp = self._client.get(
            f"{self.base_url}/messages",
            headers=self.headers,
            params=params
        )
        resp.raise_for_status()
        return resp.json()

    def upload_file(self, file_path: str, user: str) -> dict:
        """Upload a file"""
        with open(file_path, "rb") as f:
            resp = self._client.post(
                f"{self.base_url}/files/upload",
                headers={"Authorization": self.headers["Authorization"]},
                files={"file": f},
                data={"user": user}
            )
        resp.raise_for_status()
        return resp.json()

    def submit_feedback(
        self,
        message_id: str,
        rating:     Optional[str],  # "like" | "dislike" | None
        user:       str,
        content:    Optional[str] = None
    ) -> dict:
        """Submit user feedback"""
        resp = self._client.post(
            f"{self.base_url}/messages/{message_id}/feedbacks",
            headers=self.headers,
            json={"rating": rating, "user": user, "content": content}
        )
        resp.raise_for_status()
        return resp.json()

    # ===== Asynchronous API =====

    async def send_message_async(
        self, query: str, user: str, **kwargs
    ) -> dict:
        """Send a message asynchronously (blocking mode)"""
        payload = {
            "query": query, "user": user,
            "response_mode": "blocking",
            "inputs": kwargs.get("inputs", {}),
            "conversation_id": kwargs.get("conversation_id", ""),
        }
        resp = await self._async_client.post(
            f"{self.base_url}/chat-messages",
            headers=self.headers, json=payload
        )
        resp.raise_for_status()
        return resp.json()

    async def send_message_stream_async(
        self, query: str, user: str, **kwargs
    ):
        """Send a message asynchronously (streaming mode)"""
        payload = {
            "query": query, "user": user,
            "response_mode": "streaming",
            "inputs": kwargs.get("inputs", {}),
            "conversation_id": kwargs.get("conversation_id", ""),
        }
        async with self._async_client.stream(
            "POST", f"{self.base_url}/chat-messages",
            headers=self.headers, json=payload
        ) as resp:
            resp.raise_for_status()
            async for line in resp.aiter_lines():
                if not line or not line.startswith("data: "):
                    continue
                data_str = line[6:].strip()
                if data_str == "[DONE]":
                    return
                try:
                    yield json.loads(data_str)
                except json.JSONDecodeError:
                    continue

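    def close(self):
        """Close the synchronous client; use aclose() for the async one"""
        self._client.close()

    async def aclose(self):
        """Close both clients; must be awaited inside a running event loop"""
        self._client.close()
        await self._async_client.aclose()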
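
To make the conversation_id bookkeeping concrete, here is a minimal sketch of a session wrapper. `ConversationSession` and `fake_send` are hypothetical names introduced for illustration; in real use you would pass `DifyClient.send_message` as the `send` callable, assuming its responses contain `answer` and `conversation_id` as in the blocking example earlier.

```python
from typing import Callable, Optional


class ConversationSession:
    """Remembers conversation_id between turns for one end user."""

    def __init__(self, send: Callable[..., dict], user: str):
        self._send = send                      # e.g. DifyClient(...).send_message
        self.user = user
        self.conversation_id: Optional[str] = None

    def ask(self, query: str) -> str:
        result = self._send(
            query=query,
            user=self.user,
            conversation_id=self.conversation_id,  # None on the first turn
        )
        # Keep the id returned by the server so the next turn shares context
        self.conversation_id = result.get("conversation_id") or self.conversation_id
        return result.get("answer", "")


def fake_send(query: str, user: str, conversation_id: Optional[str] = None) -> dict:
    """Stand-in for a real API call so the sketch runs offline."""
    return {
        "answer": f"echo: {query}",
        "conversation_id": conversation_id or "conv-demo",
    }


session = ConversationSession(fake_send, user="user-001")
print(session.ask("hello"))
print(session.ask("a follow-up question"))
print(session.conversation_id)
```

Starting a new topic is then just a matter of creating a new session object, which begins again with an empty conversation_id.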

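Front-End Streaming Integration (JavaScript/TypeScript)

There are two ways to implement streaming output in the front end: the EventSource API (standard SSE) and the Fetch API with ReadableStream.

// Option 1: EventSource (limitation: no custom headers, no request body)
// Not recommended for Dify (which needs a POST request and a Bearer token)

// Option 2: Fetch + ReadableStream (recommended)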
async function* streamDifyChatMessage(
  query: string,
  userId: string,
  conversationId: string = "",
  apiKey: string,
  apiUrl: string = "https://api.dify.ai/v1"
): AsyncGenerator<{event: string; answer?: string; conversation_id?: string; message?: string; metadata?: any}> {
  const resp = await fetch(`${apiUrl}/chat-messages`, {
    method: "POST",
    headers: {
      "Authorization": `Bearer ${apiKey}`,
      "Content-Type":  "application/json",
    },
    body: JSON.stringify({
      query,
      user:            userId,
      conversation_id: conversationId,
      response_mode:   "streaming",
      inputs:          {},
    }),
  });

  if (!resp.ok) {
    const err = await resp.json();
    throw new Error(`API error ${resp.status}: ${err.message}`);
  }

  const reader  = resp.body!.getReader();
  const decoder = new TextDecoder();
  let   buffer  = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split("\n");
    buffer = lines.pop() || "";  // The last line may be incomplete; keep it for the next read

    for (const line of lines) {
      if (!line.startsWith("data: ")) continue;
      const dataStr = line.slice(6).trim();
      if (dataStr === "[DONE]") return;
      try {
        yield JSON.parse(dataStr);
      } catch { /* Ignore non-JSON lines */ }
    }
  }
}

// Usage in a React component
const ChatComponent: React.FC = () => {
  const [answer, setAnswer]       = useState("");
  const [isLoading, setIsLoading] = useState(false);
  const [convId, setConvId]       = useState("");

  const sendMessage = async (query: string) => {
    setIsLoading(true);
    setAnswer("");

    let fullAnswer  = "";
    let newConvId   = convId;

    try {
      for await (const event of streamDifyChatMessage(
        query, "user-001", convId, API_KEY
      )) {
        if (event.event === "message") {
          fullAnswer += event.answer || "";
          setAnswer(fullAnswer);  // Update the UI in real time
        } else if (event.event === "message_end") {
          if (event.conversation_id) {
            newConvId = event.conversation_id;
            setConvId(newConvId);
          }
        } else if (event.event === "error") {
          throw new Error(event.message);
        }
      }
    } finally {
      setIsLoading(false);
    }
  };

  return (
    <div>
      <div className="answer">{answer}</div>
      {isLoading && <div className="typing-indicator">AI is thinking...</div>}
    </div>
  );
};

Calling the Workflow API

Workflow apps require handling asynchronous execution and status queries:

import asyncio
import json
import httpx
from typing import Optional

class DifyWorkflowClient:
    """Dify workflow API client"""

    def __init__(self, api_url: str, api_key: str):
        self.base_url = api_url.rstrip("/")
        self.headers  = {"Authorization": f"Bearer {api_key}"}
        self.client   = httpx.AsyncClient(timeout=30.0)

    async def run_workflow(
        self,
        inputs: dict,
        user:   str,
        mode:   str = "blocking"  # "blocking" | "streaming"
    ) -> dict:
        """Run a workflow and return its result"""
        if mode != "blocking":
            # Streaming mode: consume the SSE stream and collect every event
            events = [e async for e in self.run_workflow_stream(inputs, user)]
            return {"events": events}

        resp = await self.client.post(
            f"{self.base_url}/workflows/run",
            headers={**self.headers, "Content-Type": "application/json"},
            json={"inputs": inputs, "user": user, "response_mode": "blocking"}
        )
        resp.raise_for_status()
        return resp.json()

    async def run_workflow_stream(self, inputs: dict, user: str):
        """Run a workflow in streaming mode, yielding events as they arrive"""
        async with self.client.stream(
            "POST",
            f"{self.base_url}/workflows/run",
            headers={**self.headers, "Content-Type": "application/json"},
            json={"inputs": inputs, "user": user, "response_mode": "streaming"}
        ) as resp:
            resp.raise_for_status()
            async for line in resp.aiter_lines():
                if not line.startswith("data: "):
                    continue
                data_str = line[6:].strip()
                if data_str == "[DONE]":
                    return
                try:
                    event = json.loads(data_str)
                    yield event
                except json.JSONDecodeError:
                    continue

    async def get_workflow_run(self, run_id: str) -> dict:
        """Query the status of a workflow run"""
        resp = await self.client.get(
            f"{self.base_url}/workflows/run/{run_id}",
            headers=self.headers
        )
        resp.raise_for_status()
        return resp.json()

    async def wait_for_completion(
        self,
        run_id:           str,
        poll_interval:    float = 2.0,
        timeout:          float = 300.0
    ) -> dict:
        """Poll until the workflow finishes"""
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while loop.time() < deadline:
            status = await self.get_workflow_run(run_id)
            if status["status"] in ["succeeded", "failed", "stopped"]:
                return status
            await asyncio.sleep(poll_interval)
        raise TimeoutError(f"Workflow {run_id} did not finish within {timeout}s")

# Workflow SSE event types
WORKFLOW_EVENTS = {
    "workflow_started":  "Workflow started",
    "node_started":      "Node started",
    "node_finished":     "Node finished",
    "workflow_finished": "Workflow finished",
    "text_chunk":        "Text chunk (streamed output)",
    "error":             "Execution error",
}
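
The event types above can be folded into a single result object as they arrive. The sketch below is illustrative only: `WorkflowResult` and `fold_workflow_events` are hypothetical names, and the payload layout (a `data` object carrying `text`, `node_id`, `status`, and `outputs`) is an assumption that should be checked against the events your Dify version actually emits.

```python
from dataclasses import dataclass, field
from typing import Iterable, List, Optional


@dataclass
class WorkflowResult:
    status: str = "unknown"
    text: str = ""
    outputs: dict = field(default_factory=dict)
    finished_nodes: List[Optional[str]] = field(default_factory=list)
    error: Optional[str] = None


def fold_workflow_events(events: Iterable[dict]) -> WorkflowResult:
    """Reduce a stream of workflow events into one summary object."""
    result = WorkflowResult()
    for ev in events:
        kind = ev.get("event")
        data = ev.get("data") or {}          # assumed payload container
        if kind == "text_chunk":
            result.text += data.get("text", "")
        elif kind == "node_finished":
            result.finished_nodes.append(data.get("node_id"))
        elif kind == "workflow_finished":
            result.status = data.get("status", "unknown")
            result.outputs = data.get("outputs") or {}
        elif kind == "error":
            result.status = "failed"
            result.error = ev.get("message", "")
        # workflow_started / node_started carry no state we need here
    return result


demo_events = [
    {"event": "workflow_started", "data": {}},
    {"event": "node_started", "data": {"node_id": "llm-1"}},
    {"event": "text_chunk", "data": {"text": "Hello"}},
    {"event": "text_chunk", "data": {"text": ", world"}},
    {"event": "node_finished", "data": {"node_id": "llm-1"}},
    {"event": "workflow_finished",
     "data": {"status": "succeeded", "outputs": {"answer": "Hello, world"}}},
]

demo_result = fold_workflow_events(demo_events)
print(demo_result.status, demo_result.text)
```

The same function works on the output of `run_workflow_stream` once the events are collected into a list, or it can be adapted to update a UI after each event instead of returning at the end.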

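Level 3: Source Code and Principles (5+ years of experience)

Routing Architecture of the Dify API Server

Dify's API service is built on Flask, with routes defined under api/controllers/:

api/controllers/
├── console/          # Console API (management)
│   ├── app/
│   │   ├── chat.py                   # Conversation management
│   │   └── conversation.py           # Conversation list
│   └── datasets/                     # Knowledge base management
└── service_api/      # Service API (called by applications)
    ├── app/
    │   ├── chat.py                   # Chat API
    │   ├── completion.py             # Text generation API
    │   ├── workflow.py               # Workflow API
    │   └── audio.py                  # Audio API
    └── index.py                      # Route registration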

Server-side implementation of the streaming response:

# api/controllers/service_api/app/chat.py (key logic)
from flask import Response, g, request, stream_with_context
import json

class ChatMessageStreamView(Resource):

    def post(self):
        """Handle a streaming chat message request"""
        data     = request.get_json()
        app_model = g.app_model  # Application resolved by the auth middleware
        user      = self._get_or_create_end_user(data["user"])

        if data.get("response_mode") == "streaming":
            return self._streaming_response(data, app_model, user)
        else:
            return self._blocking_response(data, app_model, user)

    def _streaming_response(self, data, app_model, user) -> Response:
        """Build the SSE streaming response"""
        def generate():
            try:
                # Call AppRunner to get the event stream
                response = app_runner.generate(
                    app_model=app_model,
                    user=user,
                    query=data["query"],
                    stream=True,
                )

                for event in response:
                    # Format each event as an SSE message
                    event_data = self._format_event(event)
                    yield f"data: {json.dumps(event_data, ensure_ascii=False)}\n\n"

                # Send the end-of-stream marker
                yield "data: [DONE]\n\n"

            except Exception as e:
                error_event = {"event": "error", "code": 500, "message": str(e)}
                yield f"data: {json.dumps(error_event)}\n\n"
                yield "data: [DONE]\n\n"

        return Response(
            stream_with_context(generate()),
            content_type="text/event-stream",
            headers={
                "Cache-Control":              "no-cache",
                "X-Accel-Buffering":          "no",     # Disable Nginx buffering
                "Transfer-Encoding":          "chunked",
                "Connection":                 "keep-alive",
                "Access-Control-Allow-Origin": "*",
            }
        )

    def _format_event(self, event) -> dict:
        """Convert an internal Event object into the API response format"""
        if event.type == "llm_message_chunk":
            return {
                "event":           "message",
                "answer":          event.chunk.delta.content,
                "message_id":      event.message_id,
                "conversation_id": event.conversation_id,
                "created_at":      int(event.created_at.timestamp()),
            }
        elif event.type == "llm_message_end":
            return {
                "event":           "message_end",
                "message_id":      event.message_id,
                "conversation_id": event.conversation_id,
                "metadata":        {
                    "usage": {
                        "prompt_tokens":     event.usage.prompt_tokens,
                        "completion_tokens": event.usage.completion_tokens,
                        "total_tokens":      event.usage.total_tokens,
                        "total_price":       str(event.usage.total_price),
                    }
                }
            }
        elif event.type == "error":
            return {"event": "error", "message": event.message, "code": event.code}

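SSE Protocol Details

Server-Sent Events (SSE) is a one-way push protocol over HTTP. The message format is:

# SSE message format
event: [event type, optional]\n
data: [payload]\n
id: [message ID, optional]\n
retry: [reconnection interval in milliseconds, optional]\n
\n  (a blank line ends the message)

Dify uses the simplest form: only the data: field, with each message on a single line:

data: {"event":"message","answer":"Spring","message_id":"xxx"}\n\n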
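
The client code in this chapter only looks at lines starting with data:, which is enough for Dify's single-line messages. For reference, a slightly more general parser that follows the field rules above (optional event, id, and retry fields, comment lines starting with a colon, multi-line data joined with newlines) could look like the sketch below. `parse_sse` is a hypothetical helper name; it works on any iterable of text lines, such as the output of httpx's `iter_lines()`.

```python
from typing import Dict, Iterable, Iterator, List


def parse_sse(lines: Iterable[str]) -> Iterator[Dict[str, str]]:
    """Yield one dict per SSE message; a blank line terminates a message."""
    fields: Dict[str, str] = {}
    data_lines: List[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # Blank line: dispatch the message if it carried any data
            if data_lines:
                yield {**fields, "data": "\n".join(data_lines)}
            fields, data_lines = {}, []
            continue
        if line.startswith(":"):
            continue  # comment line, often used as a keep-alive ping
        name, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # a single leading space after the colon is dropped
        if name == "data":
            data_lines.append(value)
        elif name in ("event", "id", "retry"):
            fields[name] = value
    # A message not terminated by a blank line is discarded


sample_lines = [
    'data: {"event":"message","answer":"a"}',
    "",
    ": ping",
    "event: note",
    "data: line1",
    "data: line2",
    "",
    "data: [DONE]",
    "",
]

messages = list(parse_sse(sample_lines))
for m in messages:
    print(m)
```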

SSE reconnection:

// The browser's EventSource reconnects automatically (calls to Dify use Fetch, so reconnection must be handled manually)
class ResilientSSEClient {
  private retryCount = 0;
  private maxRetries = 3;
  private baseDelay  = 1000;

  async *connect(
    url: string,
    body: object,
    headers: Record<string, string>
  ): AsyncGenerator<any> {
    while (this.retryCount <= this.maxRetries) {
      try {
        yield* this._doConnect(url, body, headers);
        this.retryCount = 0;  // Completed successfully; reset the retry counter
        return;
      } catch (e) {
        if (this.retryCount >= this.maxRetries) throw e;

        const delay = this.baseDelay * Math.pow(2, this.retryCount);
        console.warn(`SSE connection failed, retrying in ${delay}ms (attempt ${this.retryCount + 1})`);
        await new Promise(resolve => setTimeout(resolve, delay));
        this.retryCount++;
      }
    }
  }

  private async *_doConnect(url: string, body: object, headers: Record<string, string>) {
    const resp = await fetch(url, {
      method: "POST",
      headers: {"Content-Type": "application/json", ...headers},
      body: JSON.stringify(body),
      signal: AbortSignal.timeout(120_000),  // 2-minute timeout
    });

    if (!resp.ok) throw new Error(`HTTP ${resp.status}`);

    const reader  = resp.body!.getReader();
    const decoder = new TextDecoder();
    let   buffer  = "";

    try {
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split("\n");
        buffer = lines.pop() || "";

        for (const line of lines) {
          if (!line.startsWith("data: ")) continue;
          const data = line.slice(6).trim();
          if (data === "[DONE]") return;
          try { yield JSON.parse(data); } catch {}
        }
      }
    } finally {
      reader.releaseLock();
    }
  }
}
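
The retry loop above waits baseDelay × 2^retryCount between attempts. The same schedule is sketched in Python below, with two additions that are not in the class above and are offered as assumptions: a cap on the maximum delay, and optional random jitter so that many clients do not retry at the same instant. `backoff_delays` is a hypothetical helper name.

```python
import random
from typing import List, Optional


def backoff_delays(
    base_ms: float = 1000,
    max_retries: int = 3,
    cap_ms: float = 30_000,
    jitter: float = 0.0,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Return the wait (in ms) before each retry: base * 2**attempt, capped.

    jitter=0.2 spreads each delay uniformly within +/-20% of its nominal value.
    """
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        delay = min(cap_ms, base_ms * 2 ** attempt)
        if jitter:
            delay *= 1 + rng.uniform(-jitter, jitter)
        delays.append(delay)
    return delays


plain = backoff_delays()                      # same schedule as the class above
jittered = backoff_delays(jitter=0.2, rng=random.Random(42))
print(plain)
print(jittered)
```

Keep in mind that retrying a POST after part of the answer has already streamed sends the query again, so the caller should discard any partial output before a retry.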

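WebSocket Implementation

Dify's official API is built mainly on HTTP/SSE, but for scenarios that need two-way real-time communication (such as real-time collaboration or multi-user chat rooms), you can build a WebSocket layer on top of the Dify API:

# WebSocket proxy layer based on FastAPI
import os
import json
import asyncio

import httpx
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

# Dify endpoint and key come from the environment (variable names are examples)
DIFY_API_URL = os.getenv("DIFY_API_URL", "https://api.dify.ai/v1")
DIFY_API_KEY = os.environ["DIFY_API_KEY"]

app    = FastAPI()
client = httpx.AsyncClient(timeout=120.0)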

@app.websocket("/ws/chat/{user_id}")
async def websocket_chat(websocket: WebSocket, user_id: str):
    """
    WebSocket proxy: forwards client WS messages to Dify
    and pushes Dify's SSE streaming response back to the client
    """
    await websocket.accept()
    conversation_id = ""

    try:
        while True:
            # Receive a message from the client
            raw = await websocket.receive_text()
            msg = json.loads(raw)

            query = msg.get("query", "")
            if not query:
                await websocket.send_json({"type": "error", "message": "empty query"})
                continue

            # Tell the client that streaming is about to start
            await websocket.send_json({"type": "start", "query": query})

            # Call the Dify API (streaming mode)
            full_answer = ""
            async with client.stream(
                "POST", f"{DIFY_API_URL}/chat-messages",
                headers={"Authorization": f"Bearer {DIFY_API_KEY}"},
                json={
                    "query":           query,
                    "user":            user_id,
                    "conversation_id": conversation_id,
                    "response_mode":   "streaming",
                    "inputs":          msg.get("inputs", {}),
                }
            ) as resp:
                if resp.status_code != 200:
                    await websocket.send_json({
                        "type": "error",
                        "message": f"Dify API error: {resp.status_code}"
                    })
                    continue

                async for line in resp.aiter_lines():
                    if not line.startswith("data: "):
                        continue
                    data_str = line[6:].strip()
                    if data_str == "[DONE]":
                        break
                    try:
                        event = json.loads(data_str)
                    except json.JSONDecodeError:
                        continue

                    if event.get("event") == "message":
                        chunk = event.get("answer", "")
                        full_answer += chunk
                        # Push the chunk to the client in real time
                        await websocket.send_json({
                            "type":    "chunk",
                            "content": chunk
                        })
                    elif event.get("event") == "message_end":
                        conversation_id = event.get("conversation_id", conversation_id)

            # Send the completion signal
            await websocket.send_json({
                "type":            "done",
                "full_answer":     full_answer,
                "conversation_id": conversation_id,
            })

    except WebSocketDisconnect:
        print(f"WebSocket disconnected: user_id={user_id}")
    except Exception as e:
        try:
            await websocket.send_json({"type": "error", "message": str(e)})
        except Exception:
            pass

// Front-end WebSocket client (TypeScript)
class DifyWebSocketClient {
  private ws: WebSocket | null = null;
  private reconnectAttempts = 0;
  private maxReconnects     = 5;

  connect(userId: string, onChunk: (text: string) => void) {
    const wsUrl = `wss://your-server.com/ws/chat/${userId}`;
    this.ws     = new WebSocket(wsUrl);

    this.ws.onopen = () => {
      console.log("WebSocket connected");
      this.reconnectAttempts = 0;
    };

    this.ws.onmessage = (event) => {
      const data = JSON.parse(event.data);
      switch (data.type) {
        case "start": console.log("Generation started..."); break;
        case "chunk": onChunk(data.content); break;
        case "done":  console.log("Generation finished", data.conversation_id); break;
        case "error": console.error("Error:", data.message); break;
      }
    };

    this.ws.onclose = () => {
      if (this.reconnectAttempts < this.maxReconnects) {
        const delay = 1000 * Math.pow(2, this.reconnectAttempts);
        setTimeout(() => {
          this.reconnectAttempts++;
          this.connect(userId, onChunk);
        }, delay);
      }
    };
  }

  sendMessage(query: string, inputs: Record<string, any> = {}) {
    if (this.ws?.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify({ query, inputs }));
    }
  }

  disconnect() {
    this.ws?.close();
  }
}

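Level 4: Production Pitfalls and Decisions (Expert View)

Pitfall 1: Nginx Buffering Breaks SSE Streaming

In production, Nginx buffers upstream responses by default, so SSE output is held back and delivered in batches, which destroys the streaming effect:

# ❌ Problem config: Nginx buffers responses by default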
location /v1/ {
    proxy_pass http://dify_api;
}

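# ✅ Correct config: disable buffering so SSE passes through
location /v1/ {
    proxy_pass              http://dify_api;
    proxy_buffering         off;           # Turn off proxy buffering
    proxy_cache             off;           # Turn off caching
    proxy_read_timeout      300s;          # Longer timeout (streams can run long)
    proxy_send_timeout      300s;
    proxy_http_version      1.1;
    proxy_set_header        Connection "";  # Keep upstream connections alive

    # X-Accel-Buffering is a response header: when the upstream sends
    # "X-Accel-Buffering: no", Nginx disables buffering for that response,
    # so it is not set here with proxy_set_header or add_header.
    # Content-Type is also left to the upstream, so blocking JSON
    # responses under /v1/ keep application/json.
}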

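Pitfall 2: Managing API Keys Securely

Using the API key directly in the front end is extremely dangerous:

❌ Dangerous: exposing the API key in front-end code
const resp = await fetch("https://api.dify.ai/v1/chat-messages", {
    headers: {"Authorization": "Bearer app-your_secret_key"}  // Anyone can see this!
});

✅ Correct: keep the API key on the back end only, and have the front end go through your own back end

Back-end API key proxy layer: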

# FastAPI back end: authenticate users with your own JWT, call Dify with the Dify API key
import os

import httpx
import jwt
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel

app    = FastAPI()
bearer = HTTPBearer()
SECRET_KEY = os.environ["JWT_SECRET_KEY"]  # JWT signing secret (variable name is an example)

class ChatRequest(BaseModel):
    query: str

def verify_user_token(creds: HTTPAuthorizationCredentials = Depends(bearer)) -> dict:
    """Verify the user's JWT (not the Dify API key)"""
    try:
        payload = jwt.decode(creds.credentials, SECRET_KEY, algorithms=["HS256"])
        return payload
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid user credentials")

@app.post("/api/chat")
async def chat_proxy(
    request: ChatRequest,
    user:    dict = Depends(verify_user_token)
):
    """
    Proxy layer:
    - Authenticates the user with the user's own JWT
    - Calls Dify with the Dify API key (stored in an environment variable)
    - Never exposes the API key to the front end
    """
    user_id   = user["sub"]
    dify_key  = os.getenv("DIFY_API_KEY")  # The API key stays in an environment variable

    # httpx defaults to a 5-second timeout, which is too short for a blocking LLM call
    async with httpx.AsyncClient(timeout=60.0) as client:
        resp = await client.post(
            "https://api.dify.ai/v1/chat-messages",
            headers={"Authorization": f"Bearer {dify_key}"},
            json={
                "query": request.query,
                "user":  user_id,
                "response_mode": "blocking",
                "inputs": {},
            }
        )
    return resp.json()

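Pitfall 3: Timeout Settings for Streaming Responses

# Timeout settings for different scenarios

# ❌ Wrong: a short timeout cuts the streaming response off early
httpx.post(url, timeout=10)  # 10 seconds is not enough for long text generation

# ✅ Correct: separate the connect timeout from the read timeout
TIMEOUTS = {
    "blocking_simple":  httpx.Timeout(connect=5, read=30, write=10, pool=5),
    "blocking_complex": httpx.Timeout(connect=5, read=120, write=10, pool=5),
    "streaming":        httpx.Timeout(connect=5, read=None, write=10, pool=5),
    # Streaming: read=None disables the read timeout. The read timeout applies
    # to the gap between chunks, not the whole response, so a finite value
    # also works if the model never pauses longer than that.
}

# ❌ Problem: waiting forever can leak connections
# ✅ Enforce an overall deadline from outside
# (dify_client.stream is assumed to be an async generator that yields chunks)
async def stream_with_total_timeout(query: str, max_total_seconds: float = 120):
    try:
        async with asyncio.timeout(max_total_seconds):  # Python 3.11+
            async for chunk in dify_client.stream(query):
                yield chunk
    except asyncio.TimeoutError:
        raise HTTPException(504, "Response timed out, please retry")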

Reliability Configuration for API Calls

# Production-grade Dify API client configuration
import asyncio
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

class RateLimitedError(Exception):
    """Raised on HTTP 429 so that tenacity retries the call"""

class ProductionDifyClient:

    def __init__(self, api_key: str, api_url: str):
        # Connection pool settings
        limits  = httpx.Limits(max_connections=100, max_keepalive_connections=20)
        self.client = httpx.AsyncClient(
            base_url = api_url,
            headers  = {"Authorization": f"Bearer {api_key}"},
            limits   = limits,
            timeout  = httpx.Timeout(connect=5.0, read=None, write=10.0, pool=5.0),
            http2    = True,  # HTTP/2 for better concurrency (needs pip install "httpx[http2]")
        )

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        # RateLimitedError is listed here; otherwise a 429 would never be retried
        retry=retry_if_exception_type(
            (httpx.ConnectError, httpx.TimeoutException, RateLimitedError)
        ),
    )
    async def send_blocking(self, query: str, user: str, **kwargs) -> dict:
        resp = await self.client.post(
            "/chat-messages",
            json={"query": query, "user": user, "response_mode": "blocking",
                  "inputs": kwargs.get("inputs", {})}
        )
        if resp.status_code == 429:  # Rate limited: honor Retry-After, then retry
            retry_after = int(resp.headers.get("Retry-After", 5))
            await asyncio.sleep(retry_after)
            raise RateLimitedError(f"rate limited, waited {retry_after}s")
        resp.raise_for_status()
        return resp.json()

    async def close(self):
        await self.client.aclose()
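Chapter Summary

This chapter walked through full implementations of the three ways to call the Dify API:

REST API (blocking mode): the simplest option, suited to simple integrations.

SSE streaming API: returns text as it is generated, for a better user experience.

WebSocket layer: a proxy built on top of the Dify API for two-way, real-time interaction.

Production must-haves:

  1. The API key never appears in front-end code (use a back-end proxy)
  2. Apply multi-layer rate limiting (per-user RPM plus global RPM)
  3. Set a maximum total duration on streaming connections (to prevent connection leaks)
  4. Retry network errors automatically (at most 3 times, with exponential backoff)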
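
Item 2 above (per-user RPM plus global RPM) can be sketched with a sliding one-minute window. This is a minimal in-process illustration, not a production limiter: `SlidingWindowLimiter` is a hypothetical class, it keeps state in memory for a single process, and a multi-instance deployment would need shared storage such as Redis instead.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict


class SlidingWindowLimiter:
    """Allows a request only if both the user and the global window have room."""

    WINDOW_SECONDS = 60.0

    def __init__(
        self,
        user_rpm: int,
        global_rpm: int,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.user_rpm = user_rpm
        self.global_rpm = global_rpm
        self._clock = clock                       # injectable for testing
        self._global: Deque[float] = deque()
        self._per_user: Dict[str, Deque[float]] = defaultdict(deque)

    def _prune(self, window: Deque[float], now: float) -> None:
        # Drop timestamps that have left the one-minute window
        while window and now - window[0] >= self.WINDOW_SECONDS:
            window.popleft()

    def allow(self, user_id: str) -> bool:
        now = self._clock()
        user_window = self._per_user[user_id]
        self._prune(user_window, now)
        self._prune(self._global, now)
        if len(user_window) >= self.user_rpm or len(self._global) >= self.global_rpm:
            return False
        user_window.append(now)
        self._global.append(now)
        return True


limiter = SlidingWindowLimiter(user_rpm=2, global_rpm=100)
print([limiter.allow("user-001") for _ in range(3)])
```

In a proxy such as the FastAPI layer from Pitfall 2, `allow(user_id)` would be checked before calling Dify, returning HTTP 429 to the caller when it is False.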

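Next chapter: Chapter 18 covers deep integration with Feishu, WeCom, and DingTalk, including message routing, bot configuration, and unified multi-platform management.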
