当前位置:首页 > 软件 > 正文内容

一个完整的 FastAPI + SQLAlchemy 异步示例

一往无前1周前 (10-25)软件19

一个完整的 FastAPI + SQLAlchemy 异步示例

项目结构

fastapi_sqlalchemy_demo/
├── app/
│├── __init__.py
│├── main.py
│├── database.py
│├── models.py
│├── schemas.py
│└── crud.py
├── requirements.txt
└── README.md

1. 依赖文件 (requirements.txt)

fastapi==0.104.1
uvicorn==0.24.0
sqlalchemy==2.0.23
asyncpg==0.29.0
alembic==1.12.1
pydantic==2.5.0
python-dotenv==1.0.0
psycopg2-binary==2.9.9

2. 数据库配置 (app/database.py)

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase
import os
from dotenv import load_dotenv

load_dotenv()

DATABASE_URL = os.getenv(
"DATABASE_URL",
"postgresql+asyncpg://user:password@localhost:5432/fastapi_db"
)

# 创建异步引擎
engine = create_async_engine(DATABASE_URL, echo=True)

# 创建异步会话工厂
AsyncSessionLocal = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False
)

class Base(DeclarativeBase):
pass

# 依赖注入:获取数据库会话
async def get_db():
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()

3. 数据模型 (app/models.py)

from sqlalchemy import Column, Integer, String, DateTime, Text
from sqlalchemy.sql import func
from .database import Base

class User(Base):
__tablename__ = "users"

id = Column(Integer, primary_key=True, index=True)
email = Column(String(255), unique=True, index=True, nullable=False)
name = Column(String(100), nullable=False)
hashed_password = Column(String(255), nullable=False)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())

class Post(Base):
__tablename__ = "posts"

id = Column(Integer, primary_key=True, index=True)
title = Column(String(200), nullable=False)
content = Column(Text, nullable=False)
author_id = Column(Integer, nullable=False)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())

4. Pydantic模式 (app/schemas.py)

from pydantic import BaseModel, EmailStr
from datetime import datetime
from typing import Optional

# User Schemas
class UserBase(BaseModel):
email: str
name: str

class UserCreate(UserBase):
password: str

class UserUpdate(BaseModel):
email: Optional[EmailStr] = None
name: Optional[str] = None
password: Optional[str] = None

class User(UserBase):
id: int
created_at: datetime

class Config:
from_attributes = True

# Post Schemas
class PostBase(BaseModel):
title: str
content: str

class PostCreate(PostBase):
pass

class PostUpdate(BaseModel):
title: Optional[str] = None
content: Optional[str] = None

class Post(PostBase):
id: int
author_id: int
created_at: datetime
updated_at: Optional[datetime] = None

class Config:
from_attributes = True

class Postclass PostWithAuthor(Post):
author: User

5. CRUD操作 (app/crud.py)

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from sqlalchemy import update, delete
from . import models, schemas
from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def get_password_hash(password: str) -> str:
return pwd_context.hash(password)

def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)

# User CRUD Operations
class UserCRUD:
@staticmethod
async def get_user(db: AsyncSession, user_id: int):
result = await db.execute(select(models.User).where(models.User.id == user_id))
return result.scalar_one_or_none()

@staticmethod
async def get_user_by_email(db: AsyncSession, email: str):
result = await db.execute(select(models.User).where(models.User.email == email))
return result.scalar_one_or_none()

@staticmethod
async def get_users(db: AsyncSession, skip: int = 0, limit: int = 100):
result = await db.execute(select(models.User).offset(skip).limit(limit))
return result.scalars().all()

@staticmethod
async def create_user(db: AsyncSession, user: schemas.UserCreate):
hashed_password = get_password_hash(user.password)
db_user = models.User(
email=user.email,
name=user.name,
hashed_password=hashed_password
)
db.add(db_user)
await db.commit()
await db.refresh(db_user)
return db_user

@staticmethod
async def update_user(db: AsyncSession, user_id: int, user_update: schemas.UserUpdate):
update_data = user_update.model_dump(exclude_unset=True)

if 'password' in update_data:
update_data['hashed_password'] = get_password_hash(update_data.pop('password'))

stmt = (
update(models.User)
.where(models.User.id == user_id)
.values(**update_data)
.execution_options(synchronize_session="fetch")
)
await db.execute(stmt)
await db.commit()

return await UserCRUD.get_user(db, user_id user_id)

@staticmethod
async def delete_user(db: AsyncSession, user_id: int):
stmt = delete(models.User).where(models.User.id == user_id)
await db.execute(stmt)
await db.commit()

