FastAPI 认证指南

FastAPI 完整认证模式:JWT、OAuth2 密码流、HTTP Bearer 和 API 密钥方案。

1. JWT Token 创建与验证

from datetime import datetime, timedelta, timezone
from jose import jwt, JWTError
from passlib.context import CryptContext

SECRET_KEY = os.environ["JWT_SECRET"]
ALGORITHM = "HS256"

pwd_ctx = CryptContext(schemes=["bcrypt"], deprecated="auto")

def create_token(data: dict, expires_in: timedelta) -> str:
    payload = data.copy()
    payload["exp"] = datetime.now(timezone.utc) + expires_in
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)

def create_access_token(user_id: int, role: str) -> str:
    return create_token({"sub": str(user_id), "role": role}, timedelta(minutes=30))

2. OAuth2 密码流

from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")

@app.post("/auth/token")
async def login(form: OAuth2PasswordRequestForm = Depends(), db: DB = Depends(get_db)):
    user = db.query(User).filter(User.email == form.username).first()
    if not user or not pwd_ctx.verify(form.password, user.hashed_password):
        raise HTTPException(status_code=401, detail="邮箱或密码错误")
    return {
        "access_token": create_access_token(user.id, user.role),
        "token_type": "bearer",
    }

3. 当前用户依赖

async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)], db: DB) -> User:
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise HTTPException(status_code=401, detail="token 验证失败")
    user = db.query(User).get(int(payload["sub"]))
    if not user or not user.is_active:
        raise HTTPException(status_code=401, detail="用户不存在或已禁用")
    return user

CurrentUser = Annotated[User, Depends(get_current_user)]

@app.get("/me")
async def me(user: CurrentUser):
    return user

4. API 密钥认证

from fastapi.security import APIKeyHeader

api_key_header = APIKeyHeader(name="X-API-Key")

async def get_api_key(key: Annotated[str, Depends(api_key_header)], db: DB) -> APIKey:
    api_key = db.query(APIKey).filter(APIKey.key == key, APIKey.is_active == True).first()
    if not api_key:
        raise HTTPException(status_code=403, detail="API 密钥无效")
    return api_key

@app.get("/data", dependencies=[Depends(get_api_key)])
async def get_data():
    return {"data": "..."}

5. 认证方案对比

方案请求头适用场景
OAuth2 BearerAuthorization: BearerWeb/移动应用
API Key (头部)X-API-Key: key服务器对服务器
API Key (查询)?api_key=key简单集成
Basic AuthAuthorization: Basic内部工具