FastAPI Authentication Guide

Complete authentication patterns for FastAPI including JWT, OAuth2 password flow, HTTP Bearer, and API key schemes.

1. JWT Token Creation & Verification

from datetime import datetime, timedelta, timezone
from jose import jwt, JWTError
from passlib.context import CryptContext
import os

# Signing key is read from the environment; an unset JWT_SECRET raises
# KeyError as soon as this module is imported.
SECRET_KEY = os.environ["JWT_SECRET"]
# Algorithm name passed to both jwt.encode and jwt.decode below.
ALGORITHM = "HS256"
# Lifetimes handed to create_token for the two token kinds.
ACCESS_TOKEN_EXPIRE = timedelta(minutes=30)
REFRESH_TOKEN_EXPIRE = timedelta(days=7)

# Password-hashing context (bcrypt); the login route uses it to verify
# submitted passwords against the stored hash.
pwd_ctx = CryptContext(schemes=["bcrypt"], deprecated="auto")

def create_token(data: dict, expires_in: timedelta) -> str:
    """Sign a JWT carrying *data* plus an expiry claim.

    The caller's dict is left untouched: its entries are copied into a new
    claim set and ``exp`` is set to the current UTC time plus *expires_in*
    (replacing any ``exp`` already present in *data*).
    """
    expires_at = datetime.now(timezone.utc) + expires_in
    claims = {**data, "exp": expires_at}
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)

def create_access_token(user_id: int, role: str) -> str:
    """Issue an access token for *user_id* that also carries their *role*.

    The token is tagged ``type="access"`` and lives for ACCESS_TOKEN_EXPIRE.
    """
    claims = {"sub": str(user_id), "role": role, "type": "access"}
    return create_token(claims, ACCESS_TOKEN_EXPIRE)

def create_refresh_token(user_id: int) -> str:
    """Issue a refresh token for *user_id*.

    The token is tagged ``type="refresh"``, carries no role, and lives for
    REFRESH_TOKEN_EXPIRE.
    """
    claims = {"sub": str(user_id), "type": "refresh"}
    return create_token(claims, REFRESH_TOKEN_EXPIRE)

def decode_token(token: str) -> dict:
    """Decode *token* with the module's key and algorithm and return its claims.

    Args:
        token: Encoded JWT string.

    Returns:
        The decoded claim dictionary.

    Raises:
        HTTPException: 401 when the JWT library rejects the token (any
            ``JWTError``). The response carries a ``WWW-Authenticate: Bearer``
            challenge, matching the login route's 401.
    """
    try:
        return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as e:
        # Chain the original error so server-side tracebacks keep the cause.
        raise HTTPException(
            status_code=401,
            detail=str(e),
            headers={"WWW-Authenticate": "Bearer"},
        ) from e

2. OAuth2 Password Flow

from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel

# Bearer-token scheme used as a dependency by get_current_user; tokenUrl is
# the path of the login route declared below.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")

# Response body of the login route: a freshly issued access/refresh pair.
class TokenResponse(BaseModel):
    access_token: str
    refresh_token: str
    # Defaults to "bearer"; the login route never overrides it.
    token_type: str = "bearer"

