Flask Async代码分析:async_func函数未执行完,日志无记录

帮我看用 flask async框架写的一段代码

async def async_func(data): logger.debug('run async func...') await asyncio.sleep(3)

testDao.save(data)
logger.debug('run async func res:%s', data)
return data

async def run_async_func(data: dict): loop = asyncio.get_running_loop() task = loop.create_task(async_func(data))

@http_app.route('/test') async def index(): logger.debug('/test') name = str(uuid.uuid1()) data = { 'name': name, 'time': time.time() }

result = await run_async_func(data)
return {"code":0}

上面 async_func 没有执行完,一直挂在await asyncio.sleep(3),看日志没有执行 logger.debug('run async func res:%s', data)

根据您提供的代码,似乎您忘记在 run_async_func 中添加 await task 来等待 async_func 函数完成了。因此,当它调用时,没有等待异步函数完成就返回了。

您可以将 run_async_func 函数更改为以下内容来解决此问题:

async def run_async_func(data: dict):
    loop = asyncio.get_running_loop()
    task = loop.create_task(async_func(data))
    await task
    return task.result()

这样,create_task 将创建一个任务,但是 await task 会暂停执行直到异步函数完成。然后,它将返回 async_func 的结果。

如果加上 await task 就变成阻塞同步了,我希望不要阻塞

如果您想保持异步,可以在 run_async_func 中返回任务对象,然后在视图函数中使用 await 语句来等待任务完成并获取结果。例如:

async def run_async_func(data: dict):
    loop = asyncio.get_running_loop()
    task = loop.create_task(async_func(data))
    return task

@http_app.route('/test')
async def index():
    logger.debug('/test')
    name = str(uuid.uuid1())
    data = {
        'name': name,
        'time': time.time()
    }
    
    task = await run_async_func(data)
    result = task.result()
    return {"code":0}

这样,run_async_func 将创建一个任务并返回它。然后,在视图函数中,使用 await 等待任务完成并获取其结果。注意,这里必须使用 await 来等待任务完成,否则视图函数将在任务还未完成时立即返回。

这样改/test接口还是变成阻塞同步了, 我希望的是不要等待async_func直接返回 ,async_func在后台执行

我看到您的 run_async_func 没有返回结果,所以在 index 视图函数中调用 await run_async_func(data) 返回的是 None。这会阻塞视图函数,直到 async_func 函数执行完成为止。

如果您想要在后台执行 async_func 函数且不阻塞 index 视图函数,可以在 run_async_func 函数内使用 create_task 来创建一个任务对象,并立即返回该任务对象,然后你可以通过一些方式获取任务执行结果。

例如,您可以将 run_async_func 更改如下:

async def run_async_func(data: dict):
    loop = asyncio.get_running_loop()
    task = loop.create_task(async_func(data))
    return task

现在,在 index 视图函数中调用 run_async_func 时,它将立即返回一个任务对象。接下来,您可以在另一个异步函数或协程中等待任务完成并获取结果。

例如:

@http_app.route('/test')
async def index():
    logger.debug('/test')
    name = str(uuid.uuid1())
    data = {
        'name': name,
        'time': time.time()
    }

    # 在后台执行 async_func
    task = await run_async_func(data)
    
    # 这里可以执行一些其他耗时操作
    
    # 等待 async_func 完成并获取结果
    result = await task
    
    return {"code":0}

现在,当您调用 run_async_func 时,它将立即返回一个任务对象。接下来,您可以执行一些其他操作,如向客户端发送响应或执行其他的异步任务。最后,当您需要获取 async_func 的结果时,只需通过 await task 来等待任务完成并获取其返回值即可。这样,视图函数就不会阻塞,而异步函数仍然在后台执行。

这样改/test还是等待结果返回了,我不要等结果,我会通过/result接口取查询结果

根据您的代码,run_async_func 函数没有返回结果。因此,当您在视图函数中调用 result = await run_async_func(data) 时,将等待异步函数完成并返回结果。这会阻塞视图函数,直到异步函数完成为止。

如果您想在后台执行异步函数且不阻塞视图函数,可以使用 create_task 来创建一个任务对象,并立即返回该任务对象,然后你可以通过一些方式获取任务执行结果。

例如,您可以将 run_async_func 更改如下:

async def run_async_func(data: dict):
    loop = asyncio.get_running_loop()
    task = loop.create_task(async_func(data))
    return task

现在,在 index 视图函数中调用 run_async_func 时,它将立即返回一个任务对象。接下来,您可以在另一个异步函数或协程中等待任务完成并获取结果。

例如,您可以添加另一个视图函数 /result,以便客户端可以查询异步函数的结果。例如:

@http_app.route('/result/<task_id>')
async def get_result(task_id):
    task = tasks.get(task_id)
    if task is None:
        return {'status': 'error', 'message': 'Task not found'}
    elif task.done():
        try:
            result = task.result()
            return {'status': 'success', 'result': result}
        except Exception as e:
            return {'status': 'error', 'message': str(e)}
    else:
        return {'status': 'running'}

这里,get_result 视图函数接受一个 task_id 参数,并使用它来获取对应的任务对象。如果任务未完成,它将返回 'status': 'running'。如果任务已经完成,它将尝试获取任务结果并返回 { 'status': 'success', 'result': result } 。如果在执行异步函数时出现错误,则返回 { 'status': 'error', 'message': str(e) }

然后,在视图函数中,您可以像下面这样调用 run_async_func 函数:

@http_app.route('/test')
async def index():
    logger.debug('/test')
    name = str(uuid.uuid1())
    data = {
        'name': name,
        'time': time.time()
    }

    # 在后台执行 async_func
    task = await run_async_func(data)
    task_id = str(id(task))
    tasks[task_id] = task
    
    return {'status': 'accepted', 'task_id': task_id}

现在,当您调用 run_async_func 时,它将立即返回一个任务对象。接下来,您可以将该任务对象存储到一个全局字典中(如 tasks)。然后,将任务 ID 返回给客户端作为响应,以便客户端可以稍后使用 /result 视图函数查询任务结果。

请注意,您需要实现一个机制来保存和管理任务,在异步函数完成之前,您需要能够将任务 ID 传递给客户端。

上面 @http_app.route('/result/<task_id>') async def get_result(task_id): task = tasks.get(task_id) if task is None: return {'status': 'error', 'message': 'Task not found'} elif task.done(): try: result = task.result() return {'status': 'success', 'result': result} except Exception as e: return {'status': 'error', 'message': str(e)} else: return {'status': 'running'} 报错 result = task.result() asyncio.exceptions.CancelledError

关于第一个问题,您可以将 run_async_func 函数修改为:

async def run_async_func(data: dict):
    loop = asyncio.get_running_loop()
    return await loop.create_task(async_func(data))

这样就可以等待异步函数执行完成并返回结果。

关于第二个问题,看起来任务已经被取消了。可能是在某些情况下取消了任务。为了避免这种情况,您可以使用 try/except 语句来捕获 CancelledError 异常,并根据需要进行处理。例如:

@http_app.route('/result/<task_id>')
async def get_result(task_id):
    task = tasks.get(task_id)
    if task is None:
        return {'status': 'error', 'message': 'Task not found'}
    elif task.done():
        try:
            result = task.result()
            return {'status': 'success', 'result': result}
        except asyncio.CancelledError:
            return {'status': 'cancelled'}
        except Exception as e:
            return {'status': 'error', 'message': str(e)}
    else:
        return {'status': 'running'}

这里,我们添加了一个 try/except 语句来捕获 asyncio.CancelledError 异常。如果任务被取消,则返回 { 'status': 'cancelled' }