Python asyncio 异步 IO

异步 IO

参考链接

我们使用的是 Python 3 内置的模块 asyncio


概念说明

  • event_loop 事件循环:程序开启一个无限循环,把一些函数注册到事件循环上,当满足事件发生的时候,调用相应的协程函数

  • coroutine 协程:协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。

    例如:

    # 这样就是注册了一个协程
    async def test1():
       pass
       return 'ok'

    coroutine = test1()
  • task 任务:一个协程对象就是一个原生可以挂起的函数,task 则是对协程进一步封装,其中包含了任务的各种状态

  • future: 代表将来执行或没有执行的任务的结果。它和task上没有本质上的区别

  • async/await 关键字:python3.5用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口。


定义一个简单的协程

import asyncio

# 这里定义了一个协程函数
async def test():
   pass
return 'ok'

# 这里是提前写好的回调函数
def test(future):
   print(future)


# 这里是返回了一个协程对象
coroutine = test()

# 把协程对象封装成 task
task = asyncio.ensure_future(coroutine)

# 创建消息循环
loop = asyncio.get_event_loop()

# 设置回调函数,我们通过回调函数,当任务执行完毕,就会把结果传到回调函数中
task.add_done_callback(test)

# 把 task 加入到消息队列中去运行,这一步一定要到最后
# 这是一个阻塞式的操作
loop.run_until_complete(task)

 

await 关键字

await就类似于 yield from

使用async可以定义协程对象,使用await可以针对耗时的操作进行挂起,就像生成器里的yield一样,函数让出控制权。协程遇到await,事件循环将会挂起该协程,执行别的协程,直到其他的协程也挂起或者执行完毕,再进行下一个协程的执行 。

这里我们用内置的 asyncio.sleep(1) ,模拟网络请求

import asyncio


async def test():
   # 这里就是使用 await 关键字,遇到这个,事件循环就会挂起该协程,然后执行其他协程。
   # 直到其他的协程也挂起或者执行完毕,才继续执行下面的代码,异步执行的,就是说总时间不是叠加,而是算最长的
   await asyncio.sleep(1)
   return 'ok'


def a(future):
   print(future.result())


coroutine = test()
task = asyncio.ensure_future(coroutine)
loop = asyncio.get_event_loop()
task.add_done_callback(a)
loop.run_until_complete(task)

并发操作(多个协程)

只有加入到 loop 里面的 task 才可以实现并发,用 create_task (ensure_future) 或者 gather

import asyncio
import time


async def do_something():
   tasks = [asyncio.sleep(1) for _ in range(10)]
   await asyncio.gather(*tasks)


async def main():
   start = time.time()
   tasks = [do_something() for _ in range(2)]
   await asyncio.gather(*tasks)
   end = time.time()
   print(end - start)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

并发操作 loop.ensure_future() 重要

import asyncio
import time


async def do_something():
   await asyncio.sleep(1)


async def main():
   start = time.time()
   
   # 使用 create_task 操作就相当于把协程加入到最外层的 loop 里面。
   tass1 = loop.ensure_future(do_something())
   tass2 = loop.ensure_future(do_something())

   await tass1
   await tass2

   end = time.time()
   print(end - start)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

 

run_until_complete 和 run_forever

我们一直通过 run_until_complete 来运行 loop ,等到 future 完成,run_until_complete 也就返回了。

import asyncio


async def do_some(n):
   print('等待1秒')
   await asyncio.sleep(n)
   print('完成')
   return 'ok'


loop = asyncio.get_event_loop()

coroutine = do_some(1)

task = asyncio.ensure_future(coroutine)

# 等所有的 task 都执行完了,这个函数就会结束,程序退出。
res = loop.run_until_complete(task)
print(res.result())

现在改用 run_forever

import asyncio

async def do_some(n):
   print('等待1秒')
   await asyncio.sleep(1)
   print('完成')


coro = do_some(1)
asyncio.ensure_future(coro)
loop = asyncio.get_event_loop()
# 一秒后结果输出,但是,线程一直被阻塞,程序没有退出。
loop.run_forever()

使用run_forever(),future 结束,但是程序并不会退出。run_forever会一直运行,知道 loop.stop()被调用,但是你不能像下面这样调用

loop.run_forever()
loop.stop()

run_forever()不返回,stop永远也不会被调用,所以我们只能在协程中调用 stop,如下

async def do_some(loop):
   print('Waiting 3 second')
   await asyncio.sleep(3)
   print('Done')
   loop.stop()

