Chapter 17

API Integration Guide: REST, Streaming and WebSocket Implementation

Chapter 17: Complete API Integration Guide (REST / Streaming / WebSocket Full Implementation)

Master all three Dify API calling modes: blocking REST for simple integrations, SSE streaming for great UX, and WebSocket for complex bidirectional interactions, with complete code and production considerations for each.

Chapter Overview

Dify provides a complete API ecosystem that lets you embed AI capabilities into any system, whether an internal ERP, an external SaaS product, or a mobile app. But each of the three API modes has a distinct use case; choosing the wrong one leads to poor user experience, wasted server resources, or needless implementation complexity.

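This chapter covers, layer by layer: foundational knowledge, a deep dive into the mechanisms, source code and principles, and production pitfalls and decision-making.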

After reading this chapter, you will be able to:


Level 1: Foundational Knowledge (1–3 Years Experience)

Dify API Overview

Dify's APIs fall into two categories:

1. Application API

2. Service API (Management API)

Obtaining an API Key

  1. Go to your Dify app → Access API
  2. Click "Create API Key"
  3. Copy the generated key (format: app-xxxxxxxxxxxxxxxxxxxxxxxx)

# Test whether your API Key is valid
curl -X GET 'https://api.dify.ai/v1/info' \
  -H 'Authorization: Bearer app-your_api_key_here'

REST API: The Simplest Calling Method

Send a single chat message (blocking mode):

curl -X POST 'https://api.dify.ai/v1/chat-messages' \
  -H 'Authorization: Bearer app-your_api_key_here' \
  -H 'Content-Type: application/json' \
  -d '{
    "inputs": {},
    "query": "Hello, introduce Dify to me.",
    "response_mode": "blocking",
    "conversation_id": "",
    "user": "user-001"
  }'

Response example:

{
  "event": "message",
  "message_id": "msg-abc123",
  "conversation_id": "conv-xyz456",
  "answer": "Dify is an open-source LLMOps platform that helps you build and deploy AI applications rapidly...",
  "created_at": 1710000000,
  "metadata": {
    "usage": {
      "prompt_tokens": 150,
      "completion_tokens": 200,
      "total_tokens": 350,
      "total_price": "0.0000105"
    }
  }
}
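
Most callers only need a few fields from this response: the answer, the conversation_id to reuse on the next turn, and the token usage. The following is a minimal sketch, assuming the response has the shape shown above; summarize_response is a hypothetical helper name, not part of any Dify SDK.

```python
from decimal import Decimal
from typing import Any


def summarize_response(body: dict[str, Any]) -> dict[str, Any]:
    """Pull out the fields most integrations need from a blocking response."""
    usage = body.get("metadata", {}).get("usage", {})
    return {
        "answer": body.get("answer", ""),
        # Pass this back on the next request to continue the same conversation.
        "conversation_id": body.get("conversation_id", ""),
        "total_tokens": usage.get("total_tokens", 0),
        # total_price arrives as a string; Decimal avoids float rounding.
        "total_price": Decimal(usage.get("total_price", "0")),
    }


# Sample data copied from the response example above.
sample = {
    "event": "message",
    "message_id": "msg-abc123",
    "conversation_id": "conv-xyz456",
    "answer": "Dify is an open-source LLMOps platform...",
    "metadata": {"usage": {"prompt_tokens": 150, "completion_tokens": 200,
                           "total_tokens": 350, "total_price": "0.0000105"}},
}
summary = summarize_response(sample)
print(summary["conversation_id"], summary["total_tokens"])
```

Keeping the price as a Decimal matters if you aggregate cost across many requests, since the individual values are very small.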

Streaming API: Typewriter Effect

Waiting for a full response can take several seconds, which makes for poor UX. The streaming API returns output as it's generated, so users see text appearing in real time:

curl -X POST 'https://api.dify.ai/v1/chat-messages' \
  -H 'Authorization: Bearer app-your_api_key_here' \
  -H 'Content-Type: application/json' \
  -d '{
    "inputs": {},
    "query": "Write a haiku about spring.",
    "response_mode": "streaming",
    "user": "user-001"
  }'

Streaming response (SSE format):

data: {"event": "message", "answer": "Over", "message_id": "msg-001", ...}

data: {"event": "message", "answer": " the", "message_id": "msg-001", ...}

data: {"event": "message", "answer": " fields", "message_id": "msg-001", ...}

data: {"event": "message_end", "metadata": {...}}

data: [DONE]

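Each data: line is a self-contained JSON event that the frontend can display incrementally. The message_end event marks the end of the reply; treat it as an end-of-stream signal as well, rather than relying only on the [DONE] sentinel, which not every Dify version may send.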
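
To make the incremental display concrete, here is a small sketch that rebuilds the full reply from the raw lines of a stream like the one above. It assumes the line format shown in this section; accumulate_answer is a hypothetical name used only for illustration, and the sample lines omit the elided fields.

```python
import json
from typing import Iterable


def accumulate_answer(lines: Iterable[str]) -> str:
    """Join the `answer` fragments of `message` events into the full reply."""
    parts = []
    for line in lines:
        if not line.startswith("data: "):
            continue  # blank separators and non-data fields
        payload = line[len("data: "):].strip()
        if payload == "[DONE]":
            break
        event = json.loads(payload)
        if event.get("event") == "message":
            parts.append(event.get("answer", ""))
        elif event.get("event") == "message_end":
            break  # the reply is complete
    return "".join(parts)


stream = [
    'data: {"event": "message", "answer": "Over", "message_id": "msg-001"}',
    "",
    'data: {"event": "message", "answer": " the", "message_id": "msg-001"}',
    "",
    'data: {"event": "message", "answer": " fields", "message_id": "msg-001"}',
    "",
    'data: {"event": "message_end", "metadata": {}}',
    "",
    "data: [DONE]",
]
print(accumulate_answer(stream))  # Over the fields
```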

Python Client Basics

import httpx, json

DIFY_API_URL = "https://api.dify.ai/v1"
API_KEY      = "app-your_api_key_here"

def chat_blocking(query: str, user_id: str = "default") -> str:
    resp = httpx.post(
        f"{DIFY_API_URL}/chat-messages",
        headers={"Authorization": f"Bearer {API_KEY}"},
        json={"query": query, "response_mode": "blocking",
              "user": user_id, "inputs": {}},
        timeout=60.0
    )
    resp.raise_for_status()
    return resp.json()["answer"]

def chat_streaming(query: str, user_id: str = "default"):
    with httpx.stream(
        "POST", f"{DIFY_API_URL}/chat-messages",
        headers={"Authorization": f"Bearer {API_KEY}"},
        json={"query": query, "response_mode": "streaming",
              "user": user_id, "inputs": {}},
        timeout=120.0
    ) as resp:
        resp.raise_for_status()
        for line in resp.iter_lines():
            if not line.startswith("data: "):
                continue
            data_str = line[6:]
            if data_str == "[DONE]":
                break
            event = json.loads(data_str)
            if event.get("event") == "message":
                yield event.get("answer", "")

# Usage
print("=== Blocking ===")
print(chat_blocking("Introduce Dify to me."))

print("\n=== Streaming ===")
for chunk in chat_streaming("Write a haiku about spring."):
    print(chunk, end="", flush=True)
print()

Level 2: Mechanism Deep Dive (3–5 Years Experience)

Complete Dify API Endpoint Reference

Chat application (Chatbot):

POST   /v1/chat-messages               # Send message
GET    /v1/conversations               # List conversations
GET    /v1/conversations/{id}/messages # Message history
DELETE /v1/conversations/{id}          # Delete conversation
POST   /v1/messages/{id}/feedbacks     # User feedback (like/dislike)
GET    /v1/messages/{id}/suggested     # Suggested follow-up questions
POST   /v1/audio-to-text               # Speech to text

Text generation (Completion):

POST /v1/completion-messages           # Generate text (blocking or streaming)

Workflow application:

POST /v1/workflows/run                 # Execute workflow
GET  /v1/workflows/run/{id}            # Query execution status
GET  /v1/workflows/logs                # Execution logs

File upload:

POST /v1/files/upload                  # Upload file (image, document, etc.)

Full Multi-Turn Conversation Client

import httpx, json
from typing import Optional, Generator, AsyncGenerator

class DifyClient:

    def __init__(self, api_url: str, api_key: str):
        self.base_url        = api_url.rstrip("/")
        self.headers         = {"Authorization": f"Bearer {api_key}",
                                "Content-Type": "application/json"}
        self._client         = httpx.Client(timeout=120.0)
        self._async_client   = httpx.AsyncClient(timeout=120.0)

    # ── Synchronous ──────────────────────────────────────────────

    def send_message(
        self, query: str, user: str,
        conversation_id: Optional[str] = None,
        inputs: Optional[dict] = None, files: Optional[list] = None
    ) -> dict:
        payload = {"query": query, "user": user, "response_mode": "blocking",
                   "inputs": inputs or {}, "conversation_id": conversation_id or ""}
        if files:
            payload["files"] = files
        resp = self._client.post(f"{self.base_url}/chat-messages",
                                  headers=self.headers, json=payload)
        resp.raise_for_status()
        return resp.json()

    def send_message_stream(
        self, query: str, user: str,
        conversation_id: Optional[str] = None, inputs: Optional[dict] = None
    ) -> Generator[dict, None, None]:
        payload = {"query": query, "user": user, "response_mode": "streaming",
                   "inputs": inputs or {}, "conversation_id": conversation_id or ""}
        with self._client.stream("POST", f"{self.base_url}/chat-messages",
                                  headers=self.headers, json=payload) as resp:
            resp.raise_for_status()
            for line in resp.iter_lines():
                if not line or not line.startswith("data: "):
                    continue
                data_str = line[6:].strip()
                if data_str == "[DONE]":
                    return
                try:
                    yield json.loads(data_str)
                except json.JSONDecodeError:
                    continue

    def get_history(self, conversation_id: str, user: str, limit: int = 20) -> dict:
        resp = self._client.get(
            f"{self.base_url}/conversations/{conversation_id}/messages",
            headers=self.headers, params={"user": user, "limit": limit}
        )
        resp.raise_for_status()
        return resp.json()

    def upload_file(self, file_path: str, user: str) -> dict:
        with open(file_path, "rb") as f:
            resp = self._client.post(
                f"{self.base_url}/files/upload",
                headers={"Authorization": self.headers["Authorization"]},
                files={"file": f}, data={"user": user}
            )
        resp.raise_for_status()
        return resp.json()

    def submit_feedback(self, message_id: str, rating: str, user: str,
                        content: Optional[str] = None) -> dict:
        resp = self._client.post(
            f"{self.base_url}/messages/{message_id}/feedbacks",
            headers=self.headers,
            json={"rating": rating, "user": user, "content": content}
        )
        resp.raise_for_status()
        return resp.json()

    # ── Asynchronous ─────────────────────────────────────────────

    async def send_message_async(self, query: str, user: str, **kwargs) -> dict:
        payload = {"query": query, "user": user, "response_mode": "blocking",
                   "inputs": kwargs.get("inputs", {}),
                   "conversation_id": kwargs.get("conversation_id", "")}
        resp = await self._async_client.post(
            f"{self.base_url}/chat-messages", headers=self.headers, json=payload
        )
        resp.raise_for_status()
        return resp.json()

    async def send_message_stream_async(
        self, query: str, user: str, **kwargs
    ) -> AsyncGenerator[dict, None]:
        payload = {"query": query, "user": user, "response_mode": "streaming",
                   "inputs": kwargs.get("inputs", {}),
                   "conversation_id": kwargs.get("conversation_id", "")}
        async with self._async_client.stream(
            "POST", f"{self.base_url}/chat-messages",
            headers=self.headers, json=payload
        ) as resp:
            resp.raise_for_status()
            async for line in resp.aiter_lines():
                if not line or not line.startswith("data: "):
                    continue
                data_str = line[6:].strip()
                if data_str == "[DONE]":
                    return
                try:
                    yield json.loads(data_str)
                except json.JSONDecodeError:
                    continue

    async def close(self):
        self._client.close()
        await self._async_client.aclose()
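
A multi-turn conversation works by passing the conversation_id from one response into the next request. The sketch below wraps that bookkeeping in a small session object. ConversationSession and FakeClient are hypothetical names; FakeClient only stands in for DifyClient so the example runs without network access.

```python
from typing import Optional, Protocol


class SupportsSendMessage(Protocol):
    def send_message(self, query: str, user: str,
                     conversation_id: Optional[str] = None) -> dict: ...


class ConversationSession:
    """Remembers the conversation_id so each turn continues the same thread."""

    def __init__(self, client: SupportsSendMessage, user: str):
        self.client = client
        self.user = user
        self.conversation_id: Optional[str] = None

    def ask(self, query: str) -> str:
        result = self.client.send_message(
            query, self.user, conversation_id=self.conversation_id
        )
        # The first response creates the conversation; later turns reuse its ID.
        self.conversation_id = result.get("conversation_id") or self.conversation_id
        return result.get("answer", "")


class FakeClient:
    """Stand-in for DifyClient that records the IDs it was called with."""

    def __init__(self):
        self.seen_ids = []

    def send_message(self, query, user, conversation_id=None):
        self.seen_ids.append(conversation_id)
        return {"answer": f"echo: {query}", "conversation_id": "conv-1"}


fake = FakeClient()
session = ConversationSession(fake, user="user-001")
session.ask("First question")
session.ask("Follow-up question")
print(fake.seen_ids)  # [None, 'conv-1']
```

In real use you would pass a DifyClient instance instead of FakeClient and keep one session per end user, since a conversation_id belongs to the user value it was created with.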

Frontend Streaming Integration (TypeScript)

async function* streamDifyChat(
  query: string, userId: string, conversationId: string = "",
  apiKey: string, apiUrl: string = "https://api.dify.ai/v1"
): AsyncGenerator<{event: string; answer?: string; conversation_id?: string; metadata?: any}> {
  const resp = await fetch(`${apiUrl}/chat-messages`, {
    method:  "POST",
    headers: {"Authorization": `Bearer ${apiKey}`, "Content-Type": "application/json"},
    body:    JSON.stringify({
      query, user: userId, conversation_id: conversationId,
      response_mode: "streaming", inputs: {},
    }),
  });

  if (!resp.ok) {
    const err = await resp.json();
    throw new Error(`API error ${resp.status}: ${err.message}`);
  }

  const reader  = resp.body!.getReader();
  const decoder = new TextDecoder();
  let   buffer  = "";

  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split("\n");
      buffer = lines.pop() || "";

      for (const line of lines) {
        if (!line.startsWith("data: ")) continue;
        const data = line.slice(6).trim();
        if (data === "[DONE]") return;
        try { yield JSON.parse(data); } catch { /* skip malformed */ }
      }
    }
  } finally {
    reader.releaseLock();
  }
}

// React hook (assumes `import React from "react"` at the top of the module)
function useDifyChat(apiKey: string) {
  const [answer, setAnswer]       = React.useState("");
  const [convId, setConvId]       = React.useState("");
  const [loading, setLoading]     = React.useState(false);

  const sendMessage = async (query: string) => {
    setLoading(true);
    setAnswer("");
    let full = "", newConvId = convId;

    try {
      for await (const event of streamDifyChat(query, "user-001", convId, apiKey)) {
        if (event.event === "message") {
          full += event.answer || "";
          setAnswer(full);
        } else if (event.event === "message_end") {
          if (event.conversation_id) {
            newConvId = event.conversation_id;
            setConvId(newConvId);
          }
        } else if (event.event === "error") {
          throw new Error((event as any).message);
        }
      }
    } finally {
      setLoading(false);
    }
  };

  return { answer, convId, loading, sendMessage };
}

Workflow API Full Implementation

import asyncio, json
import httpx

class DifyWorkflowClient:

    def __init__(self, api_url: str, api_key: str):
        self.base_url = api_url.rstrip("/")
        self.headers  = {"Authorization": f"Bearer {api_key}"}
        self.client   = httpx.AsyncClient(timeout=30.0)

    async def run(self, inputs: dict, user: str, mode: str = "blocking") -> dict:
        resp = await self.client.post(
            f"{self.base_url}/workflows/run",
            headers={**self.headers, "Content-Type": "application/json"},
            json={"inputs": inputs, "user": user, "response_mode": mode}
        )
        resp.raise_for_status()
        return resp.json()

    async def run_stream(self, inputs: dict, user: str):
        async with self.client.stream(
            "POST", f"{self.base_url}/workflows/run",
            headers={**self.headers, "Content-Type": "application/json"},
            json={"inputs": inputs, "user": user, "response_mode": "streaming"}
        ) as resp:
            resp.raise_for_status()
            async for line in resp.aiter_lines():
                if not line.startswith("data: "):
                    continue
                data_str = line[6:].strip()
                if data_str == "[DONE]":
                    return
                try:
                    yield json.loads(data_str)
                except json.JSONDecodeError:
                    continue

    async def wait_for_completion(
        self, run_id: str, poll_interval: float = 2.0, timeout: float = 300.0
    ) -> dict:
        # get_running_loop() is the supported way to reach the loop from a coroutine
        loop     = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while loop.time() < deadline:
            resp = await self.client.get(
                f"{self.base_url}/workflows/run/{run_id}", headers=self.headers
            )
            resp.raise_for_status()
            status = resp.json()
            if status["status"] in ["succeeded", "failed", "stopped"]:
                return status
            await asyncio.sleep(poll_interval)
        raise TimeoutError(f"Workflow {run_id} did not complete within {timeout}s")
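
When consuming run_stream, callers usually care about which nodes ran and what the workflow finally produced. Here is a minimal sketch of reducing the event stream to that summary. The event names (node_finished, workflow_finished) and the data fields used here (node_id, status, outputs) are assumptions about the streamed payload; check them against the events your Dify version actually emits.

```python
from typing import Any, Iterable


def summarize_workflow_events(events: Iterable[dict[str, Any]]) -> dict[str, Any]:
    """Reduce workflow stream events to final status, outputs, and a node trace."""
    summary: dict[str, Any] = {"status": "unknown", "outputs": None, "nodes": []}
    for event in events:
        kind = event.get("event")
        data = event.get("data", {})
        if kind == "node_finished":
            # Record each node and how it ended, in execution order.
            summary["nodes"].append((data.get("node_id"), data.get("status")))
        elif kind == "workflow_finished":
            summary["status"] = data.get("status", "unknown")
            summary["outputs"] = data.get("outputs")
            break  # nothing meaningful follows the final event
    return summary


# Hand-written sample events for illustration only.
events = [
    {"event": "workflow_started", "data": {"id": "run-1"}},
    {"event": "node_finished", "data": {"node_id": "llm-1", "status": "succeeded"}},
    {"event": "workflow_finished",
     "data": {"id": "run-1", "status": "succeeded", "outputs": {"text": "done"}}},
]
result = summarize_workflow_events(events)
print(result["status"], result["nodes"])
```

With the async client above, collect the events first (for example with an async list comprehension over run_stream) or adapt the loop to `async for`.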

Level 3: Source Code and Principles (5+ Years Experience)

Dify API Server Routing Architecture

api/controllers/
├── console/          # Console API (admin)
│   ├── app/
│   │   ├── chat.py           # Conversation management
│   │   └── conversation.py   # Conversation list
│   └── datasets/             # Knowledge base management
└── service_api/      # Service API (app callers)
    ├── app/
    │   ├── chat.py           # Chat API
    │   ├── completion.py     # Text generation API
    │   ├── workflow.py       # Workflow API
    │   └── audio.py          # Voice API
    └── index.py              # Route registration

Server-side streaming implementation:

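# api/controllers/service_api/app/chat.py (simplified illustration of the key logic)
import json

from flask import Response, g, request, stream_with_context
from flask_restful import Resource

# app_runner stands in for Dify's internal app generation service.

class ChatMessageStreamView(Resource):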

    def post(self):
        data = request.get_json()
        if data.get("response_mode") == "streaming":
            return self._streaming_response(data)
        return self._blocking_response(data)

    def _streaming_response(self, data) -> Response:
        def generate():
            try:
                response = app_runner.generate(
                    app_model=g.app_model,
                    user=self._get_user(data["user"]),
                    query=data["query"],
                    stream=True,
                )
                for event in response:
                    yield f"data: {json.dumps(self._format_event(event), ensure_ascii=False)}\n\n"
                yield "data: [DONE]\n\n"
            except Exception as e:
                yield f"data: {json.dumps({'event':'error','message':str(e)})}\n\n"
                yield "data: [DONE]\n\n"

        return Response(
            stream_with_context(generate()),
            content_type="text/event-stream",
            headers={
                "Cache-Control":     "no-cache",
                "X-Accel-Buffering": "no",       # Disable Nginx buffering
                # Transfer-Encoding and Connection are hop-by-hop headers;
                # the WSGI server sets them, so the application should not.
            }
        )

SSE Protocol Technical Details

Server-Sent Events (SSE) is a unidirectional HTTP push protocol:

event: [event type, optional]\n
data:  [payload]\n
id:    [message ID, optional]\n
retry: [reconnect interval in ms, optional]\n
\n   (blank line = end of message)

Dify uses the simplest format: only the data: field, with one JSON message per line followed by a blank line:

data: {"event":"message","answer":"Hello","message_id":"xxx"}\n\n
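
For comparison with the Dify-specific parsing used elsewhere in this chapter, the sketch below handles the general field layout described above: multiple data: lines joined with newlines, the optional event, id, and retry fields, comment lines, and dispatch on a blank line. It is a simplified reading of the format, not a complete implementation of the SSE specification, and parse_sse is a hypothetical name.

```python
from typing import Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[dict[str, str]]:
    """Yield one dict per SSE message; a blank line dispatches the message."""
    fields: dict[str, str] = {}
    data_lines: list[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # Blocks without any data (for example keep-alives) are dropped.
            if data_lines:
                yield {**fields, "data": "\n".join(data_lines)}
            fields, data_lines = {}, []
            continue
        if line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        name, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # exactly one leading space is stripped
        if name == "data":
            data_lines.append(value)
        elif name in ("event", "id", "retry"):
            fields[name] = value


raw_lines = [
    "event: message",
    "data: line one",
    "data: line two",
    "id: 7",
    "",
    ": keep-alive",
    "",
    'data: {"event":"message","answer":"Hello"}',
    "",
]
messages = list(parse_sse(raw_lines))
print(len(messages))  # 2
```

Because Dify puts the event type inside the JSON payload rather than in an event: field, the simpler prefix check used in the clients above is sufficient for its streams.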

Resilient SSE client with reconnect:

class ResilientSSEClient {
  private retryCount = 0;
  private maxRetries = 3;
  private baseDelay  = 1000;

  async *connect(url: string, body: object, headers: Record<string, string>) {
    while (this.retryCount <= this.maxRetries) {
      try {
        yield* this._doConnect(url, body, headers);
        this.retryCount = 0;
        return;
      } catch (e) {
        if (this.retryCount >= this.maxRetries) throw e;
        const delay = this.baseDelay * Math.pow(2, this.retryCount);
        console.warn(`SSE connection failed. Retrying in ${delay}ms (attempt ${this.retryCount + 1})`);
        await new Promise(r => setTimeout(r, delay));
        this.retryCount++;
      }
    }
  }

  private async *_doConnect(url: string, body: object, headers: Record<string, string>) {
    const resp = await fetch(url, {
      method:  "POST",
      headers: {"Content-Type": "application/json", ...headers},
      body:    JSON.stringify(body),
      signal:  AbortSignal.timeout(120_000),
    });
    if (!resp.ok) throw new Error(`HTTP ${resp.status}`);

    const reader  = resp.body!.getReader();
    const decoder = new TextDecoder();
    let   buffer  = "";
    try {
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split("\n");
        buffer = lines.pop() || "";
        for (const line of lines) {
          if (!line.startsWith("data: ")) continue;
          const d = line.slice(6).trim();
          if (d === "[DONE]") return;
          try { yield JSON.parse(d); } catch {}
        }
      }
    } finally {
      reader.releaseLock();
    }
  }
}
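
The retry schedule used by the client above is plain exponential backoff: the delay doubles after each failed attempt. The Python sketch below reproduces that schedule and adds optional jitter, which is an addition not present in the client above; it helps avoid many clients reconnecting at the same instant. The function name backoff_delays is hypothetical.

```python
import random
from typing import Optional


def backoff_delays(base_ms: int, max_retries: int,
                   jitter: float = 0.0,
                   rng: Optional[random.Random] = None) -> list[float]:
    """Delay before each retry: base * 2**attempt, optionally varied by +/- jitter."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        delay = base_ms * 2 ** attempt
        if jitter:
            # Spread the delay uniformly within +/- jitter of its nominal value.
            delay *= 1 + rng.uniform(-jitter, jitter)
        delays.append(delay)
    return delays


print(backoff_delays(1000, 3))  # [1000, 2000, 4000]
```

Note that retrying a streaming POST re-sends the whole request, so a retry after a partial stream will produce the reply again from the start; the UI should discard any partial text before retrying.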

WebSocket Proxy Layer

# FastAPI WebSocket proxy over Dify SSE
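from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import httpx, json, os

# Read Dify settings from the environment; the variable names are illustrative.
DIFY_API_URL = os.getenv("DIFY_API_URL", "https://api.dify.ai/v1")
DIFY_API_KEY = os.environ["DIFY_API_KEY"]

app    = FastAPI()
client = httpx.AsyncClient(timeout=120.0)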

@app.websocket("/ws/chat/{user_id}")
async def websocket_chat(ws: WebSocket, user_id: str):
    await ws.accept()
    conversation_id = ""

    try:
        while True:
            raw   = await ws.receive_text()
            msg   = json.loads(raw)
            query = msg.get("query", "")

            if not query:
                await ws.send_json({"type": "error", "message": "empty query"})
                continue

            await ws.send_json({"type": "start"})
            full_answer = ""

            async with client.stream(
                "POST", f"{DIFY_API_URL}/chat-messages",
                headers={"Authorization": f"Bearer {DIFY_API_KEY}"},
                json={"query": query, "user": user_id, "conversation_id": conversation_id,
                      "response_mode": "streaming", "inputs": msg.get("inputs", {})}
            ) as resp:
                if resp.status_code != 200:
                    await ws.send_json({"type": "error", "message": f"Dify API {resp.status_code}"})
                    continue
                async for line in resp.aiter_lines():
                    if not line.startswith("data: "):
                        continue
                    d = line[6:].strip()
                    if d == "[DONE]":
                        break
                    try:
                        event = json.loads(d)
                    except json.JSONDecodeError:
                        continue
                    if event.get("event") == "message":
                        chunk = event.get("answer", "")
                        full_answer += chunk
                        await ws.send_json({"type": "chunk", "content": chunk})
                    elif event.get("event") == "message_end":
                        conversation_id = event.get("conversation_id", conversation_id)

            await ws.send_json({"type": "done", "full_answer": full_answer,
                                "conversation_id": conversation_id})

    except WebSocketDisconnect:
        print(f"WebSocket disconnected: user_id={user_id}")
    except Exception as e:
        try:
            await ws.send_json({"type": "error", "message": str(e)})
        except Exception:
            pass  # the socket is already closed; nothing more to send

// Frontend WebSocket client with reconnect
class DifyWebSocketClient {
  private ws:      WebSocket | null = null;
  private retries: number           = 0;

  connect(userId: string, onChunk: (t: string) => void) {
    this.ws = new WebSocket(`wss://your-server.com/ws/chat/${userId}`);
    this.ws.onmessage = (e) => {
      const d = JSON.parse(e.data);
      if (d.type === "chunk") onChunk(d.content);
    };
    this.ws.onclose = () => {
      if (this.retries < 5) {
        setTimeout(() => { this.retries++; this.connect(userId, onChunk); },
                   1000 * Math.pow(2, this.retries));
      }
    };
    this.ws.onopen = () => { this.retries = 0; };
  }

  send(query: string, inputs: Record<string, any> = {}) {
    if (this.ws?.readyState === WebSocket.OPEN)
      this.ws.send(JSON.stringify({ query, inputs }));
  }
}
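
The proxy and the browser client communicate through a small frame protocol: start, chunk, done, and error. On the proxy side, the translation from a Dify stream event to a WebSocket frame can be isolated in one pure function, which makes it easy to unit-test. This is a Python sketch under that assumption; to_ws_frame is a hypothetical helper, and forwarding Dify error events as error frames is an addition that the proxy code above does not perform.

```python
from typing import Any, Optional


def to_ws_frame(event: dict[str, Any]) -> Optional[dict[str, Any]]:
    """Map one Dify stream event to the frame sent over the WebSocket, if any."""
    kind = event.get("event")
    if kind == "message":
        return {"type": "chunk", "content": event.get("answer", "")}
    if kind == "error":
        # Surface upstream failures instead of silently dropping them.
        return {"type": "error", "message": event.get("message", "unknown error")}
    return None  # other events (message_end, ping, ...) produce no frame


print(to_ws_frame({"event": "message", "answer": "Hi"}))
```

Inside the proxy loop this would be used as `frame = to_ws_frame(event)` followed by `await ws.send_json(frame)` when the frame is not None.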

Level 4: Production Pitfalls and Decision-Making (Expert Perspective)

Pitfall 1: Nginx Buffering Breaking SSE

Nginx buffers upstream responses by default, causing SSE to be delivered in batches, destroying the streaming effect:

# โŒ Default Nginx โ€” buffers responses
location /v1/ {
    proxy_pass http://dify_api;
}

# ✅ Correct: disable buffering for SSE
location /v1/ {
    proxy_pass              http://dify_api;
    proxy_buffering         off;
    proxy_cache             off;
    proxy_read_timeout      300s;
    proxy_send_timeout      300s;
    proxy_http_version      1.1;
    proxy_set_header        Connection "";
    add_header              Cache-Control "no-cache";
    # X-Accel-Buffering is a response header sent by the upstream application,
    # not a request header, so it is not set with proxy_set_header.
    # Do not force Content-Type here: this location also serves JSON responses.
}

Pitfall 2: API Key Security

Exposing your API Key in frontend code is extremely dangerous:

โŒ Dangerous: API Key in frontend source
const resp = await fetch("https://api.dify.ai/v1/chat-messages", {
    headers: {"Authorization": "Bearer app-your_secret_key"}  // Visible to anyone!
});

Backend proxy layer:

from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
import jwt, os, httpx

app    = FastAPI()
bearer = HTTPBearer()

class ChatRequest(BaseModel):
    query: str

def verify_user(creds: HTTPAuthorizationCredentials = Depends(bearer)) -> dict:
    try:
        return jwt.decode(creds.credentials, os.getenv("JWT_SECRET"), algorithms=["HS256"])
    except jwt.PyJWTError:
        raise HTTPException(401, "Invalid credentials")

@app.post("/api/chat")
async def chat_proxy(request: ChatRequest, user: dict = Depends(verify_user)):
    # Dify API Key lives only in the server environment and is never sent to the client
    dify_key = os.getenv("DIFY_API_KEY")
    # httpx defaults to a 5-second timeout, which is too short for LLM responses
    async with httpx.AsyncClient(timeout=60.0) as client:
        resp = await client.post(
            "https://api.dify.ai/v1/chat-messages",
            headers={"Authorization": f"Bearer {dify_key}"},
            json={"query": request.query, "user": user["sub"],
                  "response_mode": "blocking", "inputs": {}}
        )
    if resp.status_code >= 400:
        raise HTTPException(resp.status_code, "Upstream Dify API error")
    return resp.json()

Pitfall 3: Streaming Timeout Configuration

# โŒ Wrong: short timeout cuts off streaming responses
httpx.post(url, timeout=10)

# โœ… Correct: separate connect timeout from read timeout
TIMEOUTS = {
    "blocking_simple":  httpx.Timeout(connect=5, read=30,   write=10, pool=5),
    "blocking_complex": httpx.Timeout(connect=5, read=120,  write=10, pool=5),
    "streaming":        httpx.Timeout(connect=5, read=None, write=10, pool=5),
    # read=None for streaming: the response may last arbitrarily long
}

# Use an external total-duration limit instead.
# asyncio.timeout requires Python 3.11+; dify_client is a placeholder
# for your own streaming client.
async def stream_with_deadline(query: str, max_seconds: float = 120):
    async with asyncio.timeout(max_seconds):
        async for chunk in dify_client.stream(query):
            yield chunk
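
On Python versions before 3.11, where asyncio.timeout is unavailable, the same total-duration limit can be approximated by giving each read whatever time remains in the budget. This is a sketch of that idea, not a drop-in replacement; iterate_with_deadline and ticker are hypothetical names, and ticker only simulates a stream so the example runs on its own.

```python
import asyncio
import time
from typing import AsyncIterator, TypeVar

T = TypeVar("T")


async def iterate_with_deadline(source: AsyncIterator[T],
                                max_seconds: float) -> AsyncIterator[T]:
    """Yield items from `source`; raise asyncio.TimeoutError once the budget is spent."""
    deadline = time.monotonic() + max_seconds
    iterator = source.__aiter__()
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise asyncio.TimeoutError(f"stream exceeded {max_seconds}s")
        try:
            # Each read may use only the time left in the overall budget.
            item = await asyncio.wait_for(iterator.__anext__(), timeout=remaining)
        except StopAsyncIteration:
            return
        yield item


async def ticker(count: int, interval: float):
    """Simulated stream: yields `count` integers, one every `interval` seconds."""
    for i in range(count):
        await asyncio.sleep(interval)
        yield i


async def demo():
    fast = [x async for x in iterate_with_deadline(ticker(3, 0.01), max_seconds=1.0)]
    try:
        slow = [x async for x in iterate_with_deadline(ticker(100, 0.05), max_seconds=0.12)]
    except asyncio.TimeoutError:
        slow = "timed out"
    return fast, slow


fast_result, slow_result = asyncio.run(demo())
print(fast_result, slow_result)
```

The per-read timeout also covers a stalled connection: if no chunk arrives before the budget runs out, the pending read is cancelled rather than left hanging.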

Production-Grade Client Configuration

import asyncio
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

class RateLimitedError(Exception):
    """Raised on HTTP 429 so that tenacity retries the call."""

class ProductionDifyClient:

    def __init__(self, api_key: str, api_url: str):
        limits = httpx.Limits(max_connections=100, max_keepalive_connections=20)
        self.client = httpx.AsyncClient(
            base_url = api_url,
            headers  = {"Authorization": f"Bearer {api_key}"},
            limits   = limits,
            timeout  = httpx.Timeout(connect=5.0, read=None, write=10.0, pool=5.0),
            http2    = True,  # HTTP/2 for better concurrency; needs `pip install "httpx[http2]"`
        )

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        # RateLimitedError must be listed here, otherwise a 429 is never retried
        retry=retry_if_exception_type(
            (httpx.ConnectError, httpx.TimeoutException, RateLimitedError)),
    )
    async def send_blocking(self, query: str, user: str, **kwargs) -> dict:
        resp = await self.client.post(
            "/chat-messages",
            json={"query": query, "user": user, "response_mode": "blocking",
                  "inputs": kwargs.get("inputs", {})}
        )
        if resp.status_code == 429:
            # Honor Retry-After, then raise a retryable error
            await asyncio.sleep(int(resp.headers.get("Retry-After", 5)))
            raise RateLimitedError("Rate limited by the Dify API")
        resp.raise_for_status()
        return resp.json()

    async def close(self):
        await self.client.aclose()

Chapter Summary

This chapter systematically covered the complete implementation of all three Dify API calling modes:

REST API (blocking):

SSE Streaming API:

WebSocket layer:

Production must-haves:

  1. API Key must never appear in frontend code; use a backend proxy
  2. Implement multi-layer rate limiting (per-user RPM + global RPM)
  3. Set a maximum total duration for streaming connections to prevent leaks
  4. Automatically retry on network errors (max 3 attempts, exponential backoff)
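
For the rate-limiting item above, here is a minimal in-process sketch of a two-layer limiter that enforces a per-user and a global requests-per-minute cap with a sliding 60-second window. The class name and parameters are hypothetical, and the limits are illustrative. A deployment with several worker processes would need shared storage such as Redis instead of in-memory deques.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict


class SlidingWindowLimiter:
    """Allows at most per_user_rpm calls per user and global_rpm overall per 60 s."""

    def __init__(self, per_user_rpm: int, global_rpm: int,
                 clock: Callable[[], float] = time.monotonic):
        self.per_user_rpm = per_user_rpm
        self.global_rpm = global_rpm
        self.clock = clock  # injectable so tests can control time
        self._user_hits: Dict[str, Deque[float]] = defaultdict(deque)
        self._global_hits: Deque[float] = deque()

    @staticmethod
    def _prune(hits: Deque[float], now: float) -> None:
        # Drop timestamps that have left the 60-second window.
        while hits and now - hits[0] >= 60.0:
            hits.popleft()

    def allow(self, user_id: str) -> bool:
        now = self.clock()
        user_hits = self._user_hits[user_id]
        self._prune(user_hits, now)
        self._prune(self._global_hits, now)
        if (len(user_hits) >= self.per_user_rpm
                or len(self._global_hits) >= self.global_rpm):
            return False
        user_hits.append(now)
        self._global_hits.append(now)
        return True


# Demonstration with a fake clock: 2 requests/min per user, 3 requests/min overall.
now = [0.0]
limiter = SlidingWindowLimiter(per_user_rpm=2, global_rpm=3, clock=lambda: now[0])
decisions = [limiter.allow("a"), limiter.allow("a"), limiter.allow("a"),
             limiter.allow("b"), limiter.allow("c")]
print(decisions)  # [True, True, False, True, False]
```

In the backend proxy, call allow(user_id) before forwarding a request to Dify and return HTTP 429 when it yields False.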

Key numbers:

Next chapter: Chapter 18 dives deep into integrating Dify with Feishu (Lark), WeChat Work, and DingTalk, including message routing, bot configuration, and unified multi-platform management.
