Chapter 17: The Complete API Integration Guide: REST / Streaming / WebSocket
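Master the three ways of calling the Dify API from scratch: blocking REST suits simple integrations, SSE streaming improves perceived responsiveness, and a WebSocket layer enables two-way interaction for complex scenarios. Each mode comes with a full code implementation and production notes.
Chapter overview
Dify exposes a complete set of APIs that let you integrate AI capabilities into any system, whether an internal ERP, a customer-facing SaaS product, or a mobile app. The three modes suit different scenarios, though, and choosing the wrong one leads to poor user experience, wasted server resources, or a sharp rise in implementation complexity.
This chapter works from beginner to expert level and covers:
- REST API: synchronous calls, suited to simple integrations and batch processing
- SSE streaming API: built on Server-Sent Events, delivers the answer incrementally, chunk by chunk
- WebSocket layer: two-way real-time communication, built as a proxy on top of Dify's HTTP/SSE API, suited to complex multi-turn scenarios
After reading this chapter, you will be able to:
- Authenticate correctly and call the Dify API endpoints
- Implement the "typewriter" effect for streamed output in the front end
- Handle dropped connections, reconnection, and error cases
- Manage API connections and resources correctly in production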
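Level 1: Fundamentals (1-3 years of experience)
Dify API overview
Dify's APIs fall into two categories:
1. Application API (the Service API)
- Calls the AI applications you build in Dify
- Each application has its own API key
- Suited to integration with business systems
2. Console API (management API)
- Manages Dify platform resources (applications, knowledge bases, and so on)
- Authenticated with an administrator token
- Suited to CI/CD and platform administration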
Getting an API key
Get an application API key from the Dify console:
- Open your Dify app → API Access
- Click "Create API Key"
- Copy the generated key (format: app-xxxxxxxxxxxxxxxxxxxxxxxx)
# Check that the API key is valid
curl -X GET 'https://api.dify.ai/v1/info' \
-H 'Authorization: Bearer app-your_api_key_here'
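To keep the key out of source control, one option is to read it from an environment variable and build the request headers in a single place. The sketch below assumes a variable named DIFY_API_KEY (a name chosen for this example, not something Dify mandates) and checks only the app- prefix shown above.

```python
import os


def build_auth_headers(env_var: str = "DIFY_API_KEY") -> dict:
    """Build request headers from an API key stored in the environment.

    The variable name is a hypothetical choice for this sketch.
    """
    api_key = os.environ.get(env_var, "").strip()
    if not api_key:
        raise RuntimeError(f"environment variable {env_var} is not set")
    if not api_key.startswith("app-"):
        # Application keys use the app- prefix shown above.
        raise ValueError("expected an application API key starting with 'app-'")
    return {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
```

Failing fast on a missing or malformed key gives a clearer error at startup than a 401 on the first request.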
REST API: the simplest way to call Dify
Basic call to a chat application (blocking mode):
# Start a conversation
curl -X POST 'https://api.dify.ai/v1/chat-messages' \
-H 'Authorization: Bearer app-your_api_key_here' \
-H 'Content-Type: application/json' \
-d '{
"inputs": {},
"query": "你好,介绍一下 Dify",
"response_mode": "blocking",
"conversation_id": "",
"user": "user-001"
}'
Example response:
{
"event": "message",
"message_id": "msg-abc123",
"conversation_id": "conv-xyz456",
"answer": "Dify 是一个开源的 LLMOps 平台,帮助你快速构建和部署 AI 应用...",
"created_at": 1710000000,
"metadata": {
"usage": {
"prompt_tokens": 150,
"completion_tokens": 200,
"total_tokens": 350,
"total_price": "0.0000105"
}
}
}
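A client usually needs only a few of these fields. The following is a minimal sketch, assuming the response shape shown above, that pulls them into a small dataclass; ChatResult and parse_chat_response are names invented for this example.

```python
from dataclasses import dataclass


@dataclass
class ChatResult:
    """Fields a caller typically needs from a blocking chat response."""
    answer: str
    conversation_id: str
    message_id: str
    total_tokens: int


def parse_chat_response(payload: dict) -> ChatResult:
    """Extract the answer, IDs, and token usage from a decoded JSON response."""
    usage = payload.get("metadata", {}).get("usage", {})
    return ChatResult(
        answer=payload["answer"],
        conversation_id=payload["conversation_id"],
        message_id=payload["message_id"],
        # Usage may be absent; fall back to 0 rather than failing.
        total_tokens=int(usage.get("total_tokens", 0)),
    )
```

Store conversation_id from the result and send it with the next request to continue the same conversation.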
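Streaming API: the typewriter effect
In a real product, waiting for the full response usually takes several seconds, which makes the interface feel unresponsive. The streaming API returns content as it is generated, so users watch the text appear gradually:
# Streaming call (response_mode set to streaming)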
curl -X POST 'https://api.dify.ai/v1/chat-messages' \
-H 'Authorization: Bearer app-your_api_key_here' \
-H 'Content-Type: application/json' \
-d '{
"inputs": {},
"query": "写一首关于春天的诗",
"response_mode": "streaming",
"user": "user-001"
}'
Streaming response (SSE format):
data: {"event": "message", "answer": "春", "message_id": "msg-001", ...}
data: {"event": "message", "answer": "风", "message_id": "msg-001", ...}
data: {"event": "message", "answer": "吹", "message_id": "msg-001", ...}
data: {"event": "message_end", "metadata": {...}}
data: [DONE]
Each data: line carries one self-contained JSON event, which the front end can render as it arrives.
Python client: basic example
import json
from typing import Iterator

import httpx

DIFY_API_URL = "https://api.dify.ai/v1"
API_KEY = "app-your_api_key_here"


def chat_blocking(query: str, user_id: str = "default") -> str:
    """Blocking call: wait for the complete response."""
    resp = httpx.post(
        f"{DIFY_API_URL}/chat-messages",
        headers={"Authorization": f"Bearer {API_KEY}"},
        json={
            "query": query,
            "response_mode": "blocking",
            "user": user_id,
            "inputs": {},
        },
        timeout=60.0,
    )
    resp.raise_for_status()
    return resp.json()["answer"]


def chat_streaming(query: str, user_id: str = "default") -> Iterator[str]:
    """Streaming call: yield the answer chunk by chunk."""
    with httpx.stream(
        "POST",
        f"{DIFY_API_URL}/chat-messages",
        headers={"Authorization": f"Bearer {API_KEY}"},
        json={
            "query": query,
            "response_mode": "streaming",
            "user": user_id,
            "inputs": {},
        },
        timeout=120.0,
    ) as resp:
        resp.raise_for_status()
        for line in resp.iter_lines():
            if not line.startswith("data: "):
                continue  # skip blank separator lines
            data_str = line[6:]
            if data_str == "[DONE]":
                break
            event = json.loads(data_str)
            if event.get("event") == "message":
                yield event.get("answer", "")


# Usage
print("=== Blocking mode ===")
answer = chat_blocking("你好,介绍一下 Dify")
print(answer)
print("\n=== Streaming mode ===")
for chunk in chat_streaming("写一首关于春天的诗"):
    print(chunk, end="", flush=True)
print()
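Level 2: Mechanics in depth (3-5 years of experience)
Dify API endpoint reference
Chat applications (Chatbot):
POST   /v1/chat-messages                  # Send a message
GET    /v1/conversations                  # List conversations
GET    /v1/messages                       # Get message history (filtered by conversation_id)
DELETE /v1/conversations/{id}             # Delete a conversation
POST   /v1/messages/{id}/feedbacks        # User feedback (like / dislike)
GET    /v1/messages/{id}/suggested        # Get suggested follow-up questions
POST   /v1/audio-to-text                  # Speech to text
Text-generation applications (Completion):
POST   /v1/completion-messages            # Text generation (blocking or streaming, selected by response_mode)
Workflow applications (Workflow):
POST   /v1/workflows/run                  # Run a workflow
GET    /v1/workflows/run/{id}             # Query run status
GET    /v1/workflows/logs                 # Get workflow run logs
File upload:
POST   /v1/files/upload                   # Upload a file (image, document, and so on)
Full multi-turn conversation management
In a multi-turn conversation, pass the conversation_id returned by the first reply in every later request to keep the context: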
import json
from typing import AsyncGenerator, Generator, Optional

import httpx


class DifyClient:
    """Full-featured Dify API client."""

    def __init__(self, api_url: str, api_key: str):
        self.base_url = api_url.rstrip("/")
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        }
        self._client = httpx.Client(timeout=120.0)
        self._async_client = httpx.AsyncClient(timeout=120.0)

    # ===== Synchronous API =====
    def send_message(
        self,
        query: str,
        user: str,
        conversation_id: Optional[str] = None,
        inputs: Optional[dict] = None,
        files: Optional[list] = None,
    ) -> dict:
        """Send a message (blocking mode)."""
        payload = {
            "query": query,
            "user": user,
            "response_mode": "blocking",
            "inputs": inputs or {},
            "conversation_id": conversation_id or "",
        }
        if files:
            payload["files"] = files
        resp = self._client.post(
            f"{self.base_url}/chat-messages",
            headers=self.headers,
            json=payload,
        )
        resp.raise_for_status()
        return resp.json()

    def send_message_stream(
        self,
        query: str,
        user: str,
        conversation_id: Optional[str] = None,
        inputs: Optional[dict] = None,
    ) -> Generator[dict, None, None]:
        """Send a message (streaming mode) and yield each decoded event."""
        payload = {
            "query": query,
            "user": user,
            "response_mode": "streaming",
            "inputs": inputs or {},
            "conversation_id": conversation_id or "",
        }
        with self._client.stream(
            "POST",
            f"{self.base_url}/chat-messages",
            headers=self.headers,
            json=payload,
        ) as resp:
            resp.raise_for_status()
            for line in resp.iter_lines():
                if not line or not line.startswith("data: "):
                    continue
                data_str = line[6:].strip()
                if data_str == "[DONE]":
                    return
                try:
                    yield json.loads(data_str)
                except json.JSONDecodeError:
                    continue  # skip lines that are not valid JSON

    def get_conversation_history(
        self,
        conversation_id: str,
        user: str,
        limit: int = 20,
        first_id: Optional[str] = None,
    ) -> dict:
        """Fetch the message history of a conversation."""
        params = {"user": user, "limit": limit, "conversation_id": conversation_id}
        if first_id:
            params["first_id"] = first_id
        resp = self._client.get(
            f"{self.base_url}/messages",
            headers=self.headers,
            params=params,
        )
        resp.raise_for_status()
        return resp.json()

    def upload_file(self, file_path: str, user: str) -> dict:
        """Upload a file."""
        with open(file_path, "rb") as f:
            resp = self._client.post(
                f"{self.base_url}/files/upload",
                # Send only the auth header so httpx can set the multipart Content-Type
                headers={"Authorization": self.headers["Authorization"]},
                files={"file": f},
                data={"user": user},
            )
        resp.raise_for_status()
        return resp.json()

    def submit_feedback(
        self,
        message_id: str,
        rating: Optional[str],  # "like" | "dislike" | None
        user: str,
        content: Optional[str] = None,
    ) -> dict:
        """Submit user feedback for a message."""
        resp = self._client.post(
            f"{self.base_url}/messages/{message_id}/feedbacks",
            headers=self.headers,
            json={"rating": rating, "user": user, "content": content},
        )
        resp.raise_for_status()
        return resp.json()

    # ===== Asynchronous API =====
    async def send_message_async(self, query: str, user: str, **kwargs) -> dict:
        """Send a message asynchronously (blocking mode)."""
        payload = {
            "query": query,
            "user": user,
            "response_mode": "blocking",
            "inputs": kwargs.get("inputs", {}),
            "conversation_id": kwargs.get("conversation_id", ""),
        }
        resp = await self._async_client.post(
            f"{self.base_url}/chat-messages",
            headers=self.headers,
            json=payload,
        )
        resp.raise_for_status()
        return resp.json()

    async def send_message_stream_async(
        self, query: str, user: str, **kwargs
    ) -> AsyncGenerator[dict, None]:
        """Send a message asynchronously (streaming mode)."""
        payload = {
            "query": query,
            "user": user,
            "response_mode": "streaming",
            "inputs": kwargs.get("inputs", {}),
            "conversation_id": kwargs.get("conversation_id", ""),
        }
        async with self._async_client.stream(
            "POST",
            f"{self.base_url}/chat-messages",
            headers=self.headers,
            json=payload,
        ) as resp:
            resp.raise_for_status()
            async for line in resp.aiter_lines():
                if not line or not line.startswith("data: "):
                    continue
                data_str = line[6:].strip()
                if data_str == "[DONE]":
                    return
                try:
                    yield json.loads(data_str)
                except json.JSONDecodeError:
                    continue

    def close(self) -> None:
        """Close the synchronous client."""
        self._client.close()

    async def aclose(self) -> None:
        """Close both clients; call this from async code."""
        self._client.close()
        await self._async_client.aclose()
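Front-end streaming integration (JavaScript/TypeScript)
There are two ways to consume a stream in the browser: the EventSource API (standard SSE) and the Fetch API with ReadableStream:
// Option 1: EventSource (limitation: no custom headers and no request body)
// Not recommended for Dify, which needs a POST request and a Bearer token
// Option 2: Fetch + ReadableStream (recommended)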
async function* streamDifyChatMessage(
query: string,
userId: string,
conversationId: string = "",
apiKey: string,
apiUrl: string = "https://api.dify.ai/v1"
): AsyncGenerator<{event: string; answer?: string; conversation_id?: string; message?: string; metadata?: any}> {
const resp = await fetch(`${apiUrl}/chat-messages`, {
method: "POST",
headers: {
"Authorization": `Bearer ${apiKey}`,
"Content-Type": "application/json",
},
body: JSON.stringify({
query,
user: userId,
conversation_id: conversationId,
response_mode: "streaming",
inputs: {},
}),
});
if (!resp.ok) {
const err = await resp.json();
throw new Error(`API error ${resp.status}: ${err.message}`);
}
const reader = resp.body!.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() || ""; // the last line may be incomplete; keep it for the next read
for (const line of lines) {
if (!line.startsWith("data: ")) continue;
const dataStr = line.slice(6).trim();
if (dataStr === "[DONE]") return;
try {
yield JSON.parse(dataStr);
} catch { /* ignore lines that are not JSON */ }
}
}
}
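// Usage in a React component (demo only: in production, route the call through your own backend so the API key never reaches the browser)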
const ChatComponent: React.FC = () => {
const [answer, setAnswer] = useState("");
const [isLoading, setIsLoading] = useState(false);
const [convId, setConvId] = useState("");
const sendMessage = async (query: string) => {
setIsLoading(true);
setAnswer("");
let fullAnswer = "";
let newConvId = convId;
try {
for await (const event of streamDifyChatMessage(
query, "user-001", convId, API_KEY
)) {
if (event.event === "message") {
fullAnswer += event.answer || "";
setAnswer(fullAnswer); // update the UI as chunks arrive
} else if (event.event === "message_end") {
if (event.conversation_id) {
newConvId = event.conversation_id;
setConvId(newConvId);
}
} else if (event.event === "error") {
throw new Error(event.message);
}
}
} finally {
setIsLoading(false);
}
};
return (
<div>
<div className="answer">{answer}</div>
{isLoading && <div className="typing-indicator">AI is thinking...</div>}
</div>
);
};
Calling the Workflow API end to end
Workflow applications need handling for asynchronous execution and status queries:
import asyncio
import json
from typing import AsyncIterator

import httpx


class DifyWorkflowClient:
    """Dify workflow API client."""

    def __init__(self, api_url: str, api_key: str):
        self.base_url = api_url.rstrip("/")
        self.headers = {"Authorization": f"Bearer {api_key}"}
        self.client = httpx.AsyncClient(timeout=30.0)

    async def run_workflow(
        self,
        inputs: dict,
        user: str,
        mode: str = "blocking",  # "blocking" | "streaming"
    ) -> dict:
        """Run a workflow."""
        if mode == "streaming":
            # Streaming mode: collect every event and return them together
            events = [event async for event in self.run_workflow_stream(inputs, user)]
            return {"events": events}
        resp = await self.client.post(
            f"{self.base_url}/workflows/run",
            headers={**self.headers, "Content-Type": "application/json"},
            json={"inputs": inputs, "user": user, "response_mode": "blocking"},
        )
        resp.raise_for_status()
        return resp.json()

    async def run_workflow_stream(self, inputs: dict, user: str) -> AsyncIterator[dict]:
        """Run a workflow in streaming mode and yield events as they arrive."""
        async with self.client.stream(
            "POST",
            f"{self.base_url}/workflows/run",
            headers={**self.headers, "Content-Type": "application/json"},
            json={"inputs": inputs, "user": user, "response_mode": "streaming"},
        ) as resp:
            resp.raise_for_status()
            async for line in resp.aiter_lines():
                if not line.startswith("data: "):
                    continue
                data_str = line[6:].strip()
                if data_str == "[DONE]":
                    return
                try:
                    yield json.loads(data_str)
                except json.JSONDecodeError:
                    continue

    async def get_workflow_run(self, run_id: str) -> dict:
        """Query the status of a workflow run."""
        resp = await self.client.get(
            f"{self.base_url}/workflows/run/{run_id}",
            headers=self.headers,
        )
        resp.raise_for_status()
        return resp.json()

    async def wait_for_completion(
        self,
        run_id: str,
        poll_interval: float = 2.0,
        timeout: float = 300.0,
    ) -> dict:
        """Poll until the workflow run reaches a terminal status."""
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while loop.time() < deadline:
            status = await self.get_workflow_run(run_id)
            if status["status"] in ("succeeded", "failed", "stopped"):
                return status
            await asyncio.sleep(poll_interval)
        raise TimeoutError(f"workflow {run_id} did not finish within {timeout}s")


# Workflow SSE event types
WORKFLOW_EVENTS = {
    "workflow_started": "workflow run started",
    "node_started": "node started",
    "node_finished": "node finished",
    "workflow_finished": "workflow run finished",
    "text_chunk": "text chunk (streamed output)",
    "error": "execution error",
}
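A consumer of the streaming workflow API usually folds these events into a single result. The sketch below shows one way to do that. It assumes each event is a dict with an "event" name and a "data" object, that text_chunk events carry their text under data["text"], and that workflow_finished carries data["status"]; verify these field names against your Dify version before relying on them. summarize_workflow_events is a name made up for this example.

```python
from typing import Iterable


def summarize_workflow_events(events: Iterable[dict]) -> dict:
    """Fold a sequence of workflow SSE events into one summary dict."""
    text_parts = []
    nodes_started = 0
    nodes_finished = 0
    status = "running"
    error = None
    for ev in events:
        kind = ev.get("event")
        data = ev.get("data") or {}
        if kind == "text_chunk":
            text_parts.append(data.get("text", ""))  # assumed payload field
        elif kind == "node_started":
            nodes_started += 1
        elif kind == "node_finished":
            nodes_finished += 1
        elif kind == "workflow_finished":
            status = data.get("status", "finished")  # assumed payload field
        elif kind == "error":
            status = "failed"
            error = ev.get("message")
    return {
        "text": "".join(text_parts),
        "nodes_started": nodes_started,
        "nodes_finished": nodes_finished,
        "status": status,
        "error": error,
    }
```

A progress bar can be driven from nodes_finished against nodes_started while the text accumulates in the background.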
Level 3: Source code and internals (5+ years of experience)
Routing architecture of the Dify API server
Dify's API service is built on Flask, with routes defined under api/controllers/:
api/controllers/
├── console/                 # Console API (management)
│   ├── app/
│   │   ├── chat.py          # Chat management
│   │   └── conversation.py  # Conversation list
│   └── datasets/            # Knowledge base management
└── service_api/             # Service API (called by applications)
    ├── app/
    │   ├── chat.py          # Chat API
    │   ├── completion.py    # Text generation API
    │   ├── workflow.py      # Workflow API
    │   └── audio.py         # Audio API
    └── index.py             # Route registration
Server-side implementation of streaming responses:
# api/controllers/service_api/app/chat.py (key logic, simplified for illustration)
import json

from flask import Response, g, request, stream_with_context
from flask_restful import Resource  # assumed here: a Flask-RESTful style resource base class

# app_runner below stands in for Dify's internal generation service.


class ChatMessageStreamView(Resource):
    def post(self):
        """Handle a chat message request."""
        data = request.get_json()
        app_model = g.app_model  # set by the authentication middleware
        user = self._get_or_create_end_user(data["user"])
        if data.get("response_mode") == "streaming":
            return self._streaming_response(data, app_model, user)
        return self._blocking_response(data, app_model, user)

    def _streaming_response(self, data, app_model, user) -> Response:
        """Build the SSE streaming response."""

        def generate():
            try:
                # Ask the app runner for an event stream
                response = app_runner.generate(
                    app_model=app_model,
                    user=user,
                    query=data["query"],
                    stream=True,
                )
                for event in response:
                    # Format each event as an SSE message
                    event_data = self._format_event(event)
                    if event_data is None:
                        continue  # unknown internal event type: nothing to send
                    yield f"data: {json.dumps(event_data, ensure_ascii=False)}\n\n"
                # Send the end marker
                yield "data: [DONE]\n\n"
            except Exception as e:
                error_event = {"event": "error", "code": 500, "message": str(e)}
                yield f"data: {json.dumps(error_event)}\n\n"
                yield "data: [DONE]\n\n"

        return Response(
            stream_with_context(generate()),
            content_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "X-Accel-Buffering": "no",  # tell Nginx not to buffer this response
                # Transfer-Encoding and Connection are hop-by-hop headers;
                # the WSGI server sets them, so the application must not.
                "Access-Control-Allow-Origin": "*",
            },
        )

    def _format_event(self, event):
        """Convert an internal event object into the API response format."""
        if event.type == "llm_message_chunk":
            return {
                "event": "message",
                "answer": event.chunk.delta.content,
                "message_id": event.message_id,
                "conversation_id": event.conversation_id,
                "created_at": int(event.created_at.timestamp()),
            }
        elif event.type == "llm_message_end":
            return {
                "event": "message_end",
                "message_id": event.message_id,
                "conversation_id": event.conversation_id,
                "metadata": {
                    "usage": {
                        "prompt_tokens": event.usage.prompt_tokens,
                        "completion_tokens": event.usage.completion_tokens,
                        "total_tokens": event.usage.total_tokens,
                        "total_price": str(event.usage.total_price),
                    }
                },
            }
        elif event.type == "error":
            return {"event": "error", "message": event.message, "code": event.code}
        return None
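SSE protocol details
Server-Sent Events (SSE) is a one-way, server-to-client push protocol over HTTP. The message format is:
# SSE message format
event: [event type, optional]\n
data: [payload]\n
id: [message ID, optional]\n
retry: [reconnection delay in milliseconds, optional]\n
\n   (a blank line ends the message)
Dify uses the simplest form: only the data: field, with each message on a single line: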
data: {"event":"message","answer":"春","message_id":"xxx"}\n\n
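For streams that use more of the format than Dify does, a parser has to group lines into events rather than treat each data: line on its own. Here is a minimal sketch of the field rules described above (multiple data: lines joined with a newline, lines starting with a colon treated as comments, a blank line dispatching the event). parse_sse is a name chosen for this example, and the sketch ignores details such as BOM handling.

```python
from typing import Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[dict]:
    """Group raw SSE lines into events.

    Each yielded dict has a "data" key and, when present in the stream,
    "event", "id", and "retry" keys.
    """
    fields: dict = {}
    data_parts: list = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line ends the message; dispatch only if it carried data.
            if data_parts:
                yield {**fields, "data": "\n".join(data_parts)}
            fields, data_parts = {}, []
            continue
        if line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        name, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # a single leading space after the colon is dropped
        if name == "data":
            data_parts.append(value)
        elif name in ("event", "id", "retry"):
            fields[name] = value
    # An event not terminated by a blank line is incomplete and is discarded.
```

With Dify's single-line messages this reduces to the data: prefix check used elsewhere in the chapter, so the simpler form is enough there.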
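SSE reconnection:
// The browser's EventSource reconnects automatically, but Dify calls go through Fetch, so reconnection must be handled manually.
// Note: a retry re-sends the POST, so the consumer should discard partial output from the failed attempt.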
class ResilientSSEClient {
private retryCount = 0;
private maxRetries = 3;
private baseDelay = 1000;
async *connect(
url: string,
body: object,
headers: Record<string, string>
): AsyncGenerator<any> {
while (this.retryCount <= this.maxRetries) {
try {
yield* this._doConnect(url, body, headers);
this.retryCount = 0; // completed successfully: reset the retry counter
return;
} catch (e) {
if (this.retryCount >= this.maxRetries) throw e;
const delay = this.baseDelay * Math.pow(2, this.retryCount);
console.warn(`SSE connection failed, retrying in ${delay}ms (attempt ${this.retryCount + 1})`);
await new Promise(resolve => setTimeout(resolve, delay));
this.retryCount++;
}
}
}
private async *_doConnect(url: string, body: object, headers: Record<string, string>) {
const resp = await fetch(url, {
method: "POST",
headers: {"Content-Type": "application/json", ...headers},
body: JSON.stringify(body),
signal: AbortSignal.timeout(120_000), // 2-minute timeout
});
if (!resp.ok) throw new Error(`HTTP ${resp.status}`);
const reader = resp.body!.getReader();
const decoder = new TextDecoder();
let buffer = "";
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() || "";
for (const line of lines) {
if (!line.startsWith("data: ")) continue;
const data = line.slice(6).trim();
if (data === "[DONE]") return;
try { yield JSON.parse(data); } catch {}
}
}
} finally {
reader.releaseLock();
}
}
}
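The retry schedule used by the client above is plain exponential backoff: the base delay doubles after each failed attempt. The Python sketch below computes the same schedule and adds two optional refinements that are not in the original client, a maximum delay and random jitter, which help keep many clients from retrying in lockstep. backoff_delays and its parameters are names invented for this example.

```python
import random
from typing import List, Optional


def backoff_delays(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    jitter: float = 0.0,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Return the wait time in seconds before each retry attempt.

    jitter is a fraction: 0.2 spreads each delay by up to +/-20%.
    """
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        delay = min(max_delay, base_delay * (2 ** attempt))
        if jitter:
            delay *= 1 + rng.uniform(-jitter, jitter)
        delays.append(delay)
    return delays
```

With the defaults, the schedule is 1, 2, and 4 seconds, matching baseDelay = 1000 ms and maxRetries = 3 above.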
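WebSocket implementation
Dify's official API is built mainly on HTTP/SSE, but for scenarios that need two-way real-time communication (such as live collaboration or multi-user chat rooms), you can build a WebSocket layer on top of the Dify API: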
# WebSocket proxy layer built on FastAPI
import json
import os

import httpx
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

# Configuration comes from the environment; the variable names are this example's choice.
DIFY_API_URL = os.getenv("DIFY_API_URL", "https://api.dify.ai/v1")
DIFY_API_KEY = os.environ["DIFY_API_KEY"]

app = FastAPI()
client = httpx.AsyncClient(timeout=120.0)


@app.websocket("/ws/chat/{user_id}")
async def websocket_chat(websocket: WebSocket, user_id: str):
    """
    WebSocket proxy: forwards client WS messages to Dify and pushes
    Dify's SSE streaming response back to the client.
    Note: user_id is taken from the URL unverified; authenticate it in production.
    """
    await websocket.accept()
    conversation_id = ""
    try:
        while True:
            # Receive a message from the client
            raw = await websocket.receive_text()
            msg = json.loads(raw)
            query = msg.get("query", "")
            if not query:
                await websocket.send_json({"type": "error", "message": "empty query"})
                continue
            # Tell the client that streaming is about to start
            await websocket.send_json({"type": "start", "query": query})
            # Call the Dify API in streaming mode
            full_answer = ""
            async with client.stream(
                "POST",
                f"{DIFY_API_URL}/chat-messages",
                headers={"Authorization": f"Bearer {DIFY_API_KEY}"},
                json={
                    "query": query,
                    "user": user_id,
                    "conversation_id": conversation_id,
                    "response_mode": "streaming",
                    "inputs": msg.get("inputs", {}),
                },
            ) as resp:
                if resp.status_code != 200:
                    await websocket.send_json({
                        "type": "error",
                        "message": f"Dify API error: {resp.status_code}",
                    })
                    continue
                async for line in resp.aiter_lines():
                    if not line.startswith("data: "):
                        continue
                    data_str = line[6:].strip()
                    if data_str == "[DONE]":
                        break
                    try:
                        event = json.loads(data_str)
                    except json.JSONDecodeError:
                        continue
                    if event.get("event") == "message":
                        chunk = event.get("answer", "")
                        full_answer += chunk
                        # Push the chunk to the client immediately
                        await websocket.send_json({"type": "chunk", "content": chunk})
                    elif event.get("event") == "message_end":
                        conversation_id = event.get("conversation_id", conversation_id)
            # Send the completion signal
            await websocket.send_json({
                "type": "done",
                "full_answer": full_answer,
                "conversation_id": conversation_id,
            })
    except WebSocketDisconnect:
        print(f"WebSocket disconnected: user_id={user_id}")
    except Exception as e:
        try:
            await websocket.send_json({"type": "error", "message": str(e)})
        except Exception:
            pass  # the socket is already closed; nothing more to do
// Front-end WebSocket client (TypeScript)
class DifyWebSocketClient {
  private ws: WebSocket | null = null;
  private reconnectAttempts = 0;
  private maxReconnects = 5;
  private closedByUser = false; // set by disconnect() so onclose does not reconnect

  connect(userId: string, onChunk: (text: string) => void) {
    this.closedByUser = false;
    const wsUrl = `wss://your-server.com/ws/chat/${userId}`;
    this.ws = new WebSocket(wsUrl);
    this.ws.onopen = () => {
      console.log("WebSocket connected");
      this.reconnectAttempts = 0;
    };
    this.ws.onmessage = (event) => {
      const data = JSON.parse(event.data);
      switch (data.type) {
        case "start": console.log("Generation started..."); break;
        case "chunk": onChunk(data.content); break;
        case "done": console.log("Generation finished", data.conversation_id); break;
        case "error": console.error("Error:", data.message); break;
      }
    };
    this.ws.onclose = () => {
      if (this.closedByUser) return; // intentional close: do not reconnect
      if (this.reconnectAttempts < this.maxReconnects) {
        // Exponential backoff: 1s, 2s, 4s, ...
        const delay = 1000 * Math.pow(2, this.reconnectAttempts);
        setTimeout(() => {
          this.reconnectAttempts++;
          this.connect(userId, onChunk);
        }, delay);
      }
    };
  }

  sendMessage(query: string, inputs: Record<string, any> = {}) {
    if (this.ws?.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify({ query, inputs }));
    }
  }

  disconnect() {
    this.closedByUser = true;
    this.ws?.close();
  }
}
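Level 4: Production pitfalls and decisions (expert view)
Pitfall 1: Nginx buffering breaks SSE streaming
In production, Nginx buffers upstream responses by default, so an SSE stream is held back and delivered in batches, which destroys the streaming effect:
# ❌ Problem: with the defaults, Nginx buffers the response
location /v1/ {
    proxy_pass http://dify_api;
}
# ✅ Fix: disable buffering so SSE passes through
location /v1/ {
    proxy_pass http://dify_api;
    proxy_buffering off;             # turn off proxy buffering
    proxy_cache off;                 # turn off caching
    proxy_read_timeout 300s;         # longer timeout: a stream can run for a long time
    proxy_send_timeout 300s;
    proxy_http_version 1.1;
    proxy_set_header Connection "";  # keep the upstream connection alive
    # X-Accel-Buffering is a response header that the upstream sends to Nginx,
    # so setting it here with proxy_set_header or add_header has no effect.
    # Leave Content-Type to the upstream as well: forcing text/event-stream
    # here would mislabel the JSON responses served under /v1/.
}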
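Pitfall 2: Managing API keys securely
Using an API key directly in the front end is extremely dangerous:
// ❌ Dangerous: the API key is exposed in front-end code
const resp = await fetch("https://api.dify.ai/v1/chat-messages", {
  headers: {"Authorization": "Bearer app-your_secret_key"}  // visible to anyone!
});
// ✅ Right: keep the API key on the backend and route front-end calls through it
Backend proxy layer for the API key: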
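# FastAPI backend: authenticate users with your own JWT, call Dify with the Dify API key
import os

import httpx
import jwt
from fastapi import Depends, FastAPI, HTTPException
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from pydantic import BaseModel

app = FastAPI()
bearer = HTTPBearer()
# Signing secret for your own user tokens; the variable name is this example's choice.
SECRET_KEY = os.environ["JWT_SECRET_KEY"]


class ChatRequest(BaseModel):
    query: str


def verify_user_token(creds: HTTPAuthorizationCredentials = Depends(bearer)) -> dict:
    """Verify the user's JWT (not the Dify API key)."""
    try:
        return jwt.decode(creds.credentials, SECRET_KEY, algorithms=["HS256"])
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="invalid user credentials")


@app.post("/api/chat")
async def chat_proxy(request: ChatRequest, user: dict = Depends(verify_user_token)):
    """
    Proxy layer:
    - authenticates the user with the user's own JWT
    - calls Dify with the Dify API key (stored in an environment variable)
    - never exposes the API key to the front end
    """
    user_id = user["sub"]
    dify_key = os.getenv("DIFY_API_KEY")  # the API key lives safely in the environment
    # httpx defaults to a 5-second timeout, which is too short for a blocking LLM call
    async with httpx.AsyncClient(timeout=httpx.Timeout(120.0, connect=5.0)) as client:
        resp = await client.post(
            "https://api.dify.ai/v1/chat-messages",
            headers={"Authorization": f"Bearer {dify_key}"},
            json={
                "query": request.query,
                "user": user_id,
                "response_mode": "blocking",
                "inputs": {},
            },
        )
    if resp.status_code != 200:
        # Do not forward upstream error bodies verbatim to the client
        raise HTTPException(status_code=502, detail="Dify API call failed")
    return resp.json()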
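Pitfall 3: Timeout settings for streaming responses
# Timeout configuration for different scenarios
# ❌ Wrong: a short timeout cuts the response off early
httpx.post(url, timeout=10)  # 10 seconds is not enough for long generations
# ✅ Right: separate the connect timeout from the read timeout
TIMEOUTS = {
    "blocking_simple": httpx.Timeout(connect=5, read=30, write=10, pool=5),
    "blocking_complex": httpx.Timeout(connect=5, read=120, write=10, pool=5),
    "streaming": httpx.Timeout(connect=5, read=None, write=10, pool=5),
    # Streaming: read=None means no read timeout, because a stream can run for a long time
}
# ❌ Problem: waiting forever can leak connections
# ✅ Add an overall deadline on top
# (dify_client.stream stands for any async generator that yields chunks)
async def stream_with_total_timeout(query: str, max_total_seconds: float = 120):
    try:
        async with asyncio.timeout(max_total_seconds):  # Python 3.11+
            async for chunk in dify_client.stream(query):
                yield chunk
    except asyncio.TimeoutError:
        # Once the first chunk has been sent the HTTP status can no longer change,
        # so in a streaming endpoint consider emitting an error event instead.
        raise HTTPException(504, "response timed out, please retry")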
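asyncio.timeout requires Python 3.11. On older versions, one way to get the same overall deadline is to wrap each read in asyncio.wait_for with the time remaining. The following is a sketch under that assumption; with_total_deadline is a name made up for this example, and the source can be any async iterator of chunks.

```python
import asyncio
from typing import AsyncIterator, TypeVar

T = TypeVar("T")


async def with_total_deadline(
    source: AsyncIterator[T], max_total_seconds: float
) -> AsyncIterator[T]:
    """Re-yield items from source, raising asyncio.TimeoutError once the
    total elapsed time exceeds max_total_seconds."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + max_total_seconds
    iterator = source.__aiter__()
    try:
        while True:
            remaining = deadline - loop.time()
            if remaining <= 0:
                raise asyncio.TimeoutError("total streaming deadline exceeded")
            try:
                # Each read may use only the time left before the deadline.
                item = await asyncio.wait_for(iterator.__anext__(), timeout=remaining)
            except StopAsyncIteration:
                return
            yield item
    finally:
        # Close the source so its connection is released even on timeout.
        aclose = getattr(iterator, "aclose", None)
        if aclose is not None:
            await aclose()
```

Because the deadline covers the whole stream rather than the gap between chunks, it pairs naturally with read=None on the HTTP client.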
Reliability settings for API calls
# Production-grade Dify API client configuration
import asyncio

import httpx
from tenacity import retry, retry_if_exception, stop_after_attempt, wait_exponential


def _is_retryable(exc: BaseException) -> bool:
    """Retry on connection errors, timeouts, and HTTP 429 (rate limited)."""
    if isinstance(exc, (httpx.ConnectError, httpx.TimeoutException)):
        return True
    return isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 429


class ProductionDifyClient:
    def __init__(self, api_key: str, api_url: str):
        # Connection pool configuration
        limits = httpx.Limits(max_connections=100, max_keepalive_connections=20)
        self.client = httpx.AsyncClient(
            base_url=api_url,
            headers={"Authorization": f"Bearer {api_key}"},
            limits=limits,
            # Blocking calls get a bounded read timeout; pass read=None per request for streaming
            timeout=httpx.Timeout(connect=5.0, read=120.0, write=10.0, pool=5.0),
            http2=True,  # HTTP/2 improves concurrency; requires `pip install httpx[http2]`
        )

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        retry=retry_if_exception(_is_retryable),
        reraise=True,
    )
    async def send_blocking(self, query: str, user: str, **kwargs) -> dict:
        resp = await self.client.post(
            "/chat-messages",
            json={
                "query": query,
                "user": user,
                "response_mode": "blocking",
                "inputs": kwargs.get("inputs", {}),
            },
        )
        if resp.status_code == 429:  # rate limited
            # Assumes Retry-After is given in seconds
            retry_after = int(resp.headers.get("Retry-After", 5))
            await asyncio.sleep(retry_after)
            raise httpx.HTTPStatusError("Rate limited", request=resp.request, response=resp)
        resp.raise_for_status()
        return resp.json()

    async def close(self):
        await self.client.aclose()
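Chapter summary
This chapter covered complete implementations of the three ways to call the Dify API:
REST API (blocking mode):
- Use for: simple integrations, batch processing, background jobs
- Key point: set sensible timeouts (30s for simple tasks, 120s for complex ones)
- Main drawback: users wait with no feedback
SSE streaming API:
- Use for: chat interfaces, long-form generation
- Key point: handle the data: prefix and the [DONE] end marker correctly
- Nginx must have buffering disabled (proxy_buffering off)
- Set the read timeout to None (unlimited) and enforce an overall deadline externally
WebSocket layer:
- Use for: two-way real-time communication, multi-user scenarios
- Usually a self-built WS proxy layer on top of Dify's SSE API
- Must handle reconnection (exponential backoff) and connection management
Production must-haves:
- The API key never appears in front-end code (use a backend proxy)
- Apply multi-tier rate limiting (per-user RPM plus global RPM)
- Cap the total duration of streaming connections (to prevent connection leaks)
- Retry network errors automatically (at most 3 attempts, with exponential backoff)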
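The multi-tier rate limiting item can be sketched as a sliding-window counter kept at two levels. This is an in-memory illustration only, assuming a single process; a multi-instance deployment would need a shared store. TwoTierRateLimiter and its parameters are names invented for this example.

```python
import time
from collections import defaultdict, deque
from typing import Callable


class TwoTierRateLimiter:
    """Sliding-window limiter enforcing a per-user and a global request cap."""

    def __init__(
        self,
        user_rpm: int,
        global_rpm: int,
        window: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.user_rpm = user_rpm
        self.global_rpm = global_rpm
        self.window = window
        self._clock = clock
        self._per_user = defaultdict(deque)  # user_id -> request timestamps
        self._global = deque()               # timestamps of all requests

    def allow(self, user_id: str) -> bool:
        """Return True and record the request if both limits permit it."""
        now = self._clock()
        cutoff = now - self.window
        user_q = self._per_user[user_id]
        # Drop timestamps that have left the window.
        for q in (user_q, self._global):
            while q and q[0] <= cutoff:
                q.popleft()
        if len(user_q) >= self.user_rpm or len(self._global) >= self.global_rpm:
            return False
        user_q.append(now)
        self._global.append(now)
        return True
```

In a proxy endpoint, a False result would typically map to an HTTP 429 response before any call to Dify is made.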
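Next chapter: Chapter 18 covers deep integration with Feishu, WeCom, and DingTalk, including message routing, bot configuration, and unified multi-platform management.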