# Post CRUD Operations
class PostCRUD:
@staticmethod
async def get_post(db: AsyncSession, post_id: int):
result = await db.execute(select(models.Post).where(models.Post.id == post_id))
return result.scalar_one_or_none()

@staticmethod
async def get_posts(db: AsyncSession, skip: int = 0, limit: int = 100):
100):
result = await db.execute(select(models.Post).offset(skip).limit(limit))
return result.scalars().all()

@staticmethod
async def get_posts_by_author(db: AsyncSession, author_id: int, skip: int = 0, limit: limit: int = 100):
result = await db.execute(
select(models.Post)
.where(models.Post.author_id == author_id)
.offset(skip)
.limit(limit)
)
return result.scalars().all()

@staticmethod
async def create_post(db: AsyncSession, post: schemas.PostCreate, author_id: int):
db_post = models.Post(**post.model_dump(), author_id=author_id)
db.add(db_post)
await db.commit()
await db.refresh(db_post)
return db_post

@staticmethod
async def update_post(db: AsyncSession, post_id: int, post_update: schemas.PostUpdate):
stmt = (
update(models.Post)
.where(models.Post.id == post_id)
.values(**post_update.model_dump(exclude_unset=True))
.execution_options(synchronize_session="fetch")
)
await db.execute(stmt)
await db.commit()
return await PostCRUD.get_post(db, post_id)

@staticmethod
async def delete_post(db: AsyncSession, post_id: int):
stmt = delete(models.Post).where(models.Post.id == post_id)
await db.execute(stmt)
await db.commit()

@staticmethod
async def get_post_with_author(db: AsyncSession, post_id: int):
result = await db.execute(
select(models.Post, models.User)
.join(models.User, models.Post.author_id == models.User.id)
.where(models.Post.id == post_id)
)
row = result.first()
if row:
post, author = row
post.author = author
return post
return None

6. 主应用文件 (app/main.py)

from fastapi import FastAPI, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List

from . import crud, models, schemas
from .database import engine, get_db, Base

app = FastAPI(title="FastAPI SQLAlchemy Async Demo", version="1.0.0")

@app.on_event("startup")
async def startup():
# 在生产环境中,应该使用 Alembic 进行数据库迁移
async with engine.begin() as conn:
# await conn.run_sync(Base.metadata.drop_all)
await conn.run_sync(Base.metadata.create_all)

@app.on_event("shutdown")
async def shutdown():
await engine.dispose()

# User Routes
@app.post("/users/", response_model=schemas.User, status_code=status.HTTP_201_CREATED)
async def create_user(user: schemas.UserCreate, db: AsyncSession = Depends(get_db)):
db_user = await crud.UserCRUD.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(
status_code=400,
detail="Email already registered"
)
return await crud.UserCRUD.create_user(db=db, user=user)

@app.get("/users/", response_model=List[schemas.User])
async def read_users(skip: int = 0, limit: int = 100, db: AsyncSession = Depends(get_db)):
users = await crud.UserCRUD.get_users(db, skip=skip, limit=limit)
return users

@app.get("/users/{user_id}", response_model=schemas.User)
async def read_user(user_id: int, db: AsyncSession = Depends(get_db)):
db_user = await crud.UserCRUD.get_user(db, user_id user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user

@app.put("/users/{user_id}", response_model=schemas.User)
async def update_user(user_id: int, user_update: schemas.UserUpdate, db: AsyncSession = Depends(get_db)):
db_user = await crud.UserCRUD.update_user(db, user_id=user_id, user_update=user_update)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user

@app.delete("/users/{user_id}")
async def delete_user(user_id: int, db: AsyncSession = Depends(get_db)):
db_user = await crud.UserCRUD.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")

await crud.UserCRUD.delete_user(db, user_id=user_id)
return {"message": "User deleted successfully"}

# Post Routes
@app.post("/posts/", response_model=schemas.Post, status_code=status.HTTP_201_CREATED)
async def create_post(post: schemas.PostCreate, author_id: int, db: AsyncSession = Depends(get_db)):
# 验证作者是否存在
author = await crud.UserCRUD.get_user(db, user_id=author_id)
if author is None:
raise HTTPException(status_code=404, detail="Author not found")

return await crud.PostCRUD.create_post(db=db, post=post, author_id=author_id)

@app.get("/posts/", response_model=List[schemas.Post])
async def read_posts(skip: int = 0, limit: limit: int = 100, db: AsyncSession = Depends(get_db)):
posts = await crud.PostCRUD.get_posts(db, skip=skip, limit=limit)
return posts

@app.get("/posts/{post_id}", response_model=schemas.PostWithAuthor)
async def read_post(post_id: int, db: AsyncSession = Depends(get_db)):
db_post = await crud.PostCRUD.get_post_with_author(db, post_id=post_id)
if db_post is None:
raise HTTPException(status_code=404, detail="Post not found")
return db_post

@app.put("/posts/{post_id}", response_model=schemas.Post)
async def update_post(post_id: int, post_update: schemas.PostUpdate, db: AsyncSession = Depends(get_db)):
db_post = await crud.PostCRUD.update_post(db, post_id=post_id, post_update=post_update)
if db_post is None:
raise HTTPException(status_code=404, detail="Post not found")
return db_post

@app.delete("/posts/{post_id}")
async def delete_post(post_id: int, db: AsyncSession = Depends(get_db)):
db_post = await crud.PostCRUD.get_post(db, post_id=post_id)
if db_post is None:
raise HTTPException(status_code=404, detail="Post not found")

await crud.PostCRUD.delete_post(db, post_id=post_id)
return {"message": "Post deleted successfully"}

if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)

