實戰(zhàn)必備!FastAPI 異步任務(wù) + 文件上傳:從需求到落地全流程
做 FastAPI 實戰(zhàn)開發(fā)時,你一定遇到過這樣的需求:
用戶上傳一個包含幾百條數(shù)據(jù)的 Excel 文件,要求批量導(dǎo)入系統(tǒng);導(dǎo)入過程可能需要解析數(shù)據(jù)、校驗格式、寫入數(shù)據(jù)庫,耗時幾十秒甚至更久——如果讓用戶一直等著接口響應(yīng),體驗肯定很差;而且接口長時間阻塞,還可能導(dǎo)致請求超時。
解決這個問題的核心方案就是:文件上傳 + 異步任務(wù)——讓用戶先快速完成文件上傳,后臺用異步任務(wù)慢慢處理,用戶可以隨時查詢處理進度,處理完成后還能收到通知。

今天這篇文章,就以“Excel 批量導(dǎo)入用戶數(shù)據(jù)”為實戰(zhàn)案例,手把手帶大家實現(xiàn)“文件上傳校驗 → 觸發(fā)異步任務(wù) → 任務(wù)狀態(tài)追蹤 → 處理結(jié)果反饋”的全流程。代碼可直接復(fù)制運行,新手也能輕松跟上!
一、先理清核心邏輯:為什么要“異步”?
先搞懂一個關(guān)鍵問題:為什么文件上傳后要走異步任務(wù),而不是直接在接口里處理?
舉個直觀的對比:
- 同步處理:用戶上傳文件 → 接口開始解析/導(dǎo)入 → 用戶等待30秒 → 得到結(jié)果。期間用戶頁面一直加載,稍有不慎就超時,體驗極差;
- 異步處理:用戶上傳文件 → 接口快速返回“上傳成功,任務(wù)已啟動”(耗時1秒內(nèi)) → 后臺異步處理導(dǎo)入 → 用戶可隨時查詢進度 → 處理完成后收到通知。用戶無需等待,體驗流暢。
本次實戰(zhàn)的核心技術(shù)棧:
- FastAPI:負責(zé)接收文件上傳、提供任務(wù)查詢接口;
- Celery:異步任務(wù)調(diào)度框架,負責(zé)管理和執(zhí)行后臺任務(wù);
- Redis:作為 Celery 的“消息隊列”(存放待執(zhí)行的任務(wù))和“結(jié)果存儲”(記錄任務(wù)狀態(tài)/結(jié)果);
- python-multipart:FastAPI 處理文件上傳的依賴;
- pandas:解析 Excel 文件(處理批量數(shù)據(jù)必備)。
簡單類比:Celery 是“后臺工作車間”,Redis 是“任務(wù)單倉庫”,F(xiàn)astAPI 是“前臺接待員”——用戶把文件(任務(wù)需求)交給前臺,前臺生成任務(wù)單放進倉庫,車間從倉庫拿任務(wù)單異步處理,用戶隨時可以問前臺任務(wù)進度。
二、環(huán)境準(zhǔn)備:先把“工具”裝到位
打開終端,復(fù)制以下命令安裝所有依賴,新手直接全選執(zhí)行即可:
# 核心框架與文件上傳
pip install fastapi uvicorn python-multipart
# 異步任務(wù)與消息隊列
pip install celery redis
# Excel 解析
pip install pandas openpyxl # openpyxl 用于讀取 .xlsx 格式
# 環(huán)境變量管理
pip install python-dotenv
# 數(shù)據(jù)庫(復(fù)用之前的 MySQL 配置,存儲導(dǎo)入的數(shù)據(jù))
pip install sqlalchemy aiomysql額外準(zhǔn)備:
- 安裝 Redis:Celery 依賴 Redis 存任務(wù)和結(jié)果,Windows 用戶可下載 Redis 安裝包(或用 WSL),Mac/Linux 可直接用 brew install redis / apt install redis-server 安裝;
- 啟動 Redis:安裝完成后,終端執(zhí)行 redis-server(默認端口6379,無需額外配置);
- 準(zhǔn)備測試 Excel:新建一個 users.xlsx 文件,包含3列:username(用戶名)、email(郵箱)、full_name(真實姓名),填幾條測試數(shù)據(jù)。
三、項目搭建:先搭好“骨架”,再填內(nèi)容
本次項目結(jié)構(gòu)清晰,新手可直接復(fù)制這個目錄創(chuàng)建文件:
fastapi-async-file/
├── .env # 環(huán)境變量(數(shù)據(jù)庫、Redis 配置)
├── main.py # 主程序(FastAPI 實例、文件上傳/任務(wù)查詢接口)
├── celery_config.py # Celery 配置(連接 Redis、任務(wù)設(shè)置)
├── tasks.py # 異步任務(wù)(解析 Excel、批量導(dǎo)入數(shù)據(jù)庫)
├── database.py # 數(shù)據(jù)庫配置(復(fù)用之前的 MySQL 異步配置)
├── models/ # 數(shù)據(jù)模型
│ ├── db.py # SQLAlchemy 數(shù)據(jù)庫模型(用戶表)
│ └── schemas.py # Pydantic 校驗?zāi)P停蛇x,用于數(shù)據(jù)校驗)
└── utils.py # 工具函數(shù)(Excel 解析、數(shù)據(jù)校驗)1. 環(huán)境變量配置(.env)
寫入數(shù)據(jù)庫和 Redis 配置,替換成自己的信息:
# .env
# MySQL 數(shù)據(jù)庫配置(批量導(dǎo)入的數(shù)據(jù)存這里)
DATABASE_URL=mysql+aiomysql://root:123456@localhost:3306/fastapi_demo
# Redis 配置(Celery 用)
REDIS_URL=redis://localhost:6379/0 # 0 是 Redis 的數(shù)據(jù)庫編號
# 任務(wù)相關(guān)配置
TASK_RESULT_EXPIRES=3600 # 任務(wù)結(jié)果保留 1 小時(3600 秒)2. 數(shù)據(jù)庫配置(database.py)
復(fù)用之前的 MySQL 異步配置,直接復(fù)制即可:
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from sqlalchemy.orm import declarative_base
from dotenv import load_dotenv
import os
load_dotenv()
DATABASE_URL = os.getenv("DATABASE_URL")
# 異步引擎
async_engine = create_async_engine(
DATABASE_URL,
echo=True, # 開發(fā)時打印 SQL,便于調(diào)試
future=True
)
# 異步會話工廠
AsyncSessionLocal = async_sessionmaker(
bind=async_engine,
autoflush=False,
autocommit=False,
expire_on_commit=False
)
# 數(shù)據(jù)庫模型基類
Base = declarative_base()
# 依賴項:獲取數(shù)據(jù)庫會話
async def get_db():
db = AsyncSessionLocal()
try:
yield db
finally:
await db.close()3. 數(shù)據(jù)模型(models/db.py + models/schemas.py)
還是用“用戶表”存儲導(dǎo)入的數(shù)據(jù),模型復(fù)用之前的,稍作調(diào)整:
# models/db.py
from sqlalchemy import Column, Integer, String, Boolean, func
from sqlalchemy.sql.sqltypes import DateTime
from database import Base
class DBUser(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
username = Column(String, unique=True, index=True, nullable=False)
email = Column(String, unique=True, index=True, nullable=False)
full_name = Column(String, index=True, nullable=True)
disabled = Column(Boolean, default=False)
created_at = Column(DateTime(timezone=True), server_default=func.now())Pydantic 校驗?zāi)P停ㄓ糜诮馕?Excel 數(shù)據(jù)時校驗,避免臟數(shù)據(jù)):
# models/schemas.py
from pydantic import BaseModel, Field, validator
from typing import Optional
class UserImport(BaseModel):
"""Excel 導(dǎo)入數(shù)據(jù)的校驗?zāi)P?""
username: str = Field(..., min_length=3, max_length=20, description="用戶名3-20字符")
email: str = Field(..., pattern=r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$", description="郵箱格式正確")
full_name: Optional[str] = Field(None, description="真實姓名可選")
# 可選:自定義校驗,比如用戶名不能包含特殊字符
@validator("username")
def username_no_special_chars(cls, v):
if not v.isalnum(): # 只允許字母和數(shù)字
raise ValueError("用戶名只能包含字母和數(shù)字")
return v四、核心實現(xiàn):異步任務(wù) + 文件上傳聯(lián)動
這是本次實戰(zhàn)的重點,分3步實現(xiàn):配置 Celery 異步任務(wù) → 編寫 Excel 解析與導(dǎo)入任務(wù) → 實現(xiàn) FastAPI 文件上傳接口。
1. Celery 異步任務(wù)配置(celery_config.py)
告訴 Celery 如何連接 Redis、如何存儲任務(wù)結(jié)果:
from dotenv import load_dotenv
import os
load_dotenv()
# Celery 配置
CELERY_CONFIG = {
# 連接 Redis(消息隊列)
"broker_url": os.getenv("REDIS_URL"),
# 存儲任務(wù)結(jié)果(便于查詢?nèi)蝿?wù)狀態(tài))
"result_backend": os.getenv("REDIS_URL"),
# 任務(wù)結(jié)果保留時間(秒)
"result_expires": int(os.getenv("TASK_RESULT_EXPIRES", 3600)),
# 任務(wù)序列化方式
"task_serializer": "json",
# 結(jié)果序列化方式
"result_serializer": "json",
# 接受的內(nèi)容類型
"accept_content": ["json"],
}2. 編寫異步任務(wù)(tasks.py)
核心任務(wù):解析上傳的 Excel 文件 → 校驗數(shù)據(jù) → 批量導(dǎo)入 MySQL。注意:Celery 任務(wù)是同步的,但運行在獨立進程,不會阻塞 FastAPI 主線程。
from celery import Celery
from celery_config import CELERY_CONFIG
import pandas as pd
from sqlalchemy.ext.asyncio import AsyncSession
from database import AsyncSessionLocal
from models.db import DBUser
from models.schemas import UserImport
from typing import List, Dict
import traceback
# 初始化 Celery 實例
celery_app = Celery("fastapi_tasks", config_source=CELERY_CONFIG)
# 定義異步任務(wù):解析 Excel 并批量導(dǎo)入用戶
@celery_app.task(bind=True, name="import_users_from_excel")
def import_users_from_excel(self, file_path: str) -> Dict[str, any]:
"""
異步任務(wù):從 Excel 文件批量導(dǎo)入用戶
:param self: 任務(wù)實例(用于記錄任務(wù)狀態(tài))
:param file_path: 上傳的 Excel 文件路徑
:return: 任務(wù)結(jié)果(成功數(shù)量、失敗數(shù)量、失敗原因)
"""
result = {
"success_count": 0,
"fail_count": 0,
"failures": [], # 存儲失敗的數(shù)據(jù)和原因
"status": "completed"
}
try:
# 1. 解析 Excel 文件(pandas 讀取 .xlsx)
df = pd.read_excel(file_path, engine="openpyxl")
# 確保列名正確(Excel 第一行必須是 username、email、full_name)
required_columns = ["username", "email", "full_name"]
if not all(col in df.columns for col in required_columns):
raise ValueError(f"Excel 缺少必需列!需要:{required_columns}")
# 2. 數(shù)據(jù)校驗(用 Pydantic 過濾臟數(shù)據(jù))
valid_users: List[Dict] = []
for idx, row in df.iterrows():
row_data = row.to_dict()
try:
# 校驗數(shù)據(jù)格式
user = UserImport(**row_data)
valid_users.append(user.dict())
except Exception as e:
# 記錄失敗數(shù)據(jù)和原因
result["fail_count"] += 1
result["failures"].append({
"row": idx + 2, # Excel 行號從 2 開始(第一行是表頭)
"data": row_data,
"reason": str(e)
})
# 3. 批量導(dǎo)入數(shù)據(jù)庫(異步會話,需要用 asyncio.run 執(zhí)行)
if valid_users:
import asyncio
success_num = asyncio.run(_batch_insert_users(valid_users))
result["success_count"] = success_num
result["fail_count"] += len(valid_users) - success_num # 可能有重復(fù)用戶名/郵箱導(dǎo)致插入失敗
except Exception as e:
# 任務(wù)執(zhí)行失敗(如文件損壞、數(shù)據(jù)庫連接錯誤)
result["status"] = "failed"
result["error"] = str(e)
result["traceback"] = traceback.format_exc()
return result
async def _batch_insert_users(users: List[Dict]) -> int:
"""批量插入用戶到數(shù)據(jù)庫(異步函數(shù))"""
async with AsyncSessionLocal() as db:
success_count = 0
for user_data in users:
try:
# 檢查用戶名/郵箱是否已存在
from sqlalchemy import select
result = await db.execute(
select(DBUser).where(
(DBUser.username == user_data["username"]) |
(DBUser.email == user_data["email"])
)
)
if not result.scalar_one_or_none():
# 不存在則插入
db_user = DBUser(**user_data)
db.add(db_user)
success_count += 1
except Exception as e:
print(f"插入用戶失敗:{user_data},原因:{e}")
await db.commit()
return success_count關(guān)鍵說明:
- @celery_app.task:標(biāo)記這是一個 Celery 異步任務(wù),bind=True 可以獲取任務(wù)實例,用于后續(xù)擴展(如更新任務(wù)進度);
- pandas 解析 Excel:指定 engine="openpyxl" 才能讀取 .xlsx 格式,.xls 格式需要安裝 xlrd 依賴;
- 數(shù)據(jù)校驗:用 Pydantic 的 UserImport 模型過濾臟數(shù)據(jù),避免無效數(shù)據(jù)導(dǎo)入數(shù)據(jù)庫;
- 批量插入:用異步數(shù)據(jù)庫會話批量處理,提高效率;同時檢查用戶名/郵箱重復(fù),避免插入失敗。
3. FastAPI 文件上傳接口(main.py)
實現(xiàn)兩個核心接口:文件上傳接口(觸發(fā)異步任務(wù))、任務(wù)狀態(tài)查詢接口(讓用戶查看處理進度):
from fastapi import FastAPI, File, UploadFile, HTTPException, Depends
from fastapi.responses import JSONResponse
from celery.result import AsyncResult
from tasks import celery_app
from database import get_db, Base, async_engine
from sqlalchemy.ext.asyncio import AsyncSession
import os
from contextlib import asynccontextmanager
from typing import AsyncGenerator
# 定義生命周期:啟動時創(chuàng)建數(shù)據(jù)庫表
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield
# 初始化 FastAPI 實例
app = FastAPI(
title="FastAPI 異步任務(wù) + 文件上傳實戰(zhàn)",
description="Excel 批量導(dǎo)入用戶數(shù)據(jù)全流程",
lifespan=lifespan
)
# 配置文件上傳路徑(確保目錄存在)
UPLOAD_DIR = "uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True) # 不存在則創(chuàng)建
# --- 1. 文件上傳接口:上傳 Excel 并觸發(fā)異步任務(wù) ---
@app.post("/upload/excel/", summary="上傳 Excel 批量導(dǎo)入用戶")
async def upload_excel(
file: UploadFile = File(..., description="上傳 .xlsx 格式的 Excel 文件"),
db: AsyncSession = Depends(get_db)
):
# 第一步:校驗文件格式和大小
# 校驗格式:只允許 .xlsx
if not file.filename.endswith(".xlsx"):
raise HTTPException(
status_code=400,
detail="只支持 .xlsx 格式的 Excel 文件!"
)
# 校驗大小:限制 10MB 以內(nèi)(可根據(jù)需求調(diào)整)
file_size = 0
contents = await file.read()
file_size = len(contents)
MAX_SIZE = 10 * 1024 * 1024 # 10MB
if file_size > MAX_SIZE:
raise HTTPException(
status_code=400,
detail=f"文件大小超過限制!最大支持 10MB,當(dāng)前文件 {file_size/1024/1024:.2f}MB"
)
# 第二步:保存文件到本地(uploads 目錄)
file_path = os.path.join(UPLOAD_DIR, file.filename)
with open(file_path, "wb") as f:
f.write(contents)
# 第三步:觸發(fā)異步任務(wù)(celery_app.send_task 提交任務(wù))
task = celery_app.send_task(
"import_users_from_excel", # 任務(wù)名稱(和 tasks.py 中定義的一致)
args=[file_path] # 傳遞給任務(wù)的參數(shù)(文件路徑)
)
# 第四步:返回任務(wù) ID,讓用戶后續(xù)查詢狀態(tài)
return JSONResponse(
status_code=200,
content={
"message": "文件上傳成功,異步導(dǎo)入任務(wù)已啟動!",
"task_id": task.id, # 任務(wù)唯一 ID
"file_name": file.filename,
"file_size": f"{file_size/1024:.2f}KB"
}
)
# --- 2. 任務(wù)狀態(tài)查詢接口 ---
@app.get("/task/{task_id}/", summary="查詢異步任務(wù)狀態(tài)和結(jié)果")
async def get_task_status(task_id: str):
# 獲取任務(wù)結(jié)果對象
task_result = AsyncResult(task_id, app=celery_app)
# 構(gòu)造任務(wù)狀態(tài)信息
result = {
"task_id": task_id,
"status": task_result.status, # PENDING(待執(zhí)行)/ RUNNING(執(zhí)行中)/ SUCCESS(成功)/ FAILURE(失敗)
"result": None,
"error": None
}
# 根據(jù)任務(wù)狀態(tài)返回不同信息
if task_result.status == "SUCCESS":
result["result"] = task_result.result # 任務(wù)執(zhí)行成功的結(jié)果(成功數(shù)量、失敗數(shù)量等)
elif task_result.status == "FAILURE":
result["error"] = str(task_result.result) # 任務(wù)失敗的原因
return result
# 運行程序
if __name__ == "__main__":
import uvicorn
uvicorn.run("main:app", host="127.0.0.1", port=8000, reload=True)重點說明:
- 文件校驗:必須校驗格式(只允許 .xlsx)和大小(避免超大文件占用資源);
- 文件保存:先把上傳的文件保存到本地 uploads 目錄,再把文件路徑傳給異步任務(wù)(任務(wù)需要讀取文件);
- 任務(wù)觸發(fā):用 celery_app.send_task() 提交任務(wù),返回的 task.id 是查詢?nèi)蝿?wù)狀態(tài)的關(guān)鍵;
- 狀態(tài)查詢:通過 AsyncResult 獲取任務(wù)狀態(tài),支持 4 種核心狀態(tài),用戶可實時查看進度。
五、測試驗證:3步跑通全流程
所有代碼寫完后,按以下步驟啟動服務(wù)并測試,新手跟著操作即可:
第一步:啟動依賴服務(wù)
- 啟動 Redis:終端執(zhí)行 redis-server(保持運行,不要關(guān)閉);
- 啟動 MySQL:確保本地 MySQL 服務(wù)已啟動,且 fastapi_demo 數(shù)據(jù)庫已創(chuàng)建。
第二步:啟動 Celery worker(后臺任務(wù)車間)
新建一個終端,進入項目目錄,執(zhí)行以下命令啟動 Celery worker:
celery -A tasks.celery_app worker --loglevel=info看到 “celery@xxx ready” 說明啟動成功(保持運行,不要關(guān)閉)。
第三步:啟動 FastAPI 服務(wù)并測試
- 啟動 FastAPI:終端執(zhí)行 python main.py;
- 訪問測試界面:打開瀏覽器訪問 http://127.0.0.1:8000/docs;
- 測試文件上傳:找到 /upload/excel/ 接口,點擊「Try it out」;
- 選擇準(zhǔn)備好的 users.xlsx 文件,點擊「Execute」;
- 成功后會返回 task_id(如 "task_id": "a1b2c3d4-xxxx-xxxx-xxxx-xxxx")。
- 測試任務(wù)查詢:找到 /task/{task_id}/ 接口,點擊「Try it out」;
- 輸入剛才得到的 task_id,點擊「Execute」;
- 查看結(jié)果:如果任務(wù)完成,會返回成功數(shù)量、失敗數(shù)量(如有臟數(shù)據(jù));如果失敗,會返回錯誤原因。
六、實戰(zhàn)避坑指南:新手最容易踩的五個坑
(1) Redis 未啟動/連接失敗:Celery 啟動會報錯“Cannot connect to Redis”,
解決:確保 redis-server 已啟動,.env 里的 REDIS_URL 正確(默認 redis://localhost:6379/0);
(2) Excel 格式錯誤:上傳 .xls 格式或非 Excel 文件會報錯,
解決:接口里已做格式校驗,嚴格要求 .xlsx 格式;
(3) Celery 任務(wù)名稱不匹配:觸發(fā)任務(wù)后 Celery 沒反應(yīng),
解決:send_task 的任務(wù)名稱必須和 tasks.py 中 @celery_app.task(name="xxx") 定義的一致;
(4) 文件路徑問題:Celery 任務(wù)提示“文件不存在”,
解決:確保 uploads 目錄已創(chuàng)建(代碼里用了 os.makedirs(UPLOAD_DIR, exist_ok=True),已規(guī)避);
(5) 數(shù)據(jù)庫異步任務(wù)執(zhí)行失敗:任務(wù)中調(diào)用異步數(shù)據(jù)庫函數(shù)報錯,
解決:用 asyncio.run() 執(zhí)行異步函數(shù)(如代碼中 asyncio.run(_batch_insert_users(valid_users)))。
七、進階擴展:讓功能更貼近生產(chǎn)環(huán)境
本次實現(xiàn)的是基礎(chǔ)版本,實際生產(chǎn)環(huán)境可擴展以下功能:
- 文件清理:任務(wù)執(zhí)行完成后,刪除本地 uploads 目錄的 Excel 文件(避免占用磁盤空間);
- 任務(wù)進度實時更新:在 import_users_from_excel 任務(wù)中,用 self.update_state(state="PROGRESS", meta={"current": 10, "total": 100}) 更新進度,接口返回當(dāng)前進度;
- 處理結(jié)果通知:任務(wù)完成后,通過郵件/短信/站內(nèi)信通知用戶(可集成 smtplib 發(fā)郵件);
- 分布式部署:Celery 支持多 worker 分布式部署,應(yīng)對高并發(fā)場景;
- 文件存儲優(yōu)化:大文件可上傳到 MinIO/OSS 等對象存儲,而不是本地磁盤。
八、核心總結(jié)
本次實戰(zhàn)的核心價值,是掌握“文件上傳 + 異步任務(wù)”的聯(lián)動邏輯——這是 FastAPI 開發(fā)中處理“耗時操作 + 用戶交互”場景的標(biāo)準(zhǔn)方案:
- FastAPI 負責(zé)“前端交互”:接收文件、校驗格式、返回任務(wù) ID;
- Celery + Redis 負責(zé)“后臺處理”:調(diào)度異步任務(wù)、執(zhí)行耗時操作、存儲任務(wù)結(jié)果;
- 全流程閉環(huán):用戶上傳文件 → 觸發(fā)異步任務(wù) → 隨時查詢進度 → 收到處理結(jié)果,體驗流暢且不阻塞接口。
新手不用一開始就追求進階功能,先把基礎(chǔ)流程跑通,再逐步擴展。本次代碼可直接作為項目模板,替換 Excel 解析和數(shù)據(jù)導(dǎo)入邏輯,就能適配“批量導(dǎo)入商品”“批量導(dǎo)出數(shù)據(jù)”等類似場景。


































