关键词: fastapi body pending

概括

倘若在中间件中消费了请求体,会导致程序卡死在下一步骤处理函数call_next中。

要想对请求体做预处理,需要通过自定义请求+APIRouter解决。

问题代码示例

当使用request_json = await request.json()消费了请求对象后,程序会卡在response = await call_next(request)无法进行下去。

经过测试,通过await request.json()或者await request.body()消费后,程序均会卡在await call_next(request)

而使用request.headers.get("X-Sign")获取请求头信息则不会出现这种情况

@app.middleware("http")
async def sync_middleware(request: Request, call_next):
    request_json = await request.json()
    _data = {
        "ip":request.client.host,
        "X-Sign":request.headers.get("X-Sign"),
        "body":request_json,
    }
    # 同步代码,做鉴权
    result = await run_in_threadpool(sync_code, _data)
    if result != 200:
        return Response(status_code=result)
    
    response = await call_next(request)
    return response

问题原因

这是FastAPI的设计问题,可以在FastAPI GitHub的issues中找到不少相关问题,例如:

https://github.com/fastapi/fastapi/issues/394

https://github.com/fastapi/fastapi/issues/5386

这是一个起码从2019年便被发现并存在至今的问题。

用一句话描述就是请求体只能被读取一次,如果在中间件中已经读取了请求体,那么后续的任何尝试再次读取请求体的操作都将陷入无限等待。

详细原因可以看GitHub中的讨论,这里不细说。

解决办法可以参考官网这篇教程:

https://fastapi.tiangolo.com/how-to/custom-request-and-route/#create-a-custom-gziproute-class

中文的问题分析和自定义请求编写可以参考如下文章:

https://www.modb.pro/db/144294

https://www.cnblogs.com/a00ium/p/13662335.html

问题解决

from fastapi import FastAPI, Request, Response, APIRouter
from typing import Callable
from fastapi.routing import APIRoute
from pydantic import BaseModel

class middlewareRoute(APIRoute):
    def get_route_handler(self) -> Callable:
        original_route_handler = super().get_route_handler()

        async def custom_route_handler(request: Request) -> Response:
            before = time.time()
            
            # 这里可以获取的我们的请求的体的信息----
            
            response: Response = await original_route_handler(request)

            request_body = await request.body()

            _data = {
                "ip":request.client.host,
                "X-Sign":request.headers.get("X-Sign"),
                "body":request_body,
            }

            # 同步代码,做鉴权
            result = await run_in_threadpool(sync_code, _data)
            if result != 200:
                return Response(status_code=result)
            
            # 下面可以处理我们的响应体的报文信息
            
            duration = time.time() - before
            response.headers["X-Response-Time"] = str(duration)
            # print(f"route duration: {duration}")
            # print(f"route response: {response}")
            # print(f"route response headers: {response.headers}")
            return response

        return custom_route_handler
        
app = FastAPI()
router = APIRouter(route_class=middlewareRoute)

@app.get("/")
def index():
    return {}

class SignData(BaseModel):
    Zzz: str
    Abc: str
    Def: str

@router.post("/func")
def func_test(signData:SignData):
    # 省略功能代码
    return {}

app.include_router(router)
最后修改:2024 年 09 月 25 日
如果觉得我的文章对你有用,请随意赞赏