7. 环境变量配置 (.env)

DATABASE_URL=postgresql+asyncpg://username:password@localhost:5432/fastapi_db

8. 运行应用

# 安装依赖
pip install -r requirements.txt

# 运行服务
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000

访问 http://localhost:8000/docs 查看自动生成的 API 文档。

关键特性说明:

  1. 完全异步: 所有数据库操作都使用 async/await
  2. 依赖注入: 使用 FastAPI 的依赖注入系统管理数据库会话
  3. 类型安全: 使用 Pydantic 进行数据验证和序列化
  4. 密码加密: 使用 bcrypt 对密码进行哈希处理
  5. 错误处理: 完善的异常处理机制
  6. RESTful API: 遵循 REST 原则设计 API 端点

这个示例提供了用户和博客文章的基本 CRUD 操作,你可以根据需要进行扩展。

“一个完整的 FastAPI + SQLAlchemy 异步示例” 的相关文章

Python SQLite 数据库使用指南

Python SQLite 数据库使用指南

Python SQLite 详解SQLite 是一个轻量级的嵌入式数据库,Python 通过 sqlite3 模块提供了对 SQLite 数据库的支持。以下是详细的使用指南。 目录 基本概念连接数据库创建表插入数据查询数据更新和删除数据事务处理使用上下文管理器错误处理高级功能最佳实践 基本概念...

Windows10使用命令行配置NTP时间服务器和更新时间频率

Windows10使用命令行配置NTP时间服务器和更新时间频率

在 Windows 10 中,你可以使用命令行(cmd)或 PowerShell 来配置 NTP 时间服务器和更新时间频率。以下是具体步骤: 1. 查看当前时间配置在开始修改前,可以先查看当前的时间服务器和同步状态: w32tm /query /status w32tm /query /c...

Python提供HTTP文件服务的几种方式

Python提供HTTP文件服务的几种方式

Python提供HTTP文件服务的几种方式Python有多种方式可以用来提供HTTP文件服务,以下是主要的几种方法: 1. 使用内置模块http.server (Python 3)python -m http.server 8000 # 或指定目录 python -m http.server...

值得推荐的 Windows 版运维面板

值得推荐的 Windows 版运维面板

除了宝塔面板外,还有云帮手、护卫神、UPUPW绿色服务器平台等多款值得推荐的Windows版运维面板,以下是具体介绍: 云帮手:全面兼容所有云服务商,同时兼容Windows、CentOS、Ubuntu等多种云服务器操作系统。它自带远程连接功能,无需额外工具即可进行远程桌面控制,安装和添加服务器...

FastAPI+SQLite实现数据表操作

FastAPI+SQLite实现数据表操作

我将帮你创建一个使用FastAPI和SQLite的数据库服务。以下是完整的实现: 项目结构fastapi_sqlite_app/ ├── main.py ├── database.py ├── models.py ├── schemas.py └── test.db 1. 首先安装...

一个简单的短链API程序

一个简单的短链API程序

以下是一个使用 FastAPI 和 SQLite 实现的简单短链 API 程序: 🔗 功能说明: 通过 POST 请求访问 /u?url=原网址,可以创建一个短链(如 /u/abc123)。通过 GET 请求访问 /u/短码(如 /u/abc123),可以跳转到原网址。 📦 项目结构(简单...

发表评论

访客

看不清,换一张

◎欢迎参与讨论,请在这里发表您的看法和观点。