这样并非没有问题,假设有多个协程在 loop 里面运行

import asyncio

async def do_some(loop, n):
   print('等待{}秒'.format(n))
   await asyncio.sleep(n)
   print('{} 完成'.format(n))
   
   # 第二个协程还没有结束,loop 就停止了,被先完成的那个协程给 stop
   loop.stop()


loop = asyncio.get_event_loop()
coro1 = do_some(loop, 1)
coro2 = do_some(loop, 2)
asyncio.ensure_future(coro1)
asyncio.ensure_future(coro2)

loop.run_forever()

要解决这个问题,那我们就要使用 gather把多个协程合并为一个 future ,并添加回调。然后在回调中停止。

 

asyncio.gather 和 asyncio.ensure_future 可以把新的任务加入到事件循环当中

import asyncio
import functools


async def do_some(n):
   print('等待{}秒'.format(n))
   await asyncio.sleep(n)
   print('{} 完成'.format(n))

# 这个 future 是我们执行后的结果集
def done_callback(loop, future):
   loop.stop()
   pass


loop = asyncio.get_event_loop()
futures = asyncio.gather(do_some(1), do_some(3))
# 这里式使用了偏函数,当然我们也可以使用闭包
futures.add_done_callback(functools.partial(done_callback, loop))
loop.run_forever()

--------------------------------------------
# 使用闭包

def done_callback(loop):
   def inner(future):
       loop.stop()
   return inner

loop = asyncio.get_event_loop()
futures = asyncio.gather(do_some(1), do_some(3))
futures.add_done_callback(done_callback(loop))
loop.run_forever()
---------------------------------------------

这样就可以啦,其实我们这里就是手动的实现了 run_until_complete,而且 run_until_complete内部也是调用了run_forever


 

Close Loop ?

以上示例都没有调用 loop.close,好像也没有什么问题。所以到底要不要调 loop.close 呢? 简单来说,loop 只要不关闭,就还可以再运行。:

loop.run_until_complete(do_some_work(loop, 1))
loop.run_until_complete(do_some_work(loop, 3))
loop.close()

但是如果关闭了,就不能再运行了:

loop.run_until_complete(do_some_work(loop, 1))
loop.close()
# 此处异常
loop.run_until_complete(do_some_work(loop, 3))  
建议调用 loop.close,以彻底清理 loop 对象防止误用。

 


 

生产者消费者

未完待续

import asyncio
from asyncio import Queue


async def producer(queue):
   print('开始生成数据')
   for i in range(100):
       await queue.put(i)
       print('已经生产数据 {}'.format(i))

       if queue.qsize() >= 10:
           await queue.join()


async def comsumer(queue):
   print('开始消费数据')
   while True:
       item = await queue.get()
       print('消费了数据 {}'.format(item))
       queue.task_done()


loop = asyncio.get_event_loop()
queue = Queue()
loop.run_until_complete(asyncio.gather(producer(queue), comsumer(queue)))

 

协程 API

loop = asyncio.get_event_loop() # 获取事件循环

loop = run_until_complete() # 开启循环

asyncio.ensure_future() # 把协程加入到循环,直到调用 loop.run_forever() 才会执行

asyncio.gather() # 把更多的协程加入到循环

asyncio.Semaphore(5) # 限制同时运行的协程数量

 

Semaphore 例子

 

import asyncio
import aiohttp


# Semaphore 只要放在协程里面就可以,用 async with 修饰,windows 限制了 Semap 的数量,Linux 没有限制。

class AsyncRequest(object):
   sem = asyncio.Semaphore(500)
   @classmethod
   async def request(cls, url=None, method='GET', cookies=None, params=None, data=None, headers=None, encode='utf8'):
       async with cls.sem:
           async with aiohttp.ClientSession(cookies=cookies) as session:
               if method == 'GET':
                   async with session.get(url, params=params, headers=headers) as response:
                       return {
                           'text': await response.text(encoding=encode),
                           'url': response.url,
                           'headers': response.headers,
                           'read': await response.read(),
                           'status_code': response.status,
                           'cookies': response.cookies
                      }
               elif method == 'POST':
                   async with session.post(url, data=data, headers=headers) as response:
                       return {
                           'text': await response.text(encoding=encode),
                           'url': response.url,
                           'headers': response.headers,
                           'read': await response.read(),
                           'status_code': response.status,
                           'cookies': response.cookies
                      }
最新评论
暂无评论
登录后评论