FastAPI+SQLite实现数据表操作
我将帮你创建一个使用FastAPI和SQLite的数据库服务。以下是完整的实现:
项目结构
fastapi_sqlite_app/
├── main.py
├── database.py
├── models.py
├── schemas.py
└── test.db
1. 首先安装依赖
pip install fastapi uvicorn sqlalchemy
2. 创建数据库模型 (models.py)
from sqlalchemy import Column, Integer, String, Index
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class DataRecord(Base):
__tablename__ = "data_records"
i = Column(Integer, primary_key=True, autoincrement=True)
t = Column(String, nullable=False)
r = Column(String, nullable=False)
c = Column(String, nullable=False)
v = Column(String, nullable=False)
# 创建复合索引
__table_args__ = (
Index('idx_t_r_c', 't', 'r', 'c'),
)
3. 创建数据库连接 (database.py)
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Base
# SQLite数据库连接
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(
SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# 创建数据库表
def init_db():
Base.metadata.create_all(bind=engine)
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
4. 创建Pydantic模型 (schemas.py)
from pydantic import BaseModel
class DataRecordBase(BaseModel):
t: str
r: str
c: str
v: str
class DataRecordCreate(DataRecordBase):
pass
class DataRecord(DataRecordBase):
i: int
class Config:
orm_mode = True
5. 创建主应用 (main.py)
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import Optional
import models
import schemas
from database import get_db, init_db
# 初始化应用
app = FastAPI(title="SQLite Data API")
# 启动时初始化数据库
@app.on_event("startup")
def on_startup():
init_db()
# 路由1: 根据 t, r, c 读取v的值
@app.get("/data/", response_model=schemas.DataRecord)
def read_data(t: str, r: str, c: str, db: Session = Depends(get_db)):
# 查询数据
record = db.query(models.DataRecord).filter(
models.DataRecord.t == t,
models.DataRecord.r == r,
models.DataRecord.c == c
).first()
if not record:
raise HTTPException(status_code=404, detail="Record not found")
return record
# 路由2: 查找并写入数据,如果不存在则插入新记录
@app.get("/data/upsert/", response_model=schemas.DataRecord)
def upsert_data(t: str, r: str, c: str, v: str, db: Session = Depends(get_db)):
# 首先尝试查找现有记录
record = db.query(models.DataRecord).filter(
models.DataRecord.t == t,
models.DataRecord.r == r,
models.DataRecord.c == c
).first()
if record:
# 如果找到记录,更新v值
record.v = v
db.commit()
db.refresh(record)
return record
else:
# 如果没有找到,创建新记录
new_record = models.DataRecord(t=t, r=r, c=c, v=v)
db.add(new_record)
db.commit()
db.refresh(new_record)
return new_record
# 可选:获取所有数据的路由
@app.get("/data/all/", response_model=list[schemas.DataRecord])
def read_all_data(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
records = db.query(models.DataRecord).offset(skip).limit(limit).all()
return records
# 健康检查端点
@app.get("/health")
def health_check():
return {"status": "healthy"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
6. 运行应用
python main.py
或者使用uvicorn直接运行:
uvicorn main:app --reload --host 0.0.0.0 --port 8000
API端点说明
1. 读取数据
GET /data/?t={t_value}&r={r_value}&c={c_value}
- 根据t, r, c参数查询对应的记录
- 如果找到,返回包含v值的完整记录
- 如果没找到,返回404错误
2. 插入/更新数据
GET /data/upsert/?t={t_value}&r={r_value}&c={c_value}&v={v_value}
- 根据t, r, c参数查找记录
- 如果找到,更新v值
- 如果没找到,插入新记录
3. 获取所有数据
GET /data/all/?skip=0&limit=100
- 获取所有记录(支持分页)
使用示例
插入数据:
curl "http://localhost:8000/data/upsert/?t=type1&r=resource1&c=category1&v=value1"
读取数据:
curl "http://localhost:8000/data/?t=type1&r=resource1&c=category1"
更新数据:
curl "http://localhost:8000/data/upsert/?t=type1&r=resource1&c=category1&v=new_value"
注意事项
- 数据库文件:SQLite数据库文件
test.db
会在首次运行时自动创建 - 索引优化:已经为(t, r, c)字段创建了复合索引,提高查询性能
- 线程安全:配置了
connect_args={"check_same_thread": False}
以确保线程安全 - 自动提交:使用SQLAlchemy的session管理,确保数据一致性
这个实现提供了你要求的所有功能,并且遵循了FastAPI和SQLAlchemy的最佳实践。