网站首页 全球最实用的IT互联网站!

人工智能P2P分享Wind搜索发布信息网站地图标签大全

当前位置:诺佳网 > 软件工程 > 后端开发 > Python >

FastAPI异步方法中调用同步方法

时间:2026-01-06 16:07

人气:

作者:admin

标签:

导读:前言 在异步方法中调用同步方法,会直接阻塞整个事件循环,导致应用在执行同步方法期间无法处理其他任何并发请求,从而拖垮整个服务的性能。 为了解决这个问题,核心思路是将同...

前言

在异步方法中调用同步方法,会直接阻塞整个事件循环,导致应用在执行同步方法期间无法处理其他任何并发请求,从而拖垮整个服务的性能。

为了解决这个问题,核心思路是将同步方法交给外部线程池去执行。

方法1, 使用 to_thread

Python 3.9 后可以使用 asyncio.to_thread 方法,将同步函数跑在独立的线程中,并返回一个协程供 await

import asyncio
import time
from fastapi import FastAPI

app = FastAPI()

def sync_task(name: str):
    time.sleep(2) 
    return f"Hello {name}, sync task done!"

@app.get("/async-call")
async def async_endpoint():
    result = await asyncio.to_thread(sync_task, "World")
    
    return {"message": result}

方法2, 直接定义同步路由

FastAPI支持定义同步路由,FastAPI会自动在一个外部线程池中运行该函数。不过出于代码整体设计的考虑,个人不建议这么做。

方法3, 使用 run_in_threadpool

FastAPI 基于 Starlette, 而 Starlette 提供一个工具函数 run_in_threadpool,这种方式类似于 asyncio.to_thread,在某些老版本的 FastAPI 或特定的 contextvars 传递场景下更常用。

from fastapi.concurrency import run_in_threadpool

@app.get("/method3")
async def starlette_endpoint():
    result = await run_in_threadpool(sync_task, "Starlette")
    return {"message": result}

方法4, 使用进程池

对于CPU密集型任务,应该使用多进程ProcessPoolExecutor来操作

import concurrent.futures
import math
from fastapi import FastAPI

app = FastAPI()
# 创建一个全局进程池
executor = concurrent.futures.ProcessPoolExecutor()

def cpu_intensive_calculation(n: int):
    # 模拟重度 CPU 计算
    return sum(math.isqrt(i) for i in range(n))

@app.get("/cpu-bound-task")
async def cpu_task():
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(executor, cpu_intensive_calculation, 10**7)
    return {"result": result}

本文来自博客园,作者:花酒锄作田,转载请注明原文链接:https://www.cnblogs.com/XY-Heruo/p/19448294

温馨提示:以上内容整理于网络,仅供参考,如果对您有帮助,留下您的阅读感言吧!
相关阅读
本类排行
相关标签
本类推荐

CPU | 内存 | 硬盘 | 显卡 | 显示器 | 主板 | 电源 | 键鼠 | 网站地图

Copyright © 2025-2035 诺佳网 版权所有 备案号:赣ICP备2025066733号
本站资料均来源互联网收集整理,作品版权归作者所有,如果侵犯了您的版权,请跟我们联系。

关注微信