一个完整的 FastAPI + SQLAlchemy 异步示例
项目结构
fastapi_sqlalchemy_demo/
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── database.py
│   ├── models.py
│   ├── schemas.py
│   └── crud.py
├── requirements.txt
└── README.md
1. 依赖文件 (requirements.txt)
fastapi==0.104.1
uvicorn==0.24.0
sqlalchemy==2.0.23
asyncpg==0.29.0
alembic==1.12.1
pydantic[email]==2.5.0
python-dotenv==1.0.0
psycopg2-binary==2.9.9
passlib[bcrypt]==1.7.4
bcrypt==4.0.1
2. 数据库配置 (app/database.py)
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase
import os
from dotenv import load_dotenv
# Load variables from a .env file before any configuration is read.
load_dotenv()

# Connection URL for the async engine; the fallback points at a local
# PostgreSQL database through the asyncpg driver.
DATABASE_URL = os.getenv(
    "DATABASE_URL",
    "postgresql+asyncpg://user:password@localhost:5432/fastapi_db",
)

# Async engine; echo=True makes SQLAlchemy log the SQL it emits.
engine = create_async_engine(DATABASE_URL, echo=True)

# Async session factory bound to the engine. expire_on_commit=False keeps
# already-loaded attributes readable on instances after a commit.
AsyncSessionLocal = async_sessionmaker(
    bind=engine,
    class_=AsyncSession,
    expire_on_commit=False,
)
class Base(DeclarativeBase):
    """Declarative base class that the ORM models of this package inherit from."""
# Dependency: provide a database session to the code that requests it.
async def get_db():
    """Yield an ``AsyncSession`` and finalize its transaction afterwards.

    The session is committed once the consumer finishes without raising. If
    an exception is raised (by the consumer or by the commit itself), the
    transaction is rolled back and the exception is re-raised. The session is
    closed in every case.
    """
    async with AsyncSessionLocal() as db_session:
        try:
            yield db_session
            await db_session.commit()
        except Exception:
            await db_session.rollback()
            raise
        finally:
            await db_session.close()
3. 数据模型 (app/models.py)
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, Text
from sqlalchemy.sql import func

from .database import Base
class User(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    # Unique and indexed, required.
    email = Column(
        String(255),
        unique=True,
        index=True,
        nullable=False,
    )
    name = Column(String(100), nullable=False)
    # Stores the password hash, never the plain-text password.
    hashed_password = Column(String(255), nullable=False)
    # Filled in by the database when the row is inserted.
    created_at = Column(
        DateTime(timezone=True),
        server_default=func.now(),
    )
    # Only set on UPDATE; there is no insert-time default.
    updated_at = Column(
        DateTime(timezone=True),
        onupdate=func.now(),
    )
class Post(Base):
    """ORM model mapped to the ``posts`` table."""

    __tablename__ = "posts"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(200), nullable=False)
    content = Column(Text, nullable=False)
    # Reference users.id so the database rejects posts whose author does not
    # exist; ON DELETE CASCADE removes a user's posts together with the user,
    # so no orphaned posts are left behind. Indexed because posts are filtered
    # and joined by author.
    # NOTE: create_all() does not alter existing tables; a database created
    # before this constraint was added needs a migration.
    author_id = Column(
        Integer,
        ForeignKey("users.id", ondelete="CASCADE"),
        nullable=False,
        index=True,
    )
    # Filled in by the database when the row is inserted.
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    # Only set on UPDATE; there is no insert-time default.
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())
4. Pydantic模式 (app/schemas.py)
from pydantic import BaseModel, EmailStr
from datetime import datetime
from typing import Optional
# User Schemas
class UserBase(BaseModel):
    # Fields shared by the user schemas that inherit from this class.
    email: str
    name: str
class UserCreate(UserBase):
    # Request payload for creating a user.
    # Narrow the inherited ``email: str`` to EmailStr so malformed addresses
    # are rejected on creation, matching the validation UserUpdate applies.
    email: EmailStr
    # Plain-text password as submitted by the client.
    password: str
class UserUpdate(BaseModel):
    # Partial-update payload for a user: every field is optional and
    # defaults to None.
    email: Optional[EmailStr] = None
    name: Optional[str] = None
    password: Optional[str] = None
