Chapter 17

API Integration Guide: REST, Streaming and WebSocket Implementation

Master all three Dify API calling modes: blocking REST for simple integrations, SSE streaming for great UX, and WebSocket for complex bidirectional interactions — with complete code and production considerations for each.

Chapter Overview

Dify provides a complete API ecosystem that lets you embed AI capabilities into any system — whether an internal ERP, an external SaaS product, or a mobile app. But each of the three API modes has a distinct use case; choosing the wrong one leads to poor user experience, wasted server resources, or needless implementation complexity.

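This chapter covers, layer by layer:

  1. Foundational knowledge: API keys, blocking REST calls, and basic streaming
  2. Mechanism deep dive: the endpoint reference, a full multi-turn client, frontend streaming, and the Workflow API
  3. Source code and principles: Dify's routing, the server-side SSE implementation, and a WebSocket proxy layer
  4. Production pitfalls: Nginx buffering, API key security, timeouts, and a production-grade client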

After reading this chapter, you will be able to:


Level 1: Foundational Knowledge (1–3 Years Experience)

Dify API Overview

Dify's APIs fall into two categories:

1. Application API

2. Service API (Management API)

Obtaining an API Key

  1. Go to your Dify app → API Access
  2. Click "Create API Key"
  3. Copy the generated key (format: app-xxxxxxxxxxxxxxxxxxxxxxxx)
# Test whether your API Key is valid
curl -X GET 'https://api.dify.ai/v1/info' \
  -H 'Authorization: Bearer app-your_api_key_here'

REST API: The Simplest Calling Method

Send a single chat message (blocking mode):

curl -X POST 'https://api.dify.ai/v1/chat-messages' \
  -H 'Authorization: Bearer app-your_api_key_here' \
  -H 'Content-Type: application/json' \
  -d '{
    "inputs": {},
    "query": "Hello, introduce Dify to me.",
    "response_mode": "blocking",
    "conversation_id": "",
    "user": "user-001"
  }'

Response example:

{
  "event": "message",
  "message_id": "msg-abc123",
  "conversation_id": "conv-xyz456",
  "answer": "Dify is an open-source LLMOps platform that helps you build and deploy AI applications rapidly...",
  "created_at": 1710000000,
  "metadata": {
    "usage": {
      "prompt_tokens": 150,
      "completion_tokens": 200,
      "total_tokens": 350,
      "total_price": "0.0000105"
    }
  }
}
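A caller typically needs three things from this response: answer, conversation_id (to continue the conversation) and metadata.usage. The following stdlib-only sketch assumes exactly the response shape shown above; BlockingReply and parse_blocking_reply are illustrative names, not part of Dify. Because total_price arrives as a string, it is parsed with Decimal rather than float so the value stays exact.

```python
import json
from dataclasses import dataclass
from decimal import Decimal


@dataclass
class BlockingReply:
    answer: str
    conversation_id: str
    message_id: str
    total_tokens: int
    total_price: Decimal


def parse_blocking_reply(body: str) -> BlockingReply:
    """Extract the commonly used fields from a blocking chat-messages response."""
    data = json.loads(body)
    usage = data.get("metadata", {}).get("usage", {})
    return BlockingReply(
        answer=data["answer"],
        conversation_id=data["conversation_id"],
        message_id=data["message_id"],
        total_tokens=int(usage.get("total_tokens", 0)),
        # total_price is a decimal string such as "0.0000105"; keep it exact
        total_price=Decimal(usage.get("total_price", "0")),
    )


sample = """{"event": "message", "message_id": "msg-abc123",
  "conversation_id": "conv-xyz456", "answer": "Dify is ...",
  "created_at": 1710000000,
  "metadata": {"usage": {"prompt_tokens": 150, "completion_tokens": 200,
                         "total_tokens": 350, "total_price": "0.0000105"}}}"""

reply = parse_blocking_reply(sample)
print(reply.conversation_id, reply.total_tokens, reply.total_price)
```

Store conversation_id after the first reply and send it with the next request to keep the conversation going.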

Streaming API: Typewriter Effect

In blocking mode nothing is shown until the whole answer has been generated, which can take several seconds and makes for a poor user experience. Streaming mode returns output as it is generated, so users see the text appear in real time:

curl -X POST 'https://api.dify.ai/v1/chat-messages' \
  -H 'Authorization: Bearer app-your_api_key_here' \
  -H 'Content-Type: application/json' \
  -d '{
    "inputs": {},
    "query": "Write a haiku about spring.",
    "response_mode": "streaming",
    "user": "user-001"
  }'

Streaming response (SSE format):

data: {"event": "message", "answer": "Over", "message_id": "msg-001", ...}

data: {"event": "message", "answer": " the", "message_id": "msg-001", ...}

data: {"event": "message", "answer": " fields", "message_id": "msg-001", ...}

data: {"event": "message_end", "metadata": {...}}

data: [DONE]

Each data: line is a self-contained JSON event that the frontend can display incrementally.
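Reassembling the answer therefore means concatenating the answer field of each message event. The network-free sketch below assumes the event names shown above (message, message_end) and treats either [DONE] or message_end as the end of the stream; collect_answer is a hypothetical helper, and lines that are not data: lines (blank separators, keep-alives) are skipped.

```python
import json
from typing import Iterable, List, Optional, Tuple


def collect_answer(lines: Iterable[str]) -> Tuple[str, Optional[str]]:
    """Concatenate streamed answer chunks; return (answer, conversation_id)."""
    parts: List[str] = []
    conversation_id: Optional[str] = None
    for line in lines:
        if not line.startswith("data: "):
            continue  # blank separators, keep-alive lines, comments
        payload = line[len("data: "):].strip()
        if payload == "[DONE]":
            break
        event = json.loads(payload)
        kind = event.get("event")
        if kind == "message":
            parts.append(event.get("answer", ""))
        if event.get("conversation_id"):
            conversation_id = event["conversation_id"]
        if kind == "message_end":
            break
    return "".join(parts), conversation_id


stream = [
    'data: {"event": "message", "answer": "Over", "conversation_id": "c-1"}',
    "",
    'data: {"event": "message", "answer": " the", "conversation_id": "c-1"}',
    "",
    'data: {"event": "message", "answer": " fields", "conversation_id": "c-1"}',
    "",
    'data: {"event": "message_end", "conversation_id": "c-1", "metadata": {}}',
]
print(collect_answer(stream))
```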

Python Client Basics

import httpx, json

DIFY_API_URL = "https://api.dify.ai/v1"
API_KEY      = "app-your_api_key_here"

def chat_blocking(query: str, user_id: str = "default") -> str:
    resp = httpx.post(
        f"{DIFY_API_URL}/chat-messages",
        headers={"Authorization": f"Bearer {API_KEY}"},
        json={"query": query, "response_mode": "blocking",
              "user": user_id, "inputs": {}},
        timeout=60.0
    )
    resp.raise_for_status()
    return resp.json()["answer"]

def chat_streaming(query: str, user_id: str = "default"):
    with httpx.stream(
        "POST", f"{DIFY_API_URL}/chat-messages",
        headers={"Authorization": f"Bearer {API_KEY}"},
        json={"query": query, "response_mode": "streaming",
              "user": user_id, "inputs": {}},
        timeout=120.0
    ) as resp:
        resp.raise_for_status()
        for line in resp.iter_lines():
            if not line.startswith("data: "):
                continue
            data_str = line[6:]
            if data_str == "[DONE]":
                break
            event = json.loads(data_str)
            if event.get("event") == "message":
                yield event.get("answer", "")

# Usage
print("=== Blocking ===")
print(chat_blocking("Introduce Dify to me."))

print("\n=== Streaming ===")
for chunk in chat_streaming("Write a haiku about spring."):
    print(chunk, end="", flush=True)
print()

Level 2: Mechanism Deep Dive (3–5 Years Experience)

Complete Dify API Endpoint Reference

Chat application (Chatbot):

POST   /v1/chat-messages                   # Send message
GET    /v1/conversations                   # List conversations
GET    /v1/messages?conversation_id={id}   # Message history
DELETE /v1/conversations/{id}              # Delete conversation
POST   /v1/messages/{id}/feedbacks         # User feedback (like/dislike)
GET    /v1/messages/{id}/suggested         # Suggested follow-up questions
POST   /v1/audio-to-text                   # Speech to text

Text generation (Completion):

POST /v1/completion-messages           # Generate text (blocking or streaming)

Workflow application:

POST /v1/workflows/run                 # Execute workflow
GET  /v1/workflows/run/{id}            # Query execution status
GET  /v1/workflows/logs                # Execution logs

File upload:

POST /v1/files/upload                  # Upload file (image, document, etc.)

Full Multi-Turn Conversation Client

import httpx, json
from typing import Optional, Generator, AsyncGenerator

class DifyClient:

    def __init__(self, api_url: str, api_key: str):
        self.base_url        = api_url.rstrip("/")
        self.headers         = {"Authorization": f"Bearer {api_key}",
                                "Content-Type": "application/json"}
        self._client         = httpx.Client(timeout=120.0)
        self._async_client   = httpx.AsyncClient(timeout=120.0)

    # ── Synchronous ──────────────────────────────────────────────────────

    def send_message(
        self, query: str, user: str,
        conversation_id: Optional[str] = None,
        inputs: dict = None, files: list = None
    ) -> dict:
        payload = {"query": query, "user": user, "response_mode": "blocking",
                   "inputs": inputs or {}, "conversation_id": conversation_id or ""}
        if files:
            payload["files"] = files
        resp = self._client.post(f"{self.base_url}/chat-messages",
                                  headers=self.headers, json=payload)
        resp.raise_for_status()
        return resp.json()

    def send_message_stream(
        self, query: str, user: str,
        conversation_id: Optional[str] = None, inputs: dict = None
    ) -> Generator[dict, None, None]:
        payload = {"query": query, "user": user, "response_mode": "streaming",
                   "inputs": inputs or {}, "conversation_id": conversation_id or ""}
        with self._client.stream("POST", f"{self.base_url}/chat-messages",
                                  headers=self.headers, json=payload) as resp:
            resp.raise_for_status()
            for line in resp.iter_lines():
                if not line or not line.startswith("data: "):
                    continue
                data_str = line[6:].strip()
                if data_str == "[DONE]":
                    return
                try:
                    yield json.loads(data_str)
                except json.JSONDecodeError:
                    continue

    def get_history(self, conversation_id: str, user: str, limit: int = 20) -> dict:
        # History is served by GET /messages, filtered by conversation_id
        resp = self._client.get(
            f"{self.base_url}/messages",
            headers=self.headers,
            params={"conversation_id": conversation_id, "user": user, "limit": limit}
        )
        resp.raise_for_status()
        return resp.json()

    def upload_file(self, file_path: str, user: str) -> dict:
        with open(file_path, "rb") as f:
            resp = self._client.post(
                f"{self.base_url}/files/upload",
                headers={"Authorization": self.headers["Authorization"]},
                files={"file": f}, data={"user": user}
            )
        resp.raise_for_status()
        return resp.json()

    def submit_feedback(self, message_id: str, rating: str, user: str,
                        content: Optional[str] = None) -> dict:
        resp = self._client.post(
            f"{self.base_url}/messages/{message_id}/feedbacks",
            headers=self.headers,
            json={"rating": rating, "user": user, "content": content}
        )
        resp.raise_for_status()
        return resp.json()

    # ── Asynchronous ─────────────────────────────────────────────────────

    async def send_message_async(self, query: str, user: str, **kwargs) -> dict:
        payload = {"query": query, "user": user, "response_mode": "blocking",
                   "inputs": kwargs.get("inputs", {}),
                   "conversation_id": kwargs.get("conversation_id", "")}
        resp = await self._async_client.post(
            f"{self.base_url}/chat-messages", headers=self.headers, json=payload
        )
        resp.raise_for_status()
        return resp.json()

    async def send_message_stream_async(
        self, query: str, user: str, **kwargs
    ) -> AsyncGenerator[dict, None]:
        payload = {"query": query, "user": user, "response_mode": "streaming",
                   "inputs": kwargs.get("inputs", {}),
                   "conversation_id": kwargs.get("conversation_id", "")}
        async with self._async_client.stream(
            "POST", f"{self.base_url}/chat-messages",
            headers=self.headers, json=payload
        ) as resp:
            resp.raise_for_status()
            async for line in resp.aiter_lines():
                if not line or not line.startswith("data: "):
                    continue
                data_str = line[6:].strip()
                if data_str == "[DONE]":
                    return
                try:
                    yield json.loads(data_str)
                except json.JSONDecodeError:
                    continue

    async def close(self):
        self._client.close()
        await self._async_client.aclose()
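Multi-turn behaviour rests on one rule: send an empty conversation_id on the first turn, then reuse the ID returned in each reply. The sketch below wraps that rule in a hypothetical ChatSession class (not part of Dify or any SDK). It takes the send function as a parameter, so it can wrap DifyClient.send_message in production and a test double here.

```python
from typing import Callable, List, Optional

SendFn = Callable[..., dict]


class ChatSession:
    """Hypothetical helper: carries conversation_id across turns."""

    def __init__(self, send: SendFn, user: str):
        self._send = send
        self.user = user
        self.conversation_id: Optional[str] = None  # None => start a new conversation

    def ask(self, query: str) -> str:
        reply = self._send(query=query, user=self.user,
                           conversation_id=self.conversation_id)
        # The first reply creates the conversation; later turns reuse its ID
        self.conversation_id = reply.get("conversation_id") or self.conversation_id
        return reply["answer"]

    def reset(self) -> None:
        self.conversation_id = None


# Test double standing in for DifyClient.send_message (no network needed)
calls: List[Optional[str]] = []


def fake_send(query: str, user: str, conversation_id: Optional[str] = None) -> dict:
    calls.append(conversation_id)
    return {"answer": f"echo: {query}", "conversation_id": conversation_id or "conv-1"}


session = ChatSession(fake_send, user="user-001")
print(session.ask("Hello"))
print(session.ask("And a follow-up"))
print(calls)
```

Against a live server the wiring would be ChatSession(DifyClient(url, key).send_message, user="user-001"); calling reset() starts a fresh conversation.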

Frontend Streaming Integration (TypeScript)

async function* streamDifyChat(
  query: string, userId: string, conversationId: string = "",
  apiKey: string, apiUrl: string = "https://api.dify.ai/v1"
): AsyncGenerator<{event: string; answer?: string; conversation_id?: string; metadata?: any}> {
  const resp = await fetch(`${apiUrl}/chat-messages`, {
    method:  "POST",
    headers: {"Authorization": `Bearer ${apiKey}`, "Content-Type": "application/json"},
    body:    JSON.stringify({
      query, user: userId, conversation_id: conversationId,
      response_mode: "streaming", inputs: {},
    }),
  });

  if (!resp.ok) {
    const err = await resp.json();
    throw new Error(`API error ${resp.status}: ${err.message}`);
  }

  const reader  = resp.body!.getReader();
  const decoder = new TextDecoder();
  let   buffer  = "";

  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split("\n");
      buffer = lines.pop() || "";

      for (const line of lines) {
        if (!line.startsWith("data: ")) continue;
        const data = line.slice(6).trim();
        if (data === "[DONE]") return;
        try { yield JSON.parse(data); } catch { /* skip malformed */ }
      }
    }
  } finally {
    reader.releaseLock();
  }
}

// React hook
function useDifyChat(apiKey: string) {
  const [answer, setAnswer]       = React.useState("");
  const [convId, setConvId]       = React.useState("");
  const [loading, setLoading]     = React.useState(false);

  const sendMessage = async (query: string) => {
    setLoading(true);
    setAnswer("");
    let full = "", newConvId = convId;

    try {
      for await (const event of streamDifyChat(query, "user-001", convId, apiKey)) {
        if (event.event === "message") {
          full += event.answer || "";
          setAnswer(full);
        } else if (event.event === "message_end") {
          if (event.conversation_id) {
            newConvId = event.conversation_id;
            setConvId(newConvId);
          }
        } else if (event.event === "error") {
          throw new Error((event as any).message);
        }
      }
    } finally {
      setLoading(false);
    }
  };

  return { answer, convId, loading, sendMessage };
}

Workflow API Full Implementation

import asyncio, json
import httpx

class DifyWorkflowClient:

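    def __init__(self, api_url: str, api_key: str):
        self.base_url = api_url.rstrip("/")
        self.headers  = {"Authorization": f"Bearer {api_key}"}
        # 30 s for connect/write/pool, but a longer read timeout so that
        # blocking runs and slow streaming steps are not cut off
        self.client   = httpx.AsyncClient(timeout=httpx.Timeout(30.0, read=300.0))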

    async def run(self, inputs: dict, user: str, mode: str = "blocking") -> dict:
        resp = await self.client.post(
            f"{self.base_url}/workflows/run",
            headers={**self.headers, "Content-Type": "application/json"},
            json={"inputs": inputs, "user": user, "response_mode": mode}
        )
        resp.raise_for_status()
        return resp.json()

    async def run_stream(self, inputs: dict, user: str):
        async with self.client.stream(
            "POST", f"{self.base_url}/workflows/run",
            headers={**self.headers, "Content-Type": "application/json"},
            json={"inputs": inputs, "user": user, "response_mode": "streaming"}
        ) as resp:
            resp.raise_for_status()
            async for line in resp.aiter_lines():
                if not line.startswith("data: "):
                    continue
                data_str = line[6:].strip()
                if data_str == "[DONE]":
                    return
                try:
                    yield json.loads(data_str)
                except json.JSONDecodeError:
                    continue

    async def wait_for_completion(
        self, run_id: str, poll_interval: float = 2.0, timeout: float = 300.0
    ) -> dict:
        loop     = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while loop.time() < deadline:
            resp = await self.client.get(
                f"{self.base_url}/workflows/run/{run_id}", headers=self.headers
            )
            resp.raise_for_status()
            status = resp.json()
            if status["status"] in ["succeeded", "failed", "stopped"]:
                return status
            await asyncio.sleep(poll_interval)
        raise TimeoutError(f"Workflow {run_id} did not complete within {timeout}s")
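When run_stream is used, the events can be folded into a compact run summary as they arrive. This sketch assumes the workflow event names workflow_started, node_finished and workflow_finished, with details nested under a data object, as described in Dify's workflow API documentation; verify the field names against your Dify version. summarize_workflow is a hypothetical helper.

```python
from typing import Any, Dict, Iterable, List


def summarize_workflow(events: Iterable[Dict[str, Any]]) -> Dict[str, Any]:
    """Fold streamed workflow events into a run summary (event shapes assumed)."""
    summary: Dict[str, Any] = {"run_id": None, "nodes": [], "status": None, "outputs": None}
    for ev in events:
        kind = ev.get("event")
        data = ev.get("data") or {}
        if kind == "workflow_started":
            summary["run_id"] = ev.get("workflow_run_id") or data.get("id")
        elif kind == "node_finished":
            # One (title, status) pair per completed node, in completion order
            summary["nodes"].append((data.get("title"), data.get("status")))
        elif kind == "workflow_finished":
            summary["status"] = data.get("status")
            summary["outputs"] = data.get("outputs")
        # Other events (node_started, pings, ...) are not needed here
    return summary


sample_events: List[Dict[str, Any]] = [
    {"event": "workflow_started", "workflow_run_id": "run-1", "data": {"id": "run-1"}},
    {"event": "node_started", "data": {"title": "LLM"}},
    {"event": "node_finished", "data": {"title": "LLM", "status": "succeeded"}},
    {"event": "workflow_finished", "data": {"status": "succeeded", "outputs": {"text": "hi"}}},
]
summary = summarize_workflow(sample_events)
print(summary)
```

Inside an async function, the input could be [ev async for ev in wf.run_stream(inputs, user)], or the same branches can run incrementally inside the async for loop. The captured run ID is also the value wait_for_completion expects.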

Level 3: Source Code and Principles (5+ Years Experience)

Dify API Server Routing Architecture

api/controllers/
├── console/          # Console API (admin)
│   ├── app/
│   │   ├── chat.py           # Conversation management
│   │   └── conversation.py   # Conversation list
│   └── datasets/             # Knowledge base management
└── service_api/      # Service API (app callers)
    ├── app/
    │   ├── chat.py           # Chat API
    │   ├── completion.py     # Text generation API
    │   ├── workflow.py       # Workflow API
    │   └── audio.py          # Voice API
    └── index.py              # Route registration

Server-side streaming implementation (simplified):

# Simplified sketch of the streaming path behind the service_api chat endpoint
# (illustrative, not the verbatim Dify source; app_runner stands in for
# Dify's internal app generator)
import json

from flask import Response, g, request, stream_with_context
from flask_restful import Resource


class ChatMessageStreamView(Resource):

    def post(self):
        data = request.get_json()
        if data.get("response_mode") == "streaming":
            return self._streaming_response(data)
        return self._blocking_response(data)

    def _streaming_response(self, data) -> Response:
        def generate():
            try:
                response = app_runner.generate(
                    app_model=g.app_model,
                    user=self._get_user(data["user"]),
                    query=data["query"],
                    stream=True,
                )
                for event in response:
                    # One SSE event per chunk: a data: line followed by a blank line
                    yield f"data: {json.dumps(self._format_event(event), ensure_ascii=False)}\n\n"
                yield "data: [DONE]\n\n"
            except Exception as e:
                # Headers are already sent, so errors travel in-band as an event
                yield f"data: {json.dumps({'event': 'error', 'message': str(e)})}\n\n"
                yield "data: [DONE]\n\n"

        return Response(
            stream_with_context(generate()),
            mimetype="text/event-stream",
            headers={
                "Cache-Control":     "no-cache",
                "X-Accel-Buffering": "no",       # Tell Nginx not to buffer this response
                # No Transfer-Encoding or Connection here: they are hop-by-hop
                # headers that WSGI apps must not set; the server handles chunking
            }
        )

SSE Protocol Technical Details

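Server-Sent Events (SSE) is a unidirectional HTTP push protocol: the server keeps a text/event-stream response open and writes each event as field: value lines, ending with a blank line:

event: [event type, optional]\n
data: [payload]\n
id: [message ID, optional]\n
retry: [reconnect interval in ms, optional]\n
\n   (blank line = end of message)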

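Dify uses the simplest form: only the data: field, one JSON object per event, with the event type carried in the payload's "event" key rather than in a separate event: line (periodic ping events, sent to keep idle connections open, can be ignored):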

data: {"event":"message","answer":"Hello","message_id":"xxx"}\n\n
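Clients that only look for lines starting with data: work for Dify's single-line payloads, but a general SSE consumer should follow the field rules listed above: exactly one space after the colon is stripped, several data: lines in one event are joined with newlines, lines beginning with a colon are comments, and a blank line dispatches the event (only if some data was seen). A minimal stdlib sketch of those rules; SSEEvent and parse_sse are illustrative names, and the input is assumed to be already split into lines.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, List, Optional


@dataclass
class SSEEvent:
    event: str
    data: str
    id: Optional[str] = None
    retry: Optional[int] = None


def parse_sse(lines: Iterable[str]) -> Iterator[SSEEvent]:
    """Yield events from SSE lines whose line terminators are already removed."""
    event_type = ""
    data_lines: List[str] = []
    last_id: Optional[str] = None
    retry: Optional[int] = None
    for line in lines:
        if line == "":
            # Blank line dispatches the event, but only if a data field was seen
            if data_lines:
                yield SSEEvent(event_type or "message", "\n".join(data_lines), last_id, retry)
            event_type, data_lines = "", []
            continue
        if line.startswith(":"):
            continue  # comment line, commonly used as a keep-alive
        field, sep, value = line.partition(":")
        if sep and value.startswith(" "):
            value = value[1:]  # strip exactly one leading space
        if field == "event":
            event_type = value
        elif field == "data":
            data_lines.append(value)
        elif field == "id" and "\0" not in value:
            last_id = value  # persists across later events
        elif field == "retry" and value.isascii() and value.isdigit():
            retry = int(value)
        # Unknown fields are ignored
    # Lines after the last blank line form an incomplete event and are dropped


raw = (
    ": keep-alive comment\n"
    "event: ping\n"
    "\n"
    'data: {"event":"message","answer":"Hello"}\n'
    "\n"
    "event: update\n"
    "data: line one\n"
    "data:line two\n"
    "id: 42\n"
    "\n"
)
events = list(parse_sse(raw.split("\n")))
for ev in events:
    print(ev)
```

Note that the event: ping block is not dispatched at all, because it carries no data field.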

Resilient SSE client with reconnect:

class ResilientSSEClient {
  private maxRetries = 3;
  private baseDelay  = 1000;

  async *connect(url: string, body: object, headers: Record<string, string>) {
    // Per-call attempt counter, so one failed call cannot poison later calls
    for (let attempt = 0; ; attempt++) {
      let received = false;
      try {
        for await (const event of this._doConnect(url, body, headers)) {
          received = true;
          yield event;
        }
        return;
      } catch (e) {
        // Replaying a half-delivered stream would resend the query and
        // duplicate text, so only retry if nothing has been yielded yet
        if (received || attempt >= this.maxRetries) throw e;
        const delay = this.baseDelay * Math.pow(2, attempt);
        console.warn(`SSE connection failed. Retrying in ${delay}ms (attempt ${attempt + 1})`);
        await new Promise(r => setTimeout(r, delay));
      }
    }
  }

  private async *_doConnect(url: string, body: object, headers: Record<string, string>) {
    const resp = await fetch(url, {
      method:  "POST",
      headers: {"Content-Type": "application/json", ...headers},
      body:    JSON.stringify(body),
      signal:  AbortSignal.timeout(120_000),
    });
    if (!resp.ok) throw new Error(`HTTP ${resp.status}`);

    const reader  = resp.body!.getReader();
    const decoder = new TextDecoder();
    let   buffer  = "";
    try {
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split("\n");
        buffer = lines.pop() || "";
        for (const line of lines) {
          if (!line.startsWith("data: ")) continue;
          const d = line.slice(6).trim();
          if (d === "[DONE]") return;
          try { yield JSON.parse(d); } catch {}
        }
      }
    } finally {
      reader.releaseLock();
    }
  }
}
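The key design question for any reconnecting stream client is what to do once some events have already arrived: replaying the POST would resend the user's message to Dify and duplicate text on screen. A Python sketch of the same exponential-backoff idea that retries only while nothing has been delivered; stream_with_retry is a hypothetical helper, open_stream is assumed to be a zero-argument factory returning a fresh async iterator (for example a closure over DifyClient.send_message_stream_async), and with httpx you would pass retry_on=(httpx.TransportError,).

```python
import asyncio
from typing import AsyncIterator, Callable, Dict, List, Tuple, Type, TypeVar

T = TypeVar("T")


async def stream_with_retry(
    open_stream: Callable[[], AsyncIterator[T]],
    max_retries: int = 3,
    base_delay: float = 1.0,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
) -> AsyncIterator[T]:
    """Re-open a stream on failure, but only while no item has been delivered."""
    attempt = 0
    while True:
        delivered = False
        try:
            async for item in open_stream():
                delivered = True
                yield item
            return
        except retry_on:
            # Retrying after partial output would resend the query and duplicate text
            if delivered or attempt >= max_retries:
                raise
        await asyncio.sleep(base_delay * 2 ** attempt)  # 1 s, 2 s, 4 s, ...
        attempt += 1


# Demo: two failed connection attempts, then a successful stream
failures: Dict[str, int] = {"left": 2}


def open_flaky() -> AsyncIterator[str]:
    async def gen():
        if failures["left"] > 0:
            failures["left"] -= 1
            raise ConnectionError("connection refused")
        for chunk in ("Hel", "lo"):
            yield chunk
    return gen()


def open_broken_midway() -> AsyncIterator[str]:
    async def gen():
        yield "partial"
        raise ConnectionError("connection reset mid-stream")
    return gen()


async def collect(open_stream) -> Tuple[List[str], str]:
    got: List[str] = []
    try:
        async for chunk in stream_with_retry(open_stream, base_delay=0.0):
            got.append(chunk)
    except ConnectionError:
        return got, "raised"
    return got, "completed"


chunks = asyncio.run(collect(open_flaky))
partial = asyncio.run(collect(open_broken_midway))
print(chunks, partial)
```

Surfacing the mid-stream error to the caller, instead of retrying silently, lets the UI show a "response interrupted" state and let the user decide whether to resend.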

WebSocket Proxy Layer

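# FastAPI WebSocket proxy over Dify SSE
import json
import os

import httpx
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

DIFY_API_URL = os.getenv("DIFY_API_URL", "https://api.dify.ai/v1")
DIFY_API_KEY = os.environ["DIFY_API_KEY"]  # server-side only, never sent to the browser

app    = FastAPI()
client = httpx.AsyncClient(timeout=120.0)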

@app.websocket("/ws/chat/{user_id}")
async def websocket_chat(ws: WebSocket, user_id: str):
    await ws.accept()
    conversation_id = ""

    try:
        while True:
            raw   = await ws.receive_text()
            msg   = json.loads(raw)
            query = msg.get("query", "")

            if not query:
                await ws.send_json({"type": "error", "message": "empty query"})
                continue

            await ws.send_json({"type": "start"})
            full_answer = ""

            async with client.stream(
                "POST", f"{DIFY_API_URL}/chat-messages",
                headers={"Authorization": f"Bearer {DIFY_API_KEY}"},
                json={"query": query, "user": user_id, "conversation_id": conversation_id,
                      "response_mode": "streaming", "inputs": msg.get("inputs", {})}
            ) as resp:
                if resp.status_code != 200:
                    await ws.send_json({"type": "error", "message": f"Dify API {resp.status_code}"})
                    continue
                async for line in resp.aiter_lines():
                    if not line.startswith("data: "):
                        continue
                    d = line[6:].strip()
                    if d == "[DONE]":
                        break
                    try:
                        event = json.loads(d)
                    except json.JSONDecodeError:
                        continue
                    if event.get("event") == "message":
                        chunk = event.get("answer", "")
                        full_answer += chunk
                        await ws.send_json({"type": "chunk", "content": chunk})
                    elif event.get("event") == "message_end":
                        conversation_id = event.get("conversation_id", conversation_id)

            await ws.send_json({"type": "done", "full_answer": full_answer,
                                "conversation_id": conversation_id})

    except WebSocketDisconnect:
        print(f"WebSocket disconnected: user_id={user_id}")
    except Exception as e:
        try:
            await ws.send_json({"type": "error", "message": str(e)})
        except Exception:
            pass  # the socket is already closed

// Frontend WebSocket client with reconnect
class DifyWebSocketClient {
  private ws:      WebSocket | null = null;
  private retries: number           = 0;

  connect(userId: string, onChunk: (t: string) => void) {
    this.ws = new WebSocket(`wss://your-server.com/ws/chat/${userId}`);
    this.ws.onmessage = (e) => {
      const d = JSON.parse(e.data);
      if (d.type === "chunk") onChunk(d.content);
    };
    this.ws.onclose = () => {
      if (this.retries < 5) {
        setTimeout(() => { this.retries++; this.connect(userId, onChunk); },
                   1000 * Math.pow(2, this.retries));
      }
    };
    this.ws.onopen = () => { this.retries = 0; };
  }

  send(query: string, inputs: Record<string, any> = {}) {
    if (this.ws?.readyState === WebSocket.OPEN)
      this.ws.send(JSON.stringify({ query, inputs }));
  }
}
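The FastAPI proxy above is essentially a translator from Dify SSE events to a small WebSocket protocol (start, chunk, done, error). Pulling the per-event mapping into a pure function keeps it testable without a network. The message types follow the proxy above; to_ws_message is a hypothetical name, and forwarding an SSE error event's message field is an assumption about the payload shape.

```python
from typing import Dict, List, Optional


def to_ws_message(event: Dict, state: Dict) -> Optional[Dict]:
    """Map one Dify SSE event to a WebSocket message, updating per-turn state.

    state holds "conversation_id" and "full_answer" for the current turn.
    Returns None for events the browser does not need to see.
    """
    kind = event.get("event")
    if kind == "message":
        chunk = event.get("answer", "")
        state["full_answer"] += chunk
        return {"type": "chunk", "content": chunk}
    if kind == "message_end":
        state["conversation_id"] = event.get("conversation_id", state["conversation_id"])
        return None
    if kind == "error":
        return {"type": "error", "message": event.get("message", "unknown error")}
    return None


state = {"conversation_id": "", "full_answer": ""}
events = [
    {"event": "message", "answer": "Hi"},
    {"event": "message", "answer": " there"},
    {"event": "message_end", "conversation_id": "conv-9"},
]
outgoing: List[Dict] = []
for ev in events:
    msg = to_ws_message(ev, state)
    if msg is not None:
        outgoing.append(msg)
print(outgoing, state)
```

Inside websocket_chat, the body of the async for loop would then reduce to calling to_ws_message and sending any non-None result with ws.send_json.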

Level 4: Production Pitfalls and Decision-Making (Expert Perspective)

Pitfall 1: Nginx Buffering Breaking SSE

Nginx buffers proxied responses by default (proxy_buffering is on), so SSE events reach the client in bursts rather than as they are generated, which destroys the streaming effect:

# ❌ Default Nginx — buffers responses
location /v1/ {
    proxy_pass http://dify_api;
}

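# ✅ Correct: disable buffering for SSE
location /v1/ {
    proxy_pass              http://dify_api;
    proxy_buffering         off;     # pass each chunk through as it arrives
    proxy_cache             off;
    proxy_read_timeout      300s;    # max gap between two reads from upstream
    proxy_send_timeout      300s;
    proxy_http_version      1.1;
    proxy_set_header        Connection "";
    add_header              Cache-Control "no-cache";
    # No proxy_set_header X-Accel-Buffering: it is a response header the
    # upstream sets, so sending it upstream does nothing.
    # No add_header Content-Type: it would also be added to JSON responses.
}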

Pitfall 2: API Key Security

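Exposing your API Key in frontend code is extremely dangerous: anyone can read it from the page source or the browser's network panel and call your Dify app directly, on your quota: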

// ❌ Dangerous: API Key in frontend source
const resp = await fetch("https://api.dify.ai/v1/chat-messages", {
    headers: {"Authorization": "Bearer app-your_secret_key"}  // Visible to anyone!
});

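Backend proxy layer: the browser authenticates to your own server (here with a JWT), and only the server holds the Dify API Key: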

import os

import httpx
import jwt  # PyJWT
from fastapi import Depends, FastAPI, HTTPException
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from pydantic import BaseModel

app    = FastAPI()
bearer = HTTPBearer()


class ChatRequest(BaseModel):
    query: str


def verify_user(creds: HTTPAuthorizationCredentials = Depends(bearer)) -> dict:
    try:
        return jwt.decode(creds.credentials, os.environ["JWT_SECRET"], algorithms=["HS256"])
    except jwt.PyJWTError:
        raise HTTPException(401, "Invalid credentials")


@app.post("/api/chat")
async def chat_proxy(request: ChatRequest, user: dict = Depends(verify_user)):
    # Dify API Key lives only in the server environment; it is never sent to the client
    dify_key = os.environ["DIFY_API_KEY"]
    async with httpx.AsyncClient(timeout=120.0) as client:
        resp = await client.post(
            "https://api.dify.ai/v1/chat-messages",
            headers={"Authorization": f"Bearer {dify_key}"},
            json={"query": request.query, "user": user["sub"],
                  "response_mode": "blocking", "inputs": {}}
        )
    if resp.status_code != 200:
        raise HTTPException(502, "Upstream Dify error")
    return resp.json()

Pitfall 3: Streaming Timeout Configuration

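import asyncio
import time
from typing import AsyncIterator

import httpx

# ❌ Wrong: timeout=10 applies 10 s to connect, read, write and pool alike,
# so a slow generation (or a slow first token) raises ReadTimeout
httpx.post(url, timeout=10)

# ✅ Correct: separate connect timeout from read timeout
TIMEOUTS = {
    "blocking_simple":  httpx.Timeout(connect=5, read=30,   write=10, pool=5),
    "blocking_complex": httpx.Timeout(connect=5, read=120,  write=10, pool=5),
    "streaming":        httpx.Timeout(connect=5, read=None, write=10, pool=5),
    # read=None for streaming: no limit on the gap between chunks
}

# Enforce a total-duration limit outside httpx instead
async def stream_with_deadline(chunks: AsyncIterator[str], max_seconds: float = 120):
    deadline = time.monotonic() + max_seconds
    iterator = chunks.__aiter__()
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(f"stream exceeded {max_seconds}s")
        try:
            # Only the wait for the next upstream chunk is bounded, so the
            # timeout never fires inside the consumer's own code
            chunk = await asyncio.wait_for(iterator.__anext__(), remaining)
        except StopAsyncIteration:
            return
        yield chunk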

Production-Grade Client Configuration

import asyncio

import httpx
from tenacity import retry, retry_if_exception, stop_after_attempt, wait_exponential


def _is_retryable(exc: BaseException) -> bool:
    """Retry network failures, timeouts and HTTP 429; never other error statuses."""
    if isinstance(exc, (httpx.ConnectError, httpx.TimeoutException)):
        return True
    return isinstance(exc, httpx.HTTPStatusError) and exc.response.status_code == 429


class ProductionDifyClient:

    def __init__(self, api_key: str, api_url: str):
        limits = httpx.Limits(max_connections=100, max_keepalive_connections=20)
        self.client = httpx.AsyncClient(
            base_url = api_url,
            headers  = {"Authorization": f"Bearer {api_key}"},
            limits   = limits,
            # read=None suits streaming; blocking calls below pass their own limit
            timeout  = httpx.Timeout(connect=5.0, read=None, write=10.0, pool=5.0),
            http2    = True,  # HTTP/2 multiplexing; requires `pip install httpx[http2]`
        )

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        retry=retry_if_exception(_is_retryable),
        reraise=True,
    )
    async def send_blocking(self, query: str, user: str, **kwargs) -> dict:
        resp = await self.client.post(
            "/chat-messages",
            json={"query": query, "user": user, "response_mode": "blocking",
                  "inputs": kwargs.get("inputs", {})},
            timeout=httpx.Timeout(connect=5.0, read=120.0, write=10.0, pool=5.0),
        )
        if resp.status_code == 429:
            # Honour the server's requested delay before tenacity retries
            await asyncio.sleep(int(resp.headers.get("Retry-After", "5")))
        resp.raise_for_status()  # a 429 raises here and is retried by tenacity
        return resp.json()

    async def close(self):
        await self.client.aclose()
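Under HTTP semantics (RFC 9110), Retry-After may carry either a number of seconds or an HTTP-date, so a bare int() call can raise on a perfectly valid header. The stdlib-only sketch below is a tolerant parser that could replace the int(...) call in send_blocking; retry_after_seconds is a hypothetical helper, and its default of 5 s and cap of 60 s are assumptions to tune.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def retry_after_seconds(value: Optional[str], default: float = 5.0,
                        cap: float = 60.0, now: Optional[datetime] = None) -> float:
    """Return how many seconds to wait, given a Retry-After header value."""
    if not value:
        return default
    value = value.strip()
    if value.isdigit():  # delay-seconds form, e.g. "120"
        return min(float(value), cap)
    try:  # HTTP-date form, e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
        when = parsedate_to_datetime(value)
    except (TypeError, ValueError):  # unparseable: fall back to the default
        return default
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    # Dates in the past mean "retry now"; never wait longer than the cap
    return min(max((when - now).total_seconds(), 0.0), cap)


print(retry_after_seconds("7"), retry_after_seconds("not a date"))
```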

Chapter Summary

This chapter systematically covered the complete implementation of all three Dify API calling modes:

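REST API (blocking): the simplest integration; one request returns the complete answer, which suits simple integrations where the caller can wait.

SSE Streaming API: output arrives as it is generated, giving a typewriter effect; every proxy between Dify and the client must have buffering disabled.

WebSocket layer: built here as your own proxy over Dify's SSE stream, for complex bidirectional interactions; the Dify API Key stays on the server.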

Production must-haves:

  1. API Key must never appear in frontend code — use a backend proxy
  2. Implement multi-layer rate limiting (per-user RPM + global RPM)
  3. Set a maximum total duration for streaming connections to prevent leaks
  4. Automatically retry on network errors (max 3 attempts, exponential backoff)
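The multi-layer rate limiting in item 2 can be prototyped with a sliding-window counter per key. This is a single-process, in-memory sketch assuming limits expressed as requests per 60-second window; SlidingWindowLimiter and admit are illustrative names, and a deployment with several API instances would need a shared store instead of a dict.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict


class SlidingWindowLimiter:
    """Allow at most `limit` requests per key within any `window`-second span."""

    def __init__(self, limit: int, window: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self.limit, self.window, self.clock = limit, window, clock
        self._hits: Dict[str, Deque[float]] = defaultdict(deque)

    def _prune(self, key: str, now: float) -> Deque[float]:
        hits = self._hits[key]
        while hits and now - hits[0] >= self.window:
            hits.popleft()  # drop timestamps that have left the window
        return hits

    def has_room(self, key: str) -> bool:
        return len(self._prune(key, self.clock())) < self.limit

    def record(self, key: str) -> None:
        self._hits[key].append(self.clock())


def admit(user_id: str, per_user: SlidingWindowLimiter,
          global_rpm: SlidingWindowLimiter) -> bool:
    # Check both layers before recording, so a rejection by one layer
    # does not consume quota in the other
    if per_user.has_room(user_id) and global_rpm.has_room("*"):
        per_user.record(user_id)
        global_rpm.record("*")
        return True
    return False


# Demo with a controllable clock: 2 requests/min per user, 3 requests/min overall
now = [0.0]


def fake_clock() -> float:
    return now[0]


per_user = SlidingWindowLimiter(limit=2, clock=fake_clock)
global_rpm = SlidingWindowLimiter(limit=3, clock=fake_clock)
results = [admit("alice", per_user, global_rpm) for _ in range(3)]
results.append(admit("bob", per_user, global_rpm))    # fills the global window
results.append(admit("carol", per_user, global_rpm))  # rejected by the global layer
now[0] = 61.0                                          # the window has moved on
results.append(admit("alice", per_user, global_rpm))
print(results)
```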

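Key numbers: connect timeout 5 s; blocking read timeout 30 to 120 s; streaming read timeout unset, bounded by a total deadline of about 120 s; Nginx proxy_read_timeout 300 s; at most 3 retry attempts with exponential backoff.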

Next chapter: Chapter 18 dives deep into integrating Dify with Feishu (Lark), WeChat Work, and DingTalk — including message routing, bot configuration, and unified multi-platform management.
