Asyncio代码例子

 

asyncio包含以下功能

1.包含各种特定系统实现的模块化事件循环模块

2.传输和协议抽象

3.对TCP,UDP,SSL,子进程延时调用以及其他的具体支持

4.模仿future模块但是适用于事件循环的Feature类

5.基于yield from 的协议和任务,让我们用顺序的方法编写并发代码

6.必须使用一个将产生阻塞IO的调用时,有接口可以把这个事件转移到线程池。

Asyncio代码使用实例

import asyncio
import time


async def get_html(url):
    print('start get url')
    await asyncio.sleep(2)  # 不可以使用同步阻塞的方法 time.sleep(2) 来操作
    print('end get url')


if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    # tasks = [get_html('https://blgo.ronething.cn') for i in range(10)]
    # run_until_complete 相当于线程的 join 阻塞

    # wait 和 gather 的区别
    # gather 更加 high-level 可以分组 可以取消任务
    # loop.run_until_complete(asyncio.wait(tasks))
    # loop.run_until_complete(asyncio.gather(*tasks))
    group1 = [get_html('https://blgo.ronething.cn') for i in range(10)]
    group2 = [get_html('https://blgo.ronething.cn') for i in range(10)]
    group1 = asyncio.gather(*group1)
    group2 = asyncio.gather(*group2)
    group2.cancel()
    loop.run_until_complete(asyncio.gather(group1, group2))
    print(time.time() - start_time)

在get_html中,sleep不能用time.sleep,这是一个阻塞的操作,使用asyncio.sleep的时候会返回一个Feature,然后退出协程,在下一轮loop的时候会判断时间有没到,到了再继续往下运行。因此在协程是单线程的,因此使用了同步的驱动(pymysql)会一直阻塞线程。

获取协程结果

如果想要获取每个协程的结果,可以使用以下代码



import asyncio
import time


async def get_html(url):
    print('start get url')
    await asyncio.sleep(2)  # 不可以使用同步阻塞的方法 time.sleep(2) 来操作
    return "bobby"

#使用feature
if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    get_feature = asynico_ensure_feature(get_html('https://blgo.ronething.cn'))
    loop.run_until_complete(task)
    print(get_feature.result())

#使用task
if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    task = loop.create(get_html('https://blgo.ronething.cn'))
    loop.run_until_complete(task)
    print(task.result())

Task 和 Featue是asyncio两个比较重要的对象,在使用featue的时候,在ensure_feature函数内部会获取一次loop,并将协程注册到loop中,一个线程只有一个loop。在使用task的时候,create_task将任务注册进去。

Task指定callback

Task 可以指定callback,让我们在协程执行完之后再做其它的处理动作



import asyncio
import time


async def get_html(url):
    print('start get url')
    await asyncio.sleep(2)  # 不可以使用同步阻塞的方法 time.sleep(2) 来操作
    return "bobby"

def callback(feature):
  #feature 即task 被传入到这个callback中
  #task 是feature的子类
  print("finish")

#使用task
if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    task = loop.create_task(get_html('https://blgo.ronething.cn'))
    task.add_done_callback(callback)
    loop.run_until_complete(task)
    print(task.result())


#add_done_callback只能够接收函数参数,为了给callback添加参数。使用partial
#新加入的参数得加到前面
def callback(url,feature):
  #feature 即task 被传入到这个callback中
  print(url)
  print("finish")

#使用task
if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    task = loop.create_task(get_html('https://blgo.ronething.cn'))
    task.add_done_callback(partial(callback),'https://blgo.ronething.cn')
    loop.run_until_complete(task)
    print(task.result())

线层池和asycio 结合

当我们有驱动是阻塞的驱动的时候,只能将阻塞io的函数放到其它线程中使用


# -*- coding:utf-8 _*-  
""" 
@author: ronething 
@time: 2019-04-02 16:38 
@mail: axingfly@gmail.com
Less is more.
"""

import asyncio

from concurrent.futures import ThreadPoolExecutor

import socket
from urllib.parse import urlparse


# 将阻塞的方法放置于线程池中执行 与 asyncio 结合使用


def get_url(url):
    # 通过socket请求html
    url = urlparse(url)
    host = url.netloc
    path = url.path
    print(host, path)
    if path == "":
        path = "/"

    # 建立socket连接
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # client.setblocking(False)
    client.connect((host, 80))  # 阻塞不会消耗cpu

    # 不停的询问连接是否建立好, 需要while循环不停的去检查状态
    # 做计算任务或者再次发起其他的连接请求

    client.send(
        "GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(
            path, host).encode("utf8"))

    data = b""
    while True:
        d = client.recv(1024)
        if d:
            data += d
        else:
            break

    data = data.decode("utf8")
    html_data = data.split("\r\n\r\n")[1]
    print(html_data)
    client.close()
    
    
if __name__ == '__main__':
    import time

    start_time = time.time()
    loop = asyncio.get_event_loop()
    executor = ThreadPoolExecutor()
    tasks = []
    for url in range(20):
        url = 'http://shop.projectsedu.com/goods/{}/'.format(url)
        task = loop.run_in_executor(executor, get_url, url)
        tasks.append(task)
    loop.run_until_complete(asyncio.wait(tasks))
    print('use time:{}'.format(time.time() - start_time))