class User(UserBase):
    # Response schema for a user; it has no password field.
    id: int
    created_at: datetime

    # Pydantic v2 configuration, replacing the deprecated nested
    # ``class Config``: allow building the schema from objects by reading
    # their attributes (e.g. ORM instances).
    model_config = {"from_attributes": True}
# Post Schemas
class PostBase(BaseModel):
    # Fields shared by the post schemas that inherit from this class.
    title: str
    content: str
class PostCreate(PostBase):
    # Request payload for creating a post; adds nothing to PostBase.
    ...
class PostUpdate(BaseModel):
    # Partial-update payload for a post: every field is optional and
    # defaults to None.
    title: Optional[str] = None
    content: Optional[str] = None
class Post(PostBase):
    # Response schema for a stored post.
    id: int
    author_id: int
    created_at: datetime
    # May be None, e.g. for a post that has not been updated yet.
    updated_at: Optional[datetime] = None

    # Pydantic v2 configuration, replacing the deprecated nested
    # ``class Config``: allow building the schema from objects by reading
    # their attributes (e.g. ORM instances).
    model_config = {"from_attributes": True}
class PostWithAuthor(Post):
    # Post response that additionally embeds the author's user data.
    author: User
5. CRUD操作 (app/crud.py)
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from sqlalchemy import update, delete
from . import models, schemas
from passlib.context import CryptContext
# Shared passlib context used by the password helpers; bcrypt is the only
# configured scheme.
pwd_context = CryptContext(
    schemes=["bcrypt"],
    deprecated="auto",
)
def get_password_hash(password: str) -> str:
    """Return the hash of ``password`` produced by the module's ``pwd_context``."""
    hashed = pwd_context.hash(password)
    return hashed
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return whether ``plain_password`` matches ``hashed_password``.

    The check is delegated to the module's ``pwd_context``.
    """
    matches = pwd_context.verify(plain_password, hashed_password)
    return matches
# User CRUD Operations
class UserCRUD:
    """Static async helpers for reading and writing ``models.User`` rows."""

    @staticmethod
    async def get_user(db: AsyncSession, user_id: int):
        """Return the user whose id is ``user_id``, or ``None`` if there is none."""
        result = await db.execute(
            select(models.User).where(models.User.id == user_id)
        )
        return result.scalar_one_or_none()

    @staticmethod
    async def get_user_by_email(db: AsyncSession, email: str):
        """Return the user whose email equals ``email``, or ``None`` if there is none."""
        result = await db.execute(
            select(models.User).where(models.User.email == email)
        )
        return result.scalar_one_or_none()

    @staticmethod
    async def get_users(db: AsyncSession, skip: int = 0, limit: int = 100):
        """Return up to ``limit`` users after skipping the first ``skip`` rows."""
        result = await db.execute(select(models.User).offset(skip).limit(limit))
        return result.scalars().all()

    @staticmethod
    async def create_user(db: AsyncSession, user: schemas.UserCreate):
        """Insert a new user and return the refreshed ORM instance.

        The plain-text password from ``user`` is hashed before it is stored.
        """
        db_user = models.User(
            email=user.email,
            name=user.name,
            hashed_password=get_password_hash(user.password),
        )
        db.add(db_user)
        await db.commit()
        # Reload so database-generated values (id, created_at) are populated.
        await db.refresh(db_user)
        return db_user

    @staticmethod
    async def update_user(db: AsyncSession, user_id: int, user_update: schemas.UserUpdate):
        """Apply the fields provided in ``user_update`` and return the user.

        Returns ``None`` when no user with ``user_id`` exists.
        """
        # Drop fields that were omitted or sent as null: every updatable
        # column is NOT NULL, and a null password cannot be hashed.
        update_data = user_update.model_dump(exclude_unset=True, exclude_none=True)
        if "password" in update_data:
            # Store the hash under the column name, never the plain text.
            update_data["hashed_password"] = get_password_hash(update_data.pop("password"))
        # Nothing to change: skip the UPDATE and just return the current row.
        if update_data:
            stmt = (
                update(models.User)
                .where(models.User.id == user_id)
                .values(**update_data)
                .execution_options(synchronize_session="fetch")
            )
            await db.execute(stmt)
            await db.commit()
        return await UserCRUD.get_user(db, user_id)

    @staticmethod
    async def delete_user(db: AsyncSession, user_id: int):
        """Delete the user whose id is ``user_id`` and commit; returns nothing."""
        stmt = delete(models.User).where(models.User.id == user_id)
        await db.execute(stmt)
        await db.commit()
# Post CRUD Operations
class PostCRUD:
    """Static async helpers for reading and writing ``models.Post`` rows."""

    @staticmethod
    async def get_post(db: AsyncSession, post_id: int):
        """Return the post whose id is ``post_id``, or ``None`` if there is none."""
        result = await db.execute(
            select(models.Post).where(models.Post.id == post_id)
        )
        return result.scalar_one_or_none()

    @staticmethod
    async def get_posts(db: AsyncSession, skip: int = 0, limit: int = 100):
        """Return up to ``limit`` posts after skipping the first ``skip`` rows."""
        result = await db.execute(select(models.Post).offset(skip).limit(limit))
        return result.scalars().all()

    @staticmethod
    async def get_posts_by_author(db: AsyncSession, author_id: int, skip: int = 0, limit: int = 100):
        """Return up to ``limit`` posts written by ``author_id``, skipping ``skip`` rows."""
        result = await db.execute(
            select(models.Post)
            .where(models.Post.author_id == author_id)
            .offset(skip)
            .limit(limit)
        )
        return result.scalars().all()

    @staticmethod
    async def create_post(db: AsyncSession, post: schemas.PostCreate, author_id: int):
        """Insert a new post for ``author_id`` and return the refreshed ORM instance."""
        db_post = models.Post(**post.model_dump(), author_id=author_id)
        db.add(db_post)
        await db.commit()
        # Reload so database-generated values (id, created_at) are populated.
        await db.refresh(db_post)
        return db_post

    @staticmethod
    async def update_post(db: AsyncSession, post_id: int, post_update: schemas.PostUpdate):
        """Apply the fields provided in ``post_update`` and return the post.

        Returns ``None`` when no post with ``post_id`` exists.
        """
        # Drop fields that were omitted or sent as null: title and content
        # are NOT NULL columns.
        update_data = post_update.model_dump(exclude_unset=True, exclude_none=True)
        # Nothing to change: skip the UPDATE and just return the current row.
        if update_data:
            stmt = (
                update(models.Post)
                .where(models.Post.id == post_id)
                .values(**update_data)
                .execution_options(synchronize_session="fetch")
            )
            await db.execute(stmt)
            await db.commit()
        return await PostCRUD.get_post(db, post_id)

    @staticmethod
    async def delete_post(db: AsyncSession, post_id: int):
        """Delete the post whose id is ``post_id`` and commit; returns nothing."""
        stmt = delete(models.Post).where(models.Post.id == post_id)
        await db.execute(stmt)
        await db.commit()

    @staticmethod
    async def get_post_with_author(db: AsyncSession, post_id: int):
        """Return the post with its author attached as ``post.author``.

        Returns ``None`` when the post does not exist or no user row matches
        its ``author_id`` (the query is an inner join).
        """
        result = await db.execute(
            select(models.Post, models.User)
            .join(models.User, models.Post.author_id == models.User.id)
            .where(models.Post.id == post_id)
        )
        row = result.first()
        if row is None:
            return None
        post, author = row
        # Plain attribute set on the instance so the caller can read
        # ``post.author``; it is not a mapped relationship.
        post.author = author
        return post
6. 主应用文件 (app/main.py)
from fastapi import FastAPI, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List
from . import crud, models, schemas
from .database import engine, get_db, Base
# The FastAPI application instance that all routes below are registered on.
app = FastAPI(
    title="FastAPI SQLAlchemy Async Demo",
    version="1.0.0",
)
@app.on_event("startup")
async def startup():
    """Create the tables declared on ``Base.metadata`` when the app starts."""
    # In production, use Alembic migrations for schema changes instead.
    async with engine.begin() as connection:
        # Uncomment to drop all tables first:
        # await connection.run_sync(Base.metadata.drop_all)
        await connection.run_sync(Base.metadata.create_all)
@app.on_event("shutdown")
async def shutdown():
    """Dispose of the database engine when the app shuts down."""
    await engine.dispose()
# User Routes
@app.post("/users/", response_model=schemas.User, status_code=status.HTTP_201_CREATED)
async def create_user(user: schemas.UserCreate, db: AsyncSession = Depends(get_db)):
    # Create a user; respond with 400 when the email is already registered.
    existing_user = await crud.UserCRUD.get_user_by_email(db, email=user.email)
    if existing_user:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered",
        )
    created_user = await crud.UserCRUD.create_user(db=db, user=user)
    return created_user
@app.get("/users/", response_model=List[schemas.User])
async def read_users(skip: int = 0, limit: int = 100, db: AsyncSession = Depends(get_db)):
    # List users, paginated with the ``skip`` and ``limit`` parameters.
    return await crud.UserCRUD.get_users(db, skip=skip, limit=limit)
@app.get("/users/{user_id}", response_model=schemas.User)
async def read_user(user_id: int, db: AsyncSession = Depends(get_db)):
    # Return one user by id; respond with 404 when it does not exist.
    db_user = await crud.UserCRUD.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user
@app.put("/users/{user_id}", response_model=schemas.User)
async def update_user(user_id: int, user_update: schemas.UserUpdate, db: AsyncSession = Depends(get_db)):
    # Update a user; respond with 404 when no user has this id.
    updated_user = await crud.UserCRUD.update_user(
        db,
        user_id=user_id,
        user_update=user_update,
    )
    if updated_user is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
    return updated_user
@app.delete("/users/{user_id}")
async def delete_user(user_id: int, db: AsyncSession = Depends(get_db)):
    # Delete a user; respond with 404 when no user has this id.
    target_user = await crud.UserCRUD.get_user(db, user_id=user_id)
    if target_user is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
    await crud.UserCRUD.delete_user(db, user_id=user_id)
    return {"message": "User deleted successfully"}
# Post Routes
@app.post("/posts/", response_model=schemas.Post, status_code=status.HTTP_201_CREATED)
async def create_post(post: schemas.PostCreate, author_id: int, db: AsyncSession = Depends(get_db)):
    # Create a post for ``author_id``; respond with 404 when that user does not exist.
    existing_author = await crud.UserCRUD.get_user(db, user_id=author_id)
    if existing_author is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Author not found")
    created_post = await crud.PostCRUD.create_post(db=db, post=post, author_id=author_id)
    return created_post
@app.get("/posts/", response_model=List[schemas.Post])
async def read_posts(skip: int = 0, limit: int = 100, db: AsyncSession = Depends(get_db)):
    # List posts, paginated with the ``skip`` and ``limit`` parameters.
    posts = await crud.PostCRUD.get_posts(db, skip=skip, limit=limit)
    return posts
@app.get("/posts/{post_id}", response_model=schemas.PostWithAuthor)
async def read_post(post_id: int, db: AsyncSession = Depends(get_db)):
    # Return one post together with its author; respond with 404 when the
    # lookup yields nothing.
    post_with_author = await crud.PostCRUD.get_post_with_author(db, post_id=post_id)
    if post_with_author is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Post not found")
    return post_with_author
@app.put("/posts/{post_id}", response_model=schemas.Post)
async def update_post(post_id: int, post_update: schemas.PostUpdate, db: AsyncSession = Depends(get_db)):
    # Update a post; respond with 404 when no post has this id.
    updated_post = await crud.PostCRUD.update_post(
        db,
        post_id=post_id,
        post_update=post_update,
    )
    if updated_post is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Post not found")
    return updated_post
@app.delete("/posts/{post_id}")
async def delete_post(post_id: int, db: AsyncSession = Depends(get_db)):
    # Delete a post; respond with 404 when no post has this id.
    target_post = await crud.PostCRUD.get_post(db, post_id=post_id)
    if target_post is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Post not found")
    await crud.PostCRUD.delete_post(db, post_id=post_id)
    return {"message": "Post deleted successfully"}
if __name__ == "__main__":
    # When run as a script, serve the app with uvicorn on all interfaces, port 8000.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
7. 环境变量配置 (.env)
DATABASE_URL=postgresql+asyncpg://username:password@localhost:5432/fastapi_db
8. 运行应用
# 安装依赖
pip install -r requirements.txt
# 运行服务
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
访问 http://localhost:8000/docs 查看自动生成的 API 文档。
关键特性说明:
 - 完全异步: 所有数据库操作都使用 async/await
 - 依赖注入: 使用 FastAPI 的依赖注入系统管理数据库会话
 - 类型安全: 使用 Pydantic 进行数据验证和序列化
 - 密码加密: 使用 bcrypt 对密码进行哈希处理
 - 错误处理: 完善的异常处理机制
 - RESTful API: 遵循 REST 原则设计 API 端点
 
这个示例提供了用户和博客文章的基本 CRUD 操作,你可以根据需要进行扩展。






