TL;DR
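For new API projects, choose FastAPI (async, automatic documentation, Pydantic validation). For full-stack web applications that need a complete admin backend and ORM, choose Django. For lightweight microservices or prototypes, choose Flask. Outside Django, the standard ORM is SQLAlchemy 2.0, with Alembic for migrations, Celery for background tasks, pytest for testing, and Gunicorn + Uvicorn behind Nginx in production.
Key Takeaways
- FastAPI suits high-performance API development, with built-in Pydantic validation, automatic OpenAPI documentation, and native async support.
- Django's "batteries included" strengths (ORM, Admin, Auth, migrations) make it the first choice for full-stack web applications.
- SQLAlchemy 2.0 introduced a unified 2.0 query style and first-class async sessions, and is the standard ORM for non-Django projects.
- Alembic migration scripts must be committed to version control and run on every deployment before the application starts.
- In production, use Gunicorn + UvicornWorker (ASGI) or Gunicorn + sync workers (WSGI/Django) behind an Nginx reverse proxy.
- Write tests with pytest plus the FastAPI TestClient or the Django test client, and never deploy without test coverage.
1. Django vs FastAPI vs Flask: How to Choose
The Python ecosystem has three mainstream web frameworks, each with its own sweet spot. Picking the right one is the first step toward a successful project; a poor fit accumulates technical debt that is expensive to unwind later.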
Framework Comparison: Django vs FastAPI vs Flask

| Feature | Django | FastAPI | Flask |
|---|---|---|---|
| Type | Full-stack | API-first | Micro |
| ORM | Built-in | None (SQLAlchemy) | None (SQLAlchemy) |
| Admin Panel | Built-in | None | None |
| Auth System | Built-in | Manual/Libs | Manual/Libs |
| Migrations | Built-in | Alembic | Alembic |
| Async Support | Partial (3.1+) | Native (ASGI) | Limited (async views since 2.0, still WSGI) |
| Data Validation | Forms/DRF | Pydantic | Manual/Marshmallow |
| Auto API Docs | No | Yes (OpenAPI) | No |
| Performance | Medium | Very High | Medium |
| Learning Curve | Steep | Medium | Low |
| Best For | Full-stack apps | APIs/Microservices | Prototypes/Simple apps |

Choose Django when:
- You need a relational database with admin interface
- Building a CMS, e-commerce, or content platform
- Team is familiar with Django ecosystem
- Need batteries-included: auth, sessions, forms
Choose FastAPI when:
- Building REST APIs or GraphQL backends
- Need high throughput with async I/O
- Serving ML models or data science APIs
- Want automatic Swagger/ReDoc documentation
Choose Flask when:
- Building a small microservice or prototype
- Need maximum flexibility in stack assembly
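- Minimal overhead and simplicity are priorities

2. FastAPI Deep Dive: Async Routes, Pydantic, and JWT Authentication
FastAPI is built on Starlette (an ASGI framework) and Pydantic (data validation). On I/O-bound workloads it aims for throughput in the same range as Node.js and Go services while keeping the Python development experience. Its automatically generated OpenAPI documentation (Swagger UI and ReDoc) is a major asset for public APIs and team collaboration.
Project Structure and Basic Setup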
# Install dependencies
# (pydantic[email] is needed for EmailStr, python-multipart for OAuth2PasswordRequestForm)
pip install fastapi "uvicorn[standard]" "pydantic[email]" pydantic-settings python-multipart
pip install sqlalchemy asyncpg alembic "python-jose[cryptography]" "passlib[bcrypt]"

# Recommended project structure
myapp/
    app/
        __init__.py
        main.py            # FastAPI app instance, startup/shutdown events
        config.py          # Settings via pydantic-settings
        database.py        # SQLAlchemy engine and session
        models/            # SQLAlchemy ORM models
            __init__.py
            user.py
        schemas/           # Pydantic request/response models
            __init__.py
            user.py
        routers/           # APIRouter modules
            __init__.py
            users.py
            auth.py
        services/          # Business logic
            user_service.py
        dependencies.py    # Shared Depends() functions
    tests/
        conftest.py
        test_users.py
    alembic/
        versions/
    alembic.ini
    requirements.txt
    Dockerfile

Async Routes and Pydantic Models
# app/schemas/user.py
from datetime import datetime
from typing import Optional

from pydantic import BaseModel, EmailStr, Field

class UserBase(BaseModel):
    email: EmailStr
    username: str = Field(..., min_length=3, max_length=50, pattern=r"^[a-zA-Z0-9_]+$")
    full_name: Optional[str] = Field(None, max_length=100)

class UserCreate(UserBase):
    password: str = Field(..., min_length=8)

class UserUpdate(BaseModel):
    full_name: Optional[str] = None
    email: Optional[EmailStr] = None

class UserResponse(UserBase):
    id: int
    is_active: bool
    created_at: datetime

    model_config = {"from_attributes": True}  # Pydantic v2: replaces orm_mode

# app/routers/users.py
from typing import List

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession

from app.database import get_db
from app.dependencies import get_current_user
from app.schemas.user import UserCreate, UserResponse, UserUpdate
from app.services.user_service import UserService

router = APIRouter(prefix="/users", tags=["users"])

@router.get("/", response_model=List[UserResponse])
async def list_users(
    skip: int = 0,
    limit: int = 100,
    db: AsyncSession = Depends(get_db),
    current_user=Depends(get_current_user),
):
    """List all users (paginated). Requires authentication."""
    service = UserService(db)
    return await service.get_users(skip=skip, limit=limit)

@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(
    user_in: UserCreate,
    db: AsyncSession = Depends(get_db),
):
    """Register a new user."""
    service = UserService(db)
    existing = await service.get_by_email(user_in.email)
    if existing:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Email already registered",
        )
    return await service.create_user(user_in)

@router.get("/{user_id}", response_model=UserResponse)
async def get_user(
    user_id: int,
    db: AsyncSession = Depends(get_db),
):
    """Fetch a single user by id, or return 404."""
    service = UserService(db)
    user = await service.get_by_id(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

JWT Authentication
# app/routers/auth.py
from datetime import datetime, timedelta, timezone

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import jwt
from passlib.context import CryptContext
from sqlalchemy.ext.asyncio import AsyncSession

from app.config import settings
from app.database import get_db
from app.services.user_service import UserService

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")
router = APIRouter(prefix="/auth", tags=["auth"])

def create_access_token(data: dict, expires_delta: timedelta = timedelta(hours=1)):
    payload = data.copy()
    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC timestamp
    payload["exp"] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(payload, settings.secret_key, algorithm="HS256")

@router.post("/token")
async def login(
    form: OAuth2PasswordRequestForm = Depends(),
    db: AsyncSession = Depends(get_db),
):
    service = UserService(db)
    # The OAuth2 form field is called "username"; here it carries the email
    user = await service.get_by_email(form.username)
    if not user or not pwd_context.verify(form.password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    token = create_access_token({"sub": str(user.id)})
    return {"access_token": token, "token_type": "bearer"}

# app/dependencies.py
from fastapi import Depends, HTTPException, status
from jose import JWTError, jwt
from sqlalchemy.ext.asyncio import AsyncSession

from app.config import settings
from app.database import get_db
from app.routers.auth import oauth2_scheme
from app.services.user_service import UserService

async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db),
):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, settings.secret_key, algorithms=["HS256"])
        # A missing or non-numeric "sub" claim is treated like an invalid token
        user_id = int(payload["sub"])
    except (JWTError, KeyError, TypeError, ValueError):
        raise credentials_exception
    service = UserService(db)
    user = await service.get_by_id(user_id)
    if user is None:
        raise credentials_exception
    return user

3. Django Core: ORM, Views, Templates, and REST Framework
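Django is known for being "batteries included". Its ORM defines data models as Python classes and generates migration files automatically, the built-in Admin interface gives you a data management backend with almost no configuration, and Django REST Framework (DRF) provides a complete toolkit for building APIs.
Django ORM: Model Definitions and Queries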
# blog/models.py
from django.contrib.auth.models import User
from django.db import models
from django.utils.text import slugify

class Category(models.Model):
    name = models.CharField(max_length=100, unique=True)
    slug = models.SlugField(unique=True)

    class Meta:
        verbose_name_plural = "categories"
        ordering = ["name"]

    def __str__(self):
        return self.name

# Minimal Tag model (assumed here so that Post.tags resolves)
class Tag(models.Model):
    name = models.CharField(max_length=50, unique=True)

    def __str__(self):
        return self.name

class Post(models.Model):
    STATUS_DRAFT = "draft"
    STATUS_PUBLISHED = "published"
    STATUS_CHOICES = [(STATUS_DRAFT, "Draft"), (STATUS_PUBLISHED, "Published")]

    title = models.CharField(max_length=255)
    slug = models.SlugField(unique=True)
    author = models.ForeignKey(User, on_delete=models.CASCADE, related_name="posts")
    category = models.ForeignKey(
        Category, on_delete=models.SET_NULL, null=True, blank=True, related_name="posts"
    )
    tags = models.ManyToManyField("Tag", blank=True, related_name="posts")
    body = models.TextField()
    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default=STATUS_DRAFT)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    published_at = models.DateTimeField(null=True, blank=True)

    class Meta:
        ordering = ["-created_at"]
        indexes = [
            models.Index(fields=["status", "-published_at"]),
            models.Index(fields=["author", "status"]),
        ]

    def save(self, *args, **kwargs):
        # Derive the slug from the title on first save
        if not self.slug:
            self.slug = slugify(self.title)
        super().save(*args, **kwargs)

# ORM queries: efficient patterns
# select_related() for ForeignKey (SQL JOIN, avoids N+1)
posts = Post.objects.select_related("author", "category").filter(
    status=Post.STATUS_PUBLISHED
).order_by("-published_at")[:20]

# prefetch_related() for ManyToMany (separate queries, Python-side join)
posts_with_tags = Post.objects.prefetch_related("tags").filter(
    status=Post.STATUS_PUBLISHED
)

# Aggregation
from django.db.models import Count

stats = Post.objects.values("category__name").annotate(
    post_count=Count("id")
).order_by("-post_count")

# only(): load specific fields (reduces memory)
titles = Post.objects.only("title", "slug", "published_at").filter(
    status=Post.STATUS_PUBLISHED
)

Django REST Framework (DRF)
# blog/serializers.py
from rest_framework import serializers

from .models import Category, Post

class CategorySerializer(serializers.ModelSerializer):
    class Meta:
        model = Category
        fields = ["id", "name", "slug"]

class PostSerializer(serializers.ModelSerializer):
    author_name = serializers.CharField(source="author.get_full_name", read_only=True)
    # Nested object on read, plain primary key on write
    category = CategorySerializer(read_only=True)
    category_id = serializers.PrimaryKeyRelatedField(
        queryset=Category.objects.all(), source="category", write_only=True
    )

    class Meta:
        model = Post
        fields = [
            "id", "title", "slug", "author_name", "category", "category_id",
            "body", "status", "created_at", "published_at",
        ]
        read_only_fields = ["id", "slug", "created_at"]

# blog/views.py: a ViewSet plus a Router generates full CRUD
from django_filters.rest_framework import DjangoFilterBackend
from rest_framework import filters, permissions, viewsets
from rest_framework.decorators import action
from rest_framework.response import Response

from .models import Post
from .serializers import PostSerializer

class PostViewSet(viewsets.ModelViewSet):
    serializer_class = PostSerializer
    permission_classes = [permissions.IsAuthenticatedOrReadOnly]
    filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
    filterset_fields = ["status", "category"]
    search_fields = ["title", "body"]
    ordering_fields = ["created_at", "published_at"]

    def get_queryset(self):
        return Post.objects.select_related("author", "category").prefetch_related("tags")

    def perform_create(self, serializer):
        # The author is always the authenticated user, never client-supplied
        serializer.save(author=self.request.user)

    @action(detail=False, methods=["get"])
    def published(self, request):
        qs = self.get_queryset().filter(status=Post.STATUS_PUBLISHED)
        serializer = self.get_serializer(qs, many=True)
        return Response(serializer.data)

# blog/urls.py
from rest_framework.routers import DefaultRouter

from .views import PostViewSet

router = DefaultRouter()
router.register(r"posts", PostViewSet, basename="post")
urlpatterns = router.urls
# Generates: GET/POST /posts/, GET/PUT/PATCH/DELETE /posts/{id}/, GET /posts/published/

4. Flask: Lightweight Applications, Blueprints, and SQLAlchemy
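Flask is a microframework with a deliberately small core; features are added on demand through extensions. Flask Blueprints organize routes into modules, Flask-SQLAlchemy integrates the SQLAlchemy ORM, and Flask-Migrate (built on Alembic) handles database migrations.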
# pip install flask flask-sqlalchemy flask-migrate flask-jwt-extended

# app/__init__.py: application factory pattern
import os

from flask import Flask
from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()
migrate = Migrate()

def create_app(config=None):
    app = Flask(__name__)
    # Read settings from the environment; the fallbacks are for local development only
    app.config["SQLALCHEMY_DATABASE_URI"] = os.environ.get(
        "DATABASE_URL", "postgresql://user:pass@localhost/mydb"
    )
    app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
    app.config["SECRET_KEY"] = os.environ.get("SECRET_KEY", "change-me-in-production")
    if config:
        app.config.update(config)  # e.g. test overrides

    db.init_app(app)
    migrate.init_app(app, db)

    # Register blueprints
    from app.api.routes import api_bp
    from app.auth.routes import auth_bp

    app.register_blueprint(auth_bp, url_prefix="/auth")
    app.register_blueprint(api_bp, url_prefix="/api/v1")
    return app

# app/api/routes.py: Blueprint
from flask import Blueprint, jsonify, request

from app import db
from app.models import Post

api_bp = Blueprint("api", __name__)

@api_bp.route("/posts", methods=["GET"])
def list_posts():
    page = request.args.get("page", 1, type=int)
    per_page = request.args.get("per_page", 20, type=int)
    pagination = Post.query.order_by(Post.created_at.desc()).paginate(
        page=page, per_page=per_page, error_out=False
    )
    return jsonify({
        "posts": [p.to_dict() for p in pagination.items],
        "total": pagination.total,
        "pages": pagination.pages,
        "current_page": page,
    })

@api_bp.route("/posts/<int:post_id>", methods=["GET"])
def get_post(post_id):
    post = db.get_or_404(Post, post_id)
    return jsonify(post.to_dict())

# Handles 404s raised inside this blueprint's views (e.g. by get_or_404);
# unknown URLs are handled by an app-level error handler instead
@api_bp.errorhandler(404)
def not_found(error):
    return jsonify({"error": "Resource not found"}), 404

5. Asynchronous Python: asyncio and aiohttp
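Python's asyncio module provides an event-loop-based framework for asynchronous I/O. In web development, async matters most for I/O-bound services (database queries, external API calls, file operations), where it lets a single process keep many requests in flight at once. aiohttp is one of the most widely used asynchronous HTTP client/server libraries.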
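To make the concurrency benefit concrete before the aiohttp listing, here is a minimal standard-library sketch. `fake_io` is a hypothetical stand-in for a database query or HTTP call (it only sleeps), and the helper names and delays are illustrative assumptions, not part of any library.

```python
import asyncio
import time


async def fake_io(delay: float, label: str) -> str:
    """Hypothetical I/O call: waits without blocking the event loop."""
    await asyncio.sleep(delay)
    return label


async def run_sequential(delays):
    # Each await finishes before the next one starts: total time is the sum
    return [await fake_io(d, f"task-{i}") for i, d in enumerate(delays)]


async def run_concurrent(delays):
    # gather() schedules all coroutines at once: total time is about the longest delay
    return await asyncio.gather(
        *(fake_io(d, f"task-{i}") for i, d in enumerate(delays))
    )


def timed(coro_factory):
    """Run a coroutine to completion and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = asyncio.run(coro_factory())
    return result, time.perf_counter() - start
```

Calling `timed(lambda: run_sequential([0.1] * 5))` and `timed(lambda: run_concurrent([0.1] * 5))` should show the concurrent version finishing in roughly the time of one call rather than five, with results in the same order.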
# asyncio fundamentals
import asyncio
from typing import List

import aiohttp

# Basic coroutine
async def fetch_user(session: aiohttp.ClientSession, user_id: int) -> dict:
    url = f"https://api.example.com/users/{user_id}"
    async with session.get(url) as response:
        response.raise_for_status()
        return await response.json()

# Fetch multiple URLs concurrently (not sequentially)
async def fetch_all_users(user_ids: List[int]) -> List[dict]:
    async with aiohttp.ClientSession() as session:
        # asyncio.gather runs coroutines concurrently
        tasks = [fetch_user(session, uid) for uid in user_ids]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Failed requests are dropped silently here; log them in real code
        return [r for r in results if not isinstance(r, Exception)]

# Timeout handling with asyncio
async def fetch_with_timeout(url: str, timeout_sec: float = 5.0) -> dict:
    timeout = aiohttp.ClientTimeout(total=timeout_sec)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        try:
            async with session.get(url) as resp:
                return await resp.json()
        except asyncio.TimeoutError:
            raise TimeoutError(f"Request to {url} timed out after {timeout_sec}s")

# Run blocking, CPU-heavy work without stalling the event loop
async def process_image_async(image_data: bytes) -> bytes:
    loop = asyncio.get_running_loop()
    # None selects the loop's default thread pool. Threads only help when the
    # work releases the GIL (many C extensions do); for pure-Python CPU-bound
    # code, pass a concurrent.futures.ProcessPoolExecutor instead.
    return await loop.run_in_executor(None, _process_image_sync, image_data)

def _process_image_sync(image_data: bytes) -> bytes:
    # Synchronous CPU-bound image processing
    import io

    from PIL import Image

    img = Image.open(io.BytesIO(image_data))
    img = img.resize((800, 600))
    output = io.BytesIO()
    img.save(output, format="JPEG", quality=85)
    return output.getvalue()

# asyncio.Semaphore: limit concurrent connections
async def fetch_with_semaphore(urls: List[str], max_concurrent: int = 10):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_one(session, url):
        async with semaphore:
            async with session.get(url) as resp:
                return await resp.text()

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_one(session, url) for url in urls]
        return await asyncio.gather(*tasks)

6. SQLAlchemy 2.0: Core, ORM, and Async Sessions
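SQLAlchemy 2.0 brought major changes: a unified 2.0 query style (select() statements replace Session.query(), which survives only as a legacy API), first-class AsyncSession support (the asyncio extension first appeared in 1.4), and much better type annotations. It is the first-choice ORM for non-Django projects.
ORM Model Definitions (2.0 Style)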
# app/models/user.py: SQLAlchemy 2.0 declarative style
from datetime import datetime, timezone
from typing import List

from sqlalchemy import Boolean, DateTime, ForeignKey, String, Text
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship

def utcnow() -> datetime:
    # datetime.utcnow() is deprecated since Python 3.12; store aware UTC timestamps
    return datetime.now(timezone.utc)

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True, index=True)
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    username: Mapped[str] = mapped_column(String(50), unique=True, index=True)
    hashed_password: Mapped[str] = mapped_column(String(255))
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow)

    # Relationship: one user has many posts
    posts: Mapped[List["Post"]] = relationship("Post", back_populates="author")

class Post(Base):
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True, index=True)
    title: Mapped[str] = mapped_column(String(255))
    body: Mapped[str] = mapped_column(Text)
    published: Mapped[bool] = mapped_column(Boolean, default=False)
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow)

    author: Mapped["User"] = relationship("User", back_populates="posts")

Async Sessions and CRUD Operations
# app/database.py: async engine and session
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

# Assumes app/models/__init__.py re-exports Base from app/models/user.py
from app.models import Base

# Use the asyncpg driver for async PostgreSQL access
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/mydb"

engine = create_async_engine(DATABASE_URL, echo=False, pool_size=10, max_overflow=20)
AsyncSessionLocal = async_sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)

async def get_db():
    # One session per request: commit on success, roll back on any error
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

# app/services/user_service.py: SQLAlchemy 2.0 query style
from passlib.context import CryptContext
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.user import User
from app.schemas.user import UserCreate

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

class UserService:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def get_by_id(self, user_id: int) -> User | None:
        # SQLAlchemy 2.0 style: use select() instead of session.query()
        result = await self.db.execute(select(User).where(User.id == user_id))
        return result.scalar_one_or_none()

    async def get_by_email(self, email: str) -> User | None:
        result = await self.db.execute(select(User).where(User.email == email))
        return result.scalar_one_or_none()

    async def get_users(self, skip: int = 0, limit: int = 100) -> list[User]:
        result = await self.db.execute(
            select(User).where(User.is_active.is_(True)).offset(skip).limit(limit)
        )
        return list(result.scalars().all())

    async def create_user(self, user_in: UserCreate) -> User:
        user = User(
            email=user_in.email,
            username=user_in.username,
            hashed_password=pwd_context.hash(user_in.password),
        )
        self.db.add(user)
        await self.db.flush()  # Get the generated id without committing
        await self.db.refresh(user)
        return user

    async def deactivate_user(self, user_id: int) -> None:
        await self.db.execute(
            update(User).where(User.id == user_id).values(is_active=False)
        )

7. Alembic Database Migrations
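Alembic is the migration tool maintained by the SQLAlchemy project. It can detect model changes and generate migration scripts automatically, and it also supports hand-written scripts for complex migration logic. Always review autogenerated scripts before applying them, and commit every migration script to version control.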
# Initialize Alembic in your project
pip install alembic
alembic init alembic

# alembic/env.py: configure for async SQLAlchemy
import asyncio
from logging.config import fileConfig

from alembic import context
from sqlalchemy.ext.asyncio import create_async_engine

from app.models import Base  # import your models so autogenerate can see them

config = context.config
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

target_metadata = Base.metadata

def run_migrations_offline():
    # Offline mode emits SQL without connecting to the database
    context.configure(
        url=config.get_main_option("sqlalchemy.url"),
        target_metadata=target_metadata,
        literal_binds=True,
    )
    with context.begin_transaction():
        context.run_migrations()

def do_run_migrations(connection):
    # Runs synchronously inside run_sync(), on the async connection's sync facade
    context.configure(connection=connection, target_metadata=target_metadata)
    with context.begin_transaction():
        context.run_migrations()

async def run_migrations_online():
    connectable = create_async_engine(config.get_main_option("sqlalchemy.url"))
    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)
    await connectable.dispose()

if context.is_offline_mode():
    run_migrations_offline()
else:
    asyncio.run(run_migrations_online())

# Common Alembic commands
# Generate migration from model changes (auto-detect)
alembic revision --autogenerate -m "add users table"
# Apply all pending migrations
alembic upgrade head
# Roll back the last migration
alembic downgrade -1
# Roll back to a specific revision
alembic downgrade abc123
# View migration history
alembic history --verbose
# View current revision
alembic current

8. Celery Background Task Queue
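Celery is the most widely used distributed task queue in the Python ecosystem. It supports asynchronous tasks, periodic tasks (Celery Beat), retries, result storage, and task chains. The usual message brokers are Redis and RabbitMQ, and Redis is a common choice for the result backend.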
# pip install celery redis

# app/celery_app.py
from celery import Celery

celery_app = Celery(
    "myapp",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
    include=["app.tasks"],
)
celery_app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    timezone="UTC",
    enable_utc=True,
    task_acks_late=True,            # Ack only after task completes
    worker_prefetch_multiplier=1,   # Prevent worker overload
)

# app/tasks.py
from app.celery_app import celery_app

@celery_app.task(bind=True, max_retries=3, default_retry_delay=60)
def send_welcome_email(self, user_id: int, email: str):
    """Send welcome email; retries up to 3 times on failure."""
    try:
        # Your email sending logic here (_send_email is a placeholder helper)
        _send_email(to=email, subject="Welcome!", body="...")
    except Exception as exc:
        raise self.retry(exc=exc)

@celery_app.task
def process_video(video_id: int, output_format: str = "mp4"):
    """Long-running video processing task."""
    # Process video...
    return {"video_id": video_id, "status": "completed"}

# Call tasks from FastAPI/Django/Flask
# .delay() is a shortcut for .apply_async()
send_welcome_email.delay(user_id=42, email="user@example.com")

# .apply_async() with options
process_video.apply_async(
    args=[123],                       # video_id (example value)
    kwargs={"output_format": "webm"},
    countdown=10,    # Start after 10 seconds
    expires=3600,    # Discard if not consumed in 1 hour
    priority=5,      # 0-9; with the Redis broker 0 is highest (RabbitMQ: higher number wins)
)

# Celery Beat: periodic tasks (cron-like scheduling)
from celery.schedules import crontab

celery_app.conf.beat_schedule = {
    "daily-report": {
        "task": "app.tasks.generate_daily_report",
        "schedule": crontab(hour=8, minute=0),  # Every day at 8:00 AM
    },
    "cleanup-sessions": {
        "task": "app.tasks.cleanup_expired_sessions",
        "schedule": 300.0,  # Every 300 seconds
    },
}

# Start workers
celery -A app.celery_app worker --loglevel=info --concurrency=4
celery -A app.celery_app beat --loglevel=info

9. Testing Python Web Applications: pytest and the FastAPI TestClient
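pytest is the de facto standard for testing in Python. FastAPI ships an httpx-based TestClient that exercises API endpoints without starting a server; for async tests, httpx's AsyncClient can call the ASGI app directly. A sound test strategy covers unit tests (business logic), integration tests (database interaction), and end-to-end tests (complete API flows).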
# pip install pytest pytest-asyncio httpx pytest-cov

# tests/conftest.py: shared fixtures
import pytest_asyncio
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

from app.database import get_db
from app.main import app
from app.models import Base

TEST_DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/test_db"

# Function-scoped on purpose: recent pytest-asyncio releases no longer support
# overriding the event_loop fixture, and asyncpg connections must stay on the
# loop that created them. Sharing one engine per session requires configuring
# a session-scoped loop in pytest-asyncio.
@pytest_asyncio.fixture
async def test_engine():
    engine = create_async_engine(TEST_DATABASE_URL, echo=False)
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield engine
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
    await engine.dispose()

@pytest_asyncio.fixture
async def db_session(test_engine):
    TestSession = async_sessionmaker(test_engine, expire_on_commit=False)
    async with TestSession() as session:
        yield session
        await session.rollback()  # Roll back after each test

@pytest_asyncio.fixture
async def client(db_session):
    async def override_get_db():
        yield db_session

    app.dependency_overrides[get_db] = override_get_db
    # httpx removed the AsyncClient(app=...) shortcut; use ASGITransport instead
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        yield ac
    app.dependency_overrides.clear()

# tests/test_users.py
import pytest

@pytest.mark.asyncio
async def test_create_user(client):
    response = await client.post("/users/", json={
        "email": "test@example.com",
        "username": "testuser",
        "password": "securepassword123",
    })
    assert response.status_code == 201
    data = response.json()
    assert data["email"] == "test@example.com"
    assert "id" in data
    assert "password" not in data  # Never expose passwords

@pytest.mark.asyncio
async def test_duplicate_email_returns_409(client):
    payload = {"email": "dup@example.com", "username": "user1", "password": "pass12345"}
    await client.post("/users/", json=payload)
    # Second registration with same email
    response = await client.post("/users/", json={**payload, "username": "user2"})
    assert response.status_code == 409

@pytest.mark.asyncio
async def test_get_user_not_found(client):
    response = await client.get("/users/99999")
    assert response.status_code == 404

# Run tests
pytest tests/ -v --cov=app --cov-report=html

10. Production Deployment: Gunicorn, Uvicorn, and Docker
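For production, the recommended setup is Gunicorn as the process manager, paired with UvicornWorker (for ASGI/FastAPI) or the default sync worker (for Django/Flask). Always run the Python service behind a reverse proxy such as Nginx or Caddy, which handles SSL termination, static files, and request buffering.
Gunicorn + Uvicorn (FastAPI)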
# FastAPI / ASGI app
# -w: worker count (recommended: 2 * CPU cores + 1)
# -k: worker class
# -b: bind address
# --timeout: worker timeout in seconds
gunicorn app.main:app \
    -w 4 \
    -k uvicorn.workers.UvicornWorker \
    -b 0.0.0.0:8000 \
    --timeout 120 \
    --keep-alive 5 \
    --access-logfile - \
    --error-logfile -

# Django / WSGI app
gunicorn myproject.wsgi:application \
    -w 4 \
    -b 0.0.0.0:8000 \
    --timeout 120

# gunicorn.conf.py: configuration file
bind = "0.0.0.0:8000"
workers = 4
worker_class = "uvicorn.workers.UvicornWorker"
worker_connections = 1000
timeout = 120
keepalive = 5
max_requests = 1000        # Restart workers after N requests (prevent memory leaks)
max_requests_jitter = 100  # Random jitter to prevent all workers restarting at once
loglevel = "info"
accesslog = "-"
errorlog = "-"
preload_app = True         # Load app code before forking workers (saves memory)

Docker Containerization
# Dockerfile: multi-stage build for FastAPI
FROM python:3.12-slim AS builder
WORKDIR /app
# Install build dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
        build-essential libpq-dev \
    && rm -rf /var/lib/apt/lists/*
# Install Python dependencies into a virtualenv that any user can read
# (packages installed with --user under /root/.local are not readable
# once the container switches to a non-root user)
RUN python -m venv /opt/venv
ENV PATH=/opt/venv/bin:$PATH
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Production stage
FROM python:3.12-slim
WORKDIR /app
RUN apt-get update && apt-get install -y --no-install-recommends libpq5 \
    && rm -rf /var/lib/apt/lists/*
# Copy the virtualenv from the builder stage
COPY --from=builder /opt/venv /opt/venv
# Copy application code
COPY . .
# Create non-root user for security
RUN useradd --no-create-home --shell /bin/false appuser
USER appuser
ENV PATH=/opt/venv/bin:$PATH
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
EXPOSE 8000
CMD ["gunicorn", "app.main:app", "-w", "4", "-k", \
     "uvicorn.workers.UvicornWorker", "-b", "0.0.0.0:8000"]
# docker-compose.yml
# (no top-level "version" key: the Compose Specification ignores it, and the
# depends_on "condition" syntax used below belongs to the Compose Specification)
services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql+asyncpg://postgres:password@db/mydb
      - SECRET_KEY=${SECRET_KEY}
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      db:
        condition: service_healthy
      redis:
        condition: service_started
  db:
    image: postgres:16-alpine
    volumes:
      - pgdata:/var/lib/postgresql/data
    environment:
      POSTGRES_PASSWORD: password
      POSTGRES_DB: mydb
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 5s
      timeout: 5s
      retries: 5
  redis:
    image: redis:7-alpine
  worker:
    build: .
    command: celery -A app.celery_app worker -l info -c 4
    environment:
      - DATABASE_URL=postgresql+asyncpg://postgres:password@db/mydb
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - db
      - redis
volumes:
  pgdata:

11. Detailed Framework Comparison Table
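The table below compares Django, FastAPI, and Flask dimension by dimension to help you make a final choice based on your project's needs:

| Dimension | Django | FastAPI | Flask |
|---|---|---|---|
| First release | 2005 | 2018 | 2010 |
| Architecture | MTV (full-stack) | ASGI (API-first) | WSGI (micro) |
| ORM | Built-in Django ORM | None (SQLAlchemy recommended) | None (SQLAlchemy recommended) |
| Migration tool | Built-in makemigrations | Alembic | Alembic / Flask-Migrate |
| Async support | Partial (3.1+, requires ASGI) | Native (ASGI) | Limited (async views since 2.0, still WSGI) |
| Data validation | Forms / DRF Serializers | Pydantic (built-in) | Marshmallow / WTForms |
| API documentation | DRF Browsable API | Auto Swagger + ReDoc | None (requires an extension) |
| Admin backend | Built-in (near-zero configuration) | None (third-party) | None (requires Flask-Admin) |
| Authentication | Built-in User/Session (JWT via a third-party package) | python-jose + passlib | Flask-Login + JWT extension |
| Testing tools | django.test.Client | TestClient (httpx) | Flask test client |
| Performance (relative) | Medium (sync WSGI) | High (async ASGI) | Medium (sync WSGI) |
| Learning curve | Steep (many concepts) | Medium (Pydantic + async) | Low (minimal) |
| Best suited for | CMS, e-commerce, content platforms | APIs, ML serving, microservices | Prototypes, simple applications |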
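Frequently Asked Questions
How do I choose between Django, FastAPI, and Flask?
Pick Django when you need a complete admin backend, ORM, and authentication system (CMS, e-commerce, content platforms). Pick FastAPI when you are building high-performance REST APIs or need automatic OpenAPI documentation (microservices, data APIs, ML inference endpoints). Pick Flask for lightweight prototypes or small applications; it is flexible, but you assemble more of the infrastructure yourself.
How much faster is FastAPI than Flask?
FastAPI runs on ASGI (Starlette + Uvicorn) and supports native async I/O. Under I/O-bound workloads its throughput is often cited as roughly 2-3x that of Flask (WSGI + Gunicorn), although the ratio depends heavily on the workload, worker configuration, and database driver. For CPU-bound tasks the difference is small, because both are constrained by the Python GIL. Published benchmarks report tens of thousands of simple requests per second for FastAPI versus thousands for a standard Flask setup, but those figures are benchmark-specific, so measure your own endpoints before relying on them.
What is the difference between SQLAlchemy 2.0 and the Django ORM?
The Django ORM is tightly integrated with the Django framework. Its API is more concise and well suited to rapid development, but it is less flexible. SQLAlchemy 2.0 is a standalone library that works with any framework and offers two API layers, Core (the SQL expression language) and the ORM, which makes complex queries easier to express; it also supports async sessions (AsyncSession). New Flask and FastAPI projects should use SQLAlchemy 2.0.
How do I avoid blocking the event loop in async Python web code?
The key rule: every I/O operation (database queries, HTTP requests, file reads and writes) must go through a library with an async/await interface (asyncpg, aiohttp, aiofiles). CPU-bound work (image processing, numeric computation) should be moved to a thread pool or process pool with loop.run_in_executor() or asyncio.to_thread(). Never call blocking functions such as time.sleep() or requests.get() inside an async function.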
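The effect of a blocking call can be observed directly. The sketch below is a standard-library illustration with hypothetical helper names: a background "heartbeat" coroutine counts how often the event loop gets to run while some work is in progress, once with a blocking `time.sleep()` and once with the same call moved to a worker thread via `asyncio.to_thread()` (Python 3.9+).

```python
import asyncio
import time


async def count_heartbeats_during(work) -> int:
    """Return how many heartbeat ticks had run by the time `work()` finished."""
    ticks = 0

    async def heartbeat() -> None:
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0.01)

    task = asyncio.create_task(heartbeat())
    await asyncio.sleep(0)  # give the heartbeat a chance to start
    await work()
    observed = ticks
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return observed


async def blocking_version() -> None:
    # Anti-pattern: a synchronous sleep freezes every coroutine on this loop
    time.sleep(0.2)


async def offloaded_version() -> None:
    # The same blocking call, run in a worker thread so the loop stays responsive
    await asyncio.to_thread(time.sleep, 0.2)
```

Running `asyncio.run(count_heartbeats_during(blocking_version))` should show the heartbeat starved for the whole 0.2 seconds, while the offloaded version lets it tick many times over the same period.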
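What is the difference between Alembic and Django migrations?
Django migrations are tightly integrated with the Django ORM: makemigrations detects model changes and generates migration files, and applying them is straightforward. Alembic is SQLAlchemy's migration tool; scripts are written by hand or generated semi-automatically (autogenerate output should be reviewed), and it offers stronger support for custom SQL and complex migration logic. With either tool, commit migration files to version control and run them during deployment before the application starts.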
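The "migrate first, then start the app" ordering can be enforced by a small entrypoint script. The following is a sketch under assumptions: `STARTUP_STEPS` and `run_startup` are hypothetical names, the commands mirror the `alembic upgrade head` and Gunicorn invocations shown earlier in this guide, and the injectable `runner` exists only so the ordering logic can be exercised without a database.

```python
import subprocess

# Ordered startup commands: migrations must succeed before the server starts
STARTUP_STEPS = (
    ("alembic", "upgrade", "head"),
    ("gunicorn", "app.main:app", "-k", "uvicorn.workers.UvicornWorker"),
)


def run_startup(steps=STARTUP_STEPS, runner=subprocess.run):
    """Run each command in order and abort at the first non-zero exit code."""
    completed = []
    for command in steps:
        result = runner(list(command), check=False)
        if result.returncode != 0:
            # A failed migration must never be followed by an app start
            raise RuntimeError(f"startup step failed: {' '.join(command)}")
        completed.append(list(command))
    return completed
```

In a container, the same ordering is often expressed as a shell entrypoint instead; the point of the sketch is only that a failing migration stops the deployment rather than letting the application boot against an outdated schema.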
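What is the difference between Celery and FastAPI's BackgroundTasks?
FastAPI BackgroundTasks run lightweight jobs (sending a notification email, updating statistics) inside the same process after the response is sent. The job dies if the process exits, and there is no retry or persistence. Celery is a separate distributed task queue with retries, scheduled execution, priority queues, result storage, and distribution across services, which makes it suitable for long-running work such as video processing or bulk data imports. Complex background jobs in production should use Celery.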
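To show what "retry support" means in practice, here is a plain-Python sketch of bounded retries with a fixed delay. It is not how Celery is implemented (Celery re-queues the task through the broker, so a retry survives a worker restart); `run_with_retries` is a hypothetical helper, and its defaults simply mirror the `max_retries=3, default_retry_delay=60` options used in the Celery example.

```python
import time


def run_with_retries(task, *, max_retries=3, retry_delay=60.0, sleep=time.sleep):
    """Call `task()` until it succeeds or `max_retries` retries are used up.

    `sleep` is injectable so the waiting can be replaced in tests.
    """
    attempt = 0
    while True:
        try:
            return task()
        except Exception:
            if attempt >= max_retries:
                raise  # retries exhausted: surface the last error
            attempt += 1
            sleep(retry_delay)
```

An in-process loop like this loses all pending retries when the process exits, which is exactly the limitation of BackgroundTasks that a broker-backed queue removes.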
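Should I use Gunicorn or Uvicorn in production?
The recommended combination is Gunicorn as the process manager (it supervises the lifecycle of multiple worker processes) with UvicornWorker as the ASGI worker class (it handles the actual requests). Command: gunicorn app.main:app -w 4 -k uvicorn.workers.UvicornWorker. For Django (synchronous), use gunicorn -w 4 myproject.wsgi:application. Plain Uvicorn fits container environments (Docker + Kubernetes) where the orchestration platform manages the number of processes.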
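The Gunicorn command comments earlier in this guide recommend 2 × CPU cores + 1 workers. A small helper can compute that value, for example inside gunicorn.conf.py. The function name and the optional `cap` parameter are assumptions added for illustration; a cap is often useful in containers, where the visible core count can exceed the CPU quota.

```python
import os
from typing import Optional


def recommended_workers(cpu_count: Optional[int] = None, cap: Optional[int] = None) -> int:
    """Apply the 2 * cores + 1 rule of thumb, optionally limited by `cap`."""
    cores = cpu_count if cpu_count is not None else (os.cpu_count() or 1)
    workers = 2 * cores + 1
    return min(workers, cap) if cap is not None else workers
```

In gunicorn.conf.py this could replace the hardcoded value with `workers = recommended_workers(cap=8)`. Treat the result as a starting point and adjust it after load testing.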
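How do I implement JWT authentication in FastAPI?
Use python-jose for JWT handling and passlib for password hashing. Create a /token endpoint that accepts an OAuth2PasswordRequestForm, verifies the credentials, and issues a JWT. Use the OAuth2PasswordBearer dependency to extract the Bearer token, then validate it and inject the current user into protected routes with Depends(get_current_user). Keep the JWT SECRET_KEY in an environment variable. HS256 is adequate when one service both issues and verifies tokens; consider the asymmetric RS256 algorithm in production when other services need to verify tokens without being able to issue them.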
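For intuition about what signing and verifying an HS256 token involves, here is a standard-library sketch. It is an illustration only, not a replacement for python-jose: `sign_hs256` and `verify_hs256` are hypothetical names, and only the signature, the `alg` header, and the `exp` claim are checked.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def _b64url(data: bytes) -> str:
    # JWTs use URL-safe base64 without padding
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_hs256(payload: dict, secret: str) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    segments = [
        _b64url(json.dumps(header, separators=(",", ":")).encode()),
        _b64url(json.dumps(payload, separators=(",", ":")).encode()),
    ]
    signing_input = ".".join(segments).encode("ascii")
    signature = hmac.new(secret.encode(), signing_input, hashlib.sha256).digest()
    return ".".join(segments + [_b64url(signature)])


def verify_hs256(token: str, secret: str, now: Optional[float] = None) -> dict:
    """Return the payload, or raise ValueError if the token is invalid or expired."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("malformed token")
    header_b64, payload_b64, signature_b64 = parts
    signing_input = f"{header_b64}.{payload_b64}".encode("ascii")
    expected = hmac.new(secret.encode(), signing_input, hashlib.sha256).digest()
    # Constant-time comparison avoids leaking how many bytes matched
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("bad signature")
    if json.loads(_b64url_decode(header_b64)).get("alg") != "HS256":
        raise ValueError("unexpected algorithm")
    payload = json.loads(_b64url_decode(payload_b64))
    current = time.time() if now is None else now
    if "exp" in payload and current >= payload["exp"]:
        raise ValueError("token expired")
    return payload
```

Because HS256 uses one shared secret for both operations, every service that can verify a token can also mint one, which is the reason asymmetric algorithms are preferred once several services are involved.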
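Summary
By 2026 the Python web development ecosystem is quite mature. With native async support, automatic OpenAPI documentation, and Pydantic validation, FastAPI has become the first choice for new API services. Django, with its complete ecosystem and "batteries included" approach, still holds an important place in full-stack web development. Flask remains a good fit for lightweight scenarios that call for maximum flexibility.
Whichever framework you choose, hold on to these practices: manage the data layer with SQLAlchemy 2.0 (or the Django ORM), manage schema changes with Alembic (or Django migrations), hand long-running background work to Celery, safeguard code quality with pytest, and deploy with Docker + Gunicorn/Uvicorn + Nginx. Keep all secrets in environment variables, and never hardcode passwords or API keys in source code.
With these tools and patterns you can build reliable, high-performance, maintainable Python web applications, from small prototypes to production systems handling tens of thousands of requests per second.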