帮我看用 flask async框架写的一段代码
async def async_func(data): logger.debug('run async func...') await asyncio.sleep(3)
testDao.save(data)
logger.debug('run async func res:%s', data)
return data
async def run_async_func(data: dict): loop = asyncio.get_running_loop() task = loop.create_task(async_func(data))
@http_app.route('/test') async def index(): logger.debug('/test') name = str(uuid.uuid1()) data = { 'name': name, 'time': time.time() }
result = await run_async_func(data)
return {"code":0}
上面 async_func 没有执行完,一直挂在await asyncio.sleep(3),看日志没有执行 logger.debug('run async func res:%s', data)
根据您提供的代码,似乎您忘记在 run_async_func
中添加 await task
来等待 async_func
函数完成了。因此,当它调用时,没有等待异步函数完成就返回了。
您可以将 run_async_func
函数更改为以下内容来解决此问题:
async def run_async_func(data: dict):
loop = asyncio.get_running_loop()
task = loop.create_task(async_func(data))
await task
return task.result()
这样,create_task
将创建一个任务,但是 await task
会暂停执行直到异步函数完成。然后,它将返回 async_func
的结果。
如果加上 await task 就变成阻塞同步了,我希望不要阻塞
如果您想保持异步,可以在 run_async_func
中返回任务对象,然后在视图函数中使用 await
语句来等待任务完成并获取结果。例如:
async def run_async_func(data: dict):
loop = asyncio.get_running_loop()
task = loop.create_task(async_func(data))
return task
@http_app.route('/test')
async def index():
logger.debug('/test')
name = str(uuid.uuid1())
data = {
'name': name,
'time': time.time()
}
task = await run_async_func(data)
result = task.result()
return {"code":0}
这样,run_async_func
将创建一个任务并返回它。然后,在视图函数中,使用 await
等待任务完成并获取其结果。注意,这里必须使用 await
来等待任务完成,否则视图函数将在任务还未完成时立即返回。
这样改/test接口还是变成阻塞同步了, 我希望的是不要等待async_func直接返回 ,async_func在后台执行
我看到您的 run_async_func
没有返回结果,所以在 index
视图函数中调用 await run_async_func(data)
返回的是 None
。这会阻塞视图函数,直到 async_func
函数执行完成为止。
如果您想要在后台执行 async_func
函数且不阻塞 index
视图函数,可以在 run_async_func
函数内使用 create_task
来创建一个任务对象,并立即返回该任务对象,然后你可以通过一些方式获取任务执行结果。
例如,您可以将 run_async_func
更改如下:
async def run_async_func(data: dict):
loop = asyncio.get_running_loop()
task = loop.create_task(async_func(data))
return task
现在,在 index
视图函数中调用 run_async_func
时,它将立即返回一个任务对象。接下来,您可以在另一个异步函数或协程中等待任务完成并获取结果。
例如:
@http_app.route('/test')
async def index():
logger.debug('/test')
name = str(uuid.uuid1())
data = {
'name': name,
'time': time.time()
}
# 在后台执行 async_func
task = await run_async_func(data)
# 这里可以执行一些其他耗时操作
# 等待 async_func 完成并获取结果
result = await task
return {"code":0}
现在,当您调用 run_async_func
时,它将立即返回一个任务对象。接下来,您可以执行一些其他操作,如向客户端发送响应或执行其他的异步任务。最后,当您需要获取 async_func
的结果时,只需通过 await task
来等待任务完成并获取其返回值即可。这样,视图函数就不会阻塞,而异步函数仍然在后台执行。
这样改/test还是等待结果返回了,我不要等结果,我会通过/result接口取查询结果
根据您的代码,run_async_func
函数没有返回结果。因此,当您在视图函数中调用 result = await run_async_func(data)
时,将等待异步函数完成并返回结果。这会阻塞视图函数,直到异步函数完成为止。
如果您想在后台执行异步函数且不阻塞视图函数,可以使用 create_task
来创建一个任务对象,并立即返回该任务对象,然后你可以通过一些方式获取任务执行结果。
例如,您可以将 run_async_func
更改如下:
async def run_async_func(data: dict):
loop = asyncio.get_running_loop()
task = loop.create_task(async_func(data))
return task
现在,在 index
视图函数中调用 run_async_func
时,它将立即返回一个任务对象。接下来,您可以在另一个异步函数或协程中等待任务完成并获取结果。
例如,您可以添加另一个视图函数 /result
,以便客户端可以查询异步函数的结果。例如:
@http_app.route('/result/<task_id>')
async def get_result(task_id):
task = tasks.get(task_id)
if task is None:
return {'status': 'error', 'message': 'Task not found'}
elif task.done():
try:
result = task.result()
return {'status': 'success', 'result': result}
except Exception as e:
return {'status': 'error', 'message': str(e)}
else:
return {'status': 'running'}
这里,get_result
视图函数接受一个 task_id
参数,并使用它来获取对应的任务对象。如果任务未完成,它将返回 'status': 'running'
。如果任务已经完成,它将尝试获取任务结果并返回 { 'status': 'success', 'result': result }
。如果在执行异步函数时出现错误,则返回 { 'status': 'error', 'message': str(e) }
。
然后,在视图函数中,您可以像下面这样调用 run_async_func
函数:
@http_app.route('/test')
async def index():
logger.debug('/test')
name = str(uuid.uuid1())
data = {
'name': name,
'time': time.time()
}
# 在后台执行 async_func
task = await run_async_func(data)
task_id = str(id(task))
tasks[task_id] = task
return {'status': 'accepted', 'task_id': task_id}
现在,当您调用 run_async_func
时,它将立即返回一个任务对象。接下来,您可以将该任务对象存储到一个全局字典中(如 tasks
)。然后,将任务 ID 返回给客户端作为响应,以便客户端可以稍后使用 /result
视图函数查询任务结果。
请注意,您需要实现一个机制来保存和管理任务,在异步函数完成之前,您需要能够将任务 ID 传递给客户端。
上面 @http_app.route('/result/<task_id>') async def get_result(task_id): task = tasks.get(task_id) if task is None: return {'status': 'error', 'message': 'Task not found'} elif task.done(): try: result = task.result() return {'status': 'success', 'result': result} except Exception as e: return {'status': 'error', 'message': str(e)} else: return {'status': 'running'} 报错 result = task.result() asyncio.exceptions.CancelledError
关于第一个问题,您可以将 run_async_func
函数修改为:
async def run_async_func(data: dict):
loop = asyncio.get_running_loop()
return await loop.create_task(async_func(data))
这样就可以等待异步函数执行完成并返回结果。
关于第二个问题,看起来任务已经被取消了。可能是在某些情况下取消了任务。为了避免这种情况,您可以使用 try/except
语句来捕获 CancelledError
异常,并根据需要进行处理。例如:
@http_app.route('/result/<task_id>')
async def get_result(task_id):
task = tasks.get(task_id)
if task is None:
return {'status': 'error', 'message': 'Task not found'}
elif task.done():
try:
result = task.result()
return {'status': 'success', 'result': result}
except asyncio.CancelledError:
return {'status': 'cancelled'}
except Exception as e:
return {'status': 'error', 'message': str(e)}
else:
return {'status': 'running'}
这里,我们添加了一个 try/except
语句来捕获 asyncio.CancelledError
异常。如果任务被取消,则返回 { 'status': 'cancelled' }
。