Task取消和子协程调用原理

 

task取消的例子

import asyncio
import time

async def get_html(sleep_time):
  print("waiting")
  await asyncio.sleep(sleep_time)
  print("done after {}s".format(sleep_times))

if __name__ == "__main__":
  task1 = get_html(2)
  task2 = get_html(3)
  task3 = get_html(4)

  tasks = [task1,task2,task3]

  loop = asyncio.get_event_loop()

try:
  loop.run_until_complete(asyncio.wait(tasks))
  except KeyboardInterrupt as e:
    all_tasks = asynico.Task.all_tasks()
    for task in all_tasks:
      print("cancel task")
      print(task.cancel())
      loop.stop()
      loop.run_forever() #不加会报错
  finally:
    loop.close() 

关于线程取消的问题

下面我有一段代码:


async def cancel_me2():
    try:
        # Wait for 1 hour
        await asyncio.sleep(3600) # 这里看作是一个异步http请求
    except asyncio.CancelledError:
        print('cancel_me()2: cancel sleep')
        raise


async def cancel_me():
    print('cancel_me(): before sleep')

    try:
        # Wait for 1 hour
        await cancel_me2() #看作是发起了一个http请求
    except asyncio.CancelledError:
        print('cancel_me(): cancel sleep')
        raise
    finally:
        print('cancel_me(): after sleep')

async def main(loop):
    # Create a "cancel_me" Task
    task = loop.create_task(cancel_me())

    # Wait for 1 second
    await asyncio.sleep(1)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("main(): cancel_me is cancelled now")

loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))

#except ouput
#cancel_me(): before sleep
#cancel_me()2: cancel sleep
#cancel_me(): cancel sleep
#cancel_me(): after sleep
#main(): cancel_me is cancelled now

在上面的这段代码中我们的called_me() 任务被取消了,然而,第一个捕捉到asyncio.CancelledError的则是在cancel_me2的协程里面。

假设main ( a 主机),它向cancel_me (b主机)发送http请求,camel_me 又向c主机发送http 请求,网络请求链路为 a->b->c

假设a主机中断(比如超时)了和b主机的连接(cancel),那么第一个捕捉到asyncio.CancelledError 错误是会被捕捉到在 b 主机向 c 主机发送请求的那段代码上,而不是b主机接受a主机请求的代码的代码上。这种机制使得debug超时请求这类任务很麻烦,很容易以为是b主机向c主机请求超时

子协程调用


# 协程嵌套 https://docs.python.org/3/library/asyncio-task.html
import asyncio


async def compute(x, y):
    print('Compute %s %s' % (x, y))
    await  asyncio.sleep(1)
    return x + y


async def print_sum(x, y):
    result = await  compute(x, y)
    print('%s + %s = %s' % (x, y, result))


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(print_sum(1, 2))
    loop.close()

协程调用流程图

当事件循环开始运行时,它会在Task中寻找coroutine来执行调度,因为事件循环注册了print_sum()因此print_sum()被调用,执行result = await compute(x, y)这条语句(等同于result = yield from compute(x, y)),因为compute()自身就是一个coroutine,因此print_sum()这个协程就会暂时被挂起,compute()被加入到事件循环中,程序流执行compute()中的print语句,打印”Compute %s + %s …”,然后执行了await asyncio.sleep(1.0),因为asyncio.sleep()也是一个coroutine,接着compute()就会被挂起,等待计时器读秒,在这1秒的过程中,事件循环会在队列中查询可以被调度的coroutine,而因为此前print_sum()与compute()都被挂起了,因此事件循环会停下来等待协程的调度,当计时器读秒结束后,程序流便会返回到compute()中执行return语句,结果会返回到print_sum()中的result中,最后打印result,事件队列中没有可以调度的任务了,此时loop.close()把事件队列关闭,程序结束

1.从compute 返回到Task,没有经过print_sum(),因为await已经建立了调用方和子协程之间的通道

2.因为event loop 中已经没有可调用协程了,event loop 会休息一秒,然后再直接与compute()通信