
A Guide to Python Web Development: Django vs FastAPI vs Flask, SQLAlchemy, Celery, and Deployment

16 min read · By DevToolBox

TL;DR

For new API projects, pick FastAPI (async, automatic docs, Pydantic validation); for full-stack web apps that need a complete admin backend and ORM, pick Django; for lightweight microservices or prototypes, pick Flask. Outside Django, the standard ORM is SQLAlchemy 2.0, with Alembic for migrations, Celery for background tasks, pytest for testing, and Gunicorn + Uvicorn behind Nginx in production.

Key Takeaways

  • FastAPI suits high-performance API development, with built-in Pydantic validation, automatic OpenAPI docs, and native async support.
  • Django's "batteries-included" strengths (ORM, Admin, Auth, migrations) make it the first choice for full-stack web apps.
  • SQLAlchemy 2.0 introduces the unified 2.0 query style and native async sessions, and is the standard ORM for non-Django projects.
  • Alembic migration scripts must be committed to version control and run before application startup on every deploy.
  • In production, use Gunicorn + UvicornWorker (ASGI) or Gunicorn + sync workers (WSGI/Django) behind an Nginx reverse proxy.
  • Write tests with pytest plus the FastAPI TestClient or the Django test client; never deploy without test coverage.

1. Django vs FastAPI vs Flask — How to Choose?

The Python ecosystem has three mainstream web frameworks, each with its own sweet spot. Picking the right one is the first step toward a successful project; the wrong choice accrues heavy technical debt later on.

Framework Comparison: Django vs FastAPI vs Flask
=================================================

Feature               Django          FastAPI         Flask
-------               ------          -------         -----
Type                  Full-stack      API-first       Micro
ORM                   Built-in        None (SQLAlch.) None (SQLAlch.)
Admin Panel           Built-in        None            None
Auth System           Built-in        Manual/Libs     Manual/Libs
Migrations            Built-in        Alembic         Alembic
Async Support         Partial (3.1+)  Native (ASGI)   No (WSGI)
Data Validation       Forms/DRF       Pydantic        Manual/Marshmallow
Auto API Docs         No              Yes (OpenAPI)   No
Performance           Medium          Very High       Medium
Learning Curve        Steep           Medium          Low
Best For              Full-stack apps APIs/Microsvcs  Prototypes/Simple

Choose Django when:
  - You need a relational database with admin interface
  - Building a CMS, e-commerce, or content platform
  - Team is familiar with Django ecosystem
  - Need batteries-included: auth, sessions, forms

Choose FastAPI when:
  - Building REST APIs or GraphQL backends
  - Need high throughput with async I/O
  - Serving ML models or data science APIs
  - Want automatic Swagger/ReDoc documentation

Choose Flask when:
  - Building a small microservice or prototype
  - Need maximum flexibility in stack assembly
  - Minimal overhead and simplicity are priorities

2. FastAPI in Depth: Async Routes, Pydantic, and JWT Authentication

FastAPI is built on Starlette (an ASGI framework) and Pydantic (data validation), delivering performance close to Node.js and Go while keeping Python's developer experience. Its auto-generated OpenAPI docs (Swagger UI and ReDoc) are a major asset for public APIs and team collaboration.

Project structure and basic setup

# Install dependencies
pip install fastapi uvicorn[standard] pydantic pydantic-settings
pip install sqlalchemy asyncpg alembic python-jose[cryptography] passlib[bcrypt]

# Recommended project structure
myapp/
  app/
    __init__.py
    main.py          # FastAPI app instance, startup/shutdown events
    config.py        # Settings via pydantic-settings
    database.py      # SQLAlchemy engine and session
    models/          # SQLAlchemy ORM models
      __init__.py
      user.py
    schemas/         # Pydantic request/response models
      __init__.py
      user.py
    routers/         # APIRouter modules
      __init__.py
      users.py
      auth.py
    services/        # Business logic
      user_service.py
    dependencies.py  # Shared Depends() functions
  tests/
    conftest.py
    test_users.py
  alembic/
    versions/
  alembic.ini
  requirements.txt
  Dockerfile

Async routes and Pydantic models

# app/schemas/user.py
from pydantic import BaseModel, EmailStr, Field
from typing import Optional
from datetime import datetime


class UserBase(BaseModel):
    email: EmailStr
    username: str = Field(..., min_length=3, max_length=50, pattern=r"^[a-zA-Z0-9_]+$")
    full_name: Optional[str] = Field(None, max_length=100)


class UserCreate(UserBase):
    password: str = Field(..., min_length=8)


class UserUpdate(BaseModel):
    full_name: Optional[str] = None
    email: Optional[EmailStr] = None


class UserResponse(UserBase):
    id: int
    is_active: bool
    created_at: datetime

    model_config = {"from_attributes": True}  # Pydantic v2: replaces orm_mode


# app/routers/users.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List

from app.database import get_db
from app.schemas.user import UserCreate, UserResponse, UserUpdate
from app.services.user_service import UserService
from app.dependencies import get_current_user


router = APIRouter(prefix="/users", tags=["users"])


@router.get("/", response_model=List[UserResponse])
async def list_users(
    skip: int = 0,
    limit: int = 100,
    db: AsyncSession = Depends(get_db),
    current_user = Depends(get_current_user),
):
    """List all users (paginated). Requires authentication."""
    service = UserService(db)
    return await service.get_users(skip=skip, limit=limit)


@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(
    user_in: UserCreate,
    db: AsyncSession = Depends(get_db),
):
    """Register a new user."""
    service = UserService(db)
    existing = await service.get_by_email(user_in.email)
    if existing:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Email already registered",
        )
    return await service.create_user(user_in)


@router.get("/{user_id}", response_model=UserResponse)
async def get_user(
    user_id: int,
    db: AsyncSession = Depends(get_db),
):
    service = UserService(db)
    user = await service.get_by_id(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

Implementing JWT authentication

# app/routers/auth.py
from datetime import datetime, timedelta, timezone
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy.ext.asyncio import AsyncSession

from app.config import settings
from app.database import get_db
from app.services.user_service import UserService


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")
router = APIRouter(prefix="/auth", tags=["auth"])


def create_access_token(data: dict, expires_delta: timedelta = timedelta(hours=1)):
    payload = data.copy()
    # datetime.utcnow() is deprecated since Python 3.12; use an aware datetime
    payload["exp"] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(payload, settings.secret_key, algorithm="HS256")


@router.post("/token")
async def login(
    form: OAuth2PasswordRequestForm = Depends(),
    db: AsyncSession = Depends(get_db),
):
    service = UserService(db)
    user = await service.get_by_email(form.username)
    if not user or not pwd_context.verify(form.password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    token = create_access_token({"sub": str(user.id)})
    return {"access_token": token, "token_type": "bearer"}


# app/dependencies.py
async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db),
):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, settings.secret_key, algorithms=["HS256"])
        user_id: str = payload.get("sub")
        if user_id is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception
    service = UserService(db)
    user = await service.get_by_id(int(user_id))
    if user is None:
        raise credentials_exception
    return user

3. Django Core: ORM, Views, Templates, and REST Framework

Django is famous for being "batteries-included". Its ORM defines data models as Python classes and auto-generates migration files, the built-in Admin gives you a data-management backend with zero configuration, and Django REST Framework (DRF) provides a complete toolkit for building APIs.
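As a sketch of that zero-config Admin (assuming the blog models defined in this section), registering the models is all it takes — the field choices here are illustrative:

```python
# blog/admin.py — a few declarative lines yield a full CRUD backend at /admin/
from django.contrib import admin
from .models import Category, Post


@admin.register(Post)
class PostAdmin(admin.ModelAdmin):
    list_display = ("title", "author", "status", "published_at")
    list_filter = ("status", "category")
    search_fields = ("title", "body")
    prepopulated_fields = {"slug": ("title",)}


admin.site.register(Category)
```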

Django ORM: model definitions and queries

# blog/models.py
from django.db import models
from django.contrib.auth.models import User
from django.utils.text import slugify


class Category(models.Model):
    name = models.CharField(max_length=100, unique=True)
    slug = models.SlugField(unique=True)

    class Meta:
        verbose_name_plural = "categories"
        ordering = ["name"]

    def __str__(self):
        return self.name


class Tag(models.Model):
    # Referenced by Post.tags below; without it the ManyToManyField("Tag")
    # string reference would not resolve
    name = models.CharField(max_length=50, unique=True)

    def __str__(self):
        return self.name


class Post(models.Model):
    STATUS_DRAFT = "draft"
    STATUS_PUBLISHED = "published"
    STATUS_CHOICES = [(STATUS_DRAFT, "Draft"), (STATUS_PUBLISHED, "Published")]

    title = models.CharField(max_length=255)
    slug = models.SlugField(unique=True)
    author = models.ForeignKey(User, on_delete=models.CASCADE, related_name="posts")
    category = models.ForeignKey(
        Category, on_delete=models.SET_NULL, null=True, blank=True, related_name="posts"
    )
    tags = models.ManyToManyField("Tag", blank=True, related_name="posts")
    body = models.TextField()
    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default=STATUS_DRAFT)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    published_at = models.DateTimeField(null=True, blank=True)

    class Meta:
        ordering = ["-created_at"]
        indexes = [
            models.Index(fields=["status", "-published_at"]),
            models.Index(fields=["author", "status"]),
        ]

    def save(self, *args, **kwargs):
        if not self.slug:
            self.slug = slugify(self.title)
        super().save(*args, **kwargs)


# ORM queries — efficient patterns
# select_related() for ForeignKey (SQL JOIN, avoids N+1)
posts = Post.objects.select_related("author", "category").filter(
    status=Post.STATUS_PUBLISHED
).order_by("-published_at")[:20]

# prefetch_related() for ManyToMany (separate queries, Python-side join)
posts_with_tags = Post.objects.prefetch_related("tags").filter(
    status=Post.STATUS_PUBLISHED
)

# Aggregation
from django.db.models import Count, Avg
stats = Post.objects.values("category__name").annotate(
    post_count=Count("id")
).order_by("-post_count")

# only() — load specific fields (reduces memory)
titles = Post.objects.only("title", "slug", "published_at").filter(
    status=Post.STATUS_PUBLISHED
)

Django REST Framework (DRF)

# blog/serializers.py
from rest_framework import serializers
from .models import Post, Category


class CategorySerializer(serializers.ModelSerializer):
    class Meta:
        model = Category
        fields = ["id", "name", "slug"]


class PostSerializer(serializers.ModelSerializer):
    author_name = serializers.CharField(source="author.get_full_name", read_only=True)
    category = CategorySerializer(read_only=True)
    category_id = serializers.PrimaryKeyRelatedField(
        queryset=Category.objects.all(), source="category", write_only=True
    )

    class Meta:
        model = Post
        fields = [
            "id", "title", "slug", "author_name", "category", "category_id",
            "body", "status", "created_at", "published_at",
        ]
        read_only_fields = ["id", "slug", "created_at"]


# blog/views.py — ViewSet with Router generates full CRUD
from rest_framework import viewsets, permissions, filters
from rest_framework.decorators import action
from rest_framework.response import Response
from django_filters.rest_framework import DjangoFilterBackend

from .models import Post
from .serializers import PostSerializer


class PostViewSet(viewsets.ModelViewSet):
    serializer_class = PostSerializer
    permission_classes = [permissions.IsAuthenticatedOrReadOnly]
    filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
    filterset_fields = ["status", "category"]
    search_fields = ["title", "body"]
    ordering_fields = ["created_at", "published_at"]

    def get_queryset(self):
        return Post.objects.select_related("author", "category").prefetch_related("tags")

    def perform_create(self, serializer):
        serializer.save(author=self.request.user)

    @action(detail=False, methods=["get"])
    def published(self, request):
        qs = self.get_queryset().filter(status=Post.STATUS_PUBLISHED)
        serializer = self.get_serializer(qs, many=True)
        return Response(serializer.data)


# blog/urls.py
from rest_framework.routers import DefaultRouter
from .views import PostViewSet

router = DefaultRouter()
router.register(r"posts", PostViewSet, basename="post")
urlpatterns = router.urls
# Generates: GET/POST /posts/, GET/PUT/PATCH/DELETE /posts/{id}/, GET /posts/published/

4. Flask: Lightweight Apps, Blueprints, and SQLAlchemy

Flask is a microframework with a deliberately small core; functionality is added through extensions as needed. Flask Blueprints organize routes into modules, Flask-SQLAlchemy integrates the SQLAlchemy ORM, and Flask-Migrate (built on Alembic) handles database migrations.

# pip install flask flask-sqlalchemy flask-migrate flask-jwt-extended

# app/__init__.py — Application factory pattern
import os

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate

db = SQLAlchemy()
migrate = Migrate()


def create_app(config=None):
    app = Flask(__name__)
    app.config["SQLALCHEMY_DATABASE_URI"] = os.environ.get(
        "DATABASE_URL", "postgresql://user:pass@localhost/mydb"
    )
    app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
    # Read the secret from the environment — never hardcode it in source
    app.config["SECRET_KEY"] = os.environ.get("SECRET_KEY", "dev-only-key")

    db.init_app(app)
    migrate.init_app(app, db)

    # Register blueprints
    from app.auth.routes import auth_bp
    from app.api.routes import api_bp
    app.register_blueprint(auth_bp, url_prefix="/auth")
    app.register_blueprint(api_bp, url_prefix="/api/v1")

    return app


# app/api/routes.py — Blueprint
from flask import Blueprint, jsonify, request, abort
from app import db
from app.models import Post

api_bp = Blueprint("api", __name__)


@api_bp.route("/posts", methods=["GET"])
def list_posts():
    page = request.args.get("page", 1, type=int)
    per_page = request.args.get("per_page", 20, type=int)
    pagination = Post.query.order_by(Post.created_at.desc()).paginate(
        page=page, per_page=per_page, error_out=False
    )
    return jsonify({
        "posts": [p.to_dict() for p in pagination.items],
        "total": pagination.total,
        "pages": pagination.pages,
        "current_page": page,
    })


@api_bp.route("/posts/<int:post_id>", methods=["GET"])
def get_post(post_id):
    post = db.get_or_404(Post, post_id)
    return jsonify(post.to_dict())


@api_bp.errorhandler(404)
def not_found(error):
    return jsonify({"error": "Resource not found"}), 404

5. Async Python: asyncio and aiohttp

Python's asyncio module provides an event-loop-based async I/O framework. In web development, async is crucial to the concurrency of I/O-bound services (database queries, external API calls, file operations). aiohttp is the most popular async HTTP client/server library.

# asyncio fundamentals
import asyncio
import aiohttp
from typing import List


# Basic coroutine
async def fetch_user(session: aiohttp.ClientSession, user_id: int) -> dict:
    url = f"https://api.example.com/users/{user_id}"
    async with session.get(url) as response:
        response.raise_for_status()
        return await response.json()


# Fetch multiple URLs concurrently (not sequentially)
async def fetch_all_users(user_ids: List[int]) -> List[dict]:
    async with aiohttp.ClientSession() as session:
        # asyncio.gather runs coroutines concurrently
        tasks = [fetch_user(session, uid) for uid in user_ids]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return [r for r in results if not isinstance(r, Exception)]


# Timeout handling with aiohttp
async def fetch_with_timeout(url: str, timeout_sec: float = 5.0) -> dict:
    timeout = aiohttp.ClientTimeout(total=timeout_sec)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        try:
            async with session.get(url) as resp:
                return await resp.json()
        except asyncio.TimeoutError:
            raise TimeoutError(f"Request to {url} timed out after {timeout_sec}s")


# Run CPU-bound work without blocking the event loop
import concurrent.futures


async def process_image_async(image_data: bytes) -> bytes:
    loop = asyncio.get_running_loop()  # preferred over get_event_loop() inside coroutines
    # Run CPU-bound work in a thread pool
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, _process_image_sync, image_data)
    return result


def _process_image_sync(image_data: bytes) -> bytes:
    # Synchronous CPU-bound image processing
    from PIL import Image
    import io
    img = Image.open(io.BytesIO(image_data))
    img = img.resize((800, 600))
    output = io.BytesIO()
    img.save(output, format="JPEG", quality=85)
    return output.getvalue()


# asyncio.Semaphore — limit concurrent connections
async def fetch_with_semaphore(urls: List[str], max_concurrent: int = 10):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_one(session, url):
        async with semaphore:
            async with session.get(url) as resp:
                return await resp.text()

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_one(session, url) for url in urls]
        return await asyncio.gather(*tasks)

6. SQLAlchemy 2.0: Core, ORM, and Async Sessions

SQLAlchemy 2.0 is a major update: a unified 2.0 query style (select() statements instead of Session.query()), native AsyncSession support, and improved type annotations. It is the ORM of choice for non-Django projects.

ORM model definitions (2.0 style)

# app/models/user.py — SQLAlchemy 2.0 declarative style
from sqlalchemy import String, Boolean, DateTime, ForeignKey, Text
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from datetime import datetime
from typing import Optional, List


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True, index=True)
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    username: Mapped[str] = mapped_column(String(50), unique=True, index=True)
    hashed_password: Mapped[str] = mapped_column(String(255))
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)
    created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)

    # Relationship: one user has many posts
    posts: Mapped[List["Post"]] = relationship("Post", back_populates="author")


class Post(Base):
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True, index=True)
    title: Mapped[str] = mapped_column(String(255))
    body: Mapped[str] = mapped_column(Text)
    published: Mapped[bool] = mapped_column(Boolean, default=False)
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)

    author: Mapped["User"] = relationship("User", back_populates="posts")

Async sessions and CRUD operations

# app/database.py — Async engine and session
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from app.models import Base

# Use asyncpg driver for PostgreSQL async
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/mydb"

engine = create_async_engine(DATABASE_URL, echo=False, pool_size=10, max_overflow=20)
AsyncSessionLocal = async_sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)


async def get_db():
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise


# app/services/user_service.py — SQLAlchemy 2.0 query style
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update, delete
from app.models.user import User
from app.schemas.user import UserCreate
from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


class UserService:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def get_by_id(self, user_id: int) -> User | None:
        # SQLAlchemy 2.0 style: use select() instead of session.query()
        result = await self.db.execute(select(User).where(User.id == user_id))
        return result.scalar_one_or_none()

    async def get_by_email(self, email: str) -> User | None:
        result = await self.db.execute(select(User).where(User.email == email))
        return result.scalar_one_or_none()

    async def get_users(self, skip: int = 0, limit: int = 100) -> list[User]:
        result = await self.db.execute(
            select(User).where(User.is_active.is_(True)).offset(skip).limit(limit)
        )
        return list(result.scalars().all())

    async def create_user(self, user_in: UserCreate) -> User:
        user = User(
            email=user_in.email,
            username=user_in.username,
            hashed_password=pwd_context.hash(user_in.password),
        )
        self.db.add(user)
        await self.db.flush()  # Get the generated id without committing
        await self.db.refresh(user)
        return user

    async def deactivate_user(self, user_id: int) -> None:
        await self.db.execute(
            update(User).where(User.id == user_id).values(is_active=False)
        )

7. Database Migrations with Alembic

Alembic is SQLAlchemy's official migration tool. It can auto-generate migration scripts by detecting model changes, and it also supports hand-written scripts for complex migration logic. All migration scripts must be committed to version control.

# Initialize Alembic in your project
pip install alembic
alembic init alembic

# alembic/env.py — configure for async SQLAlchemy
import asyncio
from logging.config import fileConfig
from sqlalchemy.ext.asyncio import create_async_engine
from alembic import context
from app.models import Base  # import your models

config = context.config
target_metadata = Base.metadata


def run_migrations_offline():
    context.configure(
        url=config.get_main_option("sqlalchemy.url"),
        target_metadata=target_metadata,
        literal_binds=True,
    )
    with context.begin_transaction():
        context.run_migrations()


def do_run_migrations(connection):
    context.configure(connection=connection, target_metadata=target_metadata)
    with context.begin_transaction():
        context.run_migrations()


async def run_migrations_online():
    connectable = create_async_engine(
        config.get_main_option("sqlalchemy.url")
    )
    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)
    await connectable.dispose()


if context.is_offline_mode():
    run_migrations_offline()
else:
    asyncio.run(run_migrations_online())


# Common Alembic commands
# Generate migration from model changes (auto-detect)
alembic revision --autogenerate -m "add users table"

# Apply all pending migrations
alembic upgrade head

# Roll back the last migration
alembic downgrade -1

# Roll back to a specific revision
alembic downgrade abc123

# View migration history
alembic history --verbose

# View current revision
alembic current
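
The commands above cover the autogenerate path; for the hand-written case mentioned in the intro, a migration that mixes a schema change with a data backfill might look like this sketch (revision ids, table, and column are illustrative):

```python
# alembic/versions/xxxx_add_display_name.py — illustrative hand-written migration
from alembic import op
import sqlalchemy as sa

revision = "a1b2c3d4e5f6"       # illustrative revision id
down_revision = "f6e5d4c3b2a1"  # illustrative parent revision


def upgrade():
    op.add_column("users", sa.Column("display_name", sa.String(100), nullable=True))
    # Data backfill: raw SQL is fine inside a migration
    op.execute("UPDATE users SET display_name = username WHERE display_name IS NULL")
    op.alter_column("users", "display_name", nullable=False)


def downgrade():
    op.drop_column("users", "display_name")
```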

8. Background Tasks with Celery

Celery is the most popular distributed task queue in the Python ecosystem, supporting async tasks, scheduled tasks (Celery Beat), retries, result storage, and task chains. The usual message brokers are Redis or RabbitMQ; Redis is the recommended result backend.

# pip install celery redis

# app/celery_app.py
from celery import Celery

celery_app = Celery(
    "myapp",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
    include=["app.tasks"],
)

celery_app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    timezone="UTC",
    enable_utc=True,
    task_acks_late=True,           # Ack only after task completes
    worker_prefetch_multiplier=1,   # Prevent worker overload
)


# app/tasks.py
from app.celery_app import celery_app
import time


@celery_app.task(bind=True, max_retries=3, default_retry_delay=60)
def send_welcome_email(self, user_id: int, email: str):
    """Send welcome email — retries up to 3 times on failure."""
    try:
        # Your email sending logic here
        _send_email(to=email, subject="Welcome!", body="...")
    except Exception as exc:
        raise self.retry(exc=exc)


@celery_app.task
def process_video(video_id: int, output_format: str = "mp4"):
    """Long-running video processing task."""
    # Process video...
    return {"video_id": video_id, "status": "completed"}


# Call tasks from FastAPI/Django/Flask
# .delay() is a shortcut for .apply_async()
send_welcome_email.delay(user_id=42, email="user@example.com")

# .apply_async() with options
process_video.apply_async(
    args=[video_id],
    kwargs={"output_format": "webm"},
    countdown=10,         # Start after 10 seconds
    expires=3600,         # Discard if not consumed in 1 hour
    priority=5,           # 0-9, higher = higher priority
)


# Celery Beat — Periodic tasks (cron-like scheduling)
from celery.schedules import crontab

celery_app.conf.beat_schedule = {
    "daily-report": {
        "task": "app.tasks.generate_daily_report",
        "schedule": crontab(hour=8, minute=0),  # Every day at 8:00 AM
    },
    "cleanup-sessions": {
        "task": "app.tasks.cleanup_expired_sessions",
        "schedule": 300.0,  # Every 300 seconds
    },
}


# Start workers
celery -A app.celery_app worker --loglevel=info --concurrency=4
celery -A app.celery_app beat --loglevel=info

9. Testing Python Web Apps: pytest and the FastAPI TestClient

pytest is the de facto standard for Python testing. FastAPI ships a TestClient built on httpx, which lets you exercise API endpoints without starting a server. A sound strategy covers unit tests (business logic), integration tests (database interaction), and end-to-end tests (full API flows).

# pip install pytest pytest-asyncio httpx pytest-cov

# tests/conftest.py — shared fixtures
import pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from app.main import app
from app.database import get_db
from app.models import Base

TEST_DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/test_db"


@pytest.fixture(scope="session")
def event_loop():
    import asyncio
    loop = asyncio.get_event_loop_policy().new_event_loop()
    yield loop
    loop.close()


@pytest.fixture(scope="session")
async def test_engine():
    engine = create_async_engine(TEST_DATABASE_URL, echo=False)
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield engine
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
    await engine.dispose()


@pytest.fixture
async def db_session(test_engine):
    TestSession = async_sessionmaker(test_engine, expire_on_commit=False)
    async with TestSession() as session:
        yield session
        await session.rollback()  # Roll back after each test


@pytest.fixture
async def client(db_session):
    def override_get_db():
        yield db_session

    app.dependency_overrides[get_db] = override_get_db
    # NOTE: httpx >= 0.27 prefers AsyncClient(transport=ASGITransport(app=app), ...)
    async with AsyncClient(app=app, base_url="http://test") as ac:
        yield ac
    app.dependency_overrides.clear()


# tests/test_users.py
import pytest


@pytest.mark.asyncio
async def test_create_user(client):
    response = await client.post("/users/", json={
        "email": "test@example.com",
        "username": "testuser",
        "password": "securepassword123",
    })
    assert response.status_code == 201
    data = response.json()
    assert data["email"] == "test@example.com"
    assert "id" in data
    assert "password" not in data  # Never expose passwords


@pytest.mark.asyncio
async def test_duplicate_email_returns_409(client):
    payload = {"email": "dup@example.com", "username": "user1", "password": "pass12345"}
    await client.post("/users/", json=payload)
    # Second registration with same email
    response = await client.post("/users/", json={**payload, "username": "user2"})
    assert response.status_code == 409


@pytest.mark.asyncio
async def test_get_user_not_found(client):
    response = await client.get("/users/99999")
    assert response.status_code == 404


# Run tests
pytest tests/ -v --cov=app --cov-report=html

10. Production Deployment: Gunicorn, Uvicorn, and Docker

For production, run Python web apps under Gunicorn as the process manager, with UvicornWorker (for ASGI/FastAPI) or the default sync workers (for Django/Flask). Always put Python services behind a reverse proxy such as Nginx or Caddy to handle SSL termination, static files, and request buffering.

Gunicorn + Uvicorn (FastAPI)

# FastAPI / ASGI app
# -w: worker count (recommended: 2 * CPU cores + 1)
# -k: worker class
# -b: bind address
# --timeout: worker timeout in seconds
gunicorn app.main:app \
    -w 4 \
    -k uvicorn.workers.UvicornWorker \
    -b 0.0.0.0:8000 \
    --timeout 120 \
    --keep-alive 5 \
    --access-logfile - \
    --error-logfile -

# Django / WSGI app
gunicorn myproject.wsgi:application \
    -w 4 \
    -b 0.0.0.0:8000 \
    --timeout 120

# gunicorn.conf.py — configuration file
bind = "0.0.0.0:8000"
workers = 4
worker_class = "uvicorn.workers.UvicornWorker"
worker_connections = 1000
timeout = 120
keepalive = 5
max_requests = 1000          # Restart workers after N requests (prevent memory leaks)
max_requests_jitter = 100    # Random jitter to prevent all workers restarting at once
loglevel = "info"
accesslog = "-"
errorlog = "-"
preload_app = True           # Load app code before forking workers (saves memory)
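
Since gunicorn.conf.py is plain Python, the `2 * CPU cores + 1` rule above can be computed instead of hardcoding `workers = 4` — a common pattern, shown here as a sketch:

```python
# gunicorn.conf.py — derive the worker count from the host's CPU count
import multiprocessing

workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "uvicorn.workers.UvicornWorker"
```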

Containerizing with Docker

# Dockerfile — multi-stage for FastAPI
FROM python:3.12-slim AS builder

WORKDIR /app

# Install build dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential libpq-dev \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir --user -r requirements.txt


# Production stage
FROM python:3.12-slim

WORKDIR /app

RUN apt-get update && apt-get install -y --no-install-recommends libpq5 \
    && rm -rf /var/lib/apt/lists/*

# Create non-root user for security
RUN useradd --create-home --shell /bin/false appuser

# Copy installed packages from builder; /root/.local is unreadable by the
# non-root user, so place them in appuser's home instead
COPY --from=builder --chown=appuser:appuser /root/.local /home/appuser/.local

# Copy application code
COPY --chown=appuser:appuser . .

USER appuser

ENV PATH=/home/appuser/.local/bin:$PATH
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1

EXPOSE 8000

CMD ["gunicorn", "app.main:app", "-w", "4", "-k", \
     "uvicorn.workers.UvicornWorker", "-b", "0.0.0.0:8000"]


# docker-compose.yml
version: "3.9"
services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql+asyncpg://postgres:password@db/mydb
      - SECRET_KEY=${SECRET_KEY}
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      db:
        condition: service_healthy
      redis:
        condition: service_started

  db:
    image: postgres:16-alpine
    volumes:
      - pgdata:/var/lib/postgresql/data
    environment:
      POSTGRES_PASSWORD: password
      POSTGRES_DB: mydb
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 5s
      timeout: 5s
      retries: 5

  redis:
    image: redis:7-alpine

  worker:
    build: .
    command: celery -A app.celery_app worker -l info -c 4
    environment:
      - DATABASE_URL=postgresql+asyncpg://postgres:password@db/mydb
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - db
      - redis

volumes:
  pgdata:

11. Detailed Framework Comparison

The table below compares Django, FastAPI, and Flask across each dimension to help you make a final call based on your project's needs:

Dimension            Django                    FastAPI                   Flask
---------            ------                    -------                   -----
First released       2005                      2018                      2010
Architecture         MTV (full-stack)          ASGI (API-first)          WSGI (micro)
ORM                  Built-in Django ORM       None (SQLAlchemy)         None (SQLAlchemy)
Migration tool       Built-in makemigrations   Alembic                   Alembic / Flask-Migrate
Async support        Partial (3.1+, ASGI)      Native (ASGI)             No (WSGI)
Data validation      Forms / DRF serializers   Pydantic (built-in)       Marshmallow / WTForms
API docs             DRF Browsable API         Auto Swagger + ReDoc      None (plugins)
Admin panel          Built-in (zero config)    None (third-party)        None (Flask-Admin)
Auth system          Built-in User/Session/JWT python-jose + passlib     Flask-Login + JWT
Testing tools        django.test.Client        TestClient (httpx)        Flask test client
Performance (rel.)   Medium (sync WSGI)        High (async ASGI)         Medium (sync WSGI)
Learning curve       Steep (many concepts)     Medium (Pydantic + async) Low (minimal)
Best for             CMS, e-commerce, content  APIs, ML svcs, microsvcs  Prototypes, simple apps

Frequently Asked Questions

How do I choose between Django, FastAPI, and Flask?

Rule of thumb: pick Django when you need a full admin backend, ORM, and auth system (CMS, e-commerce, content platforms); pick FastAPI for high-performance REST APIs or automatic OpenAPI docs (microservices, data APIs, ML inference endpoints); pick Flask for lightweight prototypes or small apps (flexible, but you assemble more of the infrastructure yourself).

How much faster is FastAPI than Flask?

FastAPI runs on ASGI (Starlette + Uvicorn) with native async I/O. On I/O-bound workloads its throughput is typically 2-3x that of Flask (WSGI + Gunicorn). For CPU-bound work the gap largely disappears, since both are constrained by Python's GIL. In realistic setups FastAPI can serve tens of thousands of requests per second, while a standard Flask deployment usually lands in the thousands.

What's the difference between SQLAlchemy 2.0 and the Django ORM?

The Django ORM is deeply integrated with the Django framework: its API is terser and great for rapid development, but comparatively less flexible. SQLAlchemy 2.0 is a standalone ORM that works with any framework, offers both a Core layer (SQL expression language) and an ORM layer, handles complex queries more flexibly, and supports native async sessions (AsyncSession). New Flask/FastAPI projects should use SQLAlchemy 2.0.

How do I avoid blocking the event loop in async Python web apps?

The key rules: all I/O (database queries, HTTP requests, file reads/writes) must go through async/await-capable libraries (asyncpg, aiohttp, aiofiles). CPU-bound work (image processing, number crunching) should be pushed to a thread or process pool via loop.run_in_executor(). Never call blocking functions such as time.sleep() or requests.get() inside an async function.
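
A stdlib-only sketch of that rule: the blocking call is pushed to a thread pool via run_in_executor, so the loop keeps serving other coroutines.

```python
# Blocking work offloaded to a thread pool so the event loop stays responsive
import asyncio
import time


def blocking_io() -> str:
    time.sleep(0.2)  # stands in for requests.get(), heavy computation, etc.
    return "done"


async def main() -> list:
    loop = asyncio.get_running_loop()
    # Calling blocking_io() directly here would freeze every other coroutine
    # for 0.2s; offloaded, both awaits below complete concurrently.
    return await asyncio.gather(
        loop.run_in_executor(None, blocking_io),
        asyncio.sleep(0.2, result="concurrent"),
    )
```

With the offload in place the whole thing finishes in roughly 0.2s rather than 0.4s, because the sleep-backed coroutine runs while the thread pool handles the blocking call.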

How do Alembic and Django migrations differ?

Django migrations are tightly integrated with the Django ORM: makemigrations auto-detects model changes and generates migration files, and running them is simple. Alembic is SQLAlchemy's migration tool; scripts are generated manually or semi-automatically, but it has stronger support for custom SQL and complex migration logic. In both cases, commit migration files to version control and run them before application startup on deploy.

What's the difference between Celery and FastAPI's BackgroundTasks?

FastAPI BackgroundTasks runs lightweight work in the same process after the response is sent (notification emails, counter updates); tasks die with the process, with no retry or persistence. Celery is a standalone distributed task queue with retries, scheduling, priority queues, result storage, and cross-service dispatch, suited to long-running jobs (video processing, bulk imports). Complex production background work belongs in Celery.

Should production use Gunicorn or Uvicorn?

The recommended combination: Gunicorn as the process manager (supervising multiple worker processes) with UvicornWorker as the ASGI worker (handling the actual requests): gunicorn app:app -w 4 -k uvicorn.workers.UvicornWorker. For (synchronous) Django, use gunicorn -w 4 myproject.wsgi:application. Bare Uvicorn fits container environments (Docker + Kubernetes) where the orchestration platform manages process counts.

How do I implement JWT authentication in FastAPI?

Use python-jose for JWTs and passlib for password hashing. Create a /token endpoint that accepts OAuth2PasswordRequestForm and issues a JWT after verifying the user's credentials. Use an OAuth2PasswordBearer dependency to extract the Bearer token, and protect routes with Depends(get_current_user), which validates the token and injects the current user. Keep the JWT SECRET_KEY in an environment variable, and prefer the asymmetric RS256 algorithm over HS256 in production.

Conclusion

The Python web ecosystem in 2026 is thoroughly mature. FastAPI, with native async support, automatic OpenAPI docs, and Pydantic validation, is the first choice for new API services. Django, with its complete ecosystem and batteries-included design, remains central to full-stack web development. Flask still fits lightweight scenarios that demand maximum flexibility.

Whichever framework you choose, hold to these practices: SQLAlchemy 2.0 (or the Django ORM) for the data layer, Alembic (or Django migrations) for schema changes, Celery for long-running background work, pytest for code quality, and Docker + Gunicorn/Uvicorn + Nginx for production deployment. Manage every secret through environment variables, and never hardcode passwords or API keys in source.

Master these tools and patterns and you can build reliable, high-performance, maintainable Python web apps — from small prototypes to production systems handling tens of thousands of requests per second.

