在FastAPI中实现背景任务可以使用Python的asyncio库来实现。以下是一个简单的示例代码:
from fastapi import BackgroundTasks, FastAPIimport asyncioapp = FastAPI()def background_task(): # 模拟一个长时间运行的任务 asyncio.sleep(5) print("Background task completed")@app.post("/send-notification/{message}")async def send_notification(message: str, background_tasks: BackgroundTasks): background_tasks.add_task(background_task) return {"message": f"Notification '{message}' sent in the background"}在上面的示例中,我们定义了一个背景任务background_task,它模拟了一个长时间运行的任务。然后我们定义了一个路由/send-notification/{message},当用户访问这个路由时,会触发发送通知的操作,并将背景任务background_task添加到BackgroundTasks中。这样在接收到请求后,就会异步执行这个背景任务,不会阻塞主线程。
请注意,需要在启动应用程序时运行uvicorn服务器时添加--reload参数,以便在代码更改时重新加载应用程序。


