Python3.7 asyncio 补充
asyncio Python3.7 版本
import asyncio


async def main():
   await asyncio.sleep(2)
   print('hello')

# 可以直接用 asyncio.run() 不需要 loop
asyncio.run(main())
和3.6版本一样,可以直接用 asyncio.create_task(core)
import time
import asyncio
async def do_something():
   await asyncio.sleep(1)
   
async def main():
   
   # 这样的话,task1和task2会加入到最外层的loop中,实现并发。
   # 这里不能 await asyncio.create_task(do_something())
   task1 = asyncio.create_task(do_something())
   task2 = asyncio.create_task(do_something())
   
   await task1
   await task2
   
asyncio.run(main())
使用 asyncio.gather() 会自动创建 task ,会把协程放到最外层的loop运行,实现并发
import asyncio
import time


async def do_something():
   tasks = [asyncio.sleep(1) for _ in range(10)]
   await asyncio.gather(*tasks)


async def main():
   start = time.time()
   tasks = [do_something() for _ in range(2)]
   await asyncio.gather(*tasks)
   end = time.time()
   print(end - start)


asyncio.run(main())

 

同步和异步结合 run_in_executor

通过 run_in_executor 可以把耗时的同步操作使用新线程或者进程执行,不影响协程。

import asyncio
import time
import concurrent.futures
import multiprocessing
import threading


async def do_something():
   await asyncio.sleep(1)
   print('do_something')
   
   # 需要获取当前的loop
   loop = asyncio.get_running_loop()
   # 使用传入一个 execute ,
   # 或者直接这样 await loop.run_in_executor(None, test) 不要 with,这样默认就是线程。
   with concurrent.futures.ThreadPoolExecutor() as ex:
       await loop.run_in_executor(ex, test)


def test():
   print('开始睡眠')
   print(threading.current_thread())
   time.sleep(3)
   print('睡眠结束')


async def main():
   start = time.time()
   await asyncio.gather(do_something(), do_something())
   end = time.time()
   print(end - start)


if __name__ == "__main__":
   asyncio.run(main())
单独的线程运行 loop,实现并发

一共有2个线程,一个主线程,一个子线程,主线程和子线程中都有一个事件循环 loop

import asyncio
import threading


def run_event_loop(my_loop):
   asyncio.set_event_loop(my_loop)
   my_loop.run_forever()


async def main():
   # 这里就把协程,放入指定的loop中运行,这个loop在另一个线程中执行。这个方法是非阻塞的,可以实现并发.
   future1 = asyncio.run_coroutine_threadsafe(work(1), loop)
   future2 = asyncio.run_coroutine_threadsafe(work(2), loop)

   print(future1.result())
   print(future2.result())



async def work():
   print('hello')
   await asyncio.sleep(2)
   print('hi')
   return 1


if __name__ == "__main__":
   # 建议使用 new_event_loop 而不是, get_event_loop
   loop = asyncio.new_event_loop()
   loop_thread = threading.Thread(target=run_event_loop, args=(loop,))
   loop_thread.start()

   # 这里内部也开了一个loop
   asyncio.run(main())
封装 aiohttp
import aiohttp
import json
import asyncio


class BaseRequest(object):

   def __init__(self):
       pass

   async def get(self, url, cookies=None, headers=None, **kwargs):
       async with aiohttp.request('GET', url=url, cookies=cookies, headers=headers, **kwargs) as res:
           content = await res.content.read()
           status = res.status
           response = BaseResponse(content, status)
           return response

   async def post(self, url, cookies=None, headers=None, data=None, **kwargs):
       async with aiohttp.request('POST', url=url, cookies=cookies, headers=headers, data=data, **kwargs) as res:
           content = await res.content.read()
           status = res.status
           response = BaseResponse(content, status)
           return response


class BaseResponse(object):

   def __init__(self, content, status):
       self.content = content
       self.status = status

   async def text(self, encoding=None):
       return self.content.decode(encoding=encoding)

   async def json(self, encoding=None):
       return json.loads(self.content.decode(encoding=encoding))

   def __repr__(self):
       return "[{} ({})]".format(str(self.__class__.__name__), self.status)


class Requests(BaseRequest):
   pass


async def main():
   requests = Requests()
   response = await requests.get('http://www.baidu.com')
   print(await response.text(encoding='utf8'))


if __name__ == "__main__":
   asyncio.run(main())

 

最新评论
暂无评论
登录后评论