@app.post("/auth/token", response_model=TokenResponse)
async def login(form: OAuth2PasswordRequestForm = Depends(), db: DB = Depends(get_db)):
    """Exchange an email/password form for an access/refresh token pair.

    The OAuth2 form's ``username`` field is matched against ``User.email``.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            the email is unknown or the password does not verify.
    """
    user = db.query(User).filter(User.email == form.username).first()
    if user is None:
        # Spend roughly one hash verification even when the account does not
        # exist, so response time does not reveal which emails are registered.
        pwd_ctx.dummy_verify()
        credentials_ok = False
    else:
        credentials_ok = pwd_ctx.verify(form.password, user.hashed_password)
    if not credentials_ok:
        raise HTTPException(
            status_code=401,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return TokenResponse(
        access_token=create_access_token(user.id, user.role),
        refresh_token=create_refresh_token(user.id),
    )

@app.post("/auth/refresh")
async def refresh(refresh_token: str, db: DB = Depends(get_db)):
    """Issue a new access token in exchange for a valid refresh token.

    Raises:
        HTTPException: 401 if the token fails decoding, is not tagged as a
            refresh token, has an unusable ``sub`` claim, or names a user
            that no longer exists or is inactive.
    """
    payload = decode_token(refresh_token)
    if payload.get("type") != "refresh":
        raise HTTPException(status_code=401, detail="Not a refresh token")
    try:
        user_id = int(payload["sub"])
    except (KeyError, TypeError, ValueError):
        raise HTTPException(status_code=401, detail="Invalid token subject") from None
    user = db.query(User).get(user_id)
    # The account may have been deleted or deactivated after the refresh token
    # was issued; answer 401 instead of failing on ``None`` or handing a
    # disabled account a fresh access token.
    if not user or not user.is_active:
        raise HTTPException(status_code=401, detail="Inactive or missing user")
    return {"access_token": create_access_token(user.id, user.role), "token_type": "bearer"}

3. Current User Dependency

from typing import Annotated

async def get_current_user(
    token: Annotated[str, Depends(oauth2_scheme)],
    # Explicit Depends(get_db), as in the login and refresh routes, so the
    # session is always injected by FastAPI.
    db: Annotated[DB, Depends(get_db)],
) -> User:
    """Resolve the bearer access token to an active ``User``.

    Args:
        token: Bearer token extracted by ``oauth2_scheme``.
        db: Database session.

    Returns:
        The user identified by the token's ``sub`` claim.

    Raises:
        HTTPException: 401 if the token fails decoding, is not tagged as an
            access token, has an unusable ``sub`` claim, or the user is
            missing or inactive.
    """
    challenge = {"WWW-Authenticate": "Bearer"}
    payload = decode_token(token)
    if payload.get("type") != "access":
        raise HTTPException(status_code=401, detail="Invalid token type", headers=challenge)
    try:
        user_id = int(payload["sub"])
    except (KeyError, TypeError, ValueError):
        # A signed token without a numeric subject is still a bad credential,
        # not a server error.
        raise HTTPException(
            status_code=401, detail="Invalid token subject", headers=challenge
        ) from None
    user = db.query(User).get(user_id)
    if not user or not user.is_active:
        raise HTTPException(
            status_code=401, detail="Inactive or missing user", headers=challenge
        )
    return user

CurrentUser = Annotated[User, Depends(get_current_user)]

def require_role(*roles: str):
    """Build a dependency that admits only users whose role is in *roles*.

    The returned coroutine receives the current user, raises a 403
    ``HTTPException`` when their role is not listed, and otherwise returns
    the user unchanged.
    """
    async def checker(current_user: CurrentUser) -> User:
        if current_user.role in roles:
            return current_user
        raise HTTPException(status_code=403, detail="Insufficient permissions")

    return checker

@app.get("/me")
async def me(user: CurrentUser):
    """Return the authenticated user's profile fields."""
    # ``User`` also carries ``hashed_password`` (the login route reads it), so
    # choose the fields to expose instead of serialising the ORM object as-is.
    return {
        "id": user.id,
        "email": user.email,
        "role": user.role,
        "is_active": user.is_active,
    }

@app.delete("/admin/users/{user_id}")
async def admin_delete(
    user_id: int,
    _: Annotated[User, Depends(require_role("admin"))],
    # Explicit Depends(get_db), as in the login and refresh routes, so the
    # session is always injected by FastAPI.
    db: Annotated[DB, Depends(get_db)],
):
    """Delete the user with *user_id*; restricted to the ``admin`` role.

    Raises:
        HTTPException: 404 if no user has that id. Authentication and role
            failures are raised by the ``require_role`` dependency.
    """
    deleted = db.query(User).filter(User.id == user_id).delete()
    if not deleted:
        # Nothing matched: report it instead of answering as if a user had
        # been removed.
        raise HTTPException(status_code=404, detail="User not found")
    db.commit()

4. API Key Authentication

from fastapi.security import APIKeyHeader, APIKeyQuery

# Two places a client may put its key. Both are created with auto_error=False,
# and get_api_key (below) accepts either value as optional and raises the
# error itself when neither is present.
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
api_key_query  = APIKeyQuery(name="api_key", auto_error=False)

async def get_api_key(
    header_key: Annotated[str | None, Depends(api_key_header)],
    query_key:  Annotated[str | None, Depends(api_key_query)],
    # Explicit Depends(get_db), as in the login and refresh routes, so the
    # session is always injected by FastAPI.
    db: Annotated[DB, Depends(get_db)],
) -> APIKey:
    """Resolve the caller's API key to its active ``APIKey`` row.

    The ``X-API-Key`` header takes precedence over the ``api_key`` query
    parameter. A successful lookup also stamps ``last_used_at`` and commits.

    Raises:
        HTTPException: 403 if no key is supplied or no active key matches.
    """
    token = header_key or query_key
    if not token:
        raise HTTPException(status_code=403, detail="API key required")
    key = (
        db.query(APIKey)
        # ``== True`` is intentional: it builds a SQL comparison on the column.
        .filter(APIKey.key == token, APIKey.is_active == True)  # noqa: E712
        .first()
    )
    if not key:
        raise HTTPException(status_code=403, detail="Invalid API key")
    # Same naive-UTC value ``datetime.utcnow()`` produced, without relying on
    # the call that is deprecated since Python 3.12.
    key.last_used_at = datetime.now(timezone.utc).replace(tzinfo=None)
    db.commit()
    return key

@app.get("/data", dependencies=[Depends(get_api_key)])
async def get_data():
    # The API-key check runs as a route-level dependency, so the handler
    # itself takes no parameters and never sees the key.
    body = {"data": "..."}
    return body

5. HTTPBearer Security

from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

# Created with auto_error=False; verify_bearer (below) treats the credentials
# as optional and raises its own 401 when they are absent.
bearer_scheme = HTTPBearer(auto_error=False)

async def verify_bearer(
    credentials: Annotated[HTTPAuthorizationCredentials | None, Depends(bearer_scheme)],
) -> dict:
    """Decode the bearer token from the ``Authorization`` header.

    Args:
        credentials: Parsed header from ``bearer_scheme``, or ``None`` when
            the scheme found no credentials.

    Returns:
        The claim dictionary returned by ``decode_token``.

    Raises:
        HTTPException: 401 when no bearer credentials were supplied, or when
            ``decode_token`` rejects the token.
    """
    if not credentials:
        # Include the Bearer challenge, as the login route's 401 does.
        raise HTTPException(
            status_code=401,
            detail="Bearer token required",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return decode_token(credentials.credentials)

@app.get("/protected")
async def protected(payload: Annotated[dict, Depends(verify_bearer)]):
    # ``payload`` is the claim dictionary returned by verify_bearer; only the
    # subject is echoed back.
    subject = payload["sub"]
    return {"user_id": subject}

6. Auth Schemes Comparison

| Scheme | Header | Best For | FastAPI Class |
|---|---|---|---|
| OAuth2 Bearer | Authorization: Bearer | Web/mobile apps | OAuth2PasswordBearer |
| HTTP Bearer | Authorization: Bearer | Microservices | HTTPBearer |
| API Key (header) | X-API-Key: key | Server-to-server | APIKeyHeader |
| API Key (query) | ?api_key=key | Simple integrations | APIKeyQuery |
| Basic Auth | Authorization: Basic | Internal tools | HTTPBasic |