FastAPI+SQLite实现数据表操作
我将帮你创建一个使用FastAPI和SQLite的数据库服务。以下是完整的实现:
项目结构
fastapi_sqlite_app/
├── main.py
├── database.py
├── models.py
├── schemas.py
└── test.db
1. 首先安装依赖
pip install fastapi uvicorn sqlalchemy
2. 创建数据库模型 (models.py)
from sqlalchemy import Column, Integer, String, Index
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class DataRecord(Base):
    __tablename__ = "data_records"
    
    i = Column(Integer, primary_key=True, autoincrement=True)
    t = Column(String, nullable=False)
    r = Column(String, nullable=False)
    c = Column(String, nullable=False)
    v = Column(String, nullable=False)
    
    # 创建复合索引
    __table_args__ = (
        Index('idx_t_r_c', 't', 'r', 'c'),
    )
3. 创建数据库连接 (database.py)
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Base
# SQLite数据库连接
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(
    SQLALCHEMY_DATABASE_URL, 
    connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# 创建数据库表
def init_db():
    Base.metadata.create_all(bind=engine)
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
4. 创建Pydantic模型 (schemas.py)
from pydantic import BaseModel
class DataRecordBase(BaseModel):
    t: str
    r: str
    c: str
    v: str
class DataRecordCreate(DataRecordBase):
    pass
class DataRecord(DataRecordBase):
    i: int
    class Config:
        orm_mode = True
5. 创建主应用 (main.py)
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import Optional
import models
import schemas
from database import get_db, init_db
# 初始化应用
app = FastAPI(title="SQLite Data API")
# 启动时初始化数据库
@app.on_event("startup")
def on_startup():
    init_db()
# 路由1: 根据 t, r, c 读取v的值
@app.get("/data/", response_model=schemas.DataRecord)
def read_data(t: str, r: str, c: str, db: Session = Depends(get_db)):
    # 查询数据
    record = db.query(models.DataRecord).filter(
        models.DataRecord.t == t,
        models.DataRecord.r == r,
        models.DataRecord.c == c
    ).first()
    
    if not record:
        raise HTTPException(status_code=404, detail="Record not found")
    
    return record
# 路由2: 查找并写入数据,如果不存在则插入新记录
@app.get("/data/upsert/", response_model=schemas.DataRecord)
def upsert_data(t: str, r: str, c: str, v: str, db: Session = Depends(get_db)):
    # 首先尝试查找现有记录
    record = db.query(models.DataRecord).filter(
        models.DataRecord.t == t,
        models.DataRecord.r == r,
        models.DataRecord.c == c
    ).first()
    
    if record:
        # 如果找到记录,更新v值
        record.v = v
        db.commit()
        db.refresh(record)
        return record
    else:
        # 如果没有找到,创建新记录
        new_record = models.DataRecord(t=t, r=r, c=c, v=v)
        db.add(new_record)
        db.commit()
        db.refresh(new_record)
        return new_record
# 可选:获取所有数据的路由
@app.get("/data/all/", response_model=list[schemas.DataRecord])
def read_all_data(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    records = db.query(models.DataRecord).offset(skip).limit(limit).all()
    return records
# 健康检查端点
@app.get("/health")
def health_check():
    return {"status": "healthy"}
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
6. 运行应用
python main.py
或者使用uvicorn直接运行:
uvicorn main:app --reload --host 0.0.0.0 --port 8000
API端点说明
1. 读取数据
GET /data/?t={t_value}&r={r_value}&c={c_value}
- 根据t, r, c参数查询对应的记录
 - 如果找到,返回包含v值的完整记录
 - 如果没找到,返回404错误
 
2. 插入/更新数据
GET /data/upsert/?t={t_value}&r={r_value}&c={c_value}&v={v_value}
- 根据t, r, c参数查找记录
 - 如果找到,更新v值
 - 如果没找到,插入新记录
 
3. 获取所有数据
GET /data/all/?skip=0&limit=100
- 获取所有记录(支持分页)
 
使用示例
插入数据:
curl "http://localhost:8000/data/upsert/?t=type1&r=resource1&c=category1&v=value1"
读取数据:
curl "http://localhost:8000/data/?t=type1&r=resource1&c=category1"
更新数据:
curl "http://localhost:8000/data/upsert/?t=type1&r=resource1&c=category1&v=new_value"
注意事项
- 数据库文件:SQLite数据库文件
test.db会在首次运行时自动创建 - 索引优化:已经为(t, r, c)字段创建了复合索引,提高查询性能
 - 线程安全:配置了
connect_args={"check_same_thread": False}以确保线程安全 - 自动提交:使用SQLAlchemy的session管理,确保数据一致性
 
这个实现提供了你要求的所有功能,并且遵循了FastAPI和SQLAlchemy的最佳实践。





