API Integration Guide: REST, Streaming and WebSocket Implementation
Chapter 17: Complete API Integration Guide — REST / Streaming / WebSocket Full Implementation
Master all three Dify API calling modes: blocking REST for simple integrations, SSE streaming for great UX, and WebSocket for complex bidirectional interactions — with complete code and production considerations for each.
Chapter Overview
Dify provides a complete API ecosystem that lets you embed AI capabilities into any system — whether an internal ERP, an external SaaS product, or a mobile app. But each of the three API modes has a distinct use case; choosing the wrong one leads to poor user experience, wasted server resources, or needless implementation complexity.
This chapter covers, layer by layer:
- REST API: Synchronous blocking calls — ideal for simple integrations and batch processing
- SSE Streaming API: Server-Sent Events for incremental, typewriter-style output
- WebSocket layer: Bidirectional real-time communication for complex multi-turn conversations, built as a proxy on top of Dify's SSE API
After reading this chapter, you will be able to:
- Correctly authenticate and call all Dify API endpoints
- Implement the "typewriter" streaming effect in your frontend
- Handle connection drops, reconnection, and error scenarios
- Correctly manage API connections and resources in production
Level 1: Foundational Knowledge (1–3 Years Experience)
Dify API Overview
Dify's APIs fall into two categories:
1. Application API
- Calls AI applications you create in Dify
- Each application has its own API Key
- Used for business system integration
2. Console API (Management API)
- Manages Dify platform resources (apps, knowledge bases, etc.)
- Authenticated with an admin token
- Used for CI/CD and platform administration
Obtaining an API Key
- Go to your Dify app → Access API
- Click "Create API Key"
- Copy the generated key (format: app-xxxxxxxxxxxxxxxxxxxxxxxx)
# Test whether your API Key is valid
curl -X GET 'https://api.dify.ai/v1/info' \
-H 'Authorization: Bearer app-your_api_key_here'
REST API: The Simplest Calling Method
Send a single chat message (blocking mode):
curl -X POST 'https://api.dify.ai/v1/chat-messages' \
-H 'Authorization: Bearer app-your_api_key_here' \
-H 'Content-Type: application/json' \
-d '{
"inputs": {},
"query": "Hello, introduce Dify to me.",
"response_mode": "blocking",
"conversation_id": "",
"user": "user-001"
}'
Response example:
{
"event": "message",
"message_id": "msg-abc123",
"conversation_id": "conv-xyz456",
"answer": "Dify is an open-source LLMOps platform that helps you build and deploy AI applications rapidly...",
"created_at": 1710000000,
"metadata": {
"usage": {
"prompt_tokens": 150,
"completion_tokens": 200,
"total_tokens": 350,
"total_price": "0.0000105"
}
}
}
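As a rough sketch of how a caller might consume this response, the helper below pulls out the answer, the conversation ID to reuse on the next turn, and the token usage. The function name `summarize_response` and the layout of the returned dictionary are illustrative assumptions; only the field names are taken from the example above.

```python
from typing import Any, Dict


def summarize_response(body: Dict[str, Any]) -> Dict[str, Any]:
    """Extract the fields a caller usually needs from a blocking response.

    Hypothetical helper: field names follow the response example above,
    the returned layout is an assumption made for illustration.
    """
    usage = body.get("metadata", {}).get("usage", {})
    return {
        "answer": body.get("answer", ""),
        # Send this value back as "conversation_id" to continue the conversation.
        "conversation_id": body.get("conversation_id", ""),
        "total_tokens": usage.get("total_tokens", 0),
        # total_price is a string in the example; convert it before doing arithmetic.
        "total_price": float(usage.get("total_price", "0")),
    }


example = {
    "event": "message",
    "message_id": "msg-abc123",
    "conversation_id": "conv-xyz456",
    "answer": "Dify is an open-source LLMOps platform...",
    "created_at": 1710000000,
    "metadata": {"usage": {"prompt_tokens": 150, "completion_tokens": 200,
                           "total_tokens": 350, "total_price": "0.0000105"}},
}
summary = summarize_response(example)
print(summary["conversation_id"], summary["total_tokens"])
```

Using `.get()` with defaults keeps the helper tolerant of responses where `metadata` or `usage` is absent.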
Streaming API: Typewriter Effect
Waiting for a full response can take several seconds — poor UX. The streaming API returns output as it's generated, so users see text appearing in real time:
curl -X POST 'https://api.dify.ai/v1/chat-messages' \
-H 'Authorization: Bearer app-your_api_key_here' \
-H 'Content-Type: application/json' \
-d '{
"inputs": {},
"query": "Write a haiku about spring.",
"response_mode": "streaming",
"user": "user-001"
}'
Streaming response (SSE format):
data: {"event": "message", "answer": "Over", "message_id": "msg-001", ...}
data: {"event": "message", "answer": " the", "message_id": "msg-001", ...}
data: {"event": "message", "answer": " fields", "message_id": "msg-001", ...}
data: {"event": "message_end", "metadata": {...}}
data: [DONE]
Each data: line carries one self-contained JSON event that the frontend can render incrementally; the final data: [DONE] line is a plain-text end-of-stream sentinel, not JSON.
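To make the parsing step concrete, here is a minimal sketch that turns the lines of a stream like the one above into the final answer text. The function name `collect_answer` is hypothetical, and the sample input is the haiku stream from above with the elided fields left out.

```python
import json
from typing import Iterable


def collect_answer(lines: Iterable[str]) -> str:
    """Concatenate the "answer" fragments of all "message" events."""
    parts = []
    for line in lines:
        if not line.startswith("data: "):
            continue  # blank separator lines and anything that is not a data field
        payload = line[len("data: "):].strip()
        if payload == "[DONE]":
            break  # end-of-stream sentinel
        try:
            event = json.loads(payload)
        except json.JSONDecodeError:
            continue  # ignore malformed payloads instead of aborting the stream
        if event.get("event") == "message":
            parts.append(event.get("answer", ""))
    return "".join(parts)


sample = [
    'data: {"event": "message", "answer": "Over", "message_id": "msg-001"}',
    "",
    'data: {"event": "message", "answer": " the", "message_id": "msg-001"}',
    "",
    'data: {"event": "message", "answer": " fields", "message_id": "msg-001"}',
    "",
    'data: {"event": "message_end", "metadata": {}}',
    "",
    "data: [DONE]",
]
print(collect_answer(sample))  # Over the fields
```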
Python Client Basics
import httpx, json
DIFY_API_URL = "https://api.dify.ai/v1"
API_KEY = "app-your_api_key_here"
def chat_blocking(query: str, user_id: str = "default") -> str:
    resp = httpx.post(
        f"{DIFY_API_URL}/chat-messages",
        headers={"Authorization": f"Bearer {API_KEY}"},
        json={"query": query, "response_mode": "blocking",
              "user": user_id, "inputs": {}},
        timeout=60.0
    )
    resp.raise_for_status()
    return resp.json()["answer"]
def chat_streaming(query: str, user_id: str = "default"):
    with httpx.stream(
        "POST", f"{DIFY_API_URL}/chat-messages",
        headers={"Authorization": f"Bearer {API_KEY}"},
        json={"query": query, "response_mode": "streaming",
              "user": user_id, "inputs": {}},
        timeout=120.0
    ) as resp:
        resp.raise_for_status()
        for line in resp.iter_lines():
            if not line.startswith("data: "):
                continue
            data_str = line[6:]
            if data_str == "[DONE]":
                break
            event = json.loads(data_str)
            if event.get("event") == "message":
                yield event.get("answer", "")
# Usage
print("=== Blocking ===")
print(chat_blocking("Introduce Dify to me."))
print("\n=== Streaming ===")
for chunk in chat_streaming("Write a haiku about spring."):
    print(chunk, end="", flush=True)
print()
Level 2: Mechanism Deep Dive (3–5 Years Experience)
Complete Dify API Endpoint Reference
Chat application (Chatbot):
POST /v1/chat-messages # Send message
GET /v1/conversations # List conversations
GET /v1/conversations/{id}/messages # Message history
DELETE /v1/conversations/{id} # Delete conversation
POST /v1/messages/{id}/feedbacks # User feedback (like/dislike)
GET /v1/messages/{id}/suggested # Suggested follow-up questions
POST /v1/audio-to-text # Speech to text
Text generation (Completion):
POST /v1/completion-messages # Generate text (blocking or streaming)
Workflow application:
POST /v1/workflows/run # Execute workflow
GET /v1/workflows/run/{id} # Query execution status
GET /v1/workflows/logs # Execution logs
File upload:
POST /v1/files/upload # Upload file (image, document, etc.)
Full Multi-Turn Conversation Client
import httpx, json
from typing import Optional, Generator, AsyncGenerator
class DifyClient:
    def __init__(self, api_url: str, api_key: str):
        self.base_url = api_url.rstrip("/")
        self.headers = {"Authorization": f"Bearer {api_key}",
                        "Content-Type": "application/json"}
        self._client = httpx.Client(timeout=120.0)
        self._async_client = httpx.AsyncClient(timeout=120.0)
    # ── Synchronous ──────────────────────────────────────────────────────
    def send_message(
        self, query: str, user: str,
        conversation_id: Optional[str] = None,
        inputs: Optional[dict] = None, files: Optional[list] = None
    ) -> dict:
        payload = {"query": query, "user": user, "response_mode": "blocking",
                   "inputs": inputs or {}, "conversation_id": conversation_id or ""}
        if files:
            payload["files"] = files
        resp = self._client.post(f"{self.base_url}/chat-messages",
                                 headers=self.headers, json=payload)
        resp.raise_for_status()
        return resp.json()
    def send_message_stream(
        self, query: str, user: str,
        conversation_id: Optional[str] = None, inputs: Optional[dict] = None
    ) -> Generator[dict, None, None]:
        payload = {"query": query, "user": user, "response_mode": "streaming",
                   "inputs": inputs or {}, "conversation_id": conversation_id or ""}
        with self._client.stream("POST", f"{self.base_url}/chat-messages",
                                 headers=self.headers, json=payload) as resp:
            resp.raise_for_status()
            for line in resp.iter_lines():
                if not line or not line.startswith("data: "):
                    continue
                data_str = line[6:].strip()
                if data_str == "[DONE]":
                    return
                try:
                    yield json.loads(data_str)
                except json.JSONDecodeError:
                    continue
    def get_history(self, conversation_id: str, user: str, limit: int = 20) -> dict:
        resp = self._client.get(
            f"{self.base_url}/conversations/{conversation_id}/messages",
            headers=self.headers, params={"user": user, "limit": limit}
        )
        resp.raise_for_status()
        return resp.json()
    def upload_file(self, file_path: str, user: str) -> dict:
        with open(file_path, "rb") as f:
            # Multipart upload: send only the Authorization header so httpx sets the boundary
            resp = self._client.post(
                f"{self.base_url}/files/upload",
                headers={"Authorization": self.headers["Authorization"]},
                files={"file": f}, data={"user": user}
            )
        resp.raise_for_status()
        return resp.json()
    def submit_feedback(self, message_id: str, rating: str, user: str,
                        content: Optional[str] = None) -> dict:
        resp = self._client.post(
            f"{self.base_url}/messages/{message_id}/feedbacks",
            headers=self.headers,
            json={"rating": rating, "user": user, "content": content}
        )
        resp.raise_for_status()
        return resp.json()
    # ── Asynchronous ─────────────────────────────────────────────────────
    async def send_message_async(self, query: str, user: str, **kwargs) -> dict:
        payload = {"query": query, "user": user, "response_mode": "blocking",
                   "inputs": kwargs.get("inputs") or {},
                   "conversation_id": kwargs.get("conversation_id") or ""}
        resp = await self._async_client.post(
            f"{self.base_url}/chat-messages", headers=self.headers, json=payload
        )
        resp.raise_for_status()
        return resp.json()
    async def send_message_stream_async(
        self, query: str, user: str, **kwargs
    ) -> AsyncGenerator[dict, None]:
        payload = {"query": query, "user": user, "response_mode": "streaming",
                   "inputs": kwargs.get("inputs") or {},
                   "conversation_id": kwargs.get("conversation_id") or ""}
        async with self._async_client.stream(
            "POST", f"{self.base_url}/chat-messages",
            headers=self.headers, json=payload
        ) as resp:
            resp.raise_for_status()
            async for line in resp.aiter_lines():
                if not line or not line.startswith("data: "):
                    continue
                data_str = line[6:].strip()
                if data_str == "[DONE]":
                    return
                try:
                    yield json.loads(data_str)
                except json.JSONDecodeError:
                    continue
    async def close(self):
        self._client.close()
        await self._async_client.aclose()
Frontend Streaming Integration (TypeScript)
async function* streamDifyChat(
query: string, userId: string, conversationId: string = "",
apiKey: string, apiUrl: string = "https://api.dify.ai/v1"
): AsyncGenerator<{event: string; answer?: string; conversation_id?: string; metadata?: any}> {
const resp = await fetch(`${apiUrl}/chat-messages`, {
method: "POST",
headers: {"Authorization": `Bearer ${apiKey}`, "Content-Type": "application/json"},
body: JSON.stringify({
query, user: userId, conversation_id: conversationId,
response_mode: "streaming", inputs: {},
}),
});
if (!resp.ok) {
const err = await resp.json();
throw new Error(`API error ${resp.status}: ${err.message}`);
}
const reader = resp.body!.getReader();
const decoder = new TextDecoder();
let buffer = "";
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() || "";
for (const line of lines) {
if (!line.startsWith("data: ")) continue;
const data = line.slice(6).trim();
if (data === "[DONE]") return;
try { yield JSON.parse(data); } catch { /* skip malformed */ }
}
}
} finally {
reader.releaseLock();
}
}
// React hook
function useDifyChat(apiKey: string) {
const [answer, setAnswer] = React.useState("");
const [convId, setConvId] = React.useState("");
const [loading, setLoading] = React.useState(false);
const sendMessage = async (query: string) => {
setLoading(true);
setAnswer("");
let full = "", newConvId = convId;
try {
for await (const event of streamDifyChat(query, "user-001", convId, apiKey)) {
if (event.event === "message") {
full += event.answer || "";
setAnswer(full);
} else if (event.event === "message_end") {
if (event.conversation_id) {
newConvId = event.conversation_id;
setConvId(newConvId);
}
} else if (event.event === "error") {
throw new Error((event as any).message);
}
}
} finally {
setLoading(false);
}
};
return { answer, convId, loading, sendMessage };
}
Workflow API Full Implementation
import asyncio, json
import httpx
class DifyWorkflowClient:
    def __init__(self, api_url: str, api_key: str):
        self.base_url = api_url.rstrip("/")
        self.headers = {"Authorization": f"Bearer {api_key}"}
        self.client = httpx.AsyncClient(timeout=30.0)
    async def run(self, inputs: dict, user: str, mode: str = "blocking") -> dict:
        resp = await self.client.post(
            f"{self.base_url}/workflows/run",
            headers={**self.headers, "Content-Type": "application/json"},
            json={"inputs": inputs, "user": user, "response_mode": mode}
        )
        resp.raise_for_status()
        return resp.json()
    async def run_stream(self, inputs: dict, user: str):
        async with self.client.stream(
            "POST", f"{self.base_url}/workflows/run",
            headers={**self.headers, "Content-Type": "application/json"},
            json={"inputs": inputs, "user": user, "response_mode": "streaming"}
        ) as resp:
            resp.raise_for_status()
            async for line in resp.aiter_lines():
                if not line.startswith("data: "):
                    continue
                data_str = line[6:].strip()
                if data_str == "[DONE]":
                    return
                try:
                    yield json.loads(data_str)
                except json.JSONDecodeError:
                    continue
    async def wait_for_completion(
        self, run_id: str, poll_interval: float = 2.0, timeout: float = 300.0
    ) -> dict:
        # Use the running loop's monotonic clock for the polling deadline
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while loop.time() < deadline:
            resp = await self.client.get(
                f"{self.base_url}/workflows/run/{run_id}", headers=self.headers
            )
            resp.raise_for_status()
            status = resp.json()
            if status["status"] in ["succeeded", "failed", "stopped"]:
                return status
            await asyncio.sleep(poll_interval)
        raise TimeoutError(f"Workflow {run_id} did not complete within {timeout}s")
Level 3: Source Code and Principles (5+ Years Experience)
Dify API Server Routing Architecture
api/controllers/
├── console/ # Console API (admin)
│ ├── app/
│ │ ├── chat.py # Conversation management
│ │ └── conversation.py # Conversation list
│ └── datasets/ # Knowledge base management
└── service_api/ # Service API (app callers)
    ├── app/
    │   ├── chat.py # Chat API
    │   ├── completion.py # Text generation API
    │   ├── workflow.py # Workflow API
    │   └── audio.py # Voice API
    └── index.py # Route registration
Server-side streaming implementation:
# api/controllers/service_api/app/chat.py (key logic)
from flask import Response, stream_with_context
import json
class ChatMessageStreamView(Resource):
    def post(self):
        data = request.get_json()
        if data.get("response_mode") == "streaming":
            return self._streaming_response(data)
        return self._blocking_response(data)
    def _streaming_response(self, data) -> Response:
        def generate():
            try:
                response = app_runner.generate(
                    app_model=g.app_model,
                    user=self._get_user(data["user"]),
                    query=data["query"],
                    stream=True,
                )
                for event in response:
                    yield f"data: {json.dumps(self._format_event(event), ensure_ascii=False)}\n\n"
                yield "data: [DONE]\n\n"
            except Exception as e:
                yield f"data: {json.dumps({'event':'error','message':str(e)})}\n\n"
                yield "data: [DONE]\n\n"
        return Response(
            stream_with_context(generate()),
            content_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "X-Accel-Buffering": "no",  # Disable Nginx buffering
                "Transfer-Encoding": "chunked",
                "Connection": "keep-alive",
            }
        )
SSE Protocol Technical Details
Server-Sent Events (SSE) is a unidirectional HTTP push protocol:
event: [event type, optional]\n
data: [payload]\n
id: [message ID, optional]\n
retry: [reconnect interval in ms, optional]\n
\n (blank line = end of message)
Dify uses the simplest format — only data:, one message per line:
data: {"event":"message","answer":"Hello","message_id":"xxx"}\n\n
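For comparison with Dify's data-only usage, the sketch below parses the general field format shown above, including `event:`, `id:`, `retry:`, comment lines, and multi-line `data:` values. It follows the usual SSE processing rules as I understand them (a blank line dispatches the event, multiple data lines are joined with a newline); the names `SSEEvent` and `parse_sse` are hypothetical, and this is not how Dify's own clients are implemented.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, List, Optional


@dataclass
class SSEEvent:
    event: str = "message"       # default event type when no "event:" field is sent
    data: str = ""
    id: Optional[str] = None
    retry: Optional[int] = None  # reconnect interval in milliseconds


def parse_sse(lines: Iterable[str]) -> Iterator[SSEEvent]:
    """Yield one SSEEvent per blank-line-terminated block of fields."""
    event_type = "message"
    data_lines: List[str] = []
    last_id: Optional[str] = None
    retry: Optional[int] = None
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # Blank line ends the current message; blocks without data are dropped.
            if data_lines:
                yield SSEEvent(event_type, "\n".join(data_lines), last_id, retry)
            event_type, data_lines = "message", []
            continue
        if line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # a single leading space after the colon is not part of the value
        if field == "event":
            event_type = value or "message"
        elif field == "data":
            data_lines.append(value)
        elif field == "id":
            last_id = value
        elif field == "retry" and value.isdecimal():
            retry = int(value)


stream = [
    ": keep-alive comment",
    "event: ping",
    "data: {}",
    "",
    "id: 7",
    "retry: 3000",
    "data: first line",
    "data: second line",
    "",
]
events = list(parse_sse(stream))
for ev in events:
    print(ev)
```

For Dify's stream the simpler `startswith("data: ")` check used elsewhere in this chapter is enough; a parser like this only becomes useful when a proxy or another SSE source adds the optional fields.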
Resilient SSE client with reconnect:
class ResilientSSEClient {
private retryCount = 0;
private maxRetries = 3;
private baseDelay = 1000;
async *connect(url: string, body: object, headers: Record<string, string>) {
while (this.retryCount <= this.maxRetries) {
try {
yield* this._doConnect(url, body, headers);
this.retryCount = 0;
return;
} catch (e) {
if (this.retryCount >= this.maxRetries) throw e;
const delay = this.baseDelay * Math.pow(2, this.retryCount);
console.warn(`SSE connection failed. Retrying in ${delay}ms (attempt ${this.retryCount + 1})`);
await new Promise(r => setTimeout(r, delay));
this.retryCount++;
}
}
}
private async *_doConnect(url: string, body: object, headers: Record<string, string>) {
const resp = await fetch(url, {
method: "POST",
headers: {"Content-Type": "application/json", ...headers},
body: JSON.stringify(body),
signal: AbortSignal.timeout(120_000),
});
if (!resp.ok) throw new Error(`HTTP ${resp.status}`);
const reader = resp.body!.getReader();
const decoder = new TextDecoder();
let buffer = "";
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() || "";
for (const line of lines) {
if (!line.startsWith("data: ")) continue;
const d = line.slice(6).trim();
if (d === "[DONE]") return;
try { yield JSON.parse(d); } catch {}
}
}
} finally {
reader.releaseLock();
}
}
}
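The retry schedule in the client above doubles the delay on every attempt (1 s, 2 s, 4 s). The following sketch computes the same schedule in Python and adds two options that are not in the original client and are labeled here as assumptions: an upper cap on the delay and optional random jitter, which spreads out reconnects when many clients fail at the same moment. The function name `backoff_delays` and its parameters are hypothetical.

```python
import random
from typing import List, Optional


def backoff_delays(base: float = 1.0, max_retries: int = 3, cap: float = 30.0,
                   jitter: bool = False,
                   rng: Optional[random.Random] = None) -> List[float]:
    """Return the delay in seconds to wait before each retry attempt."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        # Exponential growth: base * 2^attempt, never above the cap.
        delay = min(cap, base * (2 ** attempt))
        if jitter:
            # "Full jitter": pick a random delay between 0 and the computed value.
            delay = rng.uniform(0, delay)
        delays.append(delay)
    return delays


print(backoff_delays())                         # [1.0, 2.0, 4.0]
print(backoff_delays(max_retries=6, cap=10.0))  # [1.0, 2.0, 4.0, 8.0, 10.0, 10.0]
```

One caveat when retrying a stream: each retry sends the request again, so generation restarts from the beginning. A caller that has already displayed partial text should clear it before consuming the retried stream.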
WebSocket Proxy Layer
# FastAPI WebSocket proxy over Dify SSE
import json, os
import httpx
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
# Assumed configuration: read from the server environment so the key never reaches the browser
DIFY_API_URL = os.getenv("DIFY_API_URL", "https://api.dify.ai/v1")
DIFY_API_KEY = os.getenv("DIFY_API_KEY", "")
app = FastAPI()
client = httpx.AsyncClient(timeout=120.0)
@app.websocket("/ws/chat/{user_id}")
async def websocket_chat(ws: WebSocket, user_id: str):
    await ws.accept()
    conversation_id = ""
    try:
        while True:
            raw = await ws.receive_text()
            msg = json.loads(raw)
            query = msg.get("query", "")
            if not query:
                await ws.send_json({"type": "error", "message": "empty query"})
                continue
            await ws.send_json({"type": "start"})
            full_answer = ""
            async with client.stream(
                "POST", f"{DIFY_API_URL}/chat-messages",
                headers={"Authorization": f"Bearer {DIFY_API_KEY}"},
                json={"query": query, "user": user_id, "conversation_id": conversation_id,
                      "response_mode": "streaming", "inputs": msg.get("inputs", {})}
            ) as resp:
                if resp.status_code != 200:
                    await ws.send_json({"type": "error", "message": f"Dify API {resp.status_code}"})
                    continue
                async for line in resp.aiter_lines():
                    if not line.startswith("data: "):
                        continue
                    d = line[6:].strip()
                    if d == "[DONE]":
                        break
                    try:
                        event = json.loads(d)
                    except json.JSONDecodeError:
                        continue
                    if event.get("event") == "message":
                        chunk = event.get("answer", "")
                        full_answer += chunk
                        await ws.send_json({"type": "chunk", "content": chunk})
                    elif event.get("event") == "message_end":
                        # Remember the conversation so the next message continues it
                        conversation_id = event.get("conversation_id", conversation_id)
                        await ws.send_json({"type": "done", "full_answer": full_answer,
                                            "conversation_id": conversation_id})
    except WebSocketDisconnect:
        print(f"WebSocket disconnected: user_id={user_id}")
    except Exception as e:
        try:
            await ws.send_json({"type": "error", "message": str(e)})
        except Exception:
            pass  # the socket is already closed; nothing more to report
// Frontend WebSocket client with reconnect
class DifyWebSocketClient {
private ws: WebSocket | null = null;
private retries: number = 0;
connect(userId: string, onChunk: (t: string) => void) {
this.ws = new WebSocket(`wss://your-server.com/ws/chat/${userId}`);
this.ws.onmessage = (e) => {
const d = JSON.parse(e.data);
if (d.type === "chunk") onChunk(d.content);
};
this.ws.onclose = () => {
if (this.retries < 5) {
setTimeout(() => { this.retries++; this.connect(userId, onChunk); },
1000 * Math.pow(2, this.retries));
}
};
this.ws.onopen = () => { this.retries = 0; };
}
send(query: string, inputs: Record<string, any> = {}) {
if (this.ws?.readyState === WebSocket.OPEN)
this.ws.send(JSON.stringify({ query, inputs }));
}
}
Level 4: Production Pitfalls and Decision-Making (Expert Perspective)
Pitfall 1: Nginx Buffering Breaking SSE
Nginx buffers upstream responses by default, causing SSE to be delivered in batches, destroying the streaming effect:
# ❌ Default Nginx — buffers responses
location /v1/ {
proxy_pass http://dify_api;
}
# ✅ Correct — disable buffering for SSE
location /v1/ {
proxy_pass http://dify_api;
proxy_buffering off;
proxy_cache off;
proxy_read_timeout 300s;
proxy_send_timeout 300s;
proxy_http_version 1.1;
proxy_set_header Connection "";
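# X-Accel-Buffering is a response header sent by the upstream (Dify already sets it); it is not a request header to set here
add_header Cache-Control "no-cache";
# Do not force Content-Type here: blocking JSON responses are served from the same location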
}
Pitfall 2: API Key Security
Exposing your API Key in frontend code is extremely dangerous:
❌ Dangerous: API Key in frontend source
const resp = await fetch("https://api.dify.ai/v1/chat-messages", {
headers: {"Authorization": "Bearer app-your_secret_key"} // Visible to anyone!
});
Backend proxy layer:
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
import jwt, os, httpx
app = FastAPI()
bearer = HTTPBearer()
class ChatRequest(BaseModel):
    # Request body schema; not shown in the original listing, assumed minimal shape
    query: str
def verify_user(creds: HTTPAuthorizationCredentials = Depends(bearer)) -> dict:
    try:
        return jwt.decode(creds.credentials, os.getenv("JWT_SECRET"), algorithms=["HS256"])
    except jwt.PyJWTError:
        raise HTTPException(401, "Invalid credentials")
@app.post("/api/chat")
async def chat_proxy(request: ChatRequest, user: dict = Depends(verify_user)):
    # Dify API Key lives only in the server environment and is never sent to the client
    dify_key = os.getenv("DIFY_API_KEY")
    # httpx defaults to a 5s timeout, which is too short for blocking generations
    async with httpx.AsyncClient(timeout=60.0) as client:
        resp = await client.post(
            "https://api.dify.ai/v1/chat-messages",
            headers={"Authorization": f"Bearer {dify_key}"},
            json={"query": request.query, "user": user["sub"],
                  "response_mode": "blocking", "inputs": {}}
        )
        return resp.json()
Pitfall 3: Streaming Timeout Configuration
import asyncio
import httpx
# ❌ Wrong: short timeout cuts off streaming responses
httpx.post(url, timeout=10)
# ✅ Correct: separate connect timeout from read timeout
TIMEOUTS = {
    "blocking_simple": httpx.Timeout(connect=5, read=30, write=10, pool=5),
    "blocking_complex": httpx.Timeout(connect=5, read=120, write=10, pool=5),
    "streaming": httpx.Timeout(connect=5, read=None, write=10, pool=5),
    # read=None for streaming: the response may last arbitrarily long
}
# Use an external total-duration limit instead
# (asyncio.timeout requires Python 3.11+; dify_client stands for your own streaming client)
async def stream_with_deadline(query: str, max_seconds: float = 120):
    async with asyncio.timeout(max_seconds):
        async for chunk in dify_client.stream(query):
            yield chunk
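Because `asyncio.timeout` is only available from Python 3.11, here is a sketch of an alternative that also runs on older versions. It wraps any async iterator and enforces two limits at once: a total deadline for the whole stream and an idle limit between chunks, so a stalled connection is detected even though the HTTP read timeout is disabled. The names `stream_with_limits` and `fake_stream` are hypothetical, and `fake_stream` only simulates a Dify stream.

```python
import asyncio
import time
from typing import AsyncIterator, List


async def stream_with_limits(source: AsyncIterator[str], max_seconds: float,
                             idle_seconds: float) -> AsyncIterator[str]:
    """Re-yield chunks from source, raising asyncio.TimeoutError on either limit."""
    deadline = time.monotonic() + max_seconds
    iterator = source.__aiter__()
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise asyncio.TimeoutError("total stream deadline exceeded")
        try:
            # Wait for the next chunk, but never longer than the idle limit
            # or the time left until the total deadline.
            chunk = await asyncio.wait_for(iterator.__anext__(),
                                           timeout=min(idle_seconds, remaining))
        except StopAsyncIteration:
            return  # the source finished normally
        yield chunk


async def fake_stream(chunks: List[str], delay: float) -> AsyncIterator[str]:
    """Stand-in for a streaming response: emits each chunk after a delay."""
    for chunk in chunks:
        await asyncio.sleep(delay)
        yield chunk


async def main() -> None:
    source = fake_stream(["Hel", "lo"], delay=0.01)
    async for chunk in stream_with_limits(source, max_seconds=1.0, idle_seconds=0.5):
        print(chunk, end="")
    print()


asyncio.run(main())
```

The idle limit is the part that `read=None` gives up; the total deadline plays the same role as `max_seconds` in `stream_with_deadline` above.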
Production-Grade Client Configuration
import asyncio
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
class RateLimitedError(Exception):
    """Raised on HTTP 429 so that the retry policy below picks it up."""
class ProductionDifyClient:
    def __init__(self, api_key: str, api_url: str):
        limits = httpx.Limits(max_connections=100, max_keepalive_connections=20)
        self.client = httpx.AsyncClient(
            base_url=api_url,
            headers={"Authorization": f"Bearer {api_key}"},
            limits=limits,
            timeout=httpx.Timeout(connect=5.0, read=None, write=10.0, pool=5.0),
            http2=True,  # HTTP/2 for better concurrency (requires the httpx[http2] extra)
        )
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        retry=retry_if_exception_type(
            (httpx.ConnectError, httpx.TimeoutException, RateLimitedError)),
    )
    async def send_blocking(self, query: str, user: str, **kwargs) -> dict:
        resp = await self.client.post(
            "/chat-messages",
            json={"query": query, "user": user, "response_mode": "blocking",
                  "inputs": kwargs.get("inputs", {})}
        )
        if resp.status_code == 429:
            # Honor Retry-After (assumed to be in seconds), then let the retry policy re-send
            await asyncio.sleep(int(resp.headers.get("Retry-After", 5)))
            raise RateLimitedError("Rate limited")
        resp.raise_for_status()
        return resp.json()
    async def close(self):
        await self.client.aclose()
Chapter Summary
This chapter systematically covered the complete implementation of all three Dify API calling modes:
REST API (blocking):
- Best for: simple integrations, batch processing, background tasks
- Key: set appropriate timeouts (simple tasks 30s, complex 120s)
- Main weakness: poor waiting UX for long generations
SSE Streaming API:
- Best for: chat interfaces, long-text generation
- Key: correctly parse the data: prefix and the [DONE] terminator
- Nginx must disable buffering (proxy_buffering off)
- Set the read timeout to None; control total duration externally
WebSocket layer:
- Best for: bidirectional real-time communication, multi-user scenarios
- Typically built as a WS proxy on top of Dify's SSE API
- Must implement reconnect with exponential backoff
Production must-haves:
- API Key must never appear in frontend code — use a backend proxy
- Implement multi-layer rate limiting (per-user RPM + global RPM)
- Set a maximum total duration for streaming connections to prevent leaks
- Automatically retry on network errors (max 3 attempts, exponential backoff)
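The multi-layer rate limiting item above is not implemented elsewhere in this chapter, so here is one possible sketch: an in-process sliding-window limiter that enforces a per-user and a global requests-per-minute limit together. The class name `MultiLayerRateLimiter`, its parameters, and the injectable clock are all assumptions made for illustration. A deployment with several proxy instances would need a shared store instead of process memory.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict


class MultiLayerRateLimiter:
    """Sliding 60-second window with a per-user and a global request limit."""

    WINDOW_SECONDS = 60.0

    def __init__(self, per_user_rpm: int, global_rpm: int,
                 clock: Callable[[], float] = time.monotonic):
        self.per_user_rpm = per_user_rpm
        self.global_rpm = global_rpm
        self.clock = clock  # injectable so tests can control time
        self._global: Deque[float] = deque()
        self._users: Dict[str, Deque[float]] = defaultdict(deque)

    def _trim(self, window: Deque[float], now: float) -> None:
        # Drop timestamps that have left the 60-second window.
        while window and now - window[0] >= self.WINDOW_SECONDS:
            window.popleft()

    def allow(self, user_id: str) -> bool:
        """Return True and record the request if both limits have room."""
        now = self.clock()
        user_window = self._users[user_id]
        self._trim(self._global, now)
        self._trim(user_window, now)
        if len(self._global) >= self.global_rpm:
            return False
        if len(user_window) >= self.per_user_rpm:
            return False
        # Record only accepted requests, so rejected calls do not consume quota.
        self._global.append(now)
        user_window.append(now)
        return True


limiter = MultiLayerRateLimiter(per_user_rpm=2, global_rpm=3)
print([limiter.allow("user-001") for _ in range(3)])  # [True, True, False]
```

In a backend proxy, a check like `limiter.allow(user["sub"])` would run before the request is forwarded to Dify, returning HTTP 429 to the caller when it fails.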
Key numbers:
- Blocking timeout: simple tasks 30s, complex tasks 120s
- Streaming: connect timeout 5s, read timeout unlimited
- Retry: initial 1s, max 10s, max 3 attempts
- Connection pool: max 100 connections, 20 keepalive connections
Next chapter: Chapter 18 dives deep into integrating Dify with Feishu (Lark), WeChat Work, and DingTalk — including message routing, bot configuration, and unified multi